ScheduledThreadPoolExecutor源码解读(一)——DelayedWorkQueue高度定制延迟阻塞优先工作队列

更多JUC源码解读系列文章请持续关注JUC源码解读文章目录JDK8


一、前言

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以其内部的数据结构和ThreadPoolExecutor基本一样,并在其基础上增加了按时间调度执行任务的功能,分为延迟执行任务和周期性执行任务。

二、构造函数

ScheduledThreadPoolExecutor的构造函数只能传3个参数corePoolSizeThreadFactoryRejectedExecutionHandler,默认maximumPoolSizeInteger.MAX_VALUE

工作队列是高度定制化的延迟阻塞队列DelayedWorkQueue,其实现原理和DelayQueue基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也就用不上了,所以线程池中永远会保持至多有corePoolSize个工作线程正在运行。

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

三、DelayedWorkQueue延迟阻塞队列

1、基本架构

DelayedWorkQueue的实现原理中规中矩,内部维护了一个以RunnableScheduledFuture类型数组实现的最小二叉堆,初始容量是16,使用ReentrantLockCondition实现生产者和消费者模式。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {
    
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;
    private Thread leader = null;
    private final Condition available = lock.newCondition();
}

2、offer添加元素

ScheduledThreadPoolExecutor提交任务时调用的是DelayedWorkQueue.add,而addput等一些对外提供的添加元素的方法都调用了offer,其基本流程如下:

  • 其作为生产者的入口,首先获取锁。
  • 判断队列是否要满了(size >= queue.length),满了就扩容grow()
  • 队列未满,size+1。
  • 判断添加的元素是否是第一个,是则不需要堆化。
  • 添加的元素不是第一个,则需要堆化siftUp
  • 如果堆顶元素刚好是此时被添加的元素,则唤醒take线程消费。
  • 最终释放锁。
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            //扩容
            grow();
        size = i + 1;
        if (i == 0) {
            //如果是入的是第一个元素,不需要堆化
            queue[0] = e;
            setIndex(e, 0);
        } else {
            //堆化
            siftUp(i, e);
        }
        if (queue[0] == e) {
            //如果堆顶元素刚好是入队列的元素,则唤醒take
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

如图为offer基本流程图:
offer

(1)扩容grow

可以看到,当队列满时,不会阻塞等待,而是继续扩容。新容量newCapacity在旧容量oldCapacity的基础上扩容50%(oldCapacity >> 1相当于oldCapacity /2)。最后Arrays.copyOf,先根据newCapacity创建一个新的空数组,然后将旧数组的数据复制到新数组中。

private void grow() {
    int oldCapacity = queue.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
    if (newCapacity < 0) // overflow
        newCapacity = Integer.MAX_VALUE;
    queue = Arrays.copyOf(queue, newCapacity);
}

(2)向上堆化siftUp

新添加的元素先会加到堆底,然后一步步和上面的父亲节点比较,若小于父亲节点则和父亲节点互换位置,循环比较直至大于父亲节点才结束循环。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        //找到父亲节点
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            // 添加的元素 大于父亲节点,则结束循环
            break;
        //添加的元素小于父亲节点,则位置互换
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

如下图为siftUp向上堆化过程图:

siftup

3、take消费元素

Worker工作线程启动后就会循环消费工作队列中的元素,因为ScheduledThreadPoolExecutorkeepAliveTime=0,所以消费任务其只调用了DelayedWorkQueue.take。take基本流程如下:

  • 首先获取可中断锁,判断堆顶元素是否是空,空的则阻塞等待available.await()
  • 堆顶元素不为空,则获取其延迟执行时间delaydelay <= 0说明到了执行时间,出队列finishPoll
  • delay > 0还没到执行时间,判断leader线程是否为空,不为空则说明有其他take线程也在等待,当前take将无限期阻塞等待。
  • leader线程为空,当前take线程设置为leader,并阻塞等待delay时长。
  • 当前leader线程等待delay时长自动唤醒护着被其他take线程唤醒,则最终将leader设置为null
  • 再循环一次判断delay <= 0出队列。
  • 跳出循环后判断leader为空并且堆顶元素不为空,则唤醒其他take线程,最后是否锁。
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0]; //取出堆顶元素
            if (first == null)
                //堆为空,等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //到了执行时间,出队列
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                //还没到执行时间
                if (leader != null)
                    //此时若有其他take线程在等待,当前take将无限期等待
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

如下图为take基本流程图:
take

(1)take线程阻塞等待

可以看出这个生产者take线程会在两种情况下阻塞等待:

  • 堆顶元素为空。
  • 堆顶元素的delay>0。
(2)finishPoll出队列

堆顶元素delay<=0,执行时间到,出队列就是一个向下堆化的过程siftDown

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}
(3)siftDown向下堆化

由于堆顶元素出队列后,就破坏了堆的结构,需要组织整理下,将堆尾元素移到堆顶,然后向下堆化:

  • 从堆顶开始,父亲节点与左右子节点中较小的孩子节点比较(左孩子不一定小于右孩子)。
  • 父亲节点小于等于较小孩子节点,则结束循环,不需要交换位置。
  • 若父亲节点大于较小孩子节点,则交换位置。
  • 继续向下循环判断父亲节点和孩子节点的关系,直到父亲节点小于等于较小孩子节点才结束循环。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    //k = 0, key = queue[size-1]
    //无符号右移,相当于size/2
    int half = size >>> 1;
    while (k < half) {
        //只需要比较一半
        //找到左孩子节点
        // child = 2k + 1
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        //右孩子节点
        int right = child + 1;
        //比较左右孩子大小
        if (right < size && c.compareTo(queue[right]) > 0)
            //c左孩子大于右孩子,则将有孩子赋值给左孩子
            c = queue[child = right];
        //比较key和孩子c
        if (key.compareTo(c) <= 0)
            //key小于等于c,则结束循环
            break;
        //key大于孩子c,则key与孩子交换位置
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

代码中使用移位运算,需要说明:

  • half = size >>> 1无符号右移,相当于size/2,只比较一般的元素,即左子树,因为左右子树比较后较小元素会在左边。
  • child = (k << 1) + 1相当于child = 2k + 1

如下图为siftDown向下堆化过程图:

siftDown

(4)leader线程

leader线程的设计,是Leader-Follower模式的变种,旨在于为了不必要的时间等待。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。

4、remove删除指定元素

删除指定元素一般用于取消任务时,任务还在阻塞队列中,则需要将其删除。当删除的元素不是堆尾元素时,需要做堆化处理。

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;
        //维护heapIndex
        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            //删除的不是堆尾元素,则需要堆化处理
            //先向下堆化
            siftDown(i, replacement);
            if (queue[i] == replacement)
                //若向下堆化后,i位置的元素还是replacement,说明四无需向下堆化的,
                //则需要向上堆化
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

四、总结

  1. DelayedWorkQueue添加元素满了之后会自动扩容原来容量的1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE,所以线程池中至多有corePoolSize个工作线程正在运行。。
  2. DelayedWorkQueue 消费元素take,在堆顶元素为空和delay >0 时,阻塞等待。
  3. DelayedWorkQueue 是一个生产永远不会阻塞,消费可以阻塞的生产者消费者模式。
  4. DelayedWorkQueue 有一个leader线程的变量,是Leader-Follower模式的变种。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。

PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

徐同学呀

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值