从Android开发者的角度去理解RxJava(三)
在说RxJava的切换这一特性之前先说点别的。经过以上,可能你会发现Observable每次发射事件都是在OnSubscribe中的call方法中在直接在onNext后面添加参数,如果我们要挨个发送一个List的元素的话会这样写。final List<String> list = new ArrayList();list.add("1");list.
在说RxJava的切换这一特性之前先说点别的。
经过以上,可能你会发现Observable每次发射事件都是在OnSubscribe中的call方法中在直接在onNext后面添加参数,如果我们要挨个发送一个List的元素的话会这样写。
final List<String> list = new ArrayList();
list.add("1");
list.add("2");
list.add("3");
Observable<String> listObservalbe= Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(list.get(0));
subscriber.onNext(list.get(1));
subscriber.onNext(list.get(2));
}
});
Wtf?这看起来有点low吧。RxJava这么腻害怎么可能只能这样写。
RxJava可是深藏不露的剑客。让我们看看他的出剑方式吧
以下操作符适用于Observable创立之时发射事件。
- Just 上述发送list可以写成
listObservalbe = Observable.just("1","2","3");
- From 适用于集合或者数组形式的发送时间。
listObservalbe = Observable.from(list);
上述两种方式的发送和create是挨个写onNext的效果相同。
OK,下来是要讲的我们的第二大快,切换线程!
切换线程(我跳,我跳,我跳跳跳)
说道Rxjava的第二个厉害之处,那就是它的切换线程功能了。
各种scheduler搞得你眼花缭乱,应接不暇。??措辞有问题啊。反正就是各种灵活咯。
在切换线程之前,先来准备一下基础环境。
在之前的例子中,一直只用到了观察者subscriber的onNext(T t)方法,但是我们在写subscriber方法的时候,会提示我们复写三个方法,onStart,onNext,onComplete和onError那么他们之间有什么关系呢?onNext的调用和其余三者有顺序关联吗?
在subscriberon()方法执行之时,会首先执行subscriber的onStart方法,然后每个事件都走一遍onNext方法,在onNext方法执行之时,如果有任何的exceptionp抛出,那么就会执行onError方法并且事件停止发射。如果所有的事件都安全无误的发射完毕后那么在最后会执行onCompleted()方法作为end。
理解上述后我们看一下rxjava切换线程。
在RxJAva中,线程用Scheduler来表示,可以通过Schedulers(注意多了个s)来说明线程。
在RxJava中,切换线程有两种方式 一种是subscribeOn(Scheduler)还有一种是observeOn(Scheduler)。我刚开始也是被这两个方法搞得晕头转向。但是慢慢的总结出了其中的一些规律。
我先po几个在网上看到的资料。
第一个是利用ObserveOn来切换线程。并打印出切换线程后后续的操作符和观察者执行时所在的线程。(为了方便,我已经注释//了其运行的线程。)
Observable.just("")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("Thread","map1 "+Thread.currentThread().getName());//Main Thread
return s;
}
}).observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("Thread","map2 "+Thread.currentThread().getName());//Io Thread
return s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("Thread","subscriber "+Thread.currentThread().getName());//Io Thread
}
});
执行结果为:
可以看到第一个map是执行在原本程序运行的main thread上,经过一个obsereOn指定线程为io线程后,后面的map操作符和观察者的执行方法都运行在了io 线程上。由此可以看出observeOn影响的是它之后的操作符和之后的观察者内部的执行方法。
这是一个ObserveOn()在使用的时候,如果我们切换了多次线程,使用了多次ObserveOn方法的时候,线程的切换是怎样的呢?
Observable.just("")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map1--->"+Thread.currentThread().getName());//Main Thread
return s;
}
}).observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map2--->"+Thread.currentThread().getName());//Io Thread
return s;
}
}).observeOn(Schedulers.computation())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map3--->"+Thread.currentThread().getName());//Computation Thread
return s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("thread","subscribe--->"+Thread.currentThread().getName());//Computation Thread
}
});
执行结果为
使用多个observeOn时,可以看到每次oberservOn切换线程时影响的是后面的操作符和观察者的执行方法,作用域是下一个observerOn之前。即每执行一次oberserOn就切换一次线程。
///
现在我们来看subscribeOn切换线程的影响范围。
Observable.just("")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread", "map1--->"+Thread.currentThread().getName());//Computation Thread
return s;
}
}).subscribeOn(Schedulers.computation())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread", "map2--->"+Thread.currentThread().getName());//Computation Thread
return s;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("thread", Thread.currentThread().getName());
}
});
可以看到不仅subsribeOn指定线程之后的map和观察者执行的线程在computation线程上,就连第一个本该运行在main thread线程的map也运行在了computation线程上。由此可知
subscribeOn影响的不只是前面的操作符和方法,也影响后面的操作符。
这不禁让人产生疑问,如果既影响前面又影响后面,那么如果多个subscribeOn一起时用线程切换怎样的。做个试验
Observable.just("")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map1--->"+Thread.currentThread().getName());//Io Thread
return s;
}
}).subscribeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map2--->"+Thread.currentThread().getName());//Io Thread
return s;
}
}).subscribeOn(Schedulers.computation())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map3--->"+Thread.currentThread().getName());//Io Thread
return s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("thread","subscribe--->"+Thread.currentThread().getName());//Io Thread
}
});
执行结果:
可以看到,尽管多个subscribeOn同时作用,但是只有第一个起作用了。后面的subscribeOn都没有将线程切换成功。所以可见多个subsricbeOn作用的时候只有第一个subcribeOn切换成功。
那这个时候,问题来了,如果subscribeOn和observeOn同时使用,线程是怎么个切换方式呢,想想还有点小激动呢,稳住,我们先来看个实验。
Observable.just("")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map1--->"+Thread.currentThread().getName());//Computation Thread
return s;
}
}).observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map2--->"+Thread.currentThread().getName());//Io Thread
return s;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.computation())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e("thread","map3--->"+Thread.currentThread().getName());//Main Thread
return s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("thread","subscribe--->"+Thread.currentThread().getName());//Main Thread
}
});
执行结果为:
可以看到observeOn影响的是之后的线程,只要是observeOn之后的线程都在observeOn所指定的线程中运行,而没有被ObserveOn指定线程的部分(本例子中为第一个map方法)则被subscribeOn的线程指定运行线程。
由此我们得出的结论为:
observerOn控制的是紧随其后的操作符和观察者的执行线程,且可以被新的observeOn切换线程。
subscribe控制的是之前和之后的流程的线程。且不能被后来的subsrcibeOn 切换。
observeOn和subscribeOn结合使用时,优先ObserverOn的执行线程,observeOn范围之外的subsribeOn来控制。
更多推荐
所有评论(0)