To compare the performance of uploading small files to Swift against uploading them to FastDFS, the following performance test was run.
1.1 Test Environment
How the FastDFS cluster was set up is described in the earlier post "FastDFS Distributed File System (Part 1): Setup, Deployment, and Configuration".

- tracker server1: node2
- tracker server2: node3
- group1: node4 / node5 / node6
- group2: node7 / node8 / node9
- client: node1
- use_trunk_file = true (trunk storage mode enabled)
- replica = 3

1.2 Machine Specs

CPU:

```
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                24
On-line CPU(s) list:   0-23
Thread(s) per core:    2
Core(s) per socket:    6
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 62
Stepping:              4
CPU MHz:               2100.180
BogoMIPS:              4199.42
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              15360K
NUMA node0 CPU(s):     0-5,12-17
NUMA node1 CPU(s):     6-11,18-23
```

Memory: 126G

Disk:

```
NAME   MAJ:MIN RM   SIZE RO TYPE MOUNTPOINT
sda      8:0    0   200G  0 disk
├─sda1   8:1    0   500M  0 part /boot
├─sda2   8:2    0     4G  0 part [SWAP]
└─sda3   8:3    0 195.5G  0 part /
sdb      8:16   0   6.4T  0 disk /mnt/xfsd
```
Test files are generated in two ways: (1) files with random sizes between 1 KB and 100 KB; (2) files that are all exactly 133 KB.
File generation script:
```python
#!/usr/bin/python
from random import randint
import os

data_dir = os.sys.argv[1]
n = int(os.sys.argv[2])

if not os.path.exists(data_dir):
    os.makedirs(data_dir)

for x in range(0, n):
    with open("%s/file_%d" % (data_dir, x), 'wb') as fout:
        fout.write(os.urandom(1024 * randint(80, 180)))
```
In Python, os.urandom(n) returns a string of n random bytes.
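A quick sketch of that primitive (works on both Python 2 and 3):

```python
import os

# os.urandom(n) returns n random bytes drawn from the OS entropy source
data = os.urandom(16)
assert len(data) == 16

# the generator above writes 1024 * randint(80, 180) bytes per file,
# i.e. each file is between 80 KB and 180 KB of random data
```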
The upload test scripts were written with fastdfs-python-sdk. Files are uploaded in two modes, serial and parallel:
Serial upload: call the upload API on the files one by one until every file has been uploaded.
Parallel upload: launch multiple processes that upload files simultaneously, each process uploading a batch of files.
Serial test script:
```python
#!/usr/local/bin/python2.7
import os
import time
import sys

try:
    from fdfs_client.client import *
    from fdfs_client.exceptions import *
except ImportError:
    import_path = os.path.abspath('../')
    sys.path.append(import_path)
    from fdfs_client.client import *
    from fdfs_client.exceptions import *

if __name__ == '__main__':
    starttime = time.time()
    filenumbers = 100000  # number of files to upload
    client = Fdfs_client('/opt/fdfs_client-py/fdfs_client/client.conf')
    try:
        for i in range(filenumbers):
            filename = '/data/files/small/smallfile' + str(i)
            client.upload_by_filename(filename)
    except Exception as e:
        print "error" + str(e)
    endtime = time.time()
    print "%f seconds for sequence processing computation." % (endtime - starttime)
```

Parallel test script:
```python
#!/usr/local/bin/python2.7
import os
import time
import sys
import multiprocessing
from multiprocessing import Process

try:
    from fdfs_client.client import *
    from fdfs_client.exceptions import *
except ImportError:
    import_path = os.path.abspath('../')
    sys.path.append(import_path)
    from fdfs_client.client import *
    from fdfs_client.exceptions import *

client = Fdfs_client('/opt/fastdfs/fdfs_client-py/fdfs_client/client.conf')

def uploadfile(begin, end, t_time, t_count, t_size, lock):
    try:
        for idx in range(begin, end):
            filename = '/data/files/small-10w/smallfile' + str(idx)
            for y in range(5):  # upload each file 5 times
                starttime = time.time()
                ret = client.upload_by_filename(filename)
                endtime = time.time()
                # status string as returned by fdfs_client-py
                if ret['Status'] != 'Upload successed.':
                    os.system('echo upload fail >> log')
                else:
                    os.system('echo upload success >> log')
                with lock:
                    t_count.value += 1
                    t_time.value += endtime - starttime
                    t_size.value += os.path.getsize(filename)
    except Exception as e:
        print "error" + str(e)

if __name__ == '__main__':
    process = []
    nprocess = int(os.sys.argv[1])
    file_per_process = 100000 / nprocess
    lock = multiprocessing.Lock()
    total_time = multiprocessing.Value('f', 0.0)
    total_count = multiprocessing.Value('i', 0)
    total_size = multiprocessing.Value('f', 0.0)
    for i in range(nprocess):
        process.append(Process(target=uploadfile,
                               args=(i * file_per_process, (i + 1) * file_per_process,
                                     total_time, total_count, total_size, lock)))
    for p in process:
        p.start()
    for p in process:
        p.join()
    print "%f seconds for multiprocessing computation." % total_time.value
    print "%d total count." % total_count.value
    print "%f total size." % total_size.value
    os.system("wc -l log")
```
FastDFS serial upload results:

| Total files uploaded | Total size (KB) | Average speed (MB/s) | Average time per file (ms) | Failed uploads |
| --- | --- | --- | --- | --- |
| 1000 | 130530 | 21.28 | 5.97 | 0 |
| 1000 | 130530 | 22.60 | 5.62 | 0 |
| 10000 | 1294566 | 22.94 | 5.53 | 0 |
| 10000 | 1294566 | 23.11 | 5.49 | 0 |
| 100000 | 13018299 | 21.05 | 6.03 | 0 |
| 100000 | 13018299 | 22.06 | 5.75 | 0 |
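As a sanity check, the per-file time column follows directly from the totals; taking the 100000-file row (21.05 MB/s) as an example:

```python
# Values taken from the 100000-file row of the serial-upload table above
total_kb = 13018299      # total data uploaded, in KB
speed_mb_s = 21.05       # reported average speed
files = 100000

elapsed_s = (total_kb / 1024.0) / speed_mb_s   # implied total wall time, ~604 s
per_file_ms = elapsed_s / files * 1000.0       # implied average per-file time

assert abs(per_file_ms - 6.03) < 0.05          # matches the table's 6.03 ms
```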
FastDFS parallel upload results (100,000 files, each uploaded 5 times, for 500,000 uploads in total):

| Concurrency | Total files uploaded | Average time per file (ms) | Failed uploads |
| --- | --- | --- | --- |
| 100 | 500000 | 14.62 | 0 |
| 200 | 500000 | 17.18 | 0 |
| 250 | 500000 | 22.19 | 0 |
| 400 | 500000 | 30.62 | 0 |
| 500 | 500000 | 28.55 | 0 |
| 800 | 500000 | 27.17 | 0 |
| 1000 | 500000 | 42.64 | 0 |
Swift upload performance:
500,000 objects were uploaded to Swift.
| Concurrency | Total files uploaded | Average time per file (ms) | Failed uploads (%) |
| --- | --- | --- | --- |
| 100 | 500000 | 78.91 | 0 |
| 200 | 500000 | 144.27 | 0 |
| 250 | 500000 | 157.63 | 5.69% |
| 400 | 195610 | 171.22 | 60.88% |
| 500 | 193629 | 136.09 | 61.27% |
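Putting the two concurrency tables side by side gives a rough per-upload latency comparison (values copied from above; only the concurrency levels where Swift had no failures are compared):

```python
# Average per-upload time (ms) at each concurrency level, from the tables above
fdfs_ms = {100: 14.62, 200: 17.18, 250: 22.19}
swift_ms = {100: 78.91, 200: 144.27, 250: 157.63}

for c in sorted(fdfs_ms):
    ratio = swift_ms[c] / fdfs_ms[c]
    print("concurrency %4d: Swift/FastDFS latency ratio = %.1fx" % (c, ratio))
```

At every measured level FastDFS is several times faster per upload, and beyond 250 concurrent clients Swift additionally begins failing a large fraction of uploads.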
A side note on the multiprocessing pattern used in the parallel script above. Consider this first attempt at incrementing a shared counter from ten processes:

```python
import time
from multiprocessing import Process, Value

def func(val):
    for i in range(50):
        time.sleep(0.01)
        val.value += 1

if __name__ == '__main__':
    v = Value('i', 0)
    procs = [Process(target=func, args=(v,)) for i in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print v.value
```

The multiprocessing usage itself is simple: create a Process with a target function and its arguments, call start() to launch it, and call join() so the main process waits for every child process to finish. Here v, defined via multiprocessing.Value, is a variable shared across processes. We would expect the final v.value to be 500, but the result comes out smaller: the increment is not locked, so the processes race on the shared variable. How, then, do we add the lock?
Method 1: use the lock as a context manager:

```python
import time
from multiprocessing import Process, Value, Lock

def func(val, lock):
    for i in range(50):
        time.sleep(0.01)
        with lock:
            val.value += 1

if __name__ == '__main__':
    v = Value('i', 0)
    lock = Lock()
    procs = [Process(target=func, args=(v, lock)) for i in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print v.value
```

Method 2:
```python
import time
from multiprocessing import Process, Value, Lock

def func(val, lock):
    for i in range(50):
        time.sleep(0.01)
        lock.acquire()
        val.value += 1
        lock.release()

if __name__ == '__main__':
    v = Value('i', 0)
    lock = Lock()
    procs = [Process(target=func, args=(v, lock)) for i in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print v.value
```
Author:忆之独秀
Email:leaguenew@qq.com
Please cite the source: http://blog.csdn.net/lavorange/article/details/50829552