首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Python: Aioimaplib捕获异常

Python: Aioimaplib捕获异常
EN

Stack Overflow用户
提问于 2018-06-03 23:49:24
回答 1查看 312关注 0票数 2

我正在尝试使用aioimaplib异步检查多个imap登录信息。只要imap服务器可访问和/或客户端不超时,此代码就可以工作。

捕获异常的正确方法是什么?

异常示例:

代码语言:javascript
复制
ERROR:asyncio:Task exception was never retrieved future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py:679> exception=TimeoutError(10060, "Connect call failed ('74.117.114.100', 993)")>

代码:

代码语言:javascript
复制
account_infos = [
    # User            Password     Server
    ('user1@web.com', 'password1', 'imap.google.com'),
    ('user2@web.com', 'password2', 'imap.yandex.com'),
    ('user3@web.com', 'password3', 'imap.server3.com'),
]


class MailLogin:
    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=self.loop)
        self.max_workers = 2

    async def produce_work(self):
        for i in account_infos:
            await self.queue.put(i)
        for _ in range(max_workers):
            await self.queue.put((None, None, None))

    async def worker(self):
        while True:
            (username, password, server) = await self.queue.get()
            if username is None:
                break

            while True:
                try:
                    s = IMAP4_SSL(server)
                    await s.wait_hello_from_server()
                    r = await s.login(username, password)
                    await s.logout()
                    if r.result != 'NO':
                        print('Information works')
                except Exception as e:
                    # DOES NOT CATCH
                    print(str(e))
                else:
                    break

    def start(self):
        try:
            self.loop.run_until_complete(
                asyncio.gather(self.produce_work(), *[self.worker() for _ in range(self.max_workers)],
                               loop=self.loop, return_exceptions=True)
            )
        finally:
            print('Done')


if __name__ == '__main__':
    MailLogin().start()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-06-09 06:25:22

several ways可以做到这一点,但是TimeoutError可能在您的except中被捕获。您看不到它,因为str(e)是一个空字符串。

你可以看到asyncio的stacks enabling debug mode

首先,您可以像以前一样捕获异常:

代码语言:javascript
复制
async def fail_fun():
    try:
        imap_client = aioimaplib.IMAP4_SSL(host='foo', timeout=1)
        await imap_client.wait_hello_from_server()
    except Exception as e:
        print('Exception : ' + str(e))

if __name__ == '__main__':
    get_event_loop().run_until_complete(fail_fun())

其次,您可以在run_until_complete捕获异常

代码语言:javascript
复制
async def fail_fun():
    imap_client = aioimaplib.IMAP4_SSL(host='foo', timeout=1)
    await imap_client.wait_hello_from_server()

if __name__ == '__main__':
    try:
        get_event_loop().run_until_complete(fail_fun())
    except Exception as e:
        print('Exception : ' + str(e))

连接是通过使用create_task包装loop.create_connection协程建立的:我们希望在IMAP4构造函数和__init__ should return None中建立连接。

因此,如果你的主机有一个错误的值,你可以在之前测试它,或者等待超时:

代码语言:javascript
复制
socket.gaierror: [Errno -5] No address associated with hostname

如果主机在超时之前没有响应,您可以提高超时。如果在连接过程中连接丢失,您可以在IMAP4构造函数中添加连接丢失回调。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50668191

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档