首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

多进程或多线程,数据共享处理方案-python实现

单进程用得不少,带着单进程的方案来到多进程看一看

1.多进程的例子

#!/usr/bin/env python

# encoding: utf-8

"""

@version: version 2.6

@author: fudelin

@license: algo Licence

@file: pymain.py

@time: 2018/1/18 11:33

@spec:

"""

import queue

from multiprocessing import Pipe,Process,Pool,Lock

import time,os

import datetime

adata = 12

def run_proc(name):

time.sleep(1)

print('Run child process %s (%s)' % (name, os.getpid()))

adata = 99

print(adata)

if __name__ =='__main__':

print('Run the main process (%s).' % (os.getpid()))

mainStart = time.time() # 记录主进程开始的时间

p = Pool(3) # 开辟进程池

for i in range(3): # 开辟16个进程

p.apply_async(run_proc, args=('Process' + str(i),)) # 每个进程都调用run_proc函数,

# args表示给该函数传递的参数。

print('Waiting for all subprocesses done ...')

p.close() # 关闭进程池

p.join() # 等待开辟的所有进程执行完后,主进程才继续往下执行

print('All subprocesses done')

mainEnd = time.time() # 记录主进程结束时间

print('All process ran %0.2f seconds.'% (mainEnd - mainStart)) # 主进程执行时间

print(adata)

最后全局变量:adata = 12,很明显不按我们的预期进行[请看进程id是3个不相同的编号]

2.多线程的例子

import queue

from multiprocessing import Pipe,Process,Pool,Lock

import time,os

import datetime

import threading

adata = 12

def run_proc(name):

global adata

time.sleep(1)

print('%s (%s)' % (name, os.getpid()))

print(adata)

adata = 99

if __name__ =='__main__':

print('Run the main process (%s).' % (os.getpid()))

t1 = threading.Thread(target=run_proc, args=('fudelin1',))

t2 = threading.Thread(target=run_proc, args=('fudelin2',))

t1.start()

t2.start()

t1.join()

t2.join()

print('-----------------------------------------------')

print(adata)

朝着预期方向进行[请看进程id都是相同的进程编号 ]

3.多进程如何交互信息

Queue,Pipe,socket来实现进程间的通信

4.用multiprocess的pool池时,执行线程任务时抛出异常的行为不一样

5.强大的manage

6.分布式进程

参考2:

https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000#0

task_master.py

#!/usr/bin/env python3

# -*- coding: UTF-8 -*-

"""

版本:2.0

项目:pycharm

模块名称:task_master.py

模块功能说明:

创建时间:2018-09-22 8:08

修改历史:

1.2018-09-22 8:08

"""

__author__ = "fudelin"

# export this module func,var for oth use

# __all__ = ['']

import random, time, queue

from multiprocessing.managers import BaseManager

from multiprocessing import freeze_support

#send queue 发送队列

task_queue = queue.Queue()

#receiver queue 接收队列

result_queue = queue.Queue()

class QueueManager(BaseManager):

pass

#注册2个queue到网络上 使用callable和匿名函数关联了Queue对象

'''仅适用Linux,Windows下callable不能使用lambda表达式赋值

QueueManager.register('get_task_queue', callable=lambda: task_queue)

QueueManager.register('get_result_queue', callable=lambda: result_queue)

'''

def return_task_queue():

global task_queue

return task_queue

def return_result_queue():

global result_queue

return result_queue

def runf():

QueueManager.register('get_task_queue', callable=return_task_queue)

QueueManager.register('get_result_queue', callable=return_result_queue)

#绑定端口5000,设置验证密码'abc'

manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

#Linux下address留空等于本机 Windows下不能留空 127.0.0.0即本机的地址

#启动Queue

manager.start()

#通过网络获取Queue对象

task = manager.get_task_queue()

result = manager.get_result_queue()

#开启示例任务

for i in range(60):

time.sleep(0.1)

n = i # random.randint(0, 10000)

print('Put task %d to the server(127.0.0.1),task raw data = %d' %(i,n))

task.put(n)

#读取任务结果

print('Try to get results...')

for i in range(60):

time.sleep(0.03)

r = result.get(timeout=10)

print('task=%d,Results=%s' %(i,r))

manager.shutdown()

print('master has been shoutdown')

if __name__ == '__main__':

freeze_support()

runf()

task_worker.py

#!/usr/bin/env python3

# -*- coding: UTF-8 -*-

"""

版本:2.0

项目:pycharm

模块名称:task_worker.py

模块功能说明:

创建时间:2018-09-22 8:13

修改历史:

1.2018-09-22 8:13

"""

__author__ = "fudelin"

# export this module func,var for oth use

# __all__ = ['']

import time, sys, queue

from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:

class QueueManager(BaseManager):

pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:

QueueManager.register('get_task_queue')

QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:

server_addr = '127.0.0.1'

print('pc1 Connect to server %s...' % server_addr)

# 端口和验证码注意保持与task_master.py设置的完全一致:

m = QueueManager(address=(server_addr, 5000), authkey=b'abc')

# 从网络连接:

m.connect()

# 获取Queue的对象:

task = m.get_task_queue()

result = m.get_result_queue()

# 从task队列取任务,并把结果写入result队列:

for i in range(30):

try:

n = task.get(timeout=1)

print('pc1 get task %d ,raw data = %d...' % (i, n))

r = '%d * %d = %d' % (n, n, n*n)

time.sleep(0.1)

print('pc1 put the result %d*%d to server(127.0.0.1)' % (n, n))

result.put(r)

except queue.Empty:

print('task queue is empty.')

# 处理结束:

print('worker exit.')

task_worker2.py

#!/usr/bin/env python3

# -*- coding: UTF-8 -*-

"""

版本:2.0

项目:pycharm

模块名称:task_worker2.py

模块功能说明:

创建时间:2018-09-22 8:22

修改历史:

1.2018-09-22 8:22

"""

__author__ = "fudelin"

# export this module func,var for oth use

# __all__ = ['']

import time, sys, queue

from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:

class QueueManager(BaseManager):

pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:

QueueManager.register('get_task_queue')

QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:

server_addr = '127.0.0.1'

print('pc2 Connect to server %s...' % server_addr)

# 端口和验证码注意保持与task_master.py设置的完全一致:

m = QueueManager(address=(server_addr, 5000), authkey=b'abc')

# 从网络连接:

m.connect()

# 获取Queue的对象:

task = m.get_task_queue()

result = m.get_result_queue()

# 从task队列取任务,并把结果写入result队列:

for i in range(30):

try:

n = task.get(timeout=1)

print('pc2 get task %d ,raw data = %d...' % (i, n))

r = '%d * %d = %d' % (n, n, n*n)

time.sleep(0.1)

print('pc2 put the result %d*%d to server(127.0.0.1)'%(n,n))

result.put(r)

except queue.Empty:

print('task queue is empty.')

# 处理结束:

print('worker exit.')

结果:

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180922G0EXK300?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券