博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【FastDFS分布式文件系统之二】:FastDFS小文件上传性能测试及Python客户端上传操作
阅读量:1973 次
发布时间:2019-04-27

本文共 7083 字,大约阅读时间需要 23 分钟。

  由于要对比swift上传小文件以及fdfs上传小文件的性能,故做性能测试。

1.1 测试环境:

FastDFS集群的搭建方法:【FastDFS分布式文件系统之一】:搭建、部署、配置
tracker server1:node2
tracker server2:node3
group1:node4 / node5 / node6
group2:node7 / node8 / node9
client: node1
use_trunk_file = true(开启chunk存储模式)
replica = 3
1.2 机器参数
CPU:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 24
On-line CPU(s) list: 0-23
Thread(s) per core: 2
Core(s) per socket: 6
Socket(s): 2
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 62
Stepping: 4
CPU MHz: 2100.180
BogoMIPS: 4199.42
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 15360K
NUMA node0 CPU(s): 0-5,12-17
NUMA node1 CPU(s): 6-11,18-23
内存:
126G
硬盘:
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
sda 8:0 0 200G 0 disk 
├─sda1 8:1 0 500M 0 part /boot
├─sda2 8:2 0 4G 0 part [SWAP]
└─sda3 8:3 0 195.5G 0 part /

sdb 8:16 0 6.4T 0 disk /mnt/xfsd

1.3 测试方法:

文件生成分为两种:1.随机生成1~100KB之间大小的文件;2.全部大小都为133KB大小的文件。

文件生成程序:

#!/usr/bin/pythonfrom random import randintimport os data_dir = os.sys.argv[1]n = int(os.sys.argv[2]) if not os.path.exists(data_dir):    os.makedirs(data_dir) for x in range(0, n):    with open("%s/file_%d" % (data_dir, x), 'wb') as fout:        fout.write(os.urandom(1024 * randint(80, 180)))

python中os.urandom(n)的作用:随机产生n个字节的字符串。

通过fastdfs-python-sdk: 编写上传测试文件,文件上传分为串行和并行两种方式:

串行上传:对若干个文件依次调用上传接口,直到完成所有文件上传为止。

并行上传:启动多个进程同时上传文件,每个进程上传多个文件。

串行测试脚本:

#!/usr/local/bin/python2.7import osimport timeimport sys from multiprocessing import Processtry:    from fdfs_client.client import *    from fdfs_client.exception import *except ImportError:    import_path = os.path.abspath('../')    sys.path.append(import_path)    from fdfs_client.client import *    from fdfs_client.exceptions import *#size_total = 0if __name__ == '__main__':    starttime = time.time()    filenumbers = 100000 #number of processes                                                                                                                                                             client = Fdfs_client('/opt/fdfs_client-py/fdfs_client/client.conf')    try:        for i in range(filenumbers):            filename = '/data/files/small/smallfile' + str(i)            client.upload_by_filename(filename)    except Exception,e:        print "error" + str(e)    endtime = time.time()     #print "%d byte has been stored into the fdfs." % size_total    print "%f seconds for sequence processing computation." % ( endtime - starttime )    #print size_total    #print "speed is %f KB/s" % size_total/1024/(endtime-starttime)
并行测试脚本:

#!/usr/local/bin/python2.7                                                                                                                   import osimport timeimport sys import multiprocessingfrom multiprocessing import Processtry:    from fdfs_client.client import *    from fdfs_client.exception import *except ImportError:    import_path = os.path.abspath('../')    sys.path.append(import_path)    from fdfs_client.client import *    from fdfs_client.exceptions import * client = Fdfs_client('/opt/fastdfs/fdfs_client-py/fdfs_client/client.conf')  def uploadfile(begin,end,t_time,t_count,t_size,lock):    try:        for idx in range(begin,end):            filename = '/data/files/small-10w/smallfile'+str(idx)            for y in range(5):                starttime = time.time()                ret = client.upload_by_filename(filename)                endtime = time.time()                if(ret['Status'] != 'Upload successed.'):                    os.system('echo upload fail >> log')                else:                    os.system('echo upload success >> log')                #    print ret['Status']                with lock:                    t_count.value += 1                    t_time.value += endtime - starttime                    t_size.value += os.path.getsize(filename)                except Exception,e:        print "error" + str(e) if __name__ == '__main__':    process = []     nprocess = int(os.sys.argv[1])    file_per_process = 100000/nprocess		lock = multiprocessing.Lock()     total_time = multiprocessing.Value('f',0.0)    total_count = multiprocessing.Value('i',0)    total_size = multiprocessing.Value('f',0.0)     for i in range(nprocess):        process.append( Process(target=uploadfile,args=(i * file_per_process , (i+1) * file_per_process, total_time,total_count,total_size,lock)))     for p in process:        p.start()     for p in process:        p.join()     print "%f seconds for multiprocessing computation." % total_time.value    print "%d total count." % total_count.value    print "%f total size." % total_size.value    os.system("wc -l log")

2.测试结果

串行上传(文件大小80KB~180KB之间,平均文件大小130KB):

 

上传文件总个数(KB)
上传文件总大小(KB)
平均速度(MB/s)
平均每个文件上传所用时间(ms)
上传失败次数
1000
130530
21.28 5.97
0
1000
130530
22.60
5.62
0
10000
1294566
22.94
5.53
0
10000
1294566
23.11
5.49
0
100000
13018299
21.05
6.03
0
100000
13018299
22.06
5.75
0
并行上传(文件大小80KB~180KB之间,平均文件大小130KB):
并发数
上传文件总个数 平均每个文件上传所用时间(ms) 上传失败次数
100 500000 14.62 0
200
500000
17.18
0
250
500000
22.19
0
400
500000
30.62
0
500
500000
28.55
0
800
500000
27.17
0
1000
500000
42.64
0

 

Swift上传性能:

上传500000个对象到Swift中

并发数

上传文件总个数

平均每个文件上传所用时间(ms)

上传失败百分比

100

500000

78.91

0

200

500000

144.27

0

250

500000

157.63

5.69%

400

195610

171.22

60.88%

500

193629

136.09

61.27%

3.结论

  • 速度方面,FastDFS在高并发的情况下上传小文件所用时间要比Swift小很多。
  • 稳定性方面:在高并发的情况下,FastDFS上传失败次数为0次,比Swift上传失败次数少。
4.Python并行
       起初想用多线程来进行几十万次的并发上传,以为线程相对轻量,占用资源少,那么最终统计的上传时间会比较少,其实不然,多线程模拟并发上传比多进程要花更大的时间,原因跟python所谓的GIL(Global Interpreter Lock)全局解释锁有关。具体它是什么可以参考一篇文章:
给出一个让人困惑的结论:不要使用多线程,请使用多进程。那么就简单讲一下python multiprocessing。
一个错误的例子:
import timefrom multiprocessing import Process, Valuedef func(val):    for i in range(50):        time.sleep(0.01)        val.value += 1if __name__ == '__main__':    v = Value('i', 0)    procs = [Process(target=func, args=(v,)) for i in range(10)]    for p in procs: p.start()    for p in procs: p.join()    print v.value
多进程实现很简单,使用Process,然后传入目标函数以及参数,start()方法启动进程join()方法等待所有进程结束之后主进程再结束,其中v是通过multiprocessing.Value定义的变量,是进程之间共享的变量。那么我们期望最终得到的v.value会是500,但是结果却是比500少的数字,原因就是没有加锁,在进程竞争资源的情况下没有lock住共享变量。那么如何加锁?
方法一:
import timefrom multiprocessing import Process, Value, Lockdef func(val, lock):    for i in range(50):        time.sleep(0.01)        with lock:            val.value += 1if __name__ == '__main__':    v = Value('i', 0)    lock = Lock()    procs = [Process(target=func, args=(v, lock)) for i in range(10)]    for p in procs: p.start()    for p in procs: p.join()    print v.value
方法二:
import timefrom multiprocessing import Process, Value, Lockdef func(val, lock):    for i in range(50):        time.sleep(0.01)        lock.acquire()        val.value += 1	lock.release()if __name__ == '__main__':    v = Value('i', 0)    lock = Lock()    procs = [Process(target=func, args=(v, lock)) for i in range(10)]    for p in procs: p.start()    for p in procs: p.join()    print v.value
两篇参考文章:
1.Shared counter with Python's Multiprocessing:
2.python进程间通信:

Author:忆之独秀

Email:leaguenew@qq.com

注明出处:http://blog.csdn.net/lavorange/article/details/50829552

你可能感兴趣的文章
微服务springcloud2系列篇-集成分布式事务seata1.4.1(数据库mysql8)与nacos1.4.1注册配置(WINDOWS环境)
查看>>
数据库系列篇-多数据源集成+springboot+数据库连接池druid(使用注解切换方式) (方便的直接切换方式待更新)
查看>>
数据库系列篇mysql8-分库分表中间件mycat(WINDOWS环境)
查看>>
用户权限设计-基于RBAC模型
查看>>
微服务springcloud2系列篇-网关GATEWAY跨域问题
查看>>
微服务springcloud(H版本)与springboot(2.X版本) maven常见问题整理记录
查看>>
Java并发以及多线程基础
查看>>
软件质量的8个特性
查看>>
应届渣渣前端的艰难求职之路
查看>>
2021年不可错过的17种JS优化技巧(一)
查看>>
2021年不可错过的17种JS优化技巧(二)
查看>>
月薪15~20k的前端面试问什么?
查看>>
一文学会使用Vue3
查看>>
我是如何看Vue源码的
查看>>
在 Vue 中用 Axios 异步请求API
查看>>
LVM逻辑卷------基础命令详解(三分钟入门)
查看>>
LVM逻辑卷------详细操作过程(三分钟上手)
查看>>
mysql——介绍及安装与基本用法
查看>>
MySQL数据库之索引
查看>>
MYSQL——事务操作+视图+存储引擎
查看>>