系统掌握并发编程系列(三)一步步剖析线程返回值

通过前面两篇文章,我们已经掌握了创建线程、运行、停止相关方法了,接下来我们继续学习线程的返回值,这样线程创建、运行、返回、终止的整个过程算是有个闭环了。

Runnable接口

Runnable接口是并发编程的核心接口之一,作用是‌定义线程执行的任务,将任务逻辑与线程的执行机制解耦。

@FunctionalInterface
public interface Runnable {
    /**
     * Runs this operation.
     */
    void run();
}

接口中只有一个run()方法,该方法没有返回值,其设计同样遵循单一职责原则,实现了该接口表明具备运行线程任务的能力,Thread类同样实现了该接口。看下面一个简单的例子。

public class ThreadTask implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" run");
    }
    public static void main(String[] args) {
        Thread t = new Thread(new ThreadTask());
        t.start();
    }
}

上面的例子,线程t启动运行完后就完了,但是如果想要线程t运行完后返回一个变量,那该如何实现?

Callable接口

在jdk1.5之后,引入Callable接口来解决线程返回值的问题。

/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * {@code call}.
 *
 * <p>The {@code Callable} interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * {@code Runnable}, however, does not return a result and cannot
 * throw a checked exception.
 *
 * <p>The {@link Executors} class contains utility methods to
 * convert from other common forms to {@code Callable} classes.
 *
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method {@code call}
 */
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

从注释中可以看出,Callable接口类似Runnable接口,同样是用来定义线程执行的任务,也是只有一个方法,但是Callable接口比Runnable接口多了两个功能,就是可以返回任意类型的返回值和可以抛出异常。需要注意的是Callable类型的任务对象并不能像Runnable类型的任务对象一样用,直接传入Thread类的构造函数创建线程,从前面的文章中我们得知Thread类的构造函数并没有Callable类型的参数。那Callable怎么用?线程返回结果用什么表示?我们接着往下看。

Future接口

Thread类只接收Runnable类型的任务但是没有返回值,Callable类型的任务有返回值但是不能往Thread类里面传。这不就是牛郎和织女隔着一条银河想相爱又不能靠近吗,给他们搭一座桥满足他们吧。面向对象程序设计就是这么人性化,总能在生活中找比喻的例子。Future与FutureTask的引入就是为了解决这一问题。Future接口也是在jdk1.5之后引入的,源码如下:

/**
 * A {@code Future} represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.  The result can only be retrieved using method
 * {@code get} when the computation has completed, blocking if
 * necessary until it is ready.  Cancellation is performed by the
 * {@code cancel} method.  Additional methods are provided to
 * determine if the task completed normally or was cancelled. Once a
 * computation has completed, the computation cannot be cancelled.
 * If you would like to use a {@code Future} for the sake
 * of cancellability but not provide a usable result, you can
 * declare types of the form {@code Future<?>} and
 * return {@code null} as a result of the underlying task.
 ...
 *
 * @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface Future<V> {
    ...
}

Future接口表示一个异步计算任务的返回结果,提供了检查任务是否完成、等待任务完成直到返回结果、取消任务执行、获取任务状态等方法。下面我们来学习下Future接口的属性和方法。

在这里插入图片描述

1、get()方法

/**
 * Waits if necessary for the computation to complete, and then
 * retrieves its result.
 *
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 */
V get() throws InterruptedException, ExecutionException;

调用get()方法将等待任务完成并返回结果,如果在等待任务完成的过程中任务被取消了将会抛出取消异常,如果任务执行过程出现异常将会抛出执行异常,如果在等待任务完成的过程中线程被中断,将会抛出中断异常。V get(long timeout, TimeUnit unit)方法可以指定等待指定时间,等待超时了还没有返回则抛出超时异常。

2、boolean cancel(boolean mayInterruptIfRunning)方法

    /**
     * Attempts to cancel execution of this task.  This method has no
     * effect if the task is already completed or cancelled, or could
     * not be cancelled for some other reason.  Otherwise, if this
     * task has not started when {@code cancel} is called, this task
     * should never run.  If the task has already started, then the
     * {@code mayInterruptIfRunning} parameter determines whether the
     * thread executing this task (when known by the implementation)
     * is interrupted in an attempt to stop the task.
     *
     * <p>The return value from this method does not necessarily
     * indicate whether the task is now cancelled; use {@link
     * #isCancelled}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread
     * executing this task should be interrupted (if the thread is
     * known to the implementation); otherwise, in-progress tasks are
     * allowed to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed; {@code true}
     * otherwise. If two or more threads cause a task to be cancelled,
     * then at least one of them returns {@code true}. Implementations
     * may provide stronger guarantees.
     */
    boolean cancel(boolean mayInterruptIfRunning);

调用该方法将试图去取消正在执行的任务,需要注意的是,如果任务还没有启动就调用该方法,那么任务将不会执行。如果任务已经启动了,传入的参数mayInterruptIfRunning将决定正在执行的线程是否可以被中断以便停止任务。

传入的mayInterruptIfRunning的值为true时,正在运行中的线程将被中断(如果已知线程已经执行完成),否则传入false时,进行中的任务应当允许完成。

如果任务不可以被取消那么将返回false,否则返回true。如果有多个线程同时取消了任务,那么至少有一个线程返回true。

综合上述,调用该方法只是试图去取消任务,并不能确定任务现在是已经被取消了,需要通过调用isCancelled()方法来判断,如果任务在正常的完成之前被取消了,那么该方法将返回true。

3、isDone()方法

    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();

判断任务是否已经完成。

4、resultNow()方法

/**
     * Returns the computed result, without waiting.
     *
     * <p> This method is for cases where the caller knows that the task has
     * already completed successfully, for example when filtering a stream
     * of Future objects for the successful tasks and using a mapping
     * operation to obtain a stream of results.
     * {@snippet lang=java :
     *     results = futures.stream()
     *                .filter(f -> f.state() == Future.State.SUCCESS)
     *                .map(Future::resultNow)
     *                .toList();
     * }
     *
     * @implSpec
     * The default implementation invokes {@code isDone()} to test if the task
     * has completed. If done, it invokes {@code get()} to obtain the result.
     *
     * @return the computed result
     * @throws IllegalStateException if the task has not completed or the task
     * did not complete with a result
     * @since 19
     */
    default V resultNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return get();
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Task completed with exception");
                } catch (CancellationException e) {
                    throw new IllegalStateException("Task was cancelled");
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

从jdk19开始引入,该方法是从jdk19开始引入的,用来立即返回计算结果,不会等待。通常用在已知任务已经成功完成的场景,内部通过调用isDone()判断任务是否完成,调用get()获取结果。如果在任务还没有完成或者没有返回结果的情况下调用,将会抛非法状态异常。

5、exceptionNow()方法

/**
 * Returns the exception thrown by the task, without waiting.
 *
 * <p> This method is for cases where the caller knows that the task
 * has already completed with an exception.
 *
 * @implSpec
 * The default implementation invokes {@code isDone()} to test if the task
 * has completed. If done and not cancelled, it invokes {@code get()} and
 * catches the {@code ExecutionException} to obtain the exception.
 *
 * @return the exception thrown by the task
 * @throws IllegalStateException if the task has not completed, the task
 * completed normally, or the task was cancelled
 * @since 19
 */
default Throwable exceptionNow() {
    if (!isDone())
        throw new IllegalStateException("Task has not completed");
    if (isCancelled())
        throw new IllegalStateException("Task was cancelled");
    boolean interrupted = false;
    try {
        while (true) {
            try {
                get();
                throw new IllegalStateException("Task completed with a result");
            } catch (InterruptedException e) {
                interrupted = true;
            } catch (ExecutionException e) {
                return e.getCause();
            }
        }
    } finally {
        if (interrupted) Thread.currentThread().interrupt();
    }
}

从jdk19开始引入,该方法用来返回任务执行过程抛出的异常,不会等待。内部通过调用isDone()判断任务是否完成,如果已经完成则抛出状态不合法异常,调用isCancelled()方法判断任务是否已经被取消,如果已经被取消了则抛出状态不合法异常,捕获get()抛出的异常信息并返回。

从源码中可以看出,resultNow()方法和exceptionNow()通过default关键字定义了默认的具体实现。

6、State属性

从jdk19开始引入,用枚举State来表示任务的运行状态,包含运行中、完成、失败、取消状态。

enum State {
    /**
     * The task has not completed.
     */
    RUNNING,
    /**
     * The task completed with a result.
     * @see Future#resultNow()
     */
    SUCCESS,
    /**
     * The task completed with an exception.
     * @see Future#exceptionNow()
     */
    FAILED,
    /**
     * The task was cancelled.
     * @see #cancel(boolean)
     */
    CANCELLED
}
FutureTask类

Future接口更多的是对异步任务执行结果的抽象,并没有提交给线程执行的相关方法,所以还是不能“往Thread类的构造函数里面扔”,所以又引入了FutureTask类。

/**
...
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
...
}

FutureTask的引入是对Runnable和Callable对象的包装,FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口。因此一个FutureTask类型的任务将具备作为参数传入Thread类构造函数的能力(Thread构造函数接受Runnable类型的参数)和获取返回任务结果的能力(Future接口的get()方法获取返回结果)

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFuture接口只有一个run()方法,运行该方法将提交任务执行并对返回结果进行设置,由子类实现具体的逻辑,下面我们来分析下FutureTask的成员和方法。

1、FutureTask的主要成员

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    ...
}

FutureTask的运行状态我们暂且不管,从源码中可以看出:

(1)callable变量,Callable类的任务对象,用来接收外部传入的任务,该任务在运行完成后被设置为空。

(2)outcome变量,Object类型的返回结果,代表任务的执行结果或者运行时抛出的异常。

(3)runner变量,运行Callable任务的线程对象。

(4)waiters变量,等待线程的集合。

2、FutureTask的主要方法

在这里插入图片描述

从上图可以看出,FutureTask实现了Future接口的方法,并实现了run()方法。两个构造函数分别支持传入Runnable类型的任务和Callable类型的任务,我们摘取run()方法和两个构造函数进行分析。

(1)FutureTask(Runnable runnable, V result)

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

该构造函数支持传入Runnable类型的参数和泛型的返回结果来创建FutureTask对象,内部调用Executors.callable(runnable, result)方法对Runnable类型的任务包装成Callable类型的任务。

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result.  This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return
     * @param <T> the type of the result
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

Executors.callable()方法内部创建了一个RunnableAdapter对象对Runnable对象进行适配转换成Callable对象。

    /**
     * A callable that runs given task and returns given result.
     */
    private static final class RunnableAdapter<T> implements Callable<T> {
        private final Runnable task;
        private final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
        public String toString() {
            return super.toString() + "[Wrapped task = " + task + "]";
        }
    }

RunnableAdapter适配器实现了Callable接口,因此具备了执行提交任务并返回结果的能力,适配器内部还是用Runnable接收传入的任务,只是将任务放到了重写Callable的call()方法中执行,顺利的将Runnable任务转换为Callable任务。

(2)FutureTask(Callable callable)

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

该构造函数直接用Callable任务创建FutureTask对象。

(3)run() 方法

public void run() {
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //1、执行传入的任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //2、设置任务执行异常
                setException(ex);
            }
            if (ran)
                //3、设置任务返回结果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
/**
 * Causes this future to report an {@link ExecutionException}
 * with the given throwable as its cause, unless this future has
 * already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon failure of the computation.
 *
 * @param t the cause of failure
 */
protected void setException(Throwable t) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        //将异常信息赋值给返回对象
        outcome = t;
        STATE.setRelease(this, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
/**
 * Sets the result of this future to the given value unless
 * this future has already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon successful completion of the computation.
 *
 * @param v the value
 */
protected void set(V v) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        //将任务运行结果赋值给返回对象
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}

创建FutureTask对象后,调用run() 运行任务,run()方法的主要功能是执行传入的任务、设置任务执行异常、设置任务返回结果,从源码中可以看出任务运行结果或者异常信息都是赋值给outcome对象。

FutureTask的使用

回到文章开头的那个简单例子,我们可以用FutureTask来包装运行ThreadTask,代码改造如下:

public class ThreadTask implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" run");
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String retValue ="reture value";
        FutureTask<String> futureTask = new FutureTask(new ThreadTask(),retValue);
        /*//主线程main运行ThreadTask
        futureTask.run();
        System.out.println(futureTask.get());*/
        //子线程t运行ThreadTask
        Thread t = new Thread(futureTask);
        t.start();
        System.out.println(futureTask.get());
    }
}

运行结果:

Thread-0 run
reture value

用FutureTask对象包装Runnable对象后,将FutureTask对象传入Thread构造函数创建线程对象并启动线程,也可以使用主线程直接运行任务,但是这里还有一个问题就是返回值不是任务返回的,我们用Callable任务创建FutureTask对象。

public class ThreadTask2 implements Callable<String> {
    @Override
    public String call() throws InterruptedException {
        //模拟线程执行任务耗时
        Thread.sleep(2);
        String retValue ="reture value";
        System.out.println(Thread.currentThread().getName()+" run");
        return retValue;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask(new ThreadTask2());
        Thread t = new Thread(futureTask);
        t.start();
        Thread.sleep(1);
        //判断线程任务是否完成
        while (!futureTask.isDone()) {
            System.out.println("task running...");
        }
        System.out.println("task finish,"+futureTask.get());
    }
}

运行结果:

...
task running...
task running...
Thread-0 run
task finish,reture value

用FutureTask对象包装Callable对象后,将FutureTask对象传入Thread构造函数创建线程对象并启动线程,通过isDone()方法判断任务是否完成,通过get()方法获取任务返回结果。

总结

本文一步步分析了线程返回值的来龙去脉,文章较长,我们总结下过程:

1、Thread类只接收Runnable类型的任务但是没有返回值,Callable类型的任务有返回值但是不能往Thread类里面传,所以引入了Future接口对任务返回结果进行表示。

2、Future接口更多的是对异步任务执行结果的抽象,并没有提交给线程执行的相关方法,所以还是不能“往Thread类的构造函数里面扔”,所以又引入了FutureTask类,一个FutureTask类型的任务同时具备提交到线程运行的能力和获取返回结果的能力。

如果文章对您有帮助,不妨“帧栈”一下,关注“帧栈”公众号,第一时间获取推送文章,您的肯定将是我写作的动力!

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值