首先我们来看一段视频,然后开始今天的内容:
上面这样的效果正是我们使用spring webflux实现的。
整体思路
整个项目分为两部分:
1、service。service是一个使用Kotlin来编写的spring boot应用。每200毫秒push一个随机的报价。可以是SSE(server-sent events)或者json stream。
2、app。app里边包含两个功能。
(1)、普通的spring mvc。主要是通过内置的mongodb存储用户,然后展示在前端。
(2)、webflux controller。负责调用service的route来获取随机的报价然后返回到前端(html)。
基本的流程:
Service
接下来我们使用kotlin来编写service。service主要功能就是随机的push一个报价。
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jre8</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
主要是引入webflux、kotlin的jre8的包,以及kotlin的reflect包。另外引入了spring-boot-devtools。这个包可以为我们的开发提供更多的便利,比如一些默认配置、自动重启来及时反映变更等等。
application.properties配置:
server.port=8081
spring.jackson.serialization.write-date-timestamps-as-nanoseconds=false
现在我们先来新建一个报价实体类:
data class Quote(
val ticker: String,
val price: BigDecimal,
val instant: Instant = Instant.now()
)
然后新建一个用来生成报价的类:
@Component
class QuoteGenerator {
val mathContext = MathContext(2)
val random = Random()
val prices = listOf(
Quote("CTXS", BigDecimal(82.26, mathContext)),
Quote("DELL", BigDecimal(63.74, mathContext)),
Quote("GOOG", BigDecimal(847.24, mathContext)),
Quote("MSFT", BigDecimal(65.11, mathContext)),
Quote("ORCL", BigDecimal(45.71, mathContext)),
Quote("RHT", BigDecimal(84.29, mathContext)),
Quote("VMW", BigDecimal(92.21, mathContext))
)
fun fetchQuoteStream(period: Duration) = Flux.generate({ 0 },
{ index, sink: SynchronousSink<Quote> ->
sink.next(updateQuote(prices[index]))
(index + 1) % prices.size
}).zipWith(Flux.interval(period))
.map { it.t1.copy(instant = Instant.now()) }
.share()
.log()
private fun updateQuote(quote: Quote) = quote.copy(
price = quote.price.add(quote.price.multiply(
BigDecimal(0.05 * random.nextDouble()), mathContext))
)
}
上面主要对外暴露一个function,就是fetchQuoteStream。
然后我们新建一个handler类,用来分别处理sse或者json的请求:
@Component
class QuoteHandler(val quoteGenerator: QuoteGenerator) {
fun fetchQuotesSSE(req: ServerRequest) = ok()
.contentType(TEXT_EVENT_STREAM)
.body(quoteGenerator.fetchQuoteStream(ofMillis(200)), Quote::class.java)
fun fetchQuotes(req: ServerRequest) = ok()
.contentType(APPLICATION_STREAM_JSON)
.body(quoteGenerator.fetchQuoteStream(ofMillis(200)), Quote::class.java)
}
把之前的报价生成器注入,然后分别新建两个函数,一个返回的是json流,一个则是SSE。
然后我们写一个配置类,用来注册映射,分别支持text event stream和application stream json:
@Configuration
class QuoteRoutes(val quoteHandler: QuoteHandler) {
@Bean
fun quoteRouter() = router {
GET("/quotes").nest {
accept(TEXT_EVENT_STREAM, quoteHandler::fetchQuotesSSE)
accept(APPLICATION_STREAM_JSON, quoteHandler::fetchQuotes)
}
}
}
然后新建一个启动类:
@SpringBootApplication
class QuoteService
fun main(args: Array<String>) {
SpringApplication.run(QuoteService::class.java, *args)
}
App
app包括后端和前端。后端提供webflux的push能力以及普通的mvc能力。前端使用thymeleaf来写一个简单的页面。主要是使用highcharts来动态的展示后端push过来的最新报价。
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>bootstrap</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>highcharts</artifactId>
<scope>runtime</scope>
</dependency>
主要引入webflux、mongodb、thymeleaf、datatype、springboot devtools、embed mongo(内置mongo)、bootstrap以及highcharts。
实体类映射:
@Document
public class User {
@Id
private String id;
private String github;
private String name;
public User() {
}
public User(String github, String name) {
this.github = github;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGithub() {
return github;
}
public void setGithub(String github) {
this.github = github;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
mongo mapper:
public interface UserRepository extends ReactiveMongoRepository<User, String> {
Mono<User> findUserByGithub(String github);
}
报价类:
class Quote {
private static final MathContext MATH_CONTEXT = new MathContext(2);
private String ticker;
private BigDecimal price;
private Instant instant;
public Quote() {
}
public Quote(String ticker, BigDecimal price) {
this.ticker = ticker;
this.price = price;
}
public Quote(String ticker, Double price) {
this(ticker, new BigDecimal(price, MATH_CONTEXT));
}
public String getTicker() {
return ticker;
}
public void setTicker(String ticker) {
this.ticker = ticker;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public Instant getInstant() {
return instant;
}
public void setInstant(Instant instant) {
this.instant = instant;
}
public void setInstant(long epoch) {
this.instant = Instant.ofEpochSecond(epoch);
}
@Override
public String toString() {
return "Quote{" +
"ticker='" + ticker + '\'' +
", price=" + price +
", instant=" + instant +
'}';
}
}
controller:
@Controller
public class MainController {
private final UserRepository userRepository;
public MainController(UserRepository userRepository) {
this.userRepository = userRepository;
}
@GetMapping("/")
public String home(Model model) {
model.addAttribute("users", this.userRepository.findAll());
return "index";
}
@GetMapping("/quotes")
public String quotes() {
return "quotes";
}
@GetMapping(path = "/quotes/feed", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Quote> fetchQuotesStream() {
return WebClient.create("http://localhost:8081")
.get()
.uri("/quotes")
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Quote.class)
.share()
.log();
}
}
上面的controller主要有两个mapping,一个是flux mapping,一个是普通 spring mvc mapping。
flux mapping:
@GetMapping(path = "/quotes/feed", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Quote> fetchQuotesStream() {
return WebClient.create("http://localhost:8081")
.get()
.uri("/quotes")
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Quote.class)
.share()
.log();
}
普通的mvc mapping:
@GetMapping("/")
public String home(Model model) {
model.addAttribute("users", this.userRepository.findAll());
return "index";
}
启动类:
@SpringBootApplication
public class QuoteApplication {
public static void main(String[] args) {
SpringApplication.run(QuoteApplication.class, args);
}
@Bean
public CommandLineRunner createUsers(UserRepository userRepository) {
return strings -> {
List<User> users = Arrays.asList(
new User("importsource", "ImportSource"),
new User("sdeleuze", "Sebastien Deleuze"),
new User("bclozel", "Brian Clozel"),
new User("rstoyanchev", "Rossen Stoyanchev"),
new User("smaldini", "Stephane Maldini"),
new User("sbasle", "Simon Basle"),
new User("snicoll", "Stephane Nicoll")
);
userRepository.saveAll(users).blockLast(Duration.ofSeconds(3));
};
}
}
通过cammandline runner 在启动时,初始化数据,然后保存到mongodb。
然后在resources目录下新建templates目录,然后新建两个页面:
1、index.html:
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<meta name="description" content="Spring WebFlux Streaming"/>
<meta name="author" content="Brian Clozel and Sebastien Deleuze"/>
<title>Spring WebFlux Streaming</title>
<link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
<link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
<div class="container-fluid">
<div class="navbar-header">
<a class="navbar-brand" href="/">Spring WebFlux Streaming</a>
</div>
<div id="navbar" class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<li class="active"><a href="/">Home</a></li>
<li><a href="/quotes">Quotes</a></li>
</ul>
</div>
</div>
</nav>
<div class="container wrapper">
<h2>List of Spring developers</h2>
<table class="table table-striped">
<thead>
<tr>
<th>#</th>
<th>Github</th>
<th>Name</th>
</tr>
</thead>
<tbody>
<tr th:each="user: ${users}">
<th th:text="${user.id}">42</th>
<td><a th:href="@{'https://github.com/'+${user.github}}" th:text="${user.github}">githubUser</a></td>
<td th:text="${user.name}">Jane Doe</td>
</tr>
</tbody>
</table>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
</body>
</html>
2、quotes.html:
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<meta name="description" content="Spring WebFlux Streaming"/>
<meta name="author" content="Brian Clozel and Sebastien Deleuze"/>
<title>Spring WebFlux Streaming</title>
<link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
<link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
<link rel="stylesheet" href="/webjars/highcharts/5.0.8/css/highcharts.css"/>
</head>
<body>
<nav class="navbar navbar-default">
<div class="container-fluid">
<div class="navbar-header">
<a class="navbar-brand" href="/">Spring WebFlux Streaming</a>
</div>
<div id="navbar" class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<li><a href="/">Home</a></li>
<li class="active"><a href="/quotes">Quotes</a></li>
</ul>
</div>
</div>
</nav>
<div class="container wrapper">
<div id="chart" style="height: 400px; min-width: 310px"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/highcharts/5.0.8/highcharts.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">
var chart = new Highcharts.chart('chart', {
title: {
text: 'My Stock Portfolio'
},
yAxis: {
title: {
text: 'Stock Price'
}
},
legend: {
layout: 'vertical',
align: 'right',
verticalAlign: 'middle'
},
xAxis: {
type: 'datetime',
},
series: [{
name: 'CTXS',
data: []
}, {
name: 'MSFT',
data: []
}, {
name: 'ORCL',
data: []
}, {
name: 'RHT',
data: []
}, {
name: 'VMW',
data: []
}, {
name: 'DELL',
data: []
}]
});
var appendStockData = function (quote) {
chart.series
.filter(function (serie) {
return serie.name == quote.ticker
})
.forEach(function (serie) {
var shift = serie.data.length > 40;
serie.addPoint([quote.instant * 1000, quote.price], true, shift);
});
};
var stockEventSource = new EventSource("/quotes/feed");
stockEventSource.onmessage = function (e) {
appendStockData(JSON.parse(e.data));
}
</script>
</body>
</html>
演示
先后启动 service和app。然后输入localhost:8080,如下界面:
点击quotes来到动态展示报价页面:
以下是两段视频:
源码请点击“阅读原文”!
总结
webflux可以让你轻松的构建基于流的,那种动态展现的应用。作为一个与webmvc平级的项目,前景不可限量。kotlin的写法看起来简单而可爱,是当下比较火的函数式编程推进的结果之一,但真正的发展壮大以及语法的严谨性等还有待观察(ps:一个少言寡语的人看起来是挺酷,但说的太少也会让人摸不着他到底想要表达什么)。有关webflux的内容可移步:Spring 5 新增全新的reactive web框架:webflux。有关kotlin的内容可移步:来来来,快来围观那个Kotlin。
本文分享自 ImportSource 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!