首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >以kafka为来源的spark结构化流媒体如何识别消息来源?

以kafka为来源的spark结构化流媒体如何识别消息来源?
EN

Stack Overflow用户
提问于 2019-06-24 15:19:34
回答 1查看 144关注 0票数 1

我有一个使用案例,在这个案例中,我必须在spark structured streaming中订阅多个kafka主题。然后,我必须解析每条消息,并从中形成一个增量湖表。我已经使解析器和消息(以xml的形式)正确地解析并形成了delta-lake表。然而,到目前为止,我只订阅了一个主题。我想订阅多个主题,根据主题,它应该转到专门为这个特定主题制作的解析器。因此,基本上我希望在处理消息时标识所有消息的主题名称,以便可以将它们发送到所需的解析器并进一步处理。

这就是我访问来自不同主题的消息的方式。但是,我不知道如何在处理传入消息时识别它们的来源。

代码语言:javascript
复制
 val stream_dataframe = spark.readStream
  .format(ConfigSetting.getString("source"))
  .option("kafka.bootstrap.servers", ConfigSetting.getString("bootstrap_servers"))
  .option("kafka.ssl.truststore.location", ConfigSetting.getString("trustfile_location"))
  .option("kafka.ssl.truststore.password", ConfigSetting.getString("truststore_password"))
  .option("kafka.sasl.mechanism", ConfigSetting.getString("sasl_mechanism"))
  .option("kafka.security.protocol", ConfigSetting.getString("kafka_security_protocol"))
  .option("kafka.sasl.jaas.config",ConfigSetting.getString("jass_config"))
  .option("encoding",ConfigSetting.getString("encoding"))
  .option("startingOffsets",ConfigSetting.getString("starting_offset_duration"))
  .option("subscribe",ConfigSetting.getString("topics_name"))
  .option("failOnDataLoss",ConfigSetting.getString("fail_on_dataloss")) 
  .load()


 var cast_dataframe = stream_dataframe.select(col("value").cast(StringType))

 cast_dataframe =  cast_dataframe.withColumn("parsed_column",parser(col("value"))) // Parser is the udf, made to parse the xml from the topic. 

在spark structured streaming中处理消息时,如何识别消息的主题名称?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-24 15:23:17

根据official documentation (重点是我的)

源中的每一行都具有以下架构:

列类型

密钥二进制

值二进制

主题字符串

分区int

..。

如您所见,输入主题是输出模式的一部分,无需任何特殊操作即可访问。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56731576

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档