基本上,有一个小的aiohttp应用程序,它接收Impala查询列表,然后发送给Impala。然而,有些查询可能需要很长时间才能完成,因此决定以异步/并行的方式完成。我得到了一个线程工作的解决方案,但我想看看是否有可能只使用asyncio/tornado实现相同的速度。
我的代码如下:
async def run(self, queries):
# Here I validate queries
query_list = await self.build_query_list(split_queries) # Format: [[queries for connection_1], [queries for connection_2], ...]
start = time.time()
# Assing group of queries to each connection and wait results
result_queue = deque()
await multi([self.execute_impala_query(connection.connection, query_list[index], result_queue) for index, connection in enumerate(connection_list)])
# Close all connections
[await self.impala_connect_advance_pool.release_connection(connection) for connection in connection_list]
# Wait for Impala responses
while len(result_queue) < connect_limit:
continue
# Send results back
async def execute_impala_query(self, impala_connect, queries, queue):
return await multi([self.impala_response_to_json_response(impala_connect.cursor(), query, queue) for query in queries])
async def impala_response_to_json_response(self, impala_cursor, query, queue):
self.logger.info('execute query: {}'.format(query))
print ('execute query: {}'.format(query))
def get_results():
impala_cursor.execute(query)
results = as_pandas(impala_cursor)
impala_cursor.close()
self.logger.info('{} completed'.format(query))
print ('{} completed'.format(query))
queue.append(results.to_json(orient='records'))
IOLoop.current().spawn_callback(get_results)一旦它运行了,我可以看到'execute query: query‘消息被打印在stdout中,我假设它们都被触发并正在执行,然而,它花费的时间是线程版本的2倍(或更多)。我是不是把整个异步概念搞错了,还是在方法中的某个地方弄错了?
发布于 2018-09-04 16:27:53
整个异步概念错误是的,仅仅用spawn_callback调用一个函数并不能使其成为异步的:你的DB连接器应该支持异步IO。正如我所看到的:我建议您研究一下execute_async方法。然后你需要像Impyla's _wait_to_finish一样编写你自己的等待函数,但是使用tornado.gen.sleep而不是time.sleep()。
https://stackoverflow.com/questions/52154935
复制相似问题