Reactor 3 学习笔记(2)

摘要:
0011223344-----------------------------0123401234merge示意图:mergeSequential示意图:与mergeSequential类似的,还有一个contact方法,示意图如下:4.11、combineLatest@TestpublicvoidcombineLatestTest(){Flux.combineLatest.toStream().forEach;System.out.println;Flux.combineLatest.toStream().forEach;System.out.println;Flux.combineLatest.toStream().forEach;}该操作会将所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。输出效果:示意图:5.2onErrorReturn即:遇到错误时,用其它指定值返回@TestpublicvoidsubscribeTest2(){Flux.just.concatWith(Flux.error(newIndexOutOfBoundsException("下标越界啦!

上篇继续学习各种方法:

4.9、reduce/reduceWith

    @Test
    public void reduceTest() {
        Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);
        Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println);
    }

输出:

55
65

上面的代码,reduce相当于把1到10累加求和,reduceWith则是先指定一个起始值,然后在这个起始值基础上再累加。(tips: 除了累加,还可以做阶乘) 

reduce示意图:

Reactor 3 学习笔记(2)第1张

reduceWith示意图:

Reactor 3 学习笔记(2)第2张

4.10、merge/mergeSequential/contact

    @Test
    public void mergeTest() {
        Flux.merge(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
                Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
                .toStream().forEach(System.out::println);
        System.out.println("-----------------------------");
        Flux.mergeSequential(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5),
                Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5))
                .toStream().forEach(System.out::println);
    }  

merge就是将把多个Flux"按元素实际产生的顺序"合并,而mergeSequential则是按多个Flux"被订阅的顺序"来合并,以上面的代码来说,二个Flux,从时间上看,元素是交替产生的,所以merge的输出结果,是混在一起的,而mergeSequential则是能分出Flux整体的先后顺序。

0
0
1
1
2
2
3
3
4
4
-----------------------------
0
1
2
3
4
0
1
2
3
4

merge示意图:

Reactor 3 学习笔记(2)第3张

mergeSequential示意图:

Reactor 3 学习笔记(2)第4张

与mergeSequential类似的,还有一个contact方法,示意图如下:

Reactor 3 学习笔记(2)第5张

4.11、combineLatest

    @Test
    public void combineLatestTest() {
        Flux.combineLatest(
                Arrays::toString,
                Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(3),
                Flux.just("A", "B"))
                .toStream().forEach(System.out::println);
        System.out.println("------------------");
        Flux.combineLatest(
                Arrays::toString,
                Flux.just(0, 1),
                Flux.just("A", "B"))
                .toStream().forEach(System.out::println);
        System.out.println("------------------");
        Flux.combineLatest(
                Arrays::toString,
                Flux.interval(Duration.of(1000, ChronoUnit.MILLIS)).take(2),
                Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(2))
                .toStream().forEach(System.out::println);
    }

该操作会将所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次。

分析一下第一段输出:

第1个Flux用了延时生成,第1个数字0,10秒后才产生,这时第2个Flux中的A,B早就生成完毕,所以此时二个Flux中最新生在的元素,就是[0,B],类似的,10秒后,第2个数字1依次产生,再执行1次合并,生成[1,B]...

输出:

[0, B]
[1, B]
[2, B]
------------------
[1, A]
[1, B]
------------------
[1, 0]
[1, 1]

示意图如下:

Reactor 3 学习笔记(2)第6张

4.12、first

    @Test
    public void firstTest() {
        Flux.first(Flux.fromArray(new String[]{"A", "B"}),
                Flux.just(1, 2, 3))
                .subscribe(System.out::println);
    }

这个很简单理解,多个Flux,只取第1个Flux的元素。输出如下:

A
B

示意图:

Reactor 3 学习笔记(2)第7张 

4.13、 map

    @Test
    public void mapTest() {
        Flux.just('A', 'B', 'C').map(a -> (int) (a)).subscribe(System.out::println);
    }

map相当于把一种类型的元素序列,转换成另一种类型,输出如下:

65
66
67

示意图:

Reactor 3 学习笔记(2)第8张 

五、消息处理

写代码时,难免会遇到各种异常或错误,所谓消息处理,就是指如何处理这些异常。

5.1 订阅错误消息

    @Test
    public void subscribeTest1() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下标越界啦!")))
                .subscribe(System.out::println, System.err::println);
    }

注意:这里subscribe第2个参数,指定了System.err::println ,即错误消息,输出到异常控制台上。

输出效果:

Reactor 3 学习笔记(2)第9张

示意图:

Reactor 3 学习笔记(2)第10张

5.2onErrorReturn

即:遇到错误时,用其它指定值返回

    @Test
    public void subscribeTest2() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下标越界啦!")))
                .onErrorReturn("X")
                .subscribe(System.out::println, System.err::println);
    }

输出:

A
B
C
X

示意图:

Reactor 3 学习笔记(2)第11张

5.3 onErrorResume

跟onErrorReturn有点接近,但更灵活,可以根据异常的类型,有选择性的处理返回值。

    @Test
    public void subscribeTest3() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下标越界啦!")))
                .onErrorResume(e -> {
                    if (e instanceof IndexOutOfBoundsException) {
                        return Flux.just("X", "Y", "Z");
                    } else {
                        return Mono.empty();
                    }
                })
                .subscribe(System.out::println, System.err::println);
    }

输出:

A
B
C
X
Y
Z

示意图:

Reactor 3 学习笔记(2)第12张 

5.4 retry

即:遇到异常,就重试。

    @Test
    public void subscribeTest4() {
        Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IndexOutOfBoundsException("下标越界啦!")))
                .retry(1)
                .subscribe(System.out::println, System.err::println);
    }

输出:

Reactor 3 学习笔记(2)第13张

示意图:

Reactor 3 学习笔记(2)第14张

六、(线程)调度器

reactor中到处充满了异步调用,内部必然有一堆线程调度,Schedulers提供了如下几种调用策略:

6.1 Schedulers.immediate() - 使用当前线程
6.2 Schedulers.elastic() - 使用线程池
6.3 Schedulers.newElastic("test1") - 使用(新)线程池(可以指定名称,更方便调试)
6.4 Schedulers.single() - 单个线程
6.5 Schedulers.newSingle("test2") -(新)单个线程(可以指定名称,更方便调试)
6.6 Schedulers.parallel() - 使用并行处理的线程池(取决于CPU核数)
6.7 Schedulers.newParallel("test3")- 使用并行处理的线程池(取决于CPU核数,可以指定名称,方便调试)
6.8 Schedulers.fromExecutorService(Executors.newScheduledThreadPool(5)) - 使用Executor(这个最灵活)

示例代码:

    @Test
    public void schedulesTest() {
        Flux.fromArray(new String[]{"A", "B", "C", "D"})
                .publishOn(Schedulers.newSingle("TEST-SINGLE", true))
                .map(x -> String.format("[%s]: %s", Thread.currentThread().getName(), x))
                .toStream()
                .forEach(System.out::println);
    }

输出: 

[TEST-SINGLE-1]: A
[TEST-SINGLE-1]: B
[TEST-SINGLE-1]: C
[TEST-SINGLE-1]: D

七、测试&调试

异步处理,通常是比较难测试的,reactor提供了StepVerifier工具来进行测试。

7.1 常规单元测试

    @Test
    public void stepTest() {
        StepVerifier.create(Flux.just(1, 2)
                .concatWith(Mono.error(new IndexOutOfBoundsException("test")))
                .onErrorReturn(3))
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .verifyComplete();
    }

上面的示例,Flux先生成1,2这两个元素,然后抛了个错误,但马上用onErrorReturn处理了异常,所以最终应该是期待1,2,3,complete这样的序列。 

7.2 模拟时间流逝

Flux.interval这类延时操作,如果延时较大,比如几个小时之类的,要真实模拟的话,效率很低,StepVerifier提供了withVirtualTime方法,来模拟加快时间的流逝(是不是很体贴^_^)

    @Test
    public void stepTest2() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.of(10, ChronoUnit.MINUTES),
                Duration.of(5, ChronoUnit.SECONDS))
                .take(2))
                .expectSubscription()
                .expectNoEvent(Duration.of(10, ChronoUnit.MINUTES))
                .thenAwait(Duration.of(5, ChronoUnit.SECONDS))
                .expectNext(0L)
                .thenAwait(Duration.of(5, ChronoUnit.SECONDS))
                .expectNext(1L)
                .verifyComplete();
    }

上面这个Flux先停10分钟,然后每隔5秒生成一个数字,然后取前2个数字。代码先调用

expectSubscription 期待流被订阅,然后
expectNoEvent(Duration.of(10, ChronoUnit.MINUTES)) 期望10分钟内,无任何事件(即:验证Flux先暂停10分钟),然后
thenAwait(Duration.of(5, ChronoUnit.SECONDS)) 等5秒钟,这时已经生成了数字0
expectNext(0L) 期待0L
... 后面的大家自行理解吧。

7.3 记录日志 

    @Test
    public void publisherTest() {
        Flux.just(1, 0)
                .map(c -> 1 / c)
                .log("MY-TEST")
                .subscribe(System.out::println);
    }

输出:

Reactor 3 学习笔记(2)第15张

示意图:

Reactor 3 学习笔记(2)第16张

7.4 checkpoint检查点

可以在一些怀疑的地方,加上checkpoint检查,参考下面的代码:

    @Test
    public void publisherTest() {
        Flux.just(1, 0)
                .map(c -> 1 / c)
                .checkpoint("AAA")
                .subscribe(System.out::println);
    }

输出:

点击查看原图

免责声明:文章转载自《Reactor 3 学习笔记(2)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇oracle创建数据库和用户、表空间,管理表空间和数据文件简介无锡美新赵阳:创业18年,一辈子做好一家企业(创业是一种生活方式;为了赚钱而创业,那是扯淡”。最重要的是做自己喜欢做的事情)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Postgresql在线备份和恢复

1.实验环境 OS: RedHat Linux Enterprisedb 6.3 DB: postgresql 9.3 PGHOME: /opt/PostgreSQL/9.3 PGDATA: /opt/PostgreSQL/9.3/data 归档目录:/opt/pg_archive 基础备份目录:/opt/base_archive --生产环境中归档和数...

CPU亲和度

CPU亲和度(CPU Affinity),就是将一个进程或者线程强制绑定在CPU的某一个core上运行。 参考:https://www.cnblogs.com/zhangxuan/p/6427533.html https://www.cnblogs.com/LubinLew/p/cpu_affinity.html demo是将ljj_test进程强制绑定在...

Spring Boot 2.4 配置文件将加载机制大变化

Spring Boot 2.4.0.M2 刚刚发布,它对 application.properties 和 application.yml 文件的加载方式进行重构。如果应用程序仅使用单个 application.properties 或 application.yml 作为配置文件,那么可能感受不到任何区别。但是如果您的应用程序使用更复杂的配置(例如,Sp...

python2.7.12操作Hbase

前置条件:您已经安装好Hbase、python2.7题外话:最好自己安装个虚拟环境,以下操作都是在虚拟环境中的(ma) hadoop@master:/usr/local/pycharm/bin$ sudo pip install thrift[sudo] password for hadoop: The directory '/home/hadoop/.c...

2小时入门Robot Framework

1、介绍 1.1、介绍Robot         Robot Framework是一个基于关键字驱动的自动化测试框架。通过该框架,测试人员可使用python封装关键字,并在非代码环境下使用关键字构建可被执行的测试用例         Robot Framework官方网站:http://robotframework.org/ 1.2、安装 1.2.1...

go语言 robfig/cron包 实现定时 调用

package main import ( "github.com/robfig/cron" "time" "fmt" "os" log "github.com/cihub/seelog" ) var ( ttt int ) const ( logFilePath = "hard/log/test/t...