Converting between Java and Avro when using Kafka

Stack Overflow user
Asked on 2016-04-26 08:59:57
2 answers · 7.8K views · 0 followers · Score 3

I am using the Confluent platform, 0.9.0.1, with kafka-avro-serializer 2.0.1. I am trying to send events to Kafka and read them back, but I cannot figure out how to convert an event back into a Java object. I have read the Avro and Confluent docs, and there are hints that this is doable, but I cannot find a good example. When I read the record with a KafkaConsumer I get a GenericData$Record; my question is how to get that back into a Java POJO. I found the bit of code below that I used to serialize the object.

Here is my code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.Properties;

/**
 * This is a test...
 */
public class KafkaAvroProducerTest {
    private static final Logger log = LogManager.getLogger(KafkaAvroProducerTest.class);

    @Test
    public void produceAndSendAndEvent() throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put("schema.registry.url", "http://localhost:8081");
        KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

        log.debug("starting producer");
        String topic = "topic11";
        Schema schema = ReflectData.get().getSchema(Purchase.class);
        Purchase purchase = new Purchase("appStore", 9.99d, DateTime.now().getMillis(), "BRXh2lf9wm");

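        // Serialize the POJO with ReflectDatumWriter, then immediately re-read
        // those bytes with GenericDatumReader to obtain the GenericRecord that
        // KafkaAvroSerializer expects as a value.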
        ReflectDatumWriter<Purchase> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(purchase, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, avroRecord);

        Thread producerThread = new Thread(() -> {
            try {
                while(true) {
                    log.debug("send a message {}", record);
                    producer.send(record);
                    Thread.sleep(2000);
                }
            }catch(Exception ex) {
                log.error("error", ex);
            }
        });
        producerThread.start();

        props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testGroup");
        props.put("auto.commit.enable", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Collections.singletonList(topic));

        Thread consumerThread = new Thread(() -> {
            try {
                while(true) {
                    try {
                        ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000);
                        for (ConsumerRecord<String, GenericRecord> record1 : records) {
                            log.debug("read - {}", record1.value().getClass());
                        }
                    }catch(Exception ex) {
                        log.error("error", ex);
                    }
                }
            }catch(Exception ex) {
                log.error("error", ex);
            }
        });
        consumerThread.start();
        System.in.read();
    }
}

2 Answers

Stack Overflow user

Answered on 2016-04-29 22:27:59

I have never used Avro, but looking at https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html, why can't you simply populate your POJO manually?

class MyPojo {
    public int v1;
    public String v2;
}

// copied from your example code
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, GenericRecord> record1 : records) {
    GenericRecord avroRecord = record1.value();
    MyPojo pojo = new MyPojo();
    pojo.v1 = (Integer)avroRecord.get("<fieldname1>");
    // Avro's generic API returns string fields as org.apache.avro.util.Utf8,
    // so convert with toString() rather than casting to String.
    pojo.v2 = avroRecord.get("<fieldname2>").toString();

    // process current pojo
}

Not sure if this makes sense. If it works, I would move it into a MyPojo(GenericRecord) constructor, as sketched below.
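
A minimal sketch of that constructor, keeping the placeholder field names from the snippet above:

import org.apache.avro.generic.GenericRecord;

class MyPojo {
    public int v1;
    public String v2;

    // Field names are placeholders, as in the snippet above.
    MyPojo(GenericRecord avroRecord) {
        this.v1 = (Integer) avroRecord.get("<fieldname1>");
        this.v2 = avroRecord.get("<fieldname2>").toString(); // Utf8 -> String
    }
}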

Score 2

Stack Overflow user

Answered on 2018-08-03 05:34:19

As for manually pulling the data out of the GenericRecord into a Java class... why write that by hand when there is a mapping library that can do it for you?

To get automatic conversion into a registered Java type, look at creating your own KafkaAvroDeserializer that produces a SpecificRecord through a ReflectDatumReader, as shown in this Stack Overflow post (a minimal sketch of the idea follows below): KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord
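
The linked post builds a complete custom deserializer; the following is only a minimal sketch of the idea, assuming the Purchase class from the question (the class name ReflectAvroDeserializer is made up for illustration). It delegates the Confluent wire format (magic byte, schema id, registry lookup) to the stock KafkaAvroDeserializer, then re-encodes the resulting GenericRecord and re-reads it with a ReflectDatumReader, which is the inverse of the ReflectDatumWriter/GenericDatumReader round trip on the producer side of the question.

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

// Illustrative wrapper, not the exact code from the linked post.
public class ReflectAvroDeserializer implements Deserializer<Purchase> {
    private final KafkaAvroDeserializer inner = new KafkaAvroDeserializer();
    private final Schema readerSchema = ReflectData.get().getSchema(Purchase.class);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey); // picks up schema.registry.url etc.
    }

    @Override
    public Purchase deserialize(String topic, byte[] data) {
        // The stock deserializer resolves the writer schema from the registry
        // and hands back a GenericRecord for record-typed values.
        GenericRecord generic = (GenericRecord) inner.deserialize(topic, data);
        try {
            // Re-encode the generic record, then re-read it reflectively into the POJO.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(generic.getSchema()).write(generic, encoder);
            encoder.flush();
            ReflectDatumReader<Purchase> reader =
                    new ReflectDatumReader<>(generic.getSchema(), readerSchema);
            return reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        } catch (IOException e) {
            throw new RuntimeException("Failed to map Avro record to Purchase", e);
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}

Registered as the consumer's value.deserializer in place of KafkaAvroDeserializer, this should make record1.value() in the question's consumer loop return Purchase instances directly.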

Score 1
Original question on Stack Overflow: https://stackoverflow.com/questions/36853618
