These notes record my understanding while studying "RxJava-Essentials" [1].

1. The difference between Observable and Subject

    PublishSubject<String> stringPublishSubject = PublishSubject.create();
    Subscription subscription = stringPublishSubject.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
        }
    });
    // Nothing is emitted until we push an item into the subject ourselves.
    stringPublishSubject.onNext("test");

With a Subject, we have to call onNext ourselves to emit an item.

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // Runs once for each subscriber, at the moment it subscribes.
            for (int i = 0; i < 5; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });

    Subscription subscriptionPrint = observable.subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Integer integer) {
        }
    });

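An Observable created this way, by contrast, starts emitting on its own as soon as we subscribe. Here, calling observable.subscribe invokes the call method of the anonymous OnSubscribe that was passed to create earlier.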
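To make the contrast concrete for myself, here is a minimal sketch in plain Java. ToyObservable and ToySubject are hypothetical stand-ins I made up for illustration, not RxJava classes: the first stores its emit logic and runs it on every subscribe, the second only forwards what is pushed into it by hand. It assumes a single thread and ignores errors, completion and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the two behaviours; these are NOT RxJava's classes. */
final class HotColdSketch {

    /** Stores the emit logic and re-runs it for every subscriber. */
    static final class ToyObservable<T> {
        private final Consumer<Consumer<T>> onSubscribe;

        ToyObservable(Consumer<Consumer<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Consumer<T> observer) {
            // Subscribing is what triggers emission, like OnSubscribe.call().
            onSubscribe.accept(observer);
        }
    }

    /** Emits nothing by itself; items flow only when onNext() is called. */
    static final class ToySubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void onNext(T value) {
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    /** Subscribes once to a source that emits 0..4 and returns what arrived. */
    static List<Integer> runObservable() {
        List<Integer> received = new ArrayList<>();
        ToyObservable<Integer> observable = new ToyObservable<>(observer -> {
            for (int i = 0; i < 5; i++) {
                observer.accept(i);
            }
        });
        observable.subscribe(received::add);
        return received;
    }

    /** Pushes one item before and one after subscribing; returns what arrived. */
    static List<String> runSubject() {
        List<String> received = new ArrayList<>();
        ToySubject<String> subject = new ToySubject<>();
        subject.onNext("too early");   // no observer registered yet
        subject.subscribe(received::add);
        subject.onNext("test");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runObservable());
        System.out.println(runSubject());
    }
}
```

In this sketch the item pushed before subscribing is simply dropped, which mirrors how a PublishSubject only relays items emitted after the subscription.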

2. What does Observable.from() do?
First, observable.subscribe calls

    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

That is the call method of the anonymous inner class we passed to Observable.create(). Now look at Observable.from(): it actually calls create as well.

    public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else if (n == 1) {
            return just(array[0]);
        }
        return create(new OnSubscribeFromArray<T>(array));
    }

So when we subscribe to this Observable, the call method of OnSubscribeFromArray is invoked:

    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

This method calls setProducer, which in turn calls producer.request:

    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == Long.MAX_VALUE) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                fastPath();
            }
        } else if (n != 0) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                slowPath(n);
            }
        }
    }

fastPath and slowPath are what finally call child.onNext.
Factory methods such as just(), range() and defer() follow the same pattern.
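The chain above (subscribe, then call, then setProducer, then request, then onNext) is easier for me to follow in a stripped-down form. The sketch below is a toy model under my own assumptions: ToyProducer, ToySubscriber and ToyArrayProducer are hypothetical names that only echo the RxJava 1.x ones, there is no thread safety or request accounting, and a single loop stands in for both fastPath and slowPath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the from() call chain; none of this is library code. */
final class FromArraySketch {

    /** Stand-in for rx.Producer: the subscriber asks it for up to n items. */
    interface ToyProducer {
        void request(long n);
    }

    /** Stand-in for rx.Subscriber that simply records what it receives. */
    static final class ToySubscriber<T> {
        final List<T> received = new ArrayList<>();
        boolean completed;

        void onNext(T item) {
            received.add(item);
        }

        void onCompleted() {
            completed = true;
        }

        void setProducer(ToyProducer producer) {
            // Assumption: no backpressure in use, so ask for everything at once.
            producer.request(Long.MAX_VALUE);
        }
    }

    /** Stand-in for FromArrayProducer: walks the array when items are requested. */
    static final class ToyArrayProducer<T> implements ToyProducer {
        private final ToySubscriber<T> child;
        private final T[] array;
        private int index;

        ToyArrayProducer(ToySubscriber<T> child, T[] array) {
            this.child = child;
            this.array = array;
        }

        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            // Emit at most n of the remaining items; Long.MAX_VALUE drains the array.
            for (long emitted = 0; emitted < n && index < array.length; emitted++) {
                child.onNext(array[index++]);
            }
            if (index == array.length && !child.completed) {
                child.onCompleted();
            }
        }
    }

    /** Stand-in for from(): returns the "OnSubscribe" that runs on subscribe. */
    static <T> Consumer<ToySubscriber<T>> from(T[] array) {
        return child -> child.setProducer(new ToyArrayProducer<>(child, array));
    }

    /** Stand-in for subscribe(): invokes the stored OnSubscribe with a new subscriber. */
    static <T> ToySubscriber<T> subscribe(Consumer<ToySubscriber<T>> onSubscribe) {
        ToySubscriber<T> subscriber = new ToySubscriber<>();
        onSubscribe.accept(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        ToySubscriber<String> s = subscribe(from(new String[] {"a", "b", "c"}));
        System.out.println(s.received + " completed=" + s.completed);
    }
}
```

The point of the sketch is only the order of the calls: nothing touches the array until subscribe runs the stored function, and items reach onNext only from inside request.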

3. Understanding flatMap()
The flatMap() function looks like this:

    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

It can be read as map first, then merge. The official documentation describes merge as follows:
Flattens an Observable that emits Observables into a single Observable that emits the items emitted by those Observables, without any transformation.

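In other words, merge combines the Observables emitted by an Observable into a single Observable. The documentation for map says: "Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications." That is, map applies the function passed to it to every item of the source sequence.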
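As a rough analogy, the same "map first, then merge" shape can be written with plain lists. This is only a sketch under my own assumptions: the map, merge and flatMap helpers below are hypothetical and operate on java.util.List, not on Observables. One real difference to keep in mind is that RxJava's merge may interleave items from the inner Observables, while this list version always keeps them in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** List-based analogy for flatMap = merge(map(func)); not RxJava code. */
final class FlatMapSketch {

    /** map: apply func to every item and collect the results. */
    static <T, R> List<R> map(List<T> source, Function<T, R> func) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            out.add(func.apply(item));
        }
        return out;
    }

    /** merge: flatten a list of lists into one list, items unchanged. */
    static <R> List<R> merge(List<List<R>> nested) {
        List<R> out = new ArrayList<>();
        for (List<R> inner : nested) {
            out.addAll(inner);
        }
        return out;
    }

    /** flatMap: map each item to a list, then merge those lists. */
    static <T, R> List<R> flatMap(List<T> source, Function<T, List<R>> func) {
        List<List<R>> nested = map(source, func);
        return merge(nested);
    }

    /** Each number n becomes the pair (n, n * 10); the pairs are then flattened. */
    static List<Integer> demo() {
        List<Integer> source = Arrays.asList(1, 2, 3);
        List<Integer> result = flatMap(source, n -> Arrays.asList(n, n * 10));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```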

4. Understanding subscribeOn and observeOn

    private Observable<AppInfo> getApps() {
        return Observable
                .create(subscriber -> {
                    List<AppInfo> apps = new ArrayList<>();

                    // Read the serialized app list from SharedPreferences.
                    SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
                    Type appInfoType = new TypeToken<List<AppInfo>>() {
                    }.getType();
                    String serializedApps = sharedPref.getString("APPS", "");
                    if (!"".equals(serializedApps)) {
                        apps = new Gson().fromJson(serializedApps, appInfoType);
                    }

                    // Emit every app, then signal completion.
                    for (AppInfo app : apps) {
                        subscriber.onNext(app);
                    }
                    subscriber.onCompleted();
                });
    }

    getApps()
            .onBackpressureBuffer()
            .subscribeOn(Schedulers.io())               // run the create() body on the IO scheduler
            .observeOn(AndroidSchedulers.mainThread())  // deliver callbacks on the main thread
            .subscribe(new Observer<AppInfo>() {
                @Override
                public void onCompleted() {
                    mSwipeRefreshLayout.setRefreshing(false);
                    Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
                }

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
                    mSwipeRefreshLayout.setRefreshing(false);
                }

                @Override
                public void onNext(AppInfo appInfo) {
                    mAddedApps.add(appInfo);
                    mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
                }
            });

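In effect, subscribeOn(Schedulers.io()) hands the work defined in getApps() (the body passed to Observable.create) to the IO scheduler, which runs it off the main thread. observeOn then delivers the results to the Observer on the scheduler it names, here the Android main thread.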
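The thread hand-off can be imitated with two plain executors. This is a sketch of the idea only, not how RxJava implements its schedulers: the executor named "toy-io" plays the role of Schedulers.io(), "toy-main" plays the role of the main thread, and both names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Executor-based imitation of subscribeOn/observeOn; not RxJava code. */
final class SchedulerSketch {

    /** Returns two thread names: where the source ran, then where the observer ran. */
    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-io"));
        ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-main"));
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        try {
            // "subscribeOn": the source's work is submitted to the io executor.
            io.execute(() -> {
                threads.add(Thread.currentThread().getName());
                // "observeOn": the notification is re-posted to another executor.
                mainThread.execute(() -> {
                    threads.add(Thread.currentThread().getName());
                    done.countDown();
                });
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the observer");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            mainThread.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller of run() never does the source's work itself, just as the thread that calls subscribe in the snippet above does not read SharedPreferences itself once subscribeOn is in the chain.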

References
[1] https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details