
Using Avro to Serialize and Deserialize Kafka Messages

Author: 我是李超人
Published: 2020-08-20 21:31:00

For generating the `Customer` entity class from an Avro schema, see this article: https://cloud.tencent.com/developer/article/1683778
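The examples below rely on a generated `Customer` class with `id`, `name`, `email`, and `images` fields. The original post does not show the schema; a minimal `Customer.avsc` consistent with the setters used here might look like the following sketch (the namespace and the `bytes` type for `images` are assumptions, since only `setImages(null)` appears in the code):

```json
{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "images", "type": ["null", "bytes"], "default": null}
  ]
}
```

The `["null", "bytes"]` union with `"default": null` is what allows `setImages(null)` to be a legal value.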

Producer code

    public static void CustomerTest() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.0.31:9092,192.168.0.32:9092,192.168.0.33:9092");
        // Keys are plain strings; values carry the Avro-encoded bytes
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);
        // The writer is bound to the schema, so one instance can be reused for every record
        SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(Customer.getClassSchema());
        for (int i = 0; i < 1000; i++) {
            Customer customer = new Customer();
            customer.setId(i);
            customer.setName("ric-" + i);
            customer.setEmail("23132@163.com-" + i);
            customer.setImages(null);

            // Encode the Customer into Avro binary format
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            try {
                writer.write(customer, encoder);
                encoder.flush();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

            ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>("Customer", "customer-" + i, out.toByteArray());
            producer.send(record);
        }
        producer.close();
    }

Consumer code

    public static void CustomerTest() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.0.31:9092,192.168.0.32:9092,192.168.0.33:9092");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        kafkaProps.put("group.id", "DemoAvroKafkaConsumer2");
        // Start from the beginning of the topic when the group has no committed offset
        kafkaProps.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps);
        consumer.subscribe(Collections.singletonList("Customer"));

        // The reader is bound to the schema and can be reused for every record
        SpecificDatumReader<Customer> reader = new SpecificDatumReader<>(Customer.getClassSchema());
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(10);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Decode the Avro binary payload back into a Customer
                    Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    try {
                        Customer customer = reader.read(null, decoder);
                        System.out.println(record.key() + ":" + customer.getId()
                                + "\t" + customer.getName() + "\t" + customer.getEmail());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

Related POM dependencies

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-tools</artifactId>
      <version>1.8.2</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>bijection-avro_2.11</artifactId>
      <version>0.9.6</version>
    </dependency>
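Note that `bijection-avro` is not actually used by the code above, which serializes with Avro's `SpecificDatumWriter`/`SpecificDatumReader` directly. Twitter's Bijection library wraps that same encode/decode cycle in a reversible `Injection`. A hedged sketch of how it could replace the manual encoder/decoder plumbing, assuming the same generated `Customer` class (this fragment is illustrative, not taken from the original post):

```java
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.SpecificAvroCodecs;

// A reversible mapping between Customer objects and Avro binary byte arrays
Injection<Customer, byte[]> injection =
        SpecificAvroCodecs.toBinary(Customer.getClassSchema());

byte[] bytes = injection.apply(customer);           // serialize for the producer
Customer decoded = injection.invert(bytes).get();   // deserialize in the consumer (Try-style result)
```

`invert` returns a `Try`-style result because the bytes may not match the schema, so `.get()` will throw if decoding fails. If you keep the manual `SpecificDatumWriter`/`SpecificDatumReader` approach, this dependency can be dropped from the POM.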
Originally published 2018-12-08 on the author's personal blog; shared via the Tencent Cloud self-media sharing program.
