在 ojdbc11.jar 的 JDBC 驱动包中,提供了异步数据库访问的方法,通过非阻塞机制来创建 Connection 对象,执行 SQL 语句、获取行、提交事务、回滚事务、关闭 Connection 对象以及读写 BFILE、BLOB和CLOB。
使用异步数据库访问的前置条件:
构建异步应用程序的步骤与使用标准方法构建应用程序的步骤相同。但是,在这种情况下,将使用新的异步方法。
OracleConnectionBuilder 接口提供了用于异步打开连接的方法。
OracleConnectionBuilder.buildConnectionPublisherOracle 方法返回 Flow.Publisher 类型。发布者发出与订阅者的单个连接。一旦订阅服务器发出需求信号,发布服务器就会异步打开一个新的连接。发布的连接与可以使用 ConnectionBuilder.build 方法构建的连接相同。
下面的示例演示如何异步打开连接。
/**
* 异步打开新连接
* @param dataSource dataSource 参数配置了 URL、用户和密码
* @return 返回单个连接的发布者
* @throws SQLException
*/
Flow.Publisher<OracleConnection> openConnection(DataSource dataSource)
throws SQLException {
return dataSource.unwrap(OracleDataSource.class)
.createConnectionBuilder()
.buildConnectionPublisherOracle();
}
OraclePreparedStatement 接口包括用于异步 SQL 执行的方法。每个异步方法执行的功能都类似于相应的 SQL 执行同步方法。
下表中列出了执行同步与异步方法的对应关系:
同步方法 | 异步方法 |
---|---|
boolean execute | Flow.Publisher executeAsyncOracle |
long executeLargeUpdate | Flow.Publisher executeUpdateAsyncOracle |
long[] executeLargeBatch | Flow.Publisher executeBatchAsyncOracle |
ResultSet executeQuery | Flow.Publisher executeQueryAsyncOracle |
executeAsyncOracle 方法,等效于标准 execute 方法。
可以通过调用 OraclePreparedStatement.executeAsyncOracle 方法来执行任何类型的 SQL 语句。该调用返回 Flow.Publisher 类型。发布者发出一个布尔值,并支持多个订阅者。如果布尔值为 TRUE,则表示 SQL 语句已生成行数据,可从 OraclePreparedStatement.getResultSet 方法访问该数据。如果为 FALSE,则表示 SQL 语句已返回更新计数。布尔结果在语义上等效于 execute 方法返回的布尔值。
/**
* 通过执行 DDL SQL 语句异步创建新表
* @param connection 连接到创建表的数据库
* @return 返回发布者发出执行 DDL SQL 的结果
* @throws SQLException
*/
Flow.Publisher<Boolean> createTable(Connection connection)
throws SQLException {
PreparedStatement createTableStatement =
connection.prepareStatement(
"CREATE TABLE employee_names (" +
"id NUMBER PRIMARY KEY, " +
"first_name VARCHAR(50), " +
"last_name VARCHAR2(50))");
Flow.Publisher<Boolean> createTablePublisher =
createTableStatement.unwrap(OraclePreparedStatement.class)
.executeAsyncOracle();
createTablePublisher.subscribe(
// 该订阅者将关闭 PreparedStatement
new Flow.Subscriber<Boolean>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1L);
}
public void onNext(Boolean item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { createTableStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return createTablePublisher;
}
executeUpdateAsyncOracle 方法,等效于标准 executeLargeUpdate 方法。
可以使用 OraclePreparedStatement.executeUpdateAsyncOracle 方法执行单个(非批处理)DML语句。返回 Flow.Publisher 类型。返回的发布者包含单个 Long 值。此 Long 值指示 DML 语句更新或要插入的行数。
此 Long 值结果在语义上等效于标准 executeLargeUpdate 方法返回的 long 值。
/**
* 通过执行 DML SQL 语句异步更新表数据
* @param connection 连接到表数据所在的数据库
* @return 返回更新的行数的发布者
* @throws SQLException
*/
Flow.Publisher<Long> updateData(Connection connection)
throws SQLException {
PreparedStatement updateStatement = connection.prepareStatement(
"UPDATE employee_names SET " +
"first_name = UPPER(first_name), " +
"last_name = UPPER(last_name)");
Flow.Publisher<Long> updatePublisher =
updateStatement.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
updatePublisher.subscribe(
new Flow.Subscriber<Long>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1L);
}
public void onNext(Long item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { updateStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return updatePublisher;
}
executeBatchAsyncOracle 方法,等效于标准 executeLargeBatch 方法。
可以使用 OraclePreparedStatement.executeBatchAsyncOracle 方法执行批处理 DML 语句。该调用返回 Flow.Publisher 类型。返回的发布者将为批处理中的每个语句发出 Long 值。Long 值指示每个 DML 语句更新的行数。这些 Long 值结果在语义上等效于标准 executeLargeBatch 方法返回的 long[] 值。
/**
* 通过执行一批 DML SQL 语句来异步插入数据
* @param connection 连接到表数据所在的数据库
* @return 返回更新的行数的发布者
* @throws SQLException
*/
Flow.Publisher<Long> createData(
Connection connection, Iterable<Employee> employeeData)
throws SQLException {
PreparedStatement batchStatement = connection.prepareStatement(
"INSERT INTO employee_names (id, first_name, last_name) " +
"VALUES (?, ?, ?)");
for (Employee employee : employeeData) {
batchStatement.setLong(1, employee.id());
batchStatement.setString(2, employee.firstName());
batchStatement.setString(3, employee.lastName());
batchStatement.addBatch();
}
Flow.Publisher<Long> batchPublisher =
batchStatement.unwrap(OraclePreparedStatement.class)
.executeBatchAsyncOracle();
batchPublisher.subscribe(
new Flow.Subscriber<Long>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
public void onNext(Long item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { batchStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return batchPublisher;
}
executeQueryAsyncOracle 方法,等效于标准 executeQuery 方法。
可以使用 OraclePreparedStatement.executeQueryAsyncOracle 方法执行 SQL 查询语句。该调用返回 Flow.Publisher 类型。返回的发布者发出单个 OracleResultSet 值。OracleResultSet 值提供对由 SQL 查询产生的行数据的访问。此 OracleResultSet 在语义上等效于标准 executeQuery 方法返回的 ResultSet。
/**
* 通过执行 SELECT SQL 语句异步读取表数据
* @param connection 连接到表数据所在的数据库
* @return 返回更新的行数的发布者
* @throws SQLException
*/
Flow.Publisher<OracleResultSet> readData(Connection connection)
throws SQLException {
PreparedStatement queryStatement = connection.prepareStatement(
"SELECT id, first_name, last_name FROM employee_names");
Flow.Publisher<OracleResultSet> queryPublisher =
queryStatement.unwrap(OraclePreparedStatement.class)
.executeQueryAsyncOracle();
// 使用完结果集后,关闭 PreparedStatement。
queryStatement.closeOnCompletion();
return queryPublisher;
}
OracleResultSet 接口包含了用于异步行数据获取的方法 PublisherOracle(Function <OracleRow,T>)。该方法的参数是行数据的映射函数。映射函数将应用于 ResultSet 的每一行。该方法返回 Flow.Publisher 类型,其中 T 是映射函数的输出类型。映射函数的输入类型为 OracleRow。OracleRow 表示 ResultSet 的单个行,并公开用于访问该行的列值的方法。
下面的示例为如何使用异步方法获取行数据:
/**
* 从 ResultSet 异步获取表数据。
* @param resultSet ResultSet 获取表数据。
* @return 发布者,将获取的数据作为Employee对象发出。
* @throws SQLException
*/
Flow.Publisher<Employee> fetchData(ResultSet resultSet)
throws SQLException {
Statement resultSetStatement = resultSet.getStatement();
Flow.Publisher<Employee> employeePublisher =
resultSet.unwrap(OracleResultSet.class)
.publisherOracle(oracleRow -> {
try {
return new Employee(
oracleRow.getObject("id", Long.class),
oracleRow.getObject("first_name", String.class),
oracleRow.getObject("last_name", String.class));
}
catch (SQLException getObjectException) {
// 行映射函数抛出的未经检查的异常将被发送到每个订阅服务器的 onError 方法。
throw new RuntimeException(getObjectException);
}
});
employeePublisher.subscribe(
new Flow.Subscriber<Employee>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
public void onNext(Employee item) { }
public void onError(Throwable throwable) { closeStatement(); }
public void onComplete() { closeStatement(); }
void closeStatement() {
try { resultSetStatement.close(); }
catch (SQLException closeException) { log(closeException); }
}
});
return employeePublisher;
}
OracleRow 实例在函数返回后变为无效。将 OracleRow 的访问限制在映射功能的范围内,使驱动程序可以有效地管理用于存储行数据的内存。
如果需要 OracleRow 的永久副本,则可以使用 OracleRow.clone 方法创建新的 OracleRow 实例。由 clone 方法返回的 OracleRow 在映射函数范围之外仍然有效,并且即使关闭数据库连接后仍保留其数据。
行映射函数必须返回非 null 值或引发未经检查的异常。如果映射函数抛出未经检查的异常,那么它将作为 onError 信号传递给行数据订阅者。行数据发布者支持多个订阅者。
OracleBlob,OracleBFile,OracleClob 和 OracleNClob 接口公开了PublisherOracle(long) 方法,用于异步读取 LOB 数据。
PublisherOracle(long) 方法的参数是从中读取数据的 LOB 的位置。OracleBlob.publisherOracle(long)和OracleBFile.publisherOracle(long)方法返回 Publisher 类型。该发布者发出已从 LOB 读取的二进制数据段。
OracleClob.publisherOracle(long) 和OracleNClob.publisherOracle(long) 方法返回 Publisher 类型。该发布者发出已从 LOB 读取的字符数据段。
下面的示例如何从 LOB 异步读取二进制数据。
/**
* 从BLOB异步读取二进制数据
* @param connection
* @param 与 BLOB 关联的 employeeId ID
* @return 返回 BLOB 的二进制数据的发者者
* @throws SQLException
*/
Flow.Publisher<byte[]> readLOB(Connection connection, long employeeId)
throws SQLException {
PreparedStatement lobQueryStatement = connection.prepareStatement(
"SELECT photo_bytes FROM employee_photos WHERE id = ?");
lobQueryStatement.setLong(1, employeeId);
ResultSet resultSet = lobQueryStatement.executeQuery();
if (!resultSet.next())
throw new SQLException("No photo found for employee ID " + employeeId);
OracleBlob photoBlob =
(OracleBlob)resultSet.unwrap(OracleResultSet.class).getBlob(1);
Flow.Publisher<byte[]> photoPublisher = photoBlob.publisherOracle(1);
photoPublisher.subscribe(
new Flow.Subscriber<byte[]>() {
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
public void onNext(byte[] item) { }
public void onError(Throwable throwable) { freeResources(); }
public void onComplete() { freeResources(); }
void freeResources() {
try { lobQueryStatement.close(); }
catch (SQLException closeException) { log(closeException); }
try { photoBlob.free(); }
catch (SQLException freeException) { log(freeException); }
}
});
return photoPublisher;
}
无法配置 LOB 发布者发出的数据段的大小。驱动程序选择根据数据库的DB_BLOCK_SIZE 参数优化的段大小。
OracleBlob,OracleClob 和 OracleNClob 接口公开了用于异步写入 LOB 数据的 subscriberOracle(long) 方法。
SubscriberOracle(long) 方法的参数是 LOB 写入数据的位置。OracleBlob.subscriberOracle(long) 方法返回 Subscriber 类型。此订阅者接收写入 LOB 的二进制数据段。OracleClob.subscriberOracle(long) 方法和OracleNClob.subscriberOracle(long) 方法返回 Subscriber 类型。这些订阅者接收写入 LOB 的字符数据段。
下面的示例如何将二进制数据异步写入LOB。
/**
* 异步将二进制数据写入BLOB
* @param connection
* @param bytesPublisher 发出二进制数据的发布者
* @return CompletionStage
* @throws SQLException
*/
CompletionStage<Blob> writeLOB(
Connection connection, Flow.Publisher<byte[]> bytesPublisher)
throws SQLException {
OracleBlob oracleBlob =
(OracleBlob) connection.unwrap(OracleConnection.class).createBlob();
CompletableFuture<Blob> writeFuture = new CompletableFuture<>();
Flow.Subscriber<byte[]> blobSubscriber =
oracleBlob.subscriberOracle(1L,
// 当所有 byte[] 都已写入 BLOB 时,该订阅者将接收到终端信号。
new Flow.Subscriber<Long>() {
long totalWriteLength = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long writeLength) {
totalWriteLength += writeLength;
log(totalWriteLength + " bytes written.");
}
@Override
public void onError(Throwable throwable) {
writeFuture.completeExceptionally(throwable);
}
@Override
public void onComplete() {
writeFuture.complete(oracleBlob);
}
});
bytesPublisher.subscribe(blobSubscriber);
return writeFuture;
}
OracleBlob,OracleClob 和 OracleNClob 接口还公开了subscriberOracle(long,Subscriber ) 方法,该方法执行与subscriberOracle(long) 方法的单参数形式相同的功能。但是,单参数形式也接受 Subscriber 类型。Subscriber 类型通知订阅者接收针对数据库的写操作的结果。每次异步写操作完成时,Subscriber 类型都会收到一个 onNext 信号,该信号具有该操作写入的字节数或字符数。如果异步写操作失败,则 Subscriber 类型将收到一个 onError 信号。最后的写操作完成后,Subscriber 会收到一个 onComplete 信号。
由 writeLOB 方法返回的 CompletionStage 完成后,可以将所得的 Blob 对象传递给 insertLOB 方法,以将 BLOB 数据存储在表中。
以下示例如何插入数据。
/**
* 通过执行DML SQL将BLOB数据异步插入表中
* statement
* @param connection
* @param 与BLOB数据相关的employeeId ID
* @param photoBlob 引用BLOB数据
* @return 发布者发出插入的行数(总是1)
* @throws SQLException
*/
Flow.Publisher<Long> insertLOB(
Connection connection, long employeeId, Blob photoBlob)
throws SQLException {
PreparedStatement lobInsertStatement = connection.prepareStatement(
"INSERT INTO employee_photos(id, photo_bytes) VALUES (? ,?)");
lobInsertStatement.setLong(1, employeeId);
lobInsertStatement.setBlob(2, photoBlob);
Flow.Publisher<Long> insertPublisher =
lobInsertStatement.unwrap(OraclePreparedStatement.class)
.executeUpdateAsyncOracle();
insertPublisher.subscribe(new Flow.Subscriber<Long>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1L);
}
@Override
public void onNext(Long item) { }
@Override
public void onError(Throwable throwable) { releaseResources(); }
@Override
public void onComplete() { releaseResources(); }
void releaseResources() {
try { lobInsertStatement.close(); }
catch (SQLException closeException) { log(closeException); }
try { photoBlob.free(); }
catch (SQLException freeException) { log(freeException); }
}
});
return insertPublisher;
}
OracleConnection 接口公开了 commitAsyncOracle 和 rollbackAsyncOracle 方法以完成异步事务。
commitAsyncOracle 和 rollbackAsyncOracle 方法均返回 Flow.Publisher 类型。发布者不发出任何项目,如其 类型所指示。发布者发出一个 onComplete 或 onError 信号,以指示提交或回滚操作是否成功完成。
下面的示例演示如何异步提交事务。
/**
* 异步提交事务
* @param connection
* @return 发布者在事务已提交时发出终端信号
* @throws SQLException
*/
public Flow.Publisher<Void> commitTransaction(Connection connection)
throws SQLException {
return connection.unwrap(OracleConnection.class)
.commitAsyncOracle();
}
commitAsyncOracle 和 rollbackAsyncOracle 方法执行与Connection.commit 和 Connection.rollback 方法相同的功能。
OracleConnection 接口公开了 closeAsyncOracle 方法,用于关闭异步连接。
closeAsyncOracle 方法返回 Flow.Publisher 类型。发布者不发出任何项目,如其类型所指示。发布者发出一个 onComplete 或 onError 信号,以指示连接是否成功关闭。
下面的示例如何异步关闭连接。
/**
* 异步关闭连接
* @param connection
* @return 当连接已关闭时发出终端信号的发布服务器
* @throws SQLException
*/
Flow.Publisher<Void> closeConnection(Connection connection)
throws SQLException {
return connection.unwrap(OracleConnection.class)
.closeAsyncOracle();
}
closeAsyncOracle 方法执行与 Connection.close 方法相同的功能。