一、CombineLatest
CombineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:
- 所有的Observable都发射过数据。
- 满足条件1的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。
Rxjava实现CombineLast操作符可以让我们直接将组装的Observable作为参数传值,也可以将所有的Observable装在一个List里面穿进去。
下面我们创建几个Observable对象,分别直接传值和使用List传值将其组装起来
对其进行订阅
运行结果如下
二、Join
Join操作符根据时间窗口来组合两个Observable发射的数据,每个Observable都有一个自己的时间窗口,要组合的时候,在这个时间窗口内的数据都有有效的,可以拿来组合。
Rxjava还实现了groupJoin,基本和join相同,只是最后组合函数的参数不同。
使用join操作符需要4个参数,分别是:
- 源Observable所要组合的目标Observable
- 一个函数,就收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射出来数据的有效期
- 一个函数,就收从目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射出来数据的有效期
- 一个函数,接收从源Observable和目标Observable发射来的数据,并返回最终组合完的数据。
下面我们使用join和groupJoin操作符分别来组合两个Observable对象
分别进行订阅
运行结果如下,可以看到虽然目标Observable发射了4个数据,但是源Observable只发射了一个有效期为3秒的数据,所以最终的组合结果也只有3个数据。
三、Merege
Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。但是其发射的数据有可能是交错的,如果想要没有交错,可以使用concat操作符。
当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。
下面我们分别使用merge和mergeDelayError操作符来进行merge操作。
分别对其订阅
运行结果如下:
四、StartWith、Switch
StartWith操作符会在源Observable发射的数据前面插上一些数据。不仅仅只可以插入一些数据,还可以将Iterable和Observable插入进入。如果插入的是Observable,则这个Observable发射的数据会插入到
源Observable发射数据的前面。
switch操作符在Rxjava上的实现为switchOnNext,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。
需要注意的就是,如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新
的小Observable所发射的数据。可以看示意图中的黄色圆圈就被丢弃了。
下面使用startWith和switchOnNext操作符来组合两个Observable
分别进行订阅
运行结果如下,可以看到startwith将-1和0插入到前面。使用siwtch的时候第一个小Observable只发射出了两个数据,第二个小Observable就被源Observable发射出来了,所以其接下来的两个数据被丢弃。
五、Zip
Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。
Rxjava实现了zip和zipWith两个操作符。
下面我们使用zip和zipWith操作符来组合数据
分别进行订阅
运行结果如下,可以看到,最终都发射出了两个数据,因为createObserver(2)所创建的Observable只会发射两个数据,所以其他Observable多余发射的数据都被丢弃了。
Combning的操作符就到这了,本文的demo程序见github