首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RabbitMQ等待多个队列完成

RabbitMQ等待多个队列完成
EN

Stack Overflow用户
提问于 2012-12-13 14:03:27
回答 5查看 8.8K关注 0票数 18

好的,下面是对正在发生的事情的概述:

代码语言:javascript
运行
复制
    M <-- Message with unique id of 1234
    |
    +-Start Queue
    |
    |
    | <-- Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues
Q1  Q2  Q3
\   |   / <-- start of the problem is here
 \  |  / 
  \ | /
   \|/
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

因此,我有一个交换推送到多个队列,每个队列都有一个任务,一旦所有任务完成,只有队列4才能开始。

因此,具有唯一id为1234的消息被发送到交换,交换将其路由到所有任务队列( Q1、Q2、Q3等).),当消息id 1234的所有任务都完成后,运行消息id 1234的Q4。

我如何实现这一点?

使用Symfony2、RabbitMQBundle和RabbitMQ 3.x

资源:

  • http://www.rabbitmq.com/tutorials/amqp-concepts.html
  • http://www.rabbitmq.com/tutorials/tutorial-six-python.html

更新#1

好吧,我想这就是我要找的:

带有并行处理的RPC,但是如何将关联Id设置为我唯一的id来分组消息并标识哪个队列?

EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2012-12-13 14:36:44

在RabbitMQ站点的RPC教程中,有一种方法可以向队列中的用户传递一个“相关id”,该id可以标识您的消息。

我建议您在前3个队列中使用某种id,然后再使用另一个进程将消息从3排到某种桶中。当这些桶接收到我假设的3项任务完成时,将最后消息发送到第4队列进行处理。

如果要为一个用户向每个队列发送多个工作项,则可能需要进行一些预处理,以了解某个特定用户在队列中放置了多少项,以便进程在4之前就知道排队前预期会有多少项。

我用C#做了rabbitmq,很抱歉我的伪代码没有php风格

代码语言:javascript
运行
复制
// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;

// Setup your body here

Queue(body)
代码语言:javascript
运行
复制
// Server
// Process queue 1, 2, 3
Dequeue(message)

switch(message.body[2])
{
    // process however you see fit
}

processedMessages[message.body[0]]++;

if(processedMessages[message.body[0]] == message.body[1])
{
    // Send to queue 4
    Queue(newMessage)
}

对更新#1的响应

与其将客户端视为终端,不如将客户端视为服务器上的一个进程。因此,如果您在像这一个这样的服务器上安装了RPC客户机,那么您所需要做的就是让服务器处理用户唯一id的生成,并将消息发送到适当的队列:

代码语言:javascript
运行
复制
    public function call($uniqueUserId, $workItem) {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            serialize(array($uniqueUserId, $workItem)),
            array('correlation_id' => $this->corr_id,
            'reply_to' => $this->callback_queue)
        );

        $this->channel->basic_publish($msg, '', 'rpc_queue');
        while(!$this->response) {
            $this->channel->wait();
        }

        // We assume that in the response we will get our id back
        return deserialize($this->response);
    }


$rpc = new Rpc();

// Get unique user information and work items here

// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);

$responseBuckets[array[0]]++;

// Just like above code that sees if a bucket is full or not
票数 5
EN

Stack Overflow用户

发布于 2012-12-13 21:50:16

您需要实现这个:http://www.eaipatterns.com/Aggregator.html,但是Symfony的RabbitMQBundle不支持这一点,所以您必须使用底层的php。

来自包的普通消费者回调将得到一个AMQPMessage。从那里,您可以访问通道并手动发布到您的“管道和过滤器”实现中的任何交换。

票数 7
EN

Stack Overflow用户

发布于 2012-12-13 14:30:51

我有点不清楚你想在这里取得什么成就。但是,我可能会稍微修改一下设计,这样一旦从队列中清除了所有消息,您就会发布到一个单独的exchange中,该交换将发布到队列4。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13861459

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档