
Custom Avro Serialization and Deserialization for Kafka Transport in Flink



This post walks through wiring Avro serialization and deserialization, backed by the Confluent Schema Registry, into Flink's Kafka connectors.

Producer configuration:

FlinkKafkaProducer09<DoubtEventPreformatDataAvro> convertOutTopicProducer = new FlinkKafkaProducer09<>(
        outputTopic,
        ConfluentRegistryAvroSerializationSchema.<DoubtEventPreformatDataAvro>ofValue(outputTopic, jobConfig.getKafkaMasterConfig()),
        jobConfig.getKafkaMasterConfig(),
        (FlinkKafkaPartitioner<DoubtEventPreformatDataAvro>) null); // no custom partitioner
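The Properties returned by jobConfig.getKafkaMasterConfig() must carry at least the broker list and the Schema Registry endpoint for the KafkaAvroSerializer used below; a minimal sketch, with placeholder addresses:

Properties kafkaMasterConfig = new Properties();
kafkaMasterConfig.setProperty("bootstrap.servers", "broker-1:9092");           // Kafka brokers (placeholder)
kafkaMasterConfig.setProperty("schema.registry.url", "http://registry:8081");  // Confluent Schema Registry (placeholder)

Without schema.registry.url, KafkaAvroSerializer.configure(...) fails at startup with a missing-config error.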
ConfluentRegistryAvroSerializationSchema implements the custom SerializationSchema:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConfluentRegistryAvroSerializationSchema<V extends SpecificRecord> implements SerializationSchema<V> {

    // KafkaAvroSerializer is not Serializable, so it is rebuilt lazily on the task side.
    private transient KafkaAvroSerializer kafkaAvroSerializer;

    private String topic;
    private Map<String, Object> config;
    private boolean isKey;

    private ConfluentRegistryAvroSerializationSchema(String topic, boolean isKey, Map<String, Object> config) {
        this.topic = topic;
        this.isKey = isKey;
        this.config = config;
        initKafkaSerializer();
    }

    public static <V extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<V> ofValue(String topic, Properties config) {
        return new ConfluentRegistryAvroSerializationSchema<>(topic, false, toMap(config));
    }

    public static <V extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<V> ofKey(String topic, Properties config) {
        return new ConfluentRegistryAvroSerializationSchema<>(topic, true, toMap(config));
    }

    // Properties is a Map<Object, Object>; copy it into the Map<String, Object>
    // that KafkaAvroSerializer.configure(...) expects.
    private static Map<String, Object> toMap(Properties props) {
        Map<String, Object> map = new HashMap<>();
        props.stringPropertyNames().forEach(name -> map.put(name, props.getProperty(name)));
        return map;
    }

    private void initKafkaSerializer() {
        kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(config, isKey);
    }

    @Override
    public byte[] serialize(V element) {
        if (kafkaAvroSerializer == null) { // re-create after the schema is shipped to a task manager
            initKafkaSerializer();
        }
        return kafkaAvroSerializer.serialize(topic, element);
    }
}
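One detail worth noting: Flink distributes the SerializationSchema to the task managers via Java serialization, but KafkaAvroSerializer itself is not Serializable. Declaring it transient and re-creating it on first use (the null check in serialize()) is what makes the schema distributable.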

The producer's data source builds the Avro record from a JSON input:

private DoubtEventPreformatDataAvro convert(JSONObject jsonValue) {
    // field-by-field mapping from the JSON object to the Avro record
    // (method body omitted in the original post)
}
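Such a conversion typically goes through the builder that Avro generates for the SpecificRecord class. A minimal sketch, with hypothetical field names (eventId and eventTime are assumptions, not part of the original schema):

private DoubtEventPreformatDataAvro convert(JSONObject jsonValue) {
    return DoubtEventPreformatDataAvro.newBuilder()
            .setEventId(jsonValue.getString("eventId"))     // hypothetical field
            .setEventTime(jsonValue.getLong("eventTime"))   // hypothetical field
            .build();
}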

Avro deserialization on the consumer side:

FlinkKafkaConsumer09<RetryKeyPreformatAvroValue> inputPreformatTopicConsumer = new FlinkKafkaConsumer09<>(
        jobConfig.getKafkaInputTopicName(), new RetryKeyPreformatAvroValueDeserializationSchema(schemaUrl), kafkaMasterConfig);
JobUtils.setStartupMode(jobConfig.getStartModeOfInputTopic(), inputPreformatTopicConsumer);
inputPreformatTopicConsumer.setCommitOffsetsOnCheckpoints(true); // commit offsets to Kafka on each completed checkpoint
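From there the consumer is wired into the job like any other source; a minimal sketch, assuming env is the job's StreamExecutionEnvironment:

DataStream<RetryKeyPreformatAvroValue> input = env.addSource(inputPreformatTopicConsumer);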

The custom deserialization schema:

// Binds the concrete Avro key/value classes and supplies fresh container instances.
public class RetryKeyPreformatAvroValueDeserializationSchema
        extends AbstractAvroKeyValueDeserializationSchema<KafkaRetryKeyMeta, DoubtEventPreformatDataAvro, RetryKeyPreformatAvroValue> {

    public RetryKeyPreformatAvroValueDeserializationSchema(String schemaRegisterUrl) {
        super(KafkaRetryKeyMeta.class, DoubtEventPreformatDataAvro.class, RetryKeyPreformatAvroValue.class, schemaRegisterUrl);
    }

    @Override
    protected RetryKeyPreformatAvroValue newInstance() {
        return new RetryKeyPreformatAvroValue();
    }
}
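The container types themselves are not shown in the original. The base class below reads and writes public key and value fields, so KeyValueBase and its concrete subclass presumably look roughly like this (a reconstruction from that usage, not the original code):

import java.io.Serializable;

public class KeyValueBase<K, V> implements Serializable {
    public K key;
    public V value;
}

public class RetryKeyPreformatAvroValue extends KeyValueBase<KafkaRetryKeyMeta, DoubtEventPreformatDataAvro> {
}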
The Avro-specific base class plugs Confluent-Schema-Registry-backed deserializers into the generic key/value base:

public abstract class AbstractAvroKeyValueDeserializationSchema<K extends SpecificRecord, V extends SpecificRecord, R extends KeyValueBase<K, V>>
        extends AbstractKeyValueDeserializationSchema<K, V, R> {

    private static final long serialVersionUID = 1509391548173891955L;

    public AbstractAvroKeyValueDeserializationSchema() {
    }

    // Assigns the fields directly instead of calling the eager super constructor, so the
    // registry-backed deserializers are created lazily in deserialize() on the task side.
    public AbstractAvroKeyValueDeserializationSchema(Class<K> kClass, Class<V> vClass, Class<R> kvClass, String schemaRegisterUrl) {
        this.kClass = kClass;
        this.vClass = vClass;
        this.kvClass = kvClass;
        this.schemaRegisterUrl = schemaRegisterUrl;
    }

    @Override
    DeserializationSchema<K> newKeyDeserializer() {
        return ConfluentRegistryAvroDeserializationSchema.forSpecific(kClass, schemaRegisterUrl);
    }

    @Override
    DeserializationSchema<V> newValueDeserializer() {
        return ConfluentRegistryAvroDeserializationSchema.forSpecific(vClass, schemaRegisterUrl);
    }
}
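ConfluentRegistryAvroDeserializationSchema.forSpecific is Flink's built-in schema from the flink-avro-confluent-registry module: it reads the Confluent wire format (one magic byte plus a four-byte schema id), looks the writer schema up in the registry, and decodes the payload into the generated SpecificRecord class.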
The generic base class handles the Kafka record plumbing:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

public abstract class AbstractKeyValueDeserializationSchema<K, V, R extends KeyValueBase<K, V>> implements KafkaDeserializationSchema<R> {

    private static final long serialVersionUID = 1509391548173891955L;

    private static final Logger LOG = LoggerFactory.getLogger(AbstractKeyValueDeserializationSchema.class);

    // Created lazily on the task side; see deserialize().
    private transient DeserializationSchema<K> keyDeserializer;
    private transient DeserializationSchema<V> valueDeserializer;

    protected Class<K> kClass;
    protected Class<V> vClass;
    protected Class<R> kvClass;

    protected String schemaRegisterUrl;

    public AbstractKeyValueDeserializationSchema() {
    }

    public AbstractKeyValueDeserializationSchema(Class<K> kClass, Class<V> vClass, Class<R> kvClass, String schemaRegisterUrl) {
        this.kClass = kClass;
        this.vClass = vClass;
        this.kvClass = kvClass;
        this.schemaRegisterUrl = schemaRegisterUrl;
        initDeserializer();
    }

    private void initDeserializer() {
        keyDeserializer = newKeyDeserializer();
        valueDeserializer = newValueDeserializer();
    }

    abstract DeserializationSchema<K> newKeyDeserializer();

    abstract DeserializationSchema<V> newValueDeserializer();

    @Override
    public R deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {

        if (keyDeserializer == null || valueDeserializer == null) {
            initDeserializer();
        }

        R keyValue = newInstance();

        // A record that fails to deserialize is not fatal: the corresponding field stays
        // null and the job keeps running, but the failure is at least logged.
        if (record.key() != null) {
            try {
                keyValue.key = keyDeserializer.deserialize(record.key());
            } catch (Exception e) {
                LOG.warn("Failed to deserialize key of record {}-{}@{}", record.topic(), record.partition(), record.offset(), e);
            }
        }

        if (record.value() != null) {
            try {
                keyValue.value = valueDeserializer.deserialize(record.value());
            } catch (Exception e) {
                LOG.warn("Failed to deserialize value of record {}-{}@{}", record.topic(), record.partition(), record.offset(), e);
            }
        }
        return keyValue;
    }

    protected abstract R newInstance();

    @Override
    public boolean isEndOfStream(R nextElement) {
        return false;
    }

    @Override
    public TypeInformation<R> getProducedType() {
        return getForClass(kvClass); // statically imported from Flink's TypeExtractor
    }
}