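1. Installing Kafka on a single node
Step 1: Install the ZooKeeper cluster
(Omitted.)
The following script can be used to start and stop ZooKeeper on all nodes: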
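#!/bin/bash
# Start or stop ZooKeeper on every node over ssh.
usage="./zk.sh (start|stop)"
if [ $# -lt 1 ]; then
    echo "Usage: $usage"
    exit 1
fi
# Operation type
op=$1
hosts=(hadoop81 hadoop82 hadoop83)
for host in "${hosts[@]}"
do
    script=""
    if [ "$op" == 'start' ]; then
        echo "Starting $host"
        script="zkServer.sh start"
    fi
    if [ "$op" == 'stop' ]; then
        echo "Stopping $host"
        script="zkServer.sh stop"
    fi
    # An unknown operation would otherwise open an interactive ssh session
    if [ -z "$script" ]; then
        echo "Usage: $usage"
        exit 1
    fi
    ssh "$host" "$script"
done
exit 0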
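The same loop is also handy for checking the state of each node, since zkServer.sh accepts status as well as start and stop. The following is a minimal sketch, not part of ZooKeeper: zk_command and zk_all are hypothetical helper names, and DRY_RUN is an assumed variable that makes the function print the ssh commands instead of running them.

```shell
# Map an operation name to the zkServer.sh command to run remotely.
# Returns non-zero for anything other than start, stop or status.
zk_command() {
    case "$1" in
        start|stop|status) echo "zkServer.sh $1" ;;
        *) return 1 ;;
    esac
}

# Run the operation on every host given after the operation name.
# With DRY_RUN=1 the ssh commands are only printed.
zk_all() {
    local op="$1"; shift
    local cmd host
    cmd=$(zk_command "$op") || { echo "unknown operation: $op" >&2; return 1; }
    for host in "$@"; do
        if [ "${DRY_RUN:-0}" = "1" ]; then
            echo "ssh $host $cmd"
        else
            ssh "$host" "$cmd"
        fi
    done
}
```

For example, DRY_RUN=1 zk_all status hadoop81 hadoop82 hadoop83 shows what would be executed before anything is sent to the nodes.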
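Step 2: Install Kafka on hadoop81
Upload and extract the archive: omitted.
Edit the configuration files:
1. config/server.properties
zookeeper.connect=hadoop81:2181,hadoop82:2181,hadoop83:2181
2. config/zookeeper.properties (read only by the ZooKeeper instance bundled with Kafka)
dataDir=/app/zookeeper
3. Start Kafka
$ ./kafka-server-start.sh ../config/server.properties
Step 3: Check the processes and the znodes in ZooKeeper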
[wangjian@hadoop81 config]$ jps
1442 Kafka
1367 QuorumPeerMain
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config, hbase]
2. Kafka cluster
Step 1: Edit server.properties on the master node
To get the cluster running, change only four settings for now. Configuration on the master node (hadoop81):
broker.id=0
listeners=PLAINTEXT://master:9092
log.dirs=/data/kafka/log
zookeeper.connect=master:2181,slave1:2181,slave2:2181
Copy the Kafka directory to slave1 and slave2, then change the configuration on each node.
slave1:
broker.id=1
listeners=PLAINTEXT://slave1:9092
log.dirs=/data/kafka/log
zookeeper.connect=master:2181,slave1:2181,slave2:2181
slave2:
broker.id=2
listeners=PLAINTEXT://slave2:9092
log.dirs=/data/kafka/log
zookeeper.connect=master:2181,slave1:2181,slave2:2181
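Notes:
broker.id: unique identifier of the Kafka broker; it must be different on every broker
listeners: comma-separated list of URIs the broker listens on
log.dirs: directories where Kafka stores its log data (the topic partition data, not application logs)
zookeeper.connect: comma-separated list of ZooKeeper host:port pairs; adjust it to your environment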
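Since only broker.id and the host name in listeners differ between nodes, the four lines can be generated rather than typed by hand. The following is a minimal sketch: broker_props is a hypothetical helper name, and the port, log directory and ZooKeeper list are simply the values used in this article.

```shell
# Print the four per-broker settings for one node.
# $1 = broker id, $2 = host name used in the listener URI.
broker_props() {
    local id="$1" host="$2"
    printf 'broker.id=%s\n' "$id"
    printf 'listeners=PLAINTEXT://%s:9092\n' "$host"
    printf 'log.dirs=/data/kafka/log\n'
    printf 'zookeeper.connect=master:2181,slave1:2181,slave2:2181\n'
}
```

Calling broker_props 2 slave2 prints the block for slave2. The lines still have to be merged into that node's server.properties, replacing any existing entries with the same keys.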
Step 2: Start the cluster
Adapt the following script to your environment:
#!/bin/bash
# Start or stop the Kafka broker on every node over ssh.
usage="Usage: $0 (start|stop)"
echo "Argument count: $#"
if [ $# -lt 1 ]; then
    echo "$usage"
    exit 1
fi
behave=$1
echo "$behave kafka cluster..."
# Host names
for i in hadoop81 hadoop82 hadoop83
do
    # Run the command remotely over ssh
    script=""
    if [ "$behave" == 'start' ]; then
        script="kafka-server-start.sh /app/kafka-2.0/config/server.properties 1>/app/kafka.out 2>&1 &"
    fi
    if [ "$behave" == 'stop' ]; then
        script="kafka-server-stop.sh"
    fi
    echo "Running: ssh $i $script"
    ssh "$i" "$script"
done
exit 0
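Note:
The /app directory, where the output file kafka.out is written, must already exist on every node. Otherwise the redirection fails and the broker is not started.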
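One way to avoid this failure is to create the directory before starting the broker. The following is a minimal sketch: ensure_out_dir is a hypothetical helper name, and it would have to run on each node (for example through ssh) before the start command.

```shell
# Create the parent directory of the given output file if it is missing.
# $1 = path of the output file, e.g. /app/kafka.out
ensure_out_dir() {
    local out_file="$1"
    local dir
    dir=$(dirname "$out_file")
    mkdir -p "$dir"
}
```

mkdir -p does nothing when the directory already exists, so the call is safe to repeat on every start.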
Step 3: Create a test topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Or:
$ kafka-topics.sh --create --zookeeper hadoop81:2181,hadoop82:2181,hadoop83:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
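Then describe the topic:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Or list all the ZooKeeper hosts (a host without an explicit port falls back to ZooKeeper's default client port, 2181):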
$ kafka-topics.sh --describe --zookeeper hadoop81,hadoop82,hadoop83:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 81    Replicas: 81,82,83    Isr: 81,82,83
Result:
$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3 (number of replicas)    Configs:
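The describe output is easier to check in a script once the per-partition fields are pulled out. The following is a minimal sketch that assumes the output layout shown in this article (a summary line followed by one line per partition); partition_summary is a hypothetical helper name.

```shell
# Read `kafka-topics.sh --describe` output on stdin and print
# "partition leader replicas isr" for every partition line.
# The summary line contains "PartitionCount:", not "Partition:", so it is skipped.
partition_summary() {
    awk '/Partition:/ {
        for (i = 1; i < NF; i++) {
            if ($i == "Partition:") p = $(i + 1)
            if ($i == "Leader:")    l = $(i + 1)
            if ($i == "Replicas:")  r = $(i + 1)
            if ($i == "Isr:")       s = $(i + 1)
        }
        print p, l, r, s
    }'
}
```

Piping the describe command into partition_summary gives one short line per partition. When the Replicas and Isr columns list the same brokers, every replica is in sync.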
Optional:
1. Set environment variables
export KAFKA_HOME=/app/kafka-2.0
export PATH=$PATH:$KAFKA_HOME/bin
2. Note that broker.id in Kafka does not have to start from 0
For example:
broker.id=81
listeners=PLAINTEXT://hadoop81:9092
log.dirs=/app/kafka/logs
zookeeper.connect=hadoop81:2181,hadoop82:2181,hadoop83:2181
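With host names such as hadoop81, the broker id can be taken from the numeric suffix of the host name, which is how the ids 81, 82 and 83 line up with the hosts in this article. The following is a minimal sketch under that naming assumption; broker_id_from_host is a hypothetical helper name.

```shell
# Derive a broker.id from the trailing digits of a host name,
# e.g. hadoop81 -> 81. Fails when the name does not end in digits
# preceded by at least one non-digit character.
broker_id_from_host() {
    local id
    id=$(printf '%s\n' "$1" | sed -n 's/.*[^0-9]\([0-9][0-9]*\)$/\1/p')
    [ -n "$id" ] || return 1
    echo "$id"
}
```

On each node, broker_id_from_host "$(hostname)" then yields a value that is unique as long as the host names are.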
3. Using Kafka
https://blog.csdn.net/m_signals/article/details/53206207
Step 1: Create a topic
$ kafka-topics.sh --create --zookeeper hadoop81,hadoop82,hadoop83:2181 --replication-factor 1 --partitions 1 --topic mytopic
Created topic "mytopic".
Note: --replication-factor sets the number of replicas of each partition.
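Kafka rejects a topic whose replication factor is larger than the number of available brokers, so with the three brokers used here the maximum is 3. The following is a minimal sketch of a check that can run before the create command; check_replication_factor is a hypothetical helper name, and the broker count has to be supplied by the caller.

```shell
# Fail when the requested replication factor cannot be satisfied.
# $1 = replication factor, $2 = number of brokers in the cluster.
check_replication_factor() {
    local rf="$1" brokers="$2"
    if [ "$rf" -lt 1 ] || [ "$rf" -gt "$brokers" ]; then
        echo "replication factor $rf is not possible with $brokers broker(s)" >&2
        return 1
    fi
}
```

For example, check_replication_factor 3 3 succeeds, while check_replication_factor 4 3 prints a message and returns a non-zero status.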
A topic can also be deleted:
kafka-topics.sh --delete --topic mytopic --zookeeper hadoop81,hadoop82,hadoop83:2181
Now recreate it with three replicas:
$ kafka-topics.sh --create --zookeeper hadoop81,hadoop82,hadoop83:2181 --replication-factor 3 --partitions 1 --topic mytopic
Step 2: Describe the topic
Describe the topic that was just created:
$ kafka-topics.sh --zookeeper hadoop81,hadoop82,hadoop83:2181 --describe --topic mytopic
Topic:mytopic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: mytopic    Partition: 0    Leader: 83    Replicas: 83,81,82    Isr: 83,81,82
Step 3: Send messages
$ kafka-console-producer.sh --broker-list hadoop81:9092 --topic mytopic
>This is a Message
>hello Jack
>Mary Rose
Step 4: Create a consumer
Once started, the consumer receives the messages:
$ kafka-console-consumer.sh --bootstrap-server hadoop81:9092 --topic mytopic --from-beginning
This is a Message
hello Jack
Mary Rose
Any further messages typed into the producer also appear on the consumer side.
Press Ctrl+C to stop.
Step 5: Create a producer that specifies multiple brokers
$ kafka-console-producer.sh --broker-list hadoop81:9092,hadoop82:9092,hadoop83:9092 --topic mytopic
>Jack Mary Rose张三
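The host:port list passed to --broker-list (and to --bootstrap-server for the consumer) can be built from the host names instead of being typed out, which avoids missing spaces or misspelled hosts. The following is a minimal sketch; broker_list is a hypothetical helper name, and 9092 is the listener port used in this article.

```shell
# Join host names into a comma-separated host:port list.
# $1 = port, remaining arguments = host names.
broker_list() {
    local port="$1"; shift
    local list="" host
    for host in "$@"; do
        # Prepend a comma only when the list already has an entry
        list="${list:+$list,}$host:$port"
    done
    echo "$list"
}
```

It can be used inline, for example: kafka-console-producer.sh --broker-list "$(broker_list 9092 hadoop81 hadoop82 hadoop83)" --topic mytopic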
Step 6: Create a consumer, also specifying multiple brokers
$ kafka-console-consumer.sh --bootstrap-server hadoop81:9092,hadoop82:9092,hadoop83:9092 --topic mytopic --from-beginning
This is a Message
hello Jack
Mary Rose
He
你好
你是谁
Jack Mary Rose张三
Because of --from-beginning, all messages are received, including the earliest ones.
4. Java API
Add the following dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>
The two key classes are KafkaConsumer and KafkaProducer.
1. Writing the producer
Note that the string "mytopic" in the code is the topic name:
package cn.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Producer example.
 * @author wangjian
 */
public class ProducerTest {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop81:9092,hadoop82:9092,hadoop83:9092");
        prop.put("acks", "all");
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(prop);
        // Send 100 records (key-0 .. key-99) to the topic "mytopic"
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> r = new ProducerRecord<>("mytopic", "key-" + i, "value-" + i);
            producer.send(r);
        }
        // close() flushes any records that are still buffered
        producer.close();
    }
}
2. Writing the consumer
package cn.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Consumer example.
 * @author wangjian
 */
public class Demo02_Consumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop81:9092,hadoop82:9092,hadoop83:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic")); // topic name
        System.out.println("Starting...");
        while (true) {
            System.out.println("Polling..");
            // Wait up to 10 seconds for new records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ",--key=" + record.key() + ",--value=" + record.value());
            }
        }
    }
}
3. Start the consumer first, then start the producer to send test data
The consumer console prints:
offset=505,--key=key-95,--value=value-95
offset=506,--key=key-96,--value=value-96
offset=507,--key=key-97,--value=value-97
offset=508,--key=key-98,--value=value-98
offset=509,--key=key-99,--value=value-99
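The offsets printed above increase by one per record, which is what to expect from a single-partition topic that is neither compacted nor written with transactions. A quick way to confirm that no record was skipped is to check this mechanically. The following is a minimal sketch that assumes the line format printed by the consumer class in this article; offsets_contiguous is a hypothetical helper name.

```shell
# Read consumer output lines such as
#   offset=505,--key=key-95,--value=value-95
# on stdin and fail when two consecutive offsets do not differ by exactly 1.
offsets_contiguous() {
    awk -F'[=,]' '
        NR > 1 && $2 != prev + 1 { bad = 1 }
        { prev = $2 }
        END { exit (bad ? 1 : 0) }
    '
}
```

Saving the consumer output to a file and running offsets_contiguous < file returns a zero status when there are no gaps.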