有没有办法在所有准备运行的协程中控制调度优先级?
具体地说,我有几个协程处理来自网络的流I/O到几个队列,第二组协程将数据从队列摄取到数据结构中。这些摄取协程发出信号通知第三组协程,每当新数据被摄取时,这些协程就会分析该数据结构。
从网络到达的数据是具有不确定消息率的无限数据流。我希望分析步骤在新数据到达时立即运行,但不是在处理完所有挂起的数据之前。我看到的问题是,根据调度的顺序,分析协程可以在也有数据准备好的读取器协程之前运行,因此分析协程甚至不能检查未决数据的摄取队列,因为它可能还没有从网络上读取,即使这些读取器协程已经准备好运行。
一种解决方案可能是将协程构建到优先级组中,以便读者协程总是在分析协程之前调度,如果它们都能够运行的话,但我看不到这样做的方法。
asyncio有没有可以实现这种优先级排序的特性?或者,也许我问错了问题,我可以重新构造协程,这样就不会发生这种情况(但我看不出来)。
-编辑--
基本上,我有一个N个协程,看起来像这样:
while True:
data = await socket.get()
ingestData(data)
self.event.notify()
所以我遇到的问题是,在执行这个协程时,我没有办法知道其他N-1套接字是否有准备好的数据,所以我不知道是否应该通知事件。如果我可以将这些协程优先于分析协程(正在等待self.event.wait()),那么我可以确定当安排分析协程时,它们都是不可运行的。
发布于 2018-02-02 00:50:23
asyncio
不支持显式指定协程优先级,但是可以直接使用库提供的工具来实现相同的效果。给出你问题中的例子:
async def process_pending():
while True:
data = await socket.get()
ingestData(data)
self.event.notify()
您可以使用asyncio.wait
直接等待套接字,然后您就会知道哪些套接字是可操作的,并且只有在处理完所有套接字之后才通知分析器。例如:
def _read_task(self, socket):
loop = asyncio.get_event_loop()
task = loop.create_task(socket.get())
task.__process_socket = socket
return task
async def process_pending_all(self):
tasks = {self._read_task(socket) for socket in self.sockets}
while True:
done, not_done = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
ingestData(task.result())
not_done.add(self._read_task(task.__process_socket))
tasks = not_done
self.event.notify()
https://stackoverflow.com/questions/48361207
复制相似问题