1、首先启动自己的kafka集群哟。
1 启动zk:
2 bin/zkServer.sh start conf/zoo.cfg。
3 验证zk是否启动成功:
4 bin/zkServer.sh status conf/zoo.cfg。
5 启动kafka:
6 bin/kafka-server-start.sh -daemon config/server.properties。
2、生产者生产消息,模拟生产一百条数据。
1 package com.bie.kafka.producer;
2
3 import java.util.Properties;
4 import java.util.concurrent.ExecutionException;
5
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.clients.producer.RecordMetadata;
10 import org.apache.kafka.common.serialization.StringSerializer;
11
12 /**
13 *
14 * @Description TODO
15 * @author biehl
16 * @Date 2019年5月25日 下午2:34:46
17 *
18 */
19 public class KafKaProducerHeart {
20
21 public static void main(String[] args) {
22 Properties props = new Properties();
23 props.put("bootstrap.servers", "slaver1:9092,slaver2:9092,slaver3:9092");
24 props.put("acks", "-1");
25 props.put("retries", 3);
26 props.put("batch.size", 323840);
27 props.put("linger.ms", 10);
28 props.put("buffer.memory", 33554432);
29 props.put("max.block.ms", 3000);
30 StringSerializer keySerializer = new StringSerializer();
31 StringSerializer valueSerializer = new StringSerializer();
32 Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);
33 String topic = "topic1";
34 String value = " biehl ? wj 1314 ";
35 /*
36 * for (int i = 0; i < 100; i++) {
37 * //topic-key-value三元组确定消息所在位置
38 * producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i),
39 * value));
40 * }
41 */
42
43 // 异步发送
44 /*for (int i = 0; i < 100; i++) {
45 ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value);
46 // 异步发送
47 producer.send(record, new Callback() {
48
49 @Override
50 public void onCompletion(RecordMetadata recordmetadata, Exception exception) {
51 if (exception == null) {
52 System.out.println("消息发送成功");
53 System.out.println(
54 "topic: " + recordmetadata.topic() + ", partition分区: " + recordmetadata.partition());
55 } else {
56 System.out.println("消息发送失败");
57 }
58 }
59 });
60 }*/
61
62 // 同步发送
63 for (int i = 0; i < 100; i++) {
64 ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value);
65 try {
66 RecordMetadata recordMetadata = producer.send(record).get();
67 System.out.println(
68 "topic: " + recordMetadata.topic() + ", partition分区: " + recordMetadata.partition());
69 } catch (InterruptedException e) {
70 e.printStackTrace();
71 } catch (ExecutionException e) {
72 e.printStackTrace();
73 }
74 }
75
76 producer.close();
77 }
78
79 }
3、kafka中消费者消费消息之每个线程维护一个KafkaConsumer实例:
ConsumerRunnable,消费线程类,执行真正的消费任务
1 package com.bie.kafka.kafkaThrea;
2
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Properties;
6
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10
11 /**
12 *
13 * @Description TODO
14 * @author biehl
15 * @Date 2019年6月1日 上午11:48:53
16 *
17 * 1、KafkaConsumer是非线程安全的,KafkaProducer是线程安全的。
18 * 2、该案例是每个线程维护一个KafkaConsumer实例
19 * 用户创建多个线程消费topic数据,每个线程都会创建专属该线程的KafkaConsumer实例
20 * 3、ConsumerRunnable,消费线程类,执行真正的消费任务
21 */
22 public class ConsumerRunnable implements Runnable {
23
24 // 每个线程维护私有的kafkaConsumer实例
25 private final KafkaConsumer<String, String> consumer;
26
27 /**
28 * 默认每个消费者的配置参数初始化
29 *
30 * @param brokerList
31 * @param groupId
32 * @param topic
33 */
34 public ConsumerRunnable(String brokerList, String groupId, String topic) {
35 // 带参数的构造方法
36 Properties props = new Properties();
37 // kafka的列表
38 props.put("bootstrap.servers", brokerList);
39 // 消费者组编号
40 props.put("group.id", groupId);
41 // 自动提交
42 props.put("enable.auto.commit", true);
43 // 提交提交每个一秒钟
44 props.put("auto.commit.interval.ms", "1000");
45 // 反序列化key
46 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
47 // 反序列化value
48 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
49 // 将配置信息进行初始化操作
50 this.consumer = new KafkaConsumer<>(props);
51 // 定义响应的主题信息topic
52 consumer.subscribe(Arrays.asList(topic));
53 }
54
55 /**
56 *
57 */
58 @Override
59 public void run() {
60 // 消费者保持一直消费的状态
61 while (true) {
62 // 将获取到消费的信息
63 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
64 // 遍历出每个消费的消息
65 for (ConsumerRecord<String, String> record : records) {
66 // 输出打印消息
67 System.out.println(
68 "当前线程名称 : " + Thread.currentThread().getName() + ", 主题名称 :" + record.topic() + ", 分区名称 :"
69 + record.partition() + ", 位移名称 :" + record.offset() + ", value :" + record.value());
70 }
71 }
72 }
73
74 }
消费线程管理类,创建多个线程类执行消费任务:
1 package com.bie.kafka.kafkaThrea;
2
3 import java.util.ArrayList;
4 import java.util.List;
5
6 /**
7 *
8 * @Description TODO
9 * @author biehl
10 * @Date 2019年6月1日 上午11:56:42
11 *
12 * 1、消费线程管理类,创建多个线程类执行消费任务
13 */
14 public class ConsumerGroup {
15
16 // 消费者群组,多消费者。
17 private List<ConsumerRunnable> consumers;
18
19 /**
20 *
21 * @param consumerNum
22 * @param groupId
23 * @param topic
24 * @param brokerList
25 */
26 public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
27 // 初始化消费者组
28 consumers = new ArrayList<>(consumerNum);
29 // 初始化消费者,创建多少个消费者
30 for (int i = 0; i < consumerNum; i++) {
31 // 根据消费者构造方法,创建消费者实例
32 ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
33 // 将创建的消费者实例添加到消费者组中
34 consumers.add(consumerRunnable);
35 }
36 }
37
38 /**
39 *
40 */
41 public void execute() {
42 // 将消费者组里面的消费者遍历出来
43 for (ConsumerRunnable task : consumers) {
44 // 创建一个消费者线程,并且启动该线程
45 new Thread(task).start();
46 }
47 }
48
49 }
1 package com.bie.kafka.kafkaThrea;
2
3 /**
4 *
5 * @Description TODO
6 * @author biehl
7 * @Date 2019年6月1日 下午2:19:52
8 *
9 */
10 public class ConsumerMain {
11
12 public static void main(String[] args) {
13 // kafka即broker列表
14 String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
15 // group组名称
16 String groupId = "group1";
17 // topic主题名称
18 String topic = "topic1";
19 // 消费者的数量
20 int consumerNum = 3;
21 // 通过构造器创建出一个对象
22 ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
23 // 执行execute的方法,创建出ConsumerRunnable消费者实例。多线程多消费者实例
24 consumerGroup.execute();
25 }
26
27 }
效果如下所示:
生产者生产消息的案例:
消费者消费消息的案例:
待续......