首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka0.10client使用实例

kafka0.10client使用实例

作者头像
code4it
发布2018-09-17 15:05:41
7080
发布2018-09-17 15:05:41
举报

本文主要讲一下怎么简单使用kafka0.10 client去收发消息

maven

    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

如果你是使用log4j的话,那可以不用exclude

producer

    @Test
    public void send(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 1000000; i++) {
            ProducerRecord record = new ProducerRecord<String, String>(topic, Integer.toString(i),
                    Integer.toString(i));
            producer.send(record, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(metadata != null) {
                        System.out.printf("Send record partition:%d, offset:%d, keysize:%d, valuesize:%d %n",
                                metadata.partition(), metadata.offset(), metadata.serializedKeySize(),
                                metadata.serializedValueSize());
                    }
                    if(exception != null) {
                        exception.printStackTrace();
                    }
                }

            });
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }

consume

    @Test
    public void receive(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try{
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                records.forEach(record -> {
                    System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n", clientId, record.topic(),
                            record.partition(), record.offset(), record.key(), record.value());
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }

可以看到跟0.8版本的不一样,不需要topicCountMap了 This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster. The consumer is not thread-safe.

consumer多线程方案

  • 启动多个consumer的应用实例,在使用docker以及kubernetes的场景下,这样做比较方便
  • 单个应用实例,里头起多个KafkaConsumer实例
  • 单个应用实例,单个KafkaConsumer实例,多线程/异步 消费消息

个人比较倾向第一个方案,topic的partition有多少个,consumer应用就起多少个实例 对于吞吐量大,又要加速处理消费速度的,那就加上第三个方案

doc

  • kafka-01020-document
  • 【原创】Kafka Consumer多线程实例
  • 总结kafka的consumer消费能力很低的情况下的处理方案
  • 【原创】探讨kafka的分区数与多线程消费
  • Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-10-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • maven
  • producer
  • consume
    • consumer多线程方案
    • doc
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档