并发编程--如何快速获取批量异步任务的结果

本文介绍了如何利用ExecutorCompletionService优化线程池任务执行效率,避免等待浪费,通过实例代码演示了其核心方法和工作原理,展示了如何通过先进先出队列获取并处理异步计算结果。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

1. 使用场景

1.1. 解决问题

         假设现在有一大批需要进行计算的任务,为了提高整批任务的执行效率,你可能会使用线程池,向线程池中不断submit异步计算任务,同时你需要保留与每个任务关联的Future,最后遍历这些Future,通过调用Future接口实现类的get方法获取整批计算任务的各个结果。

       虽然使用了线程池提高了整体的执行效率,但遍历这些Future,调用Future接口实现类的get方法是阻塞的,也就是和当前这个Future关联的计算任务真正执行完成的时候,get方法才返回结果,如果当前计算任务没有执行完成,而有其它Future关联的计算任务已经执行完成了,就会白白浪费很多等待的时间,所以最好是遍历的时候谁先执行完成就先获取哪个结果,这样就节省了很多持续等待的时间

       ExecutorCompletionService可以实现这样的效果,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

1.2. 示例代码

         可以看到,在获取计算结果时,没有因为不同线程执行时间的不同,而浪费了等待时间。

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.*;

public class ExecutorCompletionServiceTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThread = 5;
        Executor executor = Executors.newFixedThreadPool(nThread,
                new ThreadFactoryBuilder().setNameFormat("test-%d").setDaemon(true).build());
        CompletionService<String> service = new ExecutorCompletionService<>(executor);
        for (int i = 0; i < nThread; i++) {
            int seqNo = i;
            service.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    int sleepTime = (nThread - seqNo)*100;
                    TimeUnit.MILLISECONDS.sleep(sleepTime);
                    return String.format("thread:%s sleep:%sms,HelloWorld-%s",
                            Thread.currentThread().getName(), sleepTime, seqNo);
                }
            });
        }
        long start = System.currentTimeMillis();
        for (int j = 0; j < 5; j++) {
            System.out.println(service.take().get());
        }
        System.out.println(String.format("共耗时:%sms",System.currentTimeMillis()-start));
    }
}

输出如下:

thread:test-4 sleep:100ms,HelloWorld-4
thread:test-3 sleep:200ms,HelloWorld-3
thread:test-2 sleep:300ms,HelloWorld-2
thread:test-1 sleep:400ms,HelloWorld-1
thread:test-0 sleep:500ms,HelloWorld-0
共耗时:500ms

1.3. 核心方法说明

ExecutorCompletionService实现了CompletionService接口,在CompletionService接口中定义了如下这些方法:

  • Future<V> submit(Callable<V> task):提交一个Callable类型任务,并返回该任务执行结果关联的Future;
  • Future<V> submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future;
  • Future<V> take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
  • Future<V> poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;
  • Future<V> poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null;

1.4. 类图

         

2. 实现原理

        ExecutorCompletionService有三个私有属性,分别是executor、aes和completionQueue,其中completionQueue就是存储已完成任务的队列,具体代码如下:

  1. 在构造方法内部给三个属性赋值,completionQueue被初始化为一个LinkedBlockingQueue类型的先进先出阻塞队列
  2. 在submit方法中,将实现Callable或Runnable接口的任务转换了RunnableFuture对象
  3. 将RunnableFuture包装成QueueingFuture对象,在QueueingFuture类中实现了FutureTask的done方法
  4. done方法会在任务执行完成或发生异常时调用,当此方法调用时,将任务添加到completionQueue队列中。这样先执行完成的任务就先添加到完成队列中。
  5. 将take,poll转换为调用completionQueue中的take,poll方法,实现阻塞队列的效果
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

//删除一些代码
public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }


    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }


    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

enjoy编程

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值