
大家好,又见面了,我是你们的朋友全栈君。
前面看了datax的 通讯类communication,现在看看在他之上包装的一个容器通信类ContainerCommunicator
dataX中提供了一个基类
AbstractContainerCommunicator来处理JobContainer、TaskGroupContainer和Task的通讯。AbstractContainerCommunicator提供了注册、收集信息等接口,信息的单位是Communication。
类继承关系

类的主要方法

AbstractContainerCommunicator主要将其功能委托给2个属性: private AbstractCollector collector; private AbstractReporter reporter;
Collector负责管理下级注册到上级,搜集并合并下级所有的信息。 dataX提供一个基类
AbstractCollector和一个实现类ProcessInnerCollector。

AbstractCollector同时包含将Task注册到TaskGroupContainer(registerTaskCommunication方法)和将TaskGroupContainer注册到JobContainer(registerTGCommunication方法)的功能。具体如下: /** * Task注册到TaskGroupContainer * * @param taskConfigurationList List<Configuration> */
public void registerTaskCommunication(List<Configuration> taskConfigurationList) {
for (Configuration taskConfig : taskConfigurationList) {
int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
this.taskCommunicationMap.put(taskId, new Communication());
}
} /** * 将TaskGroupContainer注册到JobContainer * * @param taskGroupConfigurationList List<Configuration> */
public void registerTGCommunication(List<Configuration> taskGroupConfigurationList) {
for (Configuration config : taskGroupConfigurationList) {
int taskGroupId = config.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
LocalTGCommunicationManager.registerTaskGroupCommunication(taskGroupId, new Communication());
}
}此外AbstractCollector#collectFromTask提供搜集所有任务信息的功能;
/** * 搜集所有任务信息的功能 * @return Communication */
public Communication collectFromTask() {
Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskCommunication :
this.taskCommunicationMap.values()) {
communication.mergeFrom(taskCommunication);
}
return communication;
}ProcessInnerCollector只实现 了一个方法collectFromTaskGroup,collectFromTaskGroup提供搜集所有TaskGroupContainer的信息。 @Override
public Communication collectFromTaskGroup() {
return LocalTGCommunicationManager.getJobCommunication();
}Reporter的主要功能是将收集到的信息上报给上级。dataX提供一个基类
AbstractReporter和一个实现类ProcessInnerCollector.
类继承关系

主要方法

ProcessInnerCollector#reportTGCommunication将TaskGroupContianer的信息汇报给上级,操作也很简单直接更新注册时分配给该TaskGroup的Communication(Map中的值)public class ProcessInnerReporter extends AbstractReporter {
@Override
public void reportJobCommunication(Long jobId, Communication communication) {
// do nothing
}
/** * 将TaskGroupContianer的信息汇报给上级,操作也很简单直接更新注册时分配给该TaskGroup的Communication(Map中的值) * * @param taskGroupId Integer * @param communication Communication */
@Override
public void reportTGCommunication(Integer taskGroupId, Communication communication) {
LocalTGCommunicationManager.updateTaskGroupCommunication(taskGroupId, communication);
}
}StandAloneJobContainerCommunicator是AbstractContainerCommunicator一个实现类,主要处理JobContainer和TaskGroupContainer之间的信息传递。
/** * 主要处理JobContainer和TaskGroupContainer之间的信息传递 */
public class StandAloneJobContainerCommunicator extends AbstractContainerCommunicator {
private static final Logger LOG = LoggerFactory
.getLogger(StandAloneJobContainerCommunicator.class);
public StandAloneJobContainerCommunicator(Configuration cfg) {
super(cfg);
super.setCollector(new ProcessInnerCollector(cfg.getLong(DATAX_CORE_CONTAINER_JOB_ID)));
super.setReporter(new ProcessInnerReporter());
}
@Override
public void registerCommunication(List<Configuration> configurationList) {
super.getCollector().registerTGCommunication(configurationList);
}
/** * JobContainer每隔一段时间 主动 获取TaskGroup的信息。最后调用本类的#report向上级汇报, * 这里JobContainer已经是最上级了,向日志中输出先关信息即可 * @return */
@Override
public Communication collect() {
return super.getCollector().collectFromTaskGroup();
}
@Override
public State collectState() {
return this.collect().getState();
}
/** * 和 DistributeJobContainerCollector 的 report 实现一样 * 每隔一段时间向JobContainer 主动 发送自己的状态 */
@Override
public void report(Communication communication) {
super.getReporter().reportJobCommunication(super.getJobId(), communication);
LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
reportVmInfo();
}
@Override
public Communication getCommunication(Integer taskGroupId) {
return super.getCollector().getTGCommunication(taskGroupId);
}
@Override
public Map<Integer, Communication> getCommunicationMap() {
return super.getCollector().getTGCommunicationMap();
}
}AbstractTGContainerCommunicator是AbstractContainerCommunicator的另一个抽象实现类,
/** * 该类是用于处理 taskGroupContainer 的 communication 的收集汇报的父类 * 主要是 taskCommunicationMap 记录了 taskExecutor 的 communication 属性 * 主要处理TaskGroupContainer和Task之间的信息 */
public abstract class AbstractTGContainerCommunicator extends AbstractContainerCommunicator {
protected long jobId;
/** * 由于taskGroupContainer是进程内部调度 * 其registerCommunication(),getCommunication(), * getCommunications(),collect()等方法是一致的 * 所有TG的Collector都是ProcessInnerCollector */
protected int taskGroupId;
public AbstractTGContainerCommunicator(Configuration configuration) {
super(configuration);
this.jobId = configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
super.setCollector(new ProcessInnerCollector(this.jobId));
this.taskGroupId = configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
}
@Override
public void registerCommunication(List<Configuration> configurationList) {
super.getCollector().registerTaskCommunication(configurationList);
}
@Override
public final Communication collect() {
return this.getCollector().collectFromTask();
}
@Override
public final State collectState() {
Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskCommunication :
super.getCollector().getTaskCommunicationMap().values()) {
communication.mergeStateFrom(taskCommunication);
}
return communication.getState();
}
@Override
public final Communication getCommunication(Integer taskId) {
Validate.isTrue(taskId >= 0, "注册的taskId不能小于0");
return super.getCollector().getTaskCommunication(taskId);
}
@Override
public final Map<Integer, Communication> getCommunicationMap() {
return super.getCollector().getTaskCommunicationMap();
}
}从类的继承实现看最终实现类是StandaloneTGContainerCommunicator, 该类主要处理TaskGroupContainer和Task之间的信息,处理逻辑和StandAloneJobContainerCommunicator差不多
/** * 独立模式的taskGroup 的通讯类 主要处理TaskGroupContainer和Task之间的信息,处理逻辑和StandAloneJobContainerCommunicator差不多 */
public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommunicator {
/** * 单机版的容器沟通者(独立模式的taskGroup 的通讯类) * * @param configuration */
public StandaloneTGContainerCommunicator(Configuration configuration) {
super(configuration);
super.setReporter(new ProcessInnerReporter());
}
@Override
public void report(Communication communication) {
super.getReporter().reportTGCommunication(super.taskGroupId, communication);
}
}注:
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/145375.html原文链接:https://javaforall.cn