回到顶部

一、前言

       很快我们就迎来了第二期,上一期我们主要讲解了 RxJava 1.x 到 2.x 的变化概览,相信各位熟练掌握RxJava 1.x的老司机们随便看一下变化概览就可以上手RxJava 2.x了,但为了满足更广大的年轻一代司机(未来也是老司机),在本节中,我们将学习RxJava 2.x 强大的操作符章节。

     【注】以下所有操作符标题都可直接点击进入官方doc查看。

回到顶部

二、正题

1、Create

      create操作符应该是最常见的操作符了,主要用于产生一个Obserable被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者Observable称为发射器(上游事件),观察者Observer称为接收器(下游事件)。

     万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 1 Observable.create(new ObservableOnSubscribe<Integer>() { 2             @Override 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4                 mRxOperatorsText.append("Observable emit 1" + "\n"); 5                 Log.e(TAG, "Observable emit 1" + "\n"); 6                 e.onNext(1); 7                 mRxOperatorsText.append("Observable emit 2" + "\n"); 8                 Log.e(TAG, "Observable emit 2" + "\n"); 9                 e.onNext(2);10                 mRxOperatorsText.append("Observable emit 3" + "\n");11                 Log.e(TAG, "Observable emit 3" + "\n");12                 e.onNext(3);13                 e.onComplete();14                 mRxOperatorsText.append("Observable emit 4" + "\n");15                 Log.e(TAG, "Observable emit 4" + "\n" );16                 e.onNext(4);17             }18         }).subscribe(new Observer<Integer>() {19             private int i;20             private Disposable mDisposable;21 22             @Override23             public void onSubscribe(@NonNull Disposable d) {24                 mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n");25                 Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );26                 mDisposable = d;27             }28 29             @Override30             public void onNext(@NonNull Integer integer) {31                 mRxOperatorsText.append("onNext : value : " + integer + "\n");32                 Log.e(TAG, "onNext : value : " + integer + "\n" );33                 i++;34                 if (i == 2) {35                     // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件36                     mDisposable.dispose();37                     mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n");38                     Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");39                 }40             }41 42             @Override43             public void onError(@NonNull Throwable e) {44                 mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n");45                 Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );46             }47 48             @Override49             public void onComplete() {50                 mRxOperatorsText.append("onComplete" + "\n");51                 Log.e(TAG, "onComplete" + "\n" );52             }53         });

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

输出:

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

需要注意的几点是:

1)在发射事件中,我们在发射了数值3之后,直接调用了e.onComlete(),虽然无法接收事件,但发送事件还是继续的。

2) 另外一个值得注意的点是,在RxJava 2.x中,可以看到发射事件方法相比1.x多了一个throws Excetion,意味着我们做一些特定操作再也不用try-catch了。

3) 并且2.x 中有一个Disposable概念,这个东西可以直接调用切断,可以看到,当它的isDisposed()返回为false的时候,接收器能正常接收事件,但当其为true的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。

 

2、Map

Map基本算是RxJava中一个最简单的操作符了,熟悉RxJava 1.x的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在2.x中它的作用几乎一致。

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 1 Observable.create(new ObservableOnSubscribe<Integer>() { 2             @Override 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4                 e.onNext(1); 5                 e.onNext(2); 6                 e.onNext(3); 7             } 8         }).map(new Function<Integer, String>() { 9             @Override10             public String apply(@NonNull Integer integer) throws Exception {11                 return "This is result " + integer;12             }13         }).subscribe(new Consumer<String>() {14             @Override15             public void accept(@NonNull String s) throws Exception {16                 mRxOperatorsText.append("accept : " + s +"\n");17                 Log.e(TAG, "accept : " + s +"\n" );18             }19         });

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

输出:

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

是的,map基本作用就是将一个Observable通过某种函数关系,转换为另一种Observable,上面例子中就是把我们的Integer数据变成了String类型。从Log日志显而易见。

3、Zip

zip专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的Observable发射事件数目只和少的那个相同。

 

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 1 Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() { 2             @Override 3             public String apply(@NonNull String s, @NonNull Integer integer) throws Exception { 4                 return s + integer; 5             } 6         }).subscribe(new Consumer<String>() { 7             @Override 8             public void accept(@NonNull String s) throws Exception { 9                 mRxOperatorsText.append("zip : accept : " + s + "\n");10                 Log.e(TAG, "zip : accept : " + s + "\n");11             }12         });

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 1 private Observable<String> getStringObservable() { 2         return Observable.create(new ObservableOnSubscribe<String>() { 3             @Override 4             public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { 5                 if (!e.isDisposed()) { 6                     e.onNext("A"); 7                     mRxOperatorsText.append("String emit : A \n"); 8                     Log.e(TAG, "String emit : A \n"); 9                     e.onNext("B");10                     mRxOperatorsText.append("String emit : B \n");11                     Log.e(TAG, "String emit : B \n");12                     e.onNext("C");13                     mRxOperatorsText.append("String emit : C \n");14                     Log.e(TAG, "String emit : C \n");15                 }16             }17         });18     }19 20     private Observable<Integer> getIntegerObservable() {21         return Observable.create(new ObservableOnSubscribe<Integer>() {22             @Override23             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {24                 if (!e.isDisposed()) {25                     e.onNext(1);26                     mRxOperatorsText.append("Integer emit : 1 \n");27                     Log.e(TAG, "Integer emit : 1 \n");28                     e.onNext(2);29                     mRxOperatorsText.append("Integer emit : 2 \n");30                     Log.e(TAG, "Integer emit : 2 \n");31                     e.onNext(3);32                     mRxOperatorsText.append("Integer emit : 3 \n");33                     Log.e(TAG, "Integer emit : 3 \n");34                     e.onNext(4);35                     mRxOperatorsText.append("Integer emit : 4 \n");36                     Log.e(TAG, "Integer emit : 4 \n");37                     e.onNext(5);38                     mRxOperatorsText.append("Integer emit : 5 \n");39                     Log.e(TAG, "Integer emit : 5 \n");40                 }41             }42         });43     }

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

输出:

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

需要注意的是:

1) zip 组合事件的过程就是分别从发射器A和发射器B各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1永远是和A 结合的,2永远是和B结合的。

2) 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5很孤单,没有人愿意和它交往,孤独终老的单身狗。

 

4、Concat

对于单一的把两个发射器连接成一个发射器,虽然 zip 不能完成,但我们还是可以自力更生,官方提供的 concat 让我们的问题得到了完美解决。

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

1 Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))2                 .subscribe(new Consumer<Integer>() {3                     @Override4                     public void accept(@NonNull Integer integer) throws Exception {5                         mRxOperatorsText.append("concat : "+ integer + "\n");6                         Log.e(TAG, "concat : "+ integer + "\n" );7                     }8                 });

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

输出:

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

如图,可以看到。发射器B把自己的三个孩子送给了发射器A,让他们组合成了一个新的发射器,非常懂事的孩子,有条不紊的排序接收。

 

5、FlatMap

FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器Observable 通过某种方法转换为多个Observables,然后再把这些分散的Observables装进一个单一的发射器Observable。但有个需要注意的是,flatMap并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的ConcatMap。

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

 1  Observable.create(new ObservableOnSubscribe<Integer>() { 2             @Override 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { 4                 e.onNext(1); 5                 e.onNext(2); 6                 e.onNext(3); 7             } 8         }).flatMap(new Function<Integer, ObservableSource<String>>() { 9             @Override10             public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {11                 List<String> list = new ArrayList<>();12                 for (int i = 0; i < 3; i++) {13                     list.add("I am value " + integer);14                 }15                 int delayTime = (int) (1 + Math.random() * 10);16                 return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);17             }18         }).subscribeOn(Schedulers.newThread())19                 .observeOn(AndroidSchedu


作  者: 南 尘   万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训 
出  处: http://www.cnblogs.com/liushilin/ 
关于作者:专注于移动前端的项目开发。如有问题或建议,请多多赐教!欢迎加入Android交流群:118116509 
版权声明:本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。 
特此声明:所有评论和私信都会在第一时间回复。也欢迎园子的大大们指正错误,共同进步。或者直接私信我 
声援博主:如果您觉得文章对您有帮助,可以点击文章下部推荐或侧边关注。您的鼓励是作者坚持原创和持续写作的最大动力! 

http://www.cnblogs.com/liushilin/p/7066074.html

网友评论