RxJava #
官方文档: https://github.com/ReactiveX/RxJava
基本概念 #
发射器发射的数据流经过各种操作符(Operator/Action)处理后,最终被消费(Subscriber)。
- 发射器/被观察者
- Flowable:发射0或N个数据,支持背压。当消费速度比生产速度过慢导致队列溢出会报MissingBackpressureException。
- Observable:发射0或N个数据。
- Single:发射单个数据或单个错误。
- Completable:只处理completion或error信号量。
- Maybe:Single或Completable。
- Scheduler:代替Thread和ExecutorService,定义了事件的处理线程环境。可以通过
Schedulers.from(executor)
手动指定线程池。 - Subject:既是观察者(Observer)也是被观察者(Observable)。
基础使用 #
后台处理任务,主线程刷新数据,是Android APP开发中最常见的使用方式。
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
默认代码运行在同一线程
// t为自定义线程池,core=3,max=6
Flowable.range(1, 10)
.observeOn(scheduler)
// t-1
.map(n -> n)
// main,输出有序
.blockingSubscribe(System.out::println);
并行执行
// t为自定义线程池,core=3,max=6
Flowable.range(1, 10)
.flatMap(v ->
// main
Flowable.just(v)
.subscribeOn(Schedulers.computation())
// t-1/t-2/t-3
.map(w -> w * w)
)
// main,输出无序
.blockingSubscribe(System.out::println);
改变默认并行度
Flowable.range(1, 10)
.parallel()
.runOn(scheduler)
// t-1/t-2/t-3
.map(v -> v * v)
.sequential()
// main,输出有序
.blockingSubscribe(System.out::println);
flatMap()方式用于创建子数据流。
数据依赖
当数据可用时继续执行下一阶段的计算。
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)