
A Look at Flink's TableFactory

code4it
Published 2019-03-04 10:14:06
Included in the column: 码匠的流水账

This article takes a look at Flink's TableFactory.

Example

Code language: Java
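// MySystemTableSourceFactory.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// public with an implicit no-arg constructor, so java.util.ServiceLoader can instantiate it
public class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {

  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("update-mode", "append");
    context.put("connector.type", "my-system");
    // ConnectorDescriptor always emits connector.property-version; declaring it as context
    // keeps filterBySupportedProperties from rejecting it (filterByContext ignores its value)
    context.put("connector.property-version", "1");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    List<String> list = new ArrayList<>();
    list.add("connector.debug");
    return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    // a missing key yields false
    boolean isDebug = Boolean.parseBoolean(properties.get("connector.debug"));

    // additional validation of the passed properties can also happen here

    // MySystemAppendTableSource is the user's own StreamTableSource implementation (not shown)
    return new MySystemAppendTableSource(isDebug);
  }
}

// MySystemConnector.java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.descriptors.ConnectorDescriptor;

public class MySystemConnector extends ConnectorDescriptor {

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
    // connector type "my-system", property version 1, no format descriptor needed
    super("my-system", 1, false);
    this.isDebug = isDebug;
  }

  @Override
  protected Map<String, String> toConnectorProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.debug", Boolean.toString(isDebug));
    return properties;
  }
}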
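  • This example defines MySystemTableSourceFactory. Its requiredContext matches on update-mode=append and connector.type=my-system, its supportedProperties contains connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor: it sets connector.type to my-system, connector.property-version to 1 and formatNeeded to false, and its toConnectorProperties supplies the value of connector.debug. For the factory to be discovered at all, its fully qualified class name must also be listed in a META-INF/services/org.apache.flink.table.factories.TableFactory resource file, because TableFactoryService loads factories through Java's ServiceLoader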
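To make the requiredContext match concrete, here is a Flink-free sketch of the check that filterByContext performs for this example. It assumes the properties that registering a table with `new MySystemConnector(true)` in append mode would produce (`update-mode` is assumed to come from the table descriptor, not from the connector). `ContextMatchSketch` and its methods are hypothetical names for illustration, not Flink API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch mirroring the idea of TableFactoryService.filterByContext; not Flink code.
final class ContextMatchSketch {

    // Version keys are skipped during context matching, as in the Flink source.
    private static final Set<String> IGNORED_KEYS = new HashSet<>(Arrays.asList(
            "connector.property-version",
            "format.property-version",
            "metadata.property-version",
            "statistics.property-version"));

    // Assumed properties for a table using new MySystemConnector(true) in append mode.
    static Map<String, String> exampleProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("update-mode", "append");
        props.put("connector.type", "my-system");
        props.put("connector.property-version", "1");
        props.put("connector.debug", "true");
        return props;
    }

    // The two context entries declared by MySystemTableSourceFactory.
    static Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "my-system");
        return context;
    }

    // True if every non-version context entry is present in the given properties with an equal value.
    static boolean contextMatches(Map<String, String> required, Map<String, String> given) {
        for (Map.Entry<String, String> e : required.entrySet()) {
            String key = e.getKey().toLowerCase();
            if (IGNORED_KEYS.contains(key)) {
                continue;
            }
            if (!e.getValue().equals(given.get(key))) {
                return false;
            }
        }
        return true;
    }
}
```

Changing `update-mode` to `retract` in the properties makes the check fail, so a factory that only declares append mode would be filtered out at the context stage.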

TableFactory

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java

Code language: Java
@PublicEvolving
public interface TableFactory {

    Map<String, String> requiredContext();

    List<String> supportedProperties();
}
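  • TableFactory declares two methods, requiredContext and supportedProperties. requiredContext is used to match a factory, while supportedProperties lists the properties the factory supports; if a property the factory does not support is passed in, an exception is thrown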
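The rule that an unsupported property leads to an exception can be sketched without Flink. The sketch assumes, as filterBySupportedProperties does, that context keys are skipped and that a supported entry ending in `*` acts as a prefix wildcard. `SupportedPropertiesSketch` and `validate` are hypothetical names, and `IllegalArgumentException` stands in for Flink's NoMatchingTableFactoryException.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the supportedProperties check; not Flink's API.
final class SupportedPropertiesSketch {

    // Throws if a non-context key is neither listed explicitly nor covered by a "prefix*" wildcard.
    static void validate(Collection<String> givenKeys,
                         Set<String> contextKeys,
                         List<String> supportedProperties) {
        List<String> exact = new ArrayList<>();
        List<String> wildcardPrefixes = new ArrayList<>();
        for (String p : supportedProperties) {
            String lower = p.toLowerCase();
            if (lower.endsWith("*")) {
                // "format.*" becomes the prefix "format."
                wildcardPrefixes.add(lower.substring(0, lower.length() - 1));
            }
            exact.add(lower);
        }
        for (String key : givenKeys) {
            if (contextKeys.contains(key)) {
                continue; // context keys are handled by the context filter, not here
            }
            boolean supported = exact.contains(key);
            for (String prefix : wildcardPrefixes) {
                supported |= key.startsWith(prefix);
            }
            if (!supported) {
                throw new IllegalArgumentException("Unsupported property: " + key);
            }
        }
    }
}
```

With supported properties `connector.debug` and `format.*`, a key such as `format.type` passes through the wildcard, while a key like `connector.timeout` is rejected.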

BatchTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala

Code language: Scala
trait BatchTableSourceFactory[T] extends TableFactory {

  def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
}
  • BatchTableSourceFactory extends TableFactory and adds the createBatchTableSource method

BatchTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala

Code language: Scala
trait BatchTableSinkFactory[T] extends TableFactory {

  def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
  • BatchTableSinkFactory extends TableFactory and adds the createBatchTableSink method

StreamTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala

Code language: Scala
trait StreamTableSourceFactory[T] extends TableFactory {

  def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
}
  • StreamTableSourceFactory extends TableFactory and adds the createStreamTableSource method

StreamTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala

Code language: Scala
trait StreamTableSinkFactory[T] extends TableFactory {

  def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
}
  • StreamTableSinkFactory extends TableFactory and adds the createStreamTableSink method
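These four traits are what TableFactoryService later filters on: filterByFactoryClass keeps only the discovered factories for which `factoryClass.isAssignableFrom(f.getClass)` holds. The following Flink-free sketch reproduces that step; `Factory`, `StreamSourceFactory`, `BatchSinkFactory` and the `My*` classes are hypothetical stand-ins for TableFactory and its sub-traits, and `IllegalStateException` stands in for NoMatchingTableFactoryException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for TableFactory and two of its sub-traits; not Flink types.
final class FactoryClassFilterSketch {

    interface Factory {}
    interface StreamSourceFactory extends Factory {}
    interface BatchSinkFactory extends Factory {}

    static final class MyStreamSource implements StreamSourceFactory {}
    static final class MyBatchSink implements BatchSinkFactory {}
    // A single class may implement several factory kinds at once.
    static final class MyBoth implements StreamSourceFactory, BatchSinkFactory {}

    // Keeps the factories whose runtime class implements the requested factory type.
    static List<Factory> filterByFactoryClass(Class<?> factoryClass, List<Factory> found) {
        List<Factory> result = new ArrayList<>();
        for (Factory f : found) {
            if (factoryClass.isAssignableFrom(f.getClass())) {
                result.add(f);
            }
        }
        if (result.isEmpty()) {
            throw new IllegalStateException(
                    "No factory implements '" + factoryClass.getCanonicalName() + "'.");
        }
        return result;
    }

    // Plays the role of the factories that ServiceLoader would have discovered.
    static List<Factory> discovered() {
        return Arrays.<Factory>asList(new MyStreamSource(), new MyBatchSink(), new MyBoth());
    }
}
```

Because `MyBoth` implements both interfaces, it survives the class filter for either request; the later context and supported-properties stages are what narrow such candidates down to one.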

ConnectorDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java

Code language: Java
@PublicEvolving
public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    private boolean formatNeeded;

    /**
     * Constructs a {@link ConnectorDescriptor}.
     *
     * @param type string that identifies this connector
     * @param version property version for backwards compatibility
     * @param formatNeeded flag for basic validation of a needed format descriptor
     */
    public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(CONNECTOR_TYPE, type);
        properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
        properties.putProperties(toConnectorProperties());
        return properties.asMap();
    }

    /**
     * Returns if this connector requires a format descriptor.
     */
    protected final boolean isFormatNeeded() {
        return formatNeeded;
    }

    /**
     * Converts this descriptor into a set of connector properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toConnectorProperties();
}
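  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It implements Descriptor's toProperties method as a final method: it writes the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the subclass-specific properties returned by the abstract method toConnectorProperties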
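The final toProperties plus the abstract toConnectorProperties form a template-method pattern. Below is a Flink-free sketch of that pattern; `SketchConnectorDescriptor` and `DebugConnector` are hypothetical classes that mimic ConnectorDescriptor and MySystemConnector, a plain `HashMap` replaces DescriptorProperties (so Flink's own duplicate-key handling is not modeled), and the formatNeeded flag is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified mirror of ConnectorDescriptor; a HashMap replaces DescriptorProperties.
abstract class SketchConnectorDescriptor {

    private final String type;
    private final int version;

    SketchConnectorDescriptor(String type, int version) {
        this.type = type;
        this.version = version;
    }

    // Template method: final, so subclasses cannot drop the built-in keys.
    public final Map<String, String> toProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", type);
        properties.put("connector.property-version", Long.toString(version));
        properties.putAll(toConnectorProperties()); // merge subclass-specific keys
        return properties;
    }

    // Hook implemented by each concrete connector.
    protected abstract Map<String, String> toConnectorProperties();
}

// Hypothetical counterpart of MySystemConnector from the example.
final class DebugConnector extends SketchConnectorDescriptor {

    private final boolean isDebug;

    DebugConnector(boolean isDebug) {
        super("my-system", 1);
        this.isDebug = isDebug;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}
```

`new DebugConnector(true).toProperties()` yields exactly three entries: `connector.type=my-system`, `connector.property-version=1` and `connector.debug=true`.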

TableFactoryUtil

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala

Code language: Scala
object TableFactoryUtil {

  /**
    * Returns a table source for a table environment.
    */
  def findAndCreateTableSource[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSource[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSourceFactory[T]], javaMap)
          .createBatchTableSource(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSourceFactory[T]], javaMap)
          .createStreamTableSource(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }

  /**
    * Returns a table sink for a table environment.
    */
  def findAndCreateTableSink[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSink[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSinkFactory[T]], javaMap)
          .createBatchTableSink(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSinkFactory[T]], javaMap)
          .createStreamTableSink(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }
}
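  • TableFactoryUtil is a utility object for creating a TableSource or TableSink from a given TableEnvironment and Descriptor. It converts the descriptor with descriptor.toProperties and asks TableFactoryService for the matching TableFactory: a BatchTableSourceFactory/BatchTableSinkFactory for a BatchTableEnvironment, a StreamTableSourceFactory/StreamTableSinkFactory for a StreamTableEnvironment; any other environment type raises a TableException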
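The dispatch in findAndCreateTableSource boils down to: pick the factory interface from the environment type, look up a factory of that interface, and call its create method with the same property map. A Flink-free Java 8 sketch follows; the environment classes, the `*SourceFactory` interfaces, the `String` return value and the factory list passed in directly are all hypothetical simplifications, and the `find` helper applies only the class filter, whereas the real TableFactoryService.find also checks context and supported properties.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of TableFactoryUtil.findAndCreateTableSource; not Flink's API.
final class DispatchSketch {

    interface TableEnvironment {}
    static final class BatchEnv implements TableEnvironment {}
    static final class StreamEnv implements TableEnvironment {}

    interface Factory {}
    interface BatchSourceFactory extends Factory {
        String createBatchTableSource(Map<String, String> properties);
    }
    interface StreamSourceFactory extends Factory {
        String createStreamTableSource(Map<String, String> properties);
    }

    // Returns a description of the created source; real code would return a TableSource.
    static String findAndCreateTableSource(TableEnvironment env,
                                           Map<String, String> properties,
                                           List<Factory> factories) {
        if (env instanceof BatchEnv) {
            return find(BatchSourceFactory.class, factories).createBatchTableSource(properties);
        } else if (env instanceof StreamEnv) {
            return find(StreamSourceFactory.class, factories).createStreamTableSource(properties);
        }
        throw new IllegalArgumentException(
                "Unsupported table environment: " + env.getClass().getName());
    }

    // Stand-in for TableFactoryService.find: only the factory-class filter is applied here.
    static <T> T find(Class<T> factoryClass, List<Factory> factories) {
        for (Factory f : factories) {
            if (factoryClass.isInstance(f)) {
                return factoryClass.cast(f);
            }
        }
        throw new IllegalStateException("No factory implements " + factoryClass.getName());
    }
}
```

The same property map reaches both the lookup and the create call, which is why a factory's requiredContext and supportedProperties must describe exactly the keys the descriptor produces.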

TableFactoryService

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala

Code language: Scala
object TableFactoryService extends Logging {

  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])

  /**
    * Finds a table factory of the given class and descriptor.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    Preconditions.checkNotNull(descriptor)

    findInternal(factoryClass, descriptor.toProperties, None)
  }

  /**
    * Finds a table factory of the given class, descriptor, and classloader.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    Preconditions.checkNotNull(descriptor)
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class and property map.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    findInternal(factoryClass, propertyMap, None)
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: ClassLoader)
    : T = {
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, propertyMap, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  private def findInternal[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: Option[ClassLoader])
    : T = {

    Preconditions.checkNotNull(factoryClass)
    Preconditions.checkNotNull(propertyMap)

    val properties = propertyMap.asScala.toMap

    val foundFactories = discoverFactories(classLoader)

    val classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories)

    val contextFactories = filterByContext(
      factoryClass,
      properties,
      foundFactories,
      classFactories)

    filterBySupportedProperties(
      factoryClass,
      properties,
      foundFactories,
      contextFactories)
  }

  /**
    * Searches for factories using Java service providers.
    *
    * @return all factories in the classpath
    */
  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    try {
      val iterator = classLoader match {
        case Some(customClassLoader) =>
          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
          customLoader.iterator()
        case None =>
          defaultLoader.iterator()
      }
      iterator.asScala.toSeq
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table factories.", e)
        throw new TableException("Could not load service provider for table factories.", e)
    }
  }

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

  /**
    * Prepares the properties of a context to be used for match operations.
    */
  private def normalizeContext(factory: TableFactory): Map[String, String] = {
    val requiredContextJava = factory.requiredContext()
    if (requiredContextJava == null) {
      throw new TableException(
        s"Required context of factory '${factory.getClass.getName}' must not be null.")
    }
    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
  }

  /**
    * Filters the matching class factories by supported properties.
    */
  private def filterBySupportedProperties[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : T = {

    val plainGivenKeys = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace arrays with wildcard
      val key = k.replaceAll(".\\d+", ".#")
      // ignore duplicates
      if (!plainGivenKeys.contains(key)) {
        plainGivenKeys += key
      }
    }
    var lastKey: Option[String] = None
    val supportedFactories = classFactories.filter { factory =>
      val requiredContextKeys = normalizeContext(factory).keySet
      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
      // ignore context keys
      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
      // perform factory specific filtering of keys
      val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
        factory,
        givenContextFreeKeys)

      givenFilteredKeys.forall { k =>
        lastKey = Option(k)
        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
      }
    }

    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
      // special case: when there is only one matching factory but the last property key
      // was incorrect
      val factory = classFactories.head
      val (supportedKeys, _) = normalizeSupportedProperties(factory)
      throw new NoMatchingTableFactoryException(
        s"""
          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
          |
          |Supported properties of this factory are:
          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory supports all properties.",
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.length > 1) {
      throw new AmbiguousTableFactoryException(
        supportedFactories,
        factoryClass,
        foundFactories,
        properties)
    }

    supportedFactories.head.asInstanceOf[T]
  }

  /**
    * Prepares the supported properties of a factory to be used for match operations.
    */
  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
    val supportedPropertiesJava = factory.supportedProperties()
    if (supportedPropertiesJava == null) {
      throw new TableException(
        s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
    }
    val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)

    // extract wildcard prefixes
    val wildcards = extractWildcardPrefixes(supportedKeys)

    (supportedKeys, wildcards)
  }

  /**
    * Converts the prefix of properties with wildcards (e.g., "format.*").
    */
  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
    propertyKeys
      .filter(_.endsWith("*"))
      .map(s => s.substring(0, s.length - 1))
  }

  /**
    * Performs filtering for special cases (i.e. table format factories with schema derivation).
    */
  private def filterSupportedPropertiesFactorySpecific(
      factory: TableFactory,
      keys: Seq[String])
    : Seq[String] = factory match {

    case formatFactory: TableFormatFactory[_] =>
      val includeSchema = formatFactory.supportsSchemaDerivation()
      // ignore non-format (or schema) keys
      keys.filter { k =>
        if (includeSchema) {
          k.startsWith(SchemaValidator.SCHEMA + ".") ||
            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        } else {
          k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        }
      }

    case _ =>
      keys
  }
}
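  • TableFactoryService finds the TableFactory that matches a factoryClass and a Descriptor (or the property map from descriptor.toProperties). Candidates are discovered through Java's ServiceLoader in discoverFactories, and the matching logic lives in filterByFactoryClass, filterByContext and filterBySupportedProperties: filterByFactoryClass keeps the discovered factories that implement factoryClass (classFactories); filterByContext keeps the classFactories whose requiredContext entries all appear with equal values in the properties, ignoring the *.property-version keys (contextFactories); filterBySupportedProperties then keeps the contextFactories for which every non-context key is listed in supportedProperties or covered by a wildcard prefix. An empty result at any stage raises NoMatchingTableFactoryException, and more than one surviving factory raises AmbiguousTableFactoryException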
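One detail of filterBySupportedProperties is easy to miss: before comparing keys it rewrites index segments with `k.replaceAll(".\\d+", ".#")` and drops duplicates, so `schema.0.name` and `schema.1.name` are checked once as `schema.#.name`, which is presumably why factories declare indexed keys in the `#` form. The sketch below reproduces just that step; Java's `String.replaceAll` is what the Scala code calls, so the behavior is the same. Note that the `.` in the pattern is unescaped and matches any character, so a key such as the hypothetical `connector.s3.bucket` becomes `connector..#.bucket`. `KeyNormalizationSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the key-normalization step in filterBySupportedProperties.
final class KeyNormalizationSketch {

    // Replaces index segments with '#' using the same pattern as the Flink source,
    // then removes duplicates while keeping first-seen order.
    static List<String> normalizeKeys(Iterable<String> keys) {
        List<String> plainKeys = new ArrayList<>();
        for (String k : keys) {
            // The unescaped '.' matches any character, not only a literal dot.
            String key = k.replaceAll(".\\d+", ".#");
            if (!plainKeys.contains(key)) {
                plainKeys.add(key);
            }
        }
        return plainKeys;
    }
}
```

For the input `schema.0.name`, `schema.1.name`, `schema.10.type`, `connector.debug` the result is `schema.#.name`, `schema.#.type`, `connector.debug`.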

Summary

  • TableFactory declares two methods, requiredContext and supportedProperties: requiredContext is used to match a factory, and supportedProperties lists the properties the factory supports; passing a property the factory does not support raises an exception
  • BatchTableSourceFactory, BatchTableSinkFactory, StreamTableSourceFactory and StreamTableSinkFactory all extend TableFactory and add createBatchTableSource, createBatchTableSink, createStreamTableSource and createStreamTableSink respectively
  • ConnectorDescriptor extends DescriptorBase and implements Descriptor; its toProperties writes the built-in CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION properties and then merges in the subclass properties returned by the abstract toConnectorProperties method
  • TableFactoryUtil is a utility for creating a TableSource or TableSink from a given TableEnvironment and Descriptor; internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory
  • TableFactoryService finds the TableFactory matching a factoryClass and a Descriptor (or descriptor.toProperties) by narrowing candidates in three steps: filterByFactoryClass selects the classFactories that implement factoryClass, filterByContext narrows them to the contextFactories whose requiredContext matches the properties, and filterBySupportedProperties keeps the factory whose supportedProperties cover the remaining keys

doc

  • Define a TableFactory
This article is shared from the WeChat official account 码匠的流水账 as part of the Tencent Cloud self-media sharing program.
Originally published 2019-02-07. In case of infringement, contact cloudcommunity@tencent.com for removal.
