项目小版本上线,抽空简单学习了下久仰大名的RxJava
- 强大灵活的事件流处理(多线程/多事件/复合对象)
- 强大灵活优雅简洁的异步
- 链式调用
- 可自动Lambda化
类比 | 类比 | 实际 | 实际 | 职责 |
演讲者 | Button | (可)被订阅者 (同右) | (可)被观察者 Observable | 决定什么时候触发事件以及触发怎样的事件 |
听众 | OnClickListener | 订阅者 Subscriber | 观察者 Observer | 决定事件触发的时候将有怎样的行为 |
买票 | setOnClickListener() | 订阅 subscribe | 注册 | |
命令 | onClick() | 事件 | 事件 |
- 很多onNext()
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
- 创建演讲者
- 创建听众
- 买票
- 不是通过new的方法,而是一个内部静态工厂方法create( )来创建,这个方法需要传入一个 OnSubscribe 对象作为参数
- 不过换个角度,把它当成一个需要传入一个参数的构造方法就好了(虽然内部是有点其他货的)
- 方法具体是:Observable<T> create ( OnSubscribe<T> f )
- 这个 OnSubscribe 类本身是Observable的内部类
- 这个对象在create( )时传入后会存储在返回的 Observable 对象中
- 当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用
- 这个call方法会遍历传入所有的听众o,这些听众都实现了听众的那三个约定方法,在这里就可以执行自己需要的业务代码,并在需要的时候回调听众的那三个约定方法
- 换个角度说,OnSubscribe 的作用相当于一个计划表或者说事件发生器
- 这个泛型T是输出类型,也就是会传给听众的类型
简写: Observable observable = Observable.create(new OnSubscribe() { @Override public void call(Object o) { } }); |
具体的实际的写法:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onCompleted(); } }); |
- 这个方法有10个重载,分别可以传入1个到10个参数(orz)
- 这是一个简便方法,会将传入的参数依次发送出来
- 可以写成基本的 create( ) 形式
Observable observable = Observable.just("Hello", "Hi"); |
等于上面的 create( ): Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onCompleted(); } }); |
- 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来
- 可以写成基本的 create( ) 形式
String[] words = {"Hello", "Hi"}; Observable observable = Observable.from(words); |
- 注意不是java中util包中的接口
Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) {} @Override public void onCompleted() {} @Override public void onError(Throwable e) {} }; |
- 它实现了 Observer 接口,并进行了一些扩展
- 使用方法跟Observer一样,而且必须实现的方法就是 Observer 接口中的方法,在订阅(subscribe)的时候,Observer 也总是会先被转换成一个 Subscriber 再使用,所以基本建议可以用Observer的地方都用Subscriber吧
- 这个抽象类扩展了两个可选的方法:
- onStart( )
- 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置
- 需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法
- unsubscribe( )
- 这个方法用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件
- 一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态
- unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用unsubscribe() 来解除引用关系,以避免内存泄露的发生
- onStart( )
- 这个泛型T是输入类型,也就是听众能接收的类型
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) {} @Override public void onCompleted() {} @Override public void onError(Throwable e) {} }; |
- ActionX 接口有很多,Action0 和 Action1 最常用,还有Action2, Action3,X代表传入的参数的数目
- 他们其实是对具有不同传入参数、但是无返回值的方法的包装接口
- subscribe()会根据这些ActionX对象生成正常的Subscriber
- Action0
- Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的
- 由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调
- 这样其实也可以看做将 onCompleted() 方法作为参数传进了subscribe(),相当于其他某些语言中的『闭包』
- Action1
- Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
- 与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调
- new 出 action 对象
- 组合塞进 subsribe 方法
通过Action0、Action1构造三种方法的包装对象 // onNext() Action1<String> action1 = new Action1<String>() { @Override public void call(String s) {} }; // onError() Action1<Throwable> action2 = new Action1<Throwable>() { @Override public void call(Throwable throwable) {} }; // onCompleted() Action0 action3 = new Action0() { @Override public void call() {} }; |
// 自动创建 Subscriber ,并使用 action1 来定义 onNext() observable.subscribe(action1); // 自动创建 Subscriber ,并使用 action1 和 action2 来定义 onNext() 和 onError() observable.subscribe(action1, action2); // 自动创建 Subscriber ,并使用 action1、 action2 和 action3 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(action1, action2, action3); |
Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String name) { Log.d(tag, name); } }); |
observable.subscribe(subscriber); |
- 有人可能会注意到, subscribe() 这个方法有点怪:它看起来是『observalbe 订阅了 subscriber』而不是『subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭
- 因为如果把 API 设计成 subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,因为对于流事件,是有一个生产者和一系列消费者的,所以生产者放前边,后边跟一串消费者才是更流的形式
- 这种形式其实跟普通的java观察者模式很像,演讲者.add(听众)
1.基本的rxjava: compile 'io.reactivex:rxjava:1.0.14' 2.带android特性的rxjava: compile 'io.reactivex:rxandroid:1.0.1' 3.支持rxjava的网络加载库retrofit compile 'com.squareup.retrofit:retrofit:1.9.0' |
- 三步走
- 最简单的just方法
- 一个普通的subscriber
Observable.just("1","2","3").subscribe(new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, s); } @Override public void onCompleted() {} @Override public void onError(Throwable e) {} }); |
- 需求:
- 实现:
- 还是基本的三步走,稍微注意下格式(流式结构)
final Drawable drawable = getActivity().getResources().getDrawable(R.drawable.1); final ImageView imageView = new ImageView(getActivity()); Observable.create(new Observable.OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Subscriber<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() {} @Override public void onError(Throwable e) { Log.d(tag, "error!"); } }); |
- 在不指定线程的情况下, RxJava 遵循的是线程不变的原则
- 即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
- 如果需要切换线程,就需要用到 Scheduler (调度器)
- Schedulers.immediate()
- 直接在当前线程运行,相当于不指定线程
- 这是默认的 Scheduler
- Schedulers.newThread()
- 总是启用新线程,并在新线程执行操作
- Schedulers.io( )
- I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
- 行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率
- 不要把计算工作放在 io() 中,可以避免创建不必要的线程
- Schedulers.computation( )
- 计算所使用的 Scheduler,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算
- 这个 Scheduler 使用的固定的线程池,大小为 CPU 核数
- 不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
- Android 还有一个专用的 AndroidSchedulers.mainThread()
- 它指定的操作将在 Android 主线程运行
- subscribeOn( )
- 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程
- 而这其实就是事件产生的线程,也就是Observable活动的线程
- (看到这不免有点乱,Observable活动在subscribeOn指定的线程,那这里就只能用[Observable.OnSubscribe 被激活]这件事来记了)
- observeOn( )
- 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
- (同样正好相反)
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } }); |
|
- observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 不一定是最终subscribe() 时的Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber
- 换句话说,observeOn() 指定的是它之后的操作所在的线程
- 因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可
- 如下,通过 observeOn() 的多次调用,程序实现了线程的多次切换
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator1) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定 |
- 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化
- 然而 onStart() 由于在subscribe() 发生时就被调用了,也就是没有被observeOn() 指定线程,因此是执行在 subscribe() 被调用时的线程也就是原线程
- 这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe() 将会在什么线程执行
- Observable(不是Subscriber的)有一个方法doOnSubscribe()
- 它和 Subscriber.onStart() 同样是在subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程
- 默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程
- 而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程
Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); |
- RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因
- 所谓变换,就是将事件序列中的对象或整个事件队列进行加工处理,转换成不同的事件或事件序列
- 变换算法是一系列方法,比如:map()、flatMap(),他们决定变换的方式,是随方法固定的
- 变换枢纽是一种类似管道枢纽的容器,里边由你写具体的变换细节;输入和输出就像管道,如何分流就像管道的布线
- 往一个变换算法里传入一个变换枢纽就组成了一个变换节点
- 变换枢纽是FuncX系列接口的对象
- FuncX系列接口跟前面讲的ActionX系列接口非常像
- 他们是位于 rx.functions 包下的全部两个系列接口
- ActionX系列是可以传入1个或多个参数,但是无返回值的call方法的包装
- FuncX系列是可以传入1个或多个参数,但是有一个返回值的call方法的包装
- 正是两个方法的唯一区别(是否有返回值)决定了在rx链中两者不同的角色
- ActionX系列由于没有返回值,所以只能作为链的终点,也就是为观众服务,可以被组合构成观众
- FuncX系列由于有返回值,但他又不能作为链的起点,所以就自然成了我们这里要说的新角色:链的中继者,或者说变换器
- FuncX系列的泛型位置是这样:前面的所有参数是输入参数,最后一个参数是输出参数
- map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回
- 在经过 map() 方法后,事件的参数类型也由 String转为了 Bitmap
Observable.just("images/logo.png") // 输入类型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } }); |
Student[] students = new Student[10]; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } ... }); |
Student[] students = new Student[10]; Observable.from(students) .subscribe(new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ... }); |
- 上面实现倒是实现了,但是这个for循环的存在不免显得有点不太优雅
- 如果不想在 Subscriber 中使用 for 循环,而是希望在 Subscriber 中直接接收单个的 Course 对象呢(这对于代码复用很重要)?
- 用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?
- 这个时候,就需要用 flatMap( ) 了
Student[] students = new Student[10]; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ... }); |
- flatMap() 和 map() 有一个相同点:也是把传入的参数转化之后返回另一个对象
- 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中,而是
- 先使用传入的事件对象创建一个 Observable 对象
- 但是并不发送这个 Observable, 而是将它激活(subscribe),于是它开始发送事件
- 每一个创建出来的子Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法
- 上面这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去
- 这个『铺平』就是 flatMap() 所谓的 flat
- 每个list就像卷起的纸,flatMap就是把一卷卷纸展开再连在一起
RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); |
- RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API
- 所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API
Button button = ...; RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件 .subscribe(new Action1<ViewClickEvent>() { @Override public void call(ViewClickEvent event) { } }); |
RxView.clickEvents(button) .throttleFirst(500, TimeUnit.MILLISECONDS) .subscribe(clickAction); |
- 主被动关系,往往会有人觉得观察者是主动观察,其实不然
- 名字就是一字之差,容易混淆,造成理解障碍
- Button -> 被观察者
- OnClickListener -> 观察者
- setOnClickListener() -> 订阅
- onClick() -> 事件
参考资料:http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html