前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python 多线程并发程序设计与分析

Python 多线程并发程序设计与分析

作者头像
授客
发布2019-09-11 15:31:08
4820
发布2019-09-11 15:31:08
举报
文章被收录于专栏:授客的专栏

1.技术难点分析与总结

难点1:线程运行时,运行顺序不固定

难点2:同一段代码,再不加锁的情况下,可能被多个线程同时执行,这会造成很多麻烦,比如变量的赋值不正确,方法的重复调用,而如果加锁,或者通过join阻塞方式等来控制,那么又如同运行单进程,效率低下,达不到,“并发”,“高速”的效果。

难点3:不通过join阻塞等方式,主线程可能会优先于子线程退出,这也会导致问题,比如子线程还在用文件句柄,主线程就把文件关闭了。

解决方法:

1、考虑为线程类添加变量属性,这样一来,每个线程都拥有自己的变量,互不影响,比如下面例子中用到的run_times

2、线程公用的一些变量,也可以考虑通过线程类的变量属性传递,比如下面例子中多线程用到的文件句柄file_handler

3、必要时,关键代码可以考虑枷锁Lock、RLock,具体自己看官方文档,比如下方的文件写入,不同线程可能会在同一行写入数据,导致数据统计时不准确,所以加锁,如果出于速度考虑,可以考虑分别给每个线程传递属于自己的文件句柄,写入不同的文件,

4、清理工作,关于这个,需要知道2点:

1)main线程退出时,不会kill非守护线程,但是会kill守护线程

2)通常,子线程start()后会去调用run方法,运行完run方法,子线程停止执行,不会继续运行之后的代码。

所以,通常我们可以这么做,获取当前活动线程数,如果线程数为1,则说明子线程都运行完,可以继续后面的代码清理工作,否则继续循环检测,这里还可以加代码优化,比如每隔一段时间检测一次,以免主线程浪费系统资源

# 利用主线程执行清理工作 current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量 while current_active_thread_num != 1: time.sleep(10) # 每10秒检测一次 current_active_thread_num = len(threading.enumerate())

2.代码实践

requestpy.py

#!/usr/bin/env python # -*- coding:utf-8 -*- __author__ = 'shouke' import urllib.request import json import sys import threading from collections import Counter import time import datetime

class SubThread(threading.Thread): mutex_lock = threading.RLock() def __init__(self, file_handler): self.file_handler = file_handler self.run_times = 0 # 记录每个线程的运行次数 threading.Thread.__init__(self) def run(self): while self.run_times <</span> int(sys.argv[2]): url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/' request = urllib.request.Request(url, method='POST') try: response = urllib.request.urlopen(request) response_body = response.read() response_body = response_body.decode('utf-8') response_body = json.loads(response_body) # 写入文件 SubThread.mutex_lock.acquire() self.file_handler.write(str(response_body['code'])) self.file_handler.write('\n') SubThread.mutex_lock.release()

self.run_times = self.run_times + 1 # 记录每个线程的运行次数 print('已经执行%s次请求' % str(self.run_times)) except Exception as e: print('请求错误%s' % e) def analyze(test_result_data): list_data = [] # 存放目标数据 total_line_count = 0 # 读取的文本行数 abnormal_line = 0 # 存放异常数据行数 digit_line = 0 # 存放正确数据函数 with open(test_result_data, 'r') as file: line = file.readline() while line: line = line.strip('\n') if line.isdigit() and len(line) == 12: list_data.append(int(line)) digit_line = digit_line + 1 else: abnormal_line = abnormal_line + 1 print('服务器返回数据异常') line = file.readline() total_line_count = total_line_count + 1 print('读取的总行数:%s' % str(total_line_count)) print('数据正确的行数:%s' % str(digit_line)) print('数据异常的行数:%s' % str(abnormal_line)) # 分析是否存在重复数据 set_data = set(list_data) if len(set_data) == len(list_data): print('不存在重复数据, 总数:%s 条' % len(list_data)) else: print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data))) if __name__ == '__main__': start_time = datetime.datetime.now()

test_result_data = 'd:\\test_result_data.txt' file = open(test_result_data, 'w') # 存储服务器返回数据 threads_pool = [] # 线程池,存放线程对象 thread_num = 0 # 记录创建的线程数量 while thread_num <</span> int(sys.argv[1]): thread_obj = SubThread(file) threads_pool.append(thread_obj) thread_num = thread_num + 1 for thread in threads_pool: thread.start() # 利用主线程执行清理工作 current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量 while current_active_thread_num != 1: time.sleep(10) current_active_thread_num = len(threading.enumerate())

# 清理工作 try: file.close() except Exception as e: print('关闭文件出错%s' % e)

end_time = datetime.datetime.now() print('运行耗时:',end_time - start_time)

# 分析数据 analyze(test_result_data) 运行(禁用time.sleep函数的情况下):

100个线程,每个线程运行50次,总的运行 5000次

python requestpy.py 100 50

Python <wbr>多线程并发程序设计与分析-Part1
Python <wbr>多线程并发程序设计与分析-Part1

修改程序如下 class SubThread(threading.Thread): def __init__(self, file_handler): self.file_handler = file_handler self.run_times = 0 # 记录每个线程的运行次数 threading.Thread.__init__(self) def run(self): while self.run_times < int(sys.argv[2]): url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/' request = urllib.request.Request(url, method='POST') try: response = urllib.request.urlopen(request) response_body = response.read() response_body = response_body.decode('utf-8') response_body = json.loads(response_body) # 写入文件 self.file_handler.write(str(response_body['code'])) self.file_handler.write('\n') self.run_times = self.run_times + 1 # 记录每个线程的运行次数 print('已经执行%s次请求' % str(self.run_times)) except Exception as e: print('请求错误%s' % e) def analyze(test_result_file_list): list_data = [] # 存放目标数据 total_line_count = 0 # 读取的文本行数 abnormal_line = 0 # 存放异常数据行数 digit_line = 0 # 存放正确数据函数 for file in test_result_file_list: with open(file, 'r') as file: line = file.readline() while line: line = line.strip('\n') if line.isdigit() and len(line) == 12: list_data.append(int(line)) digit_line = digit_line + 1 else: abnormal_line = abnormal_line + 1 print('服务器返回数据异常') line = file.readline() total_line_count = total_line_count + 1 print('读取的总行数:%s' % str(total_line_count)) print('数据正确的行数:%s' % str(digit_line)) print('数据异常的行数:%s' % str(abnormal_line)) # 分析是否存在重复数据 set_data = set(list_data) if len(set_data) == len(list_data): print('不存在重复数据, 总数:%s 条' % len(list_data)) else: print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data))) # 获取重复数据 filehaneder = open('d:\\repeat_data.txt', 'w') c = Counter(list_data) for item in c.items(): if item[1] > 1: print('重复数据:%s' % item[0]) filehaneder.write(str(item[0])) filehaneder.write('\n') filehaneder.close() if __name__ == '__main__': start_time = datetime.datetime.now() base_filename = 'test_result_data' base_dirname = 'd:\\result\\' test_result_file_list = [] # 存储结果数据文件名 sub_thread_file_list = [] # 每个线程的文件句柄 threads_pool = [] # 线程池,存放线程对象 thread_num = 0 # 记录创建的线程数量 while thread_num < int(sys.argv[1]): filename = base_dirname + base_filename + str(thread_num + 1) + '.txt' test_result_file_list.append(filename) file = open(filename, 'w') sub_thread_file_list.append(file) thread_obj = SubThread(file) threads_pool.append(thread_obj) thread_num = thread_num + 1 for thread in threads_pool: thread.start() # # 利用主线程执行清理工作 current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量 while current_active_thread_num != 1: time.sleep(300) current_active_thread_num = len(threading.enumerate()) # 清理工作 try: for file in sub_thread_file_list: file.close() except Exception as e: print('关闭文件出错%s' % e)

end_time = datetime.datetime.now() print('运行耗时:',end_time - start_time)

# 分析数据 analyze(test_result_file_list)

运行结果:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-03-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.技术难点分析与总结
  • 2.代码实践
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档