前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink笔记】kafka-connector消费protobuf格式数据

【Flink笔记】kafka-connector消费protobuf格式数据

原创
作者头像
皮皮熊
修改2022-02-28 20:20:48
4K0
修改2022-02-28 20:20:48
举报

TOC

一、基础概念

1、protobuf

简介

Protobuf是谷歌开源的一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。

优缺点

image.png
image.png

安装protobuf

http://google.github.io/proto-lens/installing-protoc.html

考虑到和flink的兼容性,建议使用3.8版本。

image.png
image.png

idea也包含一个protobuf的插件,方便我们开发使用。

2、kafka-connector

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

参考相关文档

二、实际案例

1、背景介绍

image.png
image.png

在我们skywalking项目中,除了探针将Trace数据写入OAPServer中外,我们还需要通过Flink的kafka-connector消费其protobuf序列化后的数据,进行一些自定义的实时计算。

2、protoc生成java代码

代码语言:txt
复制
protoc --proto_path=${SKYWALKING_HOME}/apm-protocol/apm-network/src/main/proto/  --java_out=${RESULT_HOME}/src/main/java $(find ${SKYWALKING_HOME}/apm-protocol/apm-network/src/main/proto/  -iname "*.proto")

通过这个脚本可以批量转换所有的proto为java代码。

3、构建Deserializer

以Skywalking的JVMMetricCollection为例:

代码语言:txt
复制
public class KafkaJvmMetricDeserializer implements KafkaDeserializationSchema<JVMMetricCollection> {

    @Override
    public boolean isEndOfStream(JVMMetricCollection jvmMetricCollection) {
        return false;
    }

    @Override
    public JVMMetricCollection deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {

        return JVMMetricCollection.parseFrom(consumerRecord.value());
    }

    @Override
    public TypeInformation<JVMMetricCollection> getProducedType() {
        return TypeInformation.of(JVMMetricCollection.class);
    }

}

注意: TypeInformation方法必须要写,否则env调用时会显示异常,需要人工指定returnType。

4、注册registerTypeWithKryoSerializer

代码语言:txt
复制
  env.getConfig().registerTypeWithKryoSerializer(JVMMetricCollection.class, ProtobufSerializer.class);

注册完才能在Flink的DataFlow里面识别。

5、FlinkKafkaConsumer启动消费

代码语言:txt
复制
        FlinkKafkaConsumer<JVMMetricCollection> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
                new KafkaJvmMetricDeserializer(), properties);
        DataStream<JVMMetricCollection> stream = env.addSource(kafkaConsumer);

三、问题排查

1、protobuf版本问题

image.png
image.png

回退到protobuf 3.8版本就ok

四、附录

1、maven配置

代码语言:txt
复制
<dependency>
	<groupId>io.grpc</groupId>
	<artifactId>grpc-protobuf</artifactId>
	<version>${grpc.version}</version>
</dependency>

<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>${protobuf.version}</version>
	<scope>compile</scope>
</dependency>

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.7.6</version>
	<!-- exclusions for dependency conversion -->
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>

更多内容可以关注我的公众号~

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、基础概念
    • 1、protobuf
      • 简介
      • 优缺点
      • 安装protobuf
    • 2、kafka-connector
    • 二、实际案例
      • 1、背景介绍
        • 2、protoc生成java代码
          • 3、构建Deserializer类
            • 4、注册registerTypeWithKryoSerializer
              • 5、FlinkKafkaConsumer启动消费
              • 三、问题排查
                • 1、protobuf版本问题
                • 四、附录
                  • 1、maven配置
                  相关产品与服务
                  大数据处理套件 TBDS
                  腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、Presto、Iceberg、Elasticsearch、StarRocks 等,以快速构建企业级数据湖仓。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档