kafka入门篇(四)

六、kafka集群

(利用上面server1,server2,server3,下面以server1为实例)

1)下载kafka0.8(http://kafka.apache.org/downloads.html),保存到服务器/home/wwb目录下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)

2)解压 tar -zxvf kafka-0.8.0-beta1-src.tgz,产生文件夹kafka-0.8.0-beta1-src更改为kafka01

3)配置

修改kafka01/config/server.properties,其中broker.id,log.dirs,zookeeper.connect必须根据实际情况进行修改,其他项根据需要自行斟酌。大致如下:

broker.id=1

port=9091

log.dir=./logs

num.partitions=2

zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183

kafka.csv.metrics.dir=/tmp/kafka_metrics

4)初始化因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

> cd kafka01

> ./sbt update

> ./sbt package

> ./sbt assembly-package-dependency

在第二个命令时可能需要一定时间,由于要下载更新一些依赖包。所以请大家 耐心点。

5) 启动kafka01

>JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &

a)kafka02操作步骤与kafka01雷同,不同的地方如下

修改kafka02/config/server.properties

broker.id=2

port=9092

##其他配置和kafka-0保持一致

启动kafka02

JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &

b)kafka03操作步骤与kafka01雷同,不同的地方如下

修改kafka03/config/server.properties

broker.id=3

port=9093

##其他配置和kafka-0保持一致

启动kafka02

JMX_PORT=9999 bin/kafka-server-start.shconfig/server.properties &

6)创建Topic(包含一个分区,三个副本)

>bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

7)查看topic情况

>bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181

topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0

8)创建发送者

>bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic

my test message1

my test message2

^C

9)创建消费者

>bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic

...

my test message1

my test message2

^C

10)杀掉server1上的broker

>pkill -9 -f config/server.properties

11)查看topic

>bin/kafka-list-top.sh --zookeeper192.168.0.1:2181

topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0

发现topic还正常的存在

11)创建消费者,看是否能查询到消息

>bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

说明一切都是正常的。

OK,以上就是对Kafka个人的理解,不对之处请大家及时指出。

补充说明:

1、public Map>> createMessageStreams(Map topicCountMap),其中该方法的参数Map的key为topic名称,value为topic对应的分区数,譬如说如果在kafka中不存在相应的topic时,则会创建一个topic,分区数为value,如果存在的话,该处的value则不起什么作用

2、关于生产者向指定的分区发送数据,通过设置partitioner.class的属性来指定向那个分区发送数据,如果自己指定必须编写相应的程序,默认是kafka.producer.DefaultPartitioner,分区程序是基于散列的键。

3、在多个消费者读取同一个topic的数据,为了保证每个消费者读取数据的唯一性,必须将这些消费者group_id定义为同一个值,这样就构建了一个类似队列的数据结构,如果定义不同,则类似一种广播结构的。

4、在consumerapi中,参数设计到数字部分,类似Map,

numStream,指的都是在topic不存在的时,会创建一个topic,并且分区个数为Integer,numStream,注意如果数字大于broker的配置中num.partitions属性,会以num.partitions为依据创建分区个数的。

5、producerapi,调用send时,如果不存在topic,也会创建topic,在该方法中没有提供分区个数的参数,在这里分区个数是由服务端broker的配置中num.partitions属性决定的

欢迎关注,一起学习大数据!

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181217G1KDCX00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券