1. 使用场景
1.1. 解决问题
假设现在有一大批需要进行计算的任务,为了提高整批任务的执行效率,你可能会使用线程池,向线程池中不断submit异步计算任务,同时你需要保留与每个任务关联的Future,最后遍历这些Future,通过调用Future接口实现类的get方法获取整批计算任务的各个结果。
虽然使用了线程池提高了整体的执行效率,但遍历这些Future,调用Future接口实现类的get方法是阻塞的,也就是和当前这个Future关联的计算任务真正执行完成的时候,get方法才返回结果,如果当前计算任务没有执行完成,而有其它Future关联的计算任务已经执行完成了,就会白白浪费很多等待的时间,所以最好是遍历的时候谁先执行完成就先获取哪个结果,这样就节省了很多持续等待的时间。
ExecutorCompletionService可以实现这样的效果,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
1.2. 示例代码
可以看到,在获取计算结果时,没有因为不同线程执行时间的不同,而浪费了等待时间。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
public class ExecutorCompletionServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
int nThread = 5;
Executor executor = Executors.newFixedThreadPool(nThread,
new ThreadFactoryBuilder().setNameFormat("test-%d").setDaemon(true).build());
CompletionService<String> service = new ExecutorCompletionService<>(executor);
for (int i = 0; i < nThread; i++) {
int seqNo = i;
service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
int sleepTime = (nThread - seqNo)*100;
TimeUnit.MILLISECONDS.sleep(sleepTime);
return String.format("thread:%s sleep:%sms,HelloWorld-%s",
Thread.currentThread().getName(), sleepTime, seqNo);
}
});
}
long start = System.currentTimeMillis();
for (int j = 0; j < 5; j++) {
System.out.println(service.take().get());
}
System.out.println(String.format("共耗时:%sms",System.currentTimeMillis()-start));
}
}
输出如下:
thread:test-4 sleep:100ms,HelloWorld-4
thread:test-3 sleep:200ms,HelloWorld-3
thread:test-2 sleep:300ms,HelloWorld-2
thread:test-1 sleep:400ms,HelloWorld-1
thread:test-0 sleep:500ms,HelloWorld-0
共耗时:500ms
1.3. 核心方法说明
ExecutorCompletionService实现了CompletionService接口,在CompletionService接口中定义了如下这些方法:
- Future<V> submit(Callable<V> task):提交一个Callable类型任务,并返回该任务执行结果关联的Future;
- Future<V> submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future;
- Future<V> take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
- Future<V> poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;
- Future<V> poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null;
1.4. 类图
2. 实现原理
ExecutorCompletionService有三个私有属性,分别是executor、aes和completionQueue,其中completionQueue就是存储已完成任务的队列,具体代码如下:
- 在构造方法内部给三个属性赋值,completionQueue被初始化为一个LinkedBlockingQueue类型的先进先出阻塞队列
- 在submit方法中,将实现Callable或Runnable接口的任务转换了RunnableFuture对象
- 将RunnableFuture包装成QueueingFuture对象,在QueueingFuture类中实现了FutureTask的done方法
- done方法会在任务执行完成或发生异常时调用,当此方法调用时,将任务添加到completionQueue队列中。这样先执行完成的任务就先添加到完成队列中。
- 将take,poll转换为调用completionQueue中的take,poll方法,实现阻塞队列的效果
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
//删除一些代码
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}