Android 异步框架 RxJava2

摘要:
观察者模式RxJava的概念是Android的异步框架。官方介绍的是可观察序列,它构成了基于异步事件的程序库。它的特点是观察者模式和基于事件流的链式调用。随着异步操作调度过程的复杂性,程序逻辑变得越来越复杂,但RxJava仍然可以保持简单。

观察者模式的概念

RxJava是android的异步框架,官方介绍是可观测的序列,组成异步基于事件程序的库。特点是观察者模式,基于事件流的链式调用,随着异步操作调度过程复杂的情况下,程序逻辑也变得越来越复杂,但RxJava依然能够保持简洁。

简单的说观察者A与被观察者B建立订阅关系,当被观察者B发生某种改变时,立即通知观察者A

添加依赖

compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

基本模式

Observable被观察者

注意各地方添加泛型避免大片警告,onNext()是事件的回调,onComplete()是事件的结尾。onComplete()与onError互斥需要保持唯一性,并只能调用一次。

Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("消息1");
                e.onNext("消息2");
                e.onNext("消息3");
                e.onComplete();
    }
});

Observer观察者

创建观察者时回调的onSubscribe可以获取Disposable对象,在合适的时候判断条件,调用dispose()即可接触订阅关系

Observer<String> observer=new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //通过判断解除订阅关系
         d.dispose();
    }

    @Override
    public void onNext(String o) {
        //对应observable的onNext方法
    }

    @Override
    public void onError(Throwable e) {
        //对应observable的onError方法
    }

    @Override
    public void onComplete() {
        //对应observable的onComplete方法
    }
};

建立订阅关系

observable.subscribeOn(Schedulers.io()) //指定事件生产在子线程
          .observeOn(AndroidSchedulers.mainThread()) //指定事件消费在UI线程
          .subscribe(observer);

Observable被观察者的其他模式

//just模式,将自动发送onNext()事件
Observable<String> observable = Observable.just("发送消息");

//fromIterable模式,遍历集合,并自动发送onNext()事件
Observable<String> observable = Observable.fromIterable((Iterable<String>) mList);

//interval模式,定时自动发送整数序列,从0开始每隔2秒计数,
Observable<Long> observable = Observable.interval(0,2, TimeUnit.SECONDS)

//range模式,自动发送特定的整数序列,0表示不发送,负数会抛异常,从1开始发送到20
Observable<Integer> observable = Observable.range(1,20);

//timer模式,定时执行观察者的onNext()方法
Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

Observable被观察者的更多创建方式以及操作符

如创建操作,数据过滤操作,条件操作,转载以下博客,很详细:

RxJava操作符大全

Scheduler调度器

四种常见模式

Schedulers.immediate() 默认模式,在当前线程运行

Schedulers.newThread() 创建新的子线程运行

Schedulers.io() 创建新的子线程运行,内部使用的是无上限的线程池,可重用空闲的线程,效率高

 AndroidSchedulers.mainThread() 在UI主线程运行

订阅事件时的生产与消费线程

subscribeOn() 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程

observeOn() 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程

新的观察者模式

Flowable被观察者

Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("hello RxJava!");
                e.onComplete();
    }
},BackpressureStrategy.BUFFER);//增加背压模式

Subscriber观察者

onSubscribe()会返回Subscription对象,调用cancel()即可取消订阅关系,request()即可指定消费事件的数量 

Subscriber<String> subscriber=new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
         s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
        Log.i("RxJava", "onNext: "+s);
    }

    @Override
    public void onError(Throwable t) {
        Log.i("RxJava", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("RxJava", "onComplete");
    }
};
flowable.subscribe(subscriber);//建立订阅关系

Backpressure背压模式

如果生产者和消费者不在同一线程的情况下,如果生产者的速度大于消费者的速度,就会产生Backpressure问题。即异步情况下,Backpressure问题才会存在。

BUFFER

所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。

DROP

当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉

LATEST

LATEST与DROP功能基本一致,唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件

ERROR

这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException

免责声明:文章转载自《Android 异步框架 RxJava2》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇tomcat及Jetty远程调试debugPHP 命名空间下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

java中String编码转换 UTF-8转GBK

1.GB2312等都可以用GBK代替.2.new String(row.getBytes("GB2312"), "UTF8") 这种写法是不对的, 中文仍然会乱码. 方案:解决GBK字符转UTF-8乱码问题: https://www.cnblogs.com/xijin-wu/p/5884822.html 彻底搞懂编码 GBK 和 UTF8:https:/...

Objective-C学习--字符串

C语言将字符串作为简单地字符数组处理,并且在数组最后添加尾部零字符作为结束标志。而Cocoa中的NSString则有很多内置方法,他们让字符串的处理变得简单很多 1. 创建字符串    NSString的stringWithFormat:方法 +(id) stringWithFormat:(NSString *) format, ...;//省略号表示这个...

VB.NET中LINQ TO List泛型查询语句(分组,聚合函数)

Public Class LinqToList 'LINQ在C#中使用比较方便,但是在VB中使用比较麻烦,复杂,和C#用法并不太一样 Dim listNew As List(Of Product) = New List(Of Product) '新商品 Dim listOld As List(Of Product) = New L...

Android的string-array数据源简单使用

在Android中,用string-array是一种简单的提取XML资源文件数据的方法。 例子如下: 把相应的数据放到values文件夹的arrays.xml文件里 <?xml version="1.0" encoding="utf-8"?> <resources> <string-array name="city"&...

关于对JMM(java内存模型)的个人理解

java内存模型是一种虚拟机规范,它定义了Java内存模型,用于屏蔽各种不同硬件和操作系统访问内存差异,以实现让java程序在各种平台下都能达到一致的并发效果.JMM规范了java虚拟机与计算机内存是如何协同工作的,规定了一个线程如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步的访问共享变量. 关于主内存与工作内存之间的具体交互协...

C#通过SFTP协议操作文件

本文主要是C#调用SSH实现文件上传下载功能,主要是要引用第三方类库Tamir.SharpSSH.dll。 以下是SFTPHelper类,实现了对文件的操作,可供参考。 public classSFTPHelper { privateSession m_session; privateChannel m_chann...