concat操作符可以连接两个Observable,只有第一个Observable调用了onComplete后,才会触发第二个Observable。
比如在读取数据时,先查询缓存,缓存存在则直接处理,不存在则查询数据库,然后再处理。
package com.netty.demo.vertx;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
public class RxJavaTest {
public static void main(String[] args) throws InterruptedException {
Observable o1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//查询缓存数据
Integer cacheData = Integer.MAX_VALUE;
if (cacheData != null) {
//缓存不为空则直接传递给观察者
emitter.onNext(cacheData);
} else {
//缓存为空则调用onComplete,触发第二个(o2)的执行逻辑
emitter.onComplete();
}
}
});
Observable o2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//查询数据库
Integer cacheData = Integer.MAX_VALUE;
//传递给观察者
emitter.onNext(cacheData);
}
});
Observable.concat(o1, o2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log.info("onSubscribe");
}
@Override
public void onNext(Integer o) {
log.info(o.toString());
}
@Override
public void onError(Throwable e) {
log.info("onError");
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
Thread.sleep(300000000);
}
}