RxJava 2 异常处理分析

使用 RxJava 2 时稍不注意就会出现 UndeliverableException。在桌面环境中该异常还不算严重,但对于 Android 应用来说却是致命的。

问题描述

这个版本上线后出现收到很多 UndeliverableException 上报,导致 crash 率明显上升。UndeliverableException 是使用 RxJava 2 过程中上报的。但之前也使用了 RxJava 2,并没有引起这么多 crash。分析后发现 crash 是这样引起的:

  1. 这个版本对网络层了进行改造。准备逐步删除旧的网络层,统一使用新的网络层,所以业务代码调整成使用新的网络层请求后台
  2. 旧的网络层使用了 RxJava 2 处理连接请求(例如,先请求 token,拿到 token 后再查个人信息),但新的网络层对 RxJava 2 支持不完善
  3. 为了既能使用新的网络层又不对原有业务代码做过多修改,增加 RxRequestServiceBridge 工具类将新网络层网络请求调用转换成 RxJava 的形式
  4. 问题出在 RxRequestServiceBridge 中,它有缺陷。其源码类似下面这样
  5. 业务代码经过 RxRequestServiceBridge 转换后访问后台接口,其中有个接口稳定性不好,频繁出错
  6. 后台接口访问失败将 RxRequestServiceBridge 的问题暴露出来,最终导致 app 崩溃

RxRequestServiceBridge 的代码类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class RxRequestServiceBridge {

public static <T extends Message> Observable<T> convert(final Message req) {
return convert(900, req);
}

public static <T extends Message> Observable<T> convert(final int cmd, final Message req) {
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(final ObservableEmitter<T> emitter) throws Exception {
call(emitter, cmd, req);
}
});
}

private static <T extends Message> void call(final ObservableEmitter<T> emitter, final int cmd, final Message req) {
new RequestService()
.reqNetWork(cmd, req)
.setCallBack(new RequestService.OnCallBack<T>() {
@Override
public void onSuccess(T message) {
emitter.onNext(message);
emitter.onComplete();
}
}

@Override
public void onFailed(int errorCode, String errorMsg, T message) {
emitter.onError(new RetrofitException(errorCode, errorMsg));
}
}).request();
}

}

crash 日志如下:

1
2
3
4
5
com.app.RetrofitException: 连接失败,请重试!(#99)
io.reactivex.exceptions.UndeliverableException:com.app.RetrofitException: cmd=[3], reqName=[], resultCode=[99], errorMessage=[连接失败,请重试!(#99)]
io.reactivex.plugins.RxJavaPlugins.void onError(java.lang.Throwable)(Unknown Source:20)
......
com.app.RxRequestServiceBridge$2.void onFailed(int,java.lang.String,com.squareup.wire.Message)(Unknown Source:32)

RxJava 2 异常处理

搜了下,才发现2017年就有人给 RxJava 2 提出类似的问题,见这个issue #5214

复现步骤很简单:

  1. 订阅某个可能抛出异常的 Observable,Disposable disposable = observable.subscribe()。以访问后台接口这个过程为例,其中可能抛出 IOException
  2. Observable 耗时太久,在其结束前调用 disposable.dispose()。以 Android app 为例,某个界面网络加载太久,用户等不及所以退出了,Activity.onDestory() 中调用了 disposable.dispose()
  3. Observable 发生 Exception。以网络请求为例,可能的场景是发生了读超时或连接超时

写代码来看看复现结果 (注,这段代码参考自 issue #5214)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void disposeBeforeExceptionOccurredWithoutErrorHandler() throws InterruptedException {
RxJavaPlugins.setErrorHandler(null);

// 让当前线程不要过早退出
final CountDownLatch keepRunning = new CountDownLatch(1);

final CountDownLatch mainThreadLock = new CountDownLatch(1);
final CountDownLatch workerThreadLock = new CountDownLatch(1);

Disposable disposable = Observable.fromCallable(
new Callable<Integer>() {
@Override
public Integer call() throws Exception {
mainThreadLock.countDown();
workerThreadLock.await();
throw new Exception("后台线程有异常, 会被 RxJava 吞掉么");
}
})
.subscribeOn(Schedulers.io())
.subscribe(emptyConsumer(),
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.err.println("I'm Local Exception Handler");
throwable.printStackTrace();
}
});

mainThreadLock.await();
disposable.dispose();
workerThreadLock.countDown();

keepRunning.await(5, TimeUnit.SECONDS);
}

这里对代码中有三个不同作用的 CountDownLatch

  • keepRunning 用于保持当前线程至少运行5秒
  • mainThreadLockworkerThreadLock 用于制造 dispose() 后抛出异常的场景

运行后果然有情况,我们自己抛出的异常不见了,出现另一个预期外的异常 UndeliverableException

1
2
3
4
5
6
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:48)
at io.reactivex.Observable.subscribe(Observable.java:11194)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:463)

官方对这个预期外的异常解释如下:

The exception is there because RxJava 2 has the policy of NEVER allowing an onError call to be lost. It is either delivered downstream or thrown as a global UndeliverableException if the observable has already terminated. It is up to the creator of the Observable to ‘properly’ handle the case where the observable has ended and an Exception occurs.

RxJava can’t decide if your exception thrown from you callable is important or not, even if the sequence is cancelled. Indeed you have to install a global error consumer and decide yourself if that error is something your app should crash on or not.
The current wisdom is that NullPointerExceptions, IllegalArgumentExceptions, IllegalStateExceptions are crash-worthy while IOExceptions, SocketExceptions and InterruptedExceptions are likely not, but depends on your actual app.

简单来说,

  • 如果 dispose() 后拿到数据也好,抛出异常也好,RxJava 是不知道该如何处理的。但它肯定不能”吞掉异常”,所以就有了 UndeliverableException
  • 可以使用 RxJavaPlugins.setErrorHandler() 设置一个 ErrorHandler 用于处理上述情况中的异常

RxJava 官方文档中 Error handle 其实对此有比较详细的描述。

RxJava error handle

文档(译)

这一节内容翻译自 Error handle 其实对此有比较详细的描述。

RxJava 2.0 一个重要的设计就是不能吞掉任何 Throwable。这意味着由于下游的生命周期已到达结束状态导致无法发射(emit)错误或者下游取消一个序列等情形,会导致发射(emit)一个错误。

这类错误会被路由到 RxJavaPlugins.onError 处理器。可以使用 RxJavaPlugins.setErrorHandler(Consumer<Throwable>) 方法来设置新的 onError 处理器。如果没有指定这个处理器,缺省情况下 RxJava 会将 Throwable 堆栈信息输出到控制台并调用当前线程的未捕获异常处理器(uncaught exception handler)。

在桌面 Java 环境中,uncaught exception handler 不会对基于 ExecutorServiceScheduler 中运行的线程产生的错误进行任何处理,所以应用可以继续运行。但 Android 平台处理更严格,它在这种情况下会终止应用。

这是否是预想中的行为还有争论,但如果你想避免调用到 uncaught exception handler,无论是直接使用还是间接使用 RxJava2,最终的应用都应该设置一个无操作的 handler (no-op handler):

1
2
3
4
5
// If Java 8 lambdas are supported
RxJavaPlugins.setErrorHandler(e -> { });

// If no Retrolambda or Jack
RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());

不建议基于 RxJava2 的第三方库在测试环境以外修改 error handler。(It is not advised intermediate libraries change the error handler outside their own testing environment.)

不幸的是,RxJava 不知道哪些对象处于生命周期结束状态(out-of-lifecycle),未被分发的异常是应该让应用 crash。而定位这类问题的根源又非常麻烦,尤其是问题来自链的底部(especially if they originate from a source and get routed to RxJavaPlugins.onError somewhere lower the chain) 却被路由到 RxJavaPlugins.onError 时。

所以 RxJava 2.0.6 引入了特定的异常包装器用于包装原始异常以便容易定位问题。

  • OnErrorNotImplementedException - 重新引入这个异常,用于检查用户是否忘记给 subscribe() 方法传入一个 error handler
  • ProtocolViolationException - 这个异常表示操作符中有 bug
  • UndeliverableException - Subscriber/Observer 生命周期限制导致原始异常无法分发时,使用 UndeliverableException 进行包装。RxJavaPlugins.onError 自动使用这个异常以保持原有的完整堆栈信息。

如果未被分发的异常是 NullPointerException, IllegalStateException (UndeliverableExceptionProtocolViolationException 继承自 IllegalStateException), IllegalArgumentException, CompositeException, MissingBackpressureException, OnErrorNotImplementedException 的实例或子类实例,不会使用 UndeliverableException 对该异常进行包装。

此外,一些第三方库被 cancel()dispose() 调用中断时会抛出异常,大部分时候这种中断会引起未分发的异常。RxJava 2.0.6 内部现在总是先对 Subscription/Disposable 执行 canceldispose,再 cancelling/disposing 一个 task 或 worker (这会导致目标线程出现中断,which causes the interrupt on the target thread)。

1
2
3
4
5
6
7
8
9
10
// in some library
try {
doSomethingBlockingly()
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}

如果第三方库已经这样做了,InterruptedExceptions 会停止而不是继续住下游传递。如果第三方库还没有使用这个模式,建议尽快升级/更新有问题的代码。

如果准备添加一个非空的全局 error consumer,以下是个例子。这个例子基于异常是 bug 还是可忽略的应用/网络状态来管理未分发的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that's likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});

RxJava2 Adapter

RxJava2 Adapter 用于 Retrofit 2 集成 RxJava 2。看看它是如何进行错误处理的。关键代码见 CallExecuteObservable.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final class CallExecuteObservable<T> extends Observable<Response<T>> {

@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallDisposable disposable = new CallDisposable(call);
observer.onSubscribe(disposable);
if (disposable.isDisposed()) {
return;
}

boolean terminated = false;
try {
Response<T> response = call.execute();
if (!disposable.isDisposed()) {
observer.onNext(response);
}
if (!disposable.isDisposed()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!disposable.isDisposed()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}



}

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public final class RxJavaPlugins {
@Nullable
static volatile Consumer<? super Throwable> errorHandler;

public static void setErrorHandler(@Nullable Consumer<? super Throwable> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
errorHandler = handler;
}

public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;

if (error == null) {
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
} else {
if (!isBug(error)) {
error = new UndeliverableException(error);
}
}

if (f != null) {
try {
// 关键点二
f.accept(error);
return;
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); TODO decide
e.printStackTrace(); // NOPMD
uncaught(e);
}
}

// 关键点三
error.printStackTrace(); // NOPMD
uncaught(error);
}

static void uncaught(@NonNull Throwable error) {
Thread currentThread = Thread.currentThread();
UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();
handler.uncaughtException(currentThread, error);
}
}

public final class ObservableFromCallable<T> extends Observable<T> implements Callable<T> {
final Callable<? extends T> callable;
public ObservableFromCallable(Callable<? extends T> callable) {
this.callable = callable;
}
@Override
public void subscribeActual(Observer<? super T> s) {
DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(s);
s.onSubscribe(d);
if (d.isDisposed()) {
return;
}
T value;
try {
value = ObjectHelper.requireNonNull(callable.call(), "Callable returned null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// 关键点一
if (!d.isDisposed()) {
s.onError(e);
} else {
RxJavaPlugins.onError(e);
}
return;
}
d.complete(value);
}
}

ObservableFromCallable.subscribeActual() 方法为例可以看到 RxJava 处理异常时有几个关键点:

  • 关键点一,isDisposed() 为 false 才交由 local error handler 进行错误处理,为 true 时直接交由 global error handler
  • 关键点二,global error handler 不为 null 时,由其进行错误处理
  • 关键点三,global error handler 为 null 时,RxJava 先出异常信息,最终交由当前线程的 UncaughtExceptionHandler 进行错误处理

当前线程的 UncaughtExceptionHandler 如果处理错误很关键。很不幸,在 Android 应用中默认的处理行为往往是 结束 应用,也就是我们看到的 crash。

On desktop Java, this latter handler does nothing on an ExecutorService backed Scheduler and the application can keep running. However, Android is more strict and terminates the application in such uncaught exception cases.

解决方法

经过上面的源码分析,解决方法其实很简单。RxRequestServiceBridge 的代码修改如下,主要变化是调用 ObservableEmitter.onXXX() 方法前先判断是否已经被 dispose 掉了。如果是,不应将数据或异常往下游 ObservableEmitter 继续传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RxRequestServiceBridge {

public static <T extends Message> Observable<T> convert(final Message req) {
return convert(900, req);
}

public static <T extends Message> Observable<T> convert(final int cmd, final Message req) {
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(final ObservableEmitter<T> emitter) throws Exception {
call(emitter, cmd, req);
}
});
}

private static <T extends Message> void call(final ObservableEmitter<T> emitter, final int cmd, final Message req) {
new RequestService()
.reqNetWork(cmd, req)
.setCallBack(new RequestService.OnCallBack<T>() {
@Override
public void onSuccess(T message) {
if (!emitter.isDisposed()) {
emitter.onNext(message);
emitter.onComplete();
}
}

@Override
public void onFailed(int errorCode, String errorMsg, T message) {
if (!emitter.isDisposed()) {
emitter.onError(new RetrofitException(errorCode, errorMsg));
}
}
}).request();
}

}

参考