使用Kotlin开发一个Spring Boot Webflux Streaming应用(附视频)

首先我们来看一段视频,然后开始今天的内容:

视频内容

上面这样的效果正是我们使用spring webflux实现的。

整体思路

整个项目分为两部分:

1、service。service是一个使用Kotlin来编写的spring boot应用。每200毫秒push一个随机的报价。可以是SSE(server-sent events)或者json stream。

2、app。app里边包含两个功能。

(1)、普通的spring mvc。主要是通过内置的mongodb存储用户,然后展示在前端。

(2)、webflux controller。负责调用service的route来获取随机的报价然后返回到前端(html)。

基本的流程:

Service

接下来我们使用kotlin来编写service。service主要功能就是随机的push一个报价。

依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
   <groupId>com.fasterxml.jackson.datatype</groupId>
   <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib-jre8</artifactId>
   <version>${kotlin.version}</version>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
   <version>${kotlin.version}</version>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-devtools</artifactId>
   <optional>true</optional>
</dependency>

主要是引入webflux、kotlin的jre8的包,以及kotlin的reflect包。另外引入了spring-boot-devtools。这个包可以为我们的开发提供更多的便利,比如一些默认配置、自动重启来及时反映变更等等。

application.properties配置:

server.port=8081

spring.jackson.serialization.write-date-timestamps-as-nanoseconds=false

现在我们先来新建一个报价实体类:

data class Quote(
    val ticker: String, 
    val price: BigDecimal, 
    val instant: Instant = Instant.now()
)

然后新建一个用来生成报价的类:

@Component
class QuoteGenerator {

    val mathContext = MathContext(2)

    val random = Random()

    val prices = listOf(
            Quote("CTXS", BigDecimal(82.26, mathContext)),
            Quote("DELL", BigDecimal(63.74, mathContext)),
            Quote("GOOG", BigDecimal(847.24, mathContext)),
            Quote("MSFT", BigDecimal(65.11, mathContext)),
            Quote("ORCL", BigDecimal(45.71, mathContext)),
            Quote("RHT", BigDecimal(84.29, mathContext)),
            Quote("VMW", BigDecimal(92.21, mathContext))
    )


    fun fetchQuoteStream(period: Duration) = Flux.generate({ 0 },
            { index, sink: SynchronousSink<Quote> ->
                sink.next(updateQuote(prices[index]))
                (index + 1) % prices.size
            }).zipWith(Flux.interval(period))
            .map { it.t1.copy(instant = Instant.now()) }
            .share()
            .log()


    private fun updateQuote(quote: Quote) = quote.copy(
            price = quote.price.add(quote.price.multiply(
                    BigDecimal(0.05 * random.nextDouble()), mathContext))
    )

}

上面主要对外暴露一个function,就是fetchQuoteStream。

然后我们新建一个handler类,用来分别处理sse或者json的请求:

@Component
class QuoteHandler(val quoteGenerator: QuoteGenerator) {

    fun fetchQuotesSSE(req: ServerRequest) = ok()
            .contentType(TEXT_EVENT_STREAM)
            .body(quoteGenerator.fetchQuoteStream(ofMillis(200)), Quote::class.java)

    fun fetchQuotes(req: ServerRequest) = ok()
            .contentType(APPLICATION_STREAM_JSON)
            .body(quoteGenerator.fetchQuoteStream(ofMillis(200)), Quote::class.java)

}

把之前的报价生成器注入,然后分别新建两个函数,一个返回的是json流,一个则是SSE。

然后我们写一个配置类,用来注册映射,分别支持text event stream和application stream json:

@Configuration
class QuoteRoutes(val quoteHandler: QuoteHandler) {

    @Bean
    fun quoteRouter() = router {
        GET("/quotes").nest {
            accept(TEXT_EVENT_STREAM, quoteHandler::fetchQuotesSSE)
            accept(APPLICATION_STREAM_JSON, quoteHandler::fetchQuotes)
        }
    }

}

然后新建一个启动类:

@SpringBootApplication
class QuoteService

fun main(args: Array<String>) {
    SpringApplication.run(QuoteService::class.java, *args)
}

App

app包括后端和前端。后端提供webflux的push能力以及普通的mvc能力。前端使用thymeleaf来写一个简单的页面。主要是使用highcharts来动态的展示后端push过来的最新报价。

依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

<dependency>
   <groupId>com.fasterxml.jackson.datatype</groupId>
   <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-devtools</artifactId>
   <optional>true</optional>
</dependency>

<dependency>
   <groupId>de.flapdoodle.embed</groupId>
   <artifactId>de.flapdoodle.embed.mongo</artifactId>
   <scope>runtime</scope>
</dependency>

<dependency>
   <groupId>org.webjars</groupId>
   <artifactId>bootstrap</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>org.webjars</groupId>
   <artifactId>highcharts</artifactId>
   <scope>runtime</scope>
</dependency>

主要引入webflux、mongodb、thymeleaf、datatype、springboot devtools、embed mongo(内置mongo)、bootstrap以及highcharts。

实体类映射:

@Document
public class User {

   @Id
   private String id;

   private String github;

   private String name;

   public User() {
   }

   public User(String github, String name) {
      this.github = github;
      this.name = name;
   }

   public String getId() {
      return id;
   }

   public void setId(String id) {
      this.id = id;
   }

   public String getGithub() {
      return github;
   }

   public void setGithub(String github) {
      this.github = github;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

}

mongo mapper:

public interface UserRepository extends ReactiveMongoRepository<User, String> {
   Mono<User> findUserByGithub(String github);
}

报价类:

class Quote {

   private static final MathContext MATH_CONTEXT = new MathContext(2);

   private String ticker;

   private BigDecimal price;

   private Instant instant;

   public Quote() {
   }

   public Quote(String ticker, BigDecimal price) {
      this.ticker = ticker;
      this.price = price;
   }

   public Quote(String ticker, Double price) {
      this(ticker, new BigDecimal(price, MATH_CONTEXT));
   }

   public String getTicker() {
      return ticker;
   }

   public void setTicker(String ticker) {
      this.ticker = ticker;
   }

   public BigDecimal getPrice() {
      return price;
   }

   public void setPrice(BigDecimal price) {
      this.price = price;
   }

   public Instant getInstant() {
      return instant;
   }

   public void setInstant(Instant instant) {
      this.instant = instant;
   }

   public void setInstant(long epoch) {
      this.instant = Instant.ofEpochSecond(epoch);
   }

   @Override
   public String toString() {
      return "Quote{" +
            "ticker='" + ticker + '\'' +
            ", price=" + price +
            ", instant=" + instant +
            '}';
   }

}

controller:

@Controller
public class MainController {

   private final UserRepository userRepository;

   public MainController(UserRepository userRepository) {
      this.userRepository = userRepository;
   }

   @GetMapping("/")
   public String home(Model model) {
      model.addAttribute("users", this.userRepository.findAll());
      return "index";
   }

   @GetMapping("/quotes")
   public String quotes() {
      return "quotes";
   }

   @GetMapping(path = "/quotes/feed", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
   @ResponseBody
   public Flux<Quote> fetchQuotesStream() {
      return WebClient.create("http://localhost:8081")
            .get()
            .uri("/quotes")
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(Quote.class)
            .share()
            .log();
   }

}

上面的controller主要有两个mapping,一个是flux mapping,一个是普通 spring mvc mapping。

flux mapping:

@GetMapping(path = "/quotes/feed", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Quote> fetchQuotesStream() {
   return WebClient.create("http://localhost:8081")
         .get()
         .uri("/quotes")
         .accept(MediaType.APPLICATION_STREAM_JSON)
         .retrieve()
         .bodyToFlux(Quote.class)
         .share()
         .log();
}

普通的mvc mapping:

@GetMapping("/")
public String home(Model model) {
   model.addAttribute("users", this.userRepository.findAll());
   return "index";
}

启动类:

@SpringBootApplication
public class QuoteApplication {

   public static void main(String[] args) {
      SpringApplication.run(QuoteApplication.class, args);
   }

   @Bean
   public CommandLineRunner createUsers(UserRepository userRepository) {
      return strings -> {
         List<User> users = Arrays.asList(
               new User("importsource", "ImportSource"),
               new User("sdeleuze", "Sebastien Deleuze"),
               new User("bclozel", "Brian Clozel"),
               new User("rstoyanchev", "Rossen Stoyanchev"),
               new User("smaldini", "Stephane Maldini"),
               new User("sbasle", "Simon Basle"),
               new User("snicoll", "Stephane Nicoll")
         );
         userRepository.saveAll(users).blockLast(Duration.ofSeconds(3));
      };
   }

}

通过cammandline runner 在启动时,初始化数据,然后保存到mongodb。

然后在resources目录下新建templates目录,然后新建两个页面:

1、index.html:

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Streaming"/>
    <meta name="author" content="Brian Clozel and Sebastien Deleuze"/>
    <title>Spring WebFlux Streaming</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring WebFlux Streaming</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li class="active"><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>List of Spring developers</h2>
    <table class="table table-striped">
        <thead>
        <tr>
            <th>#</th>
            <th>Github</th>
            <th>Name</th>
        </tr>
        </thead>
        <tbody>
        <tr th:each="user: ${users}">
            <th th:text="${user.id}">42</th>
            <td><a th:href="@{'https://github.com/'+${user.github}}" th:text="${user.github}">githubUser</a></td>
            <td th:text="${user.name}">Jane Doe</td>
        </tr>
        </tbody>
    </table>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
</body>
</html>

2、quotes.html:

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Streaming"/>
    <meta name="author" content="Brian Clozel and Sebastien Deleuze"/>
    <title>Spring WebFlux Streaming</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
    <link rel="stylesheet" href="/webjars/highcharts/5.0.8/css/highcharts.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring WebFlux Streaming</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li class="active"><a href="/quotes">Quotes</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <div id="chart" style="height: 400px; min-width: 310px"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/highcharts/5.0.8/highcharts.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">
    var chart = new Highcharts.chart('chart', {
        title: {
            text: 'My Stock Portfolio'
        },
        yAxis: {
            title: {
                text: 'Stock Price'
            }
        },
        legend: {
            layout: 'vertical',
            align: 'right',
            verticalAlign: 'middle'
        },
        xAxis: {
            type: 'datetime',
        },
        series: [{
            name: 'CTXS',
            data: []
        }, {
            name: 'MSFT',
            data: []
        }, {
            name: 'ORCL',
            data: []
        }, {
            name: 'RHT',
            data: []
        }, {
            name: 'VMW',
            data: []
        }, {
            name: 'DELL',
            data: []
        }]

    });
    var appendStockData = function (quote) {
        chart.series
            .filter(function (serie) {
                return serie.name == quote.ticker
            })
            .forEach(function (serie) {
                var shift = serie.data.length > 40;
                serie.addPoint([quote.instant * 1000, quote.price], true, shift);
            });
    };
    var stockEventSource = new EventSource("/quotes/feed");
    stockEventSource.onmessage = function (e) {
        appendStockData(JSON.parse(e.data));
    }
</script>
</body>
</html>

演示

先后启动 service和app。然后输入localhost:8080,如下界面:

点击quotes来到动态展示报价页面:

以下是两段视频:

视频内容
视频内容

源码请点击“阅读原文”!

总结

webflux可以让你轻松的构建基于流的,那种动态展现的应用。作为一个与webmvc平级的项目,前景不可限量。kotlin的写法看起来简单而可爱,是当下比较火的函数式编程推进的结果之一,但真正的发展壮大以及语法的严谨性等还有待观察(ps:一个少言寡语的人看起来是挺酷,但说的太少也会让人摸不着他到底想要表达什么)。有关webflux的内容可移步:Spring 5 新增全新的reactive web框架:webflux。有关kotlin的内容可移步:来来来,快来围观那个Kotlin

原文发布于微信公众号 - ImportSource(importsource)

原文发表时间:2017-05-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏hbbliyong

5步搭建GO环境

Easy Go Programming Setup for Windows Dec 23, 2014 I’ve had to do this more t...

42370
来自专栏码匠的流水账

聊聊resilience4j的CircuitBreakerConfig

本文主要研究一下resilience4j的CircuitBreakerConfig

35720
来自专栏技术记录

rabbitMQ教程(三) spring整合rabbitMQ代码实例

一、开启rabbitMQ服务,导入MQ jar包和gson jar包(MQ默认的是jackson,但是效率不如Gson,所以我们用gson) ? ?  二、发...

35880
来自专栏ml

linux下多路复用模型之Select模型

Linux关于并发网络分为Apache模型(Process per Connection (进程连接) ) 和TPC , 还有select模型,以及poll模型...

39840
来自专栏一枝花算不算浪漫

SpringBoot自定义序列化的使用方式--WebMvcConfigurationSupport

20310
来自专栏开发与安全

《dive into python3》 笔记摘录

0、In Python 2, the / operator usually meant integer division, but you could make...

28400
来自专栏一个会写诗的程序员的博客

《Springboot极简教程》 第11章 Springboot集成mongodb开发小结

本章我们通过SpringBoot集成mongodb,Java,Kotlin开发一个极简社区文章博客系统。

13740
来自专栏一个会写诗的程序员的博客

Kotlin集成 SpringBoot 混合Java库开发

apply plugin: 'org.springframework.boot' apply plugin: 'kotlin'

9610
来自专栏流柯技术学院

CentOS升级Python2.7导致使用pip等命令安装模块失败

出现这个问题是因为:虽然已经把Python升级到了2.7版本,但是pip仍然是原来的版本,仍在原来python的site-package里面

21030
来自专栏蜉蝣禅修之道

Linux shell之sort命令

21630

扫码关注云+社区

领取腾讯云代金券