核心知识点
- Observable.create()创建一个观察者Observable,也有很多更有针对性地api,如Observable.fromCallable(),还有一个Observable.interval(),Observable.just()
- subscribeOn(Schedulers.io())指定事件执行的线程
- observeOn(AndroidSchedulers.mainThread())指定观察者回调的线程
- subscribe指定观察者回调的内容
完整的例子
Observable.fromCallable(() -> DataService.insert(new User("LiMing", 17)))
.subscribeOn(Schedulers.io()) // 在子线程执行插入
.observeOn(AndroidSchedulers.mainThread()); // 结果回调到主线程
.subscribe(id -> {
Log.d("RxJava", "插入成功,ID: " + id); // 主线程回调
}, throwable -> {
Log.e("RxJava", "插入失败: " + throwable.getMessage());
})
进阶扩展
CompositeDisposable 是 RxJava 中用于**集中管理多个订阅(Disposable)**的工具类,主要解决以下核心问题:
典型用法
public class MainActivity extends AppCompatActivity {
private CompositeDisposable disposables = new CompositeDisposable();
private UserRepository repo = new UserRepository();
@Override
protected void onCreate(Bundle savedInstanceState) {
disposables.add(
repo.getUsers()
.subscribe(users -> updateUI(users))
);
disposables.add(
repo.getNotifications()
.subscribe(notifications -> showNotifications(notifications))
);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // 取消所有订阅
}
}
关键特性
参考文章:https://blog.csdn.net/qq_46279058/article/details/146548669
一个进阶的例子
apiService.loginGetToken(header, body)
// 这个是rxhelper里的验证数据内容的逻辑,一般会后端返回的值会带有code字段等,判断返回的数据是否正常
.compose(ValidateTransformers.validate())
// 这个是把泛型里的东西提取出来的代码
.compose(DataTransformers.convertToData())
.compose(DataTransformers.convertToData())
// 这个指定上游操作执行的线程,即从Observerable被创建到observerOn之前的操作
// 下游是observerOn之后的操作到subscribe
.subscribeOn(Schedulers.io())
.doOnSuccess {
saveToken(it)
savePhone(phone)
}.delay(3000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.compose(ErrorTransformers.convertError<String>(CodeErrorConverter))
.compose(LifecycleTransformer.bind<String, ViewModelClearedEvent>(lifecycleObservable))
.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
_dataLoading.value = true
loginStatus.value = LOGIN_STATUS_DISABLE
}
override fun onSuccess(t: String) {
_token.value = t
_dataLoading.value = false
_loginSuccess.value = true
loginStatus.value = LOGIN_STATUS_ENABLE
}
override fun onError(e: Throwable) {
e.printStackTrace()
_dataLoading.value = false
_loginSuccess.value = false
_loginError.value = e
loginStatus.value = LOGIN_STATUS_ENABLE
}
})