
在处理大规模的消息流时,一个常见的场景是从多个RabbitMQ队列中获取并处理消息。在这篇文章中,我们将讨论如何使用Go并发地从30个不同的RabbitMQ队列中接收消息,并为每个消息类型设计特定的处理程序。
首先,我们需要定义一个执行器接口和几个不同的执行器,每个执行器对应一种消息类型:
type MessageHandler interface {
Handle(message string) error
}
type HandlerType1 struct{}
func (h *HandlerType1) Handle(message string) error {
// 处理类型1的任务
fmt.Println("HandlerType1 is processing message", message)
return nil
}
type HandlerType2 struct{}
func (h *HandlerType2) Handle(message string) error {
// 处理类型2的任务
fmt.Println("HandlerType2 is processing message", message)
return nil
}
// add more handlers as needed
然后,我们使用工厂模式创建特定类型的处理器:
type MessageHandlerFactory struct {
handlers map[ExecType]MessageHandler
}
func NewMessageHandlerFactory() *MessageHandlerFactory {
return &MessageHandlerFactory{
handlers: map[ExecType]MessageHandler{
ExecType1: &HandlerType1{},
ExecType2: &HandlerType2{},
// add more handlers as needed
},
}
}
func (f *MessageHandlerFactory) GetHandler(execType ExecType) MessageHandler {
return f.handlers[execType]
}
使用工厂可以帮助我们降低代码冗余,并提高代码的可扩展性。当我们需要添加新的消息类型时,我们只需要添加新的处理器,并在工厂中注册即可。
接下来,我们需要并发地从多个RabbitMQ队列中接收消息。我们可以为每个队列创建一个消费者,并在一个独立的Go协程中运行:
type QueueConsumer struct {
conn *amqp.Connection
channel *amqp.Channel
done chan error
}
// ... NewQueueConsumer, Shutdown, etc ...
func NewQueueConsumer(amqpURI string, queueName string) *QueueConsumer {
conn, err := amqp.Dial(amqpURI)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
q, err := ch.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
consumer := &QueueConsumer{
conn: conn,
channel: ch,
done: make(chan error),
}
go func() {
for d := range msgs {
// Handle the message here.
// You can also decode the message based on its type and call the corresponding handler.
fmt.Println("Received a message: ", string(d.Body))
}
close(consumer.done)
}()
return consumer
}
func (c *QueueConsumer) Shutdown() error {
// Gracefully stop the consumption of messages.
if err := c.channel.Cancel("", true); err != nil {
return fmt.Errorf("consumer cancel failed: %s", err)
}
if err := c.conn.Close(); err != nil {
return fmt.Errorf("AMQP connection close error: %s", err)
}
// Wait for the consumer to stop receiving messages.
<-c.done
return nil
}
func main() {
var consumers []*QueueConsumer
for i := 0; i < 30; i++ {
queueName := fmt.Sprintf("queue%d", i)
consumer := NewQueueConsumer("amqp://guest:guest@localhost:5672/", queueName)
consumers = append(consumers, consumer)
}
defer func() {
for _, consumer := range consumers {
if err := consumer.Shutdown(); err != nil {
log.Fatalf("Failed to shut down consumer cleanly: %v", err)
}
}
}()
select {}
}
这样,我们就可以同时处理多个队列的消息了。
最后,当我们接收到一个消息时,我们可以使用工厂来创建一个处理器,并让它处理这个消息:
for d := range msgs {
message := string(d.Body)
execType := getExecTypeFromMessage(message)
handler := factory.GetHandler(execType)
if handler != nil {
handler.Handle(message)
} else {
// handle the error
}
}
我们的设计有几个优点:
不过,这只是一个简单的示例,你可能需要根据实际的需求来调整代码。例如,你可能需要使用goroutines来并发地处理多个消息,或者你可能需要使用channels和wait groups来控制goroutines的数量。你也可能需要添加更多的错误处理代码来处理可能发生的错误。