前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >单独KafkaConsumer实例and多worker线程。

单独KafkaConsumer实例and多worker线程。

作者头像
别先生
发布2019-06-03 11:21:20
5480
发布2019-06-03 11:21:20
举报
文章被收录于专栏:别先生别先生

1、单独KafkaConsumer实例and多worker线程。 将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,同时维护一个或者若各干consumer实例执行消息获取任务。 本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。

代码语言:javascript
复制
  1 package com.bie.kafka.kafkaWorker;
  2 
  3 import java.time.Duration;
  4 import java.util.Arrays;
  5 import java.util.Collection;
  6 import java.util.Collections;
  7 import java.util.HashMap;
  8 import java.util.Map;
  9 import java.util.Properties;
 10 import java.util.concurrent.ArrayBlockingQueue;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.ThreadPoolExecutor;
 13 import java.util.concurrent.TimeUnit;
 14 
 15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 16 import org.apache.kafka.clients.consumer.ConsumerRecords;
 17 import org.apache.kafka.clients.consumer.KafkaConsumer;
 18 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 19 import org.apache.kafka.common.TopicPartition;
 20 import org.apache.kafka.common.errors.WakeupException;
 21 
 22 /**
 23  * 
 24  * @Description TODO
 25  * @author biehl
 26  * @Date 2019年6月1日 下午3:28:53
 27  * 
 28  * @param <K>
 29  * @param <V>
 30  * 
 31  *            1、consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。 另外consumer位移提交也在该类中完成。
 32  * 
 33  */
 34 public class ConsumerThreadHandler<K, V> {
 35 
 36     // KafkaConsumer实例
 37     private final KafkaConsumer<K, V> consumer;
 38     // ExecutorService实例
 39     private ExecutorService executors;
 40     // 位移信息offsets
 41     private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 42 
 43     /**
 44      * 
 45      * @param brokerList
 46      *            kafka列表
 47      * @param groupId
 48      *            消费组groupId
 49      * @param topic
 50      *            主题topic
 51      */
 52     public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
 53         Properties props = new Properties();
 54         // broker列表
 55         props.put("bootstrap.servers", brokerList);
 56         // 消费者组编号Id
 57         props.put("group.id", groupId);
 58         // 非自动提交位移信息
 59         props.put("enable.auto.commit", "false");
 60         // 从最早的位移处开始消费消息
 61         props.put("auto.offset.reset", "earliest");
 62         // key反序列化
 63         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 64         // value反序列化
 65         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 66         // 将配置信息装配到消费者实例里面
 67         consumer = new KafkaConsumer<>(props);
 68         // 消费者订阅消息,并实现重平衡rebalance
 69         // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
 70         // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
 71         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
 72 
 73             /**
 74              * 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
 75              */
 76             @Override
 77             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 78                 // 提交位移
 79                 consumer.commitSync(offsets);
 80             }
 81 
 82             /**
 83              * rebalance完成后会调用onPartitionsAssigned方法。
 84              */
 85             @Override
 86             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 87                 // 清除位移信息
 88                 offsets.clear();
 89             }
 90         });
 91     }
 92 
 93     /**
 94      * 消费主方法
 95      * 
 96      * @param threadNumber
 97      *            线程池中的线程数
 98      */
 99     public void consume(int threadNumber) {
100         executors = new ThreadPoolExecutor(
101                 threadNumber, 
102                 threadNumber, 
103                 0L, 
104                 TimeUnit.MILLISECONDS,
105                 new ArrayBlockingQueue<Runnable>(1000), 
106                 new ThreadPoolExecutor.CallerRunsPolicy());
107         try {
108             // 消费者一直处于等待状态,等待消息消费
109             while (true) {
110                 // 从主题中获取消息
111                 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L));
112                 // 如果获取到的消息不为空
113                 if (!records.isEmpty()) {
114                     // 将消息信息、位移信息封装到ConsumerWorker中进行提交
115                     executors.submit(new ConsumerWorker<>(records, offsets));
116                 }
117                 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
118                 this.commitOffsets();
119             }
120         } catch (WakeupException e) {
121             // 此处忽略此异常的处理.WakeupException异常是从poll方法中抛出来的异常
122             //如果不忽略异常信息,此处会打印错误哦,亲
123             //e.printStackTrace();
124         } finally {
125             // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
126             this.commitOffsets();
127             // 关闭consumer
128             consumer.close();
129         }
130     }
131 
132     /**
133      * 尽量降低synchronized块对offsets锁定的时间
134      */
135     private void commitOffsets() {
136         // 尽量降低synchronized块对offsets锁定的时间
137         Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
138         // 保证线程安全、同步锁,锁住offsets
139         synchronized (offsets) {
140             // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
141             if (offsets.isEmpty()) {
142                 return;
143             }
144             // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
145             unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
146             // 清除位移信息offsets
147             offsets.clear();
148         }
149         // 将封装好的位移信息unmodfiedMap集合进行同步提交
150         // 手动提交位移信息
151         consumer.commitSync(unmodfiedMap);
152     }
153 
154     /**
155      * 关闭消费者
156      */
157     public void close() {
158         // 在另一个线程中调用consumer.wakeup();方法来触发consume的关闭。
159         // KafkaConsumer不是线程安全的,但是另外一个例外,用户可以安全的在另一个线程中调用consume.wakeup()。
160         // wakeup()方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用
161         consumer.wakeup();
162         // 关闭ExecutorService实例
163         executors.shutdown();
164     }
165 
166 }
代码语言:javascript
复制
 1 package com.bie.kafka.kafkaWorker;
 2 
 3 import java.util.List;
 4 import java.util.Map;
 5 
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.apache.kafka.clients.consumer.ConsumerRecords;
 8 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 9 import org.apache.kafka.common.TopicPartition;
10 
11 /**
12  * 
13  * @Description TODO
14  * @author biehl
15  * @Date 2019年6月1日 下午3:45:38
16  * 
17  * @param <K>
18  * @param <V>
19  * 
20  *            1、本质上是一个Runnable,执行真正的消费逻辑并且上报位移信息给ConsumerThreadHandler。
21  * 
22  */
23 public class ConsumerWorker<K, V> implements Runnable {
24 
25     // 获取到的消息
26     private final ConsumerRecords<K, V> records;
27     // 位移信息
28     private final Map<TopicPartition, OffsetAndMetadata> offsets;
29 
30     /**
31      * ConsumerWorker有参构造方法
32      * 
33      * @param records
34      *            获取到的消息
35      * @param offsets
36      *            位移信息
37      */
38     public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
39         this.records = records;
40         this.offsets = offsets;
41     }
42 
43     /**
44      * 
45      */
46     @Override
47     public void run() {
48         // 获取到分区的信息
49         for (TopicPartition partition : records.partitions()) {
50             // 获取到分区的消息记录
51             List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition);
52             // 遍历获取到的消息记录
53             for (ConsumerRecord<K, V> record : partConsumerRecords) {
54                 // 打印消息
55                 System.out.println("topic: " + record.topic() + ",partition: " + record.partition() + ",offset: "
56                         + record.offset() 
57                         + ",消息记录: " + record.value());
58             }
59             // 上报位移信息。获取到最后的位移消息,由于位移消息从0开始,所以最后位移减一获取到位移位置
60             long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset();
61             // 同步锁,锁住offsets位移
62             synchronized (offsets) {
63                 // 如果offsets位移不包含partition这个key信息
64                 if (!offsets.containsKey(partition)) {
65                     // 就将位移信息设置到map集合里面
66                     offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
67                 } else {
68                     // 否则,offsets位移包含partition这个key信息
69                     // 获取到offsets的位置信息
70                     long curr = offsets.get(partition).offset();
71                     // 如果获取到的位置信息小于等于上一次位移信息大小
72                     if (curr <= lastOffset + 1) {
73                         // 将这个partition的位置信息设置到map集合中。并保存到broker中。
74                         offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
75                     }
76                 }
77             }
78         }
79     }
80 
81 }
代码语言:javascript
复制
 1 package com.bie.kafka.kafkaWorker;
 2 
 3 /**
 4  * 
 5  * @Description TODO
 6  * @author biehl
 7  * @Date 2019年6月1日 下午4:13:25
 8  *
 9  *       1、单独KafkaConsumer实例和多worker线程。
10  *       2、将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,
11  *       同时维护一个或者若各干consumer实例执行消息获取任务。
12  *       3、本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,
13  *       之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
14  * 
15  * 
16  */
17 
18 public class ConsumerMain {
19 
20     public static void main(String[] args) {
21         // broker列表
22         String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
23         // 主题信息topic
24         String topic = "topic1";
25         // 消费者组信息group
26         String groupId = "group2";
27         // 根据ConsumerThreadHandler构造方法构造出消费者
28         final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
29         final int cpuCount = Runtime.getRuntime().availableProcessors();
30         System.out.println("cpuCount : " + cpuCount);
31         // 创建线程的匿名内部类
32         Runnable runnable = new Runnable() {
33 
34             @Override
35             public void run() {
36                 // 执行consume,在此线程中执行消费者消费消息。
37                 handler.consume(cpuCount);
38             }
39         };
40         // 直接调用runnable此线程,并运行
41         new Thread(runnable).start();
42 
43         try {
44             // 此线程休眠20000
45             Thread.sleep(20000L);
46         } catch (InterruptedException e) {
47             e.printStackTrace();
48         }
49         System.out.println("Starting to close the consumer...");
50         // 关闭消费者
51         handler.close();
52     }
53 
54 }

待续......

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档