
CKafka Series - Setting Up Single-Node ZooKeeper + Kafka (Part 14)

Original article by 发哥说消息队列, last modified 2019-09-18 10:47:31.

Overview: this article sets up single-node ZooKeeper + Kafka as a development and test environment, covering topic management, partitions, producing and consuming messages, and importing/exporting topic data.

一、Install JDK 1.8

Download: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

mv jdk-8u221-linux-x64.tar.gz /usr/local/jdk/
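The article stops at moving the tarball; it still needs to be unpacked and JAVA_HOME set. A minimal sketch, assuming the archive unpacks into a `jdk1.8.0_221` directory (the usual layout for this JDK release):

```shell
# Unpack the JDK tarball if it is present at the path used above
if [ -f /usr/local/jdk/jdk-8u221-linux-x64.tar.gz ]; then
    tar -zxf /usr/local/jdk/jdk-8u221-linux-x64.tar.gz -C /usr/local/jdk/
fi

# The archive unpacks into jdk1.8.0_221; point JAVA_HOME at it
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_221
export PATH=$PATH:$JAVA_HOME/bin
```

Adding the two `export` lines to `~/.bash_profile` makes them survive re-login, as done for ZK_HOME below.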

二、Single-Node ZooKeeper

Download: http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

Extract: tar -zxvf zookeeper-3.4.6.tar.gz

ZooKeeper configuration file:

[root@VM_1_250_centos zookeeper]# cat conf/zoo.cfg 

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/opt/zookeeperData/zookeeper
dataLogDir=/opt/zookeeperData/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
minSessionTimeout=4000
maxSessionTimeout=10000
server.1=10.1.1.250:2888:3888
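It is safest to create the `dataDir` and `dataLogDir` paths from the config above before starting ZooKeeper (paths taken directly from zoo.cfg):

```shell
# Create the snapshot and transaction-log directories referenced in zoo.cfg
mkdir -p /opt/zookeeperData/zookeeper /opt/zookeeperData/logs
```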

### Add environment variables

[root@VM_1_250_centos zookeeper]# cat ~/.bash_profile 
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
	. ~/.bashrc
fi
# User specific environment and startup programs
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/bin
PATH=$PATH:$HOME/bin
export PATH

Load the environment variables: source ~/.bash_profile

### Start ZooKeeper: zkServer.sh start

三、Single-Node Kafka

Download: https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz

Extract: tar -zxvf kafka_2.10-0.10.2.0.tgz

### Add environment variables

[root@VM_1_250_centos zookeeper]# cat ~/.bash_profile 
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
	. ~/.bashrc
fi
# User specific environment and startup programs
export ZK_HOME=/opt/zookeeper
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/bin
PATH=$PATH:$HOME/bin
export PATH

Load the environment variables: source ~/.bash_profile

Start Kafka:

[root@VM_1_250_centos jdk]# kafka-server-start.sh /opt/kafka/config/server.properties &

四、Managing Topics

1、Create a topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic ip_login

View the topic's metadata in ZooKeeper (inside the zkCli shell, e.g. ls /brokers/topics):

[root@VM_1_250_centos kafka-logs]# zkCli.sh -server localhost:2181

Connecting to localhost:2181

2、Describe topics

Show detailed information for the ip_login topic:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic ip_login

Describe all topics:

kafka-topics.sh --describe --zookeeper localhost:2181

List all topic names:

kafka-topics.sh --list --zookeeper localhost:2181

Show partitions that are under-replicated (still syncing):

kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions

Show unavailable partitions:

kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions

Show topics with configuration overrides:

kafka-topics.sh --describe --zookeeper localhost:2181 --topics-with-overrides

3、Modify a topic's configuration

 kafka-topics.sh --alter --zookeeper localhost:2181  --topic user_order1 --config max.message.bytes=204800

4、Delete a topic

kafka-topics.sh  --zookeeper localhost:2181 --delete  --topic user_order1

五、Managing Partitions and Replicas

1、Modify partitions (the partition count can only be increased, never decreased)

kafka-topics.sh --partitions 8 --alter --zookeeper localhost:2181 --topic ip_login

2、Modify the replica assignment (single-node mode supports only 1 replica)

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic user_order3
cat user_order3_replicas.json
{
  "version": 1,
  "partitions": [
    {"topic": "user_order3", "partition": 0, "replicas": [2,0,1]},
    {"topic": "user_order3", "partition": 1, "replicas": [0,1,2]},
    {"topic": "user_order3", "partition": 2, "replicas": [1,2,0]},
    {"topic": "user_order3", "partition": 3, "replicas": [2,1,0]},
    {"topic": "user_order3", "partition": 4, "replicas": [0,2,1]},
    {"topic": "user_order3", "partition": 5, "replicas": [1,0,2]}
  ]
}
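Hand-written reassignment files are easy to break with bad quoting (the original used curly quotes), so it helps to validate the JSON before running the reassignment. A sketch, using a deliberately abbreviated single-replica plan suited to this single-node setup; the file path and partition list here are illustrative:

```shell
# Write an abbreviated reassignment plan (replicas=[0] suits a single broker)
cat > /tmp/user_order3_replicas.json <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "user_order3", "partition": 0, "replicas": [0]},
    {"topic": "user_order3", "partition": 1, "replicas": [0]}
  ]
}
EOF

# Fail fast on malformed JSON before handing the file to kafka-reassign-partitions.sh
python3 -m json.tool /tmp/user_order3_replicas.json > /dev/null && echo "JSON OK"
```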

### Execute the reassignment

kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file user_order3_replicas.json --execute

### Verify the reassignment

kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file user_order3_replicas.json --verify

六、Producing Messages

kafka-console-producer.sh --broker-list localhost:9092 --topic ip_login

七、Consuming Messages

### Start a consumer with the new API

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ip_login

### Specify a consumer group

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ip_login --consumer-property group.id=console-consumer-54466

### List consumer groups

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

### Start a consumer with the old API (unlike the new API, which connects to the broker's IP and port, the old API connects to ZooKeeper's IP and port)

kafka-console-consumer.sh --zookeeper localhost:2181 --topic ip_login --consumer-property group.id=console-consumer-54466 --from-beginning --delete-consumer-offsets

The parameters in the consumer command above mean the following:

--zookeeper: ZooKeeper connection address, used to fetch Kafka metadata;

--topic: the topic name in the Kafka cluster;

--consumer-property: sets consumer-level parameters, e.g. a custom consumer group name;

--from-beginning: consume from the earliest recorded offset;

--delete-consumer-offsets: delete the consumed offsets stored in ZooKeeper.

八、Importing Data into a Kafka Topic

### Both import and export use the Kafka Connect client

[root@VM_1_250_centos kafka]# cat config/connect-file-source.properties 

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=ip_login

### Create the file to import
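The file to import is /tmp/test.txt (the `file=` path in the source config above). The contents here are illustrative:

```shell
# Create the source file that the FileStreamSource connector will read
cat > /tmp/test.txt <<'EOF'
hello kafka connect
line two
line three
EOF
```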

### Start a standalone connector

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties

Import succeeded!

九、Exporting Kafka Topic Data to a File
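The original does not show the sink connector config. Assuming the stock config/connect-file-sink.properties shipped with Kafka, adapted to this article's topic and an illustrative output path, it would look roughly like:

```properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# Illustrative output path; the stock default is test.sink.txt
file=/tmp/test.sink.txt
topics=ip_login
```

The FileStreamSink connector writes each record of the ip_login topic as a line in the output file.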

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties

Export succeeded!

Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited. For infringement concerns, contact cloudcommunity@tencent.com for removal.
