专栏首页码字搬砖Flink消费kafka如何获取每条消息对应的topic

Flink消费kafka如何获取每条消息对应的topic

1.首先自定义个 KafkaDeserializationSchema

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
	@Override
	//nextElement 是否表示流的最后一条元素,我们要设置为 false ,因为我们需要 msg 源源不断的被消费
	public boolean isEndOfStream(Tuple2<String, String> nextElement) {
		return false;
	}
	
	@Override
	// 反序列化 kafka 的 record,我们直接返回一个 tuple2<kafkaTopicName,kafkaMsgValue>
	public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
		return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
	}
	
	@Override
	//告诉 Flink 我输入的数据类型, 方便 Flink 的类型推断
	public TypeInformation<Tuple2<String, String>> getProducedType() {
		return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
	}
}

2.使用自定义的 KafkaDeserializationSchema 进行消费

public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
				
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		properties.setProperty("group.id", "test");
		
		FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
		kafkaConsumer.setStartFromEarliest();
		env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
			@Override
			public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
				System.out.println("topic==== " + value.f0);
			}
		});
		
		// execute program
		env.execute("Flink Streaming Java API Skeleton");
	} 

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版

    最近中使用spark Streaming +kafka,由于涉及到金额,所以需要保证at only one, 而网上关于java版的kafka offset...

    shengjk1
  • 读 Guide to Java String Pool

    1.相同的字面意义的字符创仅仅会在 String Pool 中存放一个 2.当我们创建 String 时, 比如 String a=“aa”; JVM 首先会...

    shengjk1
  • 字符串的压缩以及解压缩

    shengjk1
  • java-小程序微信支付

    哈喽 我是你们的KingYiFan,一直说把微信支付给分享出来一直没有机会。终于闲下来了。听着音乐给你们分享一下。不懂可以随时联系我。。

    猿码优创
  • 手写dubbo框架4-服务治理(服务发现-zookeeper)

    博客中代码地址:https://github.com/farliu/farpc.git

    far
  • Map排序

    Map排序的方式有很多种,这里记录下自己总结的两种比较常用的方式:按键排序(sort by key), 按值排序(sort by value)。 按键排序(...

    xiangzhihong
  • Java基础知识之Scanner类和String类学习,讲明白了,适合初学者

    1、Scanner 的概述和方法介绍 A:Scanner 的概述 B:Scanner 的构造方法原理 Scanner(InputStream source) S...

    用户1289394
  • Flink 中这样管理配置,你知道?

    如果你了解 Apache Flink 的话,那么你应该熟悉该如何像 Flink 发送数据或者如何从 Flink 获取数据。但是在某些情况下,我们需要将配置数据发...

    zhisheng
  • 数据字典 加载到 web 项目的 application 全局

    北漂的我
  • java学习day17 --排序,时间比较,Ip转换,JSON格式

    2018.6.29 1.读取JSON数据 1)首先需要有 fastjson-1.2.7.jar 包 2)创建一个数据类型的对象,用来封装 ...

    曼路

扫码关注云+社区

领取腾讯云代金券