前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Oceanus 开发自定义SQL Connector指南

Oceanus 开发自定义SQL Connector指南

作者头像
spiderwu
修改2021-08-20 14:48:57
8840
修改2021-08-20 14:48:57
举报
文章被收录于专栏:Flink 实践Flink 实践

Oceanus兼容原生的Flink 框架,基于Flink开发的Connector能够实现100%兼容。

原文:https://blog.csdn.net/leiline/article/details/106925864

Flink 提供了丰富的connector组件帮助用户连接外部系统。但是很多时候原生的connector并不能够完全满足用户的需求,因此需要自定义开发connector组件。本文介绍如何进行Flink1.10 SQL CONNECTOR的开发工作。

通过SPI去加载不同的factory,实现了Connector的统一。

SPI机制

代码语言:txt
复制
SPI,全称为Service Provider Interface,是Java提供的一套用于第三方实现或拓展的API。

基于工厂模式的任务提交

代码语言:txt
复制
public interface PipelineExecutor{
		/**
		 * 执行任务
		 */
		CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration)throws Exception;
}

PipelineExecutor 提供多种实现方式:

代码语言:txt
复制
RemoteExecutor(standalone)

LocalExecutor (local)

YanrJobClusterExecutor (per-job)

YarnSessionClusterExecutor (yarn-session)

支持用户在多种场景下提交flink任务。

Flink 提供PipelineExecutorServiceLoader接口,其中实现类DefaultExecutorServiceLoader支持通过名称去加载类。它的原理就是通过SPI机制去查找flink所提供的所有的工厂类,找到合适的类,进行加载。

基于工厂模式的SQL CONNECTOR设计

注:Flink 1.10提供SPI方式支持与connector进行交互,Flink会去扫描包中resources/META-INF/services目录下的org.apache.flink.table.factories.TableFactory,获取所有Factory类,根据sql中with传进来的参数(k=v)进行匹配,找到匹配到的那个Factory类,如果没有找到的话,则会报错。自定义connector没有此文件的话需创建,并将自定connector类路径写入此文件。

代码语言:txt
复制
public class SqlHdfsSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_DB_NAME);
        properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_TABLE_NAME);
        properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG);

        // schema
        properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
        properties.add(SCHEMA + ".#." + SCHEMA_NAME);

        return properties;
    }

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONNECTOR_TYPE, SqlConstants.CONNECTOR_TYPE_VALUE_HIVE); // hive
        context.put(SqlConstants.HIVE_CONNECTOR_PROPERTY_PACKAGE_VERSION, hiveVersion()); // version
        context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility

        return context;
    }

    @Override
    public TableSink createTableSink(Map properties) {
        return new SqlHdfsSink(properties
                .get(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG).toString());
    }

    @Override
    public StreamTableSink createStreamTableSink(Map properties) {

        return new SqlHdfsSink(properties
                .get(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG).toString());

    }

    private String hiveVersion() {
        return SqlConstants.CONNECTOR_HIVE_VERSION_VALUE_211;
    }
}

其中对几个方法进行了重写:

supportedProperties() 记录sink参数对应的k-v值,这里参数值是固定的

requiredContext() 记录sink参数对应的k-v值,这里参数值是支持动态的

createTableSink() 创建sink对象的入口, 其中properties参数表示sql with语句中的k-v。

在createTableSink()方法中,实例化了SqlHdfsSink类,这个类实现了AppendStreamTableSink接口,是真正将数据写入到HDFS中的核心类。

public class SqlHdfsSink implements AppendStreamTableSink<Row> { }

编译打包

代码语言:txt
复制
mvn clean package -DskipTests -Dtest.skip=true -Drat.skip=true -Dcheckstyle.skip=true -Dskip.npm=true -Denforcer.skip=true

使用自定义的SQL Sink

Flink 1.10使用classLoader方式加载sql sink,对于用户来讲使用起来比较简单,定义好StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, settings) 即可将SQL语句传到stEnv中,自动加载sink。

代码语言:txt
复制
stEnv.sqlUpdate("CREATE TABLE gis_log_sink (\n" +
                "   deptSiteId STRING,\n" +
                "   latitude STRING,\n" +
                "   passZoneCode STRING\n" +
                "   ) \n" +
                "with (\n" +
                "    'version' = '2.1.1',\n" +
                "    'dbName' = 'dm_drcs',\n" +
                "    'connectorType' = 'sink',\n" +
                "    'connectorName' = 'HIVE',\n" +
                "    'columnDelimiter' = '\\u0001',\n" +
                "    'hadoopClusterName' = 'bdp_sit',\n" +
                "    'timeoutRollInterval' = '1000',\n" +
                "    'configuration' = '{\"properties\":{\"batchSize\":10485760,\"columnDelimiter\":\"\\\\u0001\",\"dateTimeBucketer\":\"yyyyMMdd\",\"dbNamePrefix\":\"\",\"dbName\":\"dm_lxb001\",\"tableName\":\"17757_tjn_gis_table110\",\"timeoutRollInterval\":8,\"tmpPath\":\"hdfs://10.202.77.200/hive/tmp/dm/dm_lxb001/tjn_gis_table111\",\"finalPath\":\"hdfs://10.202.77.200/hive/warehouse/dm/dm_lxb001/tjn_gis_table110\",\"tableColumns\":[{\"name\":\"deptSite\",\"type\":\"string\"},{\"name\":\"gpsTime\",\"type\":\"string\"},{\"name\":\"latitude\",\"type\":\"string\"}],\"hdfsConfigInfo\":{\"namenodeInfoList\":[{\"nodeName\":\"namenode1\",\"nodeAddr\":\"10.202.77.200\",\"nodePort\":\"8020\"},{\"nodeName\":\"namenode2\",\"nodeAddr\":\"10.202.77.201\",\"nodePort\":\"8020\"}],\"nameservices\":\"test-cluster\"},\"inputFormat\":\"text\",\"insertFieldNames\":\"deptsite, gpstime, latitude\"}}'\n" +
                ")");

总结

Flink 1.10基于SPI机制加载CONNECTOR ,统一了Flink 与外部系统的交互,也降低了用户实现自定义CONNECTOR的门槛。

开发自定义SQL Connector,本质上是在原有的Sink类前面封装一个factory类,Flink会去读取这个factory 类,并将其加载到runtime中执行。

开源Connector参考

apache-flink connectors参考列表:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/

bahir-flink connectors参考列表:

https://github.com/apache/bahir-flink

ververica/flink connectors参考列表:

https://github.com/ververica/flink-cdc-connectors

图数据库:

neo4j:

https://github.com/albertodelazzari/flink-neo4j

Nebula Graph:

https://github.com/vesoft-inc/nebula-flink-connector

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档