flink-RichAsyncFunction

stys35
Published 2019-03-19 16:46:02
Included in the column: 工作笔记精华
/**
 * Tests the basic functionality of the AsyncWaitOperator: processing a limited stream of
 * elements by doubling their value. This is tested for both the ordered and unordered modes.
 */
@Test
public void testAsyncWaitOperator() throws Exception {
    final int numElements = 5;
    final long timeout = 1000L;
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
    AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {

        private static final long serialVersionUID = 7000343199829487985L;

        transient ExecutorService executorService;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            executorService = Executors.newFixedThreadPool(numElements);
        }

        @Override
        public void close() throws Exception {
            super.close();
            executorService.shutdownNow();
        }

        @Override
        public void asyncInvoke(final Tuple2<Integer, NonSerializable> input, final AsyncCollector<Integer> collector) throws Exception {
            executorService.submit(new Runnable() {

                @Override
                public void run() {
                    collector.collect(Collections.singletonList(input.f0 + input.f0));
                }
            });
        }
    };
    DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, timeout, TimeUnit.MILLISECONDS, 2).setParallelism(1);
    // save result from ordered process
    final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
    final List<Integer> actualResult1 = new ArrayList<>(numElements);
    MemorySinkFunction.registerCollection(0, actualResult1);
    orderedResult.addSink(sinkFunction1).setParallelism(1);
    DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, timeout, TimeUnit.MILLISECONDS, 2);
    // save result from unordered process
    final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
    final List<Integer> actualResult2 = new ArrayList<>(numElements);
    MemorySinkFunction.registerCollection(1, actualResult2);
    unorderedResult.addSink(sinkFunction2);
    // expected results: every source element i doubled, in source order
    Collection<Integer> expected = new ArrayList<>(numElements);
    for (int i = 0; i < numElements; i++) {
        expected.add(i + i);
    }
    env.execute();
    Assert.assertEquals(expected, actualResult1);
    Collections.sort(actualResult2);
    Assert.assertEquals(expected, actualResult2);
    MemorySinkFunction.clear();
}
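The test above compares the ordered result directly with the expected list, but sorts the unordered result first. The reason can be shown without Flink: the sketch below uses only the JDK (`ExecutorService` and `CompletableFuture`) to mimic the two emission modes. The class name `AsyncOrderSketch` and its methods are hypothetical, made up for illustration; this is not how `AsyncWaitOperator` is implemented, only a rough model of the behavior the test asserts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of Flink.
public class AsyncOrderSketch {

    /** Submits one doubling task per element; the futures are returned in input order. */
    static List<CompletableFuture<Integer>> submitAll(List<Integer> input, ExecutorService pool) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>(input.size());
        for (Integer value : input) {
            futures.add(CompletableFuture.supplyAsync(() -> value + value, pool));
        }
        return futures;
    }

    /** "Ordered" mode: results are emitted in the order the inputs arrived. */
    public static List<Integer> doubleOrdered(List<Integer> input) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, input.size()));
        try {
            List<Integer> out = new ArrayList<>(input.size());
            for (CompletableFuture<Integer> future : submitAll(input, pool)) {
                out.add(future.join()); // wait for each result in input order
            }
            return out;
        } finally {
            pool.shutdownNow();
        }
    }

    /** "Unordered" mode: each result is emitted as soon as its task completes. */
    public static List<Integer> doubleUnordered(List<Integer> input) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, input.size()));
        try {
            Queue<Integer> emitted = new ConcurrentLinkedQueue<>();
            List<CompletableFuture<Void>> pending = new ArrayList<>(input.size());
            for (CompletableFuture<Integer> future : submitAll(input, pool)) {
                pending.add(future.thenAccept(emitted::add)); // emit on completion
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return new ArrayList<>(emitted);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(0, 1, 2, 3, 4);
        System.out.println("ordered:   " + doubleOrdered(input));
        List<Integer> unordered = doubleUnordered(input);
        Collections.sort(unordered); // completion order is not deterministic, so sort first
        System.out.println("unordered: " + unordered);
    }
}
```

In the ordered variant the output order is fixed by the input, so a direct equality check is safe. In the unordered variant only the set of results is guaranteed, which is why the test calls `Collections.sort(actualResult2)` before asserting.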
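This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
In case of infringement, please contact cloudcommunity@tencent.com for removal.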