ThreadLocal可以实现完全基于无锁且也不是基于CAS的线程隔离需求,让每个线程可以有自己的本地实例,但如果对ThreadLocal底层设计不了解,那么对甚至无法正确ThreadLocal及其可能出现的内存泄露问题。可以说ThreadLocal的源代码设计也一种非常优秀的可支持“高并发”的实现。
1、基本用法:
1.1 demo1
public class ThreadLocalDemo {
static class ProducerA extends Thread{
@Override
public void run(){
ThreadLocal<String> var1=new ThreadLocal<>();
ThreadLocal<String> var2=new ThreadLocal<>();
// 普通用法
var1.set("foo A");
var2.set("bar A");
System.out.println(var1.get()); // 输出foo A
System.out.println(var2.get()); // 输出bar A
}
}
static class ProducerB extends Thread{
@Override
public void run(){
ThreadLocal<String> var1=new ThreadLocal<>();
ThreadLocal<String> var2=new ThreadLocal<>();
// 普通用法
var1.set("foo B");
var2.set("bar B");
System.out.println(var1.get()); // 输出foo B
System.out.println(var2.get()); // 输出bar B
}
}
public static void main(String[] args) {
new ProducerA().start();
new ProducerB().start();
}
}
这里创建了两个线程,每个线程内部有自己的ThreadLocal变量,线程之间ThreadLocal变量内部的set和get互相独立,互不影响,无需使用锁即可实现了线程安全操作。线程内部的变量使用set方法给定初始值、get方法取值,可以猜测其内部有类似HashMap这样的设计,但是否照搬HashMap数据结构设计呢? 其实不然,
在这里,ProducerA内部创建了一个ThreadLocalMap,ProducerB内部也创建了一个ThreadLocalMap,也即每个线程绑定一个自己内部ThreadLocalMap,这里提到的ThreadLocalMap就是提供了set、get方法的底层Map数据结构,所谓的ThreadLocal数据结构分析其实就是特指其内部的ThreadLocalMap的数据结构分析。
1.2 demo2
public class ThreadLocalDemo {
static class HoldCount{
int count;
final long tid=0;
}
public static void main(String[] args) {
ThreadLocal<HoldCount> rh=ThreadLocal.withInitial(HoldCount::new); // 设定rh这个ThreadLocal变量的初始值
rh.set(new HoldCount()); // 将计数器放在rh中缓存
HoldCount h= rh.get();
System.out.println(h.count); // 这里输出的rh初始值,也即HoldCount的count属性初始值:0。
for (int i = 0; i < 10; i++) {
h.count++;
}
System.out.println(h.count);
HoldCount newh=rh.get();// 更新缓存计数器后,再从ThreadLocal重新读取
System.out.println(newh.count); // 可以读取新的计数值
// 在rh这个ThreadLocal里面的Map结构中移除HoldCount实例对象
rh.remove();
System.out.println(rh.get()); // 此时rh里面Map已经不存在HoldCount对象,因此这里返回NUll
}
}
在demo2中,给出了使用ThreadLocal后需要及时删除其实例对象的情况,这部分原因将在文章后面给出深入分析。
2、ThreadLocal内部数据结构简析
可以看到set方法是由内部ThreadLocalMap实现的set方法,既然是个“Map”,那么当然可以猜测是否跟HashMap的数据结构:数据+链表+红黑树类似呢?
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
其实ThreadLocalMap的数据结构没有HashMap数据结构复杂,ThreadLocalMap底层仅有一个table数组,这里,也许你会好奇:HashMap为了解决hash冲突,在数组的桶位上加入一条单向链表,冲突的entry自然会放入到此链表中(或者红黑树),那么问题来了,ThreadLocalMap底层仅有一个table数组,它是如何解决hash冲突?以下正式其设计原理之一,这里的数组给出最简单的情况,不包括“stale entry(无效entry)”的情况,以便让读者快速理解ThreadLocalMap设计原理:
可以看到图中所说的“解决冲突的方式:从i=3开始向后遍历出首个空slot,也即i=5,将keyC放入此空slot即可”的逻辑被称为“线性探测法”,所谓的“线性”就是o(n)复杂度的遍历操作,所谓的“探测”就是不断向后“探测、寻找”,直到找到首个空slot位置。
以上内容为ThreadLocalMap的放入new Entry的简单情况,如果有理解HashMap源代码设计的读者应该可以猜到其他重要设计:例如,当数组容量不够时,如何扩容,也即rehash(注意ThreadLocalMap里面的resize和rehash不是同一个逻辑),再例如ThreadLocalMap里面已经存在的entry,如果它的key已经变成无效(stale),那么如何该清理,或者说在set和get的线性探测过程中遇到有stale entry时,该如何清理?这些问题将在后面逐个深入探讨。
3、ThreadLocalMap基本成员变量的说明
static class ThreadLocalMap {
/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
// ThreadLocalMap底层数组存放的WeakReference类型的entry,使用弱引用类型是为了能够高效GC,避免内存泄露,文章后面给出此设计的讨论
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
/* entry的key就是ThreadLocal对象,例如一个线程内部有10个ThreadLocal变量,那么此线程内部的ThreadLocalMap将存放这10个entry,这里的value就是ThreadLocal变量的“值”。
例如demo1中:
ThreadLocal<String> var1=new ThreadLocal<>();
var1.set("foo A")
那么entry的key就是这个名称为var1的ThreadLocal对象,value就是字符串“foo A”
*/
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
/**
* The initial capacity -- MUST be a power of two.
*/
// 数组的初始容量16
private static final int INITIAL_CAPACITY = 16;
/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
//ThreadLocalMap的底层数组,这里也采用2的次方,原因在HashMap的源代码讨论已经给出深入的解析,这里不再累赘。
private Entry[] table;
/**
* The number of entries in the table.
*/
// 数据含有entry的个数,注意即使entry的key处于stale状态,它也算一个entry
private int size = 0;
/**
* The next size value at which to resize.
*/
private int threshold; // Default to 0
/**
* Set the resize threshold to maintain at worst a 2/3 load factor.
*/
// 注意区别于HashMap的扩容阈值是len*3/4
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
/**
* Increment i modulo len.
*/
// 返回数组当前位置i的下一个位置i+1,如果下一个位置超过数组长度,那么下一个位置又从下标0开始,这种方式实现了所谓的“环形数组”,在后面get、set方法中或者stale entry清空机制的处理中可以看到它的用处
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
/**
* Decrement i modulo len.
*/
// 逻辑同上,方向相反,返回当前位置i的前一个位置i+1,如果来到数组头部,那么前一个位置即回到数组末尾
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
4、set方法完全解析
4.1 set方法本身
/**
* Set the value associated with key.
*
* @param key the thread local object
* @param value the value to be set
*/
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
// 1.计算给定key对应的桶位,此hash算法能够最大程度将key平均分布到数组对应的桶位上,具体算法参考文后说明
int i = key.threadLocalHashCode & (len-1);
/* 2.线性探测发的实现
从定位到i桶位开始遍历,直到遇到一个entry确实是null的空桶位,如果此遍历过程中遇到stale entry那么将其替换即可完成set操作。
*/
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 3.如果此桶位的key恰好是给定给定的key,那么更新此桶位的value后可直接返回。
if (k == key) {
e.value = value;
return;
}
// 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 5.在2的线性探测过程中,遇到entry为空的即可来到这流程,直接放入新entry,并且数组的entry数量加1,在这里应该可以猜到,当向数组添加一个新entry后接下来就要判断是否需要扩容
tab[i] = new Entry(key, value);
int sz = ++size;
// 6.满足扩容的条件:cleanSomeSlots返回False且entry数量达到扩容阈值
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
关于线性探测法的说明
1.为何会有“环形数组或者环形遍历”的设计:nextIndex
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)])
图1所示,假设目前有一个keyX定位到的桶位是i=13,但此桶位已存在entry,只能继续向后探测,来到数组尾部的桶位也不为空,此时经过e = tab[i = nextIndex(i, len)]
的计算后,线性探测再次回到数组的头部位置重新遍历,如图2所示,当遍历到i=6时,发现此桶位为空,即可跳出循环接着在此位置放置新的entry,这就是“环形数组或者环形遍历”的底层设计逻辑。
4.2 replaceStaleEntry方法解析(核心内容)
在set方法中的第4点:替换失效的entry
// 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
其源码设计包括两个重要的核心功能:替换对应位置失效的entry和具有顺带功能(As a side effect)的清理其他失效entry,其中清理entry的逻辑设计最为复杂。
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
int slotToExpunge = staleSlot;
/*
1、staleSlot是对于给定key用线性探测法“前向遍历”找到的首次出现的stale entry对应的下标
为何第1步骤的前向遍历没有安排类似第2步骤的“替换操作等逻辑呢”,因为“ThreadLocal本身用的是开发地址法,冲突的key都被放置在后面空的slot,就算来到table末尾再从头遍历,它也是遵循“向后放置发生冲突的key””
*/
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
// 2. 从set方法传入的staleSlot下标开始向后遍历
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
/*
以下逻辑非常关键:
3.1 如果从stale slot开始的“后向遍历”的第i下标又出现了key冲突,说明给定的key“本应放在slate slot 下标位置,但是因为冲突,被迫挪到比slate slot 更靠后的位置i”,既然现在slate slot已失效,那么就可以将给定key放回本应该更靠近hash定位的下标位置slateSlot。这里采用交换两者位置即可实现此逻辑。这就是”to swap it with the stale entry to maintain hash table order”所要表达的逻辑。
*/
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
// 3.2 如果slotToExpunge还是staleSlot,说明第1步骤的“前向探测”没有stale entry,那么就将清理起始下标改到i,因为i下标位置存放的是在3.1交换过来的stale entry:tab[i] = tab[staleSlot]
if (slotToExpunge == staleSlot)
slotToExpunge = i;
/* 3.3 到此,我们知道,截止到i下标的stale entry情况ß:
[某个空slot,staleSlot):从staleSlot的前向位置都没有stale entry
staleSlot:将i位置的有效entry交换过来tab[staleSlot] = e
[staleSlot+1,i-1]:后向遍历没有出现stale entry
i:存放的是从staleSlot交换过来的stale entry
因此slotToExpunge肯定是从i下标开始做清理工作。
*/
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
// 3.4 后向遍历在第i下标发现了一个stale entry且在前向遍历没有出现stale entry,那么清理开始下标当然要重置为i,那么staleSlot位置还存放着stale entry且没有也没有像3.1这样的“swap it”的设计,那么staleSlot自己是如何处理呢。它会在接下里的第4步骤中被处理掉! 那么在这个步骤发现第i号下标新的stale entry又是如何处理呢? 它会在第5个步骤被清理掉
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
/* 4. 在整个run都没有找到对应的key且也没有发现stale entry(除了staleSlot本身整个),那么好办,直接将staleSlot这个在set方法一开始就发现的slate entry的位置替换为新 entry即可,这就是为何方法名字命名为replaceStaleEntry。
*/
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
// 第5步骤:接第3.4步骤出现的情况。
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
关于“A run”的理解(理解run及其内部清理标记逻辑才能透彻理解set背后的原理)
As a side effect, this method expunges all stale entries in the “run” containing the stale entry(A run is a sequence of entries between two null slots.)
如上图所示:
一个"run"就是在两个空slot之间的slots,例如上图,i=0和i=12之间就是一个“run”。
1.为何这“run”是以两个空slot作为边界呢?
这是因为replaceStaleEntry的第1步骤使用prevIndex前向探测,直到遇到null slot则结束循环,而第2步骤使用nextIndex后向探测,直到遇到null slot则结束循环,因此可以得出一前一后都是null slot作为边界。
2.结合replaceStaleEntry的源代码分析的第1点:显然经过prevIndex的“前向探测”探测到了首个i=1的stale entry,因此slotToExpunge指向i=1表示此下标是接下expungeStaleEntry清理的起始下标。
3.根据1可知,如果slotToExpunge下标和staleSlot下标相等,说明“前向探测”根本没发现stale entry,也即slotToExpunge指向没动过。
4.根据第3.4步骤可知,k == null && slotToExpunge == staleSlot
,说明除了在set方法第一次发现的staleSlot,还在replaceStaleEntry的后向探测中的第i位置又发现了一个stale entry,因此起始清理下标要重置为slotToExpunge=i
关于replaceStaleEntry内部最关键的“替换算法”,也即对应第第3.1步骤到第3.3步骤如上图1和图2所示:
不妨假设i=8就是“给定的key”hash定位时发生的冲突下标,假设i=11的key1等于“给定的key”,也即对应第3.1步骤,
此时实施3.1步骤的“swap”逻辑:
将staleSlot=9的stale entry交换到key1的i=11位置,原i=11位置entry交换到staleSlot=9位置并且key不变但value被更新为“给定key对应的value”。
经过这么处理,“给定的key”所在桶位显然更靠近原本属于它的9号桶位,而不是像之前“被迫挪到”11号桶位,这就是源代码注释说提到的“we need to swap it with the stale entry to maintain hash table order”
以上的算法设计可以抽象为以下类比逻辑:
A本应坐在1号位,但发现来晚了,1号位置有人坐了,2、3、4也有人坐了,A被迫坐在5号位,某个时刻“新来的B”发现2号位已经变成staleSlot且1号位还有人在坐,那么此时B可以将A交换到2号位且把2号位的slate entry交换到5号位,那么此刻位于2号位的A显然更靠近“本属于自己的1号座位
4.3 关于staleSlot的清理逻辑设计
在4.2 中replaceStaleEntry为我们展示了精密的如何找出“起始清理下标”的算法设计,从
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
可知,清理逻辑被设计为两部分:
第一部分:expungeStaleEntry(slotToExpunge)
,此方法返回一个i下标。第一部分的清理可称为“线性地清理”——“linear expunge”。注意此过程还包括rehash过程!!
第二部分:cleanSomeSlots(i, len)
,第二部分的清理可以称为“Heuristically expunge”,这里并不打算翻译为“启发性清理”,因为此处不建议使用中文硬翻译。(若要翻译,则可以翻译为“试探性地清理”)
以下是关于“线性地清理-expungeStaleEntry”的解析:
/**
* Expunge a stale entry by rehashing any possibly colliding entries
* lying between staleSlot and the next null slot. This also expunges
* any other stale entries encountered before the trailing null. See
* Knuth, Section 6.4
*
* @param staleSlot index of slot known to have null key
* @return the index of the next null slot after staleSlot
* (all between staleSlot and this slot will have been checked
* for expunging).
*/
// 1.这里入参的staleSlot,就是replaceStaleEntry探测到的slotToExpunge下标
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
// 2.首先清空当前slotToExpunge下标的stale entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
// 3. 在slotToExpunge+1到恰好遇到null slot之间进行逐个探测清理
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 3.1 又出现stale entry可直接清空
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 3.2 再次计算当前位置i放置的entry对应的hash值,如果hash值和当前i桶位一致,说明没有冲突,此entry恰好就是位于“本属于自己的桶位上”,如果hash值和当前i不一致,说明“此entry因为冲突被迫放到了第i位置,而第i位置不是此entry的直接定位”,可以将位于i桶位的“entry”放在“属于自己的h桶位”,这样就保证了entry能最大程度靠近或者就位于“本属于自己的桶位”范围以内,目的是为了提供线性探测查询效率!
int h = k.threadLocalHashCode & (len - 1);
// 从向后探测的开放地址法可知,h值更小,i值更大,正是因为原h位置有冲突,e才被放置到更靠后的第i位置
if (h != i) {
// 因为发生冲突被迫放置在i位置的entry,后面会被放到它的直接定位h桶位,因此i位置可以置为null
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
/*
虽然h桶位就是此entry的直接定位,但是考虑到h桶位可能被放置了其他entry,因此需要加入“向后探测”的逻辑,直到发现下一个位置为null slot。
tab[h] = e 的写法就实现了“因为发生冲突被迫放置在i位置的entry,现在能够最接近地放到本属于自己直接定位的h桶位*/
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
/*
显然此i就是第3步骤for循环里面从slotToExpunge向后探测到下一个null slot下标,此下标会被cleanSomeSlots方法中利用起来。
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len))
*/
}
expungeStaleEntry难点其实在第3.2步骤中遇到的情况,需要做个简单的rehash,保证entry更加靠近“本属于自己的直接定位h桶位”,过程解析参考以下算法流程:
当然,如果在图2的中i=8不是null slot,那么从h位置开始while (tab[h] != null)
探测,也会探测到i=10位置是个null slot,结果就是table[h=i=10]=e
,e还是位于第i位置上。
4.3 cleanSomeSlots(i, len)方法
注意,要清楚cleanSomeSlots(i, len)
i和len含义,否则无法理解cleanSomeSlots目的,这里的i
就是expungeStaleEntry
返回的一个空slot,len是table长度。
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// 1.
i = nextIndex(i, len);
Entry e = tab[i];
//2.
if (e != null && e.get() == null) {
// 3.
n = len;
removed = true;
//4 .
i = expungeStaleEntry(i);
}
// 5.控制探测的次数
} while ( (n >>>= 1) != 0);
return removed;
}
1.首先,考虑最简单的情况,如果第i和len之前都不存在stale entry,那么就相当于在1和len范围内折半探测,时间复杂度为log2(n)
2.其次,考虑到探测在i和len过程中,出现了stale entry,此时会将n重置为len长度,继续while,再一轮log2(n)次遍历
这就是所谓的“Heuristically scan”,因为是log2(n),即使出现如2情况,此试探性的探测动作也是可以很快完成。
4.5、set方法内部的rehash
tab[i] = new Entry(key, value);
int sz = ++size;
/*
1. 显然如果cleanSomeSlots返回true,表明在table中清理了不少于1个的stale entry,恰好可以腾出不少于1个空slot,显然不需要table扩容。
2. 当!cleanSomeSlots(i, sz) 表示没有遇到stale entry且table的entry数量已经达到了阈值,可以进入扩容逻辑
*/
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
rehash内部设计:
/**
* Re-pack and/or re-size the table. First scan the entire
* table removing stale entries. If this doesn't sufficiently
* shrink the size of the table, double the table size.
*/
private void rehash() {
//1.扩容前,先从头到尾线性清理一下stale entry
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
// 2.只有当table的entry个数达到了0.75threshold才会去扩容
if (size >= threshold - threshold / 4)
resize();
}
/**
* Double the capacity of the table.
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 这里看出是两倍扩容
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
// 1.从旧表开始逐个遍历
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
// 2.旧表当前j位置出现stale entry,那么直接将entry的value强引用设为null,Help the GC
if (k == null) {
e.value = null; // Help the GC
} else {
// 3. 旧表当前遍历位置j是正常的entry,那么用新表newLen计算它在新表的桶位号
int h = k.threadLocalHashCode & (newLen - 1);
// 4. 开放地址法在新表中为“当前旧表遍历位置下entry”对应的新表h位置
while (newTab[h] != null)
h = nextIndex(h, newLen); // 注意这里是在新表计算
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab; // table 指向新表引用
}
rehash的逻辑相对简单。
5、get方法解析
有了set方法完全解析流程后,对于get方法则很好理解
/**
* Returns the value in the current thread's copy of this
* thread-local variable. If the variable has no value for the
* current thread, it is first initialized to the value returned
* by an invocation of the {@link #initialValue} method.
*
* @return the current thread's value of this thread-local
*/
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 1、ThreadLocalMap已经存在时
if (map != null) {
// getEntry才是正在在底层table去查找给定key对应的entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 2. 如果线程t的ThreadLocal内部ThreadLocalMap还未初始化,直接返回ThreadLocal初始化时设定的初始值
return setInitialValue();
}
getMap的逻辑
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
/**
* Construct a new map initially containing (firstKey, firstValue).
* ThreadLocalMaps are constructed lazily, so we only create
* one when we have at least one entry to put in it.
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
getEntry整体设计:
/**
* Get the entry associated with key. This method
* itself handles only the fast path: a direct hit of existing
* key. It otherwise relays to getEntryAfterMiss. This is
* designed to maximize performance for direct hits, in part
* by making this method readily inlinable.
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 1. 对应上面源代码注释提到“Only the fast path:a direct hit of existing key” 逻辑,也即给定key对应的i桶位的entry恰好存放的就是key的entry。
if (e != null && e.get() == key)
return e;
// 2. 说明这个key在之前是发生冲突了,放置到比i更靠后的位置,需要采用“后向探测”去检索。
else
return getEntryAfterMiss(key, i, e);
}
/**
* Version of getEntry method for use when key is not found in
* its direct hash slot.
*
* @param key the thread local object
* @param i the table index for key's hash code
* @param e the entry at table[i]
* @return the entry associated with key, or null if no such
*/
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 从给定的entry开始后向遍历探测
while (e != null) {
ThreadLocal<?> k = e.get();
// 1.找到,则返回
if (k == key)
return e;
// 2.遇到stale entry,调用expungeStaleEntry清理它,此时i位置就是slotToExpunge起始清理的下标
if (k == null)
expungeStaleEntry(i);
else
// 3.继续向后探测下一个entry
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
6、remove方法
/**
* Remove the entry for key.
*/
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// 目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry
e.clear();
//
expungeStaleEntry(i);
return;
}
}
}
e.clear()关键逻辑:此方法来自import java.lang.ref.Reference;
可以看到 e.clear()
目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry
/**
* Clears this reference object. Invoking this method will not cause this
* object to be enqueued.
*
* <p> This method is invoked only by Java code; when the garbage collector
* clears references it does so directly, without invoking this method.
*/
public void clear() {
this.referent = null;
}
7、ThreadLocal弱引用和内存泄露问题
在前面的所有内容中,我们都知道在线性探测中用if (k == null)
去判断当前桶位的entry是否为变为一个stale entry,放入一个正常的entry的为何会在某个时刻变成“失效的entry”?这是因为entry的key被设计为WeakReference ,
这是ThreadLocalMap关键设计之一。
/**
注意以下源代码的解析
To help deal with
* very large and long-lived usages, the hash table entries use
* WeakReferences for keys.
*/
static class ThreadLocalMap {
/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k); // key 是弱引用类型
value = v; // value 是强引用类型
}
}
所谓的java弱引用:一旦有gc,那么WeakReference类型的对象就会被回收,用以下demo说明:
public class WeakReferenceDemo {
static class Entry{
private String key;
private String value;
Entry(String key,String value) {
this.key = key;
this.value=value;
}
@Override
public String toString() {
return "Entry{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
'}';
}
}
public static void main(String[] args) {
Entry entry=new Entry("foo","bar"); // entry显然是一个强引用
WeakReference<Entry> entryWeakReference=new WeakReference<>(entry); // entryWeakReference是一个弱引用
System.gc();// gc1
System.out.println("after gc1,entry:"+entry); // Entry{key='foo', value='bar'}
System.out.println("after gc1,entryWeakReference:"+entryWeakReference.get()); // Entry{key='foo', value='bar'}
entry=null; // 此时entry强引用被置为null,那么会被gc回收
System.gc();// gc2
System.out.println("after gc2,entryWeakReference:"+entryWeakReference.get()); // null
}
}
输出:
after gc1,entry:Entry{key='foo', value='bar'}
after gc1,entryWeakReference:Entry{key='foo', value='bar'}
after gc2,entryWeakReference:null
注意到ThreadLocalMap中的Entry,key类型是WeakReference<ThreadLocal<?>>
弱引用的,因此一旦此key没有指向强引用,那么key显然会变为null,那么gc时作为key的ThreadLocal对象在jvm堆中就会被回收,对应的Entry就是一个stale entry
,注意,如果Entry的value此时还不是null也即还是处于强引用类型状态,这会引出另外一个问题:ThreadLocal内存泄露问题,或者说:
为何ThreadLocal会有内存泄露问题?
其实比较好理解,首先ThreadLocal内部的Entry对象和当前线程的生命周期一致,只要线程不结束,且Entry的value也即给ThreadLocal对象设置的value没有被删除(强引用还在),那么这个Entry就不会被回收,假设一个线程内部的ThreadLocalMap里面有很多这样的Entry,那么就会面临内存泄露的风险,
考虑线程池的情况,例如有线程使用ThreadlLocal对象,此线程位于线程池中会一直保持运行,对于它的ThreadlLocal对象内部的ThreadLocalMap来说,如果map中Entry的value没有被外界使用完后及时删除,就导致此Entry一直得不到回收,容易发生内存泄露。
7.1 如何避免内存泄露
代码中满足一定ThreadLocal.get()、ThreadLocal.set()
逻辑设计的情况下,主动调用ThreadLocalMap.remove
来移除Entry对象的引用关系,这种高级且科学用法,其实在ReentrantReadWriteLock
的源代码设计有所体现:
/**
定义一个给每个线程自己的内部读锁计数器
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
// 如何实现每个线程独立记录的读锁计数器? 使用ThreadLocal即可保证线程隔离的计数,互不影响。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() { // 线程自己持有的读锁计数器初始值0
return new HoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transient ThreadLocalHoldCounter readHolds;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
// 线程释放自己持有的读锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 第一个持有读锁的线程恰好是当前线程,
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// “第一个持有读锁的线程” 也准备释放读锁,firstReader不再指向任何读线程
if (firstReaderHoldCount == 1)
firstReader = null;
// 否则“第一个持有读锁的线程”重入锁次数减1
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
// 如果线程自己缓存的读锁计数器对象为空,或者线程自己的读锁计数器缓存的线程不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 当前线程持有读锁的计数器readHolds
rh = readHolds.get();
int count = rh.count;
// 当前线程持有读锁的计小于等于1,说明在本次读锁退出后,当当前线程不再持有任何读锁,因此用在它身上的ThreadLocal<HoldCounter>对象需要马上移除,避免ThreadLocal发生内存泄露
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 线程自己持有读锁自减1
--rh.count;
}
for (;;) {
// 总的读锁锁-1
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
8、为何ThreadLocal没有直接采用ConcurrentHashMap这样的Map数据结构?
-
首先,如果ThreadLocal使用ConcurrentHashMap来达到key-value管理目的,那么是无法实现“每个线程持有自己的本地实例”这样的需求,因此对于
Josh Bloch and Doug Lea
来说,需要设计ThreadLocal的数据结构及其一些算法细节,真正打造出可以支持和实现“每个线程可以持有自己本地实例”功能的数据结构,这显然是非常创新的工作,虽然ConcurrentHashMap的源代码设计已经堪称十分优秀。 -
其次,既然不采用ConcurrentHashMap这样内部复杂设计的Map结构,那么就要设计出非常高效、简约的数据结构,因此设计了ThreadLocalMap,底层只有一个数组table,不再有什么单链表、红黑树等结构,采用开放寻址法解决hash冲突,同时,只基于数组实现相关逻辑的代码会变得更加直观、简单,例如在扩容、清理stale entry方面,仅需基于数组的前后线性遍历即可。
-
ThreadLocal底层只基于一个数组table,结合设计特定的hash魔数,可以使得Entry的hash在数组中分散很均匀,大大降低了冲突概率,提高查询效率。