springboot 使用webflux响应式开发教程(二)

摘要:
本文是对使用web flux的spring boot的响应式开发教程(i)的进一步研究。Web容器spring boot starter webflux附带了spring boot starter reactor netty,因此默认使用ReactorNetty作为Web服务器。如果使用Tomcat,请将pom添加到<dependency><groupId>org。springframework Bootspringframeworkstarter tomcat还支持Undertow和Jetty响应数据库操作。本例使用MongoDB。到目前为止,MySQL还没有活动的驱动程序,据报告,该驱动程序正在开发@ComponentpublicclassUsersCommandLineRunnerimplementsCommandLineRunner{privatefinalTradingUserRepositoryrepository;publicUsersCommand LineRunner{this.repository=repositoryOverridepublicvoidrunthrowsException{Listusers=Arrays.asList;this.repository.insert.blockLast;}}由于此方法是void类型,并且其实现被阻止,因此当存储库插入数据并返回Flux时,需要调用blockLast。也可以使用then()。块将Flux转换为Mono<Void>并等待执行完成。

本篇是对springboot 使用webflux响应式开发教程(一)的进一步学习。
分三个部分:

数据库操作
webservice
websocket
创建项目,artifactId = trading-service,groupId=io.spring.workshop。选择Reactive Web , Devtools, Thymeleaf , Reactive Mongo。
WEB容器
spring-boot-starter-webflux 附带了 spring-boot-starter-reactor-netty,所以默认使用Reactor Netty作为web server。
如果要用Tomcat,添加pom即可

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
同样支持Undertow和Jetty

响应式数据库操作

这个示例使用MongoDB。作为reactive模式,数据库的驱动与传统模式区分开。截至目前还没有mysql的reactive驱动,据悉正在研发。本例中使用内存版的mongodb,需要添加依赖

<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
在初次运行时会自动下载mongodb模块,但是墙国是直连不到mongodb的官网,所以在需要添加代理。在这推荐使用JVM参数的方式,-DproxySet=true -Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=1080。需要注意的是http和https协议是区分开来配置的,如果需要http的代理就需要把Dhttps改为Dhttp。 
数据库的存储实体 TradingUser
@Document
@Data
public class TradingUser {

    @Id
    private String id;

    private String userName;

    private String fullName;

    public TradingUser() {
    }

    public TradingUser(String id, String userName, String fullName) {
        this.id = id;
        this.userName = userName;
        this.fullName = fullName;
    }

    public TradingUser(String userName, String fullName) {
        this.userName = userName;
        this.fullName = fullName;
    }



    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TradingUser that = (TradingUser) o;

        if (!id.equals(that.id)) return false;
        return userName.equals(that.userName);
    }

    @Override
    public int hashCode() {
        int result = id.hashCode();
        result = 31 * result + userName.hashCode();
        return result;
    }
}

创建TradingUserRepository继承ReactiveMongoRepository。添加findByUserName方法返回一个实体。
在项目启动的时候我们要初始化一些数据,为此创建UsersCommandLineRunner并继承CommandLineRunner并重写run方法,在该方法里初始化数据,并插入到数据库中。

@Component
public class UsersCommandLineRunner implements CommandLineRunner {

    private final TradingUserRepository repository;

    public UsersCommandLineRunner(TradingUserRepository repository) {
        this.repository = repository;
    }

    @Override
    public void run(String... strings) throws Exception {
        List<TradingUser> users = Arrays.asList(
                new TradingUser("sdeleuze", "Sebastien Deleuze"),
                new TradingUser("snicoll", "Stephane Nicoll"),
                new TradingUser("rstoyanchev", "Rossen Stoyanchev"),
                new TradingUser("poutsma", "Arjen Poutsma"),
                new TradingUser("smaldini", "Stephane Maldini"),
                new TradingUser("simonbasle", "Simon Basle"),
                new TradingUser("violetagg", "Violeta Georgieva"),
                new TradingUser("bclozel", "Brian Clozel")
        );
        this.repository.insert(users).blockLast(Duration.ofSeconds(3));
    }
}
由于该方法是void类型,实现是阻塞的,因此在 repository 插入数据返回Flux的时候需要调用 blockLast(Duration) 
。也可以使用 then().block(Duration) 将 Flux 转化为 Mono<Void> 等待执行结束。

创建 webservice, @RestController标注 的 UserController,添加两个控制器方法
1、get请求,”/users”,返回所有TradingUser,content-type = “application/json”
2、get请求,”/users/{username}”,返回单个TradingUser,content-type = “application/json”

@RestController
public class UserController {

    private final TradingUserRepository tradingUserRepository;

    public UserController(TradingUserRepository tradingUserRepository) {
        this.tradingUserRepository = tradingUserRepository;
    }
    @GetMapping(path = "/users", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<TradingUser> listUsers() {
        return this.tradingUserRepository.findAll();
    }

    @GetMapping(path = "/users/{username}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TradingUser> showUsers(@PathVariable String username) {
        return this.tradingUserRepository.findByUserName(username);
    }
}

编写测试

@RunWith(SpringRunner.class)
@WebFluxTest(UserController.class)
public class UserControllerTests {

  @Autowired
  private WebTestClient webTestClient;

  @MockBean
  private TradingUserRepository repository;

  @Test
  public void listUsers() {
    TradingUser juergen = new TradingUser("1", "jhoeller", "Juergen Hoeller");
    TradingUser andy = new TradingUser("2", "wilkinsona", "Andy Wilkinson");

    BDDMockito.given(this.repository.findAll())
        .willReturn(Flux.just(juergen, andy));

    this.webTestClient.get().uri("/users").accept(MediaType.APPLICATION_JSON)
        .exchange()
        .expectBodyList(TradingUser.class)
        .hasSize(2)
        .contains(juergen, andy);

  }

  @Test
  public void showUser() {
    TradingUser juergen = new TradingUser("1", "jhoeller", "Juergen Hoeller");

    BDDMockito.given(this.repository.findByUserName("jhoeller"))
        .willReturn(Mono.just(juergen));

    this.webTestClient.get().uri("/users/jhoeller").accept(MediaType.APPLICATION_JSON)
        .exchange()
        .expectBody(TradingUser.class)
        .isEqualTo(juergen);
  }

}

用Thymeleaf渲染页面 
pom添加前端依赖

<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>bootstrap</artifactId>
    <version>3.3.7</version>
</dependency>
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>highcharts</artifactId>
    <version>5.0.8</version>
</dependency>

创建HomeController

@Controller
public class HomeController {

    private final TradingUserRepository tradingUserRepository;

    public HomeController(TradingUserRepository tradingUserRepository) {
        this.tradingUserRepository = tradingUserRepository;
    }

    @GetMapping("/")
    public String home(Model model) {
        model.addAttribute("users", this.tradingUserRepository.findAll());
        return "index";
    }
}

创建首页

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li class="active"><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
                <li><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>Trading users</h2>
    <table class="table table-striped">
        <thead>
        <tr>
            <th>#</th>
            <th>User name</th>
            <th>Full name</th>
        </tr>
        </thead>
        <tbody>
        <tr th:each="user: ${users}">
            <th scope="row" th:text="${user.id}">42</th>
            <td th:text="${user.userName}">janedoe</td>
            <td th:text="${user.fullName}">Jane Doe</td>
        </tr>
        </tbody>
    </table>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
</body>
</html>
Spring WebFlux在渲染视图之前自动解析Publisher实例,因此不需包含阻塞代码

使用WebClient 将 stream JSON 输送到浏览器

现在要用到springboot 使用webflux响应式开发教程(一)的示例,远程调用该服务。然后创建视图

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
    <link rel="stylesheet" href="/webjars/highcharts/5.0.8/css/highcharts.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li class="active"><a href="/quotes">Quotes</a></li>
                <li><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <div id="chart" style="height: 400px; min- 310px"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/highcharts/5.0.8/highcharts.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">

    // Setting up the chart
    var chart = new Highcharts.chart('chart', {
        title: {
            text: 'My Stock Portfolio'
        },
        yAxis: {
            title: {
                text: 'Stock Price'
            }
        },
        legend: {
            layout: 'vertical',
            align: 'right',
            verticalAlign: 'middle'
        },
        xAxis: {
            type: 'datetime',
        },
        series: [{
            name: 'CTXS',
            data: []
        }, {
            name: 'MSFT',
            data: []
        }, {
            name: 'ORCL',
            data: []
        }, {
            name: 'RHT',
            data: []
        }, {
            name: 'VMW',
            data: []
        }, {
            name: 'DELL',
            data: []
        }]
    });

    // This function adds the given data point to the chart
    var appendStockData = function (quote) {
        chart.series
            .filter(function (serie) {
                return serie.name == quote.ticker
            })
            .forEach(function (serie) {
                var shift = serie.data.length > 40;
                serie.addPoint([new Date(quote.instant), quote.price], true, shift);
            });
    };

    // The browser connects to the server and receives quotes using ServerSentEvents
    // those quotes are appended to the chart as they're received
    var stockEventSource = new EventSource("/quotes/feed");
    stockEventSource.onmessage = function (e) {
        appendStockData(JSON.parse(e.data));
    };
</script>
</body>
</html>
页面会通过Server Sent Event(SSE) 向服务器请求Quotes。

创建控制器QuotesController并添加两个方法如下

@Controller
public class QuotesController {

    @GetMapping("/quotes")
    public String quotes() {
        return "quotes";
    }

    @GetMapping(path = "/quotes/feed", produces = TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Quote> quotesStream() {
        return WebClient.create("http://localhost:8081")
                .get()
                .uri("/quotes")
                .accept(APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToFlux(Quote.class)
                .share()
                .log("io.spring.workshop.tradingservice");
    }
}
quotesStream方法返回的content-type为”text/event-stream”,并将Flux<Quote>作为响应主体,数据已由stock-quotes提供,在这使用WebClient来请求并检索数据。 
同时应该避免为每个浏览器的请求都去向数据服务提供方发送请求,可以使用Flux.share()

接下来进入页面查看效果

创建WebSocket Handler
WebFlux 支持函数响应式WebSocket 客户端和服务端。
服务端主要分两部分:WebSocketHandlerAdapter 负责处理请求,然后委托给WebSocketService和WebSocketHandler返回响应完成会话。
spring mvc 的 reactive websocket 官方文档参考 这里.

先创建EchoWebSocketHandler 实现 WebSocketHandler接口

public class EchoWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(session.receive()
                .doOnNext(WebSocketMessage::retain)
                .delayElements(Duration.ofSeconds(1)).log());
    }
}

实现handle方法,接收传入的消息然后在延迟一秒后输出。 
为了将请求映射到Handler,需要创建WebSocketRouter

@Configuration
public class WebSocketRouter {

    @Bean
    public HandlerMapping handlerMapping() {

        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/websocket/echo", new EchoWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(10);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

然后创建WebSocketController

@Controller
public class WebSocketController {

    @GetMapping("/websocket")
    public String websocket() {
        return "websocket";
    }
}

返回视图,在页面上查看效果

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
                <li class="active"><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>Websocket Echo</h2>
    <form class="form-inline">
        <div class="form-group">
            <input class="form-control" type="text" id="input" value="type something">
            <input class="btn btn-default" type="submit" id="button" value="Send"/>
        </div>
    </form>
    <div id="output"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">
    $(document).ready(function () {
        if (!("WebSocket" in window)) WebSocket = MozWebSocket;
        var socket = new WebSocket("ws://localhost:8080/websocket/echo");

        socket.onopen = function (event) {
            var newMessage = document.createElement('p');
            newMessage.textContent = "-- CONNECTED";
            document.getElementById('output').appendChild(newMessage);

            socket.onmessage = function (e) {
                var newMessage = document.createElement('p');
                newMessage.textContent = "<< SERVER: " + e.data;
                document.getElementById('output').appendChild(newMessage);
            }

            $("#button").click(function (e) {
                e.preventDefault();
                var message = $("#input").val();
                socket.send(message);
                var newMessage = document.createElement('p');
                newMessage.textContent = ">> CLIENT: " + message;
                document.getElementById('output').appendChild(newMessage);
            });
        }
    });
</script>
</body>
</html>

也可以使用WebSocketClient写测试

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class EchoWebSocketHandlerTests {

    @LocalServerPort
    private String port;

    @Test
    public void echo() throws Exception {
        int count = 4;
        Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
        ReplayProcessor<Object> output = ReplayProcessor.create(count);

        WebSocketClient client = new StandardWebSocketClient();
        client.execute(getUrl("/websocket/echo"),
                session -> session
                        .send(input.map(session::textMessage))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .then())
                .block(Duration.ofMillis(5000));

        assertEquals(input.collectList().block(Duration.ofMillis(5000)), output.collectList().block(Duration.ofMillis(5000)));
    }

    protected URI getUrl(String path) throws URISyntaxException {
        return new URI("ws://localhost:" + this.port + path);
    }
}

github源码地址

免责声明:文章转载自《springboot 使用webflux响应式开发教程(二)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇谷歌浏览器插件设置域名cookie的方法nginx补丁格式说明(CVE-2016-4450为例)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

spring boot jpa

spring boot jpa 访问数据库的方式一般来说有两种,一种以Java Entity为中心,将实体和实体关系对应到数据库的表和表关系,例如Hibernate框架(Spring Data JPA由此实现);另一种以原生SQL为中心,更加灵活便捷,例如Mybatis。这里重点介绍下Spring Data JPA技术。 spring boot jpa介绍...

shiro源码解析

一、web.xml 文件中配置的 DelegatingFilterProxy 的 <filter-name>为啥与Spring文件中配置的ShiroFilterFactoryBean的Bean id 保持一致? <filter> <filter-name>shiroFilter</filter-n...

Gradle 自定义插件

使用版本 5.6.2 插件被用来封装构建逻辑和一些通用配置。将可重复使用的构建逻辑和默认约定封装到插件里,以便于其他项目使用。 你可以使用你喜欢的语言开发插件,但是最终是要编译成字节码在 JVM 运行的。 Gradle 有两种插件,脚本插件和二进制插件。 关于插件的介绍,可以参考我的另一篇文章 Gradle 插件 这里讲的自定义插件是二进制插件,二进制插...

Swing组件集合的事件处理(六)

2.3 Swing特定的事件处理 请记住,Swing组件是构建在AWT库之上的,Swing组件库具有一些改进的功能从而使得事件处理更为简单。功能改进覆盖AWT核心事件处理特性之上,由基本的动作监听到焦点管理。 为了简化事件处理,Swing库使用Action接口扩展了原始的ActionListener接口来存储具有事件处理器的可视属性。这使得事件处理器的创...

格式化一个文件的大小

格式化一个文件的大小,虽然很简单,但却是很常用的,这里分享一个C#写的格式化文件大小的方法:   public static String FormatFileSize(Int64 fileSize){    if (fileSize < 0)    {        throw new ArgumentOutOfRangeException("fi...

自定义动软代码模版编写

转载:https://www.cnblogs.com/TivonStone/archive/2013/03/26/2982277.html 这里使用的是动软的模板. 这是动软代码生成器的一个整体界面。 下面做的示例是从右边模板管理中的选一个模板进行修改,这里我选了简单三层模板中的DAL.cmt模板 1 2 3 4 5 6 7 8 9 10 11...