I want to get the number of records written by a writeStream operation. I have this code:
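import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Driver-side counters updated by the listener
var inputRecords = 0L
var recordsWritten = 0L
var bytesWritten = 0L
var numTasks = 0L

spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    // taskMetrics can be null (e.g. for failed tasks). In Spark 2.x inputMetrics and
    // outputMetrics are plain objects, not Options, so comparing them to None is always true.
    if (metrics != null) {
      inputRecords += metrics.inputMetrics.recordsRead
      println("task metrics available")
      recordsWritten += metrics.outputMetrics.recordsWritten
      bytesWritten += metrics.outputMetrics.bytesWritten
    }
    numTasks += 1
    println("recordsWritten = " + recordsWritten)
    println("bytesWritten = " + bytesWritten)
    println("numTasks = " + numTasks)
  }
})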
The code enters the block, but the value of recordsWritten is always 0.
Edit: I upgraded to Spark 2.3.1 because it contains a fix for this, but it still gives 0.
Streaming query made progress: {
  "id" : "9c345af0-042c-4eeb-80db-828c5f69e442",
  "runId" : "d309f7cf-624a-42e5-bb54-dfb4fa939228",
  "name" : "WriteToSource",
  "timestamp" : "2018-07-30T14:20:33.486Z",
  "batchId" : 3,
  "numInputRows" : 3511,
  "inputRowsPerSecond" : 2113.786875376279,
  "processedRowsPerSecond" : 3013.733905579399,
  "durationMs" : {
    "addBatch" : 1044,
    "getBatch" : 29,
    "getOffset" : 23,
    "queryPlanning" : 25,
    "triggerExecution" : 1165,
    "walCommit" : 44
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[proto2-events-identification-carrier]]",
    "startOffset" : {
      "proto2-events-identification-carrier" : {
        "2" : 22400403,
        "1" : 22313194,
        "0" : 22381260
      }
    },
    "endOffset" : {
      "proto2-events-identification-carrier" : {
        "2" : 22403914,
        "1" : 22313194,
        "0" : 22381260
      }
    },
    "numInputRows" : 3511,
    "inputRowsPerSecond" : 2113.786875376279,
    "processedRowsPerSecond" : 3013.733905579399
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@1350f304"
  }
}
The progress log shows this, but I can't get at it from my code.
Posted on 2018-07-27 16:21:47
Posted on 2022-02-17 09:55:19
Spark Structured Streaming has a dedicated listener for query progress, which is registered with the StreamingQueryManager (spark.streams):
import org.apache.spark.sql.streaming.StreamingQueryListener

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    // code
  }
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // Each progress event covers one trigger (micro-batch), so this is a per-batch count
    println("Number of records written to the sink in this batch: " + event.progress.sink.numOutputRows)
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    // code
  }
})
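Because each QueryProgressEvent describes a single trigger, a running total across batches has to be kept on the driver. Below is a minimal sketch of such a counter, assuming it is fed `event.progress.sink.numOutputRows` from `onQueryProgress`. The `SinkRowCounter` class and its methods are hypothetical helpers, not part of Spark. It skips negative values because `numOutputRows` may be -1 when the sink does not report a count, and it uses `AtomicLong` so the total can safely be read from a thread other than the one delivering listener events.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not a Spark API): running total of rows written to a sink.
// Each StreamingQueryProgress only reports the rows of its own trigger, so we sum them here.
final class SinkRowCounter {
  private val total = new AtomicLong(0L)
  private val unreported = new AtomicLong(0L)

  // numOutputRows may be -1 when the sink does not report a count; tally those batches separately
  def record(numOutputRows: Long): Unit =
    if (numOutputRows >= 0) total.addAndGet(numOutputRows)
    else unreported.incrementAndGet()

  def totalRows: Long = total.get()
  def batchesWithoutCount: Long = unreported.get()
}

object SinkRowCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new SinkRowCounter
    // Illustrative per-batch counts, not taken from a real query
    Seq(100L, 0L, 250L, -1L).foreach(counter.record)
    println(s"rows=${counter.totalRows} unreported=${counter.batchesWithoutCount}")
    // prints "rows=350 unreported=1"
  }
}
```

In the listener above, `onQueryProgress` would call `counter.record(event.progress.sink.numOutputRows)`. A listener registered through `spark.streams` receives events for every query in the session, so when several queries run, keeping one counter per `event.progress.id` (or checking `event.progress.name`, e.g. "WriteToSource") keeps the totals separate.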
https://stackoverflow.com/questions/51513411