1.配置多线程
/**
 * Loads all rows for {@code rule} by querying pages concurrently, batch by batch.
 *
 * <p>The total number of rows is not known in advance, so each loop iteration submits one
 * batch of {@code threadPoolSize} consecutive page numbers. Results are appended in page
 * order, and paging stops at the first page that comes back empty.
 *
 * @param rule rule whose {@code getValue()} is handed to every page query
 * @return the rows of every non-empty page that was consumed, in page order
 * @throws InterruptedException if the calling thread is interrupted while waiting for a batch
 * @throws ExecutionException if a page query whose result is consumed failed
 */
private @NotNull List<Map<String, Object>> getData(SysRule rule) throws InterruptedException, ExecutionException {
    int pageSize = 200; // rows requested per page
    int threadPoolSize = Runtime.getRuntime().availableProcessors(); // one worker per CPU core

    // executeQueries delegates to invokeAll, which only returns once every task in the batch
    // has completed, so one pool is reused for all batches instead of being rebuilt each time.
    PageQueryExecutor executor = new PageQueryExecutor(threadPoolSize);
    List<Map<String, Object>> data = new ArrayList<>();
    try {
        int currentPage = 1;
        boolean hasMorePages = true;
        while (hasMorePages) {
            log.info("当前页数:" + currentPage);

            // Build one batch of consecutive pages, one task per worker thread.
            List<PageQueryTask> tasks = new ArrayList<>(threadPoolSize);
            for (int i = 0; i < threadPoolSize; i++) {
                tasks.add(new PageQueryTask(rule.getValue(), pageSize, currentPage));
                currentPage++;
            }

            // Futures come back in submission order, i.e. ascending page number.
            List<Future<List<Map<String, Object>>>> futures = executor.executeQueries(tasks);
            for (Future<List<Map<String, Object>>> future : futures) {
                List<Map<String, Object>> results = future.get();
                if (results.isEmpty()) {
                    // An empty page marks the end of the data; later pages in this batch are ignored.
                    hasMorePages = false;
                    break;
                }
                data.addAll(results);
                log.info("Received " + results.size() + " records.");
            }
        }
    } finally {
        // Always release the worker threads, including when a page query throws.
        executor.shutdown();
    }
    return data;
}
2.配置任务
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
 * Callable that fetches one page of rows so it can be run on a worker thread.
 *
 * <p>Instances are immutable: the query value, page size and page number are fixed at
 * construction time.
 */
public class PageQueryTask implements Callable<List<Map<String, Object>>> {
    private final String value;
    private final int pageSize;
    private final int page;

    /**
     * @param value    query value passed through to the page lookup
     * @param pageSize number of rows requested per page
     * @param page     page number to fetch
     */
    public PageQueryTask(String value, int pageSize, int page) {
        this.value = value;
        this.pageSize = pageSize;
        this.page = page;
    }

    /**
     * Runs the page lookup and returns its rows as a fresh list.
     *
     * @return a new {@code ArrayList} holding the rows returned by the lookup
     * @throws Exception if the underlying lookup fails
     */
    @Override
    public List<Map<String, Object>> call() throws Exception {
        // Business logic. NOTE(review): getPageList is not declared in this class as shown;
        // it has to be supplied by the real data-access layer. Argument order is (value, page, pageSize).
        List<Map<String, Object>> condition = getPageList(value, page, pageSize);
        // Copy so the caller owns an independent list.
        return new ArrayList<>(condition);
    }
}
3.配置多线程执行类
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Thin wrapper around a fixed-size thread pool that runs batches of {@link PageQueryTask}s.
 */
public class PageQueryExecutor {
    private final ExecutorService executorService;

    /**
     * Creates the executor with a fixed-size pool.
     *
     * @param poolSize number of worker threads in the pool
     * @throws IllegalArgumentException if {@code poolSize <= 0} (raised by the fixed thread pool factory)
     */
    public PageQueryExecutor(int poolSize) {
        this.executorService = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Runs every task and blocks until all of them have completed.
     *
     * @param tasks page queries to run
     * @return one completed future per task, in the same order as {@code tasks}
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    public List<Future<List<Map<String, Object>>>> executeQueries(List<PageQueryTask> tasks) throws InterruptedException {
        // No cast needed: List<PageQueryTask> already is a Collection<? extends Callable<T>>.
        return executorService.invokeAll(tasks);
    }

    /** Stops accepting new tasks; tasks already submitted are still executed. */
    public void shutdown() {
        executorService.shutdown();
    }

    /**
     * Waits for the pool to terminate after {@link #shutdown()} has been requested.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the pool terminated, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
        return executorService.awaitTermination(timeout, unit);
    }
}
4.调用查询多线程
// Collect the rows of all pages for this rule. getData declares InterruptedException and
// ExecutionException, so the calling method must catch or declare them.
List<Map<String, Object>> data = getData(rule);
以上是在不知道数据总数的情况下,使用多线程按页并发查询数据的实现方式。