在看到Jurgen Hoeller引入 新的Spring 5功能后, 我终于开始尝试在尚未发布的Spring Boot 2.0.0 Snapshot中尝试新的Spring WebFlux项目。开始吧:
Maven WebFlux项目生成
演示(反应端点)
在刚刚生成的Spring WebFlux项目中,让我们构建一个REST端点,以 Reactive方式 获取通用存储Item 。首先让我们开始:
@RestController
<b>public</b> <b>class</b> ItemsReactiveController {
@Autowired
<b>private</b> IItemsService iItemsService;
<b>public</b> Flux<Item> findById(String id) {
<b>try</b> {
System.out.println(<font>"Getting the data from DB..."</font><font>);
Thread.sleep(500);
} <b>catch</b> (InterruptedException e) {
e.printStackTrace();
}
<b>return</b> Flux.just(iItemsService.findById(id));
}
@GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
<b>public</b> <b>void</b> getItemById(@PathVariable String id) {
System.out.println(</font><font>"Controller start...."</font><font>);
findById(id).subscribe(<b>new</b> Subscriber<Item>() {
@Override
<b>public</b> <b>void</b> onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
<b>public</b> <b>void</b> onNext(Item t) {
System.out.println(</font><font>"Retrieved: "</font><font>+t.getName());
}
@Override
<b>public</b> <b>void</b> onError(Throwable t) {
}
@Override
<b>public</b> <b>void</b> onComplete() {
}
});
System.out.println(</font><font>"End of method."</font><font>);
}
}
</font>
代码详细
方法findById返回Flux <Item>类型。 Flux 是一种以反应方式返回0..N项的数据类型。另外一种可能性是使用返回0或1项的 Mono <Item>类型。
Jurgen Hoeller清楚地提到如果端点返回上述数据类型之一,那么实际上没有返回结果,但是Spring给调用者一个管道,其中结果将最终落地。正如您从NodeJS所知,但是在Spring方式中,引擎盖下的机制非常接近EventLoop。要从Reactive端点获取任何数据,您需要订阅返回的管道。如果您想获得结果的回调,那么管道上的订阅阶段或错误就会被使用来自反应流的 订阅者 ,这是 Project reactor 的底层实现。
第一次测试:
项目源码: https://bitbucket.org/tomask79/spring-reactive-rest.git
让我们用Oracle Store中现有Item的{id}调用先前创建的端点(我不会厌烦使用的JPA配置,它不是演示的主题)。点击浏览器: http://localhost:8081/store/{id }
系统输出:
Controller start....
Getting the data from DB...
<p>[EL Fine]: sql: 2017-03-20 14:19:56.321--ServerSession(26882836)--Connection(29914401)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
bind => [1 parameter bound]
Retrieved: <Name of the Item>
End of method.
如您所见,代码输出每个步骤:
默认情况下,从主线程获取订阅数据,当然因为我使用的数据存储不提供反应式驱动程序(Oracle),因此调用存储是阻塞的。目前,支持反应式编程(解锁通话)的存储是:
当然,激活检查新项目 Spring Data“Kay” ,才能在上面提到的Spring Data项目中启用反应范例。
要实际启用异步发布我们的项目,我们需要将控制器更改为:
<b>package</b> com.example.controller;
<b>import</b> com.example.domain.Item;
<b>import</b> com.example.service.IItemsService;
<b>import</b> org.reactivestreams.Subscriber;
<b>import</b> org.reactivestreams.Subscription;
<b>import</b> org.springframework.beans.factory.annotation.Autowired;
<b>import</b> org.springframework.http.MediaType;
<b>import</b> org.springframework.web.bind.annotation.GetMapping;
<b>import</b> org.springframework.web.bind.annotation.PathVariable;
<b>import</b> org.springframework.web.bind.annotation.RequestParam;
<b>import</b> org.springframework.web.bind.annotation.RestController;
<b>import</b> reactor.core.publisher.Flux;
<b>import</b> reactor.core.scheduler.Scheduler;
<b>import</b> reactor.core.scheduler.Schedulers;
<font><i>/**
* Created by Tomas.Kloucek on 17.3.2017.
*/</i></font><font>
@RestController
<b>public</b> <b>class</b> ItemsReactiveController {
@Autowired
<b>private</b> IItemsService iItemsService;
<b>public</b> Flux<Item> findById(String id) {
<b>try</b> {
System.out.println(</font><font>"Getting the data from DB..."</font><font>);
Thread.sleep(500);
} <b>catch</b> (InterruptedException e) {
e.printStackTrace();
}
<b>return</b> Flux.just(iItemsService.findById(id)).publishOn(Schedulers.parallel());
}
@GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
<b>public</b> <b>void</b> getItemById(@PathVariable String id) {
System.out.println(</font><font>"Controller start...."</font><font>);
findById(id).subscribe(v -> {
System.out.println(</font><font>"Consumed: "</font><font>+v.getId());
});
System.out.println(</font><font>"End of method."</font><font>);
}
}
</font>
现在,如果我们从浏览器再次点击端点,输出将是:
Controller start....
Getting the data from DB...
<p>[EL Fine]: sql: 2017-03-20 14:16:49.69--ServerSession(18245293)--Connection(9048111)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
bind => [1 parameter bound]
End of method.
Consumed: <ID of your Item entity>
正如您所看到的,Spring在给出订阅请求数据之前就已到达方法的最后(End of method)。
如何创建客户端以调用Reactive Endpoint
让我们用代码创建另一个Spring Boot WebReactive应用程序:
@SpringBootApplication
<b>public</b> <b>class</b> DemoApplication {
@Bean
<b>public</b> WebClient webClient() {
<b>return</b> WebClient.create(<font>"http://<reactiveAppHost>:<reactiveAppPort>"</font><font>);
}
@Bean
CommandLineRunner launch(WebClient webClient) {
<b>return</b> args -> {
webClient.get().Yuri(</font><font>"/store/{id}"</font><font>)
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMap(cr -> cr.bodyToFlux(Item.<b>class</b>))
.subscribe(v -> {
System.out.println(</font><font>"Received from MS: "</font><font>+v.getName());
});
};
}
<b>public</b> <b>static</b> <b>void</b> main(String args[]) {
<b>new</b> SpringApplicationBuilder(DemoApplication.<b>class</b>).properties
(Collections.singletonMap(</font><font>"server.port"</font><font>, </font><font>"8082"</font><font>))
.run(args);
}
}
</font>
代码详细:
要调用Reactive端点,您需要首先获取 WebClient 实例。在演示案例中放入create方法 http://localhost:8081 .。由WebClient.exchange()方法调用执行的自调用方法,
但要实际在管道上放置订阅以获取结果,您需要调用 ClientRequest .bodyToFlux(<ResultClassMapping> .class),这种订阅才是可能的。如果我们运行这个应用程序,那么结果应该是:
Started DemoApplication in 4.631 seconds (JVM running <b>for</b> 4.954) Received from MS: <Item text>
这部分客户端代码:
git clone https://tomask79@bitbucket.org/tomask79/spring-reactive-rest-client.git
用于异步调用REST端点的API是否必要?
我对这种订阅和异步调用端点的新反应趋势的观点是一种悲观。如果我需要异步调用端点,那么JMS或AMPQ是第一个让我进入大脑的想法,特别是在MicroServices中。但我们会看到这将如何发展。Spring Framework 5中的其他计划更改很有希望:
本站文章点击标题看原文!