首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Go并发设计:RabbitMQ多队列消费者模式

Go并发设计:RabbitMQ多队列消费者模式

作者头像
运维开发王义杰
发布2023-08-10 17:00:43
发布2023-08-10 17:00:43
1.2K00
代码可运行
举报
运行总次数:0
代码可运行

在处理大规模的消息流时,一个常见的场景是从多个RabbitMQ队列中获取并处理消息。在这篇文章中,我们将讨论如何使用Go并发地从30个不同的RabbitMQ队列中接收消息,并为每个消息类型设计特定的处理程序。

首先,我们需要定义一个执行器接口和几个不同的执行器,每个执行器对应一种消息类型:

代码语言:javascript
代码运行次数:0
运行
复制
type MessageHandler interface {
  Handle(message string) error
}

type HandlerType1 struct{}
func (h *HandlerType1) Handle(message string) error {
  // 处理类型1的任务
  fmt.Println("HandlerType1 is processing message", message)
  return nil
}

type HandlerType2 struct{}
func (h *HandlerType2) Handle(message string) error {
  // 处理类型2的任务
  fmt.Println("HandlerType2 is processing message", message)
  return nil
}

// add more handlers as needed

然后,我们使用工厂模式创建特定类型的处理器:

代码语言:javascript
代码运行次数:0
运行
复制
type MessageHandlerFactory struct {
  handlers map[ExecType]MessageHandler
}

func NewMessageHandlerFactory() *MessageHandlerFactory {
  return &MessageHandlerFactory{
    handlers: map[ExecType]MessageHandler{
      ExecType1: &HandlerType1{},
      ExecType2: &HandlerType2{},
      // add more handlers as needed
    },
  }
}

func (f *MessageHandlerFactory) GetHandler(execType ExecType) MessageHandler {
  return f.handlers[execType]
}

使用工厂可以帮助我们降低代码冗余,并提高代码的可扩展性。当我们需要添加新的消息类型时,我们只需要添加新的处理器,并在工厂中注册即可。

接下来,我们需要并发地从多个RabbitMQ队列中接收消息。我们可以为每个队列创建一个消费者,并在一个独立的Go协程中运行:

代码语言:javascript
代码运行次数:0
运行
复制
type QueueConsumer struct {
  conn    *amqp.Connection
  channel *amqp.Channel
  done    chan error
}

// ... NewQueueConsumer, Shutdown, etc ...
func NewQueueConsumer(amqpURI string, queueName string) *QueueConsumer {
  conn, err := amqp.Dial(amqpURI)
  if err != nil {
    log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  }

  ch, err := conn.Channel()
  if err != nil {
    log.Fatalf("Failed to open a channel: %v", err)
  }

  q, err := ch.QueueDeclare(
    queueName,
    true,
    false,
    false,
    false,
    nil,
  )
  if err != nil {
    log.Fatalf("Failed to declare a queue: %v", err)
  }

  msgs, err := ch.Consume(
    q.Name,
    "",
    true,
    false,
    false,
    false,
    nil,
  )
  if err != nil {
    log.Fatalf("Failed to register a consumer: %v", err)
  }

  consumer := &QueueConsumer{
    conn:    conn,
    channel: ch,
    done:    make(chan error),
  }

  go func() {
    for d := range msgs {
      // Handle the message here.
      // You can also decode the message based on its type and call the corresponding handler.
      fmt.Println("Received a message: ", string(d.Body))
    }
    close(consumer.done)
  }()

  return consumer
}

func (c *QueueConsumer) Shutdown() error {
  // Gracefully stop the consumption of messages.
  if err := c.channel.Cancel("", true); err != nil {
    return fmt.Errorf("consumer cancel failed: %s", err)
  }

  if err := c.conn.Close(); err != nil {
    return fmt.Errorf("AMQP connection close error: %s", err)
  }

  // Wait for the consumer to stop receiving messages.
  <-c.done

  return nil
}

func main() {
  var consumers []*QueueConsumer
  for i := 0; i < 30; i++ {
    queueName := fmt.Sprintf("queue%d", i)
    consumer := NewQueueConsumer("amqp://guest:guest@localhost:5672/", queueName)
    consumers = append(consumers, consumer)
  }

  defer func() {
    for _, consumer := range consumers {
      if err := consumer.Shutdown(); err != nil {
        log.Fatalf("Failed to shut down consumer cleanly: %v", err)
      }
    }
  }()

  select {}
}

这样,我们就可以同时处理多个队列的消息了。

最后,当我们接收到一个消息时,我们可以使用工厂来创建一个处理器,并让它处理这个消息:

代码语言:javascript
代码运行次数:0
运行
复制
for d := range msgs {
  message := string(d.Body)
  execType := getExecTypeFromMessage(message)

  handler := factory.GetHandler(execType)
  if handler != nil {
    handler.Handle(message)
  } else {
    // handle the error
  }
}

我们的设计有几个优点:

  • 使用Go协程可以让我们并发地处理多个队列的消息,充分利用多核CPU。
  • 使用工厂模式和处理器接口可以降低代码冗余,并使得代码更易于扩展。

不过,这只是一个简单的示例,你可能需要根据实际的需求来调整代码。例如,你可能需要使用goroutines来并发地处理多个消息,或者你可能需要使用channels和wait groups来控制goroutines的数量。你也可能需要添加更多的错误处理代码来处理可能发生的错误。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 运维开发王义杰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档