数据批处理(队列方式)
public class DataProcessor {
private static final int THREAD_COUNT = 4;
private static final int QUEUE_SIZE = 10;
private LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
public DataProcessor() {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
executor.execute(() -> {
try {
while (true) {
Data data = queue.take();
processData(data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
public void addData(Data data) {
try {
queue.put(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processData(Data data) {
// Process data here
System.out.println("Processing data: " + data);
}
public static void main(String[] args) {
DataProcessor processor = new DataProcessor();
// Add data to be processed
for (int i = 0; i < 20; i++) {
Data data = new Data("Data " + i);
processor.addData(data);
}
}
static class Data {
private String value;
public Data(String value) {
this.value = value;
}
@Override
public String toString() {
return value;
}
}
}
参考网址:
一:https://blog.csdn.net/zhizhengguan/article/details/86551270
二:https://blog.csdn.net/qq_41128049/article/details/134442487
数据批处理入库(线程池方式)
@Configuration
public class ThreadPoolsConfiguration {
//线程池的核心线程数量
private static final int HANDLER_CORE_POOL_SIZE = 2;
//线程池的最大线程数
private static final int HANDLER_MAX_POOL_SIZE = 10;
//阻塞队列的容量
private static final int HANDLER_QUEUE_CAPACITY = 5000;
//当线程数大于核心线程数时,多余的空闲线程存活的最长时间()
private static final int HANDLER_KEEP_ALIVE_TIME = 1;
@Bean(name = {"cardRecordSyncExecutor"})
public Executor cardRecordSyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(HANDLER_CORE_POOL_SIZE);
executor.setMaxPoolSize(HANDLER_MAX_POOL_SIZE);
executor.setQueueCapacity(HANDLER_QUEUE_CAPACITY);
executor.setKeepAliveSeconds(HANDLER_KEEP_ALIVE_TIME);
executor.setThreadNamePrefix("cardRecordSyncExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
executor.initialize();
return executor;
}
}
@Resource(name = "cardRecordSyncExecutor")
private Executor executor;
public void reSync() {
// 超过半小时状态为1的数据状态重置0
Date date = DateUtil.getAddMinuteDate(new Date(), -30);
baseMapper.resetTimeoutStatus(date);
// 查询推送失败且失败次数小于6次的
List<CardRecordSync> list = baseMapper.listFail();
if (list.isEmpty()) {
return;
}
log.info("待处理补推记录数size={}", list.size());
List<List<CardRecordSync>> partition = Lists.partition(list, 100);
for (List<CardRecordSync> syncs : partition) {
executor.execute(() -> {
List<Long> ids = syncs.stream().map(CardRecordSync::getId).collect(Collectors.toList());
// 状态变更为处理中
EntityWrapper<CardRecordSync> wrapper = new EntityWrapper<>();
wrapper.in("ID", ids);
CardRecordSync po = new CardRecordSync();
po.setStatus(CardRecordSyncStatus.PUSHING.getStatus());
baseMapper.update(po, wrapper);
syncs.forEach(record -> {
// boolean flag = thirdService.cardRecordSync(record.getSyncParam());
DataSyncResult dataSyncResult = thirdService.cardRecordSync(record.getSyncParam());
boolean flag = dataSyncResult.isDataSyncSuc();
int status = CardRecordSyncStatus.FAIL.getStatus();
if (flag) {
status = CardRecordSyncStatus.SUCCESS.getStatus();
}
baseMapper.updateStatusDescById(dataSyncResult.getDataSyncSucDesc(),status, record.getId());
});
});
}
}