前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.10基于工厂模式的任务提交与SPI机制

Flink1.10基于工厂模式的任务提交与SPI机制

作者头像
Flink实战剖析
发布2022-04-18 13:24:41
3940
发布2022-04-18 13:24:41
举报
文章被收录于专栏:Flink实战剖析

Flink任务执行模式包含了yarn-session、standalone、per-job、local, 在1.10中又增加k8s的执行模式,那么在任务提交过程中如何根据不同的执行模式进行任务提交呢?主要通过两个接口来实现:PipelineExecutorFactory 与 PipelineExecutor。PipelineExecutorFactory用于在不同模式下创建不同的PipelineExecutor, 用于提交任务,PipelineExecutorFactory表示的一个创建执行器工厂接口,PipelineExecutor 表示一个执行器接口,正如你所想这里使用的就是经典的工厂设计模式,在任务提交过程中会根据不同的提交模式, 使用不同的PipelineExecutorFactory创建不同的PipelineExecutor。

代码语言:javascript
复制
public interface PipelineExecutorFactory {
   /**
    * Returns the name of the executor that this factory creates.
    */
   String getName();
   /**
      根据configuration判断是否满足当前的factory
    */
   boolean isCompatibleWith(final Configuration configuration);
   /**
    * 获取对应模式下的executor
    */
   PipelineExecutor getExecutor(final Configuration configuration);
}

PipelineExecutorFactory几个实现分别为:

1. LocalExecutorFactory(local)

2. RemoteExecutorFactory(standalone)

3. YarnJobClusterExecutorFactory(per-job)

4. YarnSessionClusterExecutorFactory(yarn-session)

代码语言:javascript
复制
public interface PipelineExecutor {

   /**
    * 执行任务
    */
   CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
}

PipelineExecutor对应实现:

1. RemoteExecutor(standalone)

2. LocalExecutor(local)

3. YarnJobClusterExecutor(per-job)

4. YarnSessionClusterExecutor(yarn-session)

那么具体是如何选择factory呢?由PipelineExecutorServiceLoader接口来完成,其只有一个实现类DefaultExecutorServiceLoader, 透过命名你可能会才想到这里面用到了ServiceLoader,你的猜想是正确的,它就是通过SPI机制去加载flink所提供的不同factory,在META-INF.services 下可以找到其对应的配置:

DefaultExecutorServiceLoader.java部分源码

代码语言:javascript
复制
//SPI机制
private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);

//获取对应的factory
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
   checkNotNull(configuration);

   final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
   final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
   while (factories.hasNext()) {
      try {
         final PipelineExecutorFactory factory = factories.next();
         //判断标准 根据任务启动配置
         if (factory != null && factory.isCompatibleWith(configuration)) {
            compatibleFactories.add(factory);
         }
      } catch (Throwable e) {
         if (e.getCause() instanceof NoClassDefFoundError) {
            LOG.info("Could not load factory due to missing dependencies.");
         } else {
            throw e;
         }
      }
   }
   //只能有一个factory符合要求
   if (compatibleFactories.size() > 1) {
      final String configStr =
            configuration.toMap().entrySet().stream()
                  .map(e -> e.getKey() + "=" + e.getValue())
                  .collect(Collectors.joining("\n"));

      throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
   }
   return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
}

ServiceLoader.load(PipelineExecutorFactory.class) 会从类路径的META-INF.services下找到PipelineExecutorFactory的全路径文件,然后实例化出所有的factory,通过PipelineExecutorFactory.isCompatibleWith找到匹配的factory。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档