首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RabbitMQ和EventMachine::defer

RabbitMQ和EventMachine::defer
EN

Stack Overflow用户
提问于 2011-11-26 00:25:15
回答 1查看 801关注 0票数 0

我正试着让RabbitMQ明白我的意思。我想要一个队列来侦听,当它收到一条消息时,我想让它回复一个匿名队列,这个匿名队列是通过带有多条消息的reply_to头指定的。

到目前为止,我有以下任务,既是reply_to消息的使用者,也是订阅者:

代码语言:javascript
运行
复制
desc "start_consumer", "start the test consumer"
def start_consumer
  puts "Running #{AMQP::VERSION} version of the gem"

      AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/", 
                        :logging => true, :port => 5672) do |connection|

        channel = AMQP::Channel.new(connection)

        requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)

        Signal.trap("INT") do
          connection.close do
            EM.stop{exit}
          end
        end

        channel.prefetch(1)

        requests_queue.subscribe(:ack => true) do |header, body|
          puts "received in server #{body.inspect}"

          (0..5).each do |n|
            header.ack

            reply = {:reply => "Respone #{n}", :is_last => (n == 5)}

            AMQP::Exchange.default.publish(
                                            MultiJson.encode(reply), 
                                            :routing_key => header.reply_to,
                                            :correlation_id => header.correlation_id
                                          )


            sleep(2)
          end
        end

        puts " [x] Awaiting RPC requests"
      end          
    end

我的调用代码是:

代码语言:javascript
运行
复制
  def publish(urlSearch, routing_key)
    EM.run do

      corr_id = rand(10_000_000).to_s

      requests ||= Hash.new

      connection = AMQP.connect(:host => "localhost")

      callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)             

      callback_queue.subscribe do |header, body|

        reply = MultiJson.decode(body)               

        if reply[:is_last.to_s]  
          connection.close do
            EM.stop{exit}
          end
        end
      end                          

      callback_queue.append_callback(:declare) do
        AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
      end

    end

问题是,直到迭代中的所有消息都已发布,才会发送消息。

也就是说,在(0..5)循环中的迭代完成之后,所有消息都将被发布。

有人告诉我,使用EventMachine::defer可能是一种选择,但我不确定如何将其应用于循环。

有谁能给出一个解决这个问题的建议吗?在使用AMQP.start时,EventMachine::defer是一个好的选择吗?如果是,我该如何做?

EN

回答 1

Stack Overflow用户

发布于 2011-12-15 15:12:12

使用EM时要记住的是,反应器本身是单线程的。因此,在发送任何内容之前,它会等待循环完成,因为循环占用了该单个线程。(您真的、真的、真的不想在EM应用程序中使用睡眠。你阻塞了整个反应堆,什么也不会发生。)

如果你想发布消息,或者其他任何其他的EM动作发生(触发计时器,接收其他数据等),你必须让反应器循环。

您可以使用类似EM::Iterator的代码为您安排工作,如下所示:

代码语言:javascript
运行
复制
EM::Iterator.new(0..5).each do |n, iter|
  reply = {:reply => "Response #{n}", :is_last => (n == 5)}
  AMQP::Exchange.default.publish(MultiJson.encode(reply), 
                                 :routing_key => header.reply_to,
                                 :correlation_id => header.correlation_id)
  iter.next
end
header.ack
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/8271817

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档