前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink消费kafka如何获取每条消息对应的topic

Flink消费kafka如何获取每条消息对应的topic

作者头像
shengjk1
发布2020-03-26 18:02:10
2.3K0
发布2020-03-26 18:02:10
举报
文章被收录于专栏:码字搬砖

1.首先自定义个 KafkaDeserializationSchema

代码语言:javascript
复制
public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
	@Override
	//nextElement 是否表示流的最后一条元素,我们要设置为 false ,因为我们需要 msg 源源不断的被消费
	public boolean isEndOfStream(Tuple2<String, String> nextElement) {
		return false;
	}
	
	@Override
	// 反序列化 kafka 的 record,我们直接返回一个 tuple2<kafkaTopicName,kafkaMsgValue>
	public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
		return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
	}
	
	@Override
	//告诉 Flink 我输入的数据类型, 方便 Flink 的类型推断
	public TypeInformation<Tuple2<String, String>> getProducedType() {
		return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
	}
}

2.使用自定义的 KafkaDeserializationSchema 进行消费

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
				
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		properties.setProperty("group.id", "test");
		
		FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
		kafkaConsumer.setStartFromEarliest();
		env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
			@Override
			public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
				System.out.println("topic==== " + value.f0);
			}
		});
		
		// execute program
		env.execute("Flink Streaming Java API Skeleton");
	} 
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/03/25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档