注意:轻量级队列可以使用工具类,重量级数据量 请使用 MQ
本文章基于redis使用redisson客户端实现轻量级队列,以及代码、执行结果演示
一、常见队列了解
- 普通队列:先进先出(FIFO),只能在一端添加元素,在另一端移除元素。
- 循环队列:利用数组和取模运算实现队尾连接队首。
- 双端队列:两端都可以添加和移除元素。
- 优先级队列:根据元素的优先级顺序处理元素。
- 阻塞队列:在多线程中使用,队空时取元素会等待,队满时加元素会等待。
- 有限队列:队列长度固定,队满时新元素加入会导致队头元素自动移除。
二、工具类
基于redisson 实现的分布式工具类,copy即用
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 获取客户端实例
*/
public static RedissonClient getClient() {
return CLIENT;
}
/**
* 添加普通队列数据
*
* @param queueName 队列名
* @param data 数据
*/
public static <T> boolean addQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.offer(data);
}
/**
* 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getQueueObject(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.poll();
}
/**
* 通用删除队列数据(不支持延迟队列)
*/
public static <T> boolean removeQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyQueue(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.delete();
}
/**
* 添加延迟队列数据 默认毫秒
*
* @param queueName 队列名
* @param data 数据
* @param time 延迟时间
*/
public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
}
/**
* 添加延迟队列数据
*
* @param queueName 队列名
* @param data 数据
* @param time 延迟时间
* @param timeUnit 单位
*/
public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
delayedQueue.offer(data, time, timeUnit);
}
/**
* 获取一个延迟队列数据 没有数据返回 null
*
* @param queueName 队列名
*/
public static <T> T getDelayedQueueObject(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
return delayedQueue.poll();
}
/**
* 删除延迟队列数据
*/
public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
return delayedQueue.remove(data);
}
/**
* 销毁延迟队列 所有阻塞监听 报错
*/
public static <T> void destroyDelayedQueue(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
delayedQueue.destroy();
}
/**
* 添加优先队列数据
*
* @param queueName 队列名
* @param data 数据
*/
public static <T> boolean addPriorityQueueObject(String queueName, T data) {
RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
return priorityBlockingQueue.offer(data);
}
/**
* 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getPriorityQueueObject(String queueName) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.poll();
}
/**
* 优先队列删除队列数据(不支持延迟队列)
*/
public static <T> boolean removePriorityQueueObject(String queueName, T data) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyPriorityQueue(String queueName) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.delete();
}
/**
* 尝试设置 有界队列 容量 用于限制数量
*
* @param queueName 队列名
* @param capacity 容量
*/
public