Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >如何在Kafka nodejs中延迟消费消息?

如何在Kafka nodejs中延迟消费消息?
EN

Stack Overflow用户
提问于 2016-05-31 12:14:37
回答 2查看 910关注 0票数 0

我正在使用NodeJS消费来自Kafka的消息,收到消息后,我会带着它在Elasticsearch中创建索引。这是我的代码片段:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafkaConsumer.on('message', function (message) {
    elasticClient.index({
        index: 'test',
        type: 'sample',
        body: message
    }, function (error, response) {
        if (error) {

            // Stop consuming message here

            console.log(error);
        }
        console.log(response);
    });
});

我希望确保在继续使用下一条消息之前必须成功创建索引,因为我不希望丢失任何消息。

EN

回答 2

Stack Overflow用户

发布于 2016-05-31 12:51:47

试试consumer.pause()/resume()吧。

pause()暂停使用者

resume()恢复消费者

票数 0
EN

Stack Overflow用户

发布于 2016-05-31 12:55:00

您可以暂停您的消费者,这样它就不会订阅和获取新消息。一旦你得到elasticClient的响应,你就可以恢复你的kafka消费者了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafkaConsumer.pause();
kafkaConsumer.resume();

根据您的设置使用它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37545494

复制
相关文章
Flink消费kafka消息实战
构建kafka相关的环境不是本文重点,因此这里利用docker快速实现,步骤如下:
程序员欣宸
2019/05/29
5.2K0
Flink消费kafka消息实战
查看kafka消息消费情况
消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。 对于Kafka而言,消息被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,虽然Partition可以继续细分为若干个段文件(Segment),但是对于上层应用来说可以将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。 每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。我们来看下图,其就是Partition的一个真实写照:
chenchenchen
2022/05/07
2.5K0
查看kafka消息消费情况
使用storm trident消费kafka消息
storm通过保证数据至少被处理一次来保证数据的完整性,由于元祖可以重发,对于一些需要数据精确的场景,可以考虑用storm trident实现。 传统的事物型拓扑中存在几种bolt: 1.1 BasicBolt 这是最基本的Bolt,BasicBolt每次只能处理一个tuple,而且必须等前一个tuple成功处理后下一个tuple才能继续处理,显然效率不高。 1.2 BatchBolt storm的一个优势就是能够批量处理tuple,BatchBolt支持批量处理tuple,每一个batch中的t
Albert陈凯
2018/04/04
9170
Kafka 消息的生产消费方式
主要内容: 1. kafka 整体结构 2. 消息的生产方式 3. 消息的读取方式 整体结构 在 kafka 中创建 topic(主题),producer(生产者)向 topic 写入消息,consu
dys
2018/04/04
1.3K0
Kafka 消息的生产消费方式
如何在MQ中实现支持任意延迟的消息?
定时消息与延迟消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
林一
2018/07/24
6.1K3
如何在MQ中实现支持任意延迟的消息?
消息队列之kafka的重复消费
Kafka 是对分区进行读写的,对于每一个分区的消费,都有一个 offset 代表消息的写入分区时的位置,consumer 消费了数据之后,每隔一段时间,会把自己消费过的消息的 offset 提交一下。表示已记录当当前的消费位置,从这里开始消费。
conanma
2022/04/11
1K0
Kafka消费者 之 如何进行消息消费
放弃不难,但坚持很酷~由于消费者模块的知识涉及太多,所以决定先按模块来整理知识,最后再进行知识模块汇总。
create17
2019/07/16
3.7K0
SpringBoot2.3整合RabbitMQ实现延迟消费消息
1.源码获取地址 文章末尾有源代码地址 https://www.sunnyblog.top/detail.html?id=1265257400324063232 本章节主要实现消息的延迟消费,在学
sunny1009
2020/06/19
8210
SpringBoot2.3整合RabbitMQ实现延迟消费消息
Kafka OffsetMonitor:监控消费者和延迟的队列
一个小应用程序来监视kafka消费者的进度和它们的延迟的队列。 KafkaOffsetMonitor是用来实时监控Kafka集群中的consumer以及在队列中的位置(偏移量)。 你可以查看当前的消费者组,每个topic队列的所有partition的消费情况。可以很快地知道每个partition中的消息是否 很快被消费以及相应的队列消息增长速度等信息。这些可以debug kafka的producer和consumer,你完全知道你的系统将 会发生什么。 这个web管理平台保留的partition offset和consumer滞后的历史数据(具体数据保存多少天我们可以在启动的时候配 置),所以你可以很轻易了解这几天consumer消费情况。 KafkaOffsetMonitor这款软件是用Scala代码编写的,消息等历史数据是保存在名为offsetapp.db数据库文件中,该数据 库是SQLLite文件,非常的轻量级。虽然我们可以在启动KafkaOffsetMonitor程序的时候指定数据更新的频率和数据保存 的时间,但是不建议更新很频繁,或者保存大量的数据,因为在KafkaOffsetMonitor图形展示的时候会出现图像展示过 慢,或者是直接导致内存溢出了。 所有的关于消息的偏移量、kafka集群的数量等信息都是从Zookeeper中获取到的,日志大小是通过计算得到的。 消费者组列表
加米谷大数据
2018/03/26
2.5K2
Kafka OffsetMonitor:监控消费者和延迟的队列
kafka中消费者消费消息之每个线程维护一个KafkaConsumer实例
3、kafka中消费者消费消息之每个线程维护一个KafkaConsumer实例:
别先生
2019/06/03
5330
Kafka的消息是如何被消费的?Kafka源码分析-汇总
Kafka的消息消费是以消费的group为单位; 同属一个group中的多个consumer分别消费topic的不同partition; 同组内consumer的变化, partition变化, coordinator的变化都会引发balance; 消费的offset的提交 Kafka wiki: Kafka Detailed Consumer Coordinator Design 和 Kafka Client-side Assignment Proposal ---- GroupMetadata类 所
扫帚的影子
2018/09/05
1.3K0
Kafka的消息是如何被消费的?Kafka源码分析-汇总
RabbitMQ延迟消费和重复消费
转载自 https://blog.csdn.net/quliuwuyiz/article/details/79301054
allsmallpig
2021/02/25
2.4K0
Kafka 消费线程模型在中通消息服务运维平台的应用
以上问题看出来这位朋友刚接触 Kafka,我们都知道 Kafka 相对 RocketMQ 来说,消费端是非常 “原生” 的,不像 RocketMQ 将消费线程模型都封装好,用户不用关注内部消费细节。
张乘辉
2020/07/07
1K0
Flink消费kafka如何获取每条消息对应的topic
1.首先自定义个 KafkaDeserializationSchema public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> { @Override //nextElement 是否表示流的最后一条元素,我们要设置为 false ,因为我们需要 msg 源源不断的被消费 public boolean isEndOfStream(Tuple2<
shengjk1
2020/03/26
2.4K0
kafka学习之消息的消费原理与存储(二)
在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个类别。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
周杰伦本人
2022/10/25
5220
kafka学习之消息的消费原理与存储(二)
Apache Kafka-消费端_批量消费消息的核心参数及功能实现
https://kafka.apache.org/24/documentation.html#consumerconfigs
小小工匠
2021/08/17
2.9K0
第四十六章:SpringBoot & RabbitMQ完成消息延迟消费
在2018-3-1日SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性请查看官方文档。 本章目标 基于SpringBoot整合RabbitMQ完成消息延迟消费。 构建项目 注意前言 由于SpringBoot的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置可以被SpringBoot扫描到,否则不会自动创建队列,控制台会输出4
恒宇少年
2018/06/27
8400
RabbitMQ 延迟队列,消息延迟推送
在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
海向
2019/09/23
2.2K0
RabbitMQ 延迟队列,消息延迟推送
延迟消息处理
之前有这样一个需求,运营在后端配置一条系统消息或者营销活动等类型的消息等到了需要推送的时间以后会自动的将消息推送给用户APP端显示,一开始是采用的任务调度的方式(定时器),通过轮询扫表去做,因为具体什么时候推送消息没有固定的频率,固定的时间,因此需要每分钟扫表以避免消息在指定时间内未及时推送给APP端内.所以每次都是1分钟扫描一次,太过于频繁。所以不太适合(定时器适合那种固定频率或时间段处理)。
用户1215919
2021/12/28
8240
点击加载更多

相似问题

Spring kafka消息消费延迟

17

Apache Kafka中消费者消费消息的延迟

10

Kafka 1.0 Streaming API: partitions消息消费延迟

117

NodeJS Kafka消费者收到重复的消息?

121

Kafka不消费消息

130
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文