
Arbitrary Stateful Operations in Structured Streaming

Many use cases require stateful operations that are more advanced than aggregations. For example, you often have to track sessions from streams of event data: to implement such sessionization, you must store arbitrary types of data as state, and at every trigger apply arbitrary operations to that state based on the new events in the stream. Since Spark 2.2 this can be done with mapGroupsWithState and the more powerful flatMapGroupsWithState (sketched at the end of this post). Both operations let you run user-defined code on grouped Datasets to update user-defined state.

mapGroupsWithState

This operation is used immediately after groupByKey. It processes each key's group of data while maintaining state for every group, and the resulting Dataset is built from the values returned by the state-update function. For a batch Dataset, the function is invoked only once per group. For a streaming Dataset, the function is invoked at every trigger, and each group's state is updated.

@Experimental
@InterfaceStability.Evolving
def mapGroupsWithState[S, U](
    func: MapGroupsWithStateFunction[K, V, S, U],
    stateEncoder: Encoder[S],
    outputEncoder: Encoder[U],
    timeoutConf: GroupStateTimeout): Dataset[U] = {
  mapGroupsWithState[S, U](timeoutConf)(
    (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s)
  )(stateEncoder, outputEncoder)
}

S and U are the two type parameters. S is the type of the user-defined state, which must be encodable to Spark SQL types; U is the type of the output objects, which must likewise be encodable to Spark SQL types. The parameters are as follows (a minimal usage sketch follows this list):

  • func processes each group, updates its state, and returns the result.
  • stateEncoder is the encoder for the state type parameter S.
  • outputEncoder is the encoder for the output type parameter U.
  • timeoutConf is the timeout configuration for groups that have not received data for some period of time.
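
For contrast with the Java-friendly overload shown above, the Scala variant takes the state-update function as a curried argument and resolves both encoders implicitly. Below is a minimal runnable sketch that keeps a running count per word; the socket source, host, and port are illustrative assumptions, not part of the original post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object RunningCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("RunningCountSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Illustrative source: whitespace-separated words read from a local socket.
    val words = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split(" "))

    // One Long of state per key: the total number of occurrences seen so far.
    val counts = words
      .groupByKey(identity)
      .mapGroupsWithState[Long, (String, Long)](GroupStateTimeout.NoTimeout) {
        (word: String, values: Iterator[String], state: GroupState[Long]) =>
          val newCount = state.getOption.getOrElse(0L) + values.size
          state.update(newCount)
          (word, newCount)
      }

    counts.writeStream
      .outputMode("update") // mapGroupsWithState only supports Update mode
      .format("console")
      .start()
      .awaitTermination()
  }
}

Note that in a streaming query mapGroupsWithState only supports the Update output mode; if you need Append semantics, use flatMapGroupsWithState instead (sketched at the end of this post).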

A concrete example

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package bigdata.spark.StructuredStreaming.KafkaSourceOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming._

object StructuredSessionizationMap {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
      .set("yarn.resourcemanager.hostname", "localhost")
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/opt/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
        ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .config(sparkConf)
      .getOrCreate()
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers","localhost:9093")
      .option("subscribe", "jsontest")
      .load()
    val words = df.selectExpr("CAST(value AS STRING)")
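    // Hypothetical input shape (an assumption, not stated in the original post):
    // each Kafka record is one JSON document such as
    //   {"time": 1563244800000, "fruit": "apple"}
    // where "time" is an epoch timestamp in milliseconds, matching the *Ms
    // fields of SessionInfo below.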

    val fruit = words.select(
      get_json_object($"value", "$.time").alias("timestamp").cast("long")
      , get_json_object($"value", "$.fruit").alias("fruit"))

    val events = fruit
      .as[(Long, String)]
      .map { case (timestamp, fruitCol) =>
        Event(sessionId = fruitCol, timestamp = timestamp)
      }

    // Sessionize the events. Track number of events, start and end timestamps of session,
    // and report session updates.
    val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

          // If timed out, then remove session and send final update
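          // (With ProcessingTimeTimeout, hasTimedOut becomes true on the first
          // trigger after the duration set via setTimeoutDuration below elapses
          // with no new data for this key; the events iterator is then empty.)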
          if (state.hasTimedOut) {
            val finalUpdate =
              SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Update start and end timestamps in session
            val timestamps = events.map(_.timestamp).toSeq
            val updatedSession = if (state.exists) {
              val oldSession = state.get
              SessionInfo(
                oldSession.numEvents + timestamps.size,
                oldSession.startTimestampMs,
                math.max(oldSession.endTimestampMs, timestamps.max))
            } else {
              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
            }
            state.update(updatedSession)

            // Set timeout such that the session will be expired if no data received for 10 seconds
            state.setTimeoutDuration("10 seconds")
            SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
          }
      }

    // Start running the query that prints the session updates to the console
    val query = sessionUpdates
      .writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
    query.awaitTermination()
  }
}
/** User-defined data type representing the input events */
case class Event(sessionId: String, timestamp: Long)

/**
 * User-defined data type for storing a session information as state in mapGroupsWithState.
 *
 * @param numEvents        total number of events received in the session
 * @param startTimestampMs timestamp of first event received in the session when it started
 * @param endTimestampMs   timestamp of last event received in the session before it expired
 */
case class SessionInfo(
    numEvents: Int,
    startTimestampMs: Long,
    endTimestampMs: Long) {

  /** Duration of the session, between the first and last events */
  def durationMs: Long = endTimestampMs - startTimestampMs
}

/**
 * User-defined data type representing the update information returned by mapGroupsWithState.
 *
 * @param id          Id of the session
 * @param durationMs  Duration the session was active, that is, from first event to its expiry
 * @param numEvents   Number of events received by the session while it was active
 * @param expired     Is the session active or expired
 */
case class SessionUpdate(
    id: String,
    durationMs: Long,
    numEvents: Int,
    expired: Boolean)
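
Finally, the opening paragraph mentioned flatMapGroupsWithState as the more general operator. It differs from mapGroupsWithState in two ways: the user function returns an Iterator[U], so a group may emit zero or more records per trigger, and the operator's output mode (Update or Append) must be declared explicitly. As a sketch, the sessionUpdates block inside main above could be rewritten as follows, reusing the Event, SessionInfo, and SessionUpdate case classes and the file's existing imports; this variant is my illustration, not part of the original post:

val sessionUpdatesFlat = events
  .groupByKey(_.sessionId)
  .flatMapGroupsWithState[SessionInfo, SessionUpdate](
      OutputMode.Update(), GroupStateTimeout.ProcessingTimeTimeout) {
    case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
      if (state.hasTimedOut) {
        // Emit one final record for the expired session, then drop its state.
        val finalUpdate =
          SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
        state.remove()
        Iterator(finalUpdate)
      } else {
        val timestamps = events.map(_.timestamp).toSeq
        val updated = state.getOption match {
          case Some(old) =>
            SessionInfo(
              old.numEvents + timestamps.size,
              old.startTimestampMs,
              math.max(old.endTimestampMs, timestamps.max))
          case None =>
            SessionInfo(timestamps.size, timestamps.min, timestamps.max)
        }
        state.update(updated)
        state.setTimeoutDuration("10 seconds")
        // Returning an Iterator makes it possible to emit nothing on a trigger;
        // here we always emit exactly one update record.
        Iterator(SessionUpdate(sessionId, updated.durationMs, updated.numEvents, expired = false))
      }
  }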


This article is shared from the WeChat public account Spark学习技巧 (bigdatatip), by 浪院长. Originally published on 2019-07-16.
