首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从Kafka中读取并写入parquet中的hdfs

从Kafka中读取并写入parquet中的hdfs
EN

Stack Overflow用户
提问于 2017-08-23 06:01:30
回答 3查看 12.9K关注 0票数 12

我是BigData生态系统的新手,也算是入门了。

我读过几篇关于使用spark流媒体阅读kafka主题的文章,但我想知道是否可以使用spark作业而不是流媒体来阅读kafka?如果是,你们可以帮我指出一些文章或代码片段,可以帮助我入门吗?

我的问题的第二部分是以parquet格式写入hdfs。一旦我读了Kafka,我想我会有一个rdd。将这个rdd转换成一个dataframe,然后把dataframe写成一个parquet文件。这是正确的方法吗?

感谢您的帮助。

谢谢

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-08-23 13:24:01

对于从Kafka读取数据并将其写入HDFS的过程,使用Spark批处理作业而不是流,您可以使用Spark Structured Streaming

结构化流是一个构建在Spark SQL引擎上的可伸缩和容错的流处理引擎。您可以像在静态数据上表示批处理计算一样来表示流计算。Spark SQL引擎将负责递增和连续地运行它,并在流数据不断到达时更新最终结果。您可以在Scala、Java、Python或R中使用Dataset/DataFrame API来表示流聚合、事件时间窗口、流到批处理连接等。计算在相同的优化Spark SQL引擎上执行。最后,系统通过检查点和预写日志确保端到端的一次容错保证。简而言之,结构化流提供了快速、可扩展、容错、端到端的一次流处理,而用户不必对流进行推理。

它附带了Kafka作为内置源,也就是说,我们可以从Kafka中调查数据。兼容Kafka broker 0.10.0及以上版本。

为了批量拉取Kafka中的数据,您可以为定义的偏移量范围创建Dataset/DataFrame。

代码语言:javascript
运行
复制
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

源中的每一行都具有以下架构:

代码语言:javascript
运行
复制
| Column           | Type          |
|:-----------------|--------------:|
| key              |        binary |
| value            |        binary |
| topic            |        string |
| partition        |           int |
| offset           |          long |
| timestamp        |          long |
| timestampType    |           int |

现在,要将数据以parquet格式写入HDFS,可以编写以下代码:

代码语言:javascript
运行
复制
df.write.parquet("hdfs://data.parquet")

关于Spark Structured Streaming + Kafka的更多信息,请参考以下指南- Kafka Integration Guide

我希望它能有所帮助!

票数 7
EN

Stack Overflow用户

发布于 2018-08-20 10:37:36

关于这个主题,你已经有了几个很好的答案。

只是想强调一下--当心直接流进镶木拼花桌上。当拼图的行组足够大时(为了简单起见,你可以说文件大小应该在64-256Mb ),以利用字典压缩、bloom过滤器等(一个拼图文件可以有多个行块,通常每个文件有多个行块;尽管行块不能跨越多个拼图文件),拼图的性能就会大放异彩。

如果您直接流式传输到镶木工作台,那么您很可能最终会得到一堆很小的镶木文件(取决于Spark Streaming的小批量大小和数据量)。查询这样的文件可能会非常慢。例如,Parquet可能需要读取所有文件头来协调模式,这是一个很大的开销。如果是这种情况,您将需要一个单独的进程,例如,作为变通方法,它将读取较旧的文件,并将它们写入“合并”(这不是简单的文件级合并,一个进程实际上需要读取所有拼图数据并溢出更大的拼图文件)。

此解决方法可能会扼杀数据“流”的最初目的。你也可以在这里看看其他的技术,比如Apache Kudu,Apache Kafka,Apache Druid,Kinesis等等,它们可以在这里更好地工作。

更新:自从我发布了这个答案后,这里现在有了一个新的强大的玩家- Delta Lakehttps://delta.io/如果你习惯拼花,你会发现Delta非常有吸引力(实际上,Delta是建立在拼花图层+元数据的基础上的)。Delta Lake提供:

Spark上的ACID事务:

  • 可序列化隔离级别可确保读者永远不会看到不一致的数据。
  • 可伸缩元数据处理:利用Spark的分布式处理能力来处理as级表的所有元数据,这些表具有数十亿个文件,具有ease.
  • Streaming和批处理统一: Delta Lake中的表既是批次表,又是流来源和流接收器。流式数据摄取、批处理历史回填、交互式查询全部开箱即用。
  • 架构实施:自动处理架构变化,以防止在

传送过程中插入错误记录:数据版本控制支持回滚、完整的历史审核跟踪以及可重现的机器学习和删除:支持合并、更新和删除操作,以启用更改数据捕获、缓慢变化的维操作、流式更新等复杂用法。

票数 5
EN

Stack Overflow用户

发布于 2017-10-30 23:10:14

使用Kafka Streams。SparkStreaming是一个用词不当的词(它只是幕后的小产品,至少是2.2版本)。

https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45827664

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档