RxJava的错误处理主要分为两类,retry系列以及onErrorRetrun系列。retry系列是当错误的时候,重新subscribe。onErrorReturn系列则是当出错了返回数据到onNext中。本文介绍下retry系列相关用法。
retry
系列的操作符主要有 retry()
), retry(long)
)和 retry(Func2)
), retry(n)
当发生onError的时候会重试n次,例如如下代码:
@Test
public void testRetry() {
final AtomicInteger atomicInteger = new AtomicInteger(0);
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(String.valueOf(System.currentTimeMillis()));
subscriber.onError(new Error("error"));
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
atomicInteger.incrementAndGet();
}
})
.retry(2)
.toBlocking()
.subscribe(new TestSubscriber<String>());
Assert.assertTrue(atomicInteger.intValue() == 3);
}
初始化 atomicInteger
为0,在 doOnSubscribe
加一,重试次数为2次,所以最终相当于 onSubscribe
执行了3次。
另外一个方法 retryWhen
的方法是根据得到的 Throwable
生成新的 Observerable
, 示例代码如下:
@Test
public void testRetryWhen() {
final AtomicInteger atomicInteger = new AtomicInteger(3);
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(String.valueOf(System.currentTimeMillis()));
subscriber.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.takeWhile(new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable throwable) {
return Integer.parseInt(throwable.getMessage()) > 0;
}
})
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
})
.toBlocking()
.subscribe(new TestSubscriber<String>());
Assert.assertEquals(atomicInteger.intValue(), 0);
}
这里接受到 throwable
的 Observerable
后,用 takeWhile
来判断 thrwoable
的属性,这里用一个AtomicInteger,设置最大重试次数为3,每次减1,当等于0则不再重试,再现实生活中,也可以判断 Exception
的类型等方式判断是否需要重试。接着用 flatMap
返回 Observerable.timer
来延迟重试到1秒以后。