浅析Java响应式编程(Reactive Programming)

响应式编程是一种新的编程风格,它提供了一种基于事件驱动的方式来处理异步数据流的能力,其特点是允许开发人员构建事件驱动、可扩展性、弹性的响应式系统。

响应式编程遵循观察者设计模式,该模式可以定义为:当一个事物发生状态变化时,其他事物将被相应地通知和更新。 因此,观察者不需要轮询事件的变化,而是异步等待事件变化的通知,所以观察者收到通知后就可以处理该事件。 在这个过程中,观察者是发生事件变化时执行的函数,而数据流是可以被观察到的实际可观测数据,也就是被观察者或者称作主题。

几乎所有的语言和框架都在其生态系统中采用了这种响应式编程方法,Java也紧跟时代步伐, 在Java8中引入了响应式编程。现在响应式编程已经开始渗透到Java 8和Java EE 8版本的各个部分。 Java8中引入了诸如CompletionStage及其实现,CompletableFuture等概念,这些类和概念在诸如JAX-RS的Reactive Client API等规范中使用。

JAX-RS客户端API

接下来我们来看看如何在Java EE 8应用程序中使用响应式编程。 在开始本例之前,您需要熟悉基本的Java EE API。 JAX-RS 2.1引入了创建REST客户端的新方法,支持响应式编程。 JAX-RS提供的默认调用者实现是同步的,这意味着创建的客户端将阻塞对服务器端的调用。 这个实现的例子如例一所示。

例一

Response response =ClientBuilder.newClient()
                 .target("http://localhost:8080/service-url")
                 .request()
                 .get();

从JAX-RS 2.0版本开始,JAX-RS为客户端提供了异步的支持,通过调用async()方法为客户端API创建异步调用器,如例二所示。

例二

Future<Response> response =ClientBuilder.newClient()
                 .target("http://localhost:8080/service-url")
                 .request()
                 .async()
                 .get();

在客户端上使用异步调用器时将返回Future类型的实例,泛型类型为javax.ws.rs .core.Response。 这种方式可以通过调用future.get()来轮询响应,或者通过注册一个回调函数,当HTTP响应可用时将回调该方法。 这两种实现方式都适用于异步编程,但是当你想嵌套回调函数或者在这些异步执行点添加控制条件时会使程序变得复杂。

JAX-RS 2.1提供了一种响应式的编程方式来解决这些问题。当用新的JAX-RS响应式客户端API来构建客户端时, 只需要调用rx()方法就可以完成响应式调用。 在例三中,rx()方法返回存在于客户端运行时的响应式调用者,并且客户端返回类型为CompletionStage.rx()的响应,通过此简单调用就可以实现从同步调用器切换到异步调用器。

例三

CompletionStage<Response> response =ClientBuilder.newClient()
                 .target("http://localhost:8080/service-url")
                 .request()
                 .rx()
                 .get();

CompletionStage是Java 8中引入的一个新接口,它的名称意味着它可以作为大规模计算中的一个阶段的计算。当我们得到响应实例后,可以调用thenAcceptAsync()方法,在该方法中我们可以提供自己的业务逻辑代码,当响应变为可用时,这些业务逻辑代码片段将异步执行,如例四所示。

例四

response.thenAcceptAsync(res -> {
    Temperature t = res.readEntity(Temperature.class);
    //do stuff with t
});

响应式编程在服务端的应用

响应式方法不仅局限于JAX-RS中的客户端; 也可以在服务器端利用它。 为了演示这一点,我们将首先模拟一个简单的场景,即我们可以从一个服务器端查询位置列表。 对于每个位置,我们将用该位置数据再次调用另一个服务器端点以获取温度值。 端点的交互如图1所示。

图1 端点交互图

首先,我们定义域模型,然后定义每个域模型的服务。 例五定义了Forecast类,它包装了Temperature和Location类。

例五

public class Temperature {
    private Double temperature;
    private String scale;
    // getters & setters
}
public class Location {
    String name;
    public Location() {}
    public Location(String name) {
        this.name = name;
}
    // getters & setters
}
public class Forecast {
    private Location location;
    private Temperature temperature;
    public Forecast(Location location) {
        this.location = location;
    }
public Forecast setTemperature(
                final Temperature temperature) {
        this.temperature = temperature;
        return this;
    }
// getters }

例六中实现了ServiceResponse类,该类封装了温度预测列表。

例六

public class ServiceResponse {
    private long processingTime;
    private List<Forecast> forecasts = new ArrayList<>();
    public void setProcessingTime(long processingTime) {
        this.processingTime = processingTime;
}
    public ServiceResponse forecasts(
               List<Forecast> forecasts) {
        this.forecasts = forecasts;
        return this;
    }
// getters
}

例七中显示的LocationResource定义了三个返回的样本位置,请求URL是:/location。

例七

@Path("/location")
public class LocationResource {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getLocations() {
        List<Location> locations = new ArrayList<>();
        locations.add(new Location("London"));
        locations.add(new Location("Istanbul"));
        locations.add(new Location("Prague"));
        return Response.ok(
            new GenericEntity<List<Location>>(locations){})
            .build();
} }

如例八所示,TemperatureResource返回给定位置的随机生成的温度值,温度值介于30到50之间。 在实现中添加500 ms的延迟以模拟传感器获取数据。

例八

@Path("/temperature")
public class TemperatureResource {
    @GET
    @Path("/{city}")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getAverageTemperature(
           @PathParam("city") String cityName) {
        Temperature temperature = new Temperature();
        temperature.setTemperature(
            (double) (new Random().nextInt(20)+30));
        temperature.setScale("Celsius");
        try {
            Thread.sleep(500);
        } catch (InterruptedException ignored) {}
        return Response.ok(temperature).build();
    }
}

这里首先显示ForecastResource的同步实现方式(如例九所示),它首先获取所有位置。 然后,对于每个位置,它再调用温度服务来检索该位置的温度值。

例九

@Path("/forecast")
public class ForecastResource {
    @Uri("location")
    private WebTarget locationTarget;
    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getLocationsWithTemperature() {
        long startTime = System.currentTimeMillis();
        ServiceResponse response = new ServiceResponse();
        List<Location> locations = locationTarget.request()
                .get(new GenericType<List<Location>>() {});
        locations.forEach(location -> {
            Temperature temperature = temperatureTarget
                .resolveTemplate("city", location.getName())
                .request()
                .get(Temperature.class);
            response.getForecasts().add(
                new Forecast(location)
                    .setTemperature(temperature));
        });
        long endTime = System.currentTimeMillis();
        response.setProcessingTime(endTime - startTime);
        return Response.ok(response).build();
    }
}

当请求为URL /forecast时,您应该看到类似于例十的输出结果。请注意,请求的处理时间花费了1,533ms,这很有意义,因为同时为三个不同位置请求温度值的累积请求时间理论上应该为1,500ms(500ms*3)。

例十

{
   "forecasts": [
      {
         "location": {
            "name": "London"
         },
         "temperature": {
            "scale": "Celsius",
            "temperature": 33
} },
      {
         "location": {
            "name": "Istanbul"
         },
         "temperature": {
            "scale": "Celsius",
            "temperature": 38
} },
      {
         "location": {
            "name": "Prague"
         },
         "temperature": {
            "scale": "Celsius",
            "temperature": 46
} }
    ],
}

现在是时候在服务器端引入响应式编程了,在获得所有位置之后,可以并行地完成每个位置的温度服务调用。 这可以绝对增强前面显示的同步调用效率低下的问题。 在例十一中,定义了此温度预测服务的响应式编程版本。

例十一

@Path("/reactiveForecast")
public class ForecastReactiveResource {
    @Uri("location")
    private WebTarget locationTarget;
    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public void getLocationsWithTemperature(
          @Suspended final AsyncResponse async) {
        long startTime = System.currentTimeMillis();
        // Create a stage on retrieving locations
        CompletionStage<List<Location>> locationCS =
            locationTarget.request()
                .rx()
                .get(new GenericType<List<Location>>() {});
    
// By composing another stage on the location stage
// created above, collect the list of forecasts
// as in one big completion stage
final CompletionStage<List<Forecast>> forecastCS =
locationCS.thenCompose(locations -> {
   // Create a stage for retrieving forecasts
   // as a list of completion stages
   List<CompletionStage<Forecast>> forecastList =
      // Stream locations and process each
      // location individually
      locations.stream().map(location -> {
         // Create a stage for fetching the
         // temperature value just for one city
         // given by its name
         final CompletionStage<Temperature> tempCS =
             temperatureTarget
             .resolveTemplate("city",
                              location.getName())
             .request()
             .rx()
             .get(Temperature.class);
         // Then create a completable future that
         // contains an instance of forecast
         // with location and temperature values
         return CompletableFuture.completedFuture(
            new Forecast(location))
                .thenCombine(tempCS,
});
                Forecast::setTemperature);
}).collect(Collectors.toList());
// Return a final completable future instance
// when all provided completable futures are
// completed
return CompletableFuture.allOf(
   forecastList.toArray(
      new CompletableFuture[forecastList.size()]))
        .thenApply(v -> forecastList.stream()
        .map(CompletionStage::toCompletableFuture)
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
// Create an instance of ServiceResponse,
// which contains the whole list of forecasts
// along with the processing time.
// Create a completed future of it and combine to
// forecastCS in order to retrieve the forecasts
// and set into service response
CompletableFuture.completedFuture(
new ServiceResponse())
   .thenCombine(forecastCS,
   ServiceResponse::forecasts)
    .whenCompleteAsync((response, throwable) -> {
       response.setProcessingTime(
         System.currentTimeMillis() - startTime);
       async.resume(response);
});
} }

响应式编程可能第一眼看起来很复杂,但仔细研究后,你会发现它相当简单。在ForecastReactiveResource中,我们首先借助JAX-RS响应式客户端API创建一个客户端调用位置服务。正如我前面提到的,这是对Java EE 8的补充,它可以通过简单地调用rx()方法创建响应式客户端调用者。

响应式编程不仅仅增强了从同步到异步的实现,它也可以通过嵌套阶段等概念简化开发。现在我们根据位置组成另一个阶段来收集温度预测列表。它们将温度预测列表存储在一个名为forecastCS的大完成阶段,作为预测列表。我最终只会使用forecastCS创建服务调用的响应。接下来,我们将每个位置的温度预测阶段存储在forecastList变量中。为了创建每个位置的预测的完成阶段,我在这些位置上进行流式处理,然后再次使用JAX-RS反应客户端API创建tempCS变量,该API将调用指定城市名称的温度服务。在这里我们用resolveTemplate()方法来构建一个客户端,并使我能够将该城市的名称作为参数传递给构建器。

在locations流式输出位置的最后一步,我通过创建一个新的Forecast实例作为参数来调用CompletableFuture.completedFuture()方法。我将这个Future与tempCS阶段结合起来,以便迭代获取每个位置的温度值。

例十一中的CompletableFuture.allOf()方法将完成阶段列表转换为forecastCS。当所有提供的可完成Future完成时,执行此步骤会返回一个完成的Future实例。温度预测服务的响应是ServiceResponse类的一个实例,因此我为此创建了一个完整的Future,然后将forecastCS完成阶段与预测列表组合在一起,并计算服务的响应时间。

当然,这种响应式编程只会使服务器端异步执行;客户端将被阻塞,直到服务器将响应发送回请求者。为了解决这个问题,Server Sent Events(SSE)也可以用来部分发送响应,以便对于每个位置,温度值可以逐一推送到客户端。 ForecastReactiveResource的输出将与例十二类似。如输出所示,处理时间为515ms,这是用于检索一个位置的温度值的理想执行时间。

例十二

{
   "forecasts": [
      {
         "location": {
            "name": "London"
         },
         "temperature": {
            "scale": "Celsius",
            "temperature": 49
} },
      {
         "location": {
            "name": "Istanbul"
         },
         "temperature": {
            "scale": "Celsius",
            "temperature": 32
} },
      {
         "location": {
            "name": "Prague"
         },
         "temperature": {
            "scale": "Celsius",
            "temperature": 45
         }
} ],
   "processingTime": 515
}

在本文的所有例子中,我们首先展示了以同步方式来检索温度预测信息示例。 接着我们采用响应式编程方法,以便在服务调用之间进行异步处理。 当利用Java EE 8的JAX-RS响应式客户端API以及CompletionStage和CompletableFuture等Java 8的类时,异步处理的威力在响应式编程的帮助下释放出来。响应式编程不仅仅是增强从同步模型到异步模型的实现; 它也可以通过嵌套阶段等概念简化开发。 采用的越多,在并行编程中处理复杂场景就越容易。

原文发布于微信公众号 - 安恒信息(DBAPP2013)

原文发表时间:2018-04-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏娱乐心理测试

Invalid prop: type check failed for prop "price". Expected String, got Number.

在谷歌浏览器上写Vue项目时,总会有很多警告,关键是红色的,非常刺眼,一片红好像是严重的错误,在有强迫症的程序员眼里非常之别扭,准备清除警告!

20320
来自专栏企鹅号快讯

Java 9 逆天的十大新特性

在介绍 Java 9 之前,我们先来看看 Java 成立到现在的所有版本。 1990 年初,最初被命名为 Oak; 1995 年 5 月 23 日,Java 语...

25550
来自专栏JackieZheng

Nutch源码阅读进程3---fetch

走了一遍Inject和Generate,基本了解了nutch在执行爬取前的一些前期预热工作,包括url的过滤、规则化、分值计算以及其与mapreduce的联系紧...

23750
来自专栏偏前端工程师的驿站

JS读书心得:《JavaScript框架设计》——第12章 异步处理

一、何为异步                                 执行任务的过程可以被分为发起和执行两个部分。 同步执行模式:任务发起后必须等待直...

22570
来自专栏互扯程序

Java 9 逆天的十大新特性

KS Knowledge Sharing 知识分享 现在是资源共享的时代,同样也是知识分享的时代,如果你觉得本文能学到知识,请把知识与别人分享。 在介绍...

27960
来自专栏java思维导图

值得收藏!Redis五大数据类型应用场景(二)

Redis开创了一种新的数据存储思路,使用Redis,我们不用在面对功能单调的数据库时,把精力放在如何把大象放进冰箱这样的问题上,而是利用Redis灵活多变的数...

20120
来自专栏wOw的Android小站

[设计模式]之五:职责链模式

使多个对象都有机会处理请求,从而避免请求的发送者和接收者之间的耦合关系。将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。

20620
来自专栏腾讯Bugly的专栏

小萝莉说Crash(一):Unrecognized selector sent to instance xxxx

大家好,我是来自Bugly Crash实验室的小萝莉(害羞ing),很高兴能和大家一起讨论关于移动终端App的Crash问题及解决方法。 在上次的“精神哥讲Cr...

89840
来自专栏JarvanMo的IT专栏

Flutter实战:手把手教你写Flutter Plugin

如果你对移动端有所关注,那么你一定会听说过Flutter。得益于Google,Flutter一经推出便得受到了广泛关注。很多开发者跃跃欲试,国内部分大厂,诸如美...

1.9K20
来自专栏osc同步分享-java技术分享站

在springBoot项目中使用activiti

http://www.jvm123.com/2019/08/springboot-activiti/

4.2K70

扫码关注云+社区

领取腾讯云代金券