Rxjava允许我们来自定义操作符来满足我们特殊的需求。如果我们的自定义操作符想要作用到Observable发射出来的数据上,我们就要使用lift操作符;如果我们的自定义操作符想要改变整个的Observable,就需要使用compose操作符了。
一、 lift
当我们自定义了一个操作符后,使用lift可以将我们自定义的操作符和其它的操作符一起做链式调用,就好像Rxjava原生的操作符一样。下面我们自定义一个操作符,并使用lift添加到Observable的链式调用里面:
我们使用lift将我们自定义的操作符夹在两个map操作符之间,形成链式的调用。
对其进行订阅:
运行结果如下,可以看到输出的结果也正如预期的一样,我们自定义的操作符作用在两个map操作符之间。
二、compose
Compose操作符是将源Observable按照自定义的方式转化成另外一个新的Observable。可以这么说compose是对Observable进行操作的而lift是对Subscriber进行操作的,作用点是不同的。
下面我们自定义一个Transformer对象,并使用compse操作符将其应用到Observable上:
在这个我们自定义的Transformer对象中,我们将原先发射Integer的Observable转化成了一个发射String内容的Observable;并且我们还添加了doOnNext操作符来打印出发射的数据,方便我们进行后续的调试。个人感觉Transformer更像是一个批量转化器,如你有很多Observable对象在使用,可以定义一个通用的Transformer对象,里面可以通过doOnNext打印数据,可以定义subscribeOn和observeOn的线程等等,最后使用compose操作符将其应用到我们所有的Observable对象上就可以统一进行设定了。
下面我们进行一下订阅吧:
运行的结果如下,我们可以看到对每个数据都将其转化为String之后通过doOnNext输出了数据的值,然后输出了Subscriber接收到的数据。
关于自定义操作符还有一些需要注意的地方:
- 自定义Operator在发射任何数据之前都要使用!subscriber.isUnsubscribed()来检查Subscriber的状态,如果没有任何Subscriber订阅就没有必要去发射数据了
自定义Operator要遵循Observable的核心原则:
- 可以多次调用Subscriber的onNext方法,但是同一个数据只能调用一次。
- 可以调用Subscriber的onComplete或者onError方法,但是这两个是互斥的,调用了一个就不能再调用另外一个了,并且一旦调用了任何一个方法就不能再调用onNext方法了。
- 如果无法保证遵守以上两条,可以对自定义操作符加上serialize操作符,这个操作符会强制发射正确的数据。
自定义Operator内部不能阻塞住。
- 如果通过compose组合多个操作符就能达到目的就不要自己去写新的代码来实现,在Rxjava的源码中就有很多这样的例子,如:
- first()操作符是通take(1).single()来实现的。
- ignoreElements()是通过 filter(alwaysFalse())来实现的。
- reduce(a) 是通过 scan(a).last()来实现的。
- 当有异常的时候,不能继续发射正常的数据,要立刻调用Subscriber的onError方法将异常抛出去。
- 注意发射数据为null的情况,这和完全不发射数据不是一回事。
关于自定义操作符就到这里了,本文的demo程序见github