原文作者:Clement Escoffier
原文地址:https://dzone.com/articles/accessing-data-the-reactive-way
这是我的“ Eclipse Vert.x简介 ”系列的第四篇文章。在本文中,我们将看到如何使通过vertx-jdbc-client提供的异步API在Eclipse Vert.x应用程序中使用JDBC。在深入JDBC和SQL等细微之处之前,我们先谈谈Vert.x Futures
。
我们首先回顾一下以前的文章:
在这第四篇文章中,我们将解决我们应用程序的主要缺陷:内存后端。当前的应用程序使用内存来存储成果(协议)。这非常有用,因为我们每次重新启动应用程序时都会丢失内容。让我们来使用一个数据库。在这篇文章中,我们将使用PostgreSQL,您也可以使用任何提供JDBC驱动程序的数据库。举个例子,我们的测试将使用HSQL。与数据库的交互是异步的,并使用vertx-jdbc-clientFuture
。但在深入研究这些JDBC和SQL细节之前,让我们介绍一下Vert.x 类,并解释它如何使异步协调变得更加简单。
这篇文章中使用到的原代码段可以在GitHub仓库的post-4
目录中找到。
Eclipse Vert.x特性之一是它的异步性和非阻塞性。当使用异步API时,您无需等待结果,但当此结果准备就绪,操作已完成时,您会收到通知。为了说明这一点,我们举一个非常简单的例子。
public void retrieve(Handler<String> resultHandler) {
fileSystem.read(fileName, res -> {
resultHandler.handle(res);
});
}
为了避免误解,异步API不是线程。正如我们在示例retrieve
中所看到的那样,不涉及任何线程,大多数Vert.x应用程序在异步和非阻塞的情况下使用的线程数很少。此外,重要的是要注意该方法是非阻塞的。该retrieve
方法可能会在resultHandler
被调用前返回。
public void retrieve(
Handler<AsyncResult<String>> resultHandler) {
vertx.fileSystem().readFile("fileName", ar -> {
if (ar.failed()) {
resultHandler.handle(
Future.succeededFuture(ar.result().toString()));
} else {
resultHandler.handle(
Future.failedFuture(ar.cause()));
}
});
}
retrieve(ar -> {
if (ar.failed()) {
// 处理失败情况,期望
// 恢复使用 ar.cause()
Throwable cause = ar.cause();
// ...
} else {
// 成功,结果在 ar.result() 中
String content = ar.result();
// ...
}
});
因此,总而言之,异步方法是一种将其结果或失败情况作为通知转发的方法,通常会调用期待结果的回调函数。
一旦你有了一套异步方法,你通常要编排它们:
对于第一种情况,我们会这样做:
retrieve(ar -> {
if (ar.failed()) {
// 使回复
} else {
String r = ar.result();
// 调用另一个异步方法
anotherAsyncMethod(r, ar2 -> {
if (ar2.failed()) {
//...
} else {
// ...
}
})
}
});
你可以很快发现问题......事情开始变得混乱。嵌套回调降低了代码的可读性,而且这里只有两个嵌套。想象一下,处理比这更复杂的情况,我们将会遇到这个问题。
对于第二种处理方式,你也可以想象其中的困难。在每个结果处理程序中,你需要检查其它活动是否已完成或失败,然后做出相应的反应,这导致了令人费解的代码。
为了降低代码的复杂程度,Vert.x提出了一个名为Future
的类。 一个Future
类是一个封装了可能会发生,或者可能不会发生,或者已经发生了的动作的对象。与普通的Java Future不同,Vert.x Futrue
是非阻塞的,并且当Future
完成或失败时一个Handler
处理将被调用。这种Future
类被应用在AsyncResult
上当它表现为异步计算结果。
有关Java Future的说明:普通JavaFuture
是阻塞的。调用get
会阻塞调用者线程,直到收到结果(或超时)。如果结果未收到,Vert.x Future
也有一个get
来返回null
值。他们还希望有一个附加的处理程序当收到结果时。
使用Future.future()
工厂方法创建一个Future
对象:
Future<Integer> future = Future.future();
future.complete(1); // Completes the Future with a result
future.fail(exception); // Fails the Future
// To be notified when the future has been completed
// or failed
future.setHandler(ar -> {
// Handler called with the result or the failure,
// ar is an AsyncResult
});
让我们重新审视我们的retrieve
方法。我们可以不用回调作为参数,而是返回一个Future
对象:
public Future<String> retrieve() {
Future<String> future = Future.future();
vertx.fileSystem().readFile("fileName", ar -> {
if (ar.failed()) {
future.failed(ar.cause());
} else {
future.complete(ar.result().toString());
}
});
return future;
}
public Future<String> retrieve() {
Future<String> future = Future.future();
vertx.fileSystem().readFile("fileName",
ar -> future.handle(ar.map(Buffer::toString)));
return future;
}
我们将在之后的讲解攘括此API。但首先,让我们看看调用者,并没有太大变化。处理程序附加在返回的Future
。
retrieve().setHandler(ar -> {
if (ar.failed()) {
// 处理失败,期望
// 恢复使用 ar.cause()
Throwable cause = ar.cause();
// ...
} else {
// 处理成功,结果在 ar.result() 中
int r = ar.result();
// ...
}
});
当你需要编写异步操作时这将会变得容易些。使用compose
方法处理顺序组合:
retrieve()
.compose(this::anotherAsyncMethod)
.setHandler(ar -> {
// ar.result is the final result
// if any stage fails, ar.cause is
// the thrown exception
});
Future.compose
将之前的Future
结果作为函数参数,并返回另一个Future
。这样你可以链接许多异步操作。
那么并发组合呢。假设你想调用2个不相关的操作,并在两个操作都完成时收到通知:
Future<String> future1 = retrieve();
Future<Integer> future2 = anotherAsyncMethod();
CompositeFuture.all(future1, future2)
.setHandler(ar -> {
// called when either all future have completed
// successfully (success),
// or one failed (failure)
});
使用Future
和CompositeFuture
使代码更具有可读性和可维护性。Vert.x也支持RX Java来管理异步合成,这将在另一篇文章中介绍。
所以,现在我们已经看到了关于异步API和Future
的一些基础知识,让我们来看看vertx-jdbc-client
。这个Vert.x模块允许我们通过JDBC驱动程序与数据库进行交互。这些交互是异步的,所以当你像下面这么做时:
String sql = "SELECT * FROM Products";
ResultSet rs = stmt.executeQuery(sql);
When you use the vertx-jdbc-client, it becomes:
connection.query("SELECT * FROM Products", result -> {
// do something with the result
});
该模型避免了等待结果。从数据库中检索到结果时会通知您。
关于JDBC的注意事项:默认情况下,JDBC是一个阻塞API。为了与数据库交互,Vert.x委托给一个工作者线程。虽然它是异步的,但并不完全是非阻塞的。但是,Vert.x生态系统还为MySQL和PostgreSQL提供真正的非阻塞客户端。
现在让我们修改我们的应用程序,使用数据库来存储我们的产品(文章)。
我们需要做的第一件事是在我们的pom.xml
文件中声明两个新的Maven依赖关系:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.4.1212</version>
</dependency>
第一个依赖项提供vetrx-jdbc-client
,而第二个依赖项提供PostgreSQL JDBC驱动程序。如果您想使用其他数据库,请更改此依赖关系。您还需要更改代码中的JDBC URL和JDBC驱动程序类名称。
现在我们已经添加了这些依赖关系,是时候创建我们的JDBC客户端了。但它需要进行配置。按照以下内容编辑src/main/conf/my-application-conf.json
:
{
"HTTP_PORT": 8082,
"url": "jdbc:postgresql://localhost:5432/my_read_list",
"driver_class": "org.postgresql.Driver",
"user": "user",
"password": "password"
}
现在配置完成后,我们需要创建一个JDBC客户端实例。在MyFirstVerticle
类中,声明一个新字段JDBCClient jdbc
,并更新start
方法的结尾以变为:
ConfigRetriever retriever = ConfigRetriever.create(vertx);
retriever.getConfig(
config -> {
if (config.failed()) {
fut.fail(config.cause());
} else {
// 创建JDBC客户端
jdbc = JDBCClient.createShared(vertx, config.result(),
"My-Reading-List");
vertx
.createHttpServer()
.requestHandler(router::accept)
.listen(
// 恢复端口配置,
// 默认为 8080.
config.result().getInteger("HTTP_PORT", 8080),
result -> {
if (result.succeeded()) {
fut.complete();
} else {
fut.fail(result.cause());
}
});
}
}
);
好的,我们的客户端已经配置了,我们需要连接到数据库。这是通过使用jdbc.getConnection
方法实现的,它将结果(连接)提供给Handler<AsyncResult>
。当与数据库的连接被建立,或者在处理过程中发生错误时,会通知此处理程序。尽管我们可以直接使用该方法,但是我们将会连接的检索提取到单独的方法并返回Future
:
private Future<SQLConnection> connect() {
Future<SQLConnection> future = Future.future(); // 1
jdbc.getConnection(ar -> // 2
future.handle(ar.map(connection -> // 3
connection.setOptions(
new SQLOptions().setAutoGeneratedKeys(true)) // 4
)
)
);
return future; // 5
}
让我们更深入地看一下这种方法。首先我们创建一个我们在方法(5)结尾处返回的Future
对象(1)。它的完成或失败,取决于我们是否成功检索到数据库的连接。这在(2)中完成。我们传递的函数getCoonction
收到一个AsyncResult
。Future
有一个方法(handle
)直接完成或失败在AsyncResult
的基础上。等同于:
if (ar.failed()) {
future.failed(ar.cause());
} else {
future.complete(ar.result());
}
只是...更简洁。
现在我们有一个JDBC客户端,并且有一种方法可以检索到数据库的连接,那么是时候嵌入协议了。但是因为我们使用关系数据库,我们首先需要创建表,创建以下方法:
private Future<SQLConnection> createTableIfNeeded(SQLConnection connection) {
Future<SQLConnection> future = Future.future();
vertx.fileSystem().readFile("tables.sql", ar -> {
if (ar.failed()) {
future.fail(ar.cause());
} else {
connection.execute(ar.result().toString(),
ar2 -> future.handle(ar2.map(connection))
);
}
});
return future;
}
所以,我们需要tables.sql
这个文件。使用以下内容创建src/main/resources/tables.sql
文件:
CREATE TABLE IF NOT EXISTS Articles (id SERIAL PRIMARY KEY,
title VARCHAR(200) NOT NULL,
url VARCHAR(200) NOT NULL)
好的,现在我们连接到数据库和表。让我们插入协议,但仅限于数据库为空的情况。为此,创建createSomeDataIfNone
和insert
方法:
private Future<SQLConnection> createSomeDataIfNone(SQLConnection connection) {
Future<SQLConnection> future = Future.future();
connection.query("SELECT * FROM Articles", select -> {
if (select.failed()) {
future.fail(select.cause());
} else {
if (select.result().getResults().isEmpty()) {
Article article1 = new Article("Fallacies of distributed computing",
"https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
Article article2 = new Article("Reactive Manifesto",
"https://www.reactivemanifesto.org/");
Future
<Article> insertion1 = insert(connection, article1, false);
Future
<Article> insertion2 = insert(connection, article2, false);
CompositeFuture.all(insertion1, insertion2)
.setHandler(r -> future.handle(r.map(connection)));
} else {
// Boring... nothing to do.
future.complete(connection);
}
}
});
return future;
}
private Future
<Article> insert(SQLConnection connection, Article article,
boolean closeConnection) {
Future
<Article> future = Future.future();
String sql = "INSERT INTO Articles (title, url) VALUES (?, ?)";
connection.updateWithParams(sql,
new JsonArray().add(article.getTitle()).add(article.getUrl()),
ar -> {
if (closeConnection) {
connection.close();
}
future.handle(
ar.map(res -> new Article(res.getKeys().getLong(0),
article.getTitle(), article.getUrl()))
);
}
);
return future;
}
现在是时候组装这些作品,看看它是如何工作的。start
方法需要更新以执行以下操作:
fut
的过程的成功或失败。哇......好多的操作。幸运的是,我们已经以我们可以使用Future
合成的方式实现了几乎所有必需的方法。在start
方法中,将代码的末尾替换为:
// Start sequence:
// 1 - Retrieve the configuration
// |- 2 - Create the JDBC client
// |- 3 - Connect to the database (retrieve a connection)
// |- 4 - Create table if needed
// |- 5 - Add some data if needed
// |- 6 - Close connection when done
// |- 7 - Start HTTP server
// |- 8 - we are done!
ConfigRetriever.getConfigAsFuture(retriever)
.compose(config -> {
jdbc = JDBCClient.createShared(vertx, config,
"My-Reading-List");
return connect()
.compose(connection -> {
Future<Void> future = Future.future();
createTableIfNeeded(connection)
.compose(this::createSomeDataIfNone)
.setHandler(x -> {
connection.close();
future.handle(x.mapEmpty());
});
return future;
})
.compose(v -> createHttpServer(config, router));
}).setHandler(fut);
无需困惑于这个方法。我们很快会介绍它。这段代码首先检索配置并创建JDBCClient
。然后,我们检索数据库连接并初始化我们的数据库。请注意,连接在所有情况下都是关闭的(甚至是失败)。当数据库建立后,我们启动HTTP服务器。最后,当一切完成后,我们将结果(成功或失败)报告给ful
来告知Vert.x我们是否准备好工作。
关闭连接的注意事项:完成后不要忘记关闭SQL连接。连接将返回到连接池并被回收。
该createHTTPServer
方法非常简单,遵循以下相同的模式:
private Future<Void> createHttpServer(JsonObject config, Router router) {
Future<Void> future = Future.future();
vertx
.createHttpServer()
.requestHandler(router::accept)
.listen(
config.getInteger("HTTP_PORT", 8080),
res -> future.handle(res.mapEmpty())
);
return future;
}
注意mapEmpty
方法返回一个Future
,因为我们不关心HTTP服务器。使用mapEmpty
方法,创建一个AsyncResult
,丢弃封装的结果。mapEmptyFutureAsyncResultAsyncResultmapEmpty
所以,在这一点上,我们已经设置了一切,但我们的API仍然依赖于我们的内存后端。现在是在JDBC之上重新实现REST API的时候了。但首先,我们需要关注一些与数据库交互的实用方法。这些方法已被删减删减以容易理解。
首先,我们添加query
方法:
private Future<List
<Article>> query(SQLConnection connection) {
Future<List
<Article>> future = Future.future();
connection.query("SELECT * FROM articles", result -> {
connection.close();
future.handle(
result.map(rs ->
rs.getRows().stream()
.map(Article::new)
.collect(Collectors.toList()))
);
}
);
return future;
}
该方法再次使用相同的模式:它创建一个Future
对象并将其返回。当底层操作完成或失败时,future将会完成或失败。这里的操作是一个数据库查询。该方法执行查询,并在成功后为每一行创建一个新的 Aticle
。另外,请注意连接无论查询是成功还是失败,我们都会关闭连接。释放连接非常重要,以便可以回收。
同样,我们来实现queryOne
:
private Future
<Article> queryOne(SQLConnection connection,
String id) {
Future
<Article> future = Future.future();
String sql = "SELECT * FROM articles WHERE id = ?";
connection.queryWithParams(sql,
new JsonArray().add(Integer.valueOf(id)),
result -> {
connection.close();
future.handle(
result.map(rs -> {
List<JsonObject> rows = rs.getRows();
if (rows.size() == 0) {
throw new NoSuchElementException(
"No article with id " + id);
} else {
JsonObject row = rows.get(0);
return new Article(row);
}
})
);
});
return future;
}
我们已经完成了查询,我们需要更新和删除的方法:
private Future<Void> update(SQLConnection connection,
String id, Article article) {
Future<Void> future = Future.future();
String sql = "UPDATE articles SET title = ?, url = ? WHERE id = ?";
connection.updateWithParams(sql,
new JsonArray().add(article.getTitle())
.add(article.getUrl())
.add(Integer.valueOf(id)
),
ar -> {
connection.close();
if (ar.failed()) {
future.fail(ar.cause());
} else {
UpdateResult ur = ar.result();
if (ur.getUpdated() == 0) {
future.fail(new NoSuchElementException(
"No article with id " + id));
} else {
future.complete();
}
}
});
return future;
}
private Future<Void> delete(SQLConnection connection,
String id) {
Future<a> future = Future.future();
String sql = "DELETE FROM Articles WHERE id = ?";
connection.updateWithParams(sql,
new JsonArray().add(Integer.valueOf(id)),
ar -> {
connection.close();
if (ar.failed()) {
future.fail(ar.cause());
} else {
if (ar.result().getUpdated() == 0) {
future.fail(
new NoSuchElementException(
"No article with id " + id));
} else {
future.complete();
}
}
});
return future;
}
他们非常相似,并遵循相同的模式(再次!)。
这很好,但它并没有实现我们的REST API。所以,现在我们来关注一下。为了改变我们的想法,下面是我们需要更新的方法:
getAll
返回所有协议。addOne
插入一项新的协议。协议细节在请求主体中给出。deleteOne
删除特定协议。该id是作为路径参数给出的。getOne
提供了特定协议的JSON。该id是作为路径参数给出的。updateOne
更新特定的协议。该id是作为路径参数给出的。新的细节在请求主体中。因为我们已经在他们各自的方法中提取了数据库交互,所以实现这个方法很简单。例如,getAll
是:
private void getAll(RoutingContext rc) {
connect()
.compose(this::query)
.setHandler(ok(rc));
}
按照相同的模式,其他方法的实施如下:
private void addOne(RoutingContext rc) {
Article article = rc.getBodyAsJson().mapTo(Article.class);
connect()
.compose(connection -> insert(connection, article, true))
.setHandler(created(rc));
}
private void deleteOne(RoutingContext rc) {
String id = rc.pathParam("id");
connect()
.compose(connection -> delete(connection, id))
.setHandler(noContent(rc));
}
private void getOne(RoutingContext rc) {
String id = rc.pathParam("id");
connect()
.compose(connection -> queryOne(connection, id))
.setHandler(ok(rc));
}
private void updateOne(RoutingContext rc) {
String id = rc.request().getParam("id");
Article article = rc.getBodyAsJson().mapTo(Article.class);
connect()
.compose(connection -> update(connection, id, article))
.setHandler(noContent(rc));
}
如果我们现在运行应用程序测试,它会失败。首先,我们需要更新配置以传递JDBC URL和相关详细信息。但是等等...我们还需要一个数据库。我们不一定要在我们的单元测试中使用PostgreSQL。让我们使用内存数据库HSQL。为此,我们首先需要在pom.xml
中添加以下依赖项:
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
但是,等等,如果您已经使用JDBC或数据库,您知道每个数据库使用不同的描述语言(不同标准)。在这里,我们不能使用相同的表创建语句,因为HSQL不理解PostgreSQL描述语言。因此用以下内容创建src/test/resources/tables.sql
:
CREATE TABLE IF NOT EXISTS Articles (id INTEGER IDENTITY,
title VARCHAR(200),
url VARCHAR(200))
这是HSQL中的等效语句。它将如何工作?当Vert.x读取文件时,它也会检查类路径(并且src/test/resources
包含在测试类路径中)。运行测试时,该文件将取代我们创建的初始文件。
DeploymentOptions options = new DeploymentOptions()
.setConfig(new JsonObject()
.put("HTTP_PORT", port)
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver")
);
除了HTTP_PORT
之外,我们还将使用JDBC URL和JDBC驱动程序的类。
现在,你应该可以用以下方式运行测试:mvn clean test
。
这次我们要使用PostgreSQL实例。我将使用Docker,但您可以使用您最喜欢的方法。通过Docker,我这样开始我的实例:
docker run --name some-postgres -e POSTGRES_USER=user \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_DB=my_read_list \
-p 5432:5432 -d postgres
现在运行我们的应用程序:
mvn compile vertx:run
打开你的浏览器到(http://localhost:8082/assets/index.html),你应该看到应用程序使用数据库。这次成果存储在文件系统中保存的数据库中。所以,如果我们停止并重新启动应用程序,数据将被恢复。
如果你想打包应用程序,运行mvn clean package
。然后运行应用程序:
java -jar target/my-first-app-1.0-SNAPSHOT.jar \
-conf src/main/conf/my-application-conf.json
本系列的第四篇文章涵盖了两个主题。首先,我们引入了异步组合,以及Future
如何帮助管理顺序和并发组合。通过Future
,你在你的实现中遵循一个通用模式,一旦你掌握它,这是非常简单的。其次,我们已经看到JDBC如何被用来实现我们的API。因为我们使用Future
,使异步JDBC很简单。
你可能会对异步开发模型感到惊讶,但一旦开始使用它,就很难再回头了。异步和事件驱动的体系结构代表我们周围的世界如何工作。拥抱它们将给你强大的力量。
在下一篇文章中,我们将看到如何使用RX Java 2而不是Future。不要忘记,这个代码在这个Github仓库中可用。
请继续关注,快乐编程!