【Android开源框架面试题】 RxJava框架线程切换的原理,RxJava1与RxJava2有哪些区别?
我整理了一套Android面试题合集,除了以上面试题,还包含【Java 基础、集合、多线程、虚拟机、反射、泛型、并发编程、Android四大组件、异步任务和消息机制、UI绘制、性能调优、SDN、第三方框架、设计模式、Kotlin、计算机网络、系统启动流程、Dart、Flutter、算法和数据结构、NDK、H.264、H.265.音频编解码、FFmpeg、OpenMax、OpenCV、OpenGL
RxJava框架线程切换的原理,RxJava1与RxJava2有哪些区别?
这道题想考察什么?
对流行框架RxJava的掌握情况。能否灵活运用RxJava线程切换来实现复杂的应用场景
考察的知识点
- RxJava原理
- Handler线程切换
考生应该如何回答
线程切换
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
RxJava切换线程分为两部分:subscribeOn()和observeOn()
subscribeOn()
首先是subscribeOn()源码如下:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
我们传进去了一个Scheduler类,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。
Scheduler有如下类型:
类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() | io操作线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操作 |
NewThreadScheduler | Schedulers.newThread() | 创建新线程 | 耗时操作等 |
SingleScheduler | Schedulers.single() | 单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() | CPU计算操作线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() | 当前线程 | 需要在当前线程立即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() | Android主线程 | 更新UI等 |
接着就没什么了,只是返回一个ObservableSubscribeOn对象而已。
observeOn()
首先看源码如下:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这里也是没什么,只是最终返回一个ObservableObserveOn对象而已。
接着还是像原来那样调用subscribe()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是RxJava源码的精华,当他再次调用subscribeActual()方法时,已经不是之前的ObservableCreate()里subscribeActual方法了,而是最先调用ObservableObserveOn的subscribeActual()方法,对应源码如下:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
在这里有两点要讲,一点是ObserveOnObserver是执行观察者的线程,后面还会详解,然后就是source.subscribe,这个source.subscribe调的是ObservableSubscribeOn的subscribe方法,而subscribe方法因为继承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一样的方法,所以会调用ObservableSubscribeOn里的subscribeActual()方法,对应的代码如下:
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
上面代码中,首先把ObserveOnObserver返回给来的用SubscribeOnObserver“包装”起来,然后在回调Observer的onSubscribe(),就是对应模板代码的onSubscribe()方法。
接着看SubscribeTask类的源码:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
其中的source.subscribe(parent),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的subscribe()方法。它放在run()方法里,并且继承Runnable,说明这个类主要是线程运行。接着看scheduler.scheduleDirect()方法对应的源码如下:
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
在这里,createWorker()也是一个抽象方法,调用的是我们的调度类对应的Schedulers类里面的方法,这里是IoScheduler类:
public final class IoScheduler extends Scheduler{
final AtomicReference<CachedWorkerPool> pool;
//省略....
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//省略....
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
}
static final class CachedWorkerPool implements Runnable {
//省略....
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//省略....
}
这就是IoScheduler的createWorker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让SubscribeTask()可以运行。然后直接调用 w.schedule(task, delay, unit)方法让它在线程池里执行。上面中那ThreadWorker的源码如下:
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//省略代码....
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
}
可以看到,这就调了原始的java API来进行线程池操作。
然后最后一环在子线程调用source.subscribe(parent)方法,然后回调刚开始创建的ObservableCreate的subscribeActual(),既:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
进行消息的订阅绑定。
当我们在调用 emitter.onNext(内容)时,是在io线程里的,那回调的onNext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在observeOn()里的ObserveOnObserver是执行观察者的线程的过程。
class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//省略代码....
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//省略代码....
}
当调用emitter.onNext(内容)方法,会调用上面的onNext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,work是什么呢,还记得AndroidSchedulers.mainThread()吗,这个对应这个HandlerScheduler这个类,所以createWorker()对应着:
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
}
在next()方法里,运用android自带的Handler消息机制,通过把方法包裹在Message里,同通过handler.sendMessageDelayed()发送消息,就会在ui线程里回调Next()方法,从而实现从子线程切换到android主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。
RxJava1和RxJava2区别
-
RxJava 2.0 不再支持 null 值
Observable.just(null); Single.just(null); Flowable.just(null);
-
RxJava 2.0 所有的函数接口(Function/Action/Consumer)均设计为可抛出Exception,解决编译异常需要转换问题;
-
RxJava 1.0 中Observable不能很好支持背压,在RxJava2.0 中将Oberservable彻底实现成不支持背压,而新增 Flowable 来支持背压。
背压是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的buffer溢出。
最后
我整理了一套Android面试题合集,除了以上面试题,还包含【Java 基础、集合、多线程、虚拟机、反射、泛型、并发编程、Android四大组件、异步任务和消息机制、UI绘制、性能调优、SDN、第三方框架、设计模式、Kotlin、计算机网络、系统启动流程、Dart、Flutter、算法和数据结构、NDK、H.264、H.265.音频编解码、FFmpeg、OpenMax、OpenCV、OpenGL ES】
有需要的朋友可以扫描下方二维码,免费领取全部面试题+答案解析!!!
更多推荐
所有评论(0)