我试图使用Bull框架将DTO发送到Redis队列,并在处理器中处理这些DTO。有时作业传递给处理器(100个中的一个),但大多数情况下由于出错而失败:job stalled more than allowable limit
和我不知道如何修复它。
我给你一个小介绍,下面你可以看到我的代码。我已经创建了queue-api
模块作为队列的包装器,例如order队列。然后,我将这个模块导入到我希望将DTO发布到队列中的模块,在我的例子中是order-module
。
队列-api模块文件
// queue-api.module.ts
@Module({
imports: [
BullModule.registerQueue(
{
name: 'order-queue',
defaultJobOptions: {
backoff: 10000,
attempts: Number.MAX_SAFE_INTEGER,
},
},
),
...
],
providers: [OrderQueue],
exports: [OrderQueue],
})
export class QueueApiModule {}
// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
constructor(
@InjectQueue('order-queue')
private readonly queue: Queue,
) {}
async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
const job = await this.queue.add('send-submit-mail', dto)
console.log(`Job ${job.id} created.`)
}
}
订单模块文件
// order.module.ts
@Module({
imports: [
QueueApiModule,
...
],
providers: [
OrderProcessor,
...
]
})
export class OrderModule {}
// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
constructor(private readonly queue: OrderQueue) {}
@Process('send-submit-mail')
async onProcessSubmitMail(job: Job): Promise<void> {
console.log(`Processing of job ${job.id}`)
}
}
这个处理器处理程序几乎从未被调用过。
你知道我的代码有什么问题吗?谢谢你的建议。
发布于 2021-12-26 14:26:10
我也遇到了类似的问题,但还没有找到根本原因。但在此期间,我使用了公牛回购(npm Repl) cli来查看队列状态。当出现停滞错误时,在此之后将不会触发任何作业(似乎队列被卡在失败的作业上)。如果在牛市中运行统计数据,您将看到有一个处于活动状态的作业。您可以手动删除它(使用公牛-repl),然后您将运行下一个作业。我怀疑QueueScheduler没有运行,因此停滞的作业没有得到处理。您还可以增加暂停参数(其中有2-3个),检查https://docs.bullmq.io/bull/important-notes)是否有帮助。在我的例子中,当我在调试中暂停时,锁就会发生。
https://stackoverflow.com/questions/70261732
复制相似问题