ShardingSphere复合分片之hash槽算法

前言

  上一篇《ShardingSphere复合分片》中有详细介绍多key多value的复合分片算法应该如何设计,在大部分情况下该算法是没有问题的,但是一旦涉及到数据迁移时,该算法的缺点就暴露无疑了。

为满足日益增长的用户或者订单的需求,在分库分表的同时,我们不可避免的会对数据库进行扩容,这时就涉及到数据迁移,但是上一篇最终采用的是求模取余的算法对数据库进行分片,该算法一旦需要增加或者删减数据库服务器的时候,几乎要使所有数据进行迁移,影响面特别广,且不好控制,容易出错。为解决这一弊端,我们可以借鉴redis中多主多从集群中的数据分片策略,即hash槽策略来优化我们mysql的分片策略。

hash槽算法

  首先该算法会分配16384|(2^14)个槽,通过hash函数,将数据均匀分布在这些槽,针对分片,我们只需要关注将这16384槽如何分配到具体的分片信息上。如果用户需要增加分片,那么我们只需要移动一部分槽到新的分片上就完成了数据的迁移,其他大部分槽是不需要移动的,这就避免了大规模的数据迁移。用户如果需要减少分片也类似。

将Hash槽算法引入到ShardingSphere

  在上一篇我们介绍到,通过生成分布式id,同时让这个分布式id记录所处的库以及所处的表,也就是记录所处分片,以完成整个分片逻辑。使用Hash槽算法也类似,在生成分布式订单id过程中,通过传入的relatedId进行hash运算后,计算得到的hash值应该处于哪个槽,再由具体的槽计算出应该划分到哪个分片,最后将分片信息保存到分布式id中。以上这样就完成了分片逻辑的计算,后续的ShardingSphere逻辑和上一篇一模一样,这里就不再冗余了。

  不过需要注意一点,由于数据迁移时,是根据槽信息来迁移的,所以在信息入库时我们也需要保存槽信息,也就是说我们生成的分布式id需要包含槽信息,以便数据迁移。

具体部分实现代码如下所示:

分布式订单id生成类:


/** 根据Hash槽生成分布式id,同时解决复合key的分库分表问题,以及在增删节点时,求模取余算法在分库分表时大规模的数据迁移问题。
 *
 *  该算法借鉴redis hash槽
 * @author chenjian
 * @version 1.0
 * @date 2025/04/15 10:17
 * @className HashSlotKeyGenerator
 * @desc 自定义分布式主键生成器
 */
@Component
public class HashSlotKeyGenerator implements KeyGenerator {
    @Autowired
    private AppProperties appProperties;

    @Resource(name = "hashSlotDefaultShardingStrategy")
    private HashSlotAllocationStrategy hashSlotAllocationStrategy;

    @Autowired
    private SequenceGenerator sequenceGenerator;

    @Override
    public String generateKey(DbAndTableEnum targetEnum, String relateId) {
        if (StringUtils.isBlank(relateId)) {
            throw new IllegalArgumentException("路由id参数为空");
        }

        Map<String,String> map = null;
        int slot = crc16Hash(relateId) % SLOT_COUNT;
        // 根据slot分配数据库节点

        map =  hashSlotAllocationStrategy.shardingBySlot(slot,Integer.parseInt(appProperties.getShardingNodes().getAmount()),DbAndTableEnum.T_ORDER);
        StringBuilder key = new StringBuilder();
        /* 总共37位 */
        /** 1.id业务前缀*/
        String idPrefix = targetEnum.getCharsPrefix();
        /** 2.id数据库索引位*/
        String dbIndex = map.get(ShardingConstant.DB_INDEX);
        /** 3.id表索引位*/
        String tbIndex = map.get(ShardingConstant.TB_INDEX);
        /** 4.id规则版本位*/
        String idVersion = targetEnum.getIdVersion();
        /** 5.id时间戳位*/
        String timeString = DateUtils.formatTime(new Date());
        /** 6.id分布式机器位 2位*/
        String distributedIndex = ApiUtils.getDistributedId(2);
        /** 7.随机数位*/
        String sequenceId = sequenceGenerator.getNextVal(targetEnum, Integer.parseInt(dbIndex), Integer.parseInt(tbIndex));
        /** 8.5位slot值,用于保存到数据库,方便数据迁移*/
        String hashSlot = StringUtil.fillZero(String.valueOf(slot), ShardingConstant.DB_SUFFIX_LENGTH);

        /** 库表索引靠前*/
        return key.append(idPrefix)
                .append(dbIndex)
                .append(tbIndex)
                .append(idVersion)
                .append(timeString)
                .append(distributedIndex)
                .append(sequenceId)
                .append(hashSlot).toString();
    }

    private int crc16Hash(String key) {
        int crc = 0xFFFF;
        int polynomial = 0x1021;
        for (byte b : key.getBytes()) {
            crc ^= (b & 0xFF);
            for (int i = 0; i < 8; i++) {
                if ((crc & 0x0001) != 0) {
                    crc = (crc >>> 1) ^ polynomial;
                } else {
                    crc = crc >>> 1;
                }
            }
        }
        return crc;
    }
}

根据槽信息获取具体分片

​

@Service("hashSlotDefaultShardingStrategy")
public class HashSlotDefaultShardingStrategy implements HashSlotAllocationStrategy{
    @Override
    public Map<String, String> shardingBySlot(int slot, int shardingAmount, DbAndTableEnum targetEnum) {
        if (slot < 0 || slot >= SLOT_COUNT) {
            throw new IllegalArgumentException("槽号超出范围");
        }

        int baseSlotsPerShard = SLOT_COUNT / shardingAmount;
        int remainingSlots = SLOT_COUNT % shardingAmount;

        int shardIndex;
        if (slot < remainingSlots * (baseSlotsPerShard + 1)) {
            shardIndex = slot / (baseSlotsPerShard + 1);
        } else {
            shardIndex = remainingSlots + (slot - remainingSlots * (baseSlotsPerShard + 1)) / baseSlotsPerShard;
        }

        Map<String,String> map = new HashMap<>(2);
        String preDbIndex = String.valueOf(StringUtil.getDbIndexByMod(shardIndex,targetEnum.getDbCount(),targetEnum.getTbCount()));
        String dbIndex = StringUtil.fillZero(preDbIndex, ShardingConstant.DB_SUFFIX_LENGTH);
        /** 获取表索引*/
        String preTbIndex = String
                .valueOf(StringUtil.getTbIndexByMod(shardIndex,targetEnum.getDbCount(),targetEnum.getTbCount()));
        String tbIndex = StringUtil
                .fillZero(preTbIndex,ShardingConstant.TABLE_SUFFIX_LENGTH);
        map.put(ShardingConstant.DB_INDEX, dbIndex);
        map.put(ShardingConstant.TB_INDEX, tbIndex);
        return map;
    }
}

​

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值