前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Trino源码学习】Trino源码剖析之plugin加载

【Trino源码学习】Trino源码剖析之plugin加载

作者头像
skyyws
发布2022-05-20 08:52:49
1.3K0
发布2022-05-20 08:52:49
举报
文章被收录于专栏:skyyws的技术专栏

目录

最近在研究Trino的相关代码,发现用到了大量的函数式编程和lambda表达式等java8的新特性。刚开始接触门槛比较高,代码阅读也比较费劲,因此希望借这个系列,对自己的代码学习做一些记录,能够帮助到一些刚入门的同学。本文将会跟着代码,一步一步分析Trino的plugin加载到底做了哪些事情,当前代码分析都是基于trino的maste分支(最新的commit记录为:6a48c4352dfc6835997c43d7d5f7a599c0a712a5)。

加载流程归纳

由于后面涉及到了比较多的代码分析,防止阅读起来比较混乱,这里先用一个流程图对整个plugin的加载做一个整体的归纳总结,能够有一个大致的印象,后面在阅读代码的时候也可以对照着来看。需要注意的是,由于代码里面很多都是通过lambda表达式作为参数,进行循环嵌套处理,所以整个流程与后续的章节略有不同。

1
1

Plugin类图

在正式分析代码流程之前,先简单看下Plugin相关的类图,Trino支持的每一个plugin,在代码里面都会有一个对应的Plugin的实现类,如下所示:

2
2

可以看到,主要分成JdbcPlugin和其他的Plugin,例如KuduPlugin、BigQueryPlugin等。而JdbcPlugin又包含了MysqlPlugin、OraclePlugin、ClickHousePlugin等。通过Plugin的方法,可以获取对应的ConnectorFactory。ConnectorFactory也分为两种,主要是JdbcConnectorFactory和其他的ConnectorFactory实现类。需要注意的是,例如Mysql、Oracle等这些jdbc类型的plugin,都是直接使用了JdbcConnectorFactory。也就是说,通过MysqlPlugin、ClickhousePlugin等获取到的都是JdbcConnectorFactory,相关代码如下所示:

代码语言:javascript
复制
//JdbcPlugin.java,MysqlPlugin、OraclePlugin等都是直接继承的这个方法,没有单独实现
public Iterable<ConnectorFactory> getConnectorFactories() {
  return ImmutableList.of(new JdbcConnectorFactory(name,
      combine(new CredentialProviderModule(),
          new ExtraCredentialsBasedIdentityCacheMappingModule(),module)));
}

关于Plugin和ConnectorFactory,我们会在下面的代码分析中反复提到。

加载流程源码分析

Trino服务在启动的时候,会从指定的路径来加载所有的plugin,默认是/plugin目录下。在该目录下,每个plugin都是以单独的子目录形式存在,该plugin所需要的jar,都放在这个子目录中。以Clickhouse为例,如下所示:

3
3

Trino会遍历/plugin目录下的所有子目录,然后依次加载,相关的函数调用如下所示:

代码语言:javascript
复制
doStart(Server.java):126
-loadPlugins(PluginManager.java):135
--loadPlugin(PluginManager.java):155
---loadPlugin(PluginManager.java):169
----installPlugin(PluginManager.java):175
-----installPluginInternal(PluginManager.java):198

在Trino中,大量用到了函数式编程和lambda表达式。在循环遍历/plugin目录来加载所有plugin的时候,主要是通过如下代码来操作的:

代码语言:javascript
复制
//PluginManager.loadPlugins():135
pluginsProvider.loadPlugins(this::loadPlugin, PluginManager::createClassLoader);

函数式接口PluginsProvider

其中,pluginsProvider是一个interface,如下所示:

代码语言:javascript
复制
//PluginManager.java
public interface PluginsProvider {
  void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader);

  interface Loader {
    void load(String description, Supplier<PluginClassLoader> getClassLoader);
  }

  interface ClassLoaderFactory {
    PluginClassLoader create(String pluginName, List<URL> urls);
  }
}

可以看到PluginsProvider内部有一个loadPlugins,属于abstract method,因此是一个函数式接口。同理,嵌套的Loader和ClassLoaderFactory也属于函数式接口。

Lambda表达式展开

上面loadPlugins的那一行代码,其实是进行了lambda表达的简写,我们展开来看,对于初学者更加友好:

代码语言:javascript
复制
//演示代码,实际源码中不存在
PluginsProvider.Loader loader =
  (plugin, createClassLoader) -> loadPlugin(plugin, createClassLoader);
  
PluginsProvider.ClassLoaderFactory factory =
  (pluginName, urls) -> createClassLoader(pluginName, urls);
  
pluginsProvider.loadPlugins(loader, factory);

实际源码中就是将两个lambda表达式,简写为了“类名::实例方法名”,然后作为参数,传给loadPlugins方法。

循环处理plugin

PluginsProvider本身是一个接口,它有一个实现类,相应的loadPlugins方法实现,如下所示:

代码语言:javascript
复制
//ServerPluginsProvider.java
public void loadPlugins(Loader loader, ClassLoaderFactory createClassLoader) {
  executeUntilFailure(
      executor,
      listFiles(installedPluginsDir).stream()
           .filter(File::isDirectory)
           .map(file -> (Callable<?>) () -> {
               loader.load(file.getAbsolutePath(), () ->
                   createClassLoader.create(file.getName(), buildClassPath(file)));
               return null;
           })
           .collect(toImmutableList()));
}

可以看到使用了很长的lambda表达式,这里我们也简单拆解来看。

遍历/plugin所有子目录进行处理

首先会遍历/plugin下所有的子目录,然后转换成File,并使用isDirectory进行过滤,对应如下所示的代码:

代码语言:javascript
复制
listFiles(installedPluginsDir).stream().filter(File::isDirectory)

private static List<File> listFiles(File path) {
  //省略无关代码
  return stream(directoryStream).map(Path::toFile).sorted().collect(toImmutableList());
}

然后再对获取到的所有子目录进行逐个处理。整个处理过程又可以分为两个部分,我们来分别看一下。

构造PluginClassLoader

首先看最里面的lambda表达式:

代码语言:javascript
复制
() -> createClassLoader.create(file.getName(), buildClassPath(file))

private static List<URL> buildClassPath(File path) {
  return listFiles(path).stream().map(ServerPluginsProvider::fileToUrl).collect(toImmutableList());
}

private static URL fileToUrl(File file) {
  //省略无关代码
  return file.toURI().toURL();
}

首先要明白,输入参数file代表的是一个子目录,对应着一个plugin,这个子目录下会包含这个plugin所依赖的jar。这里就是将这些jar转换成一个URL类型的集合,作为输入参数。然后跟plugin的名称一起作为参数,传给ClassLoaderFactory的create方法,实际绑定的是PluginManager.createClassLoader方法,我们通过上面的lambda展开代码可以看出,最终会返回一个PluginClassLoader,如下所示:

代码语言:javascript
复制
public static PluginClassLoader createClassLoader(String pluginName, List<URL> urls) {
  ClassLoader parent = PluginManager.class.getClassLoader();
  return new PluginClassLoader(pluginName, urls, parent, SPI_PACKAGES);
}

这样,对于每一个trino支持的plugin,都会创建一个对应的PluginClassLoader,并且这个PluginClassLoader会包含该plugin依赖的所有jar。后面就是真正进行plugin的一系列加载操作。我们继续来看一下。

单个plugin处理

构造完成PluginClassLoader之后,将该plugin的absolute path和PluginClassLoader本身作为参数传给Loader的load方法:

代码语言:javascript
复制
//这里为了展示,将原先的代码进行了展开,不是原代码的实际内容
//由于这里是lambda表达式,所以不能直接用PluginClassLoader作为返回值,需要使用Supplier接口将其包装起来
Supplier<PluginClassLoader> supplier  =
    () -> createClassLoader.create(file.getName(),buildClassPath(file));

xxx.map(file -> (Callable<?>) () -> {
    loader.load(file.getAbsolutePath(), supplier);
    return null;
})

这里实际绑定的是PluginManager.loadPlugin方法,我们同样可以通过从最上面的展开代码看出。函数主体如下所示:

代码语言:javascript
复制
private void loadPlugin(String plugin, Supplier<PluginClassLoader> createClassLoader) {
  //省略无关代码
  PluginClassLoader pluginClassLoader = createClassLoader.get();
  handleResolver.registerClassLoader(pluginClassLoader);
  try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
    loadPlugin(pluginClassLoader);
  }
}

这个loadPlugin方法主要可以分为三个部分,我们逐个来看下。

注册PluginClassLoader到HandleResolver

首先是将当前的PluginClassLoader注册到HandleResolver中,这个类型也很简单,就是保存所有创建了的ClassLoader,如下所示:

代码语言:javascript
复制
//HandleResolver.java
private final Map<String, ClassLoader> classLoaders = new ConcurrentHashMap<>();
public void registerClassLoader(PluginClassLoader classLoader) {
  ClassLoader existingClassLoader = classLoaders.putIfAbsent(classLoader.getId(), classLoader);
  checkState(xxx);
}

//PluginClassLoader.java
public String getId() {
  return pluginName + catalogName.map(name -> ":" + name).orElse("");
}

将PluginClassLoader本身和id加到了map中,id使用了pluginName和catalogName进行拼接。pluginName就是每个plugin对应目录的名称,catalogName我们在后续的文章详细介绍,这里暂不展开。

设置当前线程ClassLoader

接着,会将当前的线程的ClassLoader设置为该PluginClassLoader,如下所示:

代码语言:javascript
复制
//ThreadContextClassLoader.java
private final ClassLoader originalThreadContextClassLoader;
public ThreadContextClassLoader(ClassLoader  newThreadContextClassLoader) {
  this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader();
  Thread.currentThread().setContextClassLoader(newThreadContextClassLoader);
}

这样是为了保证每个plugin依赖的类,都会用各自的PluginClassLoader去进行加载。这样就可以保证不同plugin之间,依赖的jar不会产生冲突。 然后继续调用重载的loadPlugin方法,这个方法目前只接收一个PluginClassLoader作为参数进行后续的加载操作。

获取Plugin实现类

我们接着往下看重载的loadPlugin的函数主体:

代码语言:javascript
复制
//PluginManager.java,省略了部分无关代码
private void loadPlugin(PluginClassLoader pluginClassLoader) {
  ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
  List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);

  for (Plugin plugin : plugins) {
    installPlugin(plugin, pluginClassLoader::duplicate);
  }
}

首先,通过ServiceLoader来获取当前PluginClassLoader中所有Plugin接口的实现类。以Hive为例,就是寻找plugin/hive下所有jar中,Plugin接口的实现类,这里对应的只有一个HivePlugin。所以,虽然这里使用了循环调用installPlugin,但通常每个plugin路径下,都是只有一个实现类。我们可以通过服务端日志进行验证:

代码语言:javascript
复制
2022-02-18T16:51:25.936+0800    INFO    main    io.trino.server.PluginManager   -- Loading plugin /data/impala/presto/data/plugin/hive --

2022-02-18T16:51:25.964+0800    INFO    main    io.trino.server.PluginManager   Installing io.trino.plugin.hive.HivePlugin

然后调用installPlugin来加载每一个Plugin的实现类。这里同样用到了lambda表达式,将pluginClassLoader::duplicate作为参数传入。

Duplicate PluginClassLoader

我们展开上述的duplicate表达式来看:

代码语言:javascript
复制
//这里也是演示代码,实际原代码中不存在
Function<CatalogName, ClassLoader> function = (CatalogName name) -> pluginClassLoader.duplicate(name);
installPlugin(plugin, function);

这里duplicate函数就是为了复制一个PluginClassLoader,但是与原先PluginClassLoader不同的时候,这里会传入新的catalogName和urls,如下所示:

代码语言:javascript
复制
//除了catalogName和urls,其他都是复用了当前的这个PluginClassLoader的成员
public PluginClassLoader duplicate(CatalogName catalogName) {
  return new PluginClassLoader(
      pluginName,
      Optional.of(requireNonNull(catalogName, "catalogName is null")),
      ImmutableList.copyOf(getURLs()),
      spiClassLoader,
      spiPackages,
      spiResources);
}

关于这个duplicate函数,我们会在后续catalog的加载时用到,这里暂不展开说明。我们只需要知道,这里将duplicate函数作为Function的主体传给了installPlugin函数即可。

构造InternalConnectorFactory

接着将Plugin和Function作为参数,传入installPluginInternal函数。在installPluginInternal函数中,会进行各种注册,包括connector、functions、resource group等。这里主要关注下connector的注册,这个跟后续的catalog加载有关系,相关代码如下所示: 我们继续看后续的调用栈,如下所示:

代码语言:javascript
复制
//PluginManager.java
private void installPluginInternal(Plugin plugin, Function<CatalogName, ClassLoader>    
    duplicatePluginClassLoaderFactory) {
  for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {
    connectorManager.addConnectorFactory(connectorFactory, duplicatePluginClassLoaderFactory);
  }
}

//ConnectorManager.java
ConcurrentMap<String, InternalConnectorFactory> connectorFactories = new ConcurrentHashMap<>();

public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogName, 
    ClassLoader> duplicatePluginClassLoaderFactory) {
  InternalConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(
      connectorFactory.getName(),
          new InternalConnectorFactory(connectorFactory, duplicatePluginClassLoaderFactory));
}

上述代码进行了精简,主要就是通过addConnectorFactory方法,将plugin对应的ConnectorFactory和Function构造为一个InternalConnectorFactory,然后添加到到map中。ConnectorFactory的name是在构造具体的实现类时传入的,一般就是plugin的名称,例如hive、clickhouse等。关于这个InternalConnectorFactory,同样会在后面catalog的加载时用到,这里只是进行了一个注册操作。

小结

到这里,关于plugin的加载基本就已经完成了,主要的操作都是在PluginManager这个类中完成的。通过上述的源码解析可以看到,trino中应用了很多的函数式接口和lambda表达式简写,这种写法可以让代码在很大程度上精简,原来好几行代码才能实现的功能,现在通过一行lambda表达式就可以达到目的,非常方便。笔者之前对于函数式编程和lambda表达式用的也很少,希望以后也可以在工程实践中多多用到。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-02-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 加载流程归纳
  • Plugin类图
  • 加载流程源码分析
    • 函数式接口PluginsProvider
      • Lambda表达式展开
        • 循环处理plugin
          • 遍历/plugin所有子目录进行处理
          • 构造PluginClassLoader
        • 单个plugin处理
          • 注册PluginClassLoader到HandleResolver
          • 设置当前线程ClassLoader
          • 获取Plugin实现类
          • Duplicate PluginClassLoader
        • 构造InternalConnectorFactory
        • 小结
        相关产品与服务
        腾讯云代码分析
        腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档