RxJava入门

摘要:
RxJava不仅单独处理每个事件,还将它们视为一个队列。RxJava规定,当没有发出新的onNext()时,需要将onCompleted()方法作为标志触发。superString˃subscriber){subscriber.onNext;subscriber.on Next;subsumer.onCompleted();}});1.2.从()1.2.1创建静态工厂just()的两种快速方法。Observable<T>just这个方法有10个重载,可以分别传入1到10个参数。这是一个简单的方法。传递的参数将依次发送出去,可以写入基本的create()形式Observable=Observable。jst;等于上面的create():ObservableObservable=Observable。create(newObservable.OnSubscribe<String>(){@Overridepublicvoid调用(Subscriber<?

项目小版本上线,抽空简单学习了下久仰大名的RxJava

一、引入
个人觉得rxjava的特点:
  1. 强大灵活的事件流处理(多线程/多事件/复合对象)
  2. 强大灵活优雅简洁的异步
  3. 链式调用
  4. 可自动Lambda化
 
实现:RxJava 是通过一种扩展的观察者模式来实现的
类比类比实际实际职责
演讲者Button
(可)被订阅者
(同右)
(可)被观察者
Observable
决定什么时候触发事件以及触发怎样的事件
听众OnClickListener
订阅者
Subscriber
观察者
Observer
决定事件触发的时候将有怎样的行为
买票setOnClickListener()
订阅
subscribe
注册 
命令onClick()事件事件 
演讲者只有Observable一种类,但是听众有Subscriber和Observer两种类,有点跟演讲者一个听众很多类似
 
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
  1. 很多onNext()
  2. onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  3. onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
 
 

二、实现方式
步骤:
  1. 创建演讲者
  2. 创建听众
  3. 买票
 
1、创建演讲者
创建演讲者的方式有点特殊,它不是通过new的方法,而是通过一系列不同的内部静态工厂方法来创建的,最普通的有create( ) 方法
 
1.1.create( ) 方法创建
  1. 不是通过new的方法,而是一个内部静态工厂方法create( )来创建,这个方法需要传入一个 OnSubscribe 对象作为参数
    1. 不过换个角度,把它当成一个需要传入一个参数的构造方法就好了(虽然内部是有点其他货的)
  2. 方法具体是:Observable<T>   create ( OnSubscribe<T> f )
  3. 这个  OnSubscribe 类本身是Observable的内部类
    1. 这个对象在create( )时传入后会存储在返回的 Observable 对象中
    2. 当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用
    3. 这个call方法会遍历传入所有的听众o,这些听众都实现了听众的那三个约定方法,在这里就可以执行自己需要的业务代码,并在需要的时候回调听众的那三个约定方法
    4. 换个角度说,OnSubscribe 的作用相当于一个计划表或者说事件发生器
    5. 这个泛型T是输出类型,也就是会传给听众的类型
 
简写:
Observable observable = Observable.create(new OnSubscribe() {
    @Override
    public void call(Object o) {

    }
});
具体的实际的写法:
  1. 传入听众的表示格式是这样:Subscriber<? super String> subscriber(前面的泛型是string类)
  2. 这个例子很简单:
    1. 事件的内容是字符串,而不是一些复杂的对象
    2. 事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的)
    3. 所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的
 
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onCompleted();
    }
});
 
1.2.两个快捷创建静态工厂方法 just( ) , from( )
1.2.1.Observable<T>   just( T t1, T t2 )
  1. 这个方法有10个重载,分别可以传入1个到10个参数(orz)
  2. 这是一个简便方法,会将传入的参数依次发送出来
  3. 可以写成基本的 create( ) 形式
 
Observable observable = Observable.just("Hello", "Hi");
等于上面的 create( ):
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onCompleted();
    }
});
 
1.2.2.
Observable<T>   from( T[ ]  array )
Observable<T>   from( Iterable<? extends T>  array )
  1. 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来
  2. 可以写成基本的 create( ) 形式
 
String[] words = {"Hello", "Hi"};
Observable observable = Observable.from(words);
 
2、创建听众
这个有几种方式
2.1.通过rx包中的 Observer 接口
  1. 注意不是java中util包中的接口
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {}
    @Override
    public void onCompleted() {}
    @Override
    public void onError(Throwable e) {}
};
 
2.2.通过rx包中的 Subscriber 抽象类[推荐]
  1. 它实现了 Observer 接口,并进行了一些扩展
  2. 使用方法跟Observer一样,而且必须实现的方法就是 Observer 接口中的方法,在订阅(subscribe)的时候,Observer 也总是会先被转换成一个 Subscriber 再使用,所以基本建议可以用Observer的地方都用Subscriber吧
  3. 这个抽象类扩展了两个可选的方法:
    1. onStart( )
      1. 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置
      2. 需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法
    2. unsubscribe( )
      1. 这个方法用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件
      2. 一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态
      3. unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用unsubscribe() 来解除引用关系,以避免内存泄露的发生
  4. 这个泛型T是输入类型,也就是听众能接收的类型
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {}
    @Override
    public void onCompleted() {}
    @Override
    public void onError(Throwable e) {}
};
 
2.3.快捷创建方法:ActionX 接口[推荐]
听众也有快捷的创建方式,那就是通过ActionX 接口
  1. ActionX 接口有很多,Action0 和 Action1 最常用,还有Action2, Action3,X代表传入的参数的数目
  2. 他们其实是对具有不同传入参数、但是无返回值的方法的包装接口
  3. subscribe()会根据这些ActionX对象生成正常的Subscriber
 
以0和1为例
  1. Action0
    1. Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的
    2. 由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调
    3. 这样其实也可以看做将 onCompleted() 方法作为参数传进了subscribe(),相当于其他某些语言中的『闭包』
  2. Action1
    1. Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
    2. 与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调
 
正常使用方式是:
  1. new 出 action 对象
  2. 组合塞进 subsribe 方法
通过Action0、Action1构造三种方法的包装对象
 
// onNext()
Action1<String> action1 = new Action1<String>() {
    @Override
    public void call(String s) {}
};

// onError()
Action1<Throwable> action2 = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {}
};

// onCompleted()
Action0 action3 = new Action0() {
    @Override
    public void call() {}
};
// 自动创建 Subscriber ,并使用 action1 来定义 onNext()
observable.subscribe(action1);

// 自动创建 Subscriber ,并使用 action1 和 action2 来定义 onNext() 和 onError()
observable.subscribe(action1, action2);

// 自动创建 Subscriber ,并使用 action1、 action2 和 action3 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(action1, action2, action3);
 
如果只是要onNext( ) ,这样使用匿名类也很清晰
Observable.just("1","2","3")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String name) {
                Log.d(tag, name);
            }
        });
 
 
3、买票(进行订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了
 
代码形式很简单:
observable.subscribe(subscriber);
  1. 有人可能会注意到, subscribe() 这个方法有点怪:它看起来是『observalbe 订阅了 subscriber』而不是『subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭
  2. 因为如果把 API 设计成 subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,因为对于流事件,是有一个生产者和一系列消费者的,所以生产者放前边,后边跟一串消费者才是更流的形式
  3. 这种形式其实跟普通的java观察者模式很像,演讲者.add(听众)
 
 

三、快速使用及举例
 
使用rxjava首先是引入依赖:
1.基本的rxjava:
compile 'io.reactivex:rxjava:1.0.14'
2.带android特性的rxjava:
compile 'io.reactivex:rxandroid:1.0.1'
3.支持rxjava的网络加载库retrofit
compile 'com.squareup.retrofit:retrofit:1.9.0'
 
然后几个简单的例子:
1.真正最简单的例子:打印几个字符串
  1. 三步走
  2. 最简单的just方法
  3. 一个普通的subscriber
Observable.just("1","2","3").subscribe(new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, s);
    }
    @Override
    public void onCompleted() {}
    @Override
    public void onError(Throwable e) {}
});
 
2.复杂一点点:给ImageView set 图片
  1. 需求:
  2. 实现:
    1. 还是基本的三步走,稍微注意下格式(流式结构)
final Drawable drawable = getActivity().getResources().getDrawable(R.drawable.1);
final ImageView imageView = new ImageView(getActivity());

Observable.create(new Observable.OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "error!");
    }
});
 

四、线程控制:Scheduler 和 subscribeOn()、observeOn()
  1. 在不指定线程的情况下, RxJava 遵循的是线程不变的原则
  2. 即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
  3. 如果需要切换线程,就需要用到 Scheduler (调度器)
 
1.Scheduler
RxJava 通过Scheduler来指定每一段代码应该运行在什么样的线程
RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
  1. Schedulers.immediate()
    1. 直接在当前线程运行,相当于不指定线程
    2. 这是默认的 Scheduler
  2. Schedulers.newThread()
    1. 总是启用新线程,并在新线程执行操作
  3. Schedulers.io( )
    1. I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
    2. 行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率
    3. 不要把计算工作放在 io() 中,可以避免创建不必要的线程
  4. Schedulers.computation( )
    1. 计算所使用的 Scheduler,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算
    2. 这个 Scheduler 使用的固定的线程池,大小为 CPU 核数
    3. 不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
  5.  Android 还有一个专用的 AndroidSchedulers.mainThread()
    1. 它指定的操作将在 Android 主线程运行
 
2.subscribeOn() 和 observeOn()
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
  1. subscribeOn( )
    1. 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程
    2. 而这其实就是事件产生的线程,也就是Observable活动的线程
    3. (看到这不免有点乱,Observable活动在subscribeOn指定的线程,那这里就只能用[Observable.OnSubscribe 被激活]这件事来记了)
  2. observeOn( )
    1. 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
    2. (同样正好相反)
 
3.例子
Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io())                // 指定 subscribe() 发生在 IO 线程
        .observeOn(AndroidSchedulers.mainThread())   // 指定 Subscriber 的回调发生在主线程
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });
  1. 这种在subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常套路
  2. 它适用于多数的 『后台线程取数据,主线程显示』的程序策略
  3. 前面加载图片的代码中应该要加上这两句的
 
4.拓展
4.1.事件消费的线程可多次变换
  1. observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 不一定是最终subscribe() 时的Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber
  2. 换句话说,observeOn() 指定的是它之后的操作所在的线程
  3. 因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可
  4. 如下,通过 observeOn() 的多次调用,程序实现了线程的多次切换
Observable.just(1, 2, 3, 4)            // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator1)             // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2)             // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);        // Android 主线程,由 observeOn() 指定
 
4.2.事件产生线程是固定的
不同于 observeOn() , subscribeOn() 的位置虽然放在哪里都可以,但它是只能调用一次
当使用了多个subscribeOn() 的时候,只有第一个 subscribeOn() 起作用
 
4.3.流程开始前的初始化问题:【Subscriber 的 onStart( )】【Observable 的 doOnSubscribe( )】
  1. 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化
  2. 然而 onStart() 由于在subscribe() 发生时就被调用了,也就是没有被observeOn() 指定线程,因此是执行在 subscribe() 被调用时的线程也就是原线程
  3. 这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe() 将会在什么线程执行
 
解决方法
  1. Observable(不是Subscriber的)有一个方法doOnSubscribe()
  2. 它和 Subscriber.onStart() 同样是在subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程
  3. 默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程
  4. 而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程
Observable.create(onSubscribe)
        .subscribeOn(Schedulers.io())
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                progressBar.setVisibility(View.VISIBLE);  // 需要在主线程执行
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread())      // 指定主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);
 

五、变换
  1. RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因
  2. 所谓变换,就是将事件序列中的对象整个事件队列进行加工处理,转换成不同的事件或事件序列
 
变换节点是由一个【变换算法】和一个【变换枢纽】构成的
  1. 变换算法是一系列方法,比如:map()、flatMap(),他们决定变换的方式,是随方法固定的
  2. 变换枢纽是一种类似管道枢纽的容器,里边由你写具体的变换细节;输入和输出就像管道,如何分流就像管道的布线
  3. 往一个变换算法里传入一个变换枢纽就组成了一个变换节点
 
1.变换枢纽
  1. 变换枢纽是FuncX系列接口的对象
  2. FuncX系列接口跟前面讲的ActionX系列接口非常像
    1. 他们是位于 rx.functions 包下的全部两个系列接口
    2. ActionX系列是可以传入1个或多个参数,但是无返回值的call方法的包装
    3. FuncX系列是可以传入1个或多个参数,但是有一个返回值的call方法的包装
  3. 正是两个方法的唯一区别(是否有返回值)决定了在rx链中两者不同的角色
    1. ActionX系列由于没有返回值,所以只能作为链的终点,也就是为观众服务,可以被组合构成观众
    2. FuncX系列由于有返回值,但他又不能作为链的起点,所以就自然成了我们这里要说的新角色:链的中继者,或者说变换器
  4. FuncX系列的泛型位置是这样:前面的所有参数是输入参数,最后一个参数是输出参数
 
2.变换算法
2.1.map( )
  1. map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回
  2. 在经过 map() 方法后,事件的参数类型也由 String转为了 Bitmap
Observable.just("images/logo.png")                  // 输入类型 String
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) {   // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {       // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });
 
2.2.flatMap( )
这是一个很有用但不太好难理解的变换
 
2.2.1.情景推导
首先假设这么一种需求:有一个数据结构『学生』,现在需要打印出一组学生的名字
用上面提到的map方法实现起来很简单:
Student[] students = new Student[10];
Observable.from(students)
        .map(new Func1<Student, String>() {
            @Override
            public String call(Student student) {
                return student.getName();
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String name) {
                Log.d(tag, name);
            }
            ...
        });
 
再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程)
这个时候用不了map了,先用普通方式实现一下:
Student[] students = new Student[10];
Observable.from(students)
        .subscribe(new Subscriber<Student>() {
            @Override
            public void onNext(Student student) {
                List<Course> courses = student.getCourses();
                for (int i = 0; i < courses.size(); i++) {
                    Course course = courses.get(i);
                    Log.d(tag, course.getName());
                }
            }
            ...
        });
 
2.2.2.引入
  1. 上面实现倒是实现了,但是这个for循环的存在不免显得有点不太优雅
  2. 如果不想在 Subscriber 中使用 for 循环,而是希望在 Subscriber 中直接接收单个的 Course 对象呢(这对于代码复用很重要)?
  3. 用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?
  4. 这个时候,就需要用 flatMap( ) 了
 
2.2.3.实现
Student[] students = new Student[10];
Observable.from(students)
        .flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(new Subscriber<Course>() {
            @Override
            public void onNext(Course course) {
                Log.d(tag, course.getName());
            }
            ...
        });
  1. flatMap() 和 map() 有一个相同点:也是把传入的参数转化之后返回另一个对象
  2. 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中,而是
    1. 先使用传入的事件对象创建一个 Observable 对象
    2. 但是并不发送这个 Observable, 而是将它激活(subscribe),于是它开始发送事件
    3. 每一个创建出来的子Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法
  3. 上面这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去
    1. 这个『铺平』就是 flatMap() 所谓的 flat
    2. 每个list就像卷起的纸,flatMap就是把一卷卷纸展开再连在一起
 
2.3.throttleFirst( )
在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器
妈妈再也不怕我的用户手抖点开两个重复的界面啦
RxView.clickEvents(button)                         // RxBinding 代码,后面的文章有解释
        .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms
        .subscribe(subscriber);
 

六、应用
1.Retrofit
 
2. RxBinding
  1. RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API
  2. 所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API
 
举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:
Button button = ...;
RxView.clickEvents(button)         // 以 Observable 形式来反馈点击事件
        .subscribe(new Action1<ViewClickEvent>() {
            @Override
            public void call(ViewClickEvent event) {
               
            }
        });
 
看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。
然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。
通过RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。
扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击:
RxView.clickEvents(button)
        .throttleFirst(500, TimeUnit.MILLISECONDS)
        .subscribe(clickAction);
 
3. 各种异步操作
前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。
 
4. RxBus
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto,目前为止没有不良反应。
 
 

附录
 
1.关于观察者模式
其实觉得这个名字非常容易误解:观察者模式里两个角色,观察者和被观察者,两个方面会让人误解:
  1. 主被动关系,往往会有人觉得观察者是主动观察,其实不然
  2. 名字就是一字之差,容易混淆,造成理解障碍
第二点就不说了,来讨论下第一点,其实里边一个是事件发出者,一个是被事件驱动者,主被动关系是反过来的,应该是 主人和仆人,指挥官和小兵,演讲者和听众,button和onclicklistener 的关系,观察者其实是被动触发的,而不是主动观察
 
观察者模式用button和onclicklistener的模型来类比真是非常形象好理解(因为:1常用,平易近人;2非常符合)
对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
    1. Button -> 被观察者
    2. OnClickListener -> 观察者
    3. setOnClickListener() -> 订阅
    4. onClick() -> 事件

参考资料:http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html

免责声明:文章转载自《RxJava入门》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇统计量java中子类继承父类程序执行顺序问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

JSF: 动态生成的DataTable, 固定表头, 固定行标

自己写了段小代码, 希望可以供大家学习和参考。 代码里没有太多注释, 有时间的话我会补充上去。自己在写动态生成DataTable的时候也查阅了很多相关文章, 以及实现固定表头等等。在解决固定表头问题上我是用的两张表(加行标是3张表)实现的, 因为我发现如果用JSF1.1的化实现固定表头几乎不可能(如果有人有好的想法, 比如用JS比较在行的朋友请告诉我解决方...

hive函数之~字符串函数

1、字符串长度函数:length 语法: length(string A)返回值: int说明:返回字符串A的长度 hive> selectlength('abcedfg') fromtableName; 7 2、字符串反转函数:reverse 语法: reverse(string A)返回值: string说明:返回字符串A的反转结果 h...

C#去掉json字符串中的换行符

【出错状况】 从数据库中返回json格式的数据,但由于数据库中的数据中有换行符,导致返回的json数据错误。 【原因分析】 用for循环语句来分析出错字段字符串中每个字符的ASCII码,可以看出存在值分别为13、10的两个字符,造成换行,导致json格式出错。            char tempstring = '\n';   (10)       ...

获取DataTable中一列的数据

#region 获取合同号DataRow[] arrRow = new DataRow[ds.Rows.Count];int w = 0;foreach (DataRow row in ds.Rows){arrRow[w] = row;w++;}string[] ary = Array.ConvertAll(arrRow, r => r["cdon...

android SQLite使用SQLiteOpenHelper类对数据库进行操作

一、 SQLite介绍SQLite是android内置的一个很小的关系型数据库。SQLite的官网是http://www.sqlite.org/,可以去下载一些文档或相关信息。博客中有一篇有稍微详细一点的介绍,大家可以去看一下。二、 SQLiteOpenHelper的使用方法SQLiteOpenHelper是一个辅助类来管理数据库的创建和版本。可以通过继承...

.NET ActionFilterAttribute等

public override void OnException(HttpActionExecutedContext actionExecutedContext){//加LOG actionExecutedContext.Exception //2.返回调用方具体的异常信息if (actionExecutedContext.Exception is Not...