首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >为什么流查询不向HDFS写入数据?

为什么流查询不向HDFS写入数据?
EN

Stack Overflow用户
提问于 2018-12-13 03:53:21
回答 1查看 352关注 0票数 1

我在Spark 2.3.1中使用Spark Structured Streaming,下面是我的代码:

代码语言:javascript
运行
复制
val sparkSession = SparkSession
.builder
.appName("xxx")
.config("spark.serializer", 
  "org.apache.spark.serializer.KryoSerializer")
.config("spark.rpc.netty.dispatcher.numThreads", "2")
.config("spark.shuffle.compress", "true")
.config("spark.rdd.compress", "true")
.config("spark.sql.inMemoryColumnarStorage.compressed", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.broadcast.compress", "true")
.config("spark.sql.hive.thriftServer.singleSession", "true")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.streaming.receiver.writeAheadLog.enable","true")
.enableHiveSupport()
.getOrCreate()

val rawStreamDF = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <value>)
.option("subscribe", <value>)
.option("key.serializer", <value>)
.option("value.serializer", <value>)
.option("startingOffsets", "earliest")
.option("auto.offset.reset",earliest)
.option("group.id",  <value>)
.option("fetchOffset.numRetries", 3)
.option("fetchOffset.retryIntervalMs", 10)
.option("IncludeTimestamp", true)
.option("enable.auto.commit",  <value>)
.option("security.protocol",  <value>)
.option("ssl.keystore.location",  <value>)
.option("ssl.keystore.password",  <value>)
.option("ssl.truststore.location",  <value>)
.option("ssl.truststore.password",  <value>)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

我正在尝试将数据写入hdfs_path中的一个文件:

代码语言:javascript
运行
复制
val query = rawStreamDF
  .writeStream
  .format("json")
  .option("startingOffsets", "latest")
  .option("path", "STREAM_DATA_PATH")
  .option("checkpointLocation", "checkpointPath")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start

Logger.log.info("Status:"+query.status)
print("Streaming Status1:"+query.status)

query.awaitTermination(450)

但是,我得到的query.status值如下:

代码语言:javascript
运行
复制
Status:{ "message" : "Initializing sources", "isDataAvailable" : false, "isTriggerActive" : false }

你能告诉我哪里出错了吗?

EN

回答 1

Stack Overflow用户

发布于 2018-12-24 16:54:51

但是,我得到query.status的值,如下所示。

状态:{ "message“:”初始化源“,"isDataAvailable”: false,"isTriggerActive“:false}

你能告诉我哪里出错了吗?

看起来一切都很好。Spark Structured streaming的流引擎似乎还没有启动查询,但只需将其标记为在单独的线程上启动。

如果您创建了一个单独的线程来监视结构化查询,您会注意到,在处理第一批之后,状态会立即发生变化。

请参考Structured Streaming Programming Guide的官方文档。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53750415

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档