我使用一个NestJS应用程序来使用一个RabbitMQ队列。无论订单如何,都可以处理每条消息,所以我想知道为同一队列声明新消费者的最佳做法是什么。
预期行为:队列由该服务处理,该服务使用多个使用者
队列: 1,2,3,4,5,6,...N;
在nestJS中,您可以使用@RabbitSubscribe
装饰器分配一个函数来处理数据。我想要做的事情可以通过简单地用装饰器复制(并重命名)函数来实现,所以这个函数也会被调用来处理队列中的数据。
@RabbitSubscribe({
...
queue: 'my-queue',
})
async firstSubscriber(data){
// 1, 3, 5...
}
@RabbitSubscribe({
...
queue: 'my-queue',
})
async secondSubscriber(data){
// 2, 4, 6...
}
我知道我可以在水平上重复这个项目和规模,但是我更喜欢在相同的过程中这样做。
我如何通过编程声明订阅者获得同样的行为,这样我就可以使用更多的并发处理来处理数据了?
发布于 2021-08-09 04:55:05
如果您使用@golevelup/nestjs包来支持不同的消息、Queque消耗量,如果您的应用程序是混合的,您将从中受益。首次安装
npm i @golevelup/nestjs-rabbitmq
那么您的nestjs应用程序结构应该如下所示
src --
|
app.module.ts
main.ts
app.module.ts
someHttpModule1 --
|
someHttpModule1.controller.ts
someHttpModule1.module.ts
someHttpModule1.service.ts
...
someHttpModule2 --
|
someHttpModule2.controller.ts
someHttpModule2.module.ts
someHttpModule2.service.ts
...
...
// Events module is designed for consuming messages from rabbitmq
events --
|
events.module.ts
someEventConsumerModule1 --
|
someEventConsumerModule1.module.ts
someEventConsumerModule1.service.ts
someEventConsumerModule2 --
|
someEventConsumerModule2.module.ts
someEventConsumerModule2.service.ts
...
在src/app.module.ts文件中
// module imports
import { SomeHttpModule1 } from './someHttpModule1/someHttpModule1.module'
import { SomeHttpModule2 } from './someHttpModule2/someHttpModule.module'
import { EventsModule } from './events/events.module'
// and other necessery modules
@Module(
imports: [
SomeHttpModule1,
SomeHttpModule2,
EventsModule,
// and other dependent modules
],
controller: [],
providers: []
})
export class AppModule{}
在你的events.module.ts文件中
// imports
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'
import { SomeEventConsumerModule1 } from './someEventConsumerModule1/someEventConsumerModule1.module'
import { SomeEventConsumerModule2 } from './someEventConsumerModule2/someEventConsumerModule2.module'
// and so on
@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [{
name: 'amq.direct',
type: 'direct' // check out docs for more information on exchange types
}],
uri: 'amqp://guest:guest@localhost:5672', // default login and password is guest, and listens locally to 5672 port in amqp protocol
connectionInitOptions: { wait: false }
}),
SomeEventConsumerModule1,
SomeEventConsumerModule2,
// ... and other dependent consumer modules
]
})
export class EventsModule {}
下面是一个消费者模块(someEventConsumerModule1.module.ts)的示例
// imports
import { SomeEventConsumerModule1Service } from './someEventConsumerModule1.service'
// ...
@Module({
imports: [
SomeEventConsumerModule1,
// other modules if injected
],
providers: [
SomeEventConsumerModule1Service
]
})
export class SomeEventConsumerModule1 {}
以及在服务文件中放置业务逻辑如何处理消息。
// imports
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'
import { Injectable } from '@nestjs/common'
import { ConsumeMessage, Channel } from 'amqplib' // for type safety you will need to install package first
// ... so on
@Injectable()
export class SomeEventConsumerModule1Service {
constructor(
// other module services if needs to be injected
) {}
@RabbitSubscribe({
exchange: 'amq.direct',
routingKey: 'direct-route-key', // up to you
queue: 'queueNameToBeConsumed',
errorHandler: (channel: Channel, msg: ConsumeMessage, error: Error) => {
console.log(error)
channel.reject(msg, false) // use error handler, or otherwise app will crush in not intended way
}
})
public async onQueueConsumption(msg: {}, amqpMsg: ConsumeMessage) {
const eventData = JSON.parse(amqpMsg.content.toString())
// do something with eventData
console.log(`EventData: ${eventData}, successfully consumed!`)
}
// ... and in the same way
}
https://stackoverflow.com/questions/68255418
复制相似问题