前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 快速起步

Kafka 快速起步

作者头像
dys
发布2018-04-04 11:45:48
9470
发布2018-04-04 11:45:48
举报
文章被收录于专栏:性能与架构

主要内容: 1. kafka 安装、启动 2. 消息的 生产、消费 3. 配置启动集群 4. 集群下的容错测试 5. 从文件中导入数据,并导出到文件

单机示例

安装

代码语言:javascript
复制
tar -xzf kafka_2.10-0.10.1.1.tgz
cd kafka_2.10-0.10.1.1

启动

代码语言:javascript
复制
> bin/zookeeper-server-start.sh \
config/zookeeper.properties
> bin/kafka-server-start.sh \
config/server.properties

创建topic

打开一个新的终端窗口

代码语言:javascript
复制
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test

发送消息

打开一个新的终端窗口

代码语言:javascript
复制
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test

进入输入模式,随意输入信息,例如:

代码语言:javascript
复制
hello world
hi

获取消息

打开一个新的终端窗口

代码语言:javascript
复制
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning

便会显示出刚才发送的两条消息:

代码语言:javascript
复制
hello world
hi

这时可以打开发送消息的终端窗口,输入新的信息,再返回来就可以看到自动接收到了新消息

配置集群

新建两个启动配置文件

代码语言:javascript
复制
> cp config/server.properties \
config/server-1.properties
> cp config/server.properties \
config/server-2.properties

修改 config/server-1.properties 的以下几项配置:

代码语言:javascript
复制
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=logs/kafka-logs-1

修改 config/server-2.properties 的以下几项配置:

代码语言:javascript
复制
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=logs/kafka-logs-2

启动

代码语言:javascript
复制
> bin/kafka-server-start.sh \
config/server-1.properties &
> bin/kafka-server-start.sh \
config/server-2.properties &

创建一个topic,设置3个复制

代码语言:javascript
复制
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 3 \
--partitions 1 \
--topic my-replicated-topic

发送消息

代码语言:javascript
复制
bin/kafka-console-producer.sh 
--broker-list localhost:9092 \
--topic my-replicated-topic

输入消息:

代码语言:javascript
复制
my test message 1
my test message 2

获取消息

代码语言:javascript
复制
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic my-replicated-topic

可以正常取得消息

容错测试

代码语言:javascript
复制
# 取得server1的进程号
ps aux | grep server-1.properties

# 杀掉进程
kill -9 43116

读取消息

代码语言:javascript
复制
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic my-replicated-topic

返回信息:

代码语言:javascript
复制
my test message 1
my test message 2

仍然可以正常取得消息

Kafka Connect

Kafka 中的 connecter 可以与外部系统进行连接,例如文件系统、数据库

下面实验一个简单文件系统交互,从一个文件中导入数据,然后导出到另一个文件中

创建一个测试文件,用于导入数据使用

代码语言:javascript
复制
echo -e "foo\nbar" > test.txt

启动 connect,执行数据的导入导出

代码语言:javascript
复制
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties

命令执行后,会输出一系列的日志信息,等待执行完毕

查看导出结果

代码语言:javascript
复制
cat test.sink.txt

返回结果:

代码语言:javascript
复制
foo
bar 

成功导出了 test.txt 中的数据

过程分析

执行第2步的命令后,为什么是去读test.txt?为什么写入了test.sink.txt?中间的过程是什么样的?

原因是在于两个配置文件

config/connect-file-source.properties (导入配置)

代码语言:javascript
复制
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test  

file指定了是从test.txt中导入数据

topic指定了把数据发送到connect-test这个topic

connect-file-sink.properties(导出配置)

代码语言:javascript
复制
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

file指定了把数据导出到test.txt中导入数据

topic指定从connect-test这个topic中读取数据

查看一下connect-test这个topic

代码语言:javascript
复制
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic connect-test \
--from-beginning

结果为:

代码语言:javascript
复制
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

现在向test.txt中添加一条新数据:

代码语言:javascript
复制
echo "Another line" >> test.txt

再次执行 cat test.sink.txt 就会看到刚刚添加的数据:

代码语言:javascript
复制
foo
bar
Another line        
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-01-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JAVA高性能架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 单机示例
    • 安装
      • 启动
        • 创建topic
          • 发送消息
            • 获取消息
            • 配置集群
              • 新建两个启动配置文件
                • 启动
                  • 创建一个topic,设置3个复制
                    • 发送消息
                      • 获取消息
                        • 容错测试
                        • Kafka Connect
                          • 创建一个测试文件,用于导入数据使用
                            • 启动 connect,执行数据的导入导出
                              • 查看导出结果
                                • 过程分析
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档