StreamExecutionEnvironment是Flink中用于定义和执行流处理程序的主要类。它提供了一系列函数和方法来配置流处理程序的执行环境(例如并行度、checkpoint、时间特性),并将其部署到Flink集群中运行。
StreamExecutionEnvironment的代码比较复杂,这里只提供其中一部分的示例代码作为参考:
public class StreamExecutionEnvironment {
//默认的执行环境并行度
private final int defaultLocalParallelism;
//默认的执行环境
private final ExecutorService defaultExecutorService;
//配置文件
private final Configuration configuration;
//执行环境的ID
private final String executorId;
//用户自定义的类加载器
private final ClassLoader userClassLoader;
//数据源注册中心
private final SourceFunctionRegistry sourceFunctionRegistry;
//转换器注册中心
private final StreamOperatorFactory<?> operatorFactory;
//配置信息
private final CheckpointConfig checkpointConfig;
//时间特性
private final TimeCharacteristic timeCharacteristic;
//状态后端
private final StateBackend stateBackend;
/**
* 构造方法,用于创建一个StreamExecutionEnvironment对象
*
* @param executorService 默认的执行环境
* @param configuration 配置文件
* @param userClassLoader 用户自定义的类加载器
* @param defaultLocalParallelism 默认的执行环境并行度
* @param executorId 执行环境的ID
*/
public StreamExecutionEnvironment(
ExecutorService executorService,
Configuration configuration,
ClassLoader userClassLoader,
int defaultLocalParallelism,
String executorId) {
this.defaultLocalParallelism = defaultLocalParallelism;
this.defaultExecutorService = executorService;
this.configuration = configuration == null ? new Configuration() : configuration;
this.executorId = executorId != null ? executorId : UUID.randomUUID().toString();
this.userClassLoader = userClassLoader == null ? getClass().getClassLoader() : userClassLoader;
this.sourceFunctionRegistry = new SourceFunctionRegistry();
this.operatorFactory = new StreamOperatorFactory<>();
this.checkpointConfig = new CheckpointConfig();
this.timeCharacteristic = TimeCharacteristic.ProcessingTime;
this.stateBackend = null;
}
/**
* 获取数据流处理的默认并行度
*
* @return 默认并行度
*/
public int getDefaultLocalParallelism() {
return defaultLocalParallelism;
}
/**
* 获取配置文件
*
* @return 配置文件
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* 获取执行环境的ID
*
* @return 执行环境的ID
*/
public String getId() {
return executorId;
}
/**
* 获取用户自定义的类加载器
*
* @return 用户自定义的类加载器
*/
public ClassLoader getUserClassLoader() {
return userClassLoader;
}
/**
* 获取数据源注册中心
*
* @return 数据源注册中心
*/
public SourceFunctionRegistry getSourceFunctionRegistry() {
return sourceFunctionRegistry;
}
/**
* 获取转换器注册中心
*
* @return 转换器注册中心
*/
public StreamOperatorFactory<?> getOperatorFactory() {
return operatorFactory;
}
/**
* 获取检查点配置
*
* @return 检查点配置
*/
public CheckpointConfig getCheckpointConfig() {
return checkpointConfig;
}
/**
* 获取时间特性
*
* @return 时间特性
*/
public TimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
/**
* 获取状态后端
*
* @return 状态后端
*/
public StateBackend getStateBackend() {
return stateBackend;
}
/**
* 设置执行环境的默认并行度
*
* @param parallelism 并行度
*/
public void setDefaultLocalParallelism(int parallelism) {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, parallelism);
}
/**
* 获取一个DataStream对象
*
* @param source 数据源
* @param type 数据类型
* @param <T> 数据类型
* @return DataStream对象
*/
public <T> DataStream<T> addSource(SourceFunction<T> source, TypeInformation<T> type) {
// 创建SourceTransformation对象,表示对数据源进行转换操作
SourceTransformation<T> transform = new SourceTransformation<>(source, "Source", type, defaultLocalParallelism);
// 将SourceTransformation对象添加到转换器注册中心中
operatorFactory.addOperator(transform);
// 返回转换后的DataStream对象
return new DataStream<>(this, getNewNodeId(), transform.getOutputType());
}
/**
* 获取一个DataStream对象
*
* @param source 数据源
* @param <T> 数据类型
* @return DataStream对象
*/
public <T> DataStreamSource<T> addSource(SourceFunction<T> source) {
return addSource(source, TypeExtractor.createTypeInfo(SourceFunction.class, source.getClass(), 0));
}
/**
* 获取一个Table对象
*
* @return Table对象
*/
public TableEnvironment createTableEnvironment() {
// 创建TableEnvironment对象
return TableEnvironment.create(configuration, executorComponents);
}
// 其他操作的实现略
}