假设我有一个发布者和多个监听器。当发布者发送消息时,它必须被所有侦听器接收。如果其中一个听众关闭了,他应该会在重新启动后立即收到消息。
我如何实现这一点?
我在考虑使用队列:每个监听器创建自己的队列,并向发布者发送一条带有队列位置的订阅消息。发布者将该位置保存到一个文件或DB中,并开始向该队列发送消息。
因此,这将是时间线:
Publisher已启动。目前还没有监听器。
发布者发送消息% 1。
发布者发送消息2。
发布者发送消息3。
侦听器1启动并订阅publisher。
发布者发送消息4。
监听程序1接收消息4。
Listener 2启动并订阅publisher。
发布者发送消息5。
监听程序1接收消息5。
监听器2接收消息5。
Listener 2 chrashes
发布者发送消息6。
监听器1接收消息6。
Publisher发送消息7。
监听器1接收消息7。
Listener 2重新启动,无需再次订阅。
监听器2接收消息6。
监听器2接收消息7。
底线是我需要每个监听器一个队列,以及一个队列或通道来发送和接收“开始监听”和“停止监听”的消息。我是在思考正确的方向,还是完全错了?
发布于 2011-05-27 22:57:57
您不应该为每个订阅者单独创建一个队列,但您至少需要两个队列。可伸缩性的最初关键是确保当发布者传递其初始消息时,您不会试图在该时间点将其“散布”给所有订阅者。相反,您将它放在一个已接收的队列中,并立即返回,让发布者知道它成功了。在那里,您拥有由主接收队列提供的工作进程,它们的职责是将消息“散布”给各个订阅者。它通过找出这些订阅者是谁来实现这一点,并生成N条消息,其中包含来自发布者的原始消息以及每个监听器的地址/绑定信息,并将这些消息放到一个传递队列中。最后,还有一些工作者负责将消息从传递队列中拉出,并尝试使用地址/绑定信息进行传递。
可以通过将消息移动到重试队列来处理传递错误,在重试队列中,消息将休眠X个时间量,然后再次被替换到传递队列中。然后,你当然必须处理有害的消息,比如你重试了5次,而监听器每次都会抛出错误。这些将需要移动到某种类型的死信队列中以进行错误报告。
发布于 2011-05-27 23:03:40
您说得对,这就是NServiceBus (例如)如何在MSMQ上实现发布订阅的。点击这里了解更多信息:http://docs.particular.net/samples/pubsub/。
https://stackoverflow.com/questions/6153014
复制相似问题