前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

作者头像
方亮
发布2023-11-01 09:17:27
2760
发布2023-11-01 09:17:27
举报
文章被收录于专栏:方亮
《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。

滑动(Sliding)和滚动(Tumbling)的区别

正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。

而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。

它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。

样例

我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。

代码语言:javascript
复制
word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 

窗口为2,滑动距离为1

count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。

代码语言:javascript
复制
    def count_window(self, size: int, slide: int = 0):
        """
        Windows this KeyedStream into tumbling or sliding count windows.

        :param size: The size of the windows in number of elements.
        :param slide: The slide interval in number of elements.

        .. versionadded:: 1.16.0
        """
        if slide == 0:
            return WindowedStream(self, CountTumblingWindowAssigner(size))
        else:
            return WindowedStream(self, CountSlidingWindowAssigner(size, slide))

我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。

代码语言:javascript
复制
    # reducing
    windows_size = 2
    sliding_size = 1
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

(E,2) (E,2) (E,2) (E,2) (E,2)

窗口为3,滑动距离为1

代码语言:javascript
复制
    # reducing
    windows_size = 3
    sliding_size = 1
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3) (E,3) (E,3) (E,3)

窗口为3,滑动距离为2

代码语言:javascript
复制
    # reducing
    windows_size = 3
    sliding_size = 2
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3) (E,3)

窗口为3,滑动距离为3

这个就等效于滚动窗口了,因为“滑”过了窗口大小。

代码语言:javascript
复制
    # reducing
    windows_size = 3
    sliding_size = 3
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3) (E,3)

完整代码

代码语言:javascript
复制
from typing import Iterable

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindow

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
        return [(key,  len([e for e in inputs]))]


word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    windows_size = 3
    sliding_size = 1
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

参考资料

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-11-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。
  • 滑动(Sliding)和滚动(Tumbling)的区别
  • 样例
    • 窗口为2,滑动距离为1
      • 窗口为3,滑动距离为1
        • 窗口为3,滑动距离为2
          • 窗口为3,滑动距离为3
          • 完整代码
          • 参考资料
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档