前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka(二)Kafka快速入门

kafka(二)Kafka快速入门

作者头像
用户1483438
发布2022-04-10 09:23:43
6410
发布2022-04-10 09:23:43
举报
文章被收录于专栏:大数据共享大数据共享

集群部署

  1. 配置 server.properties
代码语言:javascript
复制
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

其他服务器一样配置

  1. 启动集群
代码语言:javascript
复制
bin/kafka-server-start.sh -daemon config/server.properties

其他服务器一样。

Kafka 命令行操作

topic 操作

脚本 kafka]$ bin\kafka-topics.sh 命令选项

选项

描述

--alter

更改分区数,副本分配,和/或主题的配置。

--at-min-isr-partitions

如果在描述主题时设置,则仅显示 isr 计数为的分区等于配置的最小值。 不是支持 --zookeeper 选项。

--bootstrap-server <String: server to connect to>

必需:要连接的 Kafka 服务器。 如果提供此项,则不需要直接的 Zookeeper 连接。

--command-config <String: command config property file>

包含要传递给管理客户端的配置的属性文件。 这仅与 --bootstrap-server 选项一起用于描述和更改代理配置。

--config <String: name=value>

--create

创建一个新的topic

--delete

删除一个topic

--delete-config <String: name>

要为现有主题删除的主题配置覆盖(请参阅 --config 选项下的配置列表)。 不支持 --bootstrap-server 选项。

--describe

列出给定主题的详细信息。

--disable-rack-aware

禁用机架感知副本分配

--exclude-internal

运行 list 或 describe 命令时排除内部主题。 默认会列出内部主题

--force

禁止控制台提示

--help

打印帮助信息。

--if-exists

如果在更改或删除或描述主题时设置,则该操作仅在主题存在时执行。 不支持 --bootstrap-server 选项。

--if-not-exists

如果在创建主题时设置,则只有在主题不存在时才会执行操作。 不支持 --bootstrap- 服务器选项。

--list

列出所有可用的topic。

--partitions <Integer: # of partitions>

设置topic 分区数

--replication-factor <Integer:replication factor>

指定topic的副本数

--topic <String: topic>

指定topic 名称

--topics-with-overrides

如果在描述主题时设置,则仅显示已覆盖配置的主题

--unavailable-partitions

如果在描述主题时设置,则只显示其领导者不可用的分区

--under-min-isr-partitions

如果在描述主题时设置,则仅显示 isr 计数小于配置的最小值的分区。 不支持 --zookeeper 选项。

--under-replicated-partitions

如果在描述主题时设置,则仅显示在复制分区下

--version

展示Kafka版本

--zookeeper <String: hosts>

已弃用,zookeeper 连接的连接字符串,格式为 host:port。 可以提供多个主机以允许故障转移。

案例

  1. 创建一个 topic 语法:kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本数> --partitions <分区数> --topic <副本名称>
代码语言:javascript
复制
bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
#输出结果
Created topic test.
  1. 查看当前服务器中的所有 topic 语法: kafka-topics.sh --zookeeper <host>:<port> --list
代码语言:javascript
复制
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
# 输出结果
__consumer_offsets
abc
test
  1. 删除一个topic 语法:kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test 需要server.properties中设置delete.topic.enable=true否则只是标记删除。
代码语言:javascript
复制
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
# 输出结果
Topic test is marked for deletion. # 并不会马上删除,而是先对该topic做一个标记,后面再进行删除
#需要在 配置中设置 delete.topic.enable=true ,否则不会进行删除
Note: This will have no impact if delete.topic.enable is not set to true.
  1. 查看 topic 详情 语法:--describe
代码语言:javascript
复制
[atguigu@hadoop102 bin]$ kafka-topics.sh  --describe --bootstrap-server hadoop102:9092 --topic abc
#  topic  abc 详细信息
Topic: abc  PartitionCount: 1   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: abc  Partition: 0    Leader: 1   Replicas: 2,0,1 Isr: 1,2,0

参数

描述

Topic

topic名称

PartitionCount

分区数

ReplicationFactor

定义的分区数

Configs

配置

Partition

当前分区位置

Leader

当前那个broker为Leader

Replicas

副本位置

Isr

lsr同步队列


producer 操作

脚本 kafka]$ bin\kafka-console-producer.sh 命令选项

选项

描述

--batch-size <Integer: size>

如果消息不是同步发送的,则要在单个批次中发送的消息数。 (默认值:200)

--broker-list <String: broker-list>

链接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。

--compression-codec [String: compression-codec]

支持的压缩方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默认 'gzip'

--help

打印帮助信息

--line-reader <String: reader_class>

用于从标准输入读取行的类的类名。默认情况下,每行都作为单独的消息读取。 (默认:kafka.tools.ConsoleProducer$LineMessageReader)

--max-block-ms <Long: >

生产者发送的最大时间(默认:60000)

--max-memory-bytes <Long: >

缓冲大小,以字节为单位 (默认:33554432)

--max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition>

合并数据的最小数 (默认: 16384)

--message-send-max-retries <Integer>

退休数,默认为3

--metadata-expiry-ms <Long:>

强制刷新数据条数默认为300000,元数据以毫秒为单位的过期间隔时间段

--producer-property <String>

传递用户定义的Producer_Prop的机制

--producer.config <String: config file>

指定配置文件。 请注意, [producer-property] 优先于此配置。

--property <String: prop>

一种将用户定义的属性以 key=value 的形式传递给消息阅读器的机制。 这允许对用户定义的消息阅读器进行自定义配置。

--request-required-acks <String:>

设置ack(确认收到)的三种模式(0,1,-1),默认为1

--request-timeout-ms <Integer:>

设置ack 的超时时间(单位毫秒)默认为 1500

--retry-backoff-ms <Integer>

等待选举时间,默认为100)

--socket-buffer-size <Integer: size>

设置 tcp RECV 大小(默认: 102400)

--sync

设置为同步的

--timeout <Integer: timeout_ms>

如果设置和生产者运行异步模式,这给一条消息的最长时间是否有足够的队列等待批处理大小。该值以ms为单位。(默认:1000)

--topic <String: topic>

生产的消息发送给定的主题

--version

显示Kafka版本

  1. 发送消息 语法:kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名称>
代码语言:javascript
复制
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
#输出
>hello

hadoop102 接收 topic abc 消息

代码语言:javascript
复制
[admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
#接收生产者推送的消息
hello

hadoop103 接收 topic abc 消息

代码语言:javascript
复制
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello

consumer操作

脚本 kafka]$ bin/kafka-console-consumer.sh 命令选项

选项

描述

--bootstrap-server <String: server to connect to>

需:要连接的服务器。

--consumer-property <String: consumer_prop>

一种将用户定义的属性以 key=value 的形式传递给消费者的机制。

--consumer.config <String: config file>

consumer配置属性文件。 请注意, [consumer-property] 优先于此配置。

--enable-systest-events

记录消费者的消息及生命周期,用于系统测试

--formatter <String: class>

用于格式化 kafka 消息以供显示的类的名称。 (默认:kafka.tools.DefaultMessageFormatter)

--from-beginning

如果消费者还没有一个既定的偏移量来消费,那么从日志中出现的最早的消息而不是最新的消息开始。

--group <String: consumer group id>

消费者的消费者组ID。

--help

打印帮助信息

--isolation-level <String>

设置为 read_committed 以过滤掉未提交的事务消息。 设置为 read_uncommitted 以读取所有消息。 (默认值:read_uncommitted)

--key-deserializer <String: deserializer for key>

设置 密钥的解串器

--max-messages <Integer: num_messages>

退出前消费的最大消息数。 如果未设置,则消耗是连续的。

--offset <String: consume offset>

要消耗的偏移量 id(非负数),或 'earliest' 表示从开始,或 'latest' 表示从结束(默认值:latest)

--partition <Integer: partition>

要消费的分区。 除非指定了“--offset”,否则消耗从分区的末尾开始。

--property <String: prop>

初始化消息格式化程序的属性

--skip-message-on-error

如果在处理消息时出现错误,请跳过而不是暂停。

--timeout-ms <Integer: timeout_ms>

如果指定,则在指定的时间间隔内没有可供消费的消息时退出。要消费的主题 ID。

--value-deserializer <String: deserializer for values>

值的解串器

--version

显示Kafka版本

--whitelist <String: whitelist>

指定要包含以供使用的主题白名单的正则表达式。

案例

  1. 消费消息 语法:kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称>
代码语言:javascript
复制
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello
  1. 消费所有的消息 语法:--from-beginning
代码语言:javascript
复制
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
#接收生产者推送的消息
sh
nihao
发哦那旮
ka
niha
hdalfajkl
你好
股东大法师
hello
python
hello
haoh
hello
hello
hflahfla
flajklfja
flajla
afadf

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 集群部署
  • Kafka 命令行操作
    • topic 操作
      • producer 操作
        • consumer操作
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档