StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(2);
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney()>5;
}
}).setParallelism(3);
filterDS.print().setParallelism(4);
//DataStream需要调用execute,可以取个名称
env.execute("source job");
}
数据流中最大的并行度,就是算子链中最大算子的数量,比如source 2个并行度,filter 4个,sink 4个,最大就是4