首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Python中订阅NATS主题并不断接收消息?

如何在Python中订阅NATS主题并不断接收消息?
EN

Stack Overflow用户
提问于 2020-09-11 19:41:49
回答 2查看 2.2K关注 0票数 2

我尝试了下面的例子(来自this页面):

代码语言:javascript
运行
复制
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()

# Wait for message to come in
msg = await asyncio.wait_for(future, 1)

但这似乎只对接收一条消息有用。我如何订阅并继续接收消息?

我也见过the package example,但它似乎只是扮演对话双方的角色,然后退出。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-09-11 22:46:07

我不太了解python,但看起来您只是在等待一条消息和程序结束。您应该看看订阅者示例here。正如您所看到的,有一个循环将永远等待或等待SIGTERM信号。

票数 2
EN

Stack Overflow用户

发布于 2020-09-11 22:50:39

您还可以找到一个长时间运行的服务示例:https://github.com/nats-io/nats.py/blob/master/examples/service.py

代码语言:javascript
运行
复制
import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()

    async def disconnected_cb():
        print("Got disconnected...")

    async def reconnected_cb():
        print("Got reconnected...")

    await nc.connect("127.0.0.1",
                     reconnected_cb=reconnected_cb,
                     disconnected_cb=disconnected_cb,
                     max_reconnect_attempts=-1,
                     loop=loop)

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    await nc.subscribe("help", "workers", help_request)

    print("Listening for requests on 'help' subject...")
    for i in range(1, 1000000):
        await asyncio.sleep(1)
        try:
            response = await nc.request("help", b'hi')
            print(response)
        except Exception as e:
            print("Error:", e)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
    loop.close()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63846385

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档