Kafka 运营总结

概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统)。

Kafka主要被用于两大类应用:

  1. 在应用间构建实时的数据流通道
  2. 构建传输或处理数据流的实时流式应用

一、Kafka基础架构

Kafka有4个核心API:

  1. Producer API:用于应用程序将数据流发送到一个或多个Kafka topics
  2. Consumer API:用于应用程序订阅一个或多个topics并处理被发送到这些topics中的数据
  3. Streams API:允许应用程序作为流处理器,处理来自一个或多个topics的数据并将处理结果发送到一个或多个topics中,有效的将输入流转化为输出流
  4. Connector API:用于构建和运行将Kafka topics和现有应用或数据系统连接的可重用的produers和consumers。例如,如链接到关系数据库的连接器可能会捕获某个表所有的变更

kafka几个重要概念:

  • Topic

Topic是发布记录的类别。Kafka中的Topics一般是多订阅者的,也就是一个Topic可以有0个或多个Consumer订阅它的数据。

  • Distribution

分区日志分布在集群中服务器中,每个服务器处理一部分分区的数据和请求。每个分区可以配置分布的服务器,以实现容错。

每个分区拥有一个Leader节点,和零或多个Follower。Leader处理该分区所有的读写请求,Follower复制Leader数据。如果Leader节点宕机,将会有一个Follower节点自动的转化为Leader。每个节点成为其部分分区的Leader,并成为剩余分区的Follower,这样整个集群的负载将比较均衡。

  • Producers

Producer发送数据到它选择的Topic。Producer负责决定将数据发送到Topic的那个分区上。这可以通过简单的循环方式来平衡负载,或则可以根据某些语义来决定分区(例如基于数据中一些关键字)。

  • Consumers

Consumer使用一个group name来标识自己的身份,每条被发送到一个Topic的消息都将被分发到属于同一个group的Consumer的一个实例中(group name相同的Consumer属于一个组,一个Topic的一条消息会被这个组中的一个Consumer实例消费)。Consumer实例可以在单独的进程中或者单独的机器上。

二、部署

  • 部署环境

常用部署机型:A5或 TS80 操作系统版本: Tencent tlinux release 2.2 ava 版本: 1.7.0_80 Zookeeper 版本: zookeeper-3.4.6 Kafka 版本:kafka_2.11-0.9.0.1

  • 部署步骤
  • 分别解压zookeeper-3.4.6.tar.gz和kafka_2.11-0.9.0.1.tgz
  1. 修改配置文件,设置相关参数后,分别启动zookeeper和kafka:
/bin/sh /data/home/dc_datazone/games/zookeeper/bin/start_zk.sh
/bin/sh /data/home/dc_datazone/games/kafka/bin/start-kafka.sh
  • 常用命令

创建topic:

bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  --replication-factor 1  --partitions  1  --topic test

列出所有topic:

 bin/kafka-topics.sh --list --zookeeper localhost:2181

查看topic信息(包括分区、副本情况等):

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

往某topic生产消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

从某topic消费消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

三、集群配置

~/games/kafka/config]$ grep -v "#" server.properties

broker.id=0                 //每一个boker都有一个唯一的id作为它们的名字

port=9092                   // broker server服务端口

host.name=nodeIP    // broker的主机地址

num.network.threads=3       // broker处理消息的最大线程数

num.io.threads=8            //处理IO的线程数

socket.send.buffer.bytes=102400 // socket SO_SNDBUFF参数

socket.receive.buffer.bytes=102400  // socket SO_RCVBUFF参数

socket.request.max.bytes=104857600 //控制在一个请求中获取的消息的字节数

log.dirs=/data/home/dc_datazone/kafka-logs,/data/home/dc_datazone/kafka-logs-2,/data/home/dc_datazone/kafka-logs-3,/data/home/dc_datazone/kafka-logs-4

// kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能  /data/kafka-logs-1,/data/kafka-logs-2

num.partitions=6               //默认分区数

num.recovery.threads.per.data.dir=1

log.retention.hours=24                     //控制一个log保留多长个小时

log.segment.bytes=1073741824              //单一的log segment文件大小

log.retention.check.interval.ms=300000       //检查数据是否要写入到硬盘的时间间隔。

zookeeper.connect=node1:2181,node2:2181…. Node7:2181  //指定zookeeper连接字符串

zookeeper.connection.timeout.ms=6000       //指定客户端连接zookeeper的最大超时时间

delete.topic.enable=true

default.replication.factor=2                 //默认副本数

四、监控

通过keeper监控kafka和zk

[dc_datazone@hostname ~/games/datazone-keeper]$ crontab -l

@reboot /bin/sh /data/home/dc_datazone/games/datazone-keeper/datazone-keeper.sh 1>/tmp/datazone-keeper.log 2>&1

5 * * * * /bin/sh /data/home/dc_datazone/games/datazone-keeper/datazone-keeper.sh 1>/tmp/datazone-keeper.log 2>&1



PROCESS_NAME[0]="Kafka"

START_PAHT[0]="/bin/sh /data/home/dc_datazone/games/kafka/bin/start-kafka.sh"

INTERVAL[0]="60"

LOG_PATH[0]="log-kafka-datazone-keeper.log"



PROCESS_NAME[1]="QuorumPeerMain"

START_PAHT[1]="/bin/sh /data/home/dc_datazone/games/zookeeper/bin/start_zk.sh"

INTERVAL[1]="60"

LOG_PATH[1]="log-zookeeper-datazone-keeper.log"

五、增加zookeeper节点

zookeeper集群的容错能力的算法为:2n+1=总数,n即为可容纳的故障节点

即:3台能容错1台,5台能容错2台,7台能容错3台

新搭的一个Kafka集群只有3台zookeeper节点,为了增加容错能力,决定扩容

  1. 先在新节点上安装zookeeper软件,修改myid和zoo.cfg
server.1= node1:2888:3888

server.2= node2:2888:3888

server.3= node3:2888:3888

server.4= node4:2888:3888

server.5= node5:2888:3888

server.6= node6:2888:3888

server.7= node7:2888:3888
  1. 修改整个集群中与Zookeeper相关的系统配置:
~/games/kafka/config]$ vi server.properties

zookeeper.connect=zk1:2181,zk2:2181…zk7:2181
  1. 重启Zookeeper服务
~/games/zookeeper/bin]$ ./zkServer.sh restart
  1. 所有节点依次重启kafka服务
~/games/kafka/bin]$ ./kafka-server-stop.sh

~/games/kafka/bin]$ ./start-kafka.sh

六、修改数据保存时间

  • 存储时间设置太长会导致磁盘空间不够,修改topic存储时间24小时
./kafka-topics.sh --zookeeper node1:2181 --alter --topic dnf_log_dnf --config retention.ms=86400000

./kafka-topics.sh --zookeeper node1:2181 --alter --topic dnf_log_dnf --config segment.ms=86400000

./kafka-topics.sh --zookeeper node1:2181 --alter --topic dnf_log_dnf --config delete.retention.ms=86400000

不需要重启,修改集群默认设置才要重启

七、增加数据目录&迁移数据

原有kafka集群都是使用A5机型,该机型只有一块大磁盘,所以Kafka集群只需配置一个数据目录,把数据存储在该磁盘上,最近的上海端游kafka集群也是按此模板搭建的。但是新集群是TS80机型,提供了4块1.8T的磁盘,随着接入的数据增多,发现一个磁盘快撑满了,另外三块却空着,为了充分利用多个磁盘,需要在原来基础上增加数据目录

~/games/kafka/config]$ vi server.properties

log.dirs=/data/home/dc_datazone/kafka-logs,/data/home/dc_datazone/kafka-logs-2,/data/home/dc_datazone/kafka-logs-3,/data/home/dc_datazone/kafka-logs-4

# root 用户执行



mkdir -p /data2/kafka/kafka-logs

mkdir -p /data3/kafka/kafka-logs

mkdir -p /data4/kafka/kafka-logs

chown -R dc_datazone:yydev  /data2/kafka/kafka-logs

chown -R dc_datazone:yydev  /data3/kafka/kafka-logs

chown -R dc_datazone:yydev  /data4/kafka/kafka-logs

# dc_datazone 用户执行

ln -s /data2/kafka/kafka-logs kafka-logs-2

ln -s /data3/kafka/kafka-logs kafka-logs-3

ln -s /data4/kafka/kafka-logs kafka-logs-4

重启kafka集群:

~/games/kafka/bin]$ ./kafka-server-stop.sh

~/games/kafka/bin]$./start-kafka.sh

~/games/kafka/bin]$ ls /data2/kafka/kafka-logs/

cleaner-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint

重启后可以看到新加的磁盘(目录)下已经产生了初始化文件,说明新目录已添加成功,以后集群新建topic时会优先把partition存到空闲率高的目录下。但是已有的数据不会自动rebalance,需要人为的去reassign-partitions,步骤如下:

先看下现有topic的分布情况,可以看到dnf_log_dnf的分区现在都在/data1

~/kafka-logs]$ ls /data1/kafka/kafka-logs/

cleaner-offset-checkpoint  dnf_log_dnf-0  dnf_log_dnf-2 dnf_log_dnf-3 dnf_log_dnf-4

1.现在开始reassign-partitions,先手动生成一个json文件topic.json

{

    "topics": [

        {"topic": "dnf_log_dnf"}

    ],

    "version": 1

}

2.调用--generate生成迁移计划,使用kafka的kafka-reassign-partitions.sh工具来分配topic的分区位置

./bin/kafka-reassign-partitions.sh --zookeeper zkconnect --topics-to-move-json-file topic.json  --broker-list  "0,1,2"  --generate

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"dnf_log_dnf","partition":2,"replicas":[2,1]},{"topic":"dnf_log_dnf","partition":1,"replicas":[1,0]},{"topic":"dnf_log_dnf","partition":0,"replicas":[0,2]},{"topic":"dnf_log_dnf","partition":4,"replicas":[1,2]},{"topic":"dnf_log_dnf","partition":3,"replicas":[0,1]},{"topic":"dnf_log_dnf","partition":5,"replicas":[2,0]}]}

上面的json即为新的分区分布,把它复制到reassignment.json

3.执行—execute执行迁移

./bin/kafka-reassign-partitions.sh --zookeeper zkconnect --reassignment-json-file reassignment.json --execute

4.使用--verify查看进度

./bin/kafka-reassign-partitions.sh --zookeeper zkconnect --reassignment-json-file reassignment.json –verify

附扩容时同步状态:

经过两次reassign后,可以看到partion已经均匀分布到四块磁盘上了

再看下topic的分布情况,从命令输出结果可以看到topic dnf_log_dnf有6个partition,分布在0,1,2三个broker上,数据有俩个副本。

~/games/kafka]$ bin/kafka-topics.sh --describe --zookeeper ZKIP:2181 --topic dnf_log_dnf

Topic:dnf_log_dnf   PartitionCount:6    ReplicationFactor:2       Configs:delete.retention.ms=4320000,retention.ms=4320000,segment.ms=4320000

       Topic: dnf_log_dnf  Partition: 0     Leader: 0       Replicas: 0,2   Isr: 2,0

       Topic: dnf_log_dnf  Partition: 1     Leader: 1       Replicas: 1,0   Isr: 0,1

       Topic: dnf_log_dnf  Partition: 2     Leader: 2       Replicas: 2,1   Isr: 2,1

       Topic: dnf_log_dnf  Partition: 3     Leader: 0       Replicas: 0,1   Isr: 1,0

       Topic: dnf_log_dnf  Partition: 4     Leader: 2       Replicas: 1,2   Isr: 2,1

       Topic: dnf_log_dnf  Partition: 5     Leader: 2       Replicas: 2,0   Isr: 2,0

八、扩容broker

1. 启动新节点

将原节点上的 kafka 目录通过 scp 命令拷贝到新节点,只需要修改配置文件中的 broker_id 和 ip 地址,然后依次启动 kafka 服务。

2.Rebalance数据存储

同样集群扩容后数据是不会自动均衡到新机器上的,需要采用kafka-reassign-partitions.sh这个工具脚本。脚本可以工作在三种模式--generate,--execute,--verify,详细步骤可参考第七节:增加数据目录

九、运营经验

1.不同的机型(A5,TS80)磁盘情况不一样,灵活配置数据存储目录 log.dirs,使磁盘利用最大化。

2.集群健康状态监控,检查集群replicas同步情况,有异常发出告警:

  cat monitor.sh 
host_name="`hostname`"
numb=`/data/home/dc_datazone/games/kafka/kafka_default/bin/kafka-topics.sh --describe --zookeeper ZKIP:2183 --topic cf_play_l_topic|grep Leader|awk -F ':' '{print $NF}'|grep -v ','|wc -l`
echo "$numb"
if [ $numb -gt 0 ]; then
/usr/local/agenttools/agent/agentRepStr 973169 "$host_name:broker is crash,please check it!"
fi

3.kafka自带性能测试工具:

kafka/bin/kafka-producer-perf-test.sh

kafka/bin/afka-consumer-perf-test.sh

4.开源管理及监控工具:kafka manager:

KafkaOffsetMonitor:

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

蒋海鹏的专栏

1 篇文章1 人订阅

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏性能与架构

linux查看CPU和内存的使用情况

image.png CPU使用情况 通常使用top命令查看CPU的当前状态,如果是多核CPU,也可以看到每核的信息 # top 执行后按数字1,可以显示多个CP...

3347
来自专栏张善友的专栏

etcd:用于服务发现的键值存储系统

etcd是一个高可用的键值存储系统,主要用于共享配置和服务发现。etcd是由CoreOS开发并维护的,灵感来自于 ZooKeeper 和 Doozer,它使用G...

2626
来自专栏沃趣科技

Oracle集群时间同步

在RAC中集群的时间应该是保持同步的,否则可能导致很多问题,比如:依赖于时间的应用会造成数据的错误,各种日志打印的顺序紊乱,这将会影响问题的诊断,严重的可能会导...

1214
来自专栏数据和云

基于scn备份解决dg归档丢失的方法论

作者介绍 ? 黄堋 多年一线DBA经验,曾服务于电信、电网、医院等行业客户。擅长数据库优化、数据库升级迁移、数据库故障处理 当主备同步中断了,备库想快一点恢复,...

3517
来自专栏CSDN技术头条

Spark Streaming容错的改进和零数据丢失

本文来自Spark Streaming项目带头人Tathagata Das的博客文章,他现在就职于Databricks公司。过去曾在UC Berkeley的AM...

1889
来自专栏恰同学骚年

NoSQL初探之人人都爱Redis:(4)Redis主从复制架构初步探索

  通过前面几篇的介绍中,我们都是在单机上使用Redis进行相关的实践操作,从本篇起,我们将初步探索一下Redis的集群,而集群中最经典的架构便是主从复制架构。...

1142
来自专栏蓝天

Unix&Linux下常见的性能分析工具介绍

Vmstat是一个很全面的性能分析工具,可以观察到系统的进程状态、内存使用、虚拟内存使用、磁盘的IO、中断、上下文切换、CPU使用等。系统性能分析工具中,使用...

561
来自专栏匠心独运的博客

分布式定时任务Elastic-Job框架在SpringBoot工程中的应用实践(一)

摘要:如何构建具备作业分片和弹性扩缩容的定时任务系统是每个大型业务系统在设计时需要考虑的重要问题? 对于构建一般的业务系统来说,使用Quartz或者Sprin...

1252
来自专栏Spark学习技巧

深入了解HBase架构

1232
来自专栏北京马哥教育

MySQL 数据库上线后根据 status 状态优化

马哥linux运维 | 最专业的linux培训机构 ---- 网上有很多的文章教怎么配置mysql服务器,但考虑到服务器硬件配置的不同,具体应用的差别,那些文...

2816

扫码关注云+社区