当Vert.x符合Reactive eXtensions(Vert.x简介的第5部分)

这篇文章是我介绍Eclipse Vert.x系列的第五篇文章。在上一篇文章中,我们看到了Vert.x如何与数据库交互。我们使用Future对象来驯服Vert.x的异步特性。在这篇文章中,我们将看到另一种管理异步代码的方式:反应式编程。我们将看到Vert.x如何与Reactive eXtensions结合来为您提供巨大的能量。

让我们先用以前的帖子刷新我们的记忆:

  • 第一篇文章描述了如何使用Apache Maven构建Vert.x应用程序并执行单元测试。
  • 第二篇文章描述了这个应用程序如何变得可配置。
  • 第三篇文章介绍了vertx-web,并开发了一个集合管理应用程序。此应用程序公开了HTML / JavaScript前端可调用的REST API。
  • 在第四篇文章中,我们将内存后端替换为数据库,并引入Future编排我们的异步操作。

在这篇文章中,我们不会添加新功能。相反,我们将探索另一种编程模式:反应式编程。

这篇文章的代码可以在GitHub仓库post-5目录中找到。

反应式思考

请忘记你对代码的所有认知并抬头看看。用代码来建模这个世界是极具挑战的。作为开发人员,我们倾向于使用反直觉方法。自20世纪80年代以来,面向对象计算被视为高招。来自我们世界的每个实体都由一个包含字段和曝光法的对象来表示。大多数情况下,我们使用阻塞和同步协议完成与这些对象的交互。调用一个方法并等待响应。但是......我们生活的世界是异步的。交互是通过使用事件,消息和刺激来完成的。为了克服面向对象的局限性,出现了许多模式和范例。最近,函数式编程正在卷土重来,但它不是为了取代面向对象编程,而是为了补充它。反应式编程 是一种功能性的事件驱动的编程方法,与常规的面向对象的范例结合使用。

几年前,微软创建了一个名为Reactive eXtensions(也称为ReactiveX或RX)的.NET反应式编程框架。RX是一个用可观察流进行异步编程的API 。该API已被移植到多种语言,如JavaScript,Python,C ++和Java。

让我们停下来静静地观察我们的世界。观察运动中的实体:交通拥堵,天气,谈话和金融市场。事情在并发演化着。多件事情同时发生,有时是独立的,有时是以精心安排的方式发生。每个对象创建活动。例如,您的鼠标光标位置正在移动。位置序列是一个流。房间里的人数可能是固定的,但有人会进进出出,产生新的价值。所以我们有另一个价值流​​。反应式编程的背后有一个基本的原则:事件即是数据,数据即是事件

关于RX和异步编程的重要理解是流的异步特性。您会观察到一个流,并在流发出某个项目时通知您。你不知道什么时候会发生,但你正在观察。这个观察是通过一个操作完成的。subscribe

RxJava是Java编程语言RX的直接实现。它是用于Java中的反应式编程的非常流行的库,具有联网数据处理应用程序和JavaFX和Android的图形用户界面。RxJava是Java中反应式库的通用语言,它提供以下五种类型来描述发布者:

流中的项目数

RxJava 2种类型

RX签名

回调签名

未来的签名

通知,数据流

0..N

可观察,可流动

Observable stream()可流式流()

ReadStream方法()

N / A

异步操作产生结果

1

Single get()

void get(Handler <AsyncResult> handler)

未来获得()

异步操作不产生或一个结果

0..1

也许

也许findById(String id)

void get(String id,Handler <AsyncResult> handler)

未来获得(字符串id)

异步操作不产生任何结果

0

Completable

可完全冲洗()

void flush(Handler <AsyncResult> handler)

未来flush()

之间的差和是处理背压(实施反应性流协议),而没有。更适合来自支持背压源(例如,TCP连接)的大量数据流,而更适合处理无法应用背压的“热”可观测数据(例如,GUI事件)。ObservableFlowableFlowableObservableFlowableObservable

这篇文章不是反应式编程或RX的介绍。如果您需要关于反应式编程和RX的介绍级课程,请查看本教程

在之前的文章中,我们曾经撰写过异步操作。在这篇文章中,我们将使用流和RxJava。怎么样?感谢Vert.x和RxJava 2 API。事实上,Vert.x提供了一组接收 API。但是,不要忘记:Future

  • 您可以在不使用Vert.x的情况下使用RxJava。
  • 您可以在不使用RxJava的情况下使用Vert.x。

将它们结合起来可以为您提供超级用户,因为它利用RxJava流和运算符的强大功能将异步执行模型从Vert.x扩展到了Vert.x。

足够说话,给我看一些代码

它始终始于Maven依赖项。在你的文件中添加这个:pom.xml

<dependency>
   <groupId>io.vertx</groupId>
   <artifactId>vertx-rx-java2</artifactId>
   <version>${vertx.version}</version>
</dependency>

然后,打开这个类并用这个替换import语句:io.vertx.intro.first.MyFirstVerticle

import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.SQLOptions;
import io.vertx.reactivex.CompletableHelper;
import io.vertx.reactivex.config.ConfigRetriever;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.BodyHandler;
import io.vertx.reactivex.ext.web.handler.StaticHandler;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

注意包装。这是Vert.x RX API的实现。因此,我们现在正在扩展,而不是延长。注入的实例提出了以前缀开头的新方法,如或。以前缀为前缀的方法返回RxJava 2类型,如or 。io.vertx.reactivexio.vertx.core.AbstractVerticleio.vertx.reactivex.core.AbstractVerticlevertxrxrxDeployVerticlerxCloserxSingleCompletable

从回归未来到回归单一完成

为了受益于RX API并能够使用RX运营商,我们需要使用RX类型。例如,以前我们有这样的:

private Future createHttpServer(JsonObject config, 
  Router router) {
  Future future = Future.future();
  vertx
    .createHttpServer()
    .requestHandler(router::accept)
    .listen(
      config.getInteger("HTTP_PORT", 8080),
      res -> future.handle(res.mapEmpty())
    );
  return future;
}

Future被映射到RX中,也就是只是表示其完成的流。因此,对于RX,此代码变为以下内容:Completable

private Completable createHttpServer(JsonObject config,
  Router router) {
  return vertx
    .createHttpServer()
    .requestHandler(router::accept)
    .rxListen(config.getInteger("HTTP_PORT", 8080))
    .toCompletable();
}

你发现差异吗?我们使用返回a 的方法。因为我们不需要服务器,所以我们使用该方法将其转换为一个。这是可用的,因为我们使用了rx-ified实例。rxListenSingleCompletabletoCompletablerxListenvertx

现在我们来重写这个方法。正在返回一个。这被翻译成:connectconnectFutureSingle

private Single connect() {
  return jdbc.rxGetConnection()
    .map(c -> c.setOptions(
       new SQLOptions().setAutoGeneratedKeys(true)));
}

该客户端还提供了一个API。返回一个。要启用密钥生成,我们使用该方法。从观察结果中获取结果并使用映射函数对其进行转换。这里我们只是调整选项。jdbcrxrxGetConnectionSinglemapmapSingle

遵循相同的原则,该方法改写如下:insert

private Single<Article> insert(SQLConnection connection, 
 Article article, boolean closeConnection) {
  String sql = "INSERT INTO Articles (title, url) VALUES (?, ?)";
  return connection
    .rxUpdateWithParams(sql,
      new JsonArray().add(article.getTitle()).add(article.getUrl()))
    .map(res -&gt; new Article(res.getKeys().getLong(0),
      article.getTitle(), article.getUrl()))
    .doFinally(() -&gt; {
      if (closeConnection) {
        connection.close();
      }
    });
}

在这里,我们使用执行语句。结果转化为一个。注意。当操作完成或失败时调用此方法。在这两种情况下,如果要求,我们关闭连接。INSERTrxUpdateWithParamsArticledoFinally

同样的方法适用于使用该方法的方法:queryrxQuery

private Single<Article> query(SQLConnection connection) {
  return connection.rxQuery("SELECT * FROM articles")
    .map(rs -&gt; rs.getRows().stream()
      .map(Article::new)
      .collect(Collectors.toList())
    )
    .doFinally(connection::close);
}

queryOne如果搜索到的文章不存在,则需要引发错误:

private Single<Article> queryOne(SQLConnection connection, String id) {
  String sql = "SELECT * FROM articles WHERE id = ?";
  return connection.rxQueryWithParams(sql,
    new JsonArray().add(Integer.valueOf(id))
    )
    .doFinally(connection::close)
    .map(rs -&gt; {
      List rows = rs.getRows();
      if (rows.size() == 0) {
          throw new NoSuchElementException(
            "No article with id " + id);
      } else {
          JsonObject row = rows.get(0);
          return new Article(row);
        }
    });
}

映射器函数抛出的异常被传播到流中。所以观察者可以对它做出反应并恢复。

转换类型

我们已经看到上面的方法丢弃了结果并仅通知用户成功完成或操作失败。在和方法中,我们需要做几乎相同的事情。我们执行SQL语句,如果我们发现这些语句没有更改行,我们会报告错误。为了实现这一点,我们正在使用。这种方法是家庭的一部分,是一个非常强大的接收运营商。该方法将参数作为函数。为观察流发出的每个项目调用此函数。如果流是a ,那么它将被称为零(错误情况)或一个(操作成功并带有结果)次。与运营商不同,toCompletableSingleupdatedeleteflatMapCompletableflatMapSinglemapflatMap函数返回一个流。例如,在我们的上下文中,函数被调用并返回a :flatMapCompletableUpdateResultCompletable

private Completable update(SQLConnection connection, String id,
  Article article) {
  String sql = "UPDATE articles SET title = ?,
    url = ? WHERE id = ?";
  JsonArray params = new JsonArray().add(article.getTitle())
    .add(article.getUrl())
    .add(Integer.valueOf(id));
  return connection.rxUpdateWithParams(sql, params)
    .flatMapCompletable(ur ->
      ur.getUpdated() == 0 ?
        Completable
            .error(new NoSuchElementException(
                "No article with id " + id))
        : Completable.complete()
    )
    .doFinally(connection::close);
}
private Completable delete(SQLConnection connection, String id) {
  String sql = "DELETE FROM Articles WHERE id = ?";
  JsonArray params = new JsonArray().add(Integer.valueOf(id));
  return connection.rxUpdateWithParams(sql, params)
    .doFinally(connection::close)
    .flatMapCompletable(ur ->
        ur.getUpdated() == 0 ?
          Completable
              .error(new NoSuchElementException(
                  "No article with id " + id))
          : Completable.complete()
    );
}

在这两种情况下,我们检查更新行的数量,如果为0,则产生失败。所以用户收到成功()或错误()。请注意,此代码也可以使用以前的方法:使用操作符,抛出异常并使用结果放弃。CompletableCompletable.completeCompletable.errormaptoCompletable

显然,我们也可以将a 转换为:CompletableSingle

private Single createTableIfNeeded(
  SQLConnection connection) {
    return vertx.fileSystem().rxReadFile("tables.sql")
        .map(Buffer::toString)
        .flatMapCompletable(connection::rxExecute)
        .toSingleDefault(connection);
}

rxExecute返回一个。但在这里我们需要转发。幸运的是,运营商将其转换为发射给定值。CompletableSQLConnectiontoSingleDefaultCompletableSingle

撰写异步操作

到目前为止,我们正在使用方法并调整结果。但是我们如何处理顺序组合呢?执行第一个操作,然后执行第二个操作并返回第一个操作的结果?这可以使用操作员完成。如上所述,是一个非常强大的运营商。它接受一个函数作为参数,不同的是运营商,这个函数返回一个流(所以,,...)。从观察到的流中为每个项目调用此函数,并将返回的流展平,以便项目序列化为单个流。由于流是异步构造,调用会创建一个顺序组合。我们来看看这个方法。最初的实施如下:rxflatMapflatMapmapSingleMaybeCompletableflatMapcreateSomeDataIfNone

private Future createSomeDataIfNone(
  SQLConnection connection) {
  Future future = Future.future();
  connection.query("SELECT * FROM Articles", select -&gt; {
    if (select.failed()) {
      future.fail(select.cause());
    } else {
      if (select.result().getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",            "<a class="vglnk" href="https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing" rel="nofollow"><span>https</span><span>://</span><span>en</span><span>.</span><span>wikipedia</span><span>.</span><span>org</span><span>/</span><span>wiki</span><span>/</span><span>Fallacies</span><span>_</span><span>of</span><span>_</span><span>distributed</span><span>_</span><span>computing</span></a>");
        Article article2 = new Article("Reactive Manifesto",
            "<a class="vglnk" href="https://www.reactivemanifesto.org/" rel="nofollow"><span>https</span><span>://</span><span>www</span><span>.</span><span>reactivemanifesto</span><span>.</span><span>org</span><span>/</span></a>");
        Future<Article> insertion1 = insert(connection, article1, false);
        Future<Article> insertion2 = insert(connection, article2, false);
        CompositeFuture.all(insertion1, insertion2)
            .setHandler(r -&gt; future.handle(r.map(connection)));
      } else {
        future.complete(connection);
      }
    }
  });
  return future;
}

在这个方法中,我们执行查询并根据结果插入文章。RX的实现如下:

private Single createSomeDataIfNone(
  SQLConnection c) {
  return c.rxQuery("SELECT * FROM Articles")
    .flatMap(rs -> {
      if (rs.getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",
            "<a class="vglnk" href="https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing" rel="nofollow"><span>https</span><span>://</span><span>en</span><span>.</span><span>wikipedia</span><span>.</span><span>org</span><span>/</span><span>wiki</span><span>/</span><span>Fallacies</span><span>_</span><span>of</span><span>_</span><span>distributed</span><span>_</span><span>computing</span></a>");
        Article article2 = new Article("Reactive Manifesto",
            "<a class="vglnk" href="https://www.reactivemanifesto.org/" rel="nofollow"><span>https</span><span>://</span><span>www</span><span>.</span><span>reactivemanifesto</span><span>.</span><span>org</span><span>/</span></a>");
        return Single.zip(
            insert(connection, article1, false),
            insert(connection, article2, false),
            (a1, a2) -> c
        );
      } else {
          return Single.just(c);
        }
    });
}

首先,我们执行查询。然后,当我们得到结果时,调用传递给该方法的函数,实现顺序组合。您可能想知道错误情况。我们不需要处理它,因为错误会传播到流中,并且最终的观察者会收到它。发生错误时不会调用该函数。flatMap

异步操作可以同时发生。但有时你需要知道他们什么时候完成。这被称为并行组合。该运营商可以让你做到这一点。在,我们插入两篇文章。该操作使用(返回a )完成。该操作者观察的两个给定的事件,并呼吁作为最后一个参数传递时都已经完成了方法。在这种情况下,我们只是转发。zipcreateSomeDataIfNoneinsertSinglezipSingleSQLConnection

合成一切准备就绪

我们已经改写了我们大部分的功能,但我们需要调整方法。记住我们需要实现的开始顺序:start

//开始序列:// 1 - 检索配置// | - 2 - 创建JDBC客户端// | - 3 - 连接到数据库(检索连接)// | - 4 - - 5 - 如果需要添加一些数据// | - 6 - 完成后关闭连接// | - 7 - 启动HTTP服务器// | - 9 - 我们完成了!

这个组合可以使用运算符来实现:flatMap

retriever.rxGetConfig()
  .doOnSuccess(config ->
    jdbc = JDBCClient.createShared(vertx, config, 
      "My-Reading-List"))
  .flatMap(config ->
    connect()
      .flatMap(connection ->
          this.createTableIfNeeded(connection)
              .flatMap(this::createSomeDataIfNone)
              .doAfterTerminate(connection::close)
      )
      .map(x -> config)
  )
  .flatMapCompletable(c -> createHttpServer(c, router))
  .subscribe(CompletableHelper.toObserver(fut));

这是一个动作操作符,它从观察到的流中接收项目并让您实现副作用。在这里我们分配字段。doOnSuccessjdbc

然后,我们只是使用操作员编排我们不同的操作。看看。这个操作符让我们在完整流被使用时关闭连接,这对于清理非常有用。flatMapdoAfterTerminate

这个代码中有一个重要的部分。到目前为止,我们返回了RX类型,但从未调用过。如果您不订阅,则不会发生任何事情:流是懒惰的。所以不要忘记订阅。订阅实现了管道并触发排放。在我们的代码中,它会触发启动序列。传递给方法的参数只是报告传递给方法的对象的失败和成功。基本上,它将a映射到a 。subscribesubscribeFuturestartFutureSubscriber

实施HTTP操作

我们差不多完成了。我们只需要更新我们的HTTP动作,即HTTP请求所调用的方法。为了简化代码,我们来修改这个类。这个类提供返回的方法。但是这种类型对于需要用户的RX API来说并不是很好。让我们用返回更适合类型的方法替换这些方法:ActionHelperHandler<AsyncResult>

private static  BiConsumer writeJsonResponse(
  RoutingContext context, int status) {
  return (res, err) -> {
    if (err != null) {
      if (err instanceof NoSuchElementException) {
        context.response().setStatusCode(404)
          .end(err.getMessage());
      } else {
        context.fail(err);
      }
    } else {
      context.response().setStatusCode(status)
        .putHeader("content-type", 
            "application/json; charset=utf-8")
        .end(Json.encodePrettily(res));
    }
  };
}
static  BiConsumer; ok(RoutingContext rc) {
  return writeJsonResponse(rc, 200);
}
static  BiConsumer created(RoutingContext rc) {
  return writeJsonResponse(rc, 201);
}
static Action noContent(RoutingContext rc) {
  return () -> rc.response().setStatusCode(204).end();
}
static Consumer onError(RoutingContext rc) {
  return err -> {
      if (err instanceof NoSuchElementException) {
          rc.response().setStatusCode(404)
           .end(err.getMessage());
      } else {
          rc.fail(err);
      }
  };
}

现在我们准备实施我们的HTTP操作方法。回到课堂:用这个替换动作方法:MyFirstVerticle

private void getAll(RoutingContext rc) {
  connect()
      .flatMap(this::query)
      .subscribe(ok(rc));
}
private void addOne(RoutingContext rc) {
  Article article = rc.getBodyAsJson()
    .mapTo(Article.class);
  connect()
      .flatMap(c -> insert(c, article, true))
      .subscribe(created(rc));
}
private void deleteOne(RoutingContext rc) {
  String id = rc.pathParam("id");
  connect()
      .flatMapCompletable(c -> delete(c, id))
      .subscribe(noContent(rc), onError(rc));
}
private void getOne(RoutingContext rc) {
  String id = rc.pathParam("id");
  connect()
      .flatMap(connection -> queryOne(connection, id))
      .subscribe(ok(rc));
}
private void updateOne(RoutingContext rc) {
  String id = rc.request().getParam("id");
  Article article = rc.getBodyAsJson()
    .mapTo(Article.class);
  connect()
      .flatMapCompletable(c -> update(c, id, article))
      .subscribe(noContent(rc), onError(rc));
}

正如你所看到的,这些方法是使用我们之前看到的操作符来实现的。它们包含写入HTTP响应的调用。就这么简单...subscribe

结论

我们完了!在这篇文章中,我们调整了我们的代码,使用反应式编程和RxJava 2. Vert.x和RxJava的组合将您的反应性带到了另一个层次。您可以非常轻松地编写和处理异步操作和流。

现在,不要忘记没有什么是免费的。RX可能很难理解。它可能看起来很奇怪。根据你的背景,你可能更喜欢和回调。Vert.x为您提供选择,并且您可以自由选择您喜欢的模型。Future

如果你想进一步,这里有一些资源:

本系列的下一篇文章将介绍在Kubernetes和OpenShift上部署我们的应用程序。

请继续关注,快乐的编码!

本文的版权归 evildickman 所有,如需转载请联系作者。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Golang语言社区

Go 语言实现 MapReduce 框架

MapReduce 是 Google 提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。简而言之,就是将任务切分成很小的任务然后一个一个区的执行最后...

3576
来自专栏拂晓风起

关于0xFFFFFFFF和alpha,温故而知新

875
来自专栏一名合格java开发的自我修养

java使用Map做缓存你真的用对了吗?弱引用WeakHashMap了解一下

序:使用java的Map做缓存,你是否考虑过容量导致的OOM问题,是否考虑命中率对性能的影响??

1231
来自专栏Java技术栈

Java 虚拟机对锁优化所做的努力

作为一款公用平台,JDK 本身也为并发程序的性能绞尽脑汁,在 JDK 内部也想尽一切办法提供并发时的系统吞吐量。这里,我将向大家简单介绍几种 JDK 内部的 "...

572
来自专栏架构师之路

业界难题-“跨库分页”的四种方案

一、需求缘起 分页需求 互联网很多业务都有分页拉取数据的需求,例如: (1)微信消息过多时,拉取第N页消息 (2)京东下单过多时,拉取第N页订单 (3)浏览58...

4614
来自专栏Java架构师学习

分享我在阿里工作十年接触过Java框架设计模式一、前言二、责任链设计模式(Chain of Responsibility Pattern)三、工厂模式(Factory Pattern)四、单例设计模式

一、前言 说起来设计模式,大家应该都耳熟能详,设计模式代表了软件设计的最佳实践,是经过不断总结提炼出来的代码设计经验的分类总结,这些模式或者可以简化代码,或者可...

4668
来自专栏desperate633

深入理解Spring框架的作用(Spring in action 学习笔记)激发POJO的潜能依赖注入应用切面使用模板消除样板式代码

纵览Spring , 读者会发现Spring 可以做非常多的事情。 但归根结底, 支撑Spring的仅仅是少许的基本理念, 所有的理念都可以追溯到Spring最...

993
来自专栏Golang语言社区

Golang视角下的设计模式

这篇文章想聊聊Golang语言下的设计模式问题,我觉得这个话题还是比较有意思的。Golang没有像java那样对设计模式疯狂的迷恋,而是摆出了一份“看庭前花开花...

872
来自专栏小灰灰

时序数据库InfluxDB基本概念小结

InfluxDB作为时序数据库,与传统的关系型数据库相比而言,还是有一些区别的,下面尽量以简单明了的方式介绍下相关的术语概念

1504
来自专栏ImportSource

设计模式-搞个接口,留有余地,让你我不再尴尬

设计模式,Design Patterns,Pattern,翻译为“模式”总感觉不够接地气,用今天的话来说可以叫“套路”。设计模式就是写代码的过程中一些常规打法和...

33811

扫码关注云+社区