我开始使用RabbitMQ,在遵循教程之后,我现在正试图让它以我需要的方式工作,而且我遇到了困难。我所拥有的设置是,我首先需要能够创建一个RPC,然后基于来自这个响应的响应,客户机将(或不会)向工作队列发送另一条消息(在那里,我不需要对客户机的响应)。不幸的是,我的努力,使它一起工作,似乎没有达到我想要的方式。在服务器端,我有类似的东西(我尝试过许多不同的变体,都有相同的问题):
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true);
// I started with a named queue, not sure if that's better or worse for this
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, 
    exchange: "jobs",
    routingKey: "saveJob_queue");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // save stuff that was sent with the saveJob_queue routingKey
}
channel.BasicConsume(queue: queueName, 
    noAck: false,
    consumer: consumer);
// set up channel for RPC
// Not sure if this has to have another channel, but it wasn't working on the same channel either
rpcChannel = connection.CreateModel();
var rpcQueueName = rpcChannel.QueueDeclare().QueueName;
rpcChannel.QueueBind(queue: rpcQueueName,  
    exchange: "jobs",
    routingKey: "rpc_CheckJob_queue");
var rpcConsumer = new EventingBasicConsumer(rpcChannel);
rpcConsumer.Received += (model, ea) =>
{
    // do my remote call and send back a response
}我遇到的问题是,发送给jobs交换的带有路由密钥rpc_CheckJob_queue的消息仍然会触发第一个信道上的Recieved事件,尽管它应该只接收saveJob_queue路由。我可以在这个处理程序中检查ea.RoutingKey,忽略这些消息,但是我不明白它们是如何结束的,为什么一开始就出现在那里?
设置连接的正确方法是什么,这样它就可以接收工作队列消息和RPC消息,并正确地处理它们。
发布于 2016-04-08 13:51:24
因此,我放弃了这一点,决定只在Received事件中过滤。我认为问题在于RabbitMQ只在通道上有一个Received事件,而没有在队列上。因此,Received事件无论以哪种方式都会被击中。所以现在我有了这个:
channel.QueueDeclare(queue: queueName,  
         durable: true,
         exclusive: false,
         autoDelete: false,
         arguments: null);
channel.QueueDeclare(queue: rpcQueueName,
         durable: false,
         exclusive: false,
         autoDelete: false,
         arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    switch (ea.RoutingKey)
    {
        case queueName:
            SaveJob(ea);
            break;
        case rpcQueueName:
            CheckJob(ea);
            break;
    }
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, 
    noAck: false,
    consumer: consumer);
channel.BasicConsume(queue: rpcQueueName,
                     noAck: false,
                     consumer: consumer);我愿意接受更好的建议,因为这似乎有点离谱。
所以发送只是:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
                     routingKey: queueName,
                     basicProperties: properties,
                     body: body);从事正规工作,并:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "",
                     routingKey: rpcQueueName,
                     basicProperties: props,
                     body: messageBytes);
while (true)   
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    if (ea.BasicProperties.CorrelationId == corrId)
    {
        return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null;
    }
}为了RPC。
发布于 2016-04-08 11:08:35
由于您没有为您的队列指定名称,因此我怀疑您正在两次获得相同的队列。所以我认为现在发生的事情本质上是这样的。
作业--> saveJob_queue --> SomeSystemQueue
作业->rpc_CheckJob_queue-> SomeSystemQueue
尝试选择两个单独的队列名,然后再次运行代码。所以,不是这样的:
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, 
    exchange: "jobs",
    routingKey: "saveJob_queue");拥有:
var name = "Queue A";
channel.QueueDeclare(name);
channel.QueueBind(queue: queueName, 
        exchange: "jobs",
        routingKey: "saveJob_queue");然后将第二个队列命名为其他内容,并尝试一下。
https://stackoverflow.com/questions/36477071
复制相似问题