RxJava

RxJava #

官方文档: https://github.com/ReactiveX/RxJava

基本概念 #

发射器发射的数据流经过各种操作符(Operator/Action)处理后,最终被消费(Subscriber)。

  • 发射器/被观察者
    • Flowable:发射0或N个数据,支持背压。当消费速度比生产速度过慢导致队列溢出会报MissingBackpressureException。
    • Observable:发射0或N个数据。
    • Single:发射单个数据或单个错误。
    • Completable:只处理completion或error信号量。
    • Maybe:Single或Completable。
  • Scheduler:代替Thread和ExecutorService,定义了事件的处理线程环境。可以通过Schedulers.from(executor)手动指定线程池。
  • Subject:既是观察者(Observer)也是被观察者(Observable)。

基础使用 #

后台处理任务,主线程刷新数据,是Android APP开发中最常见的使用方式。

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

默认代码运行在同一线程

// t为自定义线程池,core=3,max=6
Flowable.range(1, 10)
        .observeOn(scheduler)
        // t-1
        .map(n -> n)
        // main,输出有序
        .blockingSubscribe(System.out::println);

并行执行

// t为自定义线程池,core=3,max=6
Flowable.range(1, 10)
        .flatMap(v ->
                // main
                Flowable.just(v)
                        .subscribeOn(Schedulers.computation())
                        // t-1/t-2/t-3
                        .map(w -> w * w)
        )
        // main,输出无序
        .blockingSubscribe(System.out::println);

改变默认并行度

Flowable.range(1, 10)
        .parallel()
        .runOn(scheduler)
        // t-1/t-2/t-3
        .map(v -> v * v)
        .sequential()
        // main,输出有序
        .blockingSubscribe(System.out::println);

flatMap()方式用于创建子数据流。

数据依赖

当数据可用时继续执行下一阶段的计算。

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)
沪ICP备17055033号-2