转载

RxJava(四):线程操作

博客主页

1. 调度器(Scheduler)的种类

1.1 RxJava 线程介绍

RxJava 是一个为异步编程而实现的库,异步是其重要特色,合理地利用异步编程能够提高系统的处理速度。但是异步也会带来线程的安全问题,而且异步并不等于并发,与异步概念相对应的是同步。

在默认情况下, RxJava 只在当前线程中运行,它是单线程的。此时 Observable 用于发射数据流,Observer 用于接收和响应数据流,各种操作符( Operators )用于加工数据流,它们都在同一个线程中运行,实现出来的是一个同步的函数响应式。然而,函数响应式的实际应用是大部分操作都在后台处理,前台响应的一个过程。所以需要对刚才的流程做一下修改,改成 Observable 生成发射数据流, Operators 加工数据流在后台线程中进行, Observer 在前台线程中接收井响应数据。此时会涉及使用多线程来操作 RxJava ,我们可以使用 RxJava 的调度器(Scheduler)来实现。

1.2 Scheduler

Scheduler 是 RxJava 对线程控制器的一个抽象, RxJava 内置了多个 Scheduler 的实现,它们基本满足绝大多数使用场景。

RxJava(四):线程操作

如果内置的 Scheduler 不能满足业务需求,那么可以使用自定义的 Executor 作为调度器,以满足个性化需求。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s);
            }
        });

// 执行结果
 subscribe: main
 RxNewThreadScheduler-1#Next: hello
 RxNewThreadScheduler-1#Next: world

这里的 Observable 发射完数据之后,切换到 newThread 。后面的两次打印都是在 newThread 中进行的。

2. RxJava 线程模型

RxJava 的被观察者们在使用操作符时可以利用线程调度器——Scheduler 来切换线程

Observable.just("aaa", "bbb")
        .observeOn(Schedulers.newThread())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                Log.d(TAG, "apply: " + Thread.currentThread().getName());
                return s.toUpperCase();
            }
        }).subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s);
            }
        });

// 执行结果
 apply: RxNewThreadScheduler-1
 apply: RxNewThreadScheduler-1
 RxCachedThreadScheduler-1#Next: AAA
 RxCachedThreadScheduler-1#Next: BBB

RxJava(四):线程操作

其中,蓝色表示主线程、橙色表示 newThread 、粉色表示 I/O 线程

2.1 线程调度器

Schedulers 一个静态工厂类,通过分析 Schedulers 的源码可以看到它有多种不同类型的 Scheduler 。下面是 Schedulers 的各个工厂方法。

2.1.1 computation()

computation() 用于 CPU 密集型的计算任务,但井不适合 I/O 操作

public static Scheduler computation() {
    return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}

2.1.2 io()

io() 用于 I/O 密集型任务,支持异步阻塞 I/O 操作,这个调度器的线程池会根据需要增长。对于普通的计算任务,请使用 Schedulers.computation()。

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

2.1.3 trampoline()

在 RxJava 2 中与在 RxJava 1 中的作用不同 。在 RxJava 2 表示立即执行,如果当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完成之后,再接着执行原先未完成的任务。在 RxJava 1 中,表示在当前线程中等待其他任务完成之后,再执行新的任务

public static Scheduler trampoline() {
    return TRAMPOLINE;
}

2.1.4 newThread()

newThread() 为每个任务创建一个新线程。

public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}

2.1.5 single()

single() 拥有一个线程单例,所有的任务都在这一个钱程中执行。当此线程中有任务执行时,它的任务将会按照先进先出的顺序依次执行。

public static Scheduler single() {
    return RxJavaPlugins.onSingleScheduler(SINGLE);
}

2.1.6 自定义的 Executor 来作为调度器

public static Scheduler from(@NonNull Executor executor) {
    return new ExecutorScheduler(executor, false);
}

2.2 Scheduler源码

Scheduler 是 RxJava 的线程任务调度器, Worker 是线程任务的具体执行者。从 Scheduler 源码可以看到, Scheduler 在 scheduleDirect()、 schedulePeriodicallyDirect() 方法中创建了 Worker,然后会分别调用 Worker 的 schedule()、 schedulePeriodically() 来执行任务.

// Scheduler.java

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

    Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
    if (d == EmptyDisposable.INSTANCE) {
        return d;
    }

    return periodicTask;
}

Worker 是一个抽象类,每种 Scheduler 会对应一种具体的 Worker

public abstract static class Worker implements Disposable {
   
    @NonNull
    public Disposable schedule(@NonNull Runnable run) {
        return schedule(run, 0L, TimeUnit.NANOSECONDS);
    }

    @NonNull
    public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

    @NonNull
    public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
        final SequentialDisposable first = new SequentialDisposable();

        final SequentialDisposable sd = new SequentialDisposable(first);

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        final long periodInNanoseconds = unit.toNanos(period);
        final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
        final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

        Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                periodInNanoseconds), initialDelay, unit);

        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        first.replace(d);

        return sd;
    }
}

2.2.1 SingleScheduler

SingleScheduler 是 RxJava 2 新增 Scheduler。SingleScheduler 中有一个属性叫作 executor,它是使用 AtomicReference 包装 ScheduledExecutorService

// SingleScheduler.java

final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

在 SingleScheduler 构造函数中, executor 会调用 lazySet()

public SingleScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    executor.lazySet(createExecutor(threadFactory));
}

它的 createExecutor() 用于创建工作线程,看到通过 SchedulerPoolFactory 来创建 ScheduledExecutorService

static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
    return SchedulerPoolFactory.create(threadFactory);
}

在 SchedulerPoolFactory 类的 create(ThreadFactory factory) 中,使用 newScheduledThreadPool 线程池定义定时器,最大允许线程数为 1

public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    tryPutIntoPool(PURGE_ENABLED, exec);
    return exec;
}

在 SingleScheduler 中, 每次使用 ScheduledExecutorService 时,其实是使用 executor.get()。所以说, single 拥有一个线程单例

SingleScheduler 会创建一个 ScheduledWorker, ScheduledWorker 使用 JDK 的ScheduledExecutorService 作为 executor

下面是 ScheduledWorker 的 schedule() 方法, 使用 ScheduledExecutorService 的 submit() 或者 schedule() 来执行 runnable

public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }

    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
    tasks.add(sr);

    try {
        Future<?> f;
        if (delay <= 0L) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delay, unit);
        }

        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        dispose();
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }

    return sr;
}

2.2.2 ComputationScheduler

ComputationScheduler 使用 FixedSchedulerPool 作为线程池,井且 FixedSchedulerPool 被

AtomicReference 装了一下。

从 ComputationScheduler 的源码中可以看出, MAX_THREADS 是 CPU 的数目。 FixedSchedulerPool 可以理解为拥有固定数量的线程池,数量为 MAX_THREADS

static {
    MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
    // ... 
}

static int cap(int cpuCount, int paramThreads) {
    return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}

ComputationScheduler 会创建一个 EventLoopWorker

public Worker createWorker() {
    return new EventLoopWorker(pool.get().getEventLoop());
}

其中 getEventLoop() 是 FixedSchedulerPool 中的方法,返回了 FixedSchedulerPool 中的一个

PoolWorker

public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) {
        return SHUTDOWN_WORKER;
    }
    // simple round robin, improvements to come
    return eventLoops[(int)(n++ % c)];
}

PoolWorker 继承自 NewThreadWorker, 也是线程数为 1 的 ScheduledExecutorService

2.2.3 IoScheduler

IoScheduler 使用 CachedWorkerPool 作为线程池,井且 CachedWorkerPool 也被 AtomicReference 包装了。

CachedWorkerPool 是基于 RxThreadFactory 这个 ThreadFactory 来创建的

static {
    // ... 

    WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);;

    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    // ...
}

在 RxThreadFactory 中, prefix 和 incrementAndGet() 来创建新线程的名称

@Override
public Thread newThread(Runnable r) {
    StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

    String name = nameBuilder.toString();
    Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
    t.setPriority(priority);
    t.setDaemon(true);
    return t;
}

IoScheduler 创建的线程数是不固定的,可以通过 IoScheduler 的 size() 来获得当前的线程数。一般情况下, ComputationScheduler 的线程数等于 CPU 的数目

public int size() {
    return pool.get().allWorkers.size();
}

需要特别注意的是, ComputationScheduler 和 IoScheduler 都是依赖线程池来维护线程的,区别就是 IoScheduler 线程池中的个数是无限的,由 prefix 和 incrementAndGet()产生的递增值来决定线程的名字。而 ComputationScheduler 中则是一个固定线程数量的线程池,数据为 CPU 数目,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU

同样, IoScheduler 会创建 EventLoopWorker

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

但这个 EventLoopWorker 是 IoScheduler 的内部类,与 ComputationScheduler 创建的 EventLoopWorker 不同。只是二者的名称相同。

2.2.4 NewThreadScheduler

NewThreadScheduler 会创建 NewThreadWorker, NewThreadWorker 的构造函数使用的也是 SchedulerPoolFactory

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

与 SingleScheduler 不同的是, SingleScheduler 的 executor 是使用 AtomicReference 包装的 ScheduledExecutorService。每次使用时,都会调用 executor.get()

然而, NewThreadScheduler 每次都会创建一个新的线程。

2.2.5 TrampolineScheduler

TrampolineScheduler 会创建 TrampolineWorker,在 TrampolineWorker 内部维护着一个 PriorityBlockingQueue 。任务进入该队列之前,会先用 TimedRunnable 封装一下。

static final class TimedRunnable implements Comparable<TimedRunnable> {
    final Runnable run;
    final long execTime;
    final int count; // In case if time between enqueueing took less than 1ms

    volatile boolean disposed;

    TimedRunnable(Runnable run, Long execTime, int count) {
        this.run = run;
        this.execTime = execTime;
        this.count = count;
    }

    @Override
    public int compareTo(TimedRunnable that) {
        int result = ObjectHelper.compare(execTime, that.execTime);
        if (result == 0) {
            return ObjectHelper.compare(count, that.count);
        }
        return result;
    }
}

可以看到 TimedRunnable 实现了 Comparable 接口,会比较任务的 execTime 和 count

任务在进入 queue 之前, count 每次都会 +1

final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
 queue.add(timedRunnable);

所以,在使用 TrampolineScheduler 时,新的任务总是会优先执行。

2.3 线程调度

默认情况下不做任何线程处理, Observable 和 Observer 处于同一线程中。如果想要切换线程,则可以使用 subscribeOn() 和 observeOn()。

2.3.1 subscribeOn

subscribeOn 通过接收一个 Scheduler 参数,来指定对数据的处理运行在特定的线程调度器 Scheduler 上

若多次执行 subscribeOn ,则只有一次起作用

subscribeOn 的源码可以看到,每次调用 subscribeOn 都会创建一个 ObservableSubscribeOn 对象。

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

ObservableSubscribeOn 真正发生订阅的方法是 subscribeActual(observer)

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

其中 SubscribeOnObserver 是下游的 Observer 通过装饰器模式生成的,它实现了 Observer、Disposable 接口。

接下来,在上游的线程中执行下游 Observer 的 onSubscribe(Disposable d)方法。

observer.onSubscribe(parent);

然后,将子线程的操作加入 Disposable 管理中, 加入 Disposable 后可以方便上下游的统一管理。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

在这里,已经调用了对应 scheduler 的 scheduleDirect 方法。 scheduleDirect() 传入的是一个Runnable ,也就是下面的 SubscribeTask

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

SubscribeTask 会执行 run() 对上游的 Observable,从而进行订阅。

此时,己经在对应的 Scheduler 线程中运行了

source.subscribe(parent);

在 RxJava 链式操作中,数据的处理是自下而下。如果多次调用 subscribeOn,则最上面的线程切换最晚执行,所以就变成了只有第一次切换线程才有效。

2.3.2 observeOn

observeOn 同样接收一个 Scheduler 参数,用来指定下游操作运行在特定的线程调度器 Scheduler 上。

若多次执行 observeOn,则每次都起作用,线程会一直切换。

observeOn() 的源码可以看到,每次调用 observeOn() 都会创建 ObservableObserveOn 对象

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn 真正发生订阅的方法是 subscribeActual(observer)

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

如果 scheduler 是 TrampolineScheduler,则上游事件和下游事件会立即产生订阅。

如果不 TrampolineScheduler,则 scheduler 会创建自己的 Worker,然后上游事件和下游事件产生订阅,生成一个 ObserveOnObserver 对象,封装了下游真正的 Observer

ObserveOnObserver 是 ObservableObserveOn 内部类,实现了 Observer、Runnable 接口。与 SubscribeOnObserver 不同的是, SubscribeOnObserver 实现了 Observer、Disposable 接口

在 ObserveOnObserver 的 onNext 中, schedule() 执行了具体调度的方法

public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

其中 worker 是当前 scheduler 创建的 Worker,this 指的是当前的 ObserveOnObserver 对象,

this 也实现了 Runnable 接口

再来看看 Runnable 接口的实现方法 run(),这个方法是在 Worker 对应的线程里执行的。drainNormal 会取出 ObserveOnObserver 的 queue 里的数据进行发送。

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

若下游多次调用 observeOn(),则线程会一直切换。每次切换线程,都会把对应的 Observer 对象的各个方法的处理执行在指定的线程中。

2.4 示例

2.4.1 单独使用subscribeOn

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).subscribeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s);
            }
        });

// 执行结果
 subscribe: RxNewThreadScheduler-1
 RxNewThreadScheduler-1#Next: hello
 RxNewThreadScheduler-1#Next: world

所有的操作都走在 newThread 运行的,包括发射数据。

2.4.2 多次切换线程

多次调用 subscribeOn 和 observeOn 的例子

Observable.just("HELLO#1", "HELLO#2")
        .subscribeOn(Schedulers.single())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                s = s.toLowerCase();
                Log.d(TAG, "map##1 threadName: " + Thread.currentThread().getName() + " s:" + s);
                return s;
            }
        }).observeOn(Schedulers.io())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                s = s + " RxJava.";
                Log.d(TAG, "map##2 threadName: " + Thread.currentThread().getName() + " s:" + s);
                return s;
            }
        })
        .subscribeOn(Schedulers.computation())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                s = s + "it is a test.";
                Log.d(TAG, "map##3 threadName: " + Thread.currentThread().getName() + " s:" + s);
                return s;
            }
        })
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "subscribe threadName:" + Thread.currentThread().getName() + "#Next: " + s);
            }
        });

//执行结果
 map##1 threadName: RxSingleScheduler-1 s:hello#1
 map##1 threadName: RxSingleScheduler-1 s:hello#2
 map##2 threadName: RxCachedThreadScheduler-1 s:hello#1 RxJava.
 map##3 threadName: RxCachedThreadScheduler-1 s:hello#1 RxJava.it is a test.
 map##2 threadName: RxCachedThreadScheduler-1 s:hello#2 RxJava.
 map##3 threadName: RxCachedThreadScheduler-1 s:hello#2 RxJava.it is a test.
 subscribe threadName:RxNewThreadScheduler-1#Next: hello#1 RxJava.it is a test.
 subscribe threadName:RxNewThreadScheduler-1#Next: hello#2 RxJava.it is a test.

3. Scheduler 的测试

TestScheduler 是专门用于测试的调度器,与其他调度器的区别是,TestScheduler 只有被调用了时间才会继续。 TestScheduler 是一种特殊的、非线程安全的调度器,用于测试一些不引入真实并发性、允许手动推进虚拟时间的调度器。

在 RxJava 2.x 中,原先 RxJava l.x 的 Schedulers.test() 被去掉了 。要想获得 TestScheduler 对象,则可以通过直接 new TestScheduler() 的方式来实现。

TestScheduler 所包含的方法井不多,下面罗列几个关键的方法。

3.1 advanceTimeTo

将调度器的时钟移动到某个特定时刻。

如,时钟移动到 lOms.

scheduler.advanceTimeTo(10, TimeUnit.MILLISECONDS);

下例展示了 0s、20s、40s 各会打印什么结果

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "40s");
    }
}, 40, TimeUnit.SECONDS);

scheduler.advanceTimeTo(1, TimeUnit.MILLISECONDS);
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

scheduler.advanceTimeTo(20, TimeUnit.SECONDS);
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.SECONDS));

scheduler.advanceTimeTo(40, TimeUnit.SECONDS);
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.SECONDS));

// 执行结果
 immediate
 virtual time :1
 20s
 virtual time :20
 40s
 virtual time :40

使用 advanceTimeTo 之后,移动不同的时间点会打印不同的内容。

3.2 advanceTimeBy

将调度程序的时钟按指定的时间向前移动

例如,时钟移动了 lOms

scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);

再次调用刚才的方法,时钟又会移动 lOms。此时,时钟移动到 20ms,这是一个累加的过程。

下例,使用了 timer 操作符, timer 是按照指定时间延迟发送的操作符,timer() 井不会按周期地执行。该例子展示了 2s 后 atomicLong 会自动加1

TestScheduler scheduler = new TestScheduler();

final AtomicLong atomicLong = new AtomicLong();
Observable.timer(2, TimeUnit.SECONDS, scheduler).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        atomicLong.incrementAndGet();
    }
});

Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS));

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS));

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS));

 // 执行结果
 atomicLong's value=0, virtual time:0
 atomicLong's value=0, virtual time:1
 atomicLong's value=1, virtual time:2

这个结果符合预期,最初 atomicLong 为 0,时钟移动到 1s 时它的值仍然为 0;时钟再移动 ls ,即相当于时钟移动到 2s 所以它的值变为 1

advanceTimeBy() 也可以传负数,表示回到过去。

3.3 triggerActions

triggerActions 不会修改时间,它执行计划中的但是未启动的任务,已经执行过的任务不会再启动。

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

// 执行结果
 virtual time :0

增加 scheduler.triggerActions() 后

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

scheduler.triggerActions();
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

// 执行结果
 immediate
 virtual time :0

再增加 advanceTimeBy()

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

scheduler.triggerActions();
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

scheduler.advanceTimeBy(20, TimeUnit.SECONDS);

// 执行结果
 immediate
 virtual time :0
 20s

如果将 triggerActions() 放在最后, 看看效果。

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);


Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

scheduler.advanceTimeBy(20, TimeUnit.SECONDS);

scheduler.triggerActions();

// 执行结果
 virtual time :0
 immediate
 20s

因为己经使用了 advanceTimeBy(),所以即使再调用 triggerActions(),也不会执行己经启动过的任务。

如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)

原文  https://segmentfault.com/a/1190000021590933
正文到此结束
Loading...