博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava操作符之组合操作符(六)
阅读量:5830 次
发布时间:2019-06-18

本文共 11429 字,大约阅读时间需要 38 分钟。

前言

  上一篇文章我们学习了过滤类操作符,本篇我们将一起来学习RxJava组合类操作符。组合操作符主要是用来同时处理多个Observable,将他们进行组合创建出新的满足我们需求的Observable,一起来看下都有哪些。

  

组合操作符

Merge

  merge操作符,将两个Observable要发射的观测序列合并为一个序列进行发射。按照两个序列每个元素的发射时间先后进行排序,同一时间点发射的元素则是无序的。

//将一个发送字母的Observable与发送数字的Observable合并发射final String[] words = new String[]{
"A", "B", "C", "D", "E", "F", "G", "H", "I"};//字母Observable,每200ms发射一次Observable
wordSequence = Observable.interval(200, TimeUnit.MILLISECONDS) .map(new Func1
() { @Override public String call(Long position) { return words[position.intValue()]; } }) .take(words.length);//数字Observable,每500ms发射一次Observable
numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(4);Observable.merge(wordSequence, numberSequence) .subscribe(new Action1
() { @Override public void call(Serializable serializable) { Log.e("rx_test", "merge:" + serializable.toString()); } });复制代码

  输出结果:

merge:Amerge:Bmerge:0merge:Cmerge:Dmerge:Emerge:1merge:Fmerge:Gmerge:2merge:Hmerge:Imerge:3复制代码

  原理图:

  merge操作符还有一种入参merge(Observable[]),可传入含有多个Observable的集合,merge操作符也可将这多个Observable的序列合并后发射。

MergeDelayError

  mergeDelayError操作符,与merge功能类似,都是用来合并Observable的。不同之处在于mergeDelayError操作符在合并过程中发生异常的话不会立即停止合并,而会在所有元素合并发射完毕之后再发射异常。但发生异常的那个Observable就不会发射数据了。

//字母Observable,每200ms发射一次,模拟过程中产生一个异常Observable
wordSequence = Observable.interval(200, TimeUnit.MILLISECONDS) .map(new Func1
() { @Override public String call(Long position) { Long cache = position; if (cache == 3) { cache = cache / 0; } return words[position.intValue()]; } }) .take(words.length);//数字Observable,每500ms发射一次Observable
numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(4);Observable.mergeDelayError(wordSequence, numberSequence) .subscribe(new Action1
() { @Override public void call(Serializable serializable) { Log.e("rx_test", "mergeDelayError:" + serializable.toString()); } }, new Action1
() { @Override public void call(Throwable throwable) { Log.e("rx_test", "mergeDelayError:" + throwable.getMessage()); } }, new Action0() { @Override public void call() { Log.e("rx_test", "mergeDelayError:onComplete"); } });复制代码

  输出结果:

mergeDelayError:AmergeDelayError:BmergeDelayError:0mergeDelayError:CmergeDelayError:1mergeDelayError:2mergeDelayError:3mergeDelayError:divide by zero复制代码

  由输出结果可看出,wordSequence在发射到C时抛出了一个异常,停止发射其剩下的数据,但合并没有停止。合并完成之后这个异常才被发射了出来。

  原理图:

Concat

  concat操作符,将多个Obserbavle发射的数据进行合并后发射,类似于merge操作符。但concat操作符是将Observable依次发射,是有序的。

Observable
wordSequence = Observable.just("A", "B", "C", "D", "E");Observable
numberSequence = Observable.just(1, 2, 3, 4, 5);Observable
nameSequence = Observable.just("Sherlock", "Holmes", "Xu", "Lei");Observable.concat(wordSequence, numberSequence, nameSequence) .subscribe(new Action1
() { @Override public void call(Serializable serializable) { Log.e("rx_test", "concat:" + serializable.toString()); } });复制代码

  输出结果:

concat:Aconcat:Bconcat:Cconcat:Dconcat:Econcat:1concat:2concat:3concat:4concat:5concat:Sherloconcat:Holmesconcat:Xuconcat:Lei复制代码

  原理图:

Zip

  zip(Observable, Observable, Func2)操作符,根据Func2中的call()方法规则合并两个Observable的数据项并发射。

  注意:若其中一个Observable数据发送结束或出现异常后,另一个Observable也会停止发射数据。

Observable
wordSequence = Observable.just("A", "B", "C", "D", "E");Observable
numberSequence = Observable.just(1, 2, 3, 4, 5, 6);Observable.zip(wordSequence, numberSequence, new Func2
() { @Override public String call(String s, Integer integer) { return s + integer; }}).subscribe(new Action1
() { @Override public void call(String s) { Log.e("rx_test", "zip:" + s); }});复制代码

  输出结果:

zip:A1zip:B2zip:C3zip:D4zip:E5复制代码

  由输出结果可看出numberSequence观测序列最后的6并没有发射出来,由于wordSequence观测序列已发射完所有数据,所以组合序列也停止发射数据了。

  原理图:

StartWith

  startWith操作符,用于在源Observable发射的数据前,插入指定的数据并发射。

Observable.just(4, 5, 6, 7)        .startWith(1, 2, 3)        .subscribe(new Action1
() { @Override public void call(Integer integer) { Log.e("rx_test", "startWith:" + integer); } });复制代码

  输出结果:

startWith:1startWith:2startWith:3startWith:4startWith:5startWith:6startWith:7复制代码

  原理图:

  startWith还有两种入参:

  • startWith(Iterable):可在源Observable发射的数据前插入Iterable数据并发射。
  • startWith(Observable):可在源Observable发射的数据前插入另一Observable发射的数据并发射。

    SwitchOnNext

      switchOnNext操作符,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射多个小Observable所发射的数据。若小Observable正在发射数据时,源Observable又发射了新的小Observable,则前一个小Observable还未发射的数据会被抛弃,直接发射新的小Observable所发射的数据,上例子。
    ```java
    //每隔500ms产生一个Observable
    Observable> observable = Observable.interval(500, TimeUnit.MILLISECONDS)
    .map(new Func1
    >() { @Override public Observable
    call(Long aLong) { //每隔200毫秒产生一组数据(0,10,20,30,40) return Observable.interval(200, TimeUnit.MILLISECONDS) .map(new Func1
    () { @Override public Long call(Long aLong) { return aLong * 10; } }).take(5); } }).take(2);复制代码

Observable.switchOnNext(observable)

.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.e("rx_test", "switchOnNext:" + aLong);
}
});

  输出结果:```javaswitchOnNext:0switchOnNext:10switchOnNext:0switchOnNext:10switchOnNext:20switchOnNext:30switchOnNext:40复制代码

  由输出结果发现第一个小Observable打印到10则停止了发射数据,说明其发射到10时,新的小Observable被创建了出来,第一个小Observable则被中断发射,开始发射新的小Observable的数据。

  原理图:

CombineLatest

  combineLatest操作符,用于将两个Observale最近发射的数据以Func2函数的规则进行组合并发射。

//引用merge的例子final String[] words = new String[]{
"A", "B", "C", "D", "E", "F", "G", "H", "I"};Observable
wordSequence = Observable.interval(300, TimeUnit.MILLISECONDS) .map(new Func1
() { @Override public String call(Long position) { return words[position.intValue()]; } }) .take(words.length);Observable
numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS) .take(5);Observable.combineLatest(wordSequence, numberSequence, new Func2
() { @Override public String call(String s, Long aLong) { return s + aLong; } }) .subscribe(new Action1
() { @Override public void call(Serializable serializable) { Log.e("rx_test", "combineLatest:" + serializable.toString()); } });复制代码

  输出结果:

combineLatest:A0combineLatest:B0combineLatest:C0combineLatest:C1combineLatest:D1combineLatest:E1combineLatest:E2combineLatest:F2combineLatest:F3combineLatest:G3combineLatest:H3combineLatest:H4combineLatest:I4复制代码

  如果将wordSequence与numberSequence的入参顺序互换,输出结果也会不同:

combineLatest:0AcombineLatest:0BcombineLatest:0CcombineLatest:1CcombineLatest:1DcombineLatest:2DcombineLatest:2EcombineLatest:2FcombineLatest:3FcombineLatest:3GcombineLatest:3HcombineLatest:4HcombineLatest:4I复制代码

  wordSequence每300ms发射一个字符,numberSequence每500ms发射一个数字。可能有些码友不知道这个输出结果怎么来的,这个操作符确实不太好理解。我们来看一下这个原理图就很清楚了。

  原理图:

Join

  join(Observable, Func1, Func1, Func2)操作符,类似于combineLatest操作符,用于将ObservableA与ObservableB发射的数据进行排列组合。但join操作符可以控制Observable发射的每个数据的生命周期,在每个发射数据的生命周期内,可与另一个Observable发射的数据按照一定规则进行合并,来看下join的几个入参。

  • Observable:需要与源Observable进行组合的目标Observable。
  • Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
  • Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
  • Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据按自定的规则组合后返回。
    //产生字母的序列,周期为1000msString[] words = new String[]{
    "A", "B", "C", "D", "E", "F", "G", "H"};Observable
    observableA = Observable.interval(1000, TimeUnit.MILLISECONDS) .map(new Func1
    () { @Override public String call(Long aLong) { return words[aLong.intValue()]; } }).take(8);//产0,1,2,3,4,5,6,7的序列,延时500ms发射,周期为1000msObservable
    observableB = Observable.interval(500, 1000, TimeUnit.MILLISECONDS) .map(new Func1
    () { @Override public Long call(Long aLong) { return aLong; } }).take(words.length);//joinobservableA.join(observableB, new Func1
    >() { @Override public Observable
    call(String s) { //ObservableA发射的数据有效期为600ms return Observable.timer(600, TimeUnit.MILLISECONDS); } }, new Func1
    >() { @Override public Observable
    call(Long aLong) { //ObservableB发射的数据有效期为600ms return Observable.timer(600, TimeUnit.MILLISECONDS); } }, new Func2
    () { @Override public String call(String s, Long aLong) { return s + aLong; } }).subscribe(new Action1
    () { @Override public void call(String s) { Log.e("rx_test", "join:" + s); }});复制代码
      join操作符的组合方式类似于数学上的排列组合规则,以ObservableA为基准源Observable,按照其自身周期发射数据,且每个发射出来的数据都有其有效期。而ObservableB每发射出来一个数据,都与A发射出来的并且还在有效期内的数据按Func2函数中的规则进行组合,B发射出来的数据也有其有效期。最后再将结果发射给观察者进行处理。
      输出结果:
    join:A0join:A1join:B1join:B2join:C2join:C3join:D3join:D4join:E4join:E5join:F5join:F6join:G6join:G7join:H7复制代码
      原理图:

GroupJoin

  groupJoin操作符,类似于join操作符,区别在于第四个参数Func2的传入函数不同,对join之后的结果包装了一层小的Observable,便于用户再次进行一些过滤转换等操作再发射给Observable。

observableA.groupJoin(observableB,        new Func1
>() { @Override public Observable
call(String s) { return Observable.timer(600, TimeUnit.MILLISECONDS); } }, new Func1
>() { @Override public Observable
call(Long aLong) { return Observable.timer(600, TimeUnit.MILLISECONDS); } }, new Func2
, Observable
>() { @Override public Observable
call(final String s, Observable
longObservable) { return longObservable.map(new Func1
() { @Override public String call(Long aLong) { return s + aLong; } }); } }) .subscribe(new Action1
>() { @Override public void call(Observable
stringObservable) { stringObservable.subscribe(new Action1
() { @Override public void call(String s) { Log.e("rx_test", "groupJoin:" + s); } }); } });复制代码

  输出结果:

groupJoin:A0groupJoin:A1groupJoin:B1groupJoin:B2groupJoin:C2groupJoin:C3groupJoin:D3groupJoin:D4groupJoin:E4groupJoin:E5groupJoin:F5groupJoin:F6groupJoin:G6groupJoin:G7groupJoin:H7复制代码

  原理图:

总结

  到此,本篇关于RxJava的常用组合类操作符就讲解完毕了。通过以上四篇文章对RxJava四类操作符的学习,相信大家已经基本掌握RxJava如何使用了。实践是检验真理的唯一标准,下一篇我们来一起上项目看看实践中如何使用RxJava。

  技术渣一枚,有写的不对的地方欢迎大神们留言指正,有什么疑惑或者建议也可以在我Github上RxJavaDemo项目Issues中提出,我会及时回复。
  附上RxJavaDemo的地址:
  

转载地址:http://vnodx.baihongyu.com/

你可能感兴趣的文章
选择排序
查看>>
SQL Server 数据库的数据和日志空间信息
查看>>
前端基础之JavaScript
查看>>
自己动手做个智能小车(6)
查看>>
自己遇到的,曾未知道的知识点
查看>>
P1382 楼房 set用法小结
查看>>
分类器性能度量
查看>>
windows 环境下切换 python2 与 pythone3 以及常用命令
查看>>
docker 基础
查看>>
伊朗黑客对中东发起名为Magic Hound的网络间谍行为
查看>>
你的系统还未打补丁?小心恶意广告攻击
查看>>
全自主可控 赛凡云梦数据仓解决行业难题
查看>>
爬虫-化被动为主动
查看>>
互联网大数据时代 以数据为“轻创业”加分
查看>>
是时候考虑云中的无线管理了
查看>>
中国移动再推企业版飞信,为何心不死?可是力可足?
查看>>
AT&T和三星加入5GAA 进军联网汽车市场
查看>>
IT招聘市场最热门的五类开源技术人才
查看>>
达梦7数据库安装记录
查看>>
python+Eclipse+PyDev搭建记录
查看>>