单进程用得不少,带着单进程的方案来到多进程看一看
1.多进程的例子
#!/usr/bin/env python
# encoding: utf-8
"""
@version: version 2.6
@author: fudelin
@license: algo Licence
@file: pymain.py
@time: 2018/1/18 11:33
@spec:
"""
import queue
from multiprocessing import Pipe,Process,Pool,Lock
import time,os
import datetime
adata = 12
def run_proc(name):
time.sleep(1)
print('Run child process %s (%s)' % (name, os.getpid()))
adata = 99
print(adata)
if __name__ =='__main__':
print('Run the main process (%s).' % (os.getpid()))
mainStart = time.time() # 记录主进程开始的时间
p = Pool(3) # 开辟进程池
for i in range(3): # 开辟16个进程
p.apply_async(run_proc, args=('Process' + str(i),)) # 每个进程都调用run_proc函数,
# args表示给该函数传递的参数。
print('Waiting for all subprocesses done ...')
p.close() # 关闭进程池
p.join() # 等待开辟的所有进程执行完后,主进程才继续往下执行
print('All subprocesses done')
mainEnd = time.time() # 记录主进程结束时间
print('All process ran %0.2f seconds.'% (mainEnd - mainStart)) # 主进程执行时间
print(adata)
最后全局变量:adata = 12,很明显不按我们的预期进行[请看进程id是3个不相同的编号]
2.多线程的例子
import queue
from multiprocessing import Pipe,Process,Pool,Lock
import time,os
import datetime
import threading
adata = 12
def run_proc(name):
global adata
time.sleep(1)
print('%s (%s)' % (name, os.getpid()))
print(adata)
adata = 99
if __name__ =='__main__':
print('Run the main process (%s).' % (os.getpid()))
t1 = threading.Thread(target=run_proc, args=('fudelin1',))
t2 = threading.Thread(target=run_proc, args=('fudelin2',))
t1.start()
t2.start()
t1.join()
t2.join()
print('-----------------------------------------------')
print(adata)
朝着预期方向进行[请看进程id都是相同的进程编号 ]
3.多进程如何交互信息
Queue,Pipe,socket来实现进程间的通信
4.用multiprocess的pool池时,执行线程任务时抛出异常的行为不一样
5.强大的manage
6.分布式进程
参考2:
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000#0
task_master.py
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
版本:2.0
项目:pycharm
模块名称:task_master.py
模块功能说明:
创建时间:2018-09-22 8:08
修改历史:
1.2018-09-22 8:08
"""
__author__ = "fudelin"
# export this module func,var for oth use
# __all__ = ['']
import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
#send queue 发送队列
task_queue = queue.Queue()
#receiver queue 接收队列
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
#注册2个queue到网络上 使用callable和匿名函数关联了Queue对象
'''仅适用Linux,Windows下callable不能使用lambda表达式赋值
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
'''
def return_task_queue():
global task_queue
return task_queue
def return_result_queue():
global result_queue
return result_queue
def runf():
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
#绑定端口5000,设置验证密码'abc'
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
#Linux下address留空等于本机 Windows下不能留空 127.0.0.0即本机的地址
#启动Queue
manager.start()
#通过网络获取Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
#开启示例任务
for i in range(60):
time.sleep(0.1)
n = i # random.randint(0, 10000)
print('Put task %d to the server(127.0.0.1),task raw data = %d' %(i,n))
task.put(n)
#读取任务结果
print('Try to get results...')
for i in range(60):
time.sleep(0.03)
r = result.get(timeout=10)
print('task=%d,Results=%s' %(i,r))
manager.shutdown()
print('master has been shoutdown')
if __name__ == '__main__':
freeze_support()
runf()
task_worker.py
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
版本:2.0
项目:pycharm
模块名称:task_worker.py
模块功能说明:
创建时间:2018-09-22 8:13
修改历史:
1.2018-09-22 8:13
"""
__author__ = "fudelin"
# export this module func,var for oth use
# __all__ = ['']
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('pc1 Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(30):
try:
n = task.get(timeout=1)
print('pc1 get task %d ,raw data = %d...' % (i, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(0.1)
print('pc1 put the result %d*%d to server(127.0.0.1)' % (n, n))
result.put(r)
except queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
task_worker2.py
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
版本:2.0
项目:pycharm
模块名称:task_worker2.py
模块功能说明:
创建时间:2018-09-22 8:22
修改历史:
1.2018-09-22 8:22
"""
__author__ = "fudelin"
# export this module func,var for oth use
# __all__ = ['']
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('pc2 Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(30):
try:
n = task.get(timeout=1)
print('pc2 get task %d ,raw data = %d...' % (i, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(0.1)
print('pc2 put the result %d*%d to server(127.0.0.1)'%(n,n))
result.put(r)
except queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
结果:
领取专属 10元无门槛券
私享最新 技术干货