首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花onQueryProgress复制

火花onQueryProgress复制
EN

Stack Overflow用户
提问于 2017-08-15 07:53:18
回答 1查看 282关注 0票数 0

当我使用StreamingQueryListener监视结构化流时,我在onQueryProgress上发现了重复

代码语言:javascript
运行
复制
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {

        if(queryProgress.progress.numInputRows!=0) {

          println("Query made progress: " + queryProgress.progress)

        }

结果是

代码语言:javascript
运行
复制
Query made progress: {
  "id" : "e76a8789-738c-49f6-b7f4-d85356c28600",
  "runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8",
  "name" : null,
  "timestamp" : "2017-08-15T07:28:27.077Z",
  "numInputRows" : 1,
  "processedRowsPerSecond" : 0.3050640634533252,
  "durationMs" : {
    "addBatch" : 2452,
    "getBatch" : 461,
    "queryPlanning" : 276,
    "triggerExecution" : 3278
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 19
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 20
      }
    },
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.3050640634533252
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@3ec8a100"
  }
}
Query made progress: {
  "id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7",
  "runId" : "8caea640-8772-4aab-ab13-84c1e952fb77",
  "name" : null,
  "timestamp" : "2017-08-15T07:28:27.075Z",
  "numInputRows" : 1,
  "processedRowsPerSecond" : 0.272108843537415,
  "durationMs" : {
    "addBatch" : 2844,
    "getBatch" : 445,
    "queryPlanning" : 293,
    "triggerExecution" : 3672
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 19
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 20
      }
    },
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.272108843537415
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@6953f971"
  }
}

为什么我发送一个消息,那么它有两个不同的结果。

  1. 我的主要程序问题是,我应该使用星火每一个特定的5分钟,如00:00-00:05,00:05-00:10等等。一天对卡尔有288点。
  2. 因此,我的想法是使用结构化流来过滤特定的数据,而不是过滤器是存储数据库,下一次是将数据库和结构化流一起读取。
  3. 因此,我应该听每一批更新我的时间来读取数据库。
EN

回答 1

Stack Overflow用户

发布于 2017-08-15 21:16:09

这两个事件来自不同的查询。您可以看到idrunId是不同的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45688611

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档