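Consistent Hashing (HashRing)
Consistent hashing is a technique for distributing data and balancing load. It is widely used in distributed systems such as cache clusters and distributed databases. It maps both data and nodes onto a ring-shaped hash space, which keeps data spread roughly evenly and lets nodes join or leave dynamically. This article explains how the algorithm works and then walks through a complete Java implementation.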
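I. How Consistent Hashing Works
The core idea of consistent hashing is to map both data and nodes onto the same ring-shaped hash space. The steps are as follows:
1. Building the hash ring:
- Every node is mapped by a hash function to a position in a circular space, and together these positions form the hash ring.
- To spread data more evenly, each physical node is usually given several virtual nodes, and it is the virtual nodes that are placed on the ring.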
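2. Assigning data:
- To store a data item, compute its hash, find the first node clockwise from that position on the ring (the first node whose hash is greater than or equal to the data's hash, wrapping around to the start of the ring if there is none), and store the item on that node.
- If the hash ring is empty, the data cannot be stored.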
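3. Adding and removing nodes:
- Adding a node: the new node is given virtual nodes, which are placed on the ring. To keep migration minimal, only the data that now hashes to one of the new virtual nodes moves, and it comes from the node that follows that virtual node clockwise.
- Removing a node: the node's virtual nodes are taken off the ring, and the data they held is reassigned to the remaining nodes.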
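4. Querying data:
- To look up a data item, compute its hash and find the first node clockwise from that position on the ring. That node is where the item is stored.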
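The four steps above can be sketched in a few lines. This is a minimal illustration rather than the implementation presented later: the class name MiniRing, its method names, and the choice of the first 8 bytes of an MD5 digest as the ring position are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal ring: ring position -> physical node name.
class MiniRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int replicas;

    MiniRing(int replicas) {
        this.replicas = replicas;
    }

    // Step 1: place `replicas` virtual nodes for the physical node on the ring.
    void addNode(String node) {
        for (int i = 0; i < replicas; i++) {
            ring.put(hash(node + "#v" + i), node);
        }
    }

    // Step 3: removing a node takes only its own virtual nodes off the ring.
    void removeNode(String node) {
        for (int i = 0; i < replicas; i++) {
            ring.remove(hash(node + "#v" + i), node);
        }
    }

    // Steps 2 and 4: first virtual node clockwise from the key's hash, wrapping around.
    String locate(String key) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    // Assumed hash: first 8 bytes of the MD5 digest, made non-negative.
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h & Long.MAX_VALUE;
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

Storing and querying use the same locate call, which is why a lookup finds the data without any directory of key locations.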
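Consistent hashing has the following advantages:
- Data is spread roughly evenly across nodes, provided each physical node has enough virtual nodes.
- Adding or removing a node affects only the data next to that node's virtual nodes on the ring; everything else stays where it is.
- Nodes can be added or removed while the system is running.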
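The second advantage can be checked with a small experiment. The sketch below is an illustration under stated assumptions: the names RemapDemo, buildRing, locate and movedFraction are hypothetical, the keys are synthetic ("key-0", "key-1", ...), and the hash is the first 8 bytes of an MD5 digest. It counts how many keys change owner when a fourth node joins three existing ones, and it fails loudly if any key moves between two of the old nodes, which consistent hashing should never do.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

class RemapDemo {
    // Assumed hash: first 8 bytes of the MD5 digest, made non-negative.
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h & Long.MAX_VALUE;
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Ring position -> physical node, with `replicas` virtual nodes per physical node.
    static TreeMap<Long, String> buildRing(String[] nodes, int replicas) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String n : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(n + "#v" + i), n);
            }
        }
        return ring;
    }

    static String locate(TreeMap<Long, String> ring, String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    // Fraction of `keyCount` synthetic keys whose owner changes when `added` joins `nodes`.
    static double movedFraction(String[] nodes, String added, int replicas, int keyCount) {
        TreeMap<Long, String> before = buildRing(nodes, replicas);
        TreeMap<Long, String> after = new TreeMap<>(before);
        for (int i = 0; i < replicas; i++) {
            after.put(hash(added + "#v" + i), added);
        }
        int moved = 0;
        for (int k = 0; k < keyCount; k++) {
            String key = "key-" + k;
            String a = locate(before, key);
            String b = locate(after, key);
            if (!a.equals(b)) {
                // A key that moves may only move to the node that was just added.
                if (!b.equals(added)) {
                    throw new IllegalStateException("key moved between old nodes: " + key);
                }
                moved++;
            }
        }
        return (double) moved / keyCount;
    }

    public static void main(String[] args) {
        double f = movedFraction(new String[] {"A", "B", "C"}, "D", 200, 10000);
        System.out.println("Fraction of keys moved: " + f);
    }
}
```

With four equally weighted nodes the new node is expected to take over about a quarter of the keys; the exact figure depends on where the virtual nodes happen to land.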
II. Java Implementation
The following is a Java implementation of consistent hashing that covers node management, data assignment, and data lookup.
import org.apache.commons.codec.digest.DigestUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
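
public class ConsistentHashingSharding {
    // Number of virtual nodes created for each physical node.
    private final int virtualNodeSize;
    // Ring position (hash) -> virtual node name, kept sorted for clockwise lookups.
    private final ConcurrentSkipListMap<Long, String> hashRing = new ConcurrentSkipListMap<>();
    // Virtual node name -> data items stored on it.
    private final Map<String, Set<String>> nodeDataMap = new ConcurrentHashMap<>();
    // Physical node name -> names of its virtual nodes.
    private final Map<String, Set<String>> physicalNodeMap = new ConcurrentHashMap<>();
    // Serializes changes to ring membership and data placement.
    private final ReentrantLock lock = new ReentrantLock(false);

    public ConsistentHashingSharding(int virtualNodeSize) {
        this.virtualNodeSize = virtualNodeSize;
    }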
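
    // Maps a key to a ring position in [0, Integer.MAX_VALUE]: the MD5 hex string is folded
    // to 31 bits by String.hashCode(). Package-private because the tests call it directly.
    long hash(String key) {
        return DigestUtils.md5Hex(key).hashCode() & Integer.MAX_VALUE;
    }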

    // Adds a physical node by placing virtualNodeSize virtual nodes on the ring.
    public void addNode(String node) {
        lock.lock();
        try {
            // The ring stores virtual node names, so duplicates are detected via physicalNodeMap.
            if (physicalNodeMap.containsKey(node)) {
                System.out.println("Node already exists: " + node);
                return;
            }
            Set<String> virtualNodes = new ConcurrentSkipListSet<>();
            for (int i = 0; i < virtualNodeSize; i++) {
                String virtualNode = node + "#v" + i;
                hashRing.put(hash(virtualNode), virtualNode);
                nodeDataMap.put(virtualNode, new ConcurrentSkipListSet<>());
                virtualNodes.add(virtualNode);
            }
            physicalNodeMap.put(node, virtualNodes);
            System.out.println("Added node: " + node);
            // Existing data only needs to move if other nodes were already on the ring.
            if (physicalNodeMap.size() > 1) {
                performMinimalDataMigration(node);
            }
        } finally {
            lock.unlock();
        }
    }

    // Moves to each new virtual node only the data whose hash now maps to it. That data
    // was held by the first older virtual node clockwise from the new one.
    private void performMinimalDataMigration(String newNode) {
        Set<String> newVirtualNodes = physicalNodeMap.get(newNode);
        int moved = 0;
        for (String virtualNode : newVirtualNodes) {
            // Walk clockwise past the new node's own virtual nodes to reach the previous owner.
            Map.Entry<Long, String> source = hashRing.higherEntry(hash(virtualNode));
            while (source == null || newVirtualNodes.contains(source.getValue())) {
                source = (source == null) ? hashRing.firstEntry() : hashRing.higherEntry(source.getKey());
            }
            Set<String> sourceData = nodeDataMap.get(source.getValue());
            if (sourceData == null) {
                continue;
            }
            // Copy before iterating because matching items are removed from the source set.
            for (String data : new ArrayList<>(sourceData)) {
                if (virtualNode.equals(getAssignedNode(hash(data)))) {
                    sourceData.remove(data);
                    nodeDataMap.get(virtualNode).add(data);
                    moved++;
                }
            }
        }
        System.out.println("Data migration completed. " + moved + " item(s) moved to " + newNode);
    }
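
    // Returns the first virtual node clockwise from dataHash, wrapping around to the
    // start of the ring, or null if the ring is empty.
    private String getAssignedNode(long dataHash) {
        if (hashRing.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> entry = hashRing.ceilingEntry(dataHash);
        if (entry == null) {
            entry = hashRing.firstEntry();
        }
        return entry.getValue();
    }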
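
    // Stores a data item on the virtual node that owns its hash. Uses the same lock as
    // addNode and removeNode so that placement never interleaves with a ring change.
    public void addData(String data) {
        lock.lock();
        try {
            String assignedNode = getAssignedNode(hash(data));
            if (assignedNode != null) {
                nodeDataMap.get(assignedNode).add(data);
                System.out.println("Data '" + data + "' added to node: " + assignedNode);
            } else {
                System.out.println("No nodes available to store data.");
            }
        } finally {
            lock.unlock();
        }
    }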

    // Looks up a data item on the virtual node that owns its hash. The returned message
    // ends with the virtual node name, for example "Node1#v0".
    public String queryData(String data) {
        long dataHash = hash(data);
        String assignedNode = getAssignedNode(dataHash);
        if (assignedNode != null && nodeDataMap.get(assignedNode).contains(data)) {
            return "Data '" + data + "' is stored in node: " + assignedNode;
        } else {
            return "Data not found.";
        }
    }

    // Returns the data stored on the given virtual node (empty if the node is unknown).
    public Set<String> findNodeData(String virtualNode) {
        return nodeDataMap.getOrDefault(virtualNode, Collections.emptySet());
    }

    // Prints every ring position and the virtual node placed there, in ring order.
    public void displayHashRingDistribution() {
        if (hashRing.isEmpty()) {
            System.out.println("Hash ring is empty.");
        } else {
            System.out.println(":::::::::::::::::::::::::::::Hash Ring Distribution:::::::::::::::::::::::::::::");
            for (Map.Entry<Long, String> entry : hashRing.entrySet()) {
                System.out.println("Hash: " + entry.getKey() + " -> Node: " + entry.getValue());
            }
        }
    }

    // Prints how many data items each virtual node currently holds.
    public void displayVNodesDataDistribution() {
        System.out.println(":::::::::::::::::::::::::::::Virtual Node Distribution:::::::::::::::::::::::::::::");
        for (Map.Entry<String, Set<String>> entry : physicalNodeMap.entrySet()) {
            for (String virtualNode : entry.getValue()) {
                Set<String> data = findNodeData(virtualNode);
                System.out.println("Virtual node: " + virtualNode + " data size: " + data.size());
            }
        }
    }
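
    // Read-only view of physical node -> virtual nodes, so callers cannot alter ring membership.
    public Map<String, Set<String>> getPhysicalNodeMap() {
        return Collections.unmodifiableMap(physicalNodeMap);
    }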
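
    // Removes a physical node and reassigns the data of its virtual nodes to the remaining nodes.
    public void removeNode(String node) {
        lock.lock();
        try {
            if (!physicalNodeMap.containsKey(node)) {
                System.out.println("Node not found: " + node);
                return;
            }
            Set<String> virtualNodes = physicalNodeMap.get(node);
            // Collect the data first, because the virtual nodes are about to leave the ring.
            List<String> allData = new ArrayList<>();
            for (String virtualNode : virtualNodes) {
                allData.addAll(nodeDataMap.get(virtualNode));
            }
            for (String virtualNode : virtualNodes) {
                // Conditional remove: leaves the entry alone if another virtual node holds this position.
                hashRing.remove(hash(virtualNode), virtualNode);
                nodeDataMap.remove(virtualNode);
            }
            physicalNodeMap.remove(node);
            System.out.println("Removed node: " + node);
            // Re-adding routes each item to its new clockwise successor.
            for (String data : allData) {
                addData(data);
            }
        } finally {
            lock.unlock();
        }
    }
}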
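The rule behind performMinimalDataMigration can be stated on its own: a virtual node inserted at position h takes over exactly the data whose hash lies on the clockwise arc from its predecessor (exclusive) to h (inclusive). The sketch below illustrates that rule with plain numbers. The class ArcMigrationSketch and the methods inArc and keysToMove are hypothetical names for this example and are not part of the class above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ArcMigrationSketch {
    // True when keyHash lies on the clockwise arc (predecessor, newHash], allowing for wrap-around.
    static boolean inArc(long predecessor, long newHash, long keyHash) {
        if (predecessor < newHash) {
            return keyHash > predecessor && keyHash <= newHash;
        }
        // The arc crosses the zero point of the ring (or covers the whole ring when both are equal).
        return keyHash > predecessor || keyHash <= newHash;
    }

    // Returns the stored hashes that a virtual node inserted at newHash takes from its successor.
    // `ring` holds the positions of the virtual nodes already present.
    static List<Long> keysToMove(TreeMap<Long, String> ring, long newHash, List<Long> storedHashes) {
        List<Long> moved = new ArrayList<>();
        if (ring.isEmpty()) {
            return moved;
        }
        // Predecessor: nearest position counter-clockwise, wrapping to the last entry.
        Map.Entry<Long, String> pred = ring.lowerEntry(newHash);
        if (pred == null) {
            pred = ring.lastEntry();
        }
        for (long h : storedHashes) {
            if (inArc(pred.getKey(), newHash, h)) {
                moved.add(h);
            }
        }
        return moved;
    }
}
```

For a ring with virtual nodes at 10, 50 and 90 and stored hashes 5, 20, 45, 60 and 95, a new virtual node at 30 takes only hash 20 (from the node at 50), while a new virtual node at 5 takes hashes 5 and 95 (from the node at 10) because its arc wraps past the end of the ring.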
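III. Test Cases
The following test cases for the consistent hashing implementation are written with JUnit 5. They cover the main features: node management, data assignment, data lookup, and adding and removing nodes.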
import org.junit.jupiter.api.*;
import java.util.*;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;

class ConsistentHashingShardingTest {
    private ConsistentHashingSharding sharding;

    @BeforeEach
    void setUp() {
        // Three virtual nodes per physical node.
        sharding = new ConsistentHashingSharding(3);
    }

    @Test
    @DisplayName("Add physical nodes")
    void testAddNode() {
        sharding.addNode("Node1");
        sharding.addNode("Node2");
        assertEquals(2, sharding.getPhysicalNodeMap().size(), "There should be 2 physical nodes");
        assertEquals(3, sharding.getPhysicalNodeMap().get("Node1").size(), "Node1 should have 3 virtual nodes");
        assertEquals(3, sharding.getPhysicalNodeMap().get("Node2").size(), "Node2 should have 3 virtual nodes");
        sharding.addNode("Node1");
        assertEquals(2, sharding.getPhysicalNodeMap().size(), "Adding Node1 again should leave 2 physical nodes");
    }
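
    @Test
    @DisplayName("Remove a physical node")
    void testRemoveNode() {
        sharding.addNode("Node1");
        sharding.addNode("Node2");
        sharding.addData("Data1");
        sharding.addData("Data2");
        sharding.removeNode("Node1");
        assertFalse(sharding.getPhysicalNodeMap().containsKey("Node1"), "Node1 should have been removed");
        Set<String> remainingNodes = sharding.getPhysicalNodeMap().keySet();
        for (String data : Arrays.asList("Data1", "Data2")) {
            // The query result ends with a virtual node name such as "Node2#v1";
            // strip the "#v<i>" suffix to get the physical node.
            String result = sharding.queryData(data);
            String virtualNode = result.substring(result.lastIndexOf(' ') + 1);
            String physicalNode = virtualNode.split("#v")[0];
            assertTrue(remainingNodes.contains(physicalNode), "Data should be reassigned to a remaining node");
        }
    }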

    @Test
    @DisplayName("Assign data")
    void testAddData() {
        sharding.addNode("Node1");
        sharding.addNode("Node2");
        sharding.addData("Data1");
        sharding.addData("Data2");
        // The virtual node name is the last space-separated token of the query result.
        String result1 = sharding.queryData("Data1");
        String result2 = sharding.queryData("Data2");
        String assignedNode1 = result1.substring(result1.lastIndexOf(' ') + 1);
        String assignedNode2 = result2.substring(result2.lastIndexOf(' ') + 1);
        Set<String> data1NodeData = sharding.findNodeData(assignedNode1);
        Set<String> data2NodeData = sharding.findNodeData(assignedNode2);
        assertTrue(data1NodeData.contains("Data1"), "Data1 should be stored on its assigned virtual node");
        assertTrue(data2NodeData.contains("Data2"), "Data2 should be stored on its assigned virtual node");
    }

    @Test
    @DisplayName("Query data")
    void testQueryData() {
        sharding.addNode("Node1");
        sharding.addNode("Node2");
        sharding.addData("Data1");
        sharding.addData("Data2");
        String result1 = sharding.queryData("Data1");
        String result2 = sharding.queryData("Data2");
        assertTrue(result1.startsWith("Data 'Data1' is stored in node:"), "Query for Data1 should report its node");
        assertTrue(result2.startsWith("Data 'Data2' is stored in node:"), "Query for Data2 should report its node");
        String result3 = sharding.queryData("Data3");
        assertEquals("Data not found.", result3, "Query for Data3 should report that it was not found");
    }
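
    @Test
    @DisplayName("Hash ring distribution")
    void testHashRingDistribution() {
        sharding.addNode("Node1");
        sharding.addNode("Node2");
        // Rebuilds the ring from the virtual node names; hash(...) must be visible to this test.
        // Collectors.toMap throws if two virtual nodes collide on the same ring position.
        Map<Long, String> hashRing = sharding.getPhysicalNodeMap().values().stream()
                .flatMap(Collection::stream)
                .map(virtualNode -> new AbstractMap.SimpleEntry<>(sharding.hash(virtualNode), virtualNode))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        assertEquals(6, hashRing.size(), "The hash ring should contain 6 virtual nodes");
    }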

    @Test
    @DisplayName("Data migration when a node is added")
    void testMinimalDataMigration() {
        sharding.addNode("Node1");
        sharding.addNode("Node2");
        for (int i = 0; i < 100; i++) {
            sharding.addData("Data" + i);
        }
        sharding.addNode("Node3");
        // After the migration every item must still be found on the node that owns its hash.
        Map<String, Long> nodeDataCounts = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            String result = sharding.queryData("Data" + i);
            assertTrue(result.startsWith("Data 'Data" + i + "' is stored in node:"), "Data" + i + " should still be found");
            String virtualNode = result.substring(result.lastIndexOf(' ') + 1);
            nodeDataCounts.merge(virtualNode.split("#v")[0], 1L, Long::sum);
        }
        long total = nodeDataCounts.values().stream().mapToLong(Long::longValue).sum();
        assertEquals(100, total, "No data should be lost by the migration");
    }
}
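The tests above check that data is placed and found correctly, but they do not measure how evenly it is spread. A rough way to do that is to count items per physical node and compare the busiest node with the idlest one. The sketch below is self-contained and does not use the class under test: BalanceCheck, countPerNode and maxMinRatio are hypothetical names, and the hash (first 8 bytes of an MD5 digest) is an assumption that differs from the 31-bit hash used in the implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class BalanceCheck {
    // Assumed hash: first 8 bytes of the MD5 digest, made non-negative.
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h & Long.MAX_VALUE;
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Counts how many of `keyCount` synthetic keys ("Data0", "Data1", ...) land on each physical node.
    static Map<String, Integer> countPerNode(String[] nodes, int replicas, int keyCount) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String n : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(n + "#v" + i), n);
            }
        }
        Map<String, Integer> counts = new HashMap<>();
        for (String n : nodes) {
            counts.put(n, 0);
        }
        for (int k = 0; k < keyCount; k++) {
            Map.Entry<Long, String> e = ring.ceilingEntry(hash("Data" + k));
            String owner = (e != null ? e : ring.firstEntry()).getValue();
            counts.merge(owner, 1, Integer::sum);
        }
        return counts;
    }

    // Load of the busiest node divided by the load of the idlest node; 1.0 means a perfect balance.
    static double maxMinRatio(Map<String, Integer> counts) {
        int max = Collections.max(counts.values());
        int min = Collections.min(counts.values());
        return min == 0 ? Double.POSITIVE_INFINITY : (double) max / min;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countPerNode(new String[] {"Node1", "Node2", "Node3"}, 500, 30000);
        System.out.println(counts + " max/min = " + maxMinRatio(counts));
    }
}
```

The ratio tends toward 1.0 as the number of virtual nodes per physical node grows. With only a handful of virtual nodes, such as the 3 used in setUp, the spread can be noticeably uneven, so a balance assertion in a real test would need a larger virtualNodeSize and a tolerance chosen for it.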