前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 安装与启动

Kafka 安装与启动

作者头像
smartsi
发布2019-11-26 18:18:59
9750
发布2019-11-26 18:18:59
举报
文章被收录于专栏:SmartSiSmartSiSmartSi

1. 下载代码

下载 2.3.0 版本并解压缩:

tar -zxvf kafka_2.12-2.3.0.tgz -C .

创建软连接便于升级:

ln -s kafka_2.12-2.3.0/ kafka

配置环境变量:

export KAFKA_HOME=/Users/smartsi/opt/kafka
export PATH=${KAFKA_HOME}/bin:$PATH

2. 安装ZooKeeper

Kafka 依赖 ZooKeeper,如果你还没有 ZooKeeper 服务器,你需要先启动一个 ZooKeeper 服务器。可以先参考ZooKeeper 安装与启动来安装 ZooKeeper。ZooKeeper 配置如下:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/smartsi/opt/zookeeper/data
clientPort=2181
server.1=localhost:2888:3888

你也可以通过与 kafka 打包在一起的便捷脚本来快速简单地创建一个单节点 ZooKeeper 实例:

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

3. 配置Kafka

第一个 broker 配置 server-9092.properties 如下:

broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

运行起来至少要配置四项。上面的前四项。

第二个 broker 配置 server-9093.properties 如下:

broker.id=1
listeners=PLAINTEXT://127.0.0.1:9093
log.dirs=/Users/smartsi/opt/kafka/logs/log-9093
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

第三个 broker 配置 server-9094.properties 如下:

broker.id=2
listeners=PLAINTEXT://127.0.0.1:9094
log.dirs=/Users/smartsi/opt/kafka/logs/log-9094
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有都在同一个端口注册,或者覆盖彼此的数据。所以用端口号9092、9093、9094分别代表三个 broker。

下面具体解释一下我们的配置项:

(1) Broker相关:

broker.id=0

broker 的 Id。每一个 broker 在集群中的唯一标示,要求是正数。每个 broker 都不相同。

(2) Socket服务设置:

listeners=PLAINTEXT://127.0.0.1:9092

Socket服务器监听的地址,如果没有设置,则监听 java.net.InetAddress.getCanonicalHostName() 返回的地址。

(3) ZooKeeper相关:

zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

zookeeper.connect 是一个逗号分隔的 host:port 键值对,每个对应一个 zk 服务器。例如 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002。你还可以将可选的客户端命名空间 Chroot 字符串追加到 URL 上以指定所有 kafka 的 Znode 的根目录。另外这个 kafka-2.3.0 这个节点需要你提前建立。让 Kafka 把他需要的数据结构都建立在这个节点下,否则会建立在根节点 / 节点下。

(3) 日志相关:

log.dirs=/Users/smartsi/opt/kafka/logs/log-9092

Kafka存储Log的目录。

4. 启动Kafka服务器

有两种方式可以启动 Kafka 服务器:

# 第一种方式(推荐)
bin/kafka-server-start.sh -daemon config/server.properties
# 第二种方式
nohup bin/kafka-server-start.sh config/server.properties &

我们以第一种方式启动 Kafka 服务器:

bin/kafka-server-start.sh -daemon config/server-9092.properties
bin/kafka-server-start.sh -daemon config/server-9093.properties
bin/kafka-server-start.sh -daemon config/server-9094.properties

查看进程和端口:

smartsi:kafka smartsi$ jps
8914 DataNode
42802 Jps
9252 NodeManager
41253 Kafka
41541 Kafka
42790 Kafka
41670 ZooKeeperMain
16731
9164 ResourceManager
1997

我们现在看一下 Kafka 在 ZooKeeper 上创建的节点:

[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

看一下我们在ZooKeeper上注册的两个 broker:

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids
[0, 1, 2]
[zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4}
cZxid = 0x92
ctime = Mon Sep 02 10:08:41 CST 2019
mZxid = 0x92
mtime = Mon Sep 02 10:08:41 CST 2019
pZxid = 0x92
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560012
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4}
cZxid = 0xa7
ctime = Mon Sep 02 10:08:48 CST 2019
mZxid = 0xa7
mtime = Mon Sep 02 10:08:48 CST 2019
pZxid = 0xa7
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560014
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4}
cZxid = 0xbd
ctime = Mon Sep 02 10:19:09 CST 2019
mZxid = 0xbd
mtime = Mon Sep 02 10:19:09 CST 2019
pZxid = 0xbd
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560018
dataLength = 188
numChildren = 0

5. 测试Kafka

5.1 创建Topic

让我们创建一个名为 test 的 Topic,它有一个分区和一个副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test

现在我们可以运行 list 命令来查看这个 Topic:

smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0
test

或者,你也可将代理配置为:在发布的topic不存在时,自动创建topic,而不是手动创建。

5.2 启动生产者

Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。

运行 Producer (生产者),然后在控制台输入一些消息以发送到服务器:

smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is my first message
>this is my second message
5.3 启动消费者

Kafka 还有一个命令行 Consumer(消费者),将消息转储到标准输出:

smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is my first message
this is my second message

如果你将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。

原文:Quickstart

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 下载代码
  • 2. 安装ZooKeeper
  • 3. 配置Kafka
  • 4. 启动Kafka服务器
  • 5. 测试Kafka
    • 5.1 创建Topic
      • 5.2 启动生产者
        • 5.3 启动消费者
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档