前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3.【kafka运维】Topic的生产和发送运维脚本(3)

3.【kafka运维】Topic的生产和发送运维脚本(3)

作者头像
石臻臻的杂货铺[同名公众号]
发布2021-08-03 15:25:44
5320
发布2021-08-03 15:25:44
举报
文章被收录于专栏:kafka专栏kafka专栏

文章目录

日常运维问题排查 怎么能够少了滴滴开源的 滴滴开源LogiKM一站式Kafka监控与管控平台

1.Topic的发送kafka-console-producer.sh

4.1 生产无key消息

代码语言:javascript
复制
## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生产有key消息 加上属性--property parse.key=true

代码语言:javascript
复制
## 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true

默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)


可选参数

参数

值类型

说明

有效值

–bootstrap-server

String

要连接的服务器必需(除非指定–broker-list)

如:host1:prot1,host2:prot2

–topic

String

(必需)接收消息的主题名称

–batch-size

Integer

单个批处理中发送的消息数

200(默认值)

–compression-codec

String

压缩编解码器

none、gzip(默认值)snappy、lz4、zstd

–max-block-ms

Long

在发送请求期间,生产者将阻止的最长时间

60000(默认值)

–max-memory-bytes

Long

生产者用来缓冲等待发送到服务器的总内存

33554432(默认值)

–max-partition-memory-bytes

Long

为分区分配的缓冲区大小

16384

–message-send-max-retries

Integer

最大的重试发送次数

3

–metadata-expiry-ms

Long

强制更新元数据的时间阈值(ms)

300000

–producer-property

String

将自定义属性传递给生成器的机制

如:key=value

–producer.config

String

生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径

–property

String

自定义消息读取器

parse.key=true/false key.separator=ignore.error=true/false

–request-required-acks

String

生产者请求的确认方式

0、1(默认值)、all

–request-timeout-ms

Integer

生产者请求的确认超时时间

1500(默认值)

–retry-backoff-ms

Integer

生产者重试前,刷新元数据的等待时间阈值

100(默认值)

–socket-buffer-size

Integer

TCP接收缓冲大小

102400(默认值)

–timeout

Integer

消息排队异步等待处理的时间阈值

1000(默认值)

–sync

同步发送消息

–version

显示 Kafka 版本

不配合其他参数时,显示为本地Kafka版本

–help

打印帮助信息

2. Topic的消费kafka-console-consumer.sh

1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的) 下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正则表达式匹配topic进行消费--whitelist 消费所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’

消费所有的topic,并且还从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning

3.显示key进行消费--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分区消费--partition 指定起始偏移量消费--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 给客户端命名--group

注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 添加客户端属性--consumer-property

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. 添加客户端属性--consumer.config

--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


参数

描述

例子

--group

指定消费者所属组的ID

--topic

被消费的topic

--partition

指定分区 ;除非指定–offset,否则从分区结束(latest)开始消费

--partition 0

--offset

执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量

--offset 10

--whitelist

正则表达式匹配topic;--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition --offset等就不能使用了

--consumer-property

将用户定义的属性以key=value的形式传递给使用者

--consumer-propertygroup.id=test-consumer-group

--consumer.config

消费者配置属性文件请注意,[consumer-property]优先于此配置

--consumer.config config/consumer.properties

--property

初始化消息格式化程序的属性

print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator= 、line.separator=、key.deserializer=、value.deserializer=

--from-beginning

从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了

--max-messages

消费的最大数据量,若不指定,则持续消费下去

--max-messages 100

--skip-message-on-error

如果处理消息时出错,请跳过它而不是暂停

--isolation-level

设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted

--formatter

kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

More

Kafka专栏持续更新中…(源码、原理、实战、运维、视频、面试视频)


本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-08-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 1.Topic的发送kafka-console-producer.sh
  • 2. Topic的消费kafka-console-consumer.sh
  • More
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档