
Why doesn't my multiprocess Python gRPC server work?

Stack Overflow user
Asked on 2020-07-08 15:37:47
Answers: 3 | Views: 5.6K | Followers: 0 | Score: 10

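I use a multiprocess pool to start one gRPC server in each subprocess. When I hit the server with multiple clients, I run into the following two problems:

  • All clients reach the same server subprocess
  • The client raises MaybeEncodingError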

By the way, my development environment is:

[OS]
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G5033

[packages]
grpcio = '1.30.0'
grpcio-tools = '1.30.0'
multiprocess = "0.70.10"
grpcio-status = "1.30.0"
googleapis-common-protos = "1.52.0"

[requires]
python_version = "3.8.3"

Here is the server output:

[PID 83287] Binding to 'localhost:52909'
[PID 83288] Starting new server.
[PID 83289] Starting new server.
[PID 83290] Starting new server.
[PID 83291] Starting new server.
[PID 83292] Starting new server.
[PID 83293] Starting new server.
[PID 83294] Starting new server.
[PID 83295] Starting new server.
[PID 83295] Determining primality of 2
[PID 83295] Determining primality of 9
[PID 83295] Determining primality of 23
[PID 83295] Determining primality of 16
[PID 83295] Determining primality of 10
[PID 83295] Determining primality of 3
[PID 83295] Determining primality of 24
[PID 83295] Determining primality of 17
[PID 83295] Determining primality of 11
[PID 83295] Determining primality of 25
[PID 83295] Determining primality of 4
[PID 83295] Determining primality of 18
[PID 83295] Determining primality of 5
[PID 83295] Determining primality of 12
[PID 83295] Determining primality of 19
[PID 83295] Determining primality of 26
[PID 83295] Determining primality of 6
[PID 83295] Determining primality of 13
[PID 83295] Determining primality of 27
[PID 83295] Determining primality of 20
[PID 83295] Determining primality of 7
[PID 83295] Determining primality of 14
[PID 83295] Determining primality of 8
[PID 83295] Determining primality of 28
[PID 83295] Determining primality of 15

As shown above, all clients reach the same server subprocess **[PID 83295]**. Why?

Here is the client error message:

Traceback (most recent call last):
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 96, in <module>
    main()
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 86, in main
    primes = _calculate_primes(args.server_address)
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 71, in _calculate_primes
    primality = worker_pool.map(_run_worker_query, check_range)
  File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
    raise self._value
multiprocess.pool.MaybeEncodingError: Error sending result: '[isPrime: true
, , isPrime: true
, , isPrime: true
, , ]'. Reason: 'PicklingError("Can't pickle <class 'prime_pb2.Primality'>: it's not the same object as prime_pb2.Primality")'

As shown above, it raises multiprocess.pool.MaybeEncodingError, caused by a PicklingError. **Why?**

Here is the complete code:

// the prime.proto from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing

syntax = "proto3";

package prime;

// A candidate integer for primality testing.
message PrimeCandidate {
    // The candidate.
    int64 candidate = 1;
}

// The primality of the requested integer candidate.
message Primality {
    // Is the candidate prime?
    bool isPrime = 1;
}

// Service to check primality.
service PrimeChecker {
    // Determines the primality of an integer.
    rpc check (PrimeCandidate) returns (Primality) {}
}
# the server.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# and I have modified a few places.


"""An example of multiprocess concurrency with gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import multiprocessing
from concurrent import futures
import contextlib
import datetime
import logging
import math
import time
import socket
import sys
from multiprocess import pool
import grpc

from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc

_LOGGER = logging.getLogger(__name__)

_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT


def is_prime(n):
    # Trial division by every candidate divisor up to and including floor(sqrt(n)).
    if n < 2:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True


class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):

    def check(self, request, context):
        _LOGGER.info('Determining primality of %s', request.candidate)
        return prime_pb2.Primality(isPrime=is_prime(request.candidate))


def _wait_forever(server):
    try:
        while True:
            time.sleep(_ONE_DAY.total_seconds())
    except KeyboardInterrupt:
        server.stop(None)


def _run_server(bind_address):
    """Start a server in a subprocess."""
    _LOGGER.info('Starting new server.')
    options = (('grpc.so_reuseport', 1),)

    server = grpc.server(futures.ThreadPoolExecutor(
        max_workers=_THREAD_CONCURRENCY,),
                         options=options)
    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
    server.add_insecure_port(bind_address)
    server.start()
    # _wait_forever(server)
    server.wait_for_termination()


@contextlib.contextmanager
def _reserve_port():
    """Find and reserve a port for all subprocesses to use."""
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
        raise RuntimeError("Failed to set SO_REUSEPORT.")
    sock.bind(('', 0))
    try:
        yield sock.getsockname()[1]
    finally:
        sock.close()


def main():
    with _reserve_port() as port:
        bind_address = 'localhost:{}'.format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()
        addrs = [bind_address for _ in range(_PROCESS_COUNT)]
        server_pool = pool.Pool(processes=_PROCESS_COUNT)
        server_pool.map(_run_server, addrs)
        server_pool.close()
        # server_pool.join()

if __name__ == '__main__':
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()
# the client.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# and I have modified a few places.


"""An example of multiprocessing concurrency with gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import atexit
import logging
from multiprocess import pool
import operator
import sys

import grpc

from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc

_PROCESS_COUNT = 4
_MAXIMUM_CANDIDATE = 100

# Each worker process initializes a single channel after forking.
# It's regrettable, but to ensure that each subprocess only has to instantiate
# a single channel to be reused across all RPCs, we use globals.
_worker_channel_singleton = None
_worker_stub_singleton = None

_LOGGER = logging.getLogger(__name__)


def _shutdown_worker():
    _LOGGER.info('Shutting worker process down.')
    if _worker_channel_singleton is not None:
        _worker_channel_singleton.close()


def _initialize_worker(server_address):
    global _worker_channel_singleton  # pylint: disable=global-statement
    global _worker_stub_singleton  # pylint: disable=global-statement
    _LOGGER.info('Initializing worker process.')
    _worker_channel_singleton = grpc.insecure_channel(server_address)
    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
        _worker_channel_singleton)
    atexit.register(_shutdown_worker)


def _run_worker_query(primality_candidate):
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    return _worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))


def _calculate_primes(server_address):
    worker_pool = pool.Pool(processes=_PROCESS_COUNT,
                            initializer=_initialize_worker,
                            initargs=(server_address,))
    check_range = range(2, _MAXIMUM_CANDIDATE)
    primality = worker_pool.map(_run_worker_query, check_range)
    worker_pool.close()
    worker_pool.join()
    primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    return tuple(primes)


def main():
    msg = 'Determine the primality of the first {} integers.'.format(
        _MAXIMUM_CANDIDATE)
    parser = argparse.ArgumentParser(description=msg)
    parser.add_argument('--server_address',
                        default='localhost:52909',
                        help='The address of the server (e.g. localhost:50051)')
    args = parser.parse_args()
    primes = _calculate_primes(args.server_address)
    print(primes)


if __name__ == '__main__':
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()

Following Lidi Zheng's suggestion, I tried using multiple channels in the client, but it still doesn't seem to work. Is my approach wrong?

def _run_worker_query(param):
    server_address, primality_candidate = param
    worker_channel_singleton = grpc.insecure_channel(server_address)
    worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(worker_channel_singleton)
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    res = worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))
    return res.isPrime


def _calculate_primes(server_address):
    # worker_pool = pool.Pool(processes=_PROCESS_COUNT,
    #                                    initializer=_initialize_worker,
    #                                    initargs=(server_address,))
    worker_pool = pool.Pool(processes=_PROCESS_COUNT)
    check_range = range(2, _MAXIMUM_CANDIDATE)
    params = [(server_address, r) for r in check_range]
    primality = worker_pool.map(_run_worker_query, params)
    worker_pool.close()
    worker_pool.join()
    # primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    primes = zip(check_range, primality)
    return tuple(primes)

The server output is:

[PID 43279] Determining primality of 3
[PID 43279] Determining primality of 2
[PID 43279] Determining primality of 4
[PID 43279] Determining primality of 5
[PID 43279] Determining primality of 6
[PID 43279] Determining primality of 7
[PID 43279] Determining primality of 8
[PID 43279] Determining primality of 9

Answers (3)

Stack Overflow user

Posted on 2020-07-09 17:14:24

Thanks for providing a detailed reproduction case. I can reproduce the first issue, but not the second.

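Your use of the multiprocessing gRPC server is correct. Before going into the details, I want to restate the tricky part of running a multiprocessing gRPC server: gRPC Python does not support forking after gRPC has started. To use multiprocessing with gRPC, the application needs to fork as early as possible (see the multiprocessing example).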
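A minimal sketch of the "fork early" idea, assuming the parent process creates no gRPC objects at all: the parent only starts the worker processes, and each worker would build its own grpc.server(...) after the fork. To keep the sketch runnable without grpcio, `_placeholder_serve` is a hypothetical stand-in for the question's `_run_server` that just reports its PID; `start_workers` is likewise a hypothetical helper, and the explicit `"fork"` context assumes a POSIX host.

```python
import multiprocessing
import os


def _placeholder_serve(bind_address, results):
    # Hypothetical stand-in for _run_server(bind_address). In a real server,
    # grpc.server(...), add_insecure_port() and start() would run HERE, inside
    # the child, so that no gRPC state exists in the parent before it forks.
    results.put((os.getpid(), bind_address))


def start_workers(bind_address, count, target=_placeholder_serve):
    # Fork every worker up front, before the parent touches gRPC.
    ctx = multiprocessing.get_context("fork")  # assumption: Linux or macOS
    results = ctx.Queue()
    workers = []
    for _ in range(count):
        worker = ctx.Process(target=target, args=(bind_address, results))
        worker.start()
        workers.append(worker)
    return workers, results


if __name__ == "__main__":
    procs, queue = start_workers("localhost:50051", 4)
    # Drain the queue before join() so no child blocks on a full pipe.
    print(sorted(queue.get(timeout=10) for _ in procs))
    for proc in procs:
        proc.join()
```

A real `_run_server` blocks until the server shuts down, so the parent would simply join() the workers after starting them.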

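Regarding the first issue, the cause is that the client creates only one channel. _initialize_worker stores the channel in the global variable _worker_channel_singleton, which means there is only one channel even though there are multiple processes. One gRPC channel means one TCP connection, and the automatic load balancing provided by SO_REUSEPORT only works at the granularity of TCP connections. Try using multiple channels; that should solve your problem. (Side note: to load-balance requests coming from a single gRPC channel, you can use an L7 load balancer, such as an L7 ILB.)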
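The per-connection granularity can be observed without gRPC. The sketch below assumes a Linux or macOS host where `socket.SO_REUSEPORT` exists. It forks several workers that each listen on the same port (mirroring `_reserve_port()` in the question) and answer every request with their PID. A client that reuses one TCP connection, as one gRPC channel does, always talks to the same worker; only a new connection gives the kernel a chance to pick a different one. The helper names and the 16-byte reply format are illustrative assumptions.

```python
import multiprocessing
import os
import socket

_REPLY_SIZE = 16  # fixed-width reply so the client knows how much to read


def _reuseport_socket():
    # SO_REUSEPORT must be set before bind() on every socket sharing the port.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    return sock


def _pid_worker(port, ready):
    # Each worker owns a separate listening socket bound to the SAME port.
    server = _reuseport_socket()
    server.bind(("127.0.0.1", port))
    server.listen()
    ready.put(os.getpid())
    while True:
        conn, _ = server.accept()
        with conn:
            # Answer each 1-byte request on this connection with our PID.
            while conn.recv(1):
                conn.sendall(str(os.getpid()).encode().ljust(_REPLY_SIZE))


def _recv_exact(conn, size):
    data = b""
    while len(data) < size:
        chunk = conn.recv(size - len(data))
        if not chunk:
            raise ConnectionError("server closed the connection")
        data += chunk
    return data


def pids_on_one_connection(port, requests):
    # Many requests over ONE TCP connection, like RPCs on one gRPC channel.
    seen = set()
    with socket.create_connection(("127.0.0.1", port)) as conn:
        for _ in range(requests):
            conn.sendall(b"?")
            seen.add(int(_recv_exact(conn, _REPLY_SIZE).decode().strip()))
    return seen


def start_pid_workers(count):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX host
    # Bound but never listening, like _reserve_port(): it only holds the port.
    reserved = _reuseport_socket()
    reserved.bind(("127.0.0.1", 0))
    port = reserved.getsockname()[1]
    ready = ctx.Queue()
    workers = [ctx.Process(target=_pid_worker, args=(port, ready), daemon=True)
               for _ in range(count)]
    for worker in workers:
        worker.start()
    worker_pids = {ready.get(timeout=10) for _ in range(count)}
    return reserved, port, workers, worker_pids
```

Which worker receives each new connection is up to the kernel, so the sketch does not assume any particular spread across PIDs; it only shows that a single connection sticks to one process.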

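Regarding the second issue, protobuf messages are not picklable. To pass protobuf messages between processes, the client needs to serialize them into byte strings and deserialize them on the other side. This is due to the nature of protobuf: a serialized message can map to many (if not infinitely many) possible original messages, so it cannot be decoded without the schema.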

Score: 6

Stack Overflow user

Posted on 2020-09-08 03:30:14

Regarding the single PID serving all responses:

This appears to be an OS X issue. I tried it on Ubuntu, and it works fine, with all of the server PIDs in use.

Score: 3

Stack Overflow user

Posted on 2022-03-10 05:24:31

If multiprocessing on macOS does not use separate processes, change the start method to spawn:

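import multiprocessing
import os

# Call once, early, under if __name__ == '__main__': before creating any pool.
if os.uname().sysname == "Darwin":
    multiprocessing.set_start_method('spawn')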
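A slightly more defensive variant of the same idea, assuming the standard-library multiprocessing module: instead of changing the global start method (set_start_method raises RuntimeError if the context has already been set), request a "spawn" context only on macOS and build the pool from that context. `pool_context` is a hypothetical helper name.

```python
import multiprocessing
import platform


def pool_context(system=None):
    # platform.system() returns "Darwin" on macOS.
    system = system or platform.system()
    # None selects the platform's default start method.
    method = "spawn" if system == "Darwin" else None
    return multiprocessing.get_context(method)
```

Usage: create the pool with `pool_context().Pool(4)` inside an `if __name__ == "__main__":` block, which "spawn" requires because every child re-imports the main module.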
Score: 0
Page content provided by Stack Overflow; translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/62798507
