深度解析ConcurrentHashMap工作原理

一、引言

在 Java 并发编程领域,数据的线程安全访问是一个至关重要的问题。ConcurrentHashMap 作为 Java 集合框架中的重要成员,为多线程环境下的高效并发操作提供了支持。它允许在多线程环境中安全地进行读写操作,同时避免了传统 Hashtable 因使用全量同步锁而导致的性能瓶颈。本文将深入探讨 ConcurrentHashMap 的原理,涵盖其底层数据结构、核心属性、构造方法、常用操作的实现细节,并结合代码示例进行说明。

二、ConcurrentHashMap 概述

2.1 定义与用途

ConcurrentHashMapjava.util.concurrent 包下的一个类,实现了 ConcurrentMap 接口。它主要用于在多线程环境下存储和操作键值对,能够保证线程安全,并且在并发场景下具有较高的性能。常见的应用场景包括缓存系统、多线程数据统计等。

2.2 继承关系与实现接口

ConcurrentHashMap 的继承关系如下:

java.lang.Object
    └─ java.util.AbstractMap<K,V>
        └─ java.util.concurrent.ConcurrentHashMap<K,V>

它实现了 ConcurrentMap<K,V>Serializable 接口,支持并发操作和序列化。

import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class ConcurrentHashMapOverview {
    public static void main(String[] args) {
        // 创建一个 ConcurrentHashMap 对象
        ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
        // 可以将其赋值给 Map 接口类型的变量
        Map<String, Integer> map = concurrentHashMap;
    }
}

三、底层数据结构

3.1 JDK 7 中的实现:分段锁(Segment)

在 JDK 7 中,ConcurrentHashMap 采用分段锁(Segment)机制。它将整个哈希表分成多个段(Segment),每个段相当于一个小的 Hashtable,并且每个段都有自己的锁。不同的线程可以同时访问不同的段,从而提高了并发性能。每个段内部的数据结构是数组 + 链表。

3.2 JDK 8 及以后的实现:CAS + synchronized

从 JDK 8 开始,ConcurrentHashMap 摒弃了分段锁机制,采用 CAS(Compare - And - Swap)和 synchronized 来保证并发操作的线程安全。其底层数据结构为数组 + 链表 + 红黑树。当链表长度超过一定阈值(默认为 8)且数组长度达到 64 时,链表会转换为红黑树,以提高查找效率。

3.3 节点结构

JDK 8 中 ConcurrentHashMap 的节点结构主要有以下几种:

// 普通节点
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

// 树节点
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }

    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }

    /**
     * Returns the TreeNode (or null if not found) for the given key
     * starting at given root.
     */
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        if (k != null) {
            TreeNode<K,V> p = this;
            do  {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}

四、核心属性

// 存储数据的数组
transient volatile Node<K,V>[] table;
// 扩容时使用的辅助数组
private transient volatile Node<K,V>[] nextTable;
// 基础计数器,用于记录元素数量
private transient volatile long baseCount;
// 控制初始化和扩容操作的标志
private transient volatile int sizeCtl;
// 扩容时的索引
private transient volatile int transferIndex;
// 自旋锁,用于控制并发操作
private transient volatile int cellsBusy;
// 用于并发计数的数组
private transient volatile CounterCell[] counterCells;
  • table:存储键值对的数组,数组中的每个元素是一个链表或红黑树的头节点。
  • nextTable:在扩容时使用的辅助数组,扩容完成后会将其赋值给 table
  • baseCount:基础计数器,用于记录元素的数量。在没有并发竞争时,直接使用该计数器。
  • sizeCtl:控制初始化和扩容操作的标志。负数表示正在进行初始化或扩容操作,正数表示初始化容量或扩容阈值。
  • transferIndex:扩容时的索引,用于标记当前正在处理的桶的位置。
  • cellsBusy:自旋锁,用于控制并发操作,确保只有一个线程可以对 counterCells 数组进行初始化或扩容。
  • counterCells:用于并发计数的数组,当有并发竞争时,会使用该数组来记录元素数量。

五、构造方法

5.1 无参构造方法

public ConcurrentHashMap() {
}

无参构造方法不会立即初始化数组,而是在第一次插入元素时进行初始化。

5.2 指定初始容量的构造方法

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

该构造方法允许指定初始容量,会根据初始容量计算出一个合适的数组大小,并将其赋值给 sizeCtl

5.3 指定初始容量和负载因子的构造方法

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

此构造方法允许指定初始容量和负载因子,内部调用另一个构造方法。

5.4 指定初始容量、负载因子和并发级别的构造方法

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

该构造方法允许同时指定初始容量、负载因子和并发级别,会根据这些参数计算出合适的数组大小,并将其赋值给 sizeCtl

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapConstructors {
    public static void main(String[] args) {
        // 无参构造方法
        ConcurrentHashMap<String, Integer> concurrentHashMap1 = new ConcurrentHashMap<>();

        // 指定初始容量的构造方法
        ConcurrentHashMap<String, Integer> concurrentHashMap2 = new ConcurrentHashMap<>(20);

        // 指定初始容量和负载因子的构造方法
        ConcurrentHashMap<String, Integer> concurrentHashMap3 = new ConcurrentHashMap<>(15, 0.8f);

        // 指定初始容量、负载因子和并发级别的构造方法
        ConcurrentHashMap<String, Integer> concurrentHashMap4 = new ConcurrentHashMap<>(10, 0.75f, 2);

        System.out.println("ConcurrentHashMap1: " + concurrentHashMap1);
        System.out.println("ConcurrentHashMap2: " + concurrentHashMap2);
        System.out.println("ConcurrentHashMap3: " + concurrentHashMap3);
        System.out.println("ConcurrentHashMap4: " + concurrentHashMap4);
    }
}

六、常用操作原理

6.1 插入元素(put 方法)

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

put 方法的主要步骤如下:

  1. 检查键和值是否为 null,如果为 null 则抛出 NullPointerException
  2. 计算键的哈希值。
  3. 如果数组未初始化,则调用 initTable 方法进行初始化。
  4. 如果对应桶为空,则使用 CAS 操作将新节点插入到桶中。
  5. 如果桶的头节点的哈希值为 MOVED,表示正在进行扩容操作,当前线程会协助进行扩容。
  6. 否则,对桶的头节点加锁,遍历链表或红黑树,查找键是否已经存在,如果存在则更新值,否则插入新节点。
  7. 如果链表长度超过阈值,则将链表转换为红黑树。
  8. 插入完成后,调用 addCount 方法更新元素数量。

6.2 获取元素(get 方法)

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get 方法的主要步骤如下:

  1. 计算键的哈希值。
  2. 如果数组不为空且对应桶不为空,则获取桶的头节点。
  3. 如果头节点的哈希值等于键的哈希值,则检查键是否相等,如果相等则返回值。
  4. 如果头节点的哈希值小于 0,表示是红黑树节点或正在扩容,调用 find 方法查找值。
  5. 否则,遍历链表查找值。

6.3 删除元素(remove 方法)

public V remove(Object key) {
    return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

remove 方法的主要步骤如下:

  1. 计算键的哈希值。
  2. 如果数组未初始化或对应桶为空,则直接返回 null
  3. 如果桶的头节点的哈希值为 MOVED,表示正在进行扩容操作,当前线程会协助进行扩容。
  4. 否则,对桶的头节点加锁,遍历链表或红黑树,查找键是否存在,如果存在则删除该节点。
  5. 删除完成后,调用 addCount 方法更新元素数量。

6.4 扩容操作

ConcurrentHashMap 的扩容操作是并发进行的,多个线程可以同时协助进行扩容。扩容时,会将原数组的元素重新分配到新数组中。主要步骤如下:

  1. 计算新数组的大小。
  2. 创建新数组。
  3. 多个线程同时对原数组的不同部分进行迁移,将元素重新分配到新数组中。
  4. 迁移完成后,将新数组赋值给 table
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapOperations {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();

        // 插入元素
        concurrentHashMap.put("apple", 1);
        concurrentHashMap.put("banana", 2);
        concurrentHashMap.put("cherry", 3);

        // 获取元素
        Integer value = concurrentHashMap.get("banana");
        System.out.println("Value of banana: " + value);

        // 删除元素
        concurrentHashMap.remove("cherry");
        System.out.println("After removing cherry: " + concurrentHashMap);
    }
}

七、性能分析

7.1 时间复杂度

  • 插入操作:平均情况下,插入操作的时间复杂度为 O(1)。在没有哈希冲突的情况下,插入操作可以在常数时间内完成。当链表长度超过阈值时,插入操作的时间复杂度会变为 O(log n)(红黑树的插入操作)。
  • 查找操作:平均情况下,查找操作的时间复杂度为 O(1)。同样,在没有哈希冲突的情况下,查找操作可以在常数时间内完成。当链表转换为红黑树后,查找操作的时间复杂度为 O(log n)。
  • 删除操作:平均情况下,删除操作的时间复杂度为 O(1)。在没有哈希冲突的情况下,删除操作可以在常数时间内完成。当链表转换为红黑树后,删除操作的时间复杂度为 O(log n)。

7.2 空间复杂度

ConcurrentHashMap 的空间复杂度为 O(n),主要用于存储数组、链表和红黑树节点。

八、注意事项

8.1 键和值不能为 null

ConcurrentHashMap 不允许键和值为 null,如果尝试插入 null 键或 null 值,会抛出 NullPointerException

8.2 弱一致性

ConcurrentHashMap 的迭代器是弱一致性的,这意味着在迭代过程中,如果其他线程对 ConcurrentHashMap 进行了修改,迭代器可能不会反映这些修改。

九、总结

ConcurrentHashMap 是 Java 并发编程中一个非常重要的数据结构,它通过 CAS 和 synchronized 机制保证了多线程环境下的线程安全,同时采用数组 + 链表 + 红黑树的数据结构提高了性能。在使用 ConcurrentHashMap 时,需要注意键和值不能为 null,以及迭代器的弱一致性。深入理解 ConcurrentHashMap 的原理和性能特点,有助于我们在实际开发中更好地利用它来处理并发场景下的键值对存储和操作。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值