首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Apache :为DataStream API添加侧输入

Apache :为DataStream API添加侧输入
EN

Stack Overflow用户
提问于 2020-06-02 04:22:14
回答 1查看 908关注 0票数 1

在我的Java应用程序中,我有3 DataStreams。例如,一个流数据是从Kafka消费的,另一个流数据是从Apache消费的。对于这两个流,对象类型是不同的。例如,Stream-1对象类型是Person,Stream-2对象类型是Address.

第三种是广播流(因为这些数据是从Kafka消费的)。

现在,我想将Stream-1和Stream-2组合到一个作业类中,并希望在任务流程元素中进行拆分。如何实现这一点?

注:流-1是主流,-2是侧输入.MainStream不断地从卡夫卡获取数据。对于侧输入,首先当应用程序运行时,从DB加载所有表数据,然后在更新表数据时读取新数据(不频繁)。

样本结构:

代码语言:javascript
运行
复制
DataStream<Person> stream-1 = env.addSource(read data from kafka)....
DataStream<Address> stream-2 = env.addSource(read data from nifi)....
BroadcastStream<String> BroadCastStream = stream-3.broadcast(read data from kafka);

我被称为以下链接。

倒装-17 DataStream API的侧输入

jira/浏览/FLINK-6131

我的用例是:

加入了缓慢变化的数据流:--我们用来丰富内容的侧输入--是随着时间的推移而演变的(数据从DB读取)。这可以通过在处理主输入之前等待一些初始数据可用来完成,并在到达时不断地将新数据吸收到内部侧输入结构中。

EN

回答 1

Stack Overflow用户

发布于 2020-07-25 11:43:31

根据最近的答复,@Arvid的建议实际上是这里所需要的。

答案的核心是:

您可以轻松地加入stream1和stream2,即使它们有不同的类型。然后,可以将广播添加到结果中。

指向文档示例的链接,以及文档中的相关片段(示例太长,不能包含在这里):

代码语言:javascript
运行
复制
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62144792

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档