五月的爸爸 2019-12-25 10:42 采纳率: 0%
浏览 431

RxJava toList或者collect运算符后无法进行订阅

final Observer<List<SportCheckTaskEntity>> observer =  new Observer<List<SportCheckTaskEntity>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<SportCheckTaskEntity> sportCheckTaskEntities) {
                    iSportCheckTaskPresenterView.onRefreshDataSuccess(flag, sportCheckTaskEntities);
                }

                @Override
                public void onError(Throwable e) {
                    Log.i("1216", e.toString());
                }

                @Override
                public void onComplete() {

                }
            };
            sportCheckDao.querySportCheckTaskManager(userId, teamId, currentTime)
                    // 当前的类型和期望的类型
                    .flatMap(new Function<List<SportCheckTaskEntity>, ObservableSource<SportCheckTaskEntity>>() {
                        @Override
                        public ObservableSource<SportCheckTaskEntity> apply(List<SportCheckTaskEntity> sportCheckTaskEntities) throws Exception {
                            return Observable.fromIterable(sportCheckTaskEntities);
                        }
                    })
                    .flatMap(new Function<SportCheckTaskEntity, ObservableSource<SportCheckTaskEntity>>() {
                        @Override
                        public ObservableSource<SportCheckTaskEntity> apply(final SportCheckTaskEntity sportCheckTaskEntity) throws Exception {
                            return sportCheckDao.getSportCheckTaskPrepareItemCountByTaskId(sportCheckTaskEntity.getSportCheckTaskId()).map(new Function<Integer, SportCheckTaskEntity>() {
                                @Override
                                public SportCheckTaskEntity apply(Integer integer) throws Exception {
                                    sportCheckTaskEntity.setPrepareItemNumber(integer);
                                    return sportCheckTaskEntity;
                                }
                            });
                        }
                    })
                    .flatMap(new Function<SportCheckTaskEntity, ObservableSource<SportCheckTaskEntity>>() {
                        @Override
                        public ObservableSource<SportCheckTaskEntity> apply(final SportCheckTaskEntity sportCheckTaskEntity) throws Exception {
                            return sportCheckDao.getSportCheckTaskTotalItemCountByTaskId(sportCheckTaskEntity.getSportCheckTaskId()).map(new Function<Integer, SportCheckTaskEntity>() {
                                @Override
                                public SportCheckTaskEntity apply(Integer integer) throws Exception {
                                    sportCheckTaskEntity.setTotalItemNumber(integer);
                                    return sportCheckTaskEntity;
                                }
                            });
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .toList()
                    .toObservable()
                    .subscribe(observer);

第一个flatMap是用来发射单个Observable的
第二个flatMap是用来给集合中的对象添加某个值的
第二个flatMap是用来给集合中的对象添加某个值的

在这之前,都没有问题,但是在执行toList()或者collect()运算符后,后续的订阅没有执行.

网上查了原因,说是toList()运算符引起的阻塞,需要使用onComplete()方法进行调用,但是不清楚具体该如何使用

  • 写回答

1条回答 默认 最新

  • 你知我知皆知 2024-07-27 23:42
    关注

    以下回答参考 皆我百晓生券券喵儿 等免费微信小程序相关内容作答,并由本人整理回复。

    在 RxJava 中,toList()collect() 运算符不会立即返回结果。它们会在完成数据处理(比如从数据库查询或从网络请求中获取数据)之后才返回。因此,在这些操作之间添加 toList() 或者 collect() 操作可能会导致阻塞。

    要解决这个问题,你可以考虑以下几种方式:

    1. 使用 doOnNext()doAfterTerminate() 来捕获数据处理过程中发生的错误,并在处理完成后继续执行其他操作。
    2. 在数据处理结束后再调用 toList()collect()
    3. 如果可能的话,将所有数据处理的操作放在一个单独的方法中,然后在该方法内部调用 toList()collect()

    例如:

    private final List<SportCheckTaskEntity> data;
    
    public void fetchData() {
        doSomethingInAnotherMethod(); // 假设这是一个异步任务
        collect(data); // 在这里调用
    }
    

    在这个例子中,我们首先运行异步任务(doSomethingInAnotherMethod()),然后收集数据并调用 collect() 方法。这样可以在数据处理的过程中继续执行其他操作,而不需要等待数据处理完毕。

    请注意,这三种解决方案都有其适用场景。如果你的数据处理过程比较复杂,且你希望保持代码的简洁性,那么选择第一个方案。如果数据处理的过程相对简单,或者你更喜欢在处理完成后立即执行其他操作,那么第二个方案可能是更好的选择。如果你的数据处理过程涉及到大量的并发操作,那么第三个方案可能是最佳的选择。

    评论

报告相同问题?