Python3 与 C# 并发编程之~ 进程篇中

上节课:Python3 与 C# 并发编程之~ 进程篇上

接着上面继续拓展,补充说说获取函数返回值。 上面是通过成功后的回调函数来获取返回值,这次说说自带的方法:

import time
from multiprocessing import Pool

def test(x):
    """开平方"""
    time.sleep(1)
    return x * x

def main():
    pool = Pool()
    task = pool.apply_async(test, (10, ))
    print(task)
    try:
        task.get(timeout=1) # raises multiprocessing.TimeoutError
    except Exception:
        print("超时了~")

if __name__ == '__main__':
    main()

输出:( apply_async返回一个 ApplyResult类,里面有个get方法可以获取返回值)

<multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8>
超时了~

再举个例子,顺便把 Pool里面的 mapimap方法搞个案例(类比jq)

import time
from multiprocessing import Pool

def test(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        task = pool.apply_async(test, (10, ))
        print(task.get(timeout=1))

        obj_list = pool.map(test, range(10))
        print(obj_list)
        # 返回一个可迭代类的实例对象
        obj_iter = pool.imap(test, range(10))
        print(obj_iter)
        next(obj_iter)
        for i in obj_iter:
            print(i, end=" ")

输出:

100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<multiprocessing.pool.IMapIterator object at 0x7ff7f9734198>
1 4 9 16 25 36 49 64 81

微微看一眼源码:(基础忘了可以查看==> 点我 )

class IMapIterator(object):
    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self # 返回一个迭代器

    # 实现next方法
    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value
......

扩展:优雅杀死子进程的探讨 https://segmentfault.com/q/1010000005077517


1.4.拓展之subprocess

官方文档:https://docs.python.org/3/library/subprocess.html

还记得之前李代桃僵的 execlxxx系列吗?

这不, subprocess就是它的一层封装,当然了要强大的多,先看个例子:(以 os.execlp的例子为引)

import subprocess

def main():
    # os.execlp("ls", "ls", "-al")  # 执行Path环境变量可以搜索到的命令
    result = subprocess.run(["ls", "-al"])
    print(result)

if __name__ == '__main__':
    main()

输出

总用量 44
drwxrwxr-x 2 dnt dnt 4096 8月   7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 8月   6 08:01 ..
-rw-rw-r-- 1 dnt dnt  151 8月   3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt  723 8月   5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt  501 8月   3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 8月   6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt  340 8月   7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt  481 8月   7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt  652 8月   5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt  191 8月   7 17:33 3.subprocess.py
CompletedProcess(args=['ls', '-al'], returncode=0)

文档

现在看下官方的文档描述来理解一下:

r"""
具有可访问I / O流的子进程
Subprocesses with accessible I/O streams

此模块允许您生成进程,连接到它们输入/输出/错误管道,并获取其返回代码。
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.

完整文档可以查看:https://docs.python.org/3/library/subprocess.html
For a complete description of this module see the Python documentation.

Main API
========
run(...): 运行命令,等待它完成,然后返回`CompletedProcess`实例。
Runs a command, waits for it to complete, 
then returns a CompletedProcess instance.

Popen(...): 用于在新进程中灵活执行命令的类
A class for flexibly executing a command in a new process

Constants(常量)
---------
DEVNULL: 特殊值,表示应该使用`os.devnull`
Special value that indicates that os.devnull should be used

PIPE:    表示应创建`PIPE`管道的特殊值
Special value that indicates a pipe should be created

STDOUT:  特殊值,表示`stderr`应该转到`stdout`
Special value that indicates that stderr should go to stdout

Older API(尽量不用,说不定以后就淘汰了)
=========
call(...): 运行命令,等待它完成,然后返回返回码。
Runs a command, waits for it to complete, then returns the return code.

check_call(...): Same as call() but raises CalledProcessError()
    if return code is not 0(返回值不是0就引发异常)

check_output(...): 与check_call()相同,但返回`stdout`的内容,而不是返回代码
Same as check_call but returns the contents of stdout instead of a return code

getoutput(...): 在shell中运行命令,等待它完成,然后返回输出
Runs a command in the shell, waits for it to complete,then returns the output

getstatusoutput(...): 在shell中运行命令,等待它完成,然后返回一个(exitcode,output)元组
Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple
"""

其实看看源码很有意思:(内部其实就是调用的 os.popen【进程先导篇讲进程守护的时候用过】)

def run(*popenargs, input=None, capture_output=False,
        timeout=None, check=False, **kwargs):

    if input is not None:
        if 'stdin' in kwargs:
            raise ValueError('stdin和输入参数可能都不会被使用。')
        kwargs['stdin'] = PIPE

    if capture_output:
        if ('stdout' in kwargs) or ('stderr' in kwargs):
            raise ValueError('不能和capture_outpu一起使用stdout 或 stderr')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE

    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            raise TimeoutExpired(
                process.args, timeout, output=stdout, stderr=stderr)
        except:  # 包括KeyboardInterrupt的通信处理。
            process.kill()
            # 不用使用process.wait(),.__ exit__为我们做了这件事。
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(
                retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)

返回值类型: CompletedProcess

# https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):
    def __init__(self, args, returncode, stdout=None, stderr=None):
        self.args = args
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr

    def __repr__(self):
    """对象按指定的格式显示"""
        args = [
            'args={!r}'.format(self.args),
            'returncode={!r}'.format(self.returncode)
        ]
        if self.stdout is not None:
            args.append('stdout={!r}'.format(self.stdout))
        if self.stderr is not None:
            args.append('stderr={!r}'.format(self.stderr))
        return "{}({})".format(type(self).__name__, ', '.join(args))

    def check_returncode(self):
        """如果退出代码非零,则引发CalledProcessError"""
        if self.returncode:
            raise CalledProcessError(self.returncode, self.args, self.stdout,
                                     self.stderr)

简单demo

再来个案例体会一下方便之处:

import subprocess

def main():
    result = subprocess.run(["ping", "www.baidu.com"])
    print(result.stdout)

if __name__ == '__main__':
    main()

图示:

交互demo

再来个强大的案例(交互的程序都可以,比如 ftpnslookup 等等): popen1.communicate

import subprocess

def main():
    process = subprocess.Popen(
        ["ipython3"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    try:
        # 对pstree进行交互
        out, err = process.communicate(input=b'print("hello")', timeout=3)
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
    except TimeoutError:
        # 如果超时到期,则子进程不会被终止,需要自己处理一下
        process.kill()
        out, err = process.communicate()
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))

if __name__ == '__main__':
    main()

输出:

IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: hello

In [2]: Do you really want to exit ([y]/n)?

Err:

注意点:如果超时到期,则子进程不会被终止,需要自己处理一下(官方提醒)

通信demo

这个等会说进程间通信还会说,所以简单举个例子,老规矩拿 ps aux|grep bash说事:

import subprocess


def main():
    # ps aux | grep bash
    # 进程1获取结果
    p1 = subprocess.Popen(["ps", "-aux"], stdout=subprocess.PIPE)
    # 得到进程1的结果再进行筛选
    p2 = subprocess.Popen(["grep", "bash"], stdin=p1.stdout, stdout=subprocess.PIPE)
    # 关闭写段(结果已经获取到进程2中了,防止干扰显示)
    p1.stdout.close()
    # 与流程交互:将数据发送到stdin并关闭它。
    msg_tuple = p2.communicate()
    # 输出结果
    print(msg_tuple[0].decode())

if __name__ == '__main__':
    main()

输出:(以前案例:进程间通信~PIPE匿名管道)

dnt       2470  0.0  0.1  24612  5236 pts/0    Ss   06:01   0:00 bash
dnt       2512  0.0  0.1  24744  5760 pts/1    Ss   06:02   0:00 bash
dnt      20784  0.0  0.1  24692  5588 pts/2    Ss+  06:21   0:00 /bin/bash
dnt      22377  0.0  0.0  16180  1052 pts/1    S+   06:30   0:00 grep bash

原文发布于微信公众号 - 我为Net狂(dotNetCrazy)

原文发表时间:2018-08-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏吴伟祥

Velocity语法大全 转

本文转载自:http://www.cnblogs.com/codingsilence/archive/2011/03/29/2146580.html

7140
来自专栏Golang语言社区

【Go 语言社区】并发性

并发性 并发的特点是需要锁Lock和互斥Mutex。在Java中加锁和解锁是一个复杂过程代码如下: try { mutex.acquire(); try...

370110
来自专栏技术翻译

如何编写自己的jQuery插件?

对于那些不知道的人来说,jQuery是一个JavaScript库,它包含了许多特性,非常小而且速度很快。它还包括一个易于使用的API,在所有浏览器上都是兼容的,...

18210
来自专栏听雨堂

同步等待方法

function waitVar(key,varb, fun) { //等待指定变量,返回:-1:无数据,继续等待 -2:超时 1:成功。fun不支持...

28480
来自专栏finleyMa

补充上一篇 实现基于最新chrome的动态按需加载组件

上面代码中,import函数的参数specifier,指定所要加载的模块的位置。import命令能够接受什么参数,import()函数就能接受什么参数,两者区别...

12750
来自专栏IMWeb前端团队

移动端minimvvm框架qvm实现

gitHub地址 1,移动端minimvvm框架qvm实现 qvm概念,一个适用于移动端的mini mvvm(什么是mvvm?没了解的同学自己去了解)框架。参考...

225100
来自专栏北京马哥教育

python实现简单爬虫功能

iOS开发如果之前没接触过除了c和c++(c++太难了,不花个十来年基本不可能精通)的语言,第二门语言最好的选择就是Python.原因就是 1.语法简单 2.库...

33870
来自专栏木子昭的博客

css进阶 less的使用

less 官网http://lesscss.org/ ? less.png npm install -g less # 查看版本 lessc -v 编写les...

42260
来自专栏Django中文社区

Django模板标签regroup的妙用

在使用 Django 开发时,有时候我们需要在模板中按对象的某个属性分组显示一系列数据。例如博客文章按照时间归档分组显示文章列表(示例效果请看我的博客的归档页面...

33760
来自专栏GreenLeaves

JS框架设计之加载器所在路径的探知一模块加载系统

1、要加载一个模块,我们需要一个URL作为加载地址,一个script作为加载媒介,但用户在require是都用ID,我们需要一个将ID转换为URL的方法,思路很...

21950

扫码关注云+社区

领取腾讯云代金券