通过前面两篇文章,我们已经掌握了创建线程、运行、停止相关方法了,接下来我们继续学习线程的返回值,这样线程创建、运行、返回、终止的整个过程算是有个闭环了。
Runnable接口
Runnable接口是并发编程的核心接口之一,作用是定义线程执行的任务,将任务逻辑与线程的执行机制解耦。
@FunctionalInterface
public interface Runnable {
/**
* Runs this operation.
*/
void run();
}
接口中只有一个run()方法,该方法没有返回值,其设计同样遵循单一职责原则,实现了该接口表明具备运行线程任务的能力,Thread类同样实现了该接口。看下面一个简单的例子。
public class ThreadTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" run");
}
public static void main(String[] args) {
Thread t = new Thread(new ThreadTask());
t.start();
}
}
上面的例子,线程t启动运行完后就完了,但是如果想要线程t运行完后返回一个变量,那该如何实现?
Callable接口
在jdk1.5之后,引入Callable接口来解决线程返回值的问题。
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* <p>The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
* <p>The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
从注释中可以看出,Callable接口类似Runnable接口,同样是用来定义线程执行的任务,也是只有一个方法,但是Callable接口比Runnable接口多了两个功能,就是可以返回任意类型的返回值和可以抛出异常。需要注意的是Callable类型的任务对象并不能像Runnable类型的任务对象一样用,直接传入Thread类的构造函数创建线程,从前面的文章中我们得知Thread类的构造函数并没有Callable类型的参数。那Callable怎么用?线程返回结果用什么表示?我们接着往下看。
Future接口
Thread类只接收Runnable类型的任务但是没有返回值,Callable类型的任务有返回值但是不能往Thread类里面传。这不就是牛郎和织女隔着一条银河想相爱又不能靠近吗,给他们搭一座桥满足他们吧。面向对象程序设计就是这么人性化,总能在生活中找比喻的例子。Future与FutureTask的引入就是为了解决这一问题。Future接口也是在jdk1.5之后引入的,源码如下:
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
...
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
...
}
Future接口表示一个异步计算任务的返回结果,提供了检查任务是否完成、等待任务完成直到返回结果、取消任务执行、获取任务状态等方法。下面我们来学习下Future接口的属性和方法。
1、get()方法
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
调用get()方法将等待任务完成并返回结果,如果在等待任务完成的过程中任务被取消了将会抛出取消异常,如果任务执行过程出现异常将会抛出执行异常,如果在等待任务完成的过程中线程被中断,将会抛出中断异常。V get(long timeout, TimeUnit unit)方法可以指定等待指定时间,等待超时了还没有返回则抛出超时异常。
2、boolean cancel(boolean mayInterruptIfRunning)方法
/**
* Attempts to cancel execution of this task. This method has no
* effect if the task is already completed or cancelled, or could
* not be cancelled for some other reason. Otherwise, if this
* task has not started when {@code cancel} is called, this task
* should never run. If the task has already started, then the
* {@code mayInterruptIfRunning} parameter determines whether the
* thread executing this task (when known by the implementation)
* is interrupted in an attempt to stop the task.
*
* <p>The return value from this method does not necessarily
* indicate whether the task is now cancelled; use {@link
* #isCancelled}.
*
* @param mayInterruptIfRunning {@code true} if the thread
* executing this task should be interrupted (if the thread is
* known to the implementation); otherwise, in-progress tasks are
* allowed to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed; {@code true}
* otherwise. If two or more threads cause a task to be cancelled,
* then at least one of them returns {@code true}. Implementations
* may provide stronger guarantees.
*/
boolean cancel(boolean mayInterruptIfRunning);
调用该方法将试图去取消正在执行的任务,需要注意的是,如果任务还没有启动就调用该方法,那么任务将不会执行。如果任务已经启动了,传入的参数mayInterruptIfRunning将决定正在执行的线程是否可以被中断以便停止任务。
传入的mayInterruptIfRunning的值为true时,正在运行中的线程将被中断(如果已知线程已经执行完成),否则传入false时,进行中的任务应当允许完成。
如果任务不可以被取消那么将返回false,否则返回true。如果有多个线程同时取消了任务,那么至少有一个线程返回true。
综合上述,调用该方法只是试图去取消任务,并不能确定任务现在是已经被取消了,需要通过调用isCancelled()方法来判断,如果任务在正常的完成之前被取消了,那么该方法将返回true。
3、isDone()方法
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
判断任务是否已经完成。
4、resultNow()方法
/**
* Returns the computed result, without waiting.
*
* <p> This method is for cases where the caller knows that the task has
* already completed successfully, for example when filtering a stream
* of Future objects for the successful tasks and using a mapping
* operation to obtain a stream of results.
* {@snippet lang=java :
* results = futures.stream()
* .filter(f -> f.state() == Future.State.SUCCESS)
* .map(Future::resultNow)
* .toList();
* }
*
* @implSpec
* The default implementation invokes {@code isDone()} to test if the task
* has completed. If done, it invokes {@code get()} to obtain the result.
*
* @return the computed result
* @throws IllegalStateException if the task has not completed or the task
* did not complete with a result
* @since 19
*/
default V resultNow() {
if (!isDone())
throw new IllegalStateException("Task has not completed");
boolean interrupted = false;
try {
while (true) {
try {
return get();
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
throw new IllegalStateException("Task completed with exception");
} catch (CancellationException e) {
throw new IllegalStateException("Task was cancelled");
}
}
} finally {
if (interrupted) Thread.currentThread().interrupt();
}
}
从jdk19开始引入,该方法是从jdk19开始引入的,用来立即返回计算结果,不会等待。通常用在已知任务已经成功完成的场景,内部通过调用isDone()判断任务是否完成,调用get()获取结果。如果在任务还没有完成或者没有返回结果的情况下调用,将会抛非法状态异常。
5、exceptionNow()方法
/**
* Returns the exception thrown by the task, without waiting.
*
* <p> This method is for cases where the caller knows that the task
* has already completed with an exception.
*
* @implSpec
* The default implementation invokes {@code isDone()} to test if the task
* has completed. If done and not cancelled, it invokes {@code get()} and
* catches the {@code ExecutionException} to obtain the exception.
*
* @return the exception thrown by the task
* @throws IllegalStateException if the task has not completed, the task
* completed normally, or the task was cancelled
* @since 19
*/
default Throwable exceptionNow() {
if (!isDone())
throw new IllegalStateException("Task has not completed");
if (isCancelled())
throw new IllegalStateException("Task was cancelled");
boolean interrupted = false;
try {
while (true) {
try {
get();
throw new IllegalStateException("Task completed with a result");
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
return e.getCause();
}
}
} finally {
if (interrupted) Thread.currentThread().interrupt();
}
}
从jdk19开始引入,该方法用来返回任务执行过程抛出的异常,不会等待。内部通过调用isDone()判断任务是否完成,如果已经完成则抛出状态不合法异常,调用isCancelled()方法判断任务是否已经被取消,如果已经被取消了则抛出状态不合法异常,捕获get()抛出的异常信息并返回。
从源码中可以看出,resultNow()方法和exceptionNow()通过default关键字定义了默认的具体实现。
6、State属性
从jdk19开始引入,用枚举State来表示任务的运行状态,包含运行中、完成、失败、取消状态。
enum State {
/**
* The task has not completed.
*/
RUNNING,
/**
* The task completed with a result.
* @see Future#resultNow()
*/
SUCCESS,
/**
* The task completed with an exception.
* @see Future#exceptionNow()
*/
FAILED,
/**
* The task was cancelled.
* @see #cancel(boolean)
*/
CANCELLED
}
FutureTask类
Future接口更多的是对异步任务执行结果的抽象,并没有提交给线程执行的相关方法,所以还是不能“往Thread类的构造函数里面扔”,所以又引入了FutureTask类。
/**
...
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V> {
...
}
FutureTask的引入是对Runnable和Callable对象的包装,FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口。因此一个FutureTask类型的任务将具备作为参数传入Thread类构造函数的能力(Thread构造函数接受Runnable类型的参数)和获取返回任务结果的能力(Future接口的get()方法获取返回结果)。
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture接口只有一个run()方法,运行该方法将提交任务执行并对返回结果进行设置,由子类实现具体的逻辑,下面我们来分析下FutureTask的成员和方法。
1、FutureTask的主要成员
public class FutureTask<V> implements RunnableFuture<V> {
...
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
...
}
FutureTask的运行状态我们暂且不管,从源码中可以看出:
(1)callable变量,Callable类的任务对象,用来接收外部传入的任务,该任务在运行完成后被设置为空。
(2)outcome变量,Object类型的返回结果,代表任务的执行结果或者运行时抛出的异常。
(3)runner变量,运行Callable任务的线程对象。
(4)waiters变量,等待线程的集合。
2、FutureTask的主要方法
从上图可以看出,FutureTask实现了Future接口的方法,并实现了run()方法。两个构造函数分别支持传入Runnable类型的任务和Callable类型的任务,我们摘取run()方法和两个构造函数进行分析。
(1)FutureTask(Runnable runnable, V result)
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
该构造函数支持传入Runnable类型的参数和泛型的返回结果来创建FutureTask对象,内部调用Executors.callable(runnable, result)方法对Runnable类型的任务包装成Callable类型的任务。
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
Executors.callable()方法内部创建了一个RunnableAdapter对象对Runnable对象进行适配转换成Callable对象。
/**
* A callable that runs given task and returns given result.
*/
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
RunnableAdapter适配器实现了Callable接口,因此具备了执行提交任务并返回结果的能力,适配器内部还是用Runnable接收传入的任务,只是将任务放到了重写Callable的call()方法中执行,顺利的将Runnable任务转换为Callable任务。
(2)FutureTask(Callable callable)
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
该构造函数直接用Callable任务创建FutureTask对象。
(3)run() 方法
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//1、执行传入的任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//2、设置任务执行异常
setException(ex);
}
if (ran)
//3、设置任务返回结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
//将异常信息赋值给返回对象
outcome = t;
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
//将任务运行结果赋值给返回对象
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
创建FutureTask对象后,调用run() 运行任务,run()方法的主要功能是执行传入的任务、设置任务执行异常、设置任务返回结果,从源码中可以看出任务运行结果或者异常信息都是赋值给outcome对象。
FutureTask的使用
回到文章开头的那个简单例子,我们可以用FutureTask来包装运行ThreadTask,代码改造如下:
public class ThreadTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" run");
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
String retValue ="reture value";
FutureTask<String> futureTask = new FutureTask(new ThreadTask(),retValue);
/*//主线程main运行ThreadTask
futureTask.run();
System.out.println(futureTask.get());*/
//子线程t运行ThreadTask
Thread t = new Thread(futureTask);
t.start();
System.out.println(futureTask.get());
}
}
运行结果:
Thread-0 run
reture value
用FutureTask对象包装Runnable对象后,将FutureTask对象传入Thread构造函数创建线程对象并启动线程,也可以使用主线程直接运行任务,但是这里还有一个问题就是返回值不是任务返回的,我们用Callable任务创建FutureTask对象。
public class ThreadTask2 implements Callable<String> {
@Override
public String call() throws InterruptedException {
//模拟线程执行任务耗时
Thread.sleep(2);
String retValue ="reture value";
System.out.println(Thread.currentThread().getName()+" run");
return retValue;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask(new ThreadTask2());
Thread t = new Thread(futureTask);
t.start();
Thread.sleep(1);
//判断线程任务是否完成
while (!futureTask.isDone()) {
System.out.println("task running...");
}
System.out.println("task finish,"+futureTask.get());
}
}
运行结果:
...
task running...
task running...
Thread-0 run
task finish,reture value
用FutureTask对象包装Callable对象后,将FutureTask对象传入Thread构造函数创建线程对象并启动线程,通过isDone()方法判断任务是否完成,通过get()方法获取任务返回结果。
总结
本文一步步分析了线程返回值的来龙去脉,文章较长,我们总结下过程:
1、Thread类只接收Runnable类型的任务但是没有返回值,Callable类型的任务有返回值但是不能往Thread类里面传,所以引入了Future接口对任务返回结果进行表示。
2、Future接口更多的是对异步任务执行结果的抽象,并没有提交给线程执行的相关方法,所以还是不能“往Thread类的构造函数里面扔”,所以又引入了FutureTask类,一个FutureTask类型的任务同时具备提交到线程运行的能力和获取返回结果的能力。
如果文章对您有帮助,不妨“帧栈”一下,关注“帧栈”公众号,第一时间获取推送文章,您的肯定将是我写作的动力!