转载请注明出处: http://blog.csdn.net/methods2011/article/details/52167248
上一篇由于后面写的匆匆,没有把工作线程处理那部分写的详细。上一篇链接:http://blog.csdn.net/methods2011/article/details/52139316
今天主要把工作线程相关的概念介绍一下,及他们在AsyncTask中的使用。
1.ThreadFactory
2.BlockingQueue<T>
3.LinkedBlockingQueue<T>
4.Executor
5.ThreadPoolExecutor
6.SerialExecutor
7.WorkerRunnable
8.Callable
9.ArrayDeque<Runnable>
10.FutureTask
11.AsyncTaskResult
线给他们归个类,
接口:
1.ThreadFactory
2.BlockingQueue<T>
4.Executor
8.Callable
1.ThreadFactory 线程工厂,他是用的工厂设计模式其中只定义了一个方法,如下是源码
package java.util.concurrent;
public interface ThreadFactory {
Thread newThread(Runnable r);
}
在AsyncTask中对它进行了实现
private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } };每次new一个Tread 并对其计数
看看怎么使用它的呢?
public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);只有这么一处没这个线程池用于创建线程需要传入的参数,用于创建多个线程。
2.BlockingQueue<T> 是个阻塞队列, 本身是个借口,继承Queue ,Queue又继承自Collection。是集合类中的一员,Queue和List相似
BlockingQueue包含Collection中的add remove等接口,但在这里还有另外一些方法,也是删除添加元素用。
1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常
2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
看完这几个方法就明白了,其实就是队列。往里面添加数据,删除数据。其中有四个实现类
1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的
3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.
其中的2)LinkedBlockingQueue 就是AsyncTask中使用的
再看看AsyncTask中的源码
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);
新建了一个最大为128的队列。
在AsyncTask中唯一应用的地方,用于线程池的创建
public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
4.Executor 是Java5中的java.util.concurrent包中的执行器(Executor)管理Thread对象。
看下源码就一个方法
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
在AsyncTask中实现它的类有ThreadPoolExecutor,SerialExecutor
ThreadPoolExecutor 这个是线程池,可以并发执行。创建时要传一些参数
SerialExecutor 这个是只能串行执行的,只能一个任务一个任务去执行,这个的实现是在AsyncTask中直接实现的。
默认AsyncTask中就是用的串行的SerialExecutor
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
这两个是可以切换的,下面这个是pubulic方法,把THREAD_POOL_EXECUTOR这个传进去就可以了。
public static void setDefaultExecutor(Executor exec) { sDefaultExecutor = exec; }再分别看看两个类
ThreadPoolExecutor
public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);其中的参数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1;分别介绍下他们的含义:
CORE_POOL_SIZE 核心线程池大小
MAXIMUM_POOL_SIZE 最大线程池大小
KEEP_ALIVE 线程池中超过CORE_POOL_SIZE数目的空闲线程最大存活时间
TimeUnit.SECONDS KEEP_ALIVE时间的单位,现在设置的是秒
sPoolWorkQueue 阻塞任务队列
sThreadFactory 新建线程工厂
ThreadPoolExecutor的使用,其实是预留切换并行用的,再另外的串行中,也用着工厂执行工作线程的任务了。看源码就明白了
private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }在最下面那个红色的就是用它来执行任务的。
正好直接看下SerialExecutor
SerialExecutor 线性执行器,就是,里边的任务要一个一个执行
这其中有个概念ArrayDeque<Runnable>双端队列,也是集成Queue的,是存入任务的地方。
可以看到这个类里的两个方法都是线程同步的,包含了synchronized 关键字
就是说只能一个任务一个任务执行,有多少线程也只能一个一个跑。
有一个挺有意思的,就是Runnable,调用了r.run()方法,一开始还不明白什么意思。
后来想了一会终于想明白了。正常开启线程都是start()方法,这里是没有开启新的线程,直接执行run()方法了
就是说直接在当前的工作线程执行了。
mTask.offer()方法是往队列里添加一个任务,就相当于往ArrayList里add一个item一样,只是这个用的是offer方法,添加的是实现了的Runner接口。
然后就执行了scheduleNext方法,poll方法是从里边取出刚才存放那个,赋值给mActive,然后让线程执行器去执行这个任务Runnable。
知道任务执行完毕为止。
到此这个串行就完成。并行的话就是线程池一起执行里边的所有任务,线程池也是有上线的。
7.WorkerRunnable
这个其实是一个抽象内部类,实现了CallBack接口,下面是在AsyncTask中的源码
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; }
mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); }这个上节讲过了这个Callable接口和Runnable接口差不多,他们俩都是可以被执行的线程。
只是Callable接口有返回值,
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }这是Callable源码。
Callable 和 Future接口
* Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。
* Callable和Runnable有几点不同:
* (1)Callable规定的方法是call(),而Runnable规定的方法是run().
* (2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
* (3)call()方法可抛出异常,而run()方法是不能抛出异常的。
* (4)运行Callable任务可拿到一个Future对象,
* Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
* 通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
AsyncTaskResult
这个类就是AsyncTask中封装的一个对象,发送Message用的,在AsyncTask中的代码如下
private static class AsyncTaskResult<Data> { final AsyncTask mTask; final Data[] mData; AsyncTaskResult(AsyncTask task, Data... data) { mTask = task; mData = data; } }handler接受消息进行处理
private static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } }发送消息
*/ @WorkerThread protected final void publishProgress(Progress... values) { if (!isCancelled()) { getHandler().obtainMessage(MESSAGE_POST_PROGRESS, new AsyncTaskResult<Progress>(this, values)).sendToTarget(); } }发送消息
private Result postResult(Result result) { @SuppressWarnings("unchecked") Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT, new AsyncTaskResult<Result>(this, result)); message.sendToTarget(); return result; }
10.FutureTask
最后一个类先看源码
public class FutureTask<V> implements RunnableFuture<V> {
package java.util.concurrent; /** * A {@link Future} that is {@link Runnable}. Successful execution of * the {@code run} method causes completion of the {@code Future} * and allows access to its results. * @see FutureTask * @see Executor * @since 1.6 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }这个RunnableFuture很奇葩,实现两个接口
FutureTask其实就是执行任务的封装,可以当Runnable理解,也可以当Callable理解
任务执行的中间可以取消。
在AsyncTask中的源码
private final FutureTask<Result> mFuture;
mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } };重写了他的done方法,就是判断是否任务真正执行成功了。