
Kafka Quick Start

1. Installing Kafka on a single node

Step 1: Install the ZooKeeper cluster

ZooKeeper can be started and stopped across the cluster with the following script:

#!/bin/bash
usage="./zk.sh (start|stop)"
if [ $# -lt 1 ]; then
    echo "Usage: $usage"
    exit 1
fi
# operation type
op=$1
# host names (array renamed so it no longer collides with the loop variable)
hosts=(hadoop81 hadoop82 hadoop83)
for host in "${hosts[@]}"; do
    script=""
    if [ "$op" == 'start' ]; then
        echo "starting $host"
        script="zkServer.sh start"
    fi
    if [ "$op" == 'stop' ]; then
        echo "stopping $host"
        script="zkServer.sh stop"
    fi
    ssh "$host" "$script"
done
exit 0

Step 2: Install Kafka on the hadoop81 machine

Upload and extract the archive (details omitted).

Modify the configuration files:

1. config/server.properties

zookeeper.connect=hadoop81:2181,hadoop82:2181,hadoop83:2181

2. config/zookeeper.properties

dataDir=/app/zookeeper

3. Start Kafka:

$ ./kafka-server-start.sh ../config/server.properties

Step 3: Check the processes and the directory tree in ZooKeeper

[wangjian@hadoop81 config]$ jps

1442 Kafka

1367 QuorumPeerMain

[zk: localhost:2181(CONNECTED) 0] ls /

[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config, hbase]
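
The brokers seen above register themselves under /brokers/ids in ZooKeeper. As an illustration, here is a minimal sketch using the ZooKeeper Java client (an assumption: the zookeeper client jar is on the classpath; hadoop81 is the hostname used throughout this article) that lists the registered broker ids:

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ListBrokerIds {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Connect to one ZooKeeper node and wait for the session to be established.
        ZooKeeper zk = new ZooKeeper("hadoop81:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();
        // Kafka brokers create ephemeral znodes under /brokers/ids.
        List<String> ids = zk.getChildren("/brokers/ids", false);
        System.out.println("registered broker ids: " + ids);
        zk.close();
    }
}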

2. Kafka cluster

Step 1: Modify the Kafka configuration file server.properties on the master node

To get the cluster running, only four configuration items need to be changed at first. On the hadoop81 (master) node:

broker.id=0

listeners=PLAINTEXT://master:9092

log.dirs=/data/kafka/log

zookeeper.connect=master:2181,slave1:2181,slave2:2181

Copy the Kafka directory to the slave1 and slave2 nodes, then modify each node's configuration:

broker.id=1

listeners=PLAINTEXT://slave1:9092

log.dirs=/data/kafka/log

zookeeper.connect=master:2181,slave1:2181,slave2:2181

broker.id=2

listeners=PLAINTEXT://slave2:9092

log.dirs=/data/kafka/log

zookeeper.connect=master:2181,slave1:2181,slave2:2181

Notes:

broker.id: the unique identifier of this Kafka broker within the cluster

listeners: the list of URLs the broker listens on

log.dirs: the directory where Kafka stores partition data (its "log" segments), not application logs

zookeeper.connect: the ZooKeeper connection string; adjust it to your environment

A sketch after this list shows how to verify these settings against the running cluster.
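
As a quick check that each broker came up with the intended broker.id and listener, here is a minimal sketch using the Kafka AdminClient, which ships with the kafka_2.12 dependency added later in this article (hostnames are assumptions carried over from the examples above):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DescribeCluster {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Any one reachable broker is enough to bootstrap the client.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop81:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Each node id should match the broker.id configured above.
            admin.describeCluster().nodes().get().forEach(node ->
                    System.out.println("broker.id=" + node.id()
                            + " listener=" + node.host() + ":" + node.port()));
        }
    }
}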

Step 2: Start the cluster

Adjust the following script to your environment:

#!/bin/bash
usage="Usage: $0 (start|stop)"
echo "number of arguments: $#"
if [ $# -lt 1 ]; then
    echo "$usage"
    exit 1
fi
behave=$1
echo "$behave kafka cluster..."
# host names
for i in hadoop81 hadoop82 hadoop83
do
    # build the command, then run it on each host over ssh
    script=""
    if [ "$behave" == 'start' ]; then
        script="kafka-server-start.sh /app/kafka-2.0/config/server.properties 1>/app/kafka.out 2>&1 &"
    fi
    if [ "$behave" == 'stop' ]; then
        script="kafka-server-stop.sh"
    fi
    echo "command: ssh $i $script"
    ssh $i "$script"
done
exit 0

Note:

The /app output directory (where kafka.out is written) must already exist on every node; otherwise the start command will fail.

Step 3: Create a test topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

or:

$ kafka-topics.sh --create --zookeeper hadoop81:2181,hadoop82:2181,hadoop83:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Then inspect the topic:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Or, specifying the complete ZooKeeper address list:

$ kafka-topics.sh --describe --zookeeper hadoop81,hadoop82,hadoop83:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: my-replicated-topic   Partition: 0   Leader: 81   Replicas: 81,82,83   Isr: 81,82,83

Result:

$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1   ReplicationFactor:3 (replica count)   Configs:
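
The same leader/replica/ISR information can be read programmatically. A minimal sketch with the AdminClient (same assumed hostnames as above):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop81:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                    .describeTopics(Collections.singleton("my-replicated-topic"))
                    .all().get()
                    .get("my-replicated-topic");
            // One line per partition: leader, replicas, and in-sync replicas.
            desc.partitions().forEach(p ->
                    System.out.println("partition=" + p.partition()
                            + " leader=" + p.leader().id()
                            + " replicas=" + p.replicas()
                            + " isr=" + p.isr()));
        }
    }
}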


Optional:

1. Configure environment variables:

export KAFKA_HOME=/app/kafka-2.0

export PATH=$PATH:$KAFKA_HOME/bin

2. Note that the broker.id in Kafka does not have to start from 0. For example:

broker.id=81

listeners=PLAINTEXT://hadoop81:9092

log.dirs=/app/kafka/logs

zookeeper.connect=hadoop81:2181,hadoop82:2181,hadoop83:2181

3. Using Kafka

https://blog.csdn.net/m_signals/article/details/53206207

Step 1: Create a topic

$ kafka-topics.sh --create --zookeeper hadoop81,hadoop82,hadoop83:2181 --replication-factor 1 --partitions 1 --topic mytopic

Created topic "mytopic".

Note: --replication-factor controls the number of replicas.

A topic can also be deleted:

kafka-topics.sh --delete --topic mytopic --zookeeper hadoop81,hadoop82,hadoop83:2181

Now recreate the topic with three replicas:

$ kafka-topics.sh --create --zookeeper hadoop81,hadoop82,hadoop83:2181 --replication-factor 3 --partitions 1 --topic mytopic
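
Topic creation and deletion can also be done from Java rather than the CLI. A minimal sketch using the AdminClient (the class name and hosts are assumptions consistent with the rest of this article):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop81:9092,hadoop82:9092,hadoop83:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 3, matching the CLI command above.
            admin.createTopics(Collections.singleton(new NewTopic("mytopic", 1, (short) 3)))
                 .all().get();
            // To delete instead:
            // admin.deleteTopics(Collections.singleton("mytopic")).all().get();
        }
    }
}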

Step 2: Inspect the topic

Describe the topic just created:

$ kafka-topics.sh --zookeeper hadoop81,hadoop82,hadoop83:2181 --describe --topic mytopic

Topic:mytopic   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: mytopic   Partition: 0   Leader: 83   Replicas: 83,81,82   Isr: 83,81,82

Step 3: Send messages

$ kafka-console-producer.sh --broker-list hadoop81:9092 --topic mytopic

>This is a Message

>hello Jack

>Mary Rose


Step 4: Create a consumer

Once created, it receives the messages:

$ kafka-console-consumer.sh --bootstrap-server hadoop81:9092 --topic mytopic --from-beginning

This is a Message

hello Jack

Mary Rose

Keep typing messages on the producer side and they will appear on the consumer side.

Press CTRL+C to stop.

Step 5: Create a producer with multiple bootstrap servers

$ kafka-console-producer.sh --broker-list hadoop81:9092,hadoop82:9092,hadoop83:9092 --topic mytopic

>Jack Mary Rose张三

Step 6: Create a consumer, likewise pointing at multiple servers

$ kafka-console-consumer.sh --bootstrap-server hadoop81:9092,hadoop82:9092,hadoop83:9092 --topic mytopic --from-beginning

This is a Message

hello Jack

Mary Rose

He

你好

你是谁

Jack Mary Rose张三

All messages were received, including the earliest ones (because of --from-beginning).

4. Java API

The following Maven dependency is required:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>

The two key classes are KafkaProducer and KafkaConsumer.

1. Writing a producer

Note that the topic name used below, "mytopic", is the topic created earlier:

package cn.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * A simple producer.
 * @author wangjian
 */
public class ProducerTest {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop81:9092,hadoop82:9092,hadoop83:9092");
        prop.put("acks", "all");
        // value lost in the original text; 0 is the value used in the official example
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(prop);
        // loop bound lost in the original text; 100 matches the key-99 seen in the test output below
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> r =
                    new ProducerRecord<>("mytopic", "key-" + i, "value-" + i);
            producer.send(r);
        }
        producer.close();
    }
}
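
send() is asynchronous, so the fire-and-forget loop above never learns whether delivery failed. A hedged variant of the loop body (standard KafkaProducer API; nothing beyond the code above is assumed) attaches a callback:

// Replacement for producer.send(r) above: report errors and the assigned offset.
producer.send(r, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed after all retries
    } else {
        System.out.println("sent to partition=" + metadata.partition()
                + " offset=" + metadata.offset());
    }
});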


2. Writing a consumer

package cn.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * A simple consumer.
 * @author wangjian
 */
public class Demo02_Consumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop81:9092,hadoop82:9092,hadoop83:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic")); // topic name
        System.out.println("started...");
        while (true) {
            System.out.println("polling..");
            // block for up to 10 seconds waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ",--key=" + record.key()
                        + ",--value=" + record.value());
            }
        }
    }
}
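
The consumer above relies on auto-commit, so a crash between commits can replay up to a second of records. A hedged sketch of the manual alternative, showing only the lines that change (commitSync is part of the standard KafkaConsumer API):

// Turn auto-commit off...
props.put("enable.auto.commit", "false");
// ...and commit explicitly once a polled batch has been processed:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset=" + record.offset() + ",--key=" + record.key()
                + ",--value=" + record.value());
    }
    consumer.commitSync(); // offsets are committed only after processing succeeds
}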

3. Test: start the consumer first, then run the producer

The consumer console prints:

offset=505,--key=key-95,--value=value-95

offset=506,--key=key-96,--value=value-96

offset=507,--key=key-97,--value=value-97

offset=508,--key=key-98,--value=value-98

offset=509,--key=key-99,--value=value-99
