代码
public class RxBus {
private final Subject<Object, Object> bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getDefault() {
return RxBusHolder.sInstance;
}
private static class RxBusHolder {
private static final RxBus sInstance = new RxBus();
}
// 提供了一个新的事件
public void post(Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
示例
void registerEvent() {
Subscription rxSubscription = RxBus.getDefault().toObservable(NightModeEvent.class)
.compose(RxUtil.<NightModeEvent>rxSchedulerHelper())
.map(new Func1<NightModeEvent, Boolean>() {
@Override
public Boolean call(NightModeEvent nightModeEvent) {
return nightModeEvent.getNightMode();
}
})
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
mView.showError("切换模式失败ヽ(≧Д≦)ノ");
}
@Override
public void onNext(Boolean aBoolean) {
mView.useNightMode(aBoolean);
}
});
addSubscrebe(rxSubscription);
}
public static <T> Observable.Transformer<T, T> rxSchedulerHelper() { //compose简化线程
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
理解
1、产生特定类型的被观察者 2、post提供一个新事件 3、如果是属于该类型的被观察者则将事件传递,观察者执行onNext
compose()操作符
-
是唯一一个能从流中获取原生Observable 的方法,因此,影响整个流的操作符(像subscribeOn()和observeOn())需要使用compose(),相对的,如果你在flatMap()中使用subscribeOn()/observeOn(),它只影响你创建的flatMap()中的Observable,而不是整个流。
-
当你创建一个Observable流并且内联了一堆操作符以后,compose()会立即执行,flatMap()则是在onNext()被调用以后才会执行,换句话说,flatMap()转换的是每个项目,而compose()转换的是整个流。
-
flatMap()一定是低效率的,因为他每次调用onNext()之后都需要创建一个新的Observable,compose()是操作在整个流上的。