我有一个ASP.NET MVC操作,它验证连接字符串是否正常工作,以便使用RabbitMQ连接到服务器。我的方法是创建一个队列,订阅它,然后立即向它发布一条消息。我希望消息最多几秒钟后就会出现在我的订阅者上,但事实并非如此。订阅者只会在我第一次调用操作时得到通知(就在我使用基于web的管理器从RabbitMQ中删除队列之后),但一旦我发布了下面的消息并创建了队列,就无法调用订阅者。请看一下我的代码,如果你看到了我没有看到的东西,请告诉我。提前谢谢。
//This is just the action called on a POST request.
//Here is where the test is done.
[HttpPost]
public void DoConnectionVerificationAsync()
{
const string paramName = "queueSuccessful";
try
{
//This is my routing key
const string routingKey = "management.verify";
//Here I declare the key and the exchange and bind them.
var queue = Queue.DeclareDurable(routingKey);
var exchange = Exchange.DeclareDirect("Orchard");
queue.BindTo(exchange, routingKey);
//Here I just generate a random int to send in the test message to the queue
Random random = new Random();
int randomInt = random.Next();
//Instantiate the actual message with the random integer.
var message = new Message<VerifyMessage>(new VerifyMessage
{
Content = randomInt.ToString(CultureInfo.InvariantCulture)
});
message.Properties.AppId = "CRM";
//Because this is an asynchronous action, here I hint the beginning of the async operation.
AsyncManager.OutstandingOperations.Increment();
//Here I have my subscriber. The subscriber gets called only the first time, when the queue hasn't been created yet. That's a problem, it should be called every time I publish, which is in the following lines of code.
_bus.Subscribe<VerifyMessage>(queue, (response, messageReceivedInfo) => Task.Factory.StartNew(() =>
{
VerifyMessage receivedMessage = response.Body;
string content = receivedMessage.Content;
int integer = int.Parse(content);
//I expect the int received from the queue is the same as the one I sent
bool success = integer == randomInt;
AsyncManager.Parameters[paramName] = success;
AsyncManager.OutstandingOperations.Decrement();
}));
//And here I publish the message. This always works, I can see the message stored in the queue using the web based RabbitMQ Manager
//The problem is that the message gets stuck in the queue and never gets sent to the subscriber defined above
using (var publishChannel = _bus.OpenPublishChannel(x => x.WithPublisherConfirms()))
{
publishChannel.Publish(exchange, routingKey, message, t =>
t.OnSuccess(() =>
{
// If we successfully publish, then there's nothing we really need to do. So this function stays empty.
})
.OnFailure(() =>
{
AsyncManager.Parameters[paramName] = false; AsyncManager.OutstandingOperations.Decrement();
}));
}
}
catch (EasyNetQException)
{
AsyncManager.Parameters[paramName] = false;
AsyncManager.OutstandingOperations.Decrement();
}
}
//These functions down here don't really matter, but I'm including them just in case so that you can see it all.
public ActionResult DoConnectionVerificationCompleted(bool queueSuccessful)
{
return RedirectToAction("VerifyQueueConnectionResult", new { queueSuccessful });
}
public ActionResult VerifyQueueConnectionResult(bool queueSuccessful)
{
VerifyQueueConnectionResultModel model = new VerifyQueueConnectionResultModel();
model.Succeded = queueSuccessful;
return View(model);
}发布于 2013-04-30 23:47:20
每次调用bus.Subscribe都会创建一个新的使用者。因此,当它第一次工作时,您的队列上只有一个使用者,当您向其发布消息时,消息将被路由到该使用者。
第二次调用bus.Subscribe时,第二个消费者被绑定到同一个队列。当RabbitMQ在单个队列上有多个使用者时,它会将消息循环传递给使用者。这是一个特性。这就是它的工作方式--开箱即用地分享。您的消息将被路由到第一个消费者,而不是您刚刚声明的消费者。对于第三个、第四个等消费者也是如此,因此消息似乎还没有到达。
发布于 2013-04-30 04:07:02
好吧,我找到了一个解决这个问题的简单方法。最后,我搜索了EasyNetQ应用程序接口中的可用方法,找到了Queue.SetAsSingleUse()。在将队列绑定到交换之后,我简单地调用了该方法,这基本上就完成了。不确定queue.SetAsSingleUse()是做什么的,但从它的名字来看,它听起来像是在发布第一条消息并将其传递给订阅者之后处理队列。在我的例子中,每次测试只使用一次队列是有意义的,但是出于某种原因想要保留队列的人可能会遇到麻烦。
发布于 2013-05-22 20:09:15
您可以为每个不同的订阅者使用不同的队列,如下所示:
bus.Subscribe<MyMessage>("my_subscription_1"...
bus.Subscribe<MyMessage>("my_subscription_2"...这样,您发布的任何MyMessage类型的消息都将到达所有这些队列。
https://stackoverflow.com/questions/16286712
复制相似问题