转载

Spring WebFlux和Reactive编程

在看到Jurgen Hoeller引入 新的Spring 5功能后, 我终于开始尝试在尚未发布的Spring Boot 2.0.0 Snapshot中尝试新的Spring WebFlux项目。开始吧:

Maven WebFlux项目生成

  • 转到 Spring启动应用程序生成器
  • 在Spring Boot版本中选择“2.0.0”以上版本
  • 在依赖项中搜索“ Reactive Web ”
  • 保存生成的maven项目

演示(反应端点)

在刚刚生成的Spring WebFlux项目中,让我们构建一个REST端点,以 Reactive方式 获取通用存储Item 。首先让我们开始:

@RestController
<b>public</b> <b>class</b> ItemsReactiveController {

    @Autowired
    <b>private</b> IItemsService iItemsService;

    <b>public</b> Flux<Item> findById(String id) {
        <b>try</b> {
            System.out.println(<font>"Getting the data from DB..."</font><font>);
            Thread.sleep(500);
        } <b>catch</b> (InterruptedException e) {
            e.printStackTrace();
        }
        <b>return</b> Flux.just(iItemsService.findById(id));
    }

    @GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    <b>public</b> <b>void</b> getItemById(@PathVariable String id) {
        System.out.println(</font><font>"Controller start...."</font><font>);
        findById(id).subscribe(<b>new</b> Subscriber<Item>() {

            @Override
            <b>public</b> <b>void</b> onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }
            @Override
            <b>public</b> <b>void</b> onNext(Item t) {
                System.out.println(</font><font>"Retrieved: "</font><font>+t.getName());
            }
            @Override
            <b>public</b> <b>void</b> onError(Throwable t) {
            }
            @Override
            <b>public</b> <b>void</b> onComplete() {
            }
        });
        System.out.println(</font><font>"End of method."</font><font>);
    }
}
</font>

代码详细

方法findById返回Flux <Item>类型。 Flux 是一种以反应方式返回0..N项的数据类型。另外一种可能性是使用返回0或1项的 Mono <Item>类型。

Jurgen Hoeller清楚地提到如果端点返回上述数据类型之一,那么实际上没有返回结果,但是Spring给调用者一个管道,其中结果将最终落地。正如您从NodeJS所知,但是在Spring方式中,引擎盖下的机制非常接近EventLoop。要从Reactive端点获取任何数据,您需要订阅返回的管道。如果您想获得结果的回调,那么管道上的订阅阶段或错误就会被使用来自反应流的 订阅者 ,这是 Project reactor 的底层实现。

第一次测试:

项目源码: https://bitbucket.org/tomask79/spring-reactive-rest.git

让我们用Oracle Store中现有Item的{id}调用先前创建的端点(我不会厌烦使用的JPA配置,它不是演示的主题)。点击浏览器: http://localhost:8081/store/{id }

系统输出:

Controller start....
Getting the data from DB...
<p>[EL Fine]: sql: 2017-03-20 14:19:56.321--ServerSession(26882836)--Connection(29914401)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
        bind => [1 parameter bound]
Retrieved: <Name of the Item>
End of method.

如您所见,代码输出每个步骤:

  • 调用控制器(Controller start ....)
  • 获取item调用服务(Retrieved: <Name of the Item>)
  • 达到了方法的结束。(End of method)

默认情况下,从主线程获取订阅数据,当然因为我使用的数据存储不提供反应式驱动程序(Oracle),因此调用存储是阻塞的。目前,支持反应式编程(解锁通话)的存储是:

  • Mongo
  • Redis
  • Cassandra
  • Postgres

当然,激活检查新项目 Spring Data“Kay” ,才能在上面提到的Spring Data项目中启用反应范例。

要实际启用异步发布我们的项目,我们需要将控制器更改为:

<b>package</b> com.example.controller;

<b>import</b> com.example.domain.Item;
<b>import</b> com.example.service.IItemsService;
<b>import</b> org.reactivestreams.Subscriber;
<b>import</b> org.reactivestreams.Subscription;
<b>import</b> org.springframework.beans.factory.annotation.Autowired;
<b>import</b> org.springframework.http.MediaType;
<b>import</b> org.springframework.web.bind.annotation.GetMapping;
<b>import</b> org.springframework.web.bind.annotation.PathVariable;
<b>import</b> org.springframework.web.bind.annotation.RequestParam;
<b>import</b> org.springframework.web.bind.annotation.RestController;
<b>import</b> reactor.core.publisher.Flux;
<b>import</b> reactor.core.scheduler.Scheduler;
<b>import</b> reactor.core.scheduler.Schedulers;

<font><i>/**
 * Created by Tomas.Kloucek on 17.3.2017.
 */</i></font><font>
@RestController
<b>public</b> <b>class</b> ItemsReactiveController {

    @Autowired
    <b>private</b> IItemsService iItemsService;

    <b>public</b> Flux<Item> findById(String id) {
        <b>try</b> {
            System.out.println(</font><font>"Getting the data from DB..."</font><font>);
            Thread.sleep(500);
        } <b>catch</b> (InterruptedException e) {
            e.printStackTrace();
        }
        <b>return</b> Flux.just(iItemsService.findById(id)).publishOn(Schedulers.parallel());
    }

    @GetMapping(value = </font><font>"/store/{id}"</font><font>, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    <b>public</b> <b>void</b> getItemById(@PathVariable String id) {
        System.out.println(</font><font>"Controller start...."</font><font>);
        findById(id).subscribe(v -> {
           System.out.println(</font><font>"Consumed: "</font><font>+v.getId());
        });
        System.out.println(</font><font>"End of method."</font><font>);
    }
}
</font>
  • 改变了订阅只获得结果。如果你知道RxJava你应该熟悉。
  • 添加了publishOn方法,并调度了用于异步发布Item的线程。

现在,如果我们从浏览器再次点击端点,输出将是:

Controller start....
Getting the data from DB...
<p>[EL Fine]: sql: 2017-03-20 14:16:49.69--ServerSession(18245293)--Connection(9048111)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
        bind => [1 parameter bound]
End of method.
Consumed: <ID of your Item entity>

正如您所看到的,Spring在给出订阅请求数据之前就已到达方法的最后(End of method)。

如何创建客户端以调用Reactive Endpoint

让我们用代码创建另一个Spring Boot WebReactive应用程序:

@SpringBootApplication
<b>public</b> <b>class</b> DemoApplication {

    @Bean
    <b>public</b> WebClient webClient() {
        <b>return</b> WebClient.create(<font>"http://<reactiveAppHost>:<reactiveAppPort>"</font><font>);
    }

    @Bean
    CommandLineRunner launch(WebClient webClient) {
        <b>return</b> args -> {
            webClient.get().Yuri(</font><font>"/store/{id}"</font><font>)
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .exchange()
                    .flatMap(cr -> cr.bodyToFlux(Item.<b>class</b>))
                    .subscribe(v -> {
                        System.out.println(</font><font>"Received from MS: "</font><font>+v.getName());
                    });
        };
    }

    <b>public</b> <b>static</b> <b>void</b> main(String args[]) {
        <b>new</b> SpringApplicationBuilder(DemoApplication.<b>class</b>).properties
                (Collections.singletonMap(</font><font>"server.port"</font><font>, </font><font>"8082"</font><font>))
                .run(args);
    }
}
</font>

代码详细:

要调用Reactive端点,您需要首先获取 WebClient 实例。在演示案例中放入create方法 http://localhost:8081 .。由WebClient.exchange()方法调用执行的自调用方法,

但要实际在管道上放置订阅以获取结果,您需要调用 ClientRequest .bodyToFlux(<ResultClassMapping> .class),这种订阅才是可能的。如果我们运行这个应用程序,那么结果应该是:

Started DemoApplication in 4.631 seconds (JVM running <b>for</b> 4.954)
Received from MS: <Item text>

这部分客户端代码:

git clone https://tomask79@bitbucket.org/tomask79/spring-reactive-rest-client.git

用于异步调用REST端点的API是否必要?

我对这种订阅和异步调用端点的新反应趋势的观点是一种悲观。如果我需要异步调用端点,那么JMS或AMPQ是第一个让我进入大脑的想法,特别是在MicroServices中。但我们会看到这将如何发展。Spring Framework 5中的其他计划更改很有希望:

  • 带有Angular的函数 框架 ,类似Router
  • 支持Project Jigsaw
  • 注册beanLamdas。 

​​​​​​​本站文章点击标题看原文!

原文  https://www.jdon.com/51399
正文到此结束
Loading...