kafka0.8消费者实例

这里简单展示一下如何使用kafka0.8的client去消费一个topic。

maven

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>

初始化客户端

Properties props = new Properties();
        props.put("zookeeper.connect", zk);
//        props.put("auto.offset.reset","smallest");
        props.put("group.id",group);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "10000");
        props.put("consumer.timeout.ms","10000"); //设置ConsumerIterator的hasNext的超时时间,不设置则永远阻塞直到有新消息来
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
        ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, consumerCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);

并发消费

consumerMap.get(topic).stream().forEach(stream -> {

            pool.submit(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();

                    //it.hasNext()取决于consumer.timeout.ms的值,默认为-1
                    try{
                        while (it.hasNext()) {
                            System.out.println(Thread.currentThread().getName()+" hello");
                            //是hasNext抛出异常,而不是next抛出
                            System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
                        }
                    }catch (ConsumerTimeoutException e){
                        e.printStackTrace();
                    }

                    System.out.println(Thread.currentThread().getName()+" end");
                }
            });

        });

注意事项

消费者实例数*每个实例的消费线程数 <= topic的partition数量,否则多余的就浪费了。

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2017-09-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏蘑菇先生的技术笔记

探索c#之不可变数据类型

21340
来自专栏iOS技术杂谈

iOS runtime探究(四): 从runtiem开始实践Category添加属性与黑魔法method swizzling你要知道的runtime都在这里

你要知道的runtime都在这里 转载请注明出处 https://cloud.tencent.com/developer/user/1605429 本文主要讲解...

34360
来自专栏二进制文集

JDK源码分析 异常

对于JDK源码分析的文章,仅仅记录我认为重要的地方。源码的细节实在太多,不可能面面俱到地写清每个逻辑。所以我的JDK源码分析,着重在JDK的体系架构层面,具体源...

30640
来自专栏编码小白

ofbiz实体引擎(五) ModelGroupReader

public class ModelGroupReader implements Serializable { public static final...

35670
来自专栏Android知识点总结

00--图解数据结构之开篇+集合基类

14080
来自专栏小樱的经验随笔

【Java学习笔记之二十八】深入了解Java8新特性

前言: Java 8 已经发布很久了,很多报道表明java 8 是一次重大的版本升级。在Java Code Geeks上已经有很多介绍Java 8新特性的文章,...

37770
来自专栏一枝花算不算浪漫

[Java Collection]List分组之简单应用.

35050
来自专栏开发与安全

《linux c 编程一站式学习》课后部分习题解答

1、假设变量x和n是两个正整数,我们知道x/n这个表达式的结果要取Floor,例如x是17,n是4,则结果是4。如果希望结果取Ceiling应该怎么写表达式呢?...

48160
来自专栏码匠的流水账

聊聊storm的AggregateProcessor的execute及finishBatch方法

本文主要研究一下storm的AggregateProcessor的execute及finishBatch方法

13050
来自专栏一个会写诗的程序员的博客

第9章 文件IO操作、正则表达式与多线程第9章 文件IO操作、正则表达式与多线程

我们在《第6章 扩展函数与属性》中已经介绍过Kotlin中的类扩展的特性。使用Kotlin的扩展函数功能,我们可以直接为 String 类实现一个 inc() ...

12030

扫码关注云+社区

领取腾讯云代金券