首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >以编程方式在RabbitMQ / Node.js中声明NestJS使用者?

以编程方式在RabbitMQ / Node.js中声明NestJS使用者?
EN

Stack Overflow用户
提问于 2021-07-05 11:33:04
回答 1查看 3.3K关注 0票数 0

我使用一个NestJS应用程序来使用一个RabbitMQ队列。无论订单如何,都可以处理每条消息,所以我想知道为同一队列声明新消费者的最佳做法是什么。

预期行为:队列由该服务处理,该服务使用多个使用者

队列: 1,2,3,4,5,6,...N;

在nestJS中,您可以使用@RabbitSubscribe装饰器分配一个函数来处理数据。我想要做的事情可以通过简单地用装饰器复制(并重命名)函数来实现,所以这个函数也会被调用来处理队列中的数据。

代码语言:javascript
运行
复制
  @RabbitSubscribe({
    ...
    queue: 'my-queue',
  })
  async firstSubscriber(data){
  // 1, 3, 5...
  }


 @RabbitSubscribe({
    ...
    queue: 'my-queue',
  })
  async secondSubscriber(data){
  // 2, 4, 6...
  }

我知道我可以在水平上重复这个项目和规模,但是我更喜欢在相同的过程中这样做。

我如何通过编程声明订阅者获得同样的行为,这样我就可以使用更多的并发处理来处理数据了?

EN

回答 1

Stack Overflow用户

发布于 2021-08-09 04:55:05

如果您使用@golevelup/nestjs包来支持不同的消息、Queque消耗量,如果您的应用程序是混合的,您将从中受益。首次安装

代码语言:javascript
运行
复制
npm i @golevelup/nestjs-rabbitmq

那么您的nestjs应用程序结构应该如下所示

代码语言:javascript
运行
复制
src --
     |
     app.module.ts
     main.ts
     app.module.ts
     someHttpModule1 --
                      |
                      someHttpModule1.controller.ts
                      someHttpModule1.module.ts
                      someHttpModule1.service.ts
                      ...
     someHttpModule2 --
                      |
                      someHttpModule2.controller.ts
                      someHttpModule2.module.ts
                      someHttpModule2.service.ts
                      ...
     ...
     // Events module is designed for consuming messages from rabbitmq
     events --
             |
             events.module.ts
             someEventConsumerModule1 --
                                       |
                                       someEventConsumerModule1.module.ts
                                       someEventConsumerModule1.service.ts
             someEventConsumerModule2 --
                                       |
                                       someEventConsumerModule2.module.ts
                                       someEventConsumerModule2.service.ts
             ...

在src/app.module.ts文件中

代码语言:javascript
运行
复制
// module imports
import { SomeHttpModule1 } from './someHttpModule1/someHttpModule1.module'
import { SomeHttpModule2 } from './someHttpModule2/someHttpModule.module'
import { EventsModule } from './events/events.module'
// and other necessery modules

@Module(
  imports: [
    SomeHttpModule1, 
    SomeHttpModule2,
    EventsModule, 
    // and other dependent modules
  ],
  controller: [],
  providers: []
})

export class AppModule{}

在你的events.module.ts文件中

代码语言:javascript
运行
复制
// imports
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'
import { SomeEventConsumerModule1 } from './someEventConsumerModule1/someEventConsumerModule1.module'
import { SomeEventConsumerModule2 } from './someEventConsumerModule2/someEventConsumerModule2.module'
// and so on

@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [{
        name: 'amq.direct',
        type: 'direct' // check out docs for more information on exchange types
      }],
      uri: 'amqp://guest:guest@localhost:5672', // default login and password is guest, and listens locally to 5672 port in amqp protocol
      connectionInitOptions: { wait: false }
    }),
    SomeEventConsumerModule1,
    SomeEventConsumerModule2,
    // ... and other dependent consumer modules 
  ]
})
export class EventsModule {}

下面是一个消费者模块(someEventConsumerModule1.module.ts)的示例

代码语言:javascript
运行
复制
// imports
import { SomeEventConsumerModule1Service } from './someEventConsumerModule1.service'
// ...

@Module({
  imports: [
    SomeEventConsumerModule1,
    // other modules if injected
  ],
  providers: [
    SomeEventConsumerModule1Service
  ]
})
export class SomeEventConsumerModule1 {}

以及在服务文件中放置业务逻辑如何处理消息。

代码语言:javascript
运行
复制
// imports
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'
import { Injectable } from '@nestjs/common'
import { ConsumeMessage, Channel } from 'amqplib' // for type safety you will need to install package first
// ... so on

@Injectable()
export class SomeEventConsumerModule1Service {
  constructor(
    // other module services if needs to be injected
  ) {}
  @RabbitSubscribe({
    exchange: 'amq.direct',
    routingKey: 'direct-route-key', // up to you
    queue: 'queueNameToBeConsumed',
    errorHandler: (channel: Channel, msg: ConsumeMessage, error: Error) => {
      console.log(error)
      channel.reject(msg, false) // use error handler, or otherwise app will crush in not intended way
    }
  })
  public async onQueueConsumption(msg: {}, amqpMsg: ConsumeMessage) {
    const eventData = JSON.parse(amqpMsg.content.toString())
    // do something with eventData
    console.log(`EventData: ${eventData}, successfully consumed!`)
  }
  // ... and in the same way
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68255418

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档