Java高级主题:深入解析ThreadLocal的数据结构设计原理及其源代码实现

ThreadLocal可以实现完全基于无锁且也不是基于CAS的线程隔离需求,让每个线程可以有自己的本地实例,但如果对ThreadLocal底层设计不了解,那么对甚至无法正确ThreadLocal及其可能出现的内存泄露问题。可以说ThreadLocal的源代码设计也一种非常优秀的可支持“高并发”的实现。

1、基本用法:

1.1 demo1
public class ThreadLocalDemo {
    static class ProducerA extends Thread{
        @Override
        public void run(){
            ThreadLocal<String> var1=new ThreadLocal<>();
            ThreadLocal<String> var2=new ThreadLocal<>();
            // 普通用法
            var1.set("foo A");
            var2.set("bar A");
            System.out.println(var1.get()); // 输出foo A
            System.out.println(var2.get()); // 输出bar A
        }
    }

    static class ProducerB extends Thread{
        @Override
        public void run(){
            ThreadLocal<String> var1=new ThreadLocal<>();
            ThreadLocal<String> var2=new ThreadLocal<>();
            // 普通用法
            var1.set("foo B");
            var2.set("bar B");
            System.out.println(var1.get()); // 输出foo B
            System.out.println(var2.get()); // 输出bar B
        }
    }


    public static void main(String[] args) {

        new ProducerA().start();
        new ProducerB().start();
    }
}

这里创建了两个线程,每个线程内部有自己的ThreadLocal变量,线程之间ThreadLocal变量内部的set和get互相独立,互不影响,无需使用锁即可实现了线程安全操作。线程内部的变量使用set方法给定初始值、get方法取值,可以猜测其内部有类似HashMap这样的设计,但是否照搬HashMap数据结构设计呢? 其实不然,

在这里,ProducerA内部创建了一个ThreadLocalMap,ProducerB内部也创建了一个ThreadLocalMap,也即每个线程绑定一个自己内部ThreadLocalMap,这里提到的ThreadLocalMap就是提供了set、get方法的底层Map数据结构,所谓的ThreadLocal数据结构分析其实就是特指其内部的ThreadLocalMap的数据结构分析。

1.2 demo2
public class ThreadLocalDemo {

    static class HoldCount{
        int count;
        final long tid=0;
    }
    public static void main(String[] args) {

        ThreadLocal<HoldCount> rh=ThreadLocal.withInitial(HoldCount::new); // 设定rh这个ThreadLocal变量的初始值
        rh.set(new HoldCount());  // 将计数器放在rh中缓存
        HoldCount h= rh.get();
        System.out.println(h.count); // 这里输出的rh初始值,也即HoldCount的count属性初始值:0。
        for (int i = 0; i < 10; i++) {
            h.count++;
        }
        System.out.println(h.count);

        HoldCount newh=rh.get();// 更新缓存计数器后,再从ThreadLocal重新读取
        System.out.println(newh.count); // 可以读取新的计数值
        // 在rh这个ThreadLocal里面的Map结构中移除HoldCount实例对象
        rh.remove();
        System.out.println(rh.get()); // 此时rh里面Map已经不存在HoldCount对象,因此这里返回NUll

    }
}

在demo2中,给出了使用ThreadLocal后需要及时删除其实例对象的情况,这部分原因将在文章后面给出深入分析。

2、ThreadLocal内部数据结构简析

可以看到set方法是由内部ThreadLocalMap实现的set方法,既然是个“Map”,那么当然可以猜测是否跟HashMap的数据结构:数据+链表+红黑树类似呢?

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

其实ThreadLocalMap的数据结构没有HashMap数据结构复杂,ThreadLocalMap底层仅有一个table数组,这里,也许你会好奇:HashMap为了解决hash冲突,在数组的桶位上加入一条单向链表,冲突的entry自然会放入到此链表中(或者红黑树),那么问题来了,ThreadLocalMap底层仅有一个table数组,它是如何解决hash冲突?以下正式其设计原理之一,这里的数组给出最简单的情况,不包括“stale entry(无效entry)”的情况,以便让读者快速理解ThreadLocalMap设计原理:
在这里插入图片描述
可以看到图中所说的“解决冲突的方式:从i=3开始向后遍历出首个空slot,也即i=5,将keyC放入此空slot即可”的逻辑被称为“线性探测法”,所谓的“线性”就是o(n)复杂度的遍历操作,所谓的“探测”就是不断向后“探测、寻找”,直到找到首个空slot位置。

以上内容为ThreadLocalMap的放入new Entry的简单情况,如果有理解HashMap源代码设计的读者应该可以猜到其他重要设计:例如,当数组容量不够时,如何扩容,也即rehash(注意ThreadLocalMap里面的resize和rehash不是同一个逻辑),再例如ThreadLocalMap里面已经存在的entry,如果它的key已经变成无效(stale),那么如何该清理,或者说在set和get的线性探测过程中遇到有stale entry时,该如何清理?这些问题将在后面逐个深入探讨。

3、ThreadLocalMap基本成员变量的说明

    static class ThreadLocalMap {

        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
      	// ThreadLocalMap底层数组存放的WeakReference类型的entry,使用弱引用类型是为了能够高效GC,避免内存泄露,文章后面给出此设计的讨论
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;
						/* entry的key就是ThreadLocal对象,例如一个线程内部有10个ThreadLocal变量,那么此线程内部的ThreadLocalMap将存放这10个entry,这里的value就是ThreadLocal变量的“值”。
				    例如demo1中:
            ThreadLocal<String> var1=new ThreadLocal<>();
            var1.set("foo A")
            那么entry的key就是这个名称为var1的ThreadLocal对象,value就是字符串“foo A”
            */  
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        /**
         * The initial capacity -- MUST be a power of two.
         */
      	// 数组的初始容量16
        private static final int INITIAL_CAPACITY = 16;

        /**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
        //ThreadLocalMap的底层数组,这里也采用2的次方,原因在HashMap的源代码讨论已经给出深入的解析,这里不再累赘。
        private Entry[] table;

        /**
         * The number of entries in the table.
         */
      	// 数据含有entry的个数,注意即使entry的key处于stale状态,它也算一个entry
        private int size = 0;

        /**
         * The next size value at which to resize.
         */
        private int threshold; // Default to 0

        /**
         * Set the resize threshold to maintain at worst a 2/3 load factor.
         */
      	// 注意区别于HashMap的扩容阈值是len*3/4
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

        /**
         * Increment i modulo len.
         */
      	// 返回数组当前位置i的下一个位置i+1,如果下一个位置超过数组长度,那么下一个位置又从下标0开始,这种方式实现了所谓的“环形数组”,在后面get、set方法中或者stale entry清空机制的处理中可以看到它的用处
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        /**
         * Decrement i modulo len.
         */
         // 逻辑同上,方向相反,返回当前位置i的前一个位置i+1,如果来到数组头部,那么前一个位置即回到数组末尾
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

4、set方法完全解析

4.1 set方法本身
        /**
         * Set the value associated with key.
         *
         * @param key the thread local object
         * @param value the value to be set
         */
        private void set(ThreadLocal<?> key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.
            
            Entry[] tab = table;
            int len = tab.length;
            // 1.计算给定key对应的桶位,此hash算法能够最大程度将key平均分布到数组对应的桶位上,具体算法参考文后说明
            int i = key.threadLocalHashCode & (len-1);
            /* 2.线性探测发的实现
            从定位到i桶位开始遍历,直到遇到一个entry确实是null的空桶位,如果此遍历过程中遇到stale entry那么将其替换即可完成set操作。
            */
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();
								// 3.如果此桶位的key恰好是给定给定的key,那么更新此桶位的value后可直接返回。
                if (k == key) {
                    e.value = value;
                    return;
                }
								// 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
						// 5.在2的线性探测过程中,遇到entry为空的即可来到这流程,直接放入新entry,并且数组的entry数量加1,在这里应该可以猜到,当向数组添加一个新entry后接下来就要判断是否需要扩容
            tab[i] = new Entry(key, value);
            int sz = ++size;
          	// 6.满足扩容的条件:cleanSomeSlots返回False且entry数量达到扩容阈值
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

关于线性探测法的说明

1.为何会有“环形数组或者环形遍历”的设计:nextIndex

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) 

在这里插入图片描述

图1所示,假设目前有一个keyX定位到的桶位是i=13,但此桶位已存在entry,只能继续向后探测,来到数组尾部的桶位也不为空,此时经过e = tab[i = nextIndex(i, len)]的计算后,线性探测再次回到数组的头部位置重新遍历,如图2所示,当遍历到i=6时,发现此桶位为空,即可跳出循环接着在此位置放置新的entry,这就是“环形数组或者环形遍历”的底层设计逻辑。

4.2 replaceStaleEntry方法解析(核心内容)

在set方法中的第4点:替换失效的entry

								// 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;

其源码设计包括两个重要的核心功能:替换对应位置失效的entry和具有顺带功能(As a side effect)的清理其他失效entry,其中清理entry的逻辑设计最为复杂。

        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // Back up to check for prior stale entry in current run.
            // We clean out whole runs at a time to avoid continual
            // incremental rehashing due to garbage collector freeing
            // up refs in bunches (i.e., whenever the collector runs).
            int slotToExpunge = staleSlot;
            /*
            1、staleSlot是对于给定key用线性探测法“前向遍历”找到的首次出现的stale entry对应的下标
            为何第1步骤的前向遍历没有安排类似第2步骤的“替换操作等逻辑呢”,因为“ThreadLocal本身用的是开发地址法,冲突的key都被放置在后面空的slot,就算来到table末尾再从头遍历,它也是遵循“向后放置发生冲突的key””
            */
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            // Find either the key or trailing null slot of run, whichever
            // occurs first
           // 2. 从set方法传入的staleSlot下标开始向后遍历
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // If we find key, then we need to swap it
                // with the stale entry to maintain hash table order.
                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
             /*
             以下逻辑非常关键:
             3.1 如果从stale slot开始的“后向遍历”的第i下标又出现了key冲突,说明给定的key“本应放在slate slot 下标位置,但是因为冲突,被迫挪到比slate slot 更靠后的位置i”,既然现在slate slot已失效,那么就可以将给定key放回本应该更靠近hash定位的下标位置slateSlot。这里采用交换两者位置即可实现此逻辑。这就是”to swap it with the stale entry to maintain hash table order”所要表达的逻辑。
             */
                if (k == key) {
                    e.value = value;  
                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // Start expunge at preceding stale entry if it exists
                   // 3.2 如果slotToExpunge还是staleSlot,说明第1步骤的“前向探测”没有stale entry,那么就将清理起始下标改到i,因为i下标位置存放的是在3.1交换过来的stale entry:tab[i] = tab[staleSlot]
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                /* 3.3 到此,我们知道,截止到i下标的stale entry情况ß:
                 [某个空slot,staleSlot):从staleSlot的前向位置都没有stale entry
                 staleSlot:将i位置的有效entry交换过来tab[staleSlot] = e
                 [staleSlot+1,i-1]:后向遍历没有出现stale entry
                 i:存放的是从staleSlot交换过来的stale entry
                 因此slotToExpunge肯定是从i下标开始做清理工作。
                */
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // If we didn't find stale entry on backward scan, the
                // first stale entry seen while scanning for key is the
                // first still present in the run.
              	// 3.4 后向遍历在第i下标发现了一个stale entry且在前向遍历没有出现stale entry,那么清理开始下标当然要重置为i,那么staleSlot位置还存放着stale entry且没有也没有像3.1这样的“swap it”的设计,那么staleSlot自己是如何处理呢。它会在接下里的第4步骤中被处理掉! 那么在这个步骤发现第i号下标新的stale entry又是如何处理呢? 它会在第5个步骤被清理掉
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // If key not found, put new entry in stale slot
           /* 4. 在整个run都没有找到对应的key且也没有发现stale entry(除了staleSlot本身整个),那么好办,直接将staleSlot这个在set方法一开始就发现的slate entry的位置替换为新 entry即可,这就是为何方法名字命名为replaceStaleEntry。
           
           */
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
          // 第5步骤:接第3.4步骤出现的情况。
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

关于“A run”的理解(理解run及其内部清理标记逻辑才能透彻理解set背后的原理)

As a side effect, this method expunges all stale entries in the “run” containing the stale entry(A run is a sequence of entries between two null slots.)
在这里插入图片描述
如上图所示:

一个"run"就是在两个空slot之间的slots,例如上图,i=0和i=12之间就是一个“run”。

1.为何这“run”是以两个空slot作为边界呢?

这是因为replaceStaleEntry的第1步骤使用prevIndex前向探测,直到遇到null slot则结束循环,而第2步骤使用nextIndex后向探测,直到遇到null slot则结束循环,因此可以得出一前一后都是null slot作为边界。

2.结合replaceStaleEntry的源代码分析的第1点:显然经过prevIndex的“前向探测”探测到了首个i=1的stale entry,因此slotToExpunge指向i=1表示此下标是接下expungeStaleEntry清理的起始下标。

3.根据1可知,如果slotToExpunge下标和staleSlot下标相等,说明“前向探测”根本没发现stale entry,也即slotToExpunge指向没动过。

4.根据第3.4步骤可知,k == null && slotToExpunge == staleSlot,说明除了在set方法第一次发现的staleSlot,还在replaceStaleEntry的后向探测中的第i位置又发现了一个stale entry,因此起始清理下标要重置为slotToExpunge=i

在这里插入图片描述

关于replaceStaleEntry内部最关键的“替换算法”,也即对应第第3.1步骤到第3.3步骤如上图1和图2所示:

不妨假设i=8就是“给定的key”hash定位时发生的冲突下标,假设i=11的key1等于“给定的key”,也即对应第3.1步骤,

此时实施3.1步骤的“swap”逻辑:

将staleSlot=9的stale entry交换到key1的i=11位置,原i=11位置entry交换到staleSlot=9位置并且key不变但value被更新为“给定key对应的value”。

经过这么处理,“给定的key”所在桶位显然更靠近原本属于它的9号桶位,而不是像之前“被迫挪到”11号桶位,这就是源代码注释说提到的“we need to swap it with the stale entry to maintain hash table order”

以上的算法设计可以抽象为以下类比逻辑:

A本应坐在1号位,但发现来晚了,1号位置有人坐了,2、3、4也有人坐了,A被迫坐在5号位,某个时刻“新来的B”发现2号位已经变成staleSlot且1号位还有人在坐,那么此时B可以将A交换到2号位且把2号位的slate entry交换到5号位,那么此刻位于2号位的A显然更靠近“本属于自己的1号座位

4.3 关于staleSlot的清理逻辑设计

在4.2 中replaceStaleEntry为我们展示了精密的如何找出“起始清理下标”的算法设计,从

cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);可知,清理逻辑被设计为两部分:
第一部分:expungeStaleEntry(slotToExpunge),此方法返回一个i下标。第一部分的清理可称为“线性地清理”——“linear expunge”。注意此过程还包括rehash过程!!

第二部分:cleanSomeSlots(i, len),第二部分的清理可以称为“Heuristically expunge”,这里并不打算翻译为“启发性清理”,因为此处不建议使用中文硬翻译。(若要翻译,则可以翻译为“试探性地清理”)

以下是关于“线性地清理-expungeStaleEntry”的解析:

        /**
         * Expunge a stale entry by rehashing any possibly colliding entries
         * lying between staleSlot and the next null slot.  This also expunges
         * any other stale entries encountered before the trailing null.  See
         * Knuth, Section 6.4
         *
         * @param staleSlot index of slot known to have null key
         * @return the index of the next null slot after staleSlot
         * (all between staleSlot and this slot will have been checked
         * for expunging).
         */
        // 1.这里入参的staleSlot,就是replaceStaleEntry探测到的slotToExpunge下标
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            // expunge entry at staleSlot
            // 2.首先清空当前slotToExpunge下标的stale entry
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;

            // Rehash until we encounter null
          // 3. 在slotToExpunge+1到恰好遇到null slot之间进行逐个探测清理
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                // 3.1 又出现stale entry可直接清空
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    // 3.2 再次计算当前位置i放置的entry对应的hash值,如果hash值和当前i桶位一致,说明没有冲突,此entry恰好就是位于“本属于自己的桶位上”,如果hash值和当前i不一致,说明“此entry因为冲突被迫放到了第i位置,而第i位置不是此entry的直接定位”,可以将位于i桶位的“entry”放在“属于自己的h桶位”,这样就保证了entry能最大程度靠近或者就位于“本属于自己的桶位”范围以内,目的是为了提供线性探测查询效率!
                 
                    int h = k.threadLocalHashCode & (len - 1); 
                  	// 从向后探测的开放地址法可知,h值更小,i值更大,正是因为原h位置有冲突,e才被放置到更靠后的第i位置
                    if (h != i) {
                      // 因为发生冲突被迫放置在i位置的entry,后面会被放到它的直接定位h桶位,因此i位置可以置为null
                        tab[i] = null; 

                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                       /*
           虽然h桶位就是此entry的直接定位,但是考虑到h桶位可能被放置了其他entry,因此需要加入“向后探测”的逻辑,直到发现下一个位置为null slot。
           tab[h] = e  的写法就实现了“因为发生冲突被迫放置在i位置的entry,现在能够最接近地放到本属于自己直接定位的h桶位*/
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }

          
            return i; 
         /*
         显然此i就是第3步骤for循环里面从slotToExpunge向后探测到下一个null slot下标,此下标会被cleanSomeSlots方法中利用起来。
         for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len))
         */
        }

expungeStaleEntry难点其实在第3.2步骤中遇到的情况,需要做个简单的rehash,保证entry更加靠近“本属于自己的直接定位h桶位”,过程解析参考以下算法流程:
在这里插入图片描述
当然,如果在图2的中i=8不是null slot,那么从h位置开始while (tab[h] != null)探测,也会探测到i=10位置是个null slot,结果就是table[h=i=10]=e,e还是位于第i位置上。

4.3 cleanSomeSlots(i, len)方法

注意,要清楚cleanSomeSlots(i, len) i和len含义,否则无法理解cleanSomeSlots目的,这里的i就是expungeStaleEntry返回的一个空slot,len是table长度。

        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                // 1.
                i = nextIndex(i, len);
                Entry e = tab[i];
                //2.
                if (e != null && e.get() == null) {
                  // 3.
                    n = len;
                    removed = true;
                    //4 .
                    i = expungeStaleEntry(i);
                }
               // 5.控制探测的次数
            } while ( (n >>>= 1) != 0);
            return removed;
        }

1.首先,考虑最简单的情况,如果第i和len之前都不存在stale entry,那么就相当于在1和len范围内折半探测,时间复杂度为log2(n)

2.其次,考虑到探测在i和len过程中,出现了stale entry,此时会将n重置为len长度,继续while,再一轮log2(n)次遍历

这就是所谓的“Heuristically scan”,因为是log2(n),即使出现如2情况,此试探性的探测动作也是可以很快完成。

4.5、set方法内部的rehash
            tab[i] = new Entry(key, value);
            int sz = ++size;
						/*
						1. 显然如果cleanSomeSlots返回true,表明在table中清理了不少于1个的stale entry,恰好可以腾出不少于1个空slot,显然不需要table扩容。
						2. 当!cleanSomeSlots(i, sz) 表示没有遇到stale entry且table的entry数量已经达到了阈值,可以进入扩容逻辑
						*/ 
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

rehash内部设计:

        /**
         * Re-pack and/or re-size the table. First scan the entire
         * table removing stale entries. If this doesn't sufficiently
         * shrink the size of the table, double the table size.
         */
        private void rehash() {
            //1.扩容前,先从头到尾线性清理一下stale entry
            expungeStaleEntries();

            // Use lower threshold for doubling to avoid hysteresis
            // 2.只有当table的entry个数达到了0.75threshold才会去扩容
            if (size >= threshold - threshold / 4)
                resize();
        }

        /**
         * Double the capacity of the table.
         */
        private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            // 这里看出是两倍扩容
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;
						// 1.从旧表开始逐个遍历
            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    // 2.旧表当前j位置出现stale entry,那么直接将entry的value强引用设为null,Help the GC
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                    // 3. 旧表当前遍历位置j是正常的entry,那么用新表newLen计算它在新表的桶位号  
                        int h = k.threadLocalHashCode & (newLen - 1);
                        // 4. 开放地址法在新表中为“当前旧表遍历位置下entry”对应的新表h位置
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen); // 注意这里是在新表计算
                        newTab[h] = e; 
                        count++; 
                    }
                }
            }

            setThreshold(newLen);
            size = count;
            table = newTab; // table 指向新表引用
        }

rehash的逻辑相对简单。

5、get方法解析

有了set方法完全解析流程后,对于get方法则很好理解

    /**
     * Returns the value in the current thread's copy of this
     * thread-local variable.  If the variable has no value for the
     * current thread, it is first initialized to the value returned
     * by an invocation of the {@link #initialValue} method.
     *
     * @return the current thread's value of this thread-local
     */
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
         // 1、ThreadLocalMap已经存在时
        if (map != null) {
            // getEntry才是正在在底层table去查找给定key对应的entry
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 2. 如果线程t的ThreadLocal内部ThreadLocalMap还未初始化,直接返回ThreadLocal初始化时设定的初始值
        return setInitialValue();
    }

getMap的逻辑

    /**
     * Get the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param  t the current thread
     * @return the map
     */
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

    /**
     * Create the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the map
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }


        /**
         * Construct a new map initially containing (firstKey, firstValue).
         * ThreadLocalMaps are constructed lazily, so we only create
         * one when we have at least one entry to put in it.
         */
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

getEntry整体设计:

        /**
         * Get the entry associated with key.  This method
         * itself handles only the fast path: a direct hit of existing
         * key. It otherwise relays to getEntryAfterMiss.  This is
         * designed to maximize performance for direct hits, in part
         * by making this method readily inlinable.
         *
         * @param  key the thread local object
         * @return the entry associated with key, or null if no such
         */
        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            // 1. 对应上面源代码注释提到“Only the fast path:a direct hit of existing key” 逻辑,也即给定key对应的i桶位的entry恰好存放的就是key的entry。
            if (e != null && e.get() == key)
                return e;
           // 2. 说明这个key在之前是发生冲突了,放置到比i更靠后的位置,需要采用“后向探测”去检索。
            else
                return getEntryAfterMiss(key, i, e);
        }

        /**
         * Version of getEntry method for use when key is not found in
         * its direct hash slot.
         *
         * @param  key the thread local object
         * @param  i the table index for key's hash code
         * @param  e the entry at table[i]
         * @return the entry associated with key, or null if no such
         */
        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;
            // 从给定的entry开始后向遍历探测
            while (e != null) {
                ThreadLocal<?> k = e.get();
                // 1.找到,则返回
                if (k == key)
                    return e;
                // 2.遇到stale entry,调用expungeStaleEntry清理它,此时i位置就是slotToExpunge起始清理的下标
                if (k == null)
                    expungeStaleEntry(i);
                else
                // 3.继续向后探测下一个entry  
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

6、remove方法

        /**
         * Remove the entry for key.
         */
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
        // 目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry
                    e.clear(); 
        //            
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

e.clear()关键逻辑:此方法来自import java.lang.ref.Reference;可以看到 e.clear()目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry

    /**
     * Clears this reference object.  Invoking this method will not cause this
     * object to be enqueued.
     *
     * <p> This method is invoked only by Java code; when the garbage collector
     * clears references it does so directly, without invoking this method.
     */
    public void clear() {
        this.referent = null;
    }

7、ThreadLocal弱引用和内存泄露问题

在前面的所有内容中,我们都知道在线性探测中用if (k == null) 去判断当前桶位的entry是否为变为一个stale entry,放入一个正常的entry的为何会在某个时刻变成“失效的entry”?这是因为entry的key被设计为WeakReference ,这是ThreadLocalMap关键设计之一。

    /**
    注意以下源代码的解析
       To help deal with
     * very large and long-lived usages, the hash table entries use
     * WeakReferences for keys.
     */
    static class ThreadLocalMap {

        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k); // key 是弱引用类型
                value = v; // value 是强引用类型  
            }
        }

所谓的java弱引用:一旦有gc,那么WeakReference类型的对象就会被回收,用以下demo说明:

public class WeakReferenceDemo {
    static class Entry{
        private String key;
        private String value;
        Entry(String key,String value) {
            this.key = key;
            this.value=value;
        }

        @Override
        public String toString() {
            return "Entry{" +
                    "key='" + key + '\'' +
                    ", value='" + value + '\'' +
                    '}';
        }
    }

    public static void main(String[] args) {
        Entry entry=new Entry("foo","bar"); // entry显然是一个强引用
        WeakReference<Entry> entryWeakReference=new WeakReference<>(entry); // entryWeakReference是一个弱引用

        System.gc();// gc1
        System.out.println("after gc1,entry:"+entry); // Entry{key='foo', value='bar'}
        System.out.println("after gc1,entryWeakReference:"+entryWeakReference.get()); // Entry{key='foo', value='bar'}


        entry=null; // 此时entry强引用被置为null,那么会被gc回收
        System.gc();// gc2
        System.out.println("after gc2,entryWeakReference:"+entryWeakReference.get()); // null

    }
}

输出:

after gc1,entry:Entry{key='foo', value='bar'}
after gc1,entryWeakReference:Entry{key='foo', value='bar'}
after gc2,entryWeakReference:null

注意到ThreadLocalMap中的Entry,key类型是WeakReference<ThreadLocal<?>> 弱引用的,因此一旦此key没有指向强引用,那么key显然会变为null,那么gc时作为key的ThreadLocal对象在jvm堆中就会被回收,对应的Entry就是一个stale entry,注意,如果Entry的value此时还不是null也即还是处于强引用类型状态,这会引出另外一个问题:ThreadLocal内存泄露问题,或者说:

为何ThreadLocal会有内存泄露问题?

其实比较好理解,首先ThreadLocal内部的Entry对象和当前线程的生命周期一致,只要线程不结束,且Entry的value也即给ThreadLocal对象设置的value没有被删除(强引用还在),那么这个Entry就不会被回收,假设一个线程内部的ThreadLocalMap里面有很多这样的Entry,那么就会面临内存泄露的风险,

考虑线程池的情况,例如有线程使用ThreadlLocal对象,此线程位于线程池中会一直保持运行,对于它的ThreadlLocal对象内部的ThreadLocalMap来说,如果map中Entry的value没有被外界使用完后及时删除,就导致此Entry一直得不到回收,容易发生内存泄露。

7.1 如何避免内存泄露

代码中满足一定ThreadLocal.get()、ThreadLocal.set()逻辑设计的情况下,主动调用ThreadLocalMap.remove 来移除Entry对象的引用关系,这种高级且科学用法,其实在ReentrantReadWriteLock的源代码设计有所体现:

        /**
         定义一个给每个线程自己的内部读锁计数器
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

        /**
         * ThreadLocal subclass. Easiest to explicitly define for sake
         * of deserialization mechanics.
         */
				// 如何实现每个线程独立记录的读锁计数器? 使用ThreadLocal即可保证线程隔离的计数,互不影响。
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {  // 线程自己持有的读锁计数器初始值0
                return new HoldCounter();
            }
        }  
			
        /**
         * The number of reentrant read locks held by current thread.
         * Initialized only in constructor and readObject.
         * Removed whenever a thread's read hold count drops to 0.
         */
        private transient ThreadLocalHoldCounter readHolds;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }


      // 线程释放自己持有的读锁
			protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            // 第一个持有读锁的线程恰好是当前线程,
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                // “第一个持有读锁的线程” 也准备释放读锁,firstReader不再指向任何读线程
                if (firstReaderHoldCount == 1)
                    firstReader = null;
              	// 否则“第一个持有读锁的线程”重入锁次数减1
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                // 如果线程自己缓存的读锁计数器对象为空,或者线程自己的读锁计数器缓存的线程不是当前线程
                if (rh == null || rh.tid != getThreadId(current))
                   // 当前线程持有读锁的计数器readHolds
                    rh = readHolds.get();
                int count = rh.count;
                // 当前线程持有读锁的计小于等于1,说明在本次读锁退出后,当当前线程不再持有任何读锁,因此用在它身上的ThreadLocal<HoldCounter>对象需要马上移除,避免ThreadLocal发生内存泄露
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
               // 线程自己持有读锁自减1
                --rh.count;
            }
            for (;;) {
              	// 总的读锁锁-1
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

8、为何ThreadLocal没有直接采用ConcurrentHashMap这样的Map数据结构?

  1. 首先,如果ThreadLocal使用ConcurrentHashMap来达到key-value管理目的,那么是无法实现“每个线程持有自己的本地实例”这样的需求,因此对于Josh Bloch and Doug Lea来说,需要设计ThreadLocal的数据结构及其一些算法细节,真正打造出可以支持和实现“每个线程可以持有自己本地实例”功能的数据结构,这显然是非常创新的工作,虽然ConcurrentHashMap的源代码设计已经堪称十分优秀。

  2. 其次,既然不采用ConcurrentHashMap这样内部复杂设计的Map结构,那么就要设计出非常高效、简约的数据结构,因此设计了ThreadLocalMap,底层只有一个数组table,不再有什么单链表、红黑树等结构,采用开放寻址法解决hash冲突,同时,只基于数组实现相关逻辑的代码会变得更加直观、简单,例如在扩容、清理stale entry方面,仅需基于数组的前后线性遍历即可。

  3. ThreadLocal底层只基于一个数组table,结合设计特定的hash魔数,可以使得Entry的hash在数组中分散很均匀,大大降低了冲突概率,提高查询效率。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值