前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka0.8消费者实例

kafka0.8消费者实例

作者头像
code4it
发布2018-09-17 15:01:27
1.3K0
发布2018-09-17 15:01:27
举报
文章被收录于专栏:码匠的流水账码匠的流水账

这里简单展示一下如何使用kafka0.8的client去消费一个topic。

maven

代码语言:javascript
复制
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>

初始化客户端

代码语言:javascript
复制
Properties props = new Properties();
        props.put("zookeeper.connect", zk);
//        props.put("auto.offset.reset","smallest");
        props.put("group.id",group);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "10000");
        props.put("consumer.timeout.ms","10000"); //设置ConsumerIterator的hasNext的超时时间,不设置则永远阻塞直到有新消息来
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
        ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, consumerCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);

并发消费

代码语言:javascript
复制
consumerMap.get(topic).stream().forEach(stream -> {

            pool.submit(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();

                    //it.hasNext()取决于consumer.timeout.ms的值,默认为-1
                    try{
                        while (it.hasNext()) {
                            System.out.println(Thread.currentThread().getName()+" hello");
                            //是hasNext抛出异常,而不是next抛出
                            System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
                        }
                    }catch (ConsumerTimeoutException e){
                        e.printStackTrace();
                    }

                    System.out.println(Thread.currentThread().getName()+" end");
                }
            });

        });

注意事项

消费者实例数*每个实例的消费线程数 <= topic的partition数量,否则多余的就浪费了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • maven
  • 初始化客户端
  • 并发消费
  • 注意事项
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档