优雅的解决SpringBoot集成RabbitMQ序列化和反序列化的思路3 与Reflections简单介绍

本文详细介绍了如何在SpringBoot项目中集成RabbitMQ,包括序列化和反序列化的解决方案,以及在不同版本的Spring-Amqp中实现监听器的方法。提供了自定义注解和反射框架Reflections的应用实例。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

虽然说发现我打脸了,只要 Spring-Amqp大于1.6 就能解决,我在思路1中提出的第二个问题。但是咱们研究技术,不只是为了解决问题,还得有点自己的追求,提升自己的技术(其实是大活说出去了,不撞南墙不回头,并且已经有思路了,这个思路不落地总感觉憋屈),咱们要为 Spring-Amqp 小于1.6的解决这个问题。

思路3-1

Spring-Amqp1.4 之前,没有@RabbitListener,所以要是想监听队列,常用的有两种方式, 一种是在 Xml 中配置 <rabbit:listener-container><rabbit:listener>,或者手动构建 SimpleMessageListenerContainer。二者本质是一样的,其中 监听器 必须继承MessageListener 或者 ChannelAwareMessageListener, 其中将 Message 转成 Bean 是个共性的问题,所以我第一个的想法就是,使用 “模板方法模式”,构建一个抽象父类,将转换逻辑抽取出来,类似于MessageListenerAdapter,但是MessageListenerAdapter转换 Bean 必须 __TypeID__ 一样才行。

具体实现

我为了简单和功能强大,只实现了 ChannelAwareMessageListener,其实MessageListener的实现 没有什么区别。


/**
 * 将Message转成对应Bean的父类,提供模板方法
 * 
 * @author MRZC
 * 
 * @param <T> 需要转换的类型
 */
@Slf4j
public abstract class JsonMessageListener<T> implements ChannelAwareMessageListener
{

    // 我是使用 Jackson2解析转换Json,如果不用这个请修改
    @Autowired
    private ObjectMapper jsonObjectMapper;

    // 需要将Message 转换成Bean的 类型
    Class<T> cls;

    // 子类将转换的类型传给父类
    public JsonMessageListener(Class<T> cls)
    {
        this.cls = cls;
    }

    @Override
    public void onMessage(Message message, Channel channel)
    {
        MessageProperties properties = message.getMessageProperties();
        String contentType = properties.getContentType();
        if (contentType != null && contentType.contains("json"))
        {
            String encoding = properties.getContentEncoding();
            if (encoding == null)
            {
                encoding = getDefaultCharset();
            }
            try
            {
                // 将 Message转换成Bean 传给 子类
                onBean(convertBytesToBean(message.getBody(), encoding), channel);
            }
            catch (IOException e)
            {
                throw new MessageConversionException("Failed to convert Message content", e);
            }
        }
        else
        {
            if (log.isWarnEnabled())
            {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
    }

    private T convertBytesToBean(byte[] body, String encoding) throws IOException
    {
        String contentAsString = new String(body, encoding);
        return this.jsonObjectMapper.readValue(contentAsString, cls);
    }

    // 子类要实现的抽象函数,用来接受转换后的Bean
    protected abstract void onBean(T t, Channel channel);

    public String getDefaultCharset()
    {
        return "UTF-8";
    }
}

使用案例

监听器的实现

@Component("bean1Listener")
public class Bean1Listener extends JsonMessageListener<Bean1>
{
    public LPInfoListener() throws NoSuchMethodException, SecurityException
    {
    	// 将 需要转换的类型 传给父类
        super(Bean1.class);
    }

	// 获取到的就是转换后的对象
    @Override
    protected void onBean(LPInfos t, Channel channel)
    {
        System.out.println(t.batchNo);
    }

    @Setter
    @NoArgsConstructor
    public static class Bean1
    {
        String batchNo;
    }
}

监听器的使用

    <rabbit:listener-container
        connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="queue.name"
            ref="bean1Listener" />
    </rabbit:listener-container>

思路3-2

毕竟 Xml 配置太麻烦了,还是注解扫描更方便一点。既然 Spring-Amqp 1.4 之前没有@RabbitListener,那咱们就通过猜测 高版本的注解实现原理,来自己在低版本实现,加强对 源码的理解。
具体思路概述:扫描自定义注解 标注的函数,然后动态生成SimpleMessageListenerContainer,并注册到 Spring 容器中。这次 用点新东西,来实现 注解扫描。

Reflections

在研究 Spring 注解扫描机制的时候,无意间发现了 Reflections,非常好用的反射框架。

简单介绍

Reflections通过扫描classpath,索引元数据,并且允许在运行时查询这些元数据。

使用Reflections可以很轻松的获取以下元数据信息:

  • 获取某个类型的所有子类。
  • 获取某个注解的所有类型/字段变量,支持注解参数匹配。
  • 使用正则表达式获取所有匹配的资源文件
  • 获取所有具有特定签名的方法,包括参数、参数注释和返回类型。
常用样例

我为了兼容 老项目,使用的是 reflections-0.9.7.jar,介绍中的部分特性还不支持。当前最新版本是 0.9.12,支持介绍的全部特性。
Maven 项目导入

<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.7</version>
</dependency>
        Reflections reflections = new Reflections(new ConfigurationBuilder()
                .forPackages("com.xxx") // 指定要扫描的包名
                .addScanners(new SubTypesScanner()) // 添加子类扫描工具
                .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
                .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
                .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
                .addScanners(new TypeAnnotationsScanner() ) // 添加 类型注解扫描工具
                .addScanners(new ResourcesScanner() ) // 添加 不是类的资源扫描工具
                .addScanners(..) // 扫描工具,不是必选的,但是你使用某些函数,必选传入对应的扫描工具,可以查看源码,或者看官方文档
                );

        // 反射出子类 SubTypesScanner
        Set<Class<? extends ISayHello>> set = reflections.getSubTypesOf( ISayHello.class ) ;
        System.out.println("getSubTypesOf:" + set);

        // 反射出带有指定注解的类 TypeAnnotationsScanner 
        Set<Class<?>> ss = reflections.getTypesAnnotatedWith( MyAnnotation.class );
        System.out.println("getTypesAnnotatedWith:" + ss);

        // 获取带有特定注解对应的方法 MethodAnnotationsScanner
        Set<Method> methods = reflections.getMethodsAnnotatedWith( MyMethodAnnotation.class ) ;
        System.out.println("getMethodsAnnotatedWith:" + methods);

        // 获取带有特定注解对应的字段 FieldAnnotationsScanner
        Set<Field> fields = reflections.getFieldsAnnotatedWith( Autowired.class ) ;
        System.out.println("getFieldsAnnotatedWith:" + fields);

        // 获取特定参数对应的方法 MethodParameterScanner
        Set<Method> someMethods = reflections.getMethodsMatchParams(long.class, int.class);
        System.out.println("getMethodsMatchParams:" + someMethods);

		// 扫描资源 ResourcesScanner
		Set<String> properties = reflections.getResources(Pattern.compile(".*\\.properties"));
		System.out.println("getResources:" + properties );
参考资料

reflections github
Examples for SubTypesScanner
Examples for TypeAnnotationsScanner
Examples for ResourcesScanner
Examples for MethodAnnotationsScanner

具体实现

为了简单,只实现最基本的功能,可以满足80%的要求,而没有实现@RabbitListener 全部功能。
由于 只实现了 Json 转换,不支持其他 contType 的转换,所以类名全加了 Json 用来标识。

注解:


/**
 * 将方法标记为RabbitMQ message 的监听函数,支持将json反序列化成自定义Bean的参数
 * 
 * @author mrzc
 * 
 */
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface JsonRabbitListener
{
    /**
     * 需要监听的队列名
     * 
     * @return
     */
    String[] queues() default {};

    /**
     * {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
     * 的 BeanName,用于创建负责服务于此端点的消息侦听器容器
     * 
     * @return
     */
    String connectionFactory() default "";
}

适配器,将 使用 自定义 Bean 作为参数 的监听函数,适配成 MessageListener

/**
 * 消息侦听器适配器,通过反射,将消息处理委托给目标函数,并通过函数参数类型进行类型转换
 * 
 * @author MRZC
 * 
 */
public class JsonMessageListenerAdapter implements ChannelAwareMessageListener
{
    @Setter
    private ObjectMapper jsonObjectMapper = new ObjectMapper();

    // 目标对象
    private final Object delegate;

    // 具体的监听函数
    private final Method listener;

    // 函数的参数类型类别
    private final Class<?>[] parameterTypes;

    // 唯一的需要进行转换的参数类型
    private final JavaType javaType;

    private JsonMessageListenerAdapter(Object delegate, Method listener)
    {
        this.delegate = delegate;
        this.listener = listener;
        parameterTypes = listener.getParameterTypes();
        javaType = getJavaType(parameterTypes);
    }

    /**
     * 获取需要转换的参数类型
     * 
     * @param parameterTypes
     * @return
     */
    private JavaType getJavaType(Class<?>[] parameterTypes)
    {
        JavaType javaType = null;
        if (parameterTypes == null || parameterTypes.length > 3 || parameterTypes.length == 0)
        {
            throw new IllegalArgumentException("监听函数参数数量错误");
        }

        int validParamterNum = 0;
        for (Class<?> cls : parameterTypes)
        {
            if (cls == Message.class || cls == Channel.class)
            {
                validParamterNum++;
                continue;
            }

            if (javaType != null)
            {
                continue;
            }
            javaType = TypeFactory.defaultInstance().constructType(cls);
            // 为了实现简单,不支持容器转换,如果需要传容器,请在外面再包一层Bean
            if (javaType.isContainerType() || javaType.isArrayType())
            {
                throw new IllegalArgumentException("监听函数参数类型错误,不支持数组和容器");
            }
            validParamterNum++;
            continue;
        }
        // 为了逻辑简单,只能有一个需要转换的类型
        if (validParamterNum != parameterTypes.length)
        {
            throw new IllegalArgumentException("监听函数参数数量错误,只支持转换一个自定义对象");
        }
        return javaType;
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        // 调用目标函数
        Object convertedMessage = convertMessageToBean(message);
        listener.invoke(delegate, buildArguments(message, channel, convertedMessage));
    }

    /**
     * 构造目标函数参数列表,根据参数类型顺序,按位填充
     * 
     * @param message
     * @param channel
     * @param convertedMessage
     * @return
     */
    protected Object[] buildArguments(Message message, Channel channel, Object convertedMessage)
    {
        List<Object> parameters = new ArrayList<Object>(parameterTypes.length);
        for (Class<?> cls : parameterTypes)
        {
            if (cls == Message.class)
            {
                parameters.add(message);
            }
            else if (cls == Channel.class)
            {
                parameters.add(channel);
            }
            else
            {
                parameters.add(convertedMessage);
            }
        }
        return parameters.toArray();
    }

    /**
     * 将 Message转换成对应的Bean
     * 
     * @param message
     * @return
     * @throws IOException
     */
    private Object convertMessageToBean(Message message) throws IOException
    {
        MessageProperties properties = message.getMessageProperties();
        String contentType = properties.getContentType();
        if (contentType != null && contentType.contains("json"))
        {
            String encoding = properties.getContentEncoding();
            if (encoding == null)
            {
                encoding = getDefaultCharset();
            }
            String contentAsString = new String(message.getBody(), encoding);
            return this.jsonObjectMapper.readValue(contentAsString, javaType);
        }
        return message;
    }

    public String getDefaultCharset()
    {
        return "UTF-8";
    }

    public static JsonMessageListenerAdapter of(Object delegate, Method listener)
    {
        return new JsonMessageListenerAdapter(delegate, listener);
    }
}

注解扫描,和动态注册Bean

/**
 * 扫描 JsonRabbitListener,动态注册监听器的实现
 * 
 * @author mrzc
 * 
 */
@Slf4j
@Configuration
public class RabbitConfig implements BeanFactoryPostProcessor
{
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
    {
        // 扫描注解标识的 监听函数
        Reflections reflections = new Reflections("com.xxx", new MethodAnnotationsScanner());
        Set<Method> methods = reflections.getMethodsAnnotatedWith(JsonRabbitListener.class);
        if (methods.isEmpty())
        {
            return;
        }

        try
        {
            registerListener(beanFactory, methods);
        }
        catch (InstantiationException e)
        {
            e.printStackTrace();
        }
        catch (IllegalAccessException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * 注册监听容器
     * 
     * @param beanFactory
     * @param methods
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    private void registerListener(ConfigurableListableBeanFactory beanFactory, Set<Method> methods)
            throws InstantiationException, IllegalAccessException
    {
        // 缓存同一个类的实例,当同一个类 有多个监听函数时,复用实例
        Map<Class<?>, Object> cached = new HashMap<>();
        int i = 0;
        for (Method method : methods)
        {
            if (!cached.containsKey(method.getDeclaringClass()))
            {
                // 监听函数所属对象实例化
                cached.put(method.getDeclaringClass(), method.getDeclaringClass().newInstance());
            }
            // 获取 注解
            JsonRabbitListener an = method.getAnnotation(JsonRabbitListener.class);
            // 获取队列所属 的 连接工厂
            ConnectionFactory connectionFactory = beanFactory.getBean(an.connectionFactory(), ConnectionFactory.class);
            // 构建监听容器
            SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            listenerContainer.setQueueNames(an.queues());
            listenerContainer.setMessageListener(JsonMessageListenerAdapter.of(cached.get(method.getDeclaringClass()),
                    method));
            String beanName = "simpleMessageListenerContainer#json#" + (i++);
            // 由于实例是自己创建的,需要手动调用initializeBean,实现还原容器帮我们自动干的事情,例如 Aware注入
            beanFactory.initializeBean(listenerContainer, beanName);
            // 将实例注册到容器中
            beanFactory.registerSingleton(beanName, listenerContainer);
        }
    }
}

使用案例

public class Bean1Listener
{
    @JsonRabbitListener(connectionFactory = "connectionFactory", queues = "queue.name")
    public void onBean(Bean1 bean)
    {
        System.out.println(bean);
    }

    @Setter
    @NoArgsConstructor
    public static class Bean1
    {
        String batchNo;
    }
}

总结

至此关于 SpringBoot集成RabbitMQ序列化和反序列化 就已经完毕了,思路1 中提出的两个都已经解决了。对我现在维护的老项目来说, 思路3-2 灵活性已经绰绰有余了,所以我并没有 实现 Spring-Amqp1.4 之后,跟 注解相关的全量功能。

填了一个坑,我再列一下,现在还有多少坑:

  1. Spring 通过注解扫描 Bean 的源码分析
  2. starter 机制,Maven 私服,Maven 发布,Maven 中央仓库。
  3. 我构建MessageConverter的时候,为什么 ObjectMapper 是注入的,而不是自己构建。
  4. mongodb通过索引,每个文档都可以配置多个超时方式,可以有效期,也可以过了某个时间点就失效,在某种特殊场景下,MongoDB比redis缓存失效更灵活,但是 SpringBoot 并没有提供默认的 CacheManager,研究一下看看能否实现,同时用 AOP 模拟类似的功能,加强理解。
  5. 类似于 SpringBootmain 函数启动服务器。
  6. ……我还有其他想写,但是不能同时挖太多的坑,那会让我跳不出来的,先填了再挖。

暂时休息几天,最近为了写博客,已经连续熬夜好几天的, 有点扛不住了。至少下周一肯定会重新开始写。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值