在说RxJava的切换这一特性之前先说点别的。

经过以上,可能你会发现Observable每次发射事件都是在OnSubscribe中的call方法中在直接在onNext后面添加参数,如果我们要挨个发送一个List的元素的话会这样写。

final List<String> list = new ArrayList();
list.add("1");
list.add("2");
list.add("3");
Observable<String> listObservalbe= Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext(list.get(0));
        subscriber.onNext(list.get(1));
        subscriber.onNext(list.get(2));
    }
});

Wtf?这看起来有点low吧。RxJava这么腻害怎么可能只能这样写。

RxJava可是深藏不露的剑客。让我们看看他的出剑方式吧

以下操作符适用于Observable创立之时发射事件。

 

  • Just 上述发送list可以写成
listObservalbe = Observable.just("1","2","3");
  •          From 适用于集合或者数组形式的发送时间。
listObservalbe = Observable.from(list);

上述两种方式的发送和create是挨个写onNext的效果相同。

OK,下来是要讲的我们的第二大快,切换线程

切换线程(我跳,我跳,我跳跳跳)

说道Rxjava的第二个厉害之处,那就是它的切换线程功能了。

各种scheduler搞得你眼花缭乱,应接不暇。??措辞有问题啊。反正就是各种灵活咯。

在切换线程之前,先来准备一下基础环境。

在之前的例子中,一直只用到了观察者subscriber的onNext(T t)方法,但是我们在写subscriber方法的时候,会提示我们复写三个方法,onStart,onNext,onComplete和onError那么他们之间有什么关系呢?onNext的调用和其余三者有顺序关联吗?

在subscriberon()方法执行之时,会首先执行subscriber的onStart方法,然后每个事件都走一遍onNext方法,在onNext方法执行之时,如果有任何的exceptionp抛出,那么就会执行onError方法并且事件停止发射。如果所有的事件都安全无误的发射完毕后那么在最后会执行onCompleted()方法作为end。

理解上述后我们看一下rxjava切换线程。

在RxJAva中,线程用Scheduler来表示,可以通过Schedulers(注意多了个s)来说明线程。

在RxJava中,切换线程有两种方式 一种是subscribeOn(Scheduler)还有一种是observeOn(Scheduler)。我刚开始也是被这两个方法搞得晕头转向。但是慢慢的总结出了其中的一些规律。

我先po几个在网上看到的资料。

第一个是利用ObserveOn来切换线程。并打印出切换线程后后续的操作符和观察者执行时所在的线程。(为了方便,我已经注释//了其运行的线程。)

Observable.just("")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("Thread","map1 "+Thread.currentThread().getName());//Main   Thread
                return s;
            }
        }).observeOn(Schedulers.io())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("Thread","map2 "+Thread.currentThread().getName());//Io Thread
                return s;
            }
        }).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e("Thread","subscriber "+Thread.currentThread().getName());//Io   Thread
    }
});

执行结果为:

可以看到第一个map是执行在原本程序运行的main thread上,经过一个obsereOn指定线程为io线程后,后面的map操作符和观察者的执行方法都运行在了io 线程上。由此可以看出observeOn影响的是它之后的操作符和之后的观察者内部的执行方法。

这是一个ObserveOn()在使用的时候,如果我们切换了多次线程,使用了多次ObserveOn方法的时候,线程的切换是怎样的呢?

Observable.just("")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map1--->"+Thread.currentThread().getName());//Main Thread
                return  s;
            }
        }).observeOn(Schedulers.io())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map2--->"+Thread.currentThread().getName());//Io Thread
                return s;
            }
        }).observeOn(Schedulers.computation())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map3--->"+Thread.currentThread().getName());//Computation Thread
                return s;
            }
        }).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e("thread","subscribe--->"+Thread.currentThread().getName());//Computation Thread
    }
});

执行结果为

使用多个observeOn时,可以看到每次oberservOn切换线程时影响的是后面的操作符和观察者的执行方法,作用域是下一个observerOn之前。即每执行一次oberserOn就切换一次线程。

///

现在我们来看subscribeOn切换线程的影响范围。

Observable.just("")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread", "map1--->"+Thread.currentThread().getName());//Computation Thread
                return s;
            }
        }).subscribeOn(Schedulers.computation())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread", "map2--->"+Thread.currentThread().getName());//Computation Thread
                return s;
            }
        }).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.e("thread", Thread.currentThread().getName());
    }
});

可以看到不仅subsribeOn指定线程之后的map和观察者执行的线程在computation线程上,就连第一个本该运行在main thread线程的map也运行在了computation线程上。由此可知

subscribeOn影响的不只是前面的操作符和方法,也影响后面的操作符

这不禁让人产生疑问,如果既影响前面又影响后面,那么如果多个subscribeOn一起时用线程切换怎样的。做个试验

Observable.just("")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map1--->"+Thread.currentThread().getName());//Io Thread
                return  s;
            }
        }).subscribeOn(Schedulers.io())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map2--->"+Thread.currentThread().getName());//Io Thread
                return s;
            }
        }).subscribeOn(Schedulers.computation())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map3--->"+Thread.currentThread().getName());//Io Thread
                return s;
            }
        }).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e("thread","subscribe--->"+Thread.currentThread().getName());//Io Thread
    }
});

执行结果:

可以看到,尽管多个subscribeOn同时作用,但是只有第一个起作用了。后面的subscribeOn都没有将线程切换成功。所以可见多个subsricbeOn作用的时候只有第一个subcribeOn切换成功。

 那这个时候,问题来了,如果subscribeOn和observeOn同时使用,线程是怎么个切换方式呢,想想还有点小激动呢,稳住,我们先来看个实验。

Observable.just("")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map1--->"+Thread.currentThread().getName());//Computation Thread
                return  s;
            }
        }).observeOn(Schedulers.io())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map2--->"+Thread.currentThread().getName());//Io Thread
                return s;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.computation())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e("thread","map3--->"+Thread.currentThread().getName());//Main Thread
                return s;
            }
        }).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e("thread","subscribe--->"+Thread.currentThread().getName());//Main Thread
    }
});

执行结果为:

可以看到observeOn影响的是之后的线程,只要是observeOn之后的线程都在observeOn所指定的线程中运行,而没有被ObserveOn指定线程的部分(本例子中为第一个map方法)则被subscribeOn的线程指定运行线程。

由此我们得出的结论为:

observerOn控制的是紧随其后的操作符和观察者的执行线程,且可以被新的observeOn切换线程。

subscribe控制的是之前和之后的流程的线程。且不能被后来的subsrcibeOn 切换。

observeOn和subscribeOn结合使用时,优先ObserverOn的执行线程,observeOn范围之外的subsribeOn来控制。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐