HashedWheelTimer源码分析

前言

Java Timer 是个单线程的基于小根堆结构来维护延时任务的调度器,任务的调度和执行在同一个线程上,任务之间会相互影响,同时基于数组实现的小根堆也不适合维护大量任务。

之后,Java 又推出了多线程版本的 ScheduledThreadPoolExecutor,性能更好,资源利用率更高,因为是多线程,任务之间的影响也相对较小,同时对异常进行了捕获,不会因为单个任务执行异常而退出线程。但是,基于数组实现的小根堆同样不适合维护大量任务,任务数一旦多起来,对内存的压力以及堆的维护开销都不容小觑。其次,延时任务的调度和执行还是在同一线程,并没有隔离开来。

延时任务的实现,还有一种时间轮算法,用来解决传统定时器在大量任务时的性能问题,它非常高效,尤其在处理大量短时定时任务时,时间复杂度接近O(1),代价是要牺牲一定的时间精度。

Netty 的 HashedWheelTimer 就是时间轮算法的一种实现,Netty 内部在它来处理 连接超时、空闲检测等任务。

时间轮算法

时间轮算法的思想:如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。

时间轮算法的核心是,仅靠单个线程,就可以完成大量任务的调度。因为任务被分配到不同的Bucket上,线程调度时只关心当前Bucket上有哪些任务到期,性能也会更好。

HashedWheelTimer

HashedWheelTimer的构造函数以及各参数含义

public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
    long maxPendingTimeouts, Executor taskExecutor) {
    checkNotNull(threadFactory, "threadFactory");
    checkNotNull(unit, "unit");
    checkPositive(tickDuration, "tickDuration");
    checkPositive(ticksPerWheel, "ticksPerWheel");
    this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    // Convert tickDuration to nanos.
    long duration = unit.toNanos(tickDuration);
    // Prevent overflow.
    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
            tickDuration, Long.MAX_VALUE / wheel.length));
    }
    if (duration < MILLISECOND_NANOS) {
        logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
                    tickDuration, MILLISECOND_NANOS);
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }
    workerThread = threadFactory.newThread(worker);
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
    this.maxPendingTimeouts = maxPendingTimeouts;
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
  • threadFactory 创建Worker线程的工厂
  • tickDuration 时间轮的间隔时间,最小精确到1毫秒
  • unit 时间轮的间隔时间单位
  • ticksPerWheel 时间轮的大小,默认512
  • leakDetection 是否开启内存泄漏检测
  • maxPendingTimeouts 最大等待任务数,默认不限制
  • taskExecutor 任务执行器,如果异步执行任务,那么调度线程和执行线程就隔离开了,更加利于任务的调度

构造函数流程:

  1. createWheel 创建时间轮,也就是 HashedWheelBucket 数组,数组的大小必须是2的幂次方大小
  2. 计算 mask 掩码,便于计算当前要处理的时间轮下标
  3. 时间轮的调度间隔 tickDuration 转换为纳秒单位,间隔不可小于1毫秒
  4. 通过线程工厂创建工作线程 workerThread
  5. 判断是否要开启内存泄漏检测,开启则创建一个 ResourceLeakTracker
  6. 判断时间轮的实例数是否超过64个,超过则日志告警

createWheel 用来创建时间轮,底层就是个环形数组,数组的长度必须是2的幂次方数。

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {

checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
    wheel[i] = new HashedWheelBucket();
}
return wheel;
}

数组中的元素是 HashedWheelBucket,用来存储落在同一刻度上的任务,底层是双向链表实现。

private static final class HashedWheelBucket {
    private HashedWheelTimeout head; // 头节点
    private HashedWheelTimeout tail; // 尾节点
}

延时任务提交 newTimeout,任务被封装成 TimerTask,同时提供任务延迟执行的时间

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    checkNotNull(task, "task");
    checkNotNull(unit, "unit");
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                                             + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                                             + "timeouts (" + maxPendingTimeouts + ")");
    }
    start();

    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
  1. pendingTimeouts 代表等待执行的任务数量,首先是检查任务数是否超过阈值,超过则直接抛出拒绝异常。默认是不限制的,需要注意是否有足够的内存来容纳这些任务。
  2. 接着 start 开启线程,Java Timer 实例化时就启动线程,HashedWheelTimer 则是懒启动,更利于资源。
  3. Worker线程启动后,会把启动时间赋值给 startTime,延时任务的 deadline 就是当前时间+延迟时间-startTime
  4. 最后把任务封装成 HashedWheelTimeout,加入到 timeouts 队列。

注意,此时任务只是加入到 timeouts 队列,还没有加入到时间轮的Bucket中。

Worker线程启动后,执行的 Runnable 是 内部类 Worker,变量 tick 代表时间轮运行的指针,也就是根据 tick 计算当前要处理的 Bucket。

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    @Override
    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }

        // Notify the other threads waiting for the initialization at start().
        startTimeInitialized.countDown();

        do {
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                int idx = (int) (tick & mask);
                processCancelledTasks();
                HashedWheelBucket bucket =
                wheel[idx];
                transferTimeoutsToBuckets();
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
}

Worker#run 流程:

  1. 先根据 tickDuration 和 tick 计算要睡眠的时间,线程sleep
  2. 时间一到,根据 (tick & mask) 计算要处理的 Bucket
  3. processCancelledTasks 先移除掉期间已经被取消的任务
  4. transferTimeoutsToBuckets 把先前入到 timeouts 队列里的任务,迁移到 Bucket 里
  5. bucket.expireTimeouts 把 Bucket 里已经到期的任务执行并移除
  6. 时间轮关闭后,把没执行的任务添加到 unprocessedTimeouts,以便在 stop 中返回

transferTimeoutsToBuckets 会把 timeouts 中的任务迁移到对应 Bucket

private void transferTimeoutsToBuckets() {
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }

        long calculated = timeout.deadline / tickDuration;
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}
  1. 为了防止任务迁移耗时太久,导致Worker线程时间精度失效,限制最多迁移100000个任务
  2. 时间轮一圈的时间范围由 tickDuration 和 Bucket 数量决定,如果任务的到期时间超过一轮,属性 remainingRounds 就会记录剩余的轮数。时间轮调度时,对于 remainingRounds>0 的任务,仅会递减剩余的轮数,等待下一轮再调度执行

任务迁移到对应 Bucket 后,bucket#expireTimeouts 会处理当前 Bucket 里的任务

public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
    HashedWheelTimeout next = timeout.next;
    if (timeout.remainingRounds <= 0) {
        next = remove(timeout);
        if (timeout.deadline <= deadline) {
            timeout.expire();
        } else {
            // The timeout was placed into a wrong slot. This should never happen.
            throw new IllegalStateException(String.format(
                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
        }
    } else if (timeout.isCancelled()) {
        next = remove(timeout);
    } else {
        timeout.remainingRounds --;
    }
    timeout = next;
}
}
  1. HashedWheelBucket 是双向链表,先根据 head 找到头节点,再向后扫描
  2. 如果任务的 remainingRounds>0 ,代表任务的延迟时间超过一轮,仅递减轮数,等待后续调度
  3. 如果任务已经被取消,则直接移除掉
  4. 否则代表任务到期,将其从Bucket里移除,再执行任务

尾巴

HashedWheelTimer 是 Netty 基于 时间轮算法‌ 实现的高效定时任务调度器,适用于处理大规模延迟任务或周期性任务(如超时检测、心跳等)。它将任务散列到时间轮的Bucket中,通过固定时间间隔推进轮子,实现任务的调度和执行。

它的特点是 适合处理大量的短时任务,效率非常高,代价是牺牲了一定的时间精度,任务的调度时间受到 tickDuration 的影响,不适用于高精度定时场景。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

程序员小潘

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值