首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >spark流检查点恢复非常非常慢

spark流检查点恢复非常非常慢
EN

Stack Overflow用户
提问于 2016-07-15 15:43:57
回答 3查看 2.1K关注 0票数 21
  • 目标:通过spark streaming.
  • Situation:从Kinesis读取数据并以拼图格式存储到S3应用程序最初运行良好,运行批处理时间为1小时,平均处理时间不到30分钟。由于某些原因,假设应用程序崩溃,我们尝试从检查点重新启动。现在,处理过程将持续很长时间,并且不会继续进行。我们尝试以1分钟的批处理间隔测试相同的内容,处理运行正常,批处理需要1.2分钟才能完成。当我们从检查点恢复时,每批大约需要15分钟。
  • 注意:我们对检查点使用s3,每个执行器使用19g内存和3个内核

附加截图:

首次运行-在检查点恢复之前

尝试从检查点恢复的

Config.scala

代码语言:javascript
复制
object Config {

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}

S3Basin.scala

代码语言:javascript
复制
object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
    streams.foreachRDD(jsonRDDRaw =>{
      println(s"Old partitions ${jsonRDDRaw.partitions.length}")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions ${jsonRDD.partitions.length}")

      if(!jsonRDD.isEmpty()){
        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

        sqlContext.read.json(jsonRDD.map(f=>{
          val str = new String(f)
          if(str.startsWith("{\"message\"")){
            str.substring(11,str.indexOf("@version")-2)
          }
          else{
            str
          }
        })).registerTempTable("events")

        sqlContext.sql(
          """
            |select
            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |*
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)


        sqlContext.dropTempTable("events")
      }
    })
  }
}

Kinesis.scala

代码语言:javascript
复制
object Kinesis{


  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentails : BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole){
      new AmazonKinesisClient()
    }
    else{
      awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentails)
    }


    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")


    val kinesisStreams = (0 until Config.numStreams).map { i =>
        println(s"creating stream for $i")
        if(Config.useIAMInstanceRole){
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

        }else{
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)

        }
      }

    val unionStreams = streamingContext.union(kinesisStreams)
    streamFunc(unionStreams)

    streamingContext
  }


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {

    val sc = Config.getSparkContext

    if(sc.defaultParallelism < Config.numStreams+1){
      throw  new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")
    }

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))


//    sys.ShutdownHookThread {
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    }
//

    streamingContext.start()
    streamingContext.awaitTermination()


  }




}

DAG

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-03-01 16:24:28

提出了一个Jira问题:https://issues.apache.org/jira/browse/SPARK-19304

问题是因为我们每次迭代读取的数据比所需的数据多,然后丢弃这些数据。这可以通过增加对getResults aws call的限制来避免。

修复:https://github.com/apache/spark/pull/16842

票数 4
EN

Stack Overflow用户

发布于 2016-09-14 00:38:20

重新启动出现故障的驱动程序时,会发生以下情况:

预写恢复计算-检查点信息用于重新启动驱动程序、重建上下文和重新启动所有receivers.

  • Recover块元数据-继续处理所需的所有块的元数据将是预写未完成的作业-对于由于故障而尚未完成处理的批处理,将使用恢复的块元数据重新生成RDDs和相应的作业。

  • 读取日志中保存的块-执行这些作业时,将直接从预写日志中读取块数据。这将恢复可靠地保存到日志中的所有必要数据。

  • 重新发送未确认的数据-源将重新发送失败时未保存到日志中的缓冲数据。因为它还没有被接收者确认。

由于所有这些步骤都是在驱动程序中执行的,所以您的0事件批处理会花费很多时间。这应该发生在第一批中,然后事情才会正常。

参考here

票数 1
EN

Stack Overflow用户

发布于 2017-01-20 15:00:38

我以前也遇到过类似的问题,我的应用程序变得越来越慢。

使用rdd后尝试释放内存,调用rdd.unpersist() https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)

或通过spark.streaming.backpressure.enabled连接到true

http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval

http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements

另外,检查你的locality设置,也许移动的数据太多了。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38390567

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档