前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Oracle 20c:使用 JDBC 异步访问数据库

Oracle 20c:使用 JDBC 异步访问数据库

作者头像
Grainger
发布2022-04-24 09:51:33
1.3K0
发布2022-04-24 09:51:33
举报
文章被收录于专栏:数据与未来数据与未来

在 ojdbc11.jar 的 JDBC 驱动包中,提供了异步数据库访问的方法,通过非阻塞机制来创建 Connection 对象,执行 SQL 语句、获取行、提交事务、回滚事务、关闭 Connection 对象以及读写 BFILE、BLOB和CLOB。

使用异步数据库访问的前置条件:

  • 使用 JDBC Thin Driver 建立连接
  • 使用 JDK11 和 ojdbc11.jar
  • Oracle 20c 或更高版本

构建异步应用程序的步骤与使用标准方法构建应用程序的步骤相同。但是,在这种情况下,将使用新的异步方法。

  • 使用异步方法打开连接
  • 使用异步方法执行 SQL 语句
  • 使用异步方法获取行数据
  • 使用异步方法读取 LOB 数据
  • 使用异步方法写入 LOB 数据
  • 使用异步方法提交事务
  • 使用异步方法关闭连接

1、使用异步方法打开连接

OracleConnectionBuilder 接口提供了用于异步打开连接的方法。

OracleConnectionBuilder.buildConnectionPublisherOracle 方法返回 Flow.Publisher 类型。发布者发出与订阅者的单个连接。一旦订阅服务器发出需求信号,发布服务器就会异步打开一个新的连接。发布的连接与可以使用 ConnectionBuilder.build 方法构建的连接相同。

下面的示例演示如何异步打开连接。

代码语言:javascript
复制
 /**
  * 异步打开新连接
  * @param dataSource dataSource 参数配置了 URL、用户和密码
  * @return 返回单个连接的发布者
  * @throws SQLException
  */
  Flow.Publisher<OracleConnection> openConnection(DataSource dataSource)
    throws SQLException {
    return dataSource.unwrap(OracleDataSource.class)
      .createConnectionBuilder()
      .buildConnectionPublisherOracle();
  }

2、使用异步方法执行 SQL 语句

OraclePreparedStatement 接口包括用于异步 SQL 执行的方法。每个异步方法执行的功能都类似于相应的 SQL 执行同步方法。

下表中列出了执行同步与异步方法的对应关系:

同步方法

异步方法

boolean execute

Flow.Publisher executeAsyncOracle

long executeLargeUpdate

Flow.Publisher executeUpdateAsyncOracle

long[] executeLargeBatch

Flow.Publisher executeBatchAsyncOracle

ResultSet executeQuery

Flow.Publisher executeQueryAsyncOracle

2.1 使用 executeAsyncOracle 方法的标准 SQL 语句执行

executeAsyncOracle 方法,等效于标准 execute 方法。

可以通过调用 OraclePreparedStatement.executeAsyncOracle 方法来执行任何类型的 SQL 语句。该调用返回 Flow.Publisher 类型。发布者发出一个布尔值,并支持多个订阅者。如果布尔值为 TRUE,则表示 SQL 语句已生成行数据,可从 OraclePreparedStatement.getResultSet 方法访问该数据。如果为 FALSE,则表示 SQL 语句已返回更新计数。布尔结果在语义上等效于 execute 方法返回的布尔值。

代码语言:javascript
复制
/**
   * 通过执行 DDL SQL 语句异步创建新表
   * @param connection 连接到创建表的数据库
   * @return 返回发布者发出执行 DDL SQL 的结果
   * @throws SQLException
   */
  Flow.Publisher<Boolean> createTable(Connection connection)
    throws SQLException {

    PreparedStatement createTableStatement =
      connection.prepareStatement(
        "CREATE TABLE employee_names (" +
          "id NUMBER PRIMARY KEY, " +
          "first_name VARCHAR(50), " +
          "last_name VARCHAR2(50))");

    Flow.Publisher<Boolean> createTablePublisher =
      createTableStatement.unwrap(OraclePreparedStatement.class)
        .executeAsyncOracle();

    createTablePublisher.subscribe(
      // 该订阅者将关闭 PreparedStatement
      new Flow.Subscriber<Boolean>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(1L);
        }
        public void onNext(Boolean item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { createTableStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return createTablePublisher;
  }

2.2 使用 executeUpdateAsyncOracle 方法执行 DML 语句

executeUpdateAsyncOracle 方法,等效于标准 executeLargeUpdate 方法。

可以使用 OraclePreparedStatement.executeUpdateAsyncOracle 方法执行单个(非批处理)DML语句。返回 Flow.Publisher 类型。返回的发布者包含单个 Long 值。此 Long 值指示 DML 语句更新或要插入的行数。

此 Long 值结果在语义上等效于标准 executeLargeUpdate 方法返回的 long 值。

代码语言:javascript
复制
/**
   * 通过执行 DML SQL 语句异步更新表数据
   * @param connection 连接到表数据所在的数据库
   * @return 返回更新的行数的发布者
   * @throws SQLException
   */
  Flow.Publisher<Long> updateData(Connection connection)
    throws SQLException {

    PreparedStatement updateStatement = connection.prepareStatement(
      "UPDATE employee_names SET " +
        "first_name = UPPER(first_name), " +
        "last_name = UPPER(last_name)");

    Flow.Publisher<Long> updatePublisher =
      updateStatement.unwrap(OraclePreparedStatement.class)
        .executeUpdateAsyncOracle();

    updatePublisher.subscribe(
      
      new Flow.Subscriber<Long>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(1L);
        }
        public void onNext(Long item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { updateStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return updatePublisher;
  }

2.3 使用 executeBatchAsyncOracle 方法批量执行 DML 语句

executeBatchAsyncOracle 方法,等效于标准 executeLargeBatch 方法。

可以使用 OraclePreparedStatement.executeBatchAsyncOracle 方法执行批处理 DML 语句。该调用返回 Flow.Publisher 类型。返回的发布者将为批处理中的每个语句发出 Long 值。Long 值指示每个 DML 语句更新的行数。这些 Long 值结果在语义上等效于标准 executeLargeBatch 方法返回的 long[] 值。

代码语言:javascript
复制
/**
   * 通过执行一批 DML SQL 语句来异步插入数据
   * @param connection 连接到表数据所在的数据库
   * @return 返回更新的行数的发布者
   * @throws SQLException
   */
  Flow.Publisher<Long> createData(
    Connection connection, Iterable<Employee> employeeData)
    throws SQLException {

    PreparedStatement batchStatement = connection.prepareStatement(
      "INSERT INTO employee_names (id, first_name, last_name) " +
        "VALUES (?, ?, ?)");

    for (Employee employee : employeeData) {
      batchStatement.setLong(1, employee.id());
      batchStatement.setString(2, employee.firstName());
      batchStatement.setString(3, employee.lastName());
      batchStatement.addBatch();
    }

    Flow.Publisher<Long> batchPublisher =
      batchStatement.unwrap(OraclePreparedStatement.class)
        .executeBatchAsyncOracle();

    batchPublisher.subscribe(
      
      new Flow.Subscriber<Long>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(Long item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { batchStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return batchPublisher;
  }

2.4 使用 executeQueryAsyncOracle 方法执行 SQL 查询

executeQueryAsyncOracle 方法,等效于标准 executeQuery 方法。

可以使用 OraclePreparedStatement.executeQueryAsyncOracle 方法执行 SQL 查询语句。该调用返回 Flow.Publisher 类型。返回的发布者发出单个 OracleResultSet 值。OracleResultSet 值提供对由 SQL 查询产生的行数据的访问。此 OracleResultSet 在语义上等效于标准 executeQuery 方法返回的 ResultSet。

代码语言:javascript
复制
/**
   * 通过执行 SELECT SQL 语句异步读取表数据
   * @param connection 连接到表数据所在的数据库
   * @return 返回更新的行数的发布者
   * @throws SQLException
   */
  Flow.Publisher<OracleResultSet> readData(Connection connection)
    throws SQLException {

    PreparedStatement queryStatement = connection.prepareStatement(
      "SELECT id, first_name, last_name FROM employee_names");

    Flow.Publisher<OracleResultSet> queryPublisher =
      queryStatement.unwrap(OraclePreparedStatement.class)
        .executeQueryAsyncOracle();

    // 使用完结果集后,关闭 PreparedStatement。
    queryStatement.closeOnCompletion();

    return queryPublisher;
  }

3、使用异步方法获取行数据

OracleResultSet 接口包含了用于异步行数据获取的方法 PublisherOracle(Function <OracleRow,T>)。该方法的参数是行数据的映射函数。映射函数将应用于 ResultSet 的每一行。该方法返回 Flow.Publisher 类型,其中 T 是映射函数的输出类型。映射函数的输入类型为 OracleRow。OracleRow 表示 ResultSet 的单个行,并公开用于访问该行的列值的方法。

下面的示例为如何使用异步方法获取行数据:

代码语言:javascript
复制
/**
   * 从 ResultSet 异步获取表数据。
   * @param resultSet ResultSet 获取表数据。
   * @return 发布者,将获取的数据作为Employee对象发出。
   * @throws SQLException
   */
  Flow.Publisher<Employee> fetchData(ResultSet resultSet)
    throws SQLException {
    
    Statement resultSetStatement = resultSet.getStatement();

    Flow.Publisher<Employee> employeePublisher =
      resultSet.unwrap(OracleResultSet.class)
        .publisherOracle(oracleRow -> {
          try {
            return new Employee(
              oracleRow.getObject("id", Long.class),
              oracleRow.getObject("first_name", String.class),
              oracleRow.getObject("last_name", String.class));
          }
          catch (SQLException getObjectException) {
            // 行映射函数抛出的未经检查的异常将被发送到每个订阅服务器的 onError 方法。
            throw new RuntimeException(getObjectException);
          }
        });

    employeePublisher.subscribe(
      
      new Flow.Subscriber<Employee>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(Employee item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { resultSetStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return employeePublisher;
  }

OracleRow 实例在函数返回后变为无效。将 OracleRow 的访问限制在映射功能的范围内,使驱动程序可以有效地管理用于存储行数据的内存。

如果需要 OracleRow 的永久副本,则可以使用 OracleRow.clone 方法创建新的 OracleRow 实例。由 clone 方法返回的 OracleRow 在映射函数范围之外仍然有效,并且即使关闭数据库连接后仍保留其数据。

行映射函数必须返回非 null 值或引发未经检查的异常。如果映射函数抛出未经检查的异常,那么它将作为 onError 信号传递给行数据订阅者。行数据发布者支持多个订阅者。

4、使用异步方法读取 LOB 数据

OracleBlob,OracleBFile,OracleClob 和 OracleNClob 接口公开了PublisherOracle(long) 方法,用于异步读取 LOB 数据。

PublisherOracle(long) 方法的参数是从中读取数据的 LOB 的位置。OracleBlob.publisherOracle(long)和OracleBFile.publisherOracle(long)方法返回 Publisher 类型。该发布者发出已从 LOB 读取的二进制数据段。

OracleClob.publisherOracle(long) 和OracleNClob.publisherOracle(long) 方法返回 Publisher 类型。该发布者发出已从 LOB 读取的字符数据段。

下面的示例如何从 LOB 异步读取二进制数据。

代码语言:javascript
复制
/**
   * 从BLOB异步读取二进制数据
   * @param connection
   * @param 与 BLOB 关联的 employeeId ID
   * @return 返回 BLOB 的二进制数据的发者者
   * @throws SQLException
   */
  Flow.Publisher<byte[]> readLOB(Connection connection, long employeeId)
    throws SQLException {
    PreparedStatement lobQueryStatement = connection.prepareStatement(
      "SELECT photo_bytes FROM employee_photos WHERE id = ?");
    lobQueryStatement.setLong(1, employeeId);

    ResultSet resultSet = lobQueryStatement.executeQuery();
    if (!resultSet.next())
      throw new SQLException("No photo found for employee ID " + employeeId);

    OracleBlob photoBlob =
      (OracleBlob)resultSet.unwrap(OracleResultSet.class).getBlob(1);
    Flow.Publisher<byte[]> photoPublisher = photoBlob.publisherOracle(1);

    photoPublisher.subscribe(
      
      new Flow.Subscriber<byte[]>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(byte[] item) { }
        public void onError(Throwable throwable) { freeResources(); }
        public void onComplete() { freeResources(); }
        void freeResources() {
          try { lobQueryStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
          try { photoBlob.free(); }
          catch (SQLException freeException) { log(freeException); }
        }
      });
    return photoPublisher;
  }

无法配置 LOB 发布者发出的数据段的大小。驱动程序选择根据数据库的DB_BLOCK_SIZE 参数优化的段大小。

5、使用异步方法写入 LOB 数据

OracleBlob,OracleClob 和 OracleNClob 接口公开了用于异步写入 LOB 数据的 subscriberOracle(long) 方法。

SubscriberOracle(long) 方法的参数是 LOB 写入数据的位置。OracleBlob.subscriberOracle(long) 方法返回 Subscriber 类型。此订阅者接收写入 LOB 的二进制数据段。OracleClob.subscriberOracle(long) 方法和OracleNClob.subscriberOracle(long) 方法返回 Subscriber 类型。这些订阅者接收写入 LOB 的字符数据段。

下面的示例如何将二进制数据异步写入LOB。

代码语言:javascript
复制
/**
   * 异步将二进制数据写入BLOB
   * @param connection
   * @param bytesPublisher 发出二进制数据的发布者
   * @return CompletionStage
   * @throws SQLException
   */
  CompletionStage<Blob> writeLOB(
    Connection connection, Flow.Publisher<byte[]> bytesPublisher)
    throws SQLException {

    OracleBlob oracleBlob =
      (OracleBlob) connection.unwrap(OracleConnection.class).createBlob();

    CompletableFuture<Blob> writeFuture = new CompletableFuture<>();

    Flow.Subscriber<byte[]> blobSubscriber =
      oracleBlob.subscriberOracle(1L,
        // 当所有 byte[] 都已写入 BLOB 时,该订阅者将接收到终端信号。
        new Flow.Subscriber<Long>() {
          long totalWriteLength = 0;
          @Override
          public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
          }
          @Override
          public void onNext(Long writeLength) {
            totalWriteLength += writeLength;
            log(totalWriteLength + " bytes written.");
          }
          @Override
          public void onError(Throwable throwable) {
            writeFuture.completeExceptionally(throwable);
          }
          @Override
          public void onComplete() {
            writeFuture.complete(oracleBlob);
          }
        });

    bytesPublisher.subscribe(blobSubscriber);
    return writeFuture;
  }

OracleBlob,OracleClob 和 OracleNClob 接口还公开了subscriberOracle(long,Subscriber ) 方法,该方法执行与subscriberOracle(long) 方法的单参数形式相同的功能。但是,单参数形式也接受 Subscriber 类型。Subscriber 类型通知订阅者接收针对数据库的写操作的结果。每次异步写操作完成时,Subscriber 类型都会收到一个 onNext 信号,该信号具有该操作写入的字节数或字符数。如果异步写操作失败,则 Subscriber 类型将收到一个 onError 信号。最后的写操作完成后,Subscriber 会收到一个 onComplete 信号。

由 writeLOB 方法返回的 CompletionStage 完成后,可以将所得的 Blob 对象传递给 insertLOB 方法,以将 BLOB 数据存储在表中。

以下示例如何插入数据。

代码语言:javascript
复制
/**
   * 通过执行DML SQL将BLOB数据异步插入表中
   * statement
   * @param connection
   * @param 与BLOB数据相关的employeeId ID
   * @param photoBlob 引用BLOB数据
   * @return 发布者发出插入的行数(总是1)
   * @throws SQLException
   */
  Flow.Publisher<Long> insertLOB(
    Connection connection, long employeeId, Blob photoBlob)
    throws SQLException {

    PreparedStatement lobInsertStatement = connection.prepareStatement(
      "INSERT INTO employee_photos(id, photo_bytes) VALUES (? ,?)");
    lobInsertStatement.setLong(1, employeeId);
    lobInsertStatement.setBlob(2, photoBlob);

    Flow.Publisher<Long> insertPublisher =
      lobInsertStatement.unwrap(OraclePreparedStatement.class)
        .executeUpdateAsyncOracle();

    insertPublisher.subscribe(new Flow.Subscriber<Long>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(1L);
      }
      @Override
      public void onNext(Long item) { }
      @Override
      public void onError(Throwable throwable) { releaseResources(); }
      @Override
      public void onComplete() { releaseResources(); }
      void releaseResources() {
        try { lobInsertStatement.close(); }
        catch (SQLException closeException) { log(closeException); }
        try { photoBlob.free(); }
        catch (SQLException freeException) { log(freeException); }
      }
    });
    return insertPublisher;
  }

6、使用异步方法提交事务

OracleConnection 接口公开了 commitAsyncOracle 和 rollbackAsyncOracle 方法以完成异步事务。

commitAsyncOracle 和 rollbackAsyncOracle 方法均返回 Flow.Publisher 类型。发布者不发出任何项目,如其 类型所指示。发布者发出一个 onComplete 或 onError 信号,以指示提交或回滚操作是否成功完成。

下面的示例演示如何异步提交事务。

代码语言:javascript
复制
/**
   * 异步提交事务
   * @param connection
   * @return 发布者在事务已提交时发出终端信号
   * @throws SQLException
   */
  public Flow.Publisher<Void> commitTransaction(Connection connection)
    throws SQLException {
    return connection.unwrap(OracleConnection.class)
      .commitAsyncOracle();
  }

commitAsyncOracle 和 rollbackAsyncOracle 方法执行与Connection.commit 和 Connection.rollback 方法相同的功能。

7、使用异步方法关闭连接

OracleConnection 接口公开了 closeAsyncOracle 方法,用于关闭异步连接。

closeAsyncOracle 方法返回 Flow.Publisher 类型。发布者不发出任何项目,如其类型所指示。发布者发出一个 onComplete 或 onError 信号,以指示连接是否成功关闭。

下面的示例如何异步关闭连接。

代码语言:javascript
复制
/**
   * 异步关闭连接
   * @param connection
   * @return 当连接已关闭时发出终端信号的发布服务器
   * @throws SQLException
   */
  Flow.Publisher<Void> closeConnection(Connection connection)
    throws SQLException {
    return connection.unwrap(OracleConnection.class)
      .closeAsyncOracle();
  }

closeAsyncOracle 方法执行与 Connection.close 方法相同的功能。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 山东Oracle用户组 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、使用异步方法打开连接
  • 2、使用异步方法执行 SQL 语句
    • 2.1 使用 executeAsyncOracle 方法的标准 SQL 语句执行
      • 2.2 使用 executeUpdateAsyncOracle 方法执行 DML 语句
        • 2.3 使用 executeBatchAsyncOracle 方法批量执行 DML 语句
          • 2.4 使用 executeQueryAsyncOracle 方法执行 SQL 查询
          • 3、使用异步方法获取行数据
          • 4、使用异步方法读取 LOB 数据
          • 5、使用异步方法写入 LOB 数据
          • 6、使用异步方法提交事务
          • 7、使用异步方法关闭连接
          相关产品与服务
          数据保险箱
          数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档