序
虽然说发现我打脸了,只要 Spring-Amqp大于1.6 就能解决,我在思路1中提出的第二个问题。但是咱们研究技术,不只是为了解决问题,还得有点自己的追求,提升自己的技术(其实是大活说出去了,不撞南墙不回头,并且已经有思路了,这个思路不落地总感觉憋屈),咱们要为 Spring-Amqp 小于1.6的解决这个问题。
思路3-1
Spring-Amqp1.4 之前,没有@RabbitListener
,所以要是想监听队列,常用的有两种方式, 一种是在 Xml 中配置 <rabbit:listener-container>
和 <rabbit:listener>
,或者手动构建 SimpleMessageListenerContainer
。二者本质是一样的,其中 监听器 必须继承MessageListener
或者 ChannelAwareMessageListener
, 其中将 Message
转成 Bean 是个共性的问题,所以我第一个的想法就是,使用 “模板方法模式”,构建一个抽象父类,将转换逻辑抽取出来,类似于MessageListenerAdapter
,但是MessageListenerAdapter
转换 Bean 必须 __TypeID__ 一样才行。
具体实现
我为了简单和功能强大,只实现了 ChannelAwareMessageListener
,其实MessageListener
的实现 没有什么区别。
/**
* 将Message转成对应Bean的父类,提供模板方法
*
* @author MRZC
*
* @param <T> 需要转换的类型
*/
@Slf4j
public abstract class JsonMessageListener<T> implements ChannelAwareMessageListener
{
// 我是使用 Jackson2解析转换Json,如果不用这个请修改
@Autowired
private ObjectMapper jsonObjectMapper;
// 需要将Message 转换成Bean的 类型
Class<T> cls;
// 子类将转换的类型传给父类
public JsonMessageListener(Class<T> cls)
{
this.cls = cls;
}
@Override
public void onMessage(Message message, Channel channel)
{
MessageProperties properties = message.getMessageProperties();
String contentType = properties.getContentType();
if (contentType != null && contentType.contains("json"))
{
String encoding = properties.getContentEncoding();
if (encoding == null)
{
encoding = getDefaultCharset();
}
try
{
// 将 Message转换成Bean 传给 子类
onBean(convertBytesToBean(message.getBody(), encoding), channel);
}
catch (IOException e)
{
throw new MessageConversionException("Failed to convert Message content", e);
}
}
else
{
if (log.isWarnEnabled())
{
log.warn("Could not convert incoming message with content-type [" + contentType + "]");
}
}
}
private T convertBytesToBean(byte[] body, String encoding) throws IOException
{
String contentAsString = new String(body, encoding);
return this.jsonObjectMapper.readValue(contentAsString, cls);
}
// 子类要实现的抽象函数,用来接受转换后的Bean
protected abstract void onBean(T t, Channel channel);
public String getDefaultCharset()
{
return "UTF-8";
}
}
使用案例
监听器的实现
@Component("bean1Listener")
public class Bean1Listener extends JsonMessageListener<Bean1>
{
public LPInfoListener() throws NoSuchMethodException, SecurityException
{
// 将 需要转换的类型 传给父类
super(Bean1.class);
}
// 获取到的就是转换后的对象
@Override
protected void onBean(LPInfos t, Channel channel)
{
System.out.println(t.batchNo);
}
@Setter
@NoArgsConstructor
public static class Bean1
{
String batchNo;
}
}
监听器的使用
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queue.name"
ref="bean1Listener" />
</rabbit:listener-container>
思路3-2
毕竟 Xml 配置太麻烦了,还是注解扫描更方便一点。既然 Spring-Amqp 1.4 之前没有@RabbitListener
,那咱们就通过猜测 高版本的注解实现原理,来自己在低版本实现,加强对 源码的理解。
具体思路概述:扫描自定义注解 标注的函数,然后动态生成SimpleMessageListenerContainer
,并注册到 Spring 容器中。这次 用点新东西,来实现 注解扫描。
Reflections
在研究 Spring 注解扫描机制的时候,无意间发现了 Reflections,非常好用的反射框架。
简单介绍
Reflections通过扫描classpath,索引元数据,并且允许在运行时查询这些元数据。
使用Reflections可以很轻松的获取以下元数据信息:
- 获取某个类型的所有子类。
- 获取某个注解的所有类型/字段变量,支持注解参数匹配。
- 使用正则表达式获取所有匹配的资源文件
- 获取所有具有特定签名的方法,包括参数、参数注释和返回类型。
常用样例
我为了兼容 老项目,使用的是 reflections-0.9.7.jar,介绍中的部分特性还不支持。当前最新版本是 0.9.12,支持介绍的全部特性。
Maven 项目导入
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.7</version>
</dependency>
Reflections reflections = new Reflections(new ConfigurationBuilder()
.forPackages("com.xxx") // 指定要扫描的包名
.addScanners(new SubTypesScanner()) // 添加子类扫描工具
.addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
.addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
.addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
.addScanners(new TypeAnnotationsScanner() ) // 添加 类型注解扫描工具
.addScanners(new ResourcesScanner() ) // 添加 不是类的资源扫描工具
.addScanners(..) // 扫描工具,不是必选的,但是你使用某些函数,必选传入对应的扫描工具,可以查看源码,或者看官方文档
);
// 反射出子类 SubTypesScanner
Set<Class<? extends ISayHello>> set = reflections.getSubTypesOf( ISayHello.class ) ;
System.out.println("getSubTypesOf:" + set);
// 反射出带有指定注解的类 TypeAnnotationsScanner
Set<Class<?>> ss = reflections.getTypesAnnotatedWith( MyAnnotation.class );
System.out.println("getTypesAnnotatedWith:" + ss);
// 获取带有特定注解对应的方法 MethodAnnotationsScanner
Set<Method> methods = reflections.getMethodsAnnotatedWith( MyMethodAnnotation.class ) ;
System.out.println("getMethodsAnnotatedWith:" + methods);
// 获取带有特定注解对应的字段 FieldAnnotationsScanner
Set<Field> fields = reflections.getFieldsAnnotatedWith( Autowired.class ) ;
System.out.println("getFieldsAnnotatedWith:" + fields);
// 获取特定参数对应的方法 MethodParameterScanner
Set<Method> someMethods = reflections.getMethodsMatchParams(long.class, int.class);
System.out.println("getMethodsMatchParams:" + someMethods);
// 扫描资源 ResourcesScanner
Set<String> properties = reflections.getResources(Pattern.compile(".*\\.properties"));
System.out.println("getResources:" + properties );
参考资料
reflections github
Examples for SubTypesScanner
Examples for TypeAnnotationsScanner
Examples for ResourcesScanner
Examples for MethodAnnotationsScanner
具体实现
为了简单,只实现最基本的功能,可以满足80%的要求,而没有实现@RabbitListener
全部功能。
由于 只实现了 Json 转换,不支持其他 contType 的转换,所以类名全加了 Json 用来标识。
注解:
/**
* 将方法标记为RabbitMQ message 的监听函数,支持将json反序列化成自定义Bean的参数
*
* @author mrzc
*
*/
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface JsonRabbitListener
{
/**
* 需要监听的队列名
*
* @return
*/
String[] queues() default {};
/**
* {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* 的 BeanName,用于创建负责服务于此端点的消息侦听器容器
*
* @return
*/
String connectionFactory() default "";
}
适配器,将 使用 自定义 Bean 作为参数 的监听函数,适配成 MessageListener
/**
* 消息侦听器适配器,通过反射,将消息处理委托给目标函数,并通过函数参数类型进行类型转换
*
* @author MRZC
*
*/
public class JsonMessageListenerAdapter implements ChannelAwareMessageListener
{
@Setter
private ObjectMapper jsonObjectMapper = new ObjectMapper();
// 目标对象
private final Object delegate;
// 具体的监听函数
private final Method listener;
// 函数的参数类型类别
private final Class<?>[] parameterTypes;
// 唯一的需要进行转换的参数类型
private final JavaType javaType;
private JsonMessageListenerAdapter(Object delegate, Method listener)
{
this.delegate = delegate;
this.listener = listener;
parameterTypes = listener.getParameterTypes();
javaType = getJavaType(parameterTypes);
}
/**
* 获取需要转换的参数类型
*
* @param parameterTypes
* @return
*/
private JavaType getJavaType(Class<?>[] parameterTypes)
{
JavaType javaType = null;
if (parameterTypes == null || parameterTypes.length > 3 || parameterTypes.length == 0)
{
throw new IllegalArgumentException("监听函数参数数量错误");
}
int validParamterNum = 0;
for (Class<?> cls : parameterTypes)
{
if (cls == Message.class || cls == Channel.class)
{
validParamterNum++;
continue;
}
if (javaType != null)
{
continue;
}
javaType = TypeFactory.defaultInstance().constructType(cls);
// 为了实现简单,不支持容器转换,如果需要传容器,请在外面再包一层Bean
if (javaType.isContainerType() || javaType.isArrayType())
{
throw new IllegalArgumentException("监听函数参数类型错误,不支持数组和容器");
}
validParamterNum++;
continue;
}
// 为了逻辑简单,只能有一个需要转换的类型
if (validParamterNum != parameterTypes.length)
{
throw new IllegalArgumentException("监听函数参数数量错误,只支持转换一个自定义对象");
}
return javaType;
}
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
// 调用目标函数
Object convertedMessage = convertMessageToBean(message);
listener.invoke(delegate, buildArguments(message, channel, convertedMessage));
}
/**
* 构造目标函数参数列表,根据参数类型顺序,按位填充
*
* @param message
* @param channel
* @param convertedMessage
* @return
*/
protected Object[] buildArguments(Message message, Channel channel, Object convertedMessage)
{
List<Object> parameters = new ArrayList<Object>(parameterTypes.length);
for (Class<?> cls : parameterTypes)
{
if (cls == Message.class)
{
parameters.add(message);
}
else if (cls == Channel.class)
{
parameters.add(channel);
}
else
{
parameters.add(convertedMessage);
}
}
return parameters.toArray();
}
/**
* 将 Message转换成对应的Bean
*
* @param message
* @return
* @throws IOException
*/
private Object convertMessageToBean(Message message) throws IOException
{
MessageProperties properties = message.getMessageProperties();
String contentType = properties.getContentType();
if (contentType != null && contentType.contains("json"))
{
String encoding = properties.getContentEncoding();
if (encoding == null)
{
encoding = getDefaultCharset();
}
String contentAsString = new String(message.getBody(), encoding);
return this.jsonObjectMapper.readValue(contentAsString, javaType);
}
return message;
}
public String getDefaultCharset()
{
return "UTF-8";
}
public static JsonMessageListenerAdapter of(Object delegate, Method listener)
{
return new JsonMessageListenerAdapter(delegate, listener);
}
}
注解扫描,和动态注册Bean
/**
* 扫描 JsonRabbitListener,动态注册监听器的实现
*
* @author mrzc
*
*/
@Slf4j
@Configuration
public class RabbitConfig implements BeanFactoryPostProcessor
{
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
{
// 扫描注解标识的 监听函数
Reflections reflections = new Reflections("com.xxx", new MethodAnnotationsScanner());
Set<Method> methods = reflections.getMethodsAnnotatedWith(JsonRabbitListener.class);
if (methods.isEmpty())
{
return;
}
try
{
registerListener(beanFactory, methods);
}
catch (InstantiationException e)
{
e.printStackTrace();
}
catch (IllegalAccessException e)
{
e.printStackTrace();
}
}
/**
* 注册监听容器
*
* @param beanFactory
* @param methods
* @throws InstantiationException
* @throws IllegalAccessException
*/
private void registerListener(ConfigurableListableBeanFactory beanFactory, Set<Method> methods)
throws InstantiationException, IllegalAccessException
{
// 缓存同一个类的实例,当同一个类 有多个监听函数时,复用实例
Map<Class<?>, Object> cached = new HashMap<>();
int i = 0;
for (Method method : methods)
{
if (!cached.containsKey(method.getDeclaringClass()))
{
// 监听函数所属对象实例化
cached.put(method.getDeclaringClass(), method.getDeclaringClass().newInstance());
}
// 获取 注解
JsonRabbitListener an = method.getAnnotation(JsonRabbitListener.class);
// 获取队列所属 的 连接工厂
ConnectionFactory connectionFactory = beanFactory.getBean(an.connectionFactory(), ConnectionFactory.class);
// 构建监听容器
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
listenerContainer.setQueueNames(an.queues());
listenerContainer.setMessageListener(JsonMessageListenerAdapter.of(cached.get(method.getDeclaringClass()),
method));
String beanName = "simpleMessageListenerContainer#json#" + (i++);
// 由于实例是自己创建的,需要手动调用initializeBean,实现还原容器帮我们自动干的事情,例如 Aware注入
beanFactory.initializeBean(listenerContainer, beanName);
// 将实例注册到容器中
beanFactory.registerSingleton(beanName, listenerContainer);
}
}
}
使用案例
public class Bean1Listener
{
@JsonRabbitListener(connectionFactory = "connectionFactory", queues = "queue.name")
public void onBean(Bean1 bean)
{
System.out.println(bean);
}
@Setter
@NoArgsConstructor
public static class Bean1
{
String batchNo;
}
}
总结
至此关于 SpringBoot集成RabbitMQ序列化和反序列化 就已经完毕了,思路1 中提出的两个都已经解决了。对我现在维护的老项目来说, 思路3-2 灵活性已经绰绰有余了,所以我并没有 实现 Spring-Amqp1.4 之后,跟 注解相关的全量功能。
填了一个坑,我再列一下,现在还有多少坑:
- Spring 通过注解扫描 Bean 的源码分析
- starter 机制,Maven 私服,Maven 发布,Maven 中央仓库。
- 我构建
MessageConverter
的时候,为什么ObjectMapper
是注入的,而不是自己构建。 - mongodb通过索引,每个文档都可以配置多个超时方式,可以有效期,也可以过了某个时间点就失效,在某种特殊场景下,MongoDB比redis缓存失效更灵活,但是 SpringBoot 并没有提供默认的
CacheManager
,研究一下看看能否实现,同时用 AOP 模拟类似的功能,加强理解。 - 类似于 SpringBoot 的 main 函数启动服务器。
- ……我还有其他想写,但是不能同时挖太多的坑,那会让我跳不出来的,先填了再挖。
暂时休息几天,最近为了写博客,已经连续熬夜好几天的, 有点扛不住了。至少下周一肯定会重新开始写。