我试图在 Qt 应用程序中通过 Jupyter 内核执行代码。下面的代码片段本应异步运行这段代码,然后打印结果。
import sys
import asyncio
import qasync
from qasync import QApplication
from PySide6.QtWidgets import QWidget
from jupyter_client import AsyncKernelManager
# Source text submitted to the kernel for execution.
CODE = "print('test')"
class Test:
    """Drive a Jupyter kernel client and run CODE on it via asyncio."""

    def __init__(self):
        """Create a kernel manager, request a kernel, and open the client channels."""
        manager = AsyncKernelManager()
        # NOTE(review): the value returned by start_kernel() is discarded.
        # On AsyncKernelManager this appears to be a coroutine, so it is
        # never awaited here -- confirm against the jupyter_client docs.
        manager.start_kernel()
        self.client = manager.client()
        self.client.start_channels()

    def run(self):
        """Schedule execute() on the loop returned by asyncio.get_event_loop()."""
        current_loop = asyncio.get_event_loop()
        asyncio.ensure_future(self.execute(), loop=current_loop)

    async def execute(self):
        """Send CODE to the kernel and await one message from get_shell_msg()."""
        self.client.execute(CODE)
        pending_reply = self.client.get_shell_msg()
        print('Before')
        reply = await pending_reply
        print('After')
def main():
    """Create the Qt application, start the kernel test, and run Qt's event loop.

    The process exits with the status returned by ``QApplication.exec()``.
    """
    qt_app = QApplication(sys.argv)
    runner = Test()
    runner.run()
    exit_code = qt_app.exec()
    sys.exit(exit_code)


main()
运行上面的代码,我得到了以下输出:
/tmp/test/test.py:16: RuntimeWarning: coroutine 'KernelManager._async_start_kernel' was never awaited
kernel_manager.start_kernel()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/tmp/test/test.py:22: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
因此,我尝试参照 qasync 中的一个示例调整代码,改成类似下面的形式:
async def main():
    """Coroutine entry point handed to ``qasync.run()``.

    Builds a QApplication, starts the kernel test, and exits with the status
    returned by ``QApplication.exec()``.
    """
    # NOTE(review): qasync.run() presumably sets up its own QApplication
    # before this coroutine runs, in which case constructing a second one
    # here raises RuntimeError ("Please destroy the QApplication singleton
    # before creating a new QApplication instance.") -- confirm.
    qt_app = QApplication(sys.argv)
    runner = Test()
    runner.run()
    exit_code = qt_app.exec()
    sys.exit(exit_code)


qasync.run(main())
将导致以下异常
Traceback (most recent call last):
File "/tmp/test/test.py", line 40, in <module>
qasync.run(main())
File "/tmp/test/.venv/lib/python3.10/site-packages/qasync/__init__.py", line 821, in run
return asyncio.run(*args, **kwargs)
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/tmp/test/.venv/lib/python3.10/site-packages/qasync/__init__.py", line 409, in run_until_complete
return future.result()
File "/tmp/test/test.py", line 34, in main
app = QApplication(sys.argv)
RuntimeError: Please destroy the QApplication singleton before creating a new QApplication instance.
我现在很迷茫,有人知道怎么做吗?
发布于 2022-04-23 18:55:56
您必须创建一个 QEventLoop,并且 start_kernel 也必须通过 await 调用。另外,应当先导入 PySide6,再导入其他依赖 PySide6 的库(例如 qasync),这样才能推断出正确的 Qt 绑定。
import sys
import asyncio
from functools import cached_property
from PySide6.QtWidgets import QApplication
import qasync
from jupyter_client import AsyncKernelManager
# Source text submitted to the kernel for execution.
CODE = "print('test')"
class Test:
    """Lazily build a kernel manager/client pair and run CODE on the kernel."""

    @cached_property
    def kernel_manager(self):
        """AsyncKernelManager instance, created on first access and then cached."""
        return AsyncKernelManager()

    @cached_property
    def client(self):
        """Client obtained from ``kernel_manager``, created on first access and cached."""
        return self.kernel_manager.client()

    async def start(self):
        """Await kernel start-up, open the client channels, then schedule execute()."""
        manager = self.kernel_manager
        await manager.start_kernel()
        self.client.start_channels()
        # execute() is scheduled rather than awaited, so start() returns
        # without waiting for the kernel's reply.
        asyncio.ensure_future(self.execute())

    async def execute(self):
        """Send CODE to the kernel and print the awaited get_shell_msg() result."""
        kernel_client = self.client
        kernel_client.execute(CODE)
        pending_reply = kernel_client.get_shell_msg()
        print("Before")
        reply = await pending_reply
        print("After", reply)
def main():
    """Install a qasync event loop for the Qt application and run the kernel test.

    The QEventLoop is registered as the current asyncio loop before
    ``Test.start()`` is scheduled, and is then run until it is stopped.
    """
    qt_app = QApplication(sys.argv)
    event_loop = qasync.QEventLoop(qt_app)
    asyncio.set_event_loop(event_loop)

    kernel_test = Test()
    asyncio.ensure_future(kernel_test.start())

    with event_loop:
        event_loop.run_forever()


if __name__ == "__main__":
    main()
https://stackoverflow.com/questions/71954476
复制相似问题