首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >Apache中转换后的全局窗口触发器

Apache中转换后的全局窗口触发器
EN

Stack Overflow用户
提问于 2017-03-13 10:45:00
回答 1查看 1.7K关注 0票数 1

我试图在flink中实现窗口触发器,如果平均值高于阈值,就会触发窗口触发器。

流数据由,分隔学生的姓名和标记。该窗口必须被触发,如果学生的平均分数通过90,而不管尝试的次数。

示例数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Fred,88
Fred,91
Wilma,93
.
.

当前Flink代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window}

case class Marks(name : String, mark : Double, count : Int)

class MarksTrigger[W <: Window] extends Trigger[Marks,W] {

  override def onElement(element: Marks, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if(element.mark > 90) TriggerResult.FIRE  // fire if avg mark is > 90
    else TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(window: W, ctx: TriggerContext) = ???
}

object Main {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)

    val fdata = data.map { values =>
      val columns = values.split(",")
      Marks(columns(0), columns(1).toDouble, 1)
    }

    val keyed = fdata.keyBy(_.name).
      window(GlobalWindows.create()).
      trigger(new MarksTrigger[GlobalWindow]()). // TODO



    keyed.print()
    env.execute()
  }
}

计算平均值:尝试了以下批处理模式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
case class Marks(name : String, mark : Double, count : Int)

val data = benv.fromElements(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))

data.map(x => (x._1, x._2, 1)).groupBy(0).reduce { (x, y) => 
    (x._1, x._2 + y._2, x._3 + y._3) 
}.map(x => Marks(x._1, x._2/x._3, x._3)).collect

我怎么把这些绑在一起?是在计算平均值之前调用.window().trigger(),还是在onElement()中进行平均计算

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-22 02:35:10

我想出了解决办法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window}


class MarksTrigger[W <: Window] extends Trigger[Marks,W] {

  override def onElement(element: Marks, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    //trigger is fired if average marks of a student cross 80
    if(element.mark > 90) TriggerResult.FIRE
    else TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(window: W, ctx: TriggerContext) = ???
}

case class Marks(name : String, mark : Double, count : Int)

object Main {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)

    // data is obtained in "name,mark" format
    val fdata = data.map { values =>
      val columns = values.split(",")
      (columns(0), columns(1).toDouble, 1)
    }

    // calculating average mark and number of exam attempts
    val keyed1 = fdata.keyBy(0).reduce { (x,y) =>
      (x._1, x._2 + y._2, x._3 + y._3)
    }.map( x => Marks(x._1, x._2 / x._3, x._3))


    val keyed = keyed1.keyBy(_.name).
      window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(new MarksTrigger[GlobalWindow]())).
      maxBy(1)

    keyed.print()
    env.execute()

  }
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42771164

复制
相关文章
Flink窗口触发器
窗口的触发器定义了窗口是何时被触发并同时决定触发行为(对窗口进行清理或者计算)。触发器确定窗口(由窗口分配程序形成)何时准备由窗口函数处理。每个WindowAssigner都带有一个默认触发器。 注意:窗口的触发在内部是设置定时器来实现的。
神秘的寇先森
2020/03/20
2.3K0
Flink窗口触发器
后仿中的异步D触发器设置
在PR后仿时,经常会遇到讨厌的红色X(不定态)。而debug不定态的起因又很麻烦,有可能用Verdi调试半天还是没能找到根本的原因。
ExASIC
2020/07/15
3.3K0
后仿中的异步D触发器设置
Flink 窗口行为触发器
触发器决定窗口(由窗口分配器形成)何时可以由窗口函数处理。每个WindowAssigner都有一个默认的触发器。如果默认触发器不满足您的需求,您可以使用trigger(…)指定一个自定义触发器。
前Thoughtworks-杨焱
2021/12/07
9440
Apache Flink中的各个窗口时间的概念区分
“ Apache Flink中提供了基于时间的窗口计算,例如计算五分钟内的用户数量或每一分钟计算之前五分钟的服务器异常日志占比等。因此Apache Flink在流处理中提供了不同时间的支持。”
CainGao
2020/04/14
7890
窗口实用触发器:ContinuousEventTimeTrigger
短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况,对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。
Flink实战剖析
2022/04/18
1.3K1
WPF 获取全局所有窗口的创建显示事件 监控窗口打开
本文将告诉大家如何在 WPF 里面进行全局监控任意的窗口创建显示打开,可以获取到每个 WPF 窗口的打开的时机。如此可以用来辅助定位问题和输出日志
林德熙
2023/04/07
2.1K0
关闭模态窗口后,父窗口居然跑到了其他窗口的后面
发布于 2018-02-05 05:58 更新于 2018-06-05 02:55
walterlv
2018/09/18
9.5K1
关闭模态窗口后,父窗口居然跑到了其他窗口的后面
Flink1.4 窗口触发器与Evictors
触发器(Trigger)决定了窗口(请参阅窗口概述)博文)什么时候使用窗口函数处理窗口内元素。每个窗口分配器都带有一个默认的触发器。如果默认触发器不能满足你的要求,可以使用 trigger(...) 指定自定义的触发器。
smartsi
2019/08/07
1.4K0
Apache Flink窗口的几种实现的类别
“ 无界数据于有界数据是一个比较于模糊的概念,无界与有界之间是可以进行转换的。无界数据流在进行某些计算的时候例如每分钟、每小时、每天等操作时都可以看做是有界数据集。Apache Flink使用Windows方式实现了对于无界数据集到有界数据集的计算。”
CainGao
2020/04/14
1.1K0
Electron 无边框窗口开启全局拖拽
最近有个需求,Electron 打开的窗口要实现拖拽功能,大概看了一眼 BrowserWindow 的 API 却只找到了一个 move 事件,这个事件默认是针对有边框窗口的,也即 frame: true 的窗口。
savokiss
2019/11/06
2.9K0
全局日期请求转换处理
使用@InitBinder注解以及Jackson2ObjectMapperBuilderCustomizer
阿超
2022/08/16
6550
全局日期请求转换处理
基于Apache NiFi 实现ETL过程中的数据转换
列名转换是ETL过程中常常遇到的场景。例如来源表user的主键id,要求写入目标表user的uid字段内,那么就需要列名转换.
HostenWang
2020/11/21
2.6K0
基于Apache NiFi 实现ETL过程中的数据转换
PKS中的RS触发器和SR触发器
上大学时,学习《数字电子技术》这门课,第一次接触到RS触发器的概念,当时学了个囫囵吞枣,只知道有个置位端,还有个复位端,当置位端为ON时,RS触发器的输出为ON,当复位端为ON时,RS触发器的输出为OFF,至于置位端和复位端都为ON,或者都为OFF,触发器的输出会怎样,什么情况下需要使用RS触发器,当时根本就没有考虑,看来教学和应用还是有点脱节的。
剑指工控
2022/11/14
1.5K0
PKS中的RS触发器和SR触发器
WPF中的触发器(Trigger)
这节来讲一下WPF中的触发器——Trigger。触发器,是指在既定条件或者特殊场景下被触发,从而去执行一个操作。在WPF中,触发器可以分为以下几类:基本触发器(Trigger);事件触发器(EventTrigger);数据触发器(DataTrigger);多条件触发器(MultiTrigger,MultiDataTrigger)。下面我们来通过代码一一了解。
宿春磊Charles
2022/01/04
3.2K0
WPF中的触发器(Trigger)
MySQL中触发器的使用
如遇到触发器报错“Not allowed to return a result set from a trigger”;请划到最后看详解;
xbhog
2020/12/10
3.3K0
Linux下卸载Apache后再安装Apache
以前在Linux下配置了一个系统监视软件zabbix,总体用起来还不错,因为需要在网页端显示,所以需要搭建lamp环境,然后配置apache2使打开本地网页就显示那个为主页,以后就没再碰过,也没做记录,当时的配置文件和信息早已经忘记,后来胡搞了一通,发现apache服务运行不了了,打开里面的配置文件发现里面都是空的,没有任何信息,于是我开始了apache2的重装。
星哥玩云
2022/07/03
5.1K0
WPF 解决弹出模态窗口关闭后,主窗口不在最前
本文告诉大家如何解决这个问题,在 WPF 的软件,弹出一个模态窗口。使用另一个窗口在模态窗口前面。从任务栏打开模态窗口。关闭模态窗口。这时发现,主窗口会在刚才使用的另一个窗口下面。
林德熙
2019/03/13
5.3K0
PyCharm中的全局搜索
根据每个人的快捷键设置每个人可能都不一样。具体方法是打开设置(File->Settings),找到keymap选项。
全栈程序员站长
2022/08/23
1.2K0
PyCharm中的全局搜索
WPF 解决弹出模态窗口关闭后,主窗口不在最前
本文告诉大家如何解决这个问题,在 WPF 的软件,弹出一个模态窗口。使用另一个窗口在模态窗口前面。从任务栏打开模态窗口。关闭模态窗口。这时发现,主窗口会在刚才使用的另一个窗口下面。
林德熙
2018/09/18
14.9K2
WPF 解决弹出模态窗口关闭后,主窗口不在最前
本文告诉大家如何解决这个问题,在 WPF 的软件,弹出一个模态窗口。使用另一个窗口在模态窗口前面。从任务栏打开模态窗口。关闭模态窗口。这时发现,主窗口会在刚才使用的另一个窗口下面。
林德熙
2022/08/04
5.8K0

相似问题

Apache Beam中全局窗口上基于时间的触发器

116

如何修复Apache Beam中的“连接无界PCollections当前仅支持非全局窗口和触发器”

20

Apache梁:固定窗口的触发器

11

全局窗口自定义触发器上的allowedLateness

23

Cherrypy在Apache窗口后运行

12
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文