前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIFI里的数据库连接池

NIFI里的数据库连接池

作者头像
@阿诚
发布2020-09-15 12:01:11
2.4K0
发布2020-09-15 12:01:11
举报
文章被收录于专栏:Panda诚Panda诚

通常我们在NIFI里最常见的使用场景就是读写关系型数据库,一些组件比如GenerateTableFetch、ExecuteSQL、PutSQL、ExecuteSQLRecord、PutDatabaseRecord等等,都会有一个属性配置大概叫Database Connection Pooling Service的,对应的接口是DBCPService,其实现类有:HiveConnectionPool DBCPConnectionPool DBCPConnectionPoolLookup。我们用的最多的就是DBCPConnectionPool。具体怎么配置这里就不赘述了,看对应的Controller Service文档就可以了。

Database Connection URL

这里大概罗列几个通用的URL(ip+port+db):

代码语言:javascript
复制
  Oracle: jdbc:oracle:thin:@{}:{}:{}
  Oracle 12+: jdbc:oracle:thin:@{}:{}:{}
  MySQL: jdbc:mysql://{}:{}/{}?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&useSSL=false
  Greenplum: jdbc:pivotal:greenplum://{}:{};DatabaseName={}
  PostgreSQL: jdbc:postgresql://{}:{}/{}
  DB2: jdbc:db2://{}:{}/{}
  MS SQL 2008: jdbc:sqlserver://{}:{};DatabaseName={}
  MS SQL 2012+: jdbc:sqlserver://{}:{};DatabaseName={}
  Cache: jdbc:Cache://{}:{}/{}
  Ignite: jdbc:ignite:thin://{}:{};schema={}

Database Driver Class Name

代码语言:javascript
复制
  Oracle: oracle.jdbc.driver.OracleDriver
  Oracle 12+: oracle.jdbc.driver.OracleDriver
  MySQL: com.mysql.jdbc.Driver
  Greenplum: com.pivotal.jdbc.GreenplumDriver
  PostgreSQL: org.postgresql.Driver
  DB2: com.ibm.db2.jcc.DB2Driver
  MS SQL 2008: com.microsoft.sqlserver.jdbc.SQLServerDriver
  MS SQL 2012+: com.microsoft.sqlserver.jdbc.SQLServerDriver
  Cache: com.intersys.jdbc.CacheDriver
  Ignite: org.apache.ignite.IgniteJdbcThinDriver

Database Driver Location(s)

这里有一个小窍门,我们在部署NIFI的时候,通常应该预置一些JDBC驱动,比如说在NIFI目录下新建一个jdbc的目录,里面是各种数据库的驱动文件。然后在指定驱动的时候,我们使用NIFI表达式语言${NIFI_HOME}来获取NIFI的安装目录,进而就可以通用的去获取指定的驱动包了。(这里是利用NIFI表达式语言读取环境变量的功能,NIFI_HOME是在启动的时候设置的临时环境变量,在window10下可能会有些问题,如果是部署Linux以外的环境,还需要自己测试一番。)

代码语言:javascript
复制
  Oracle: ${NIFI_HOME:append('/jdbc/oracle-jdbc-11.2.04.jar')}
  Oracle 12+: ${NIFI_HOME:append('/jdbc/oracle-jdbc-11.2.04.jar')}
  LogMiner: ${NIFI_HOME:append('/jdbc/ojdbc8.jar')}
  MySQL: ${NIFI_HOME:append('/jdbc/mysql-connector-java-5.1.46.jar')}
  Greenplum: ${NIFI_HOME:append('/jdbc/greenplum-1.0.jar')}
  PostgreSQL: ${NIFI_HOME:append('/jdbc/postgresql-9.4.1212.jar')}
  MS SQL 2008: ${NIFI_HOME:append('/jdbc/mssql-jdbc-6.5.4.jre8-preview.jar')}
  MS SQL 2012+: ${NIFI_HOME:append('/jdbc/mssql-jdbc-6.5.4.jre8-preview.jar')}
  Cache: ${NIFI_HOME:append('/jdbc/cache-jdbc-2.0.0.jar')}
  Ignite: ${NIFI_HOME:append('/jdbc/ignite-core-2.8.0.jar')}

底层连接池的选择

代码语言:javascript
复制
 <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
</dependency>

几个研究

研究1:获取数据库连接会有什么异常?

首先我们看一下接口DBCPService.java,这里我们只看到了ProcessException异常,还得看具体实现类。这里额外说一下,getConnection(Map<String,String> attributes)最终是在DBCPConnectionPoolLookup里实现的,作用是根据属性database.name的值去查找对应的DBCPConnectionPool.

代码语言:javascript
复制
Connection getConnection() throws ProcessException;
/**
* Allows a Map of attributes to be passed to the DBCPService for use in configuration, etc.
* An implementation will want to override getConnection() to return getConnection(Collections.emptyMap()),
* and override this method (possibly with its existing getConnection() implementation).
* @param attributes a Map of attributes to be passed to the DBCPService. The use of these
*                   attributes is implementation-specific, and the source of the attributes
*                   is processor-specific
* @return a Connection from the specifed/configured connection pool(s)
* @throws ProcessException if an error occurs while getting a connection
*/
default Connection getConnection(Map<String,String> attributes) throws ProcessException {
    // default implementation (for backwards compatibility) is to call getConnection()
    // without attributes
    return getConnection();
}

DBCPConnectionPool里,使用的是commons-dbcp2获取数据库连接,当if a database access error occurs或超时,会抛出SQLException

代码语言:javascript
复制
@Override
public Connection getConnection() throws ProcessException {
    try {
        final Connection con;
        if (kerberosUser != null) {
            KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
            con = kerberosAction.execute();
        } else {
            con = dataSource.getConnection();
        }
        return con;
    } catch (final SQLException e) {
        throw new ProcessException(e);
    }
}

研究2:在某些原因下(IP ping不通、数据库挂了),抛出异常了,组件的流文件怎么办?

这里因为最后抛出的是ProcessException异常,是一个RuntimeException,而获取连接这个动作是在被调度的方法onTrigger里的。先看简单的比如ExecuteSQL,这类组件都是继承AbstractProcessor:

代码语言:javascript
复制
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession session = sessionFactory.createSession();
    try {
        onTrigger(context, session);
        session.commit();
    } catch (final Throwable t) {
        session.rollback(true);
        throw t;
    }
}

看到了session.rollback(true)大概我们就比较明朗了,获取连接异常,被捕获然后NIFI事务回滚了,流文件是回到组件的上游去了。而比如GenerateTableFetch这种的

代码语言:javascript
复制
try (final Connection con = dbcpService.getConnection(finalFileToProcess == null ? Collections.emptyMap() : finalFileToProcess.getAttributes());
            final Statement st = con.createStatement()) {
        ...
    } catch (SQLException e) {
        ...
    }
    ...
} catch (final ProcessException pe) {
    // Log the cause of the ProcessException if it is available
    Throwable t = (pe.getCause() == null ? pe : pe.getCause());
    logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
    session.rollback();
    context.yield();
}

最终也会在自己的onTriggersession.rollback()的。比较麻烦的是PutSQL这一类,函数式编程有些绕的

代码语言:javascript
复制
public void onTrigger(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException {
    ....
    // Pass the FlowFiles to initialize a connection
    try (C connection = initConnection.apply(context, session, functionContext, flowFiles)) {
        ......
    } catch (ProcessException e) {
        throw e;
    } catch (Exception e) {
        // Throw uncaught exception as RuntimeException so that this processor will be yielded.
        final String msg = String.format("Failed to execute due to %s", e);
        logger.error(msg, e);
        throw new RuntimeException(msg, e);
    }
}

上面这段是抽象出去的函数,在PutSQL里被这段代码process.onTrigger(context, session, functionContext)调用

代码语言:javascript
复制
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
    final FunctionContext functionContext = new FunctionContext(rollbackOnFailure);
    functionContext.obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
    RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
}

RollbackOnFailure又传了一段函数(session, t) -> {} ,我们看到session.rollback(shouldPenalize),也是NIFI事务回滚的。

代码语言:javascript
复制
PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> {
    // If RollbackOnFailure is enabled, do not penalize processing FlowFiles when rollback,
    // in order to keep those in the incoming relationship to be processed again.
    final boolean shouldPenalize = !functionContext.isRollbackOnFailure();
    session.rollback(shouldPenalize);

    // However, keeping failed FlowFile in the incoming relationship would retry it too often.
    // So, administratively yield the process.
    if (functionContext.isRollbackOnFailure()) {
        logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t);
        context.yield();
    }
});

总体上看,与数据库连接池相关组件在遇到获取数据库连接抛出ProcessException时都是会NIFI回滚事务的,流文件会重返组件上游Connection。

这个疑问再啰嗦一句,这里纠结的是获取数据库连接获得异常,抛出ProcessException后,流文件会回滚到上游还是传输到下游的问题,不要与执行SQL异常混淆了(执行SQL抛出的SQLExeception各个代码都有处理,但毫无疑问流文件都是流向下游的)。然后PutSQL PutDatabaseRecord之类的Rollback On Failure,设置为true的时候,执行SQL报错抛出的SQLExeception也会NIFI回滚事务。

疑问3:多组件多线程,获取数据库连接的总线程数多过数据库连接池里的连接,会怎么样?

我们直接看DBCP2里的核心获取Connection的方法,没有连接了就返回NULL

代码语言:javascript
复制
@Override
public Connection getConnection() throws SQLException {
    try {
        final C conn = pool.borrowObject();
        if (conn == null) {
            return null;
        }
        return new PoolGuardConnectionWrapper<>(conn);
    } ....
}

然后在默认情况下连接池是阻塞队列,当连接池中的连接都被使用,无法立即获取到可用的连接,其中数据库连接池Max Wait Time配置会影响阻塞等待时间(-1是无限阻塞),阻塞等待超过这个时间还没有可用的连接,就会抛出异常。抛出异常后,NIFI回滚事务,流文件还是回到上游。

但是,Max Wait Time设置成-1无限阻塞显然是不合适的,我们可以酌情设置一个时间(估计一下一般一个Connection拿出来,执行SQL,还回池里需要的事件)。最好是建流程的时候,衡量处理器和线程的数量与此连接池的最大连接数,在数据库连接的时候,让处理器处理数据的时候总是可以获取到一个连接,毕竟阻塞在那里,还是耗服务器的资源的。

DBCPConnectionPoolLookup

DBCPConnectionPoolLookup这个Controller Service很简单,也非常有用,说白了,它就是保存了一个我们使用者定义的Map,key是我们自己命名的,value是我们选择的当前流程可用的DBCPConnectionPool,然后在流程运行过程中,DBCPConnectionPoolLookup根据FlowFile中一个叫database.name的属性去这个Map里查找DBCPConnectionPool。使用DBCPConnectionPoolLookup的最大优点是什么?灵活啊!组件不绑定于一个数据库,根据流文件中的属性动态去查找对应的数据库。

代码语言:javascript
复制
文章有帮助的话,小手一抖点击在看,并转发吧。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Panda诚 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Database Connection URL
  • Database Driver Class Name
  • Database Driver Location(s)
  • 底层连接池的选择
  • 几个研究
  • DBCPConnectionPoolLookup
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档