响应式编程系列(一):什么是响应式编程?reactor入门

摘要:
什么是反应式编程?(5) Springactive:使用SpringWebFlux(VI)Springactive:利用webClient简介Spring框架5的一个新特性:响应式编程。那么什么是响应性的呢?基本模型我们需要首先了解响应式编程的基本模型:Flux Reactors中的发布者由两个类定义,即Flux和Mono,它们提供了丰富的运算符。

响应式编程 系列文章目录

(一)什么是响应式编程?reactor入门

(二)Flux入门学习:流的概念,特性和基本操作

(三)Flux深入学习:流的高级特性和进阶用法

(四)reactor-core响应式api如何测试和调试?

(五)Spring reactive: Spring WebFlux的使用

(六)Spring reactive: webClient的使用

引言

  Spring framework 5 的一大新特性:响应式编程(Reactive Programming)。那么什么是响应式?他能给我们带来什么?如何优雅地使用?本系列会从最基础的概念和简单的api讲起,再慢慢深入探讨响应式的一些高级特性,最后讲解实战内容,例如WebFlux和WebClient等在Spring boot中的使用,如何测试和调试。

  想要了解原理的话,美团点评的这篇博客 Java NIO浅析 非常适合入门。

简单地说:

  当我们调用socket.read()、socket.write()这类阻塞函数的时候,这类函数不能立即返回,也无法中断,需要等待socket可读或者可写,才会返回,因此一个线程只能处理一个请求。在这等待的过程中,cpu并不干活,(即阻塞住了),那么cpu的资源就没有很好地利用起来。因此对于这种情况,我们使用多线程来提高cpu资源的利用率:在等待的这段时间,就可以切换到别的线程去处理事件,直到socket可读或可写了,通过中断信号通知cpu,再切换回来继续处理数据。例如线程A正在等待socket可读,而线程B已经就绪了,那么就可以先切换到线程B去处理。虽然上下文切换也会花一些时间,但是远比阻塞在线程A这里空等要好。当然计算机内部实际的情况比这复杂得多。

  而NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来。因此只需要一个线程不断地轮询这些事件,一旦有就绪的时间,处理即可。不需要多线程。

阻塞型IO

  • 需要多线程,即需要很大的线程池。
  • 每个请求都要有一个单独的线程去处理。

非阻塞型IO

  • 只需要数量非常少的线程。
  • 固定的几个工作线程去处理事件。

使用NIO我们能得到什么?

  • 事件驱动模型
  • 避免多线程
  • 单线程处理多任务
  • 非阻塞I/O,I/O读写不再阻塞,而是返回0
  • 基于block的传输,通常比基于流的传输更高效
  • 更高级的IO函数,zero-copy
  • IO多路复用大大提高了Java网络应用的可伸缩性和实用性

响应式编程入门

  响应式编程就是基于reactor的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。

基本模型

我们首先需要理解响应式编程的基本模型:

响应式编程系列(一):什么是响应式编程?reactor入门第1张

Flux

  Reactor中的发布者(Publisher)由FluxMono两个类定义,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0..N个元素的响应式序列,元素可以是普通对象、数据库查询的结果、http响应体,甚至是异常。而一个Mono对象代表一个包含零/一个(0..1)元素的结果。上图就是一个Flux类型的数据流,Flux往流上发送了3个元素,Subscriber通过订阅这个流来接收通知。

如何创建一个流?最简单的方式有以下几种:

复制代码
//创建一个流,并直接往流上发布一个值为value数据
Flux.just(value);

//通过list创建一个流,往流上依次发布list中的数据
Flux.fromIterable(list);

//创建一个流,并向流上从i开始连续发布n个数据,数据类型为Integer
Flux.range(i, n);

//创建一个流,并定时向流上发布一个数据,数据从0开始递增,数据类型为Long
Flux.interval(Duration.ofSeconds(n));
复制代码

既然是“数据流”的发布者,Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。

Subscriber 

subscriber是一个订阅者,他只有非常简单的4个接口:

复制代码
public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    //收到下一个元素值信号时的行为
    void onNext(T var1);

    //收到错误信号时的行为
    void onError(Throwable var1);

    //收到终止信号时的行为
    void onComplete();
}
复制代码

Subscriber必须要订阅一个Flux才能够接收通知:

flux.subscribe(
    value -> handleData(value),
    error -> handleError(error),
    () -> handleComplete()
);

上面这个例子通过lambda表达式,定义了Subscriber分别在收到消息,收到错误,和消息流结束时的行为,当Subscriber接收到一个新数据,就会异步地执行handleData方法处理数据。

简单例子:

接下来我们创建几个最简单的流来试一下:

首先我们新建一个maven项目,引入reactor的类库:

复制代码
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.2.3.RELEASE</version>
        <scope>test</scope>
    </dependency>
</dependencies>
复制代码

编写代码如下:

复制代码
public class ReactorTests {

    @After
    public void after() {
        sleep(30_000);
    }

    @Test
    public void testJust() {
        Flux.just("hello", "world")
            .subscribe(System.out::println);
    }

    @Test
    public void testList() {
        List<String> words = Arrays.asList(
            "hello",
            "reactive",
            "world"
        );

        Flux.fromIterable(words)
            .subscribe(System.out::println);
    }

    @Test
    public void testRange() {
        Flux.range(1, 10)
            .subscribe(System.out::println);
    }

    @Test
    public void testInterval() {
        Flux.interval(Duration.ofSeconds(1))
            .subscribe(System.out::println);
    }
}
复制代码

订阅这些流,收到数据之后只是简单地把它打印出来,运行这些Test,就能够看到订阅者在接收到流上的数据时,异步地去处理这些数据。

转载:https://www.cnblogs.com/yuanrw/p/10050509.html

免责声明:文章转载自《响应式编程系列(一):什么是响应式编程?reactor入门》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇BootStrap同时显示多个Modal解决方案Shell终端收听音乐--豆瓣FM命令行版下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

pthread到Win32thread

一.什么是线程。       线程(thread)是为了提高系统内程序的并发(concurrency)执行程度而提出来的概念,它是比进程更小的能够独立运行的基本单位。在引入线程的系统中,线程是处理器调度(schedule)的基本单位,而传统的进程则只是资源分配的基本单位。同一进程中的线程共享这个进程的全部资源与地址空间,除此之外,线程基本上不拥有其他任何系...

Qt 实现简单的TCP通信

这段时间用到了QT的TCP通信,做了初步的学习与尝试,编写了一个客户端和服务器基于窗口通信的小例程。 使用QT的网络套接字需要.pro文件中加入一句: QT += network 一、客户端 1、客户端的代码比服务器稍简单,总的来说,使用QT中的QTcpSocket类与服务器进行通信只需要以下5步: (1)创建QTcpSocket套接字对象 socket...

jvm锁的四种状态 无锁状态 偏向锁状态 轻量级锁状态 重量级锁状态

一:java多线程互斥,和java多线程引入偏向锁和轻量级锁的原因? --->synchronized是在jvm层面实现同步的一种机制。    jvm规范中可以看到synchronized在jvm里实现原理,jvm基于进入和退出Monitor对象来实现方法同步和代码块同的。在代码同步的开始位置织入monitorenter,在结束同步的位置(正常结束和...

JUC 并发编程--04 常用的辅助类CountDownLatch , CyclicBarrier , Semaphore , 读写锁 , 阻塞队列,CompletableFuture(异步回调)

CountDownLatch 相当于一个减法计数器, 构造方法指定一个数字,比如6, 一个线程执行一次,这个数字减1, 当变为0 的时候, await()方法,才开始往下执行,, 看这个例子 CyclicBarrier 的用法, 字面意思:循环栅栏, 这是构造方法, 第一个参数parties 是线程数量, 第二个参数是barrierAction:...

Socket异步通信——使用SocketAsyncEventArgs

  上一次的博文说错了东西,幸好有园友指出。才把错误改正过来,顺便也把利用SocketAsyncEventArgs进行Socket异步通信这方面的知识整理一下。       之前看了网上的代码,每进行一次异步操作都new 一个SocketAsyncEventArgs对象,然后网友评论太浪费资源了,于是就误以为用BeginXXX进行Socket异步通信会更优...

超全!iOS 面试题汇总

作者:Job_Yang 之前看了很多面试题,感觉要不是不够就是过于冗余,于是我将网上的一些面试题进行了删减和重排,现在分享给大家。(题目来源于网络,侵删) 1. Object-c的类可以多重继承么?可以实现多个接口么?Category是什么?重写一个类的方式用继承好还是分类好?为什么? 答: Object-c的类不可以多重继承;可以实现多个接口,通过实现多...