专栏首页Spark学习技巧Spark Structured Streaming的高效处理-RunOnceTrigger

Spark Structured Streaming的高效处理-RunOnceTrigger

传统意义上,当人们想到流处理时,诸如”实时”,”24*7”或者”always on”之类的词语就会浮现在脑海中。生产中可能会遇到这种情况,数据仅仅会在固定间隔到达,比如每小时,或者每天。对于这些情况,对这些数据进行增量处理仍然是有益的。但是在集群中运行一个24*7的Streaming job就显得有些浪费了,这时候仅仅需要每天进行少量的处理即可受益。

幸运的是,在spark 2.2版本中通过使用 Structured Streaming的Run Once trigger特性,可获得Catalyst Optimizer带来的好处和集群运行空闲job带来的成本节约,这两方面好处。

一,Structured Streaming的Triggers

在Structured Streaming中,Trigger用来指定Streaming 查询产生结果的频率。一旦Trigger触发,Spark将会检查是否有新数据可用。如果有新数据,查询将增量的从上次触发的地方执行。如果没有新数据,Stream继续睡眠,直到下次Trigger触发。

Structured Streaming的默认行为尽可能低延迟地运行,trigger会在上次trigger触发结束之后立即运行。针对一些有低延迟要求的使用案例,Structured Streaming支持ProcessingTime trigger,也即将会用用户提供的时间间隔,例如每分钟,去触发一次查询。

这虽然很好,但是也免不了24*7运行。相反,RunOnce Trigger仅仅会执行一次查询,然后停止查询。

Trigger在你启动Streams的时候指定。

import org.apache.spark.sql.streaming.Trigger

 // Load your Streaming DataFrame
 val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
 sdf.writeStream.trigger(Trigger.Once).format("parquet").start("/out/path")

二,RunOnce相比Batch高效之处

1,Bookkeeping

当运行一个执行增量更新的批处理作业时,通常要处理哪些数据是更新的,哪些是该处理的,哪些是不该处理的。Structured Streaming已经为你做好了这一切,在处理一般流式应用程序时,你应该只关心业务逻辑,而不是低级的Bookkeeping。

2,表级原子性

大数据处理引擎,最重要的性质是它如何容忍失误和失败。ETL作业可能(实际上常会)失败。如果,你的工作失败了,那么你需要确保你的工作产出被清理干净,否则在你的下一次成功的工作之后你会得到重复的或者垃圾的数据。使用Structured Streaming编写基于文件的表时,Structured Streaming将每个作业创建的所有文件在每次成功的出发后提交到log中。当Spark重新读取表时,会通过log来识别哪些文件是有效的。这样可以确保因失败引入的垃圾不会被下游的应用程序所消费。

3,夸runs的状态操作

如果,你的数据流有可能产生重复的记录,但是你要实现一次语义,如何在batch处理中来实现呢?通过Structured Streaming,可以使用dropDuplicates()来实现去重。配置watermark足够长,包含若干Streaming job的runs,可以保证你不会夸runs处理到重复的数据。

4,成本节约

运行一个24*7的Streamingjob很浪费。可能有些情况,数据计算有些延迟是可以接受的,或者数据本身就会以每小时或者每天为周期产生。为了获得Structured Streaming所有上述描述的好处,你可能会人为需要一直占用集群运行程序,但是现在,使用仅执行一次的Trigger,就可以不必要一直占用集群了。

三,总结

在这篇文章中,引入了,使用Structured Streaming获取的仅执行一次的Trigger。虽然执行一此Trigger类似于运行一个批处理的job,但我们讨论了它在批处理作业方法之上的所有优点,特别是:

1,管理所有处理数据的bookkeeping

2,提供基于文件的表级别的原子ETL操作。

3,确保夸Run操作,可以轻松去重。

4,可以节省成本。通过避免运行没必要24*7运行的流处理。

跑Spark Streaming还是跑Structured Streaming,全在你一念之间。

(此处少了一个Job Scheduler,你留意到了么?)

本文分享自微信公众号 - Spark学习技巧(bigdatatip),作者:浪尖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-09-29

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark高级操作之json复杂和嵌套数据结构的操作一

    一,基本介绍 本文主要讲spark2.0版本以后存在的Sparksql的一些实用的函数,帮助解决复杂嵌套的json数据格式,比如,map和嵌套结构。Spark2...

    Spark学习技巧
  • Kafka源码系列之源码分析zookeeper在kafka的作用

    Spark学习技巧
  • JAVA程序员面试总结,高手整理加强版

    第一阶段:三年 我认为三年对于程序员来说是第一个门槛,这个阶段将会淘汰掉一批不适合写代码的人。这一阶段,我们走出校园,迈入社会,成为一名程序员,正式从书本 上的...

    Spark学习技巧
  • ffmpeg -- 时间基准

    ffmpeg中的内部计时单位(时间基),ffmepg中的所有时间都是于它为一个单位,AV_TIME_BASE定义为:

    小蚂蚁与大象
  • Spy Banker木马新变种Telax利用谷歌云服务器进行传播

    安全公司Zscaler发现一种新型恶意活动,它基于一种新型的Spy Banker网银恶意软件Telax,利用谷歌云服务器来驻留木马下载器,且主要通过社交媒体平台...

    FB客服
  • nodejs的路径问题

    最近公司的一个开发项目,后端用的是nodejs。这两天需要打包给客户演示,就让公司一个小伙把之前3D机房的打包工具移植过来。打包之后,发现原本在开发环境下的跑的...

    用户3158888
  • Redis 到底能解决哪些问题

    最初的需求非常简单,我们有一个提供热点新闻列表的API:http://api.xxx.com/hot-news,API 的消费者抱怨说每次请求都要 2 秒左右才...

    良月柒
  • Mockplus3.5.0.1新增标注功能

    Mockplus3.5.0.1版本中,新增了标注功能。多种标注模式,智能生成,随时查看。原型设计效率更高。

    奔跑的小鹿
  • Pytorch Sampler详解

    其原理是首先在初始化的时候拿到数据集data_source,之后在__iter__方法中首先得到一个和data_source一样长度的range可迭代器。每次只...

    marsggbo
  • Pytorch的Sampler详解

    其原理是首先在初始化的时候拿到数据集data_source,之后在__iter__方法中首先得到一个和data_source一样长度的range可迭代器。每次只...

    于小勇

扫码关注云+社区

领取腾讯云代金券