首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >查找由writeStream操作编写的记录数量: SparkListener OnTaskEnd总是在结构化流中返回0

查找由writeStream操作编写的记录数量: SparkListener OnTaskEnd总是在结构化流中返回0
EN

Stack Overflow用户
提问于 2018-07-25 07:41:18
回答 2查看 653关注 0票数 3

我想得到由writeStream操作编写的记录的数量。我有这个密码。

代码语言:javascript
运行
复制
spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.recordsRead
    }
    if(metrics.outputMetrics != None) {
      println("OUTPUTMETRICIS NOT NONE")
      recordsWritten += metrics.outputMetrics.recordsWritten
      bytesWritten += metrics.outputMetrics.bytesWritten
    }
    numTasks += 1
    println("recordsWritten = " + recordsWritten)
    println("bytesWritten = " + bytesWritten)
    println("numTasks = " + numTasks)
  }
})

代码进入块,但是由recordsWritten编写的输入记录的值始终为0。

编辑:升级到2.3.1,因为有一个修复。仍然给出0

代码语言:javascript
运行
复制
Streaming query made progress: {
  "id" : "9c345af0-042c-4eeb-80db-828c5f69e442",
  "runId" : "d309f7cf-624a-42e5-bb54-dfb4fa939228",
  "name" : "WriteToSource",
  "timestamp" : "2018-07-30T14:20:33.486Z",
  "batchId" : 3,
  "numInputRows" : 3511,
  "inputRowsPerSecond" : 2113.786875376279,
  "processedRowsPerSecond" : 3013.733905579399,
  "durationMs" : {
    "addBatch" : 1044,
    "getBatch" : 29,
    "getOffset" : 23,
    "queryPlanning" : 25,
    "triggerExecution" : 1165,
    "walCommit" : 44
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[proto2-events-identification-carrier]]",
    "startOffset" : {
      "proto2-events-identification-carrier" : {
        "2" : 22400403,
        "1" : 22313194,
        "0" : 22381260
      }
    },
    "endOffset" : {
      "proto2-events-identification-carrier" : {
        "2" : 22403914,
        "1" : 22313194,
        "0" : 22381260
      }
    },
    "numInputRows" : 3511,
    "inputRowsPerSecond" : 2113.786875376279,
    "processedRowsPerSecond" : 3013.733905579399
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@1350f304"
  }
}

显示这一点,但我无法在代码中得到它。

EN

回答 2

Stack Overflow用户

发布于 2018-07-28 00:21:47

在2.3.1版本中修复的火花结构化流的是一只虫子在FileStreamSink中。

作为一种解决办法,您可以在将数据写入接收器之前使用蓄能器

票数 1
EN

Stack Overflow用户

发布于 2022-02-17 17:55:19

星火结构流有一个专用的侦听器位置,用于StreamingQueryManager中的查询进度。

代码语言:javascript
运行
复制
 spark.streams.addListener(new StreamingQueryListener {
    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
      //code
    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      println("Total number of records written to the sink: "+event.progress.sink.numOutputRows)
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      //code
    }
})
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51513411

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档