今天学习了处理在分布式系统中使用缓存的会出现的问题(缓存黑洞,缓存雪崩,缓存穿透,缓存击穿)。整体还是有一些边路缓存思想在里面,下面是官方解释和本人的一些解释。
public class DetailsServiceImpl implements DetailsService {
@Autowired
private TbItemMapper itemMapper;
@Autowired
private TbItemDescMapper itemDescMapper;
@Autowired
private TbItemParamItemMapper itemParamItemMapper;
@Autowired
private RedisDao redisDao;//Redis数据库访问对象
@Value("${baizhan.details.itemKeyPrefix}")
private String itemKeyPrefix;
@Value("${baizhan.details.itemDescKeyPrefix}")
private String itemDescKeyPrefix;
@Value("${baizhan.details.itemParamItemKeyPrefix}")
private String itemParamItemKeyPrefix;
@Value("${baizhan.detailsLock.itemKeyPrefix}")
private String lockItemKeyPrefix;
@Value("${baizhan.detailsLock.itemDescKeyPrefix}")
private String lockItemDescKeyPrefix;
@Value("${baizhan.detailsLock.itemParamItemKeyPrefix}")
private String lockItemParamItemKeyPrefix;
//随机工具
private final Random random = new Random();
//随机范围,可以配置
private final int randTimes = 3*60*60;
//记录分布式锁申请的自旋次数
private final ThreadLocal<Integer> times = new ThreadLocal<>();
/**
* 主键查询商品
* 增加缓存逻辑
* 流程:
* 1. 根据key访问redis,查询缓存的商品,如果有则直接返回,后续流程不再运行
* 2. 如果缓存中没有商品数据,则访问数据库查询商品
* 3. 把数据库查询的商品数据,保存到redis数据库中
* 4. 返回最终结果
* 注意:整个代码逻辑,不能因为Redis数据库访问错误,有任何的中断可能
*
* 缓存问题:只要是在分布式系统环境中增加缓存,一定会有各种问题发生
* 1. 缓存黑洞:无限制的在缓存中,保存永久数据,导致缓存服务器不断扩容
* 解决:增加缓存有效期
* 2. 缓存雪崩:如果在某一时刻,大面积缓存数据同时失效,再加上高并发查询请求
* 导致数据库短时间海量处理,衍生出的任何问题,都是缓存雪崩;
* 解决:使用定长+随机的缓存有效期
* 3. 缓存穿透:如果某一个缓存的键值对数据过期了,且有高并发请求查询这个缓存对应的数据,
* 导致数据库短时间海量处理,衍生出的问题,都是缓存穿透.
* 解决:把高并发,转成单请求, 分布式锁+自旋
* 分布式锁:可以在分布式环境和集群环境中生效的锁,是一个全局锁,可以跨进程使用.
* 实现:找一个公共的标记位置即可
* 注意:自旋计数,不能简单的使用基础循环判断.当所有的自旋次数使用完毕后,
* 一定要返回降级数据,不要执行后续的数据库访问
* 4. 缓存击穿:如果有高并发的请求,查询一个不存在的数据.请求一定会绕过缓存,
* 进入数据库,执行查询逻辑,导致数据库压力升高,衍生出的任何问题,都是缓存击穿.
*
* 说白了就是把这些没有的数据的id作为key value是你的商品对象,因为是要存在redis中的
* 商品对象的ID重复了也无所谓,不是存在数据库中的,假如都设置成-1,来几个存几个,下次来了就
* 直接访问redis了.今天上课认为成 每个key都是-1了 那样加没加缓存是一样没效果的.
* 设置成-1后, 再次访问后,从redis中查出来再看ID是-1 那可以认定为这个商品是不存在的.
* 直接返回降级数据
*
* 解决:即使查询的数据不存在,也要缓存,缓存一个短期的无效数据.
* @param id
* @return
*/
@Override
public BaizhanResult getItemById(Long id) {
//拼接key
String key = itemKeyPrefix + id;
//访问Redis数据库,查询缓存数据
try {
TbItem cache = redisDao.get(key);
//判断缓存是否存在
if (cache != null && cache.getId() > 0L){//这里判断id是为了判断这个商品是否存在,前提是redis里面有
//缓存存在,直接返回
return BaizhanResult.ok(cache);
}else if (cache != null && cache.getId() <= 0L){ //这里的判断解决的是缓存击穿
//有缓存,缓存的是一个无效的商品(Id是-1的都是没有的商品,
// 因为这是存在redis里面的 key不同就可以,所以Value的ID
// 都可以设置成-1),缓存击穿解决方案缓存的数据
return BaizhanResult.error("商品已下架,请重新搜索");
}else {//解决缓存穿透问题
while (true){
//从ThreadLoacl中,获取当前线程的自旋的次数
Integer t = times.get();
//判断自旋了几次
if (t ==null ){
//没有自旋过
t = 0;
}
if (t >= 3 ){
//返回降级数据前,一定要清除当前线程对应的自旋次数统计变量
times.remove();
//自旋了超过3次,不在自旋等待.直接返回一个降级数据
return BaizhanResult.error("服务器忙,请稍后重试");
}
//缓存不存在,需要访问数据库查询商品,可能发生缓存穿透
// 获取分布式锁,锁最多保存2秒,避免可能发生的超时或其他代码错误
boolean isLocked = redisDao.setNx(lockItemKeyPrefix + id,"",
2L,TimeUnit.SECONDS);
//自旋结束后,改变ThreadLocal中记录的当前线程自旋次数
t = t +1;
//把改变后的自旋次数保存到ThreadLocal中
times.set(t);
if (!isLocked){
//没有锁,自旋等待100毫秒,等待后,重新访问缓存,如果有数据返回
//如果没有缓存数据,继续尝试获取分布式锁.
Thread.sleep(100);
return getItemById(id);//递归调用本方法,从头开始执行
}else {
//获取到分布式锁,跳出无限循环
break;
}
}
}
}catch (Exception e){
e.printStackTrace(); //日志记录Redis访问错误
}
TbItem item = itemMapper.selectById(id);
//把MySQL数据库的查询结果,保存到Redis数据库中
try {
if (item == null){
//要查询的商品数据不存在,缓存一个短期有效的,没有任何特征的商品数据
item = new TbItem();
item.setId(-1L);//把ID设成-1存到Redis中 key不同 解决了缓存击穿问题
redisDao.set(key,item,1L,TimeUnit.MINUTES);
}else {
redisDao.set(key,item,7L*24*60+random.nextInt(randTimes),
TimeUnit.SECONDS);//解决了缓存黑洞和缓存雪崩问题
}
//执行到当前逻辑的代码,一定是获取到了分布式锁,必须释放分布式锁
redisDao.delete(lockItemKeyPrefix+id);
}catch (Exception e){
e.printStackTrace(); //日志记录redis访问错误
}
//当前线程执行到此位置,有可能经历过自旋,那么对应的ThreadLocal中记录的
//自旋次数是存在的,需要删除.
times.remove();
//返回最终结果
return BaizhanResult.ok(item);
}