用于记录在学习《RxJava-Essentials》[1]时的一些理解。
1.关于observable和subject的区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 PublishSubject<String> stringPublishSubject = PublishSubject.create(); Subscription subscription = stringPublishSubject.subscribe(new Observer<String>() { @Override public void onCompleted () { } @Override public void onError (Throwable e) { } @Override public void onNext (String s) { } }); stringPublishSubject.onNext("test" );
subject需要我们自己调用onNext方法,去发送主题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>(){ @Override public void call (Subscriber<? super Integer> subscriber) { for (int i = 0 ; i < 5 ; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }); Subscription subscriptionPrint = observable.subscribe(new Observer<Integer>() { @Override public void onCompleted () { } @Override public void onError (Throwable e) { } @Override public void onNext (Integer integer) { } });
而Observable则只要我们订阅了,就会自己发送主题。这里当我们调用observable.subscribe时,就会调用到observable之前create匿名参数中的call方法。
2.Observable.from()做了什么? 首先observable.subscribe会调用
1 hook .onSubscribeStart (observable , observable .onSubscribe ).call (subscriber );
也就是我们在Observable.create()中传入的匿名内部类的方法,接下来看Observable.from();实际上也是调用了create方法。
1 2 3 4 5 6 7 8 9 10 public static <T> Observable<T> from(T[] array ) { int n = array .length; if (n == 0 ) { return empty(); } else if (n == 1 ) { return just(array [0 ]); } return create(new OnSubscribeFromArray<T>(array )); }
所以当我们订阅该Observable时,会调用到OnSubscribeFromArray中的call方法
1 2 3 public void call (Subscriber<? super T> child) { child.setProducer(new FromArrayProducer<T>(child, array)); }
这个方法会调用setProducer,在其中,会调用producer.request方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void request (long n) { if (n < 0 ) { throw new IllegalArgumentException("n >= 0 required but it was " + n); } if (n == Long.MAX_VALUE) { if (BackpressureUtils.getAndAddRequest(this , n) == 0 ) { fastPath(); } } else if (n != 0 ) { if (BackpressureUtils.getAndAddRequest(this , n) == 0 ) { slowPath(n); } } }
其中的fastPath和slowPath就会调用到child.onNext方法。 同理还有just()、range()、defer()、range()等都是一样。
3.flatMap()理解 flatMap()函数如下
1 2 3 4 5 6 public final <R > Observable <R > flatMap(Func1 <? super T , ? extends Observable <? extends R >> func ) { if (getClass() == ScalarSynchronousObservable .class ) { return ((ScalarSynchronousObservable <T >)this).scalarFlatMap(func ); } return merge (map (func ) ); }
可以理解为先map,再merge,这个merge的官方文档解释是 Flattens an Observable that emits Observables into a single Observable that emits the items emitted by those Observables, without any transformation.
也就是将那些发送Observables的Observable都合并成一个Observable。而map方法为Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.也就是根据map中的函数参数将其运用在提供的序列上。
4.关于subscribeOn和observeOn的理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Observable<AppInfo> getApps() { return Observable . create(subscriber -> { List <AppInfo> apps = new ArrayList<>(); SharedPreferences sharedPref = getActivity(). getPreferences(Context. MODE_PRIVATE); Type appInfoType = new TypeToken<List <AppInfo>>() { }. getType(); String serializedApps = sharedPref. getString("APPS" , "" ); if (! "" . equals (serializedApps)) { apps = new Gson(). fromJson(serializedApps, appInfoType); } for (AppInfo app : apps) { subscriber. onNext(app); } subscriber. onCompleted(); }); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 getApps() .onBackpressureBuffer() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<AppInfo>() { @Override public void onCompleted () { mSwipeRefreshLayout.setRefreshing(false ); Toast.makeText(getActivity(), "Here is the list!" , Toast.LENGTH_LONG).show(); } @Override public void onError (Throwable e) { Toast.makeText(getActivity(), "Something went wrong!" , Toast.LENGTH_SHORT).show(); mSwipeRefreshLayout.setRefreshing(false ); } @Override public void onNext (AppInfo appInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1 , appInfo); } });
实际上就是将getApps()的操作交给Schedulers.io()这个IO调度器去处理,该调度器会在非主线程来调用getApps。而observeOn则会在指定的调度器上返回结果。
参考文献 [1]https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details