我的框架(Locust,https://github.com/locustio/locust)是基于gevent和greenlets的。但是我想利用剧作家(https://playwright.dev/python/),它是建立在异步之上的。
天真地使用Playwright同步api不起作用,并给出了一个例外:
playwright._impl._api_types.Error: It looks like you are using Playwright Sync API inside the asyncio loop.
Please use the Async API instead.
我正在寻找一些关于如何将异步与gevent结合使用的最佳实践。
我尝试过几种不同的方法,但我不知道自己是否很亲近,或者我想做的事情是否可能(我对gevent有一些经验,但以前还没有真正使用过异步)
编辑:我现在有一些有用的东西(我已经移除蝗虫,只是直接产生了一些小绿,以使它更容易下垂)。这是最好的,还是有更好的解决方案?
import asyncio
import threading
from playwright.async_api import async_playwright
import gevent
def thr(i):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(do_stuff(i))
loop.close()
async def do_stuff(i):
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=False)
page = await browser.new_page()
await page.wait_for_timeout(5000)
await page.goto(f"https://google.com")
await page.close()
print(i)
def green(i):
t = threading.Thread(target=thr, args=(i,))
t.start()
# t.join() # joining doesnt work, but I couldnt be bothered right now :)
g1 = gevent.spawn(green, 1)
g2 = gevent.spawn(green, 2)
g1.join()
g2.join()
发布于 2022-02-05 20:45:57
在@user4815162342的评论中,我写了这样的内容:
from playwright.async_api import async_playwright # need to import this first
from gevent import monkey, spawn
import asyncio
import gevent
monkey.patch_all()
loop = asyncio.new_event_loop()
async def f():
print("start")
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
await page.goto(f"https://www.google.com")
print("done")
def greeny():
while True: # and not other_exit_condition
future = asyncio.run_coroutine_threadsafe(f(), loop)
while not future.done():
gevent.sleep(1)
greenlet1 = spawn(greeny)
greenlet2 = spawn(greeny)
loop.run_forever()
实际的实现总有一天会在蝗虫中结束,可能是经过一些优化(重用浏览器实例等)。
发布于 2022-08-29 03:04:23
下面是一种集成异步和gevent的简单方法:
asyncio.run_coroutine_threadsafe()
运行协同线gevent.event.Event
等待协同线解析import asyncio
import threading
import gevent
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()
async def your_coro():
# ...
def wait_until_complete(coro):
future = asyncio.run_coroutine_threadsafe(coro, loop)
event = gevent.event.Event()
future.add_dome_callback(lambda _: event.set())
event.wait()
return future.result()
result = wait_until_complete(your_coro())
https://stackoverflow.com/questions/69888850
复制相似问题