Kafka单节点至集群的安装部署及注意事项

kafka简介

kafka的重要作用:

发布和订阅

像消息传递系统一样读写数据流。

处理

编写实时响应事件的可伸缩流处理应用程序

存储系统

将数据流安全地存储在分布式的,副本的,容错存储系统。kafka常见的企业应用。Spark Structured Streaming,kafka Sql

企业中常见的kafka架构图:

本文主要内容是讲解kafka单节点的安装,集群的安装部署,集群安装过程中的重要配置,错误排查监控等内容。希望帮助大家快速入门。

一 下载安装

由于kafka依赖于Zookeeper,第一步要先下载安装Zookeeper

tar -zxvf zookeeper-3.4.5.tar.gz -C /opt/modules/

复制配置文件:

cp conf/zoo_sample.cfg conf/zoo.cfg

配置数据存储目录:

dataDir=/opt/modules/zookeeper-3.4.5/data

创建数据存储目录:

mkdir /opt/modules/zookpeer-3.4.5/data

启动

bin/zkServer.sh start

查看状态

bin/zkServer.sh status

kafka下载版本是0.11.0.1,链接:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

tar -xzf kafka_2.11-0.11.0.1.tgz

cd kafka_2.11-0.11.0.1

二 启动server

启动在前台,关闭窗口或者ctrl+c就会关闭。

bin/kafka-server-start.sh config/server.properties

启动在后台

nohup /opt/modules/kafka_2.11-0.11.0.1/bin/kafka-server-start.sh /opt/modules/kafka_2.11-0.11.0.1/config/server.properties >/dev/null 2>&1 &

三 创建topic并测试

创建一个名字为test的topic,仅一个分区,一个副本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

注意:副本数不能超过kafka Broker数目,否则会报错。

查看topic信息

bin/kafka-topics.sh --list --zookeeper localhost:2181

另外,上面采用的是显示的创建topic,也可以配置Broker在往不存在的Broker发数据的时候自动创建topic。

启动生产者并发送消息测试:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

启动消费者,接受消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

注意,这条指令会记录自己的偏移到Zookeeper或者kafka,所以假如每次都原封不动的执行这条指令会重复消费历史数据,想要从上次断开处消费消息,只需要执行的时候去掉from-beginning

四 设置多节点集群

上面测试的例子是单节点,单节点无需做什么处理直接启动即可,但是生产中单节点是满足不了我们的需求的,所以我们要学会和了解如何部署多节点集群。

由于测试机器能用资源有限,就用单节点去部署三个kafka服务。

要注意的是,我们的数据存储目录要不相同,端口也要不同,Broker id也要唯一。这三个要求满足之后就可以去启动kafka服务了。

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

修改配置文件如下:

config/server-1.properties:

broker.id=1

listeners=PLAINTEXT://:9093

log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

listeners=PLAINTEXT://:9094

log.dir=/tmp/kafka-logs-2

启动各个服务实例

nohup bin/kafka-server-start.sh config/server-1.properties >/dev/null 2>&1 &

nohup bin/kafka-server-start.sh config/server-2.properties >/dev/null 2>&1 &

由于现在我们有三个Broker实例,虽然在同一台机器。这时候我们就可以创建副本数1<x<4的topic,命令如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看我们topic的描述信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

上面的是对输出的解释,第一行是一个关于所有分区的概览。每一个额外的行都会显示一个分区的信息。创建topic时候,由于我们只给定了一个分区,所以仅仅会有一行。

1),leader :leader代表给定分区负责读写的Broker节点。每个Broker节点都会是部分随机选择分区的leader。

2),replicas: 真正复制给定分区日志数据的node列表,包括leader和Follower。

3),isr: 在同步副本的集合。Isr是副本集合的子集,代表着当前是存活的并且能跟上leader的步伐。

在例子里,Brokerid 为1的是分区的leader。

对于,在单节点安装的时候创建的topic,我们依然可以查看其信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

由于当时创建的时候,只有Broker id为0的节点,所以test分区只会存在该节点。假如我们想将该分区迁移到其它节点,也可以实现,该内容就不在这里讲了。

Producer几条数据给my-replicated-topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

消费出来

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

测试,kafka的容错,可以将Broker id为1的kafka实例停掉:

ps aux | grep server-1.properties

kill -9 pid

查看topic的信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

消费刚刚已经生产进去的消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

五 重要配置详解

1, 部署一个kafka集群要修改的配置点

统一的Zookeeper集群,逗号隔开的Zookeeper集群

zookeeper.connect=localhost:2181

唯一的Broker id,大于等于0的整数

broker.id=0

数据存储目录,可以是多目录,逗号隔开

log.dirs=/tmp/kafka-logs

2, 问题排查及注意事项

经常有人在技术交流群里,@我,说浪尖,我代码在windows,kafka在虚拟机,明明可以ping通为何就是不能生产发送消息,而且虚拟机里面kafka的命令行都是正常的。

实际上就是应该配置Listeners参数。

0.8.2.2版本参数

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

0.9+版本参数

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

3, 配置级别的管理

这个主要是想说的一个参数,就是topic日志的存储时间,我们这个版本默认是

log.retention.hours=168(7天)

log.retention.minutes

log.retention.ms

这样默认在server.properties配置看起来没啥问题,实际上,再生成中我们的数据缓存时间长度要求往往是不一样的,比如有些数据量比较大,我们磁盘优先仅仅想缓存几个小时,而有些却想缓存一周或者更久,那么这个时候就要使用topic级别的配置了。

retention.ms 604800000

修改topic级别配置方式如下

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000

六 监控系统讲解

Kafka监控,也是我们生产中常见的,在这里我们推荐的监控工具有

Kafka manager

Kafkatools

这些监控工具使用起来很简单,在这里就不详细介绍了。

推荐阅读:

1,Kafka源码系列之副本同步机制及isr列表更新

2,Kafka源码系列之分组消费的再平衡策略

3,Kafka源码系列之Consumer高级API性能分析

4,Kafka源码系列之以kafka为例讲解分布式存储系统

本文分享自微信公众号 - Spark学习技巧(bigdatatip)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-11-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏陈树义

高效开发技巧:为什么你下载Git项目这么慢?

笔者所在公司采用的是 GitLab 进行版本管理,但许多同事下载 Git 项目的路径是这样的: 打开浏览器 -> 输入网址 -> 查找项目地址 -> 复制项目...

38040
来自专栏博客园迁移

jenkins自动部署应用到tomcat中,编译后shell脚本的简单示例

1.先配置好jenkins需要用到的其他外部组件  jdk,maven,git/svn

28430
来自专栏Hadoop实操

如何编译Azkaban插件

前面Fayson介绍了《如何编译安装Azkaban服务》,在安装的时候我们再指定配置文件中有配置Web服务和Executor服务的插件路径,说明Azkaban支...

1.4K60
来自专栏java小白

IDEA快捷键生成serialVersionUID

63960
来自专栏weixuqin 的专栏

一些杂想(服务器篇)

8320
来自专栏Hadoop实操

如何在CDH5.15中安装CDSW1.4

48260
来自专栏静下来

php提示Notice:undefined index的解决方法

今天一个朋友的网站被他的服务商关站了,说是网站每天报错日志能有30M。想想一个文本,就一些提示就能有30M的大小,那错误是蛮多的了。 因为他用的虚拟空间,他这个...

43350
来自专栏Hadoop实操

如何在CDH中使用HBase的ACLs进行授权

42940
来自专栏云计算教程系列

如何在Ubuntu 14.04上使用Ansible部署基本PHP应用程序

本教程介绍使用Ansible配置基本PHP应用程序的过程。本教程结束时的目标是让您新Web服务器为基本的PHP应用程序提供服务,而无需在目标腾讯CVM上运行单个...

17200
来自专栏王小雷

Ubuntu16.04安装Hadoop2.6+Spark1.6+开发实例

Ubuntu16.04安装Hadoop2.6+Spark1.6,并安装python开发工具Jupyter notebook,通过pyspark测试一个实例,調通...

259100

扫码关注云+社区

领取腾讯云代金券