SpringBoot+RocketMq+Mybatis项目整合demo

Demo实现的功能

项目启动生产者和消费者2个服务后, 生产者端执行用户查询, 从数据库查出用户数据后, 发送给消息中间件rocketMq, 消费者监听到mq消息后获取到用户数据.

github源码地址

https://github.com/mikewuhao/soringBoot-mq-demo/

搭建详细步骤

1. 准备工作
提前把rocketMq环境搭建和启动好, rocketMq可视化工具安装启动好.
(本人mac环境部署rocketMq,参考https://www.jianshu.com/p/a759e8ea6ac1)
(windows环境部署rocketMq,参考https://www.cnblogs.com/darendu/p/12036380.html)

注意: 按照官方启动rocketMq ,可能会报运行环境内存不足,建议修改内存, 参考: https://blog.csdn.net/u014803081/article/details/90705792?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6

2. 项目结构
在这里插入图片描述
在idea开发工具里面先建普通空的project, 命名为springBoot-mq-demo, 再新建2个maven类型的module, 分别命名 consumer(消费者服务), provider(生产者服务).
3. provider生产者服务创建
3.1 provider服务目录结构
在这里插入图片描述
3.2 provider服务的pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wuhao.demo</groupId>
    <artifactId>provider</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- spring-boot整合mybatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- mysql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <!--rocketMq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.3 provider服务的application.properties

server.port=8082

logging.level.org.springframework=DEBUG

#数据库
spring.datasource.driver-class-name= com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/boot?useUnicode=true&characterEncoding=utf-8
spring.datasource.username = root
spring.datasource.password = root

#mybatis
mybatis.type-aliases-package=com.wuhao.domain
mybatis.mapper-locations=classpath:mapper/*.xml

#Rocketmq producer
rocketmq.producer.groupName=ProducerGroup
rocketmq.producer.namesrvAddr=127.0.0.1:9876
rocketmq.producer.instanceName=ProducerGroup
rocketmq.producer.topic=topic2020
rocketmq.producer.tag=test
rocketmq.producer.maxMessageSize=131072
rocketmq.producer.sendMsgTimeout=10000


3.4 provider服务的启动类, ProviderApplication

package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }

}

3.5 provider服务的controller, MqController

package com.wuhao.controller;

import com.alibaba.fastjson.JSONObject;
import com.wuhao.domain.User;
import com.wuhao.mq.RocketMQProducer;
import com.wuhao.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @author wuhao
 * @Title: MqController
 * @Description: Mq测试 controller
 * @date 2020/6/9 17:38
 */
@RestController
@Slf4j
public class MqController {
    @Autowired
    @Qualifier("rocketMQProducer")
    RocketMQProducer rocketMQProducer;

    @Autowired
    private UserService userService;

    @GetMapping("/testSend")
    public void testSend() {

        DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        User user = userService.queryUserById(1L);
        String body = "hi RocketMQ, now is  " + sdf.format(new Date()) + "---"+ JSONObject.toJSONString(user);
        Message message = new Message("topic2020", "test", body.getBytes());
        try {
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

3.6 provider服务的mq配置类, RocketMQProducer

package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author wuhao
 * @Title: RocketMQProducer
 * @Description: 消息生产者
 * @date 2020/6/9 17:31
 */
@Configuration
@Slf4j
public class RocketMQProducer {

    @Value("${rocketmq.producer.groupName}")
    private String groupName;

    @Value("${rocketmq.producer.namesrvAddr}")
    private String nameserAddr;

    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;

    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;

    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameserAddr);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeout);
        producer.setVipChannelEnabled(false);
        log.info("================>生产者创建完成,ProducerGroupName{}<================", groupName);
        return producer;
    }

}

3.7 provider服务的实体类, User

package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * @Description: User实体
 * @CreateDate: 2020-02-04 19:25
 * @Author: wuhao
 */
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {


    @Id
    @GeneratedValue
    private Long id;

    @Column(name = "username")
    private String username;

    @Column(name = "birthday")
    private String birthday;

    @Column(name = "sex")
    private String sex;

    @Column(name = "address")
    private String address;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", birthday='" + birthday + '\'' +
                ", sex='" + sex + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

}

3.8 provider服务的接口

package com.wuhao.service;

import com.wuhao.domain.User;

/**
 * @Description: User的Service
 * @CreateDate: 2020-06-09 09:35
 * @Author: wuhao
 */
public interface UserService {

    User queryUserById(Long id);

    int addUser(User user);

    int modifyUser(User user);

    int deleteUserById(Long id);

}

3.8 provider服务的接口实现类

package com.wuhao.service.impl;

import com.wuhao.domain.User;
import com.wuhao.dao.UserMapper;
import com.wuhao.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description:User的Service实现类
 * @CreateDate: 2020-02-05 09:36
 * @Author: wuhao
 */
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    @Override
    public User queryUserById(Long id) {
        return userMapper.queryUserById(id);
    }

    @Override
    public int addUser(User user) {
        return userMapper.addUser(user);

    }

    @Override
    public int modifyUser(User user) {
        return userMapper.modifyUser(user);
    }

    @Override
    public int deleteUserById(Long id) {
        return userMapper.deleteUserById(id);
    }

 }

3.9 provider服务的Dao层, UserMapper

package com.wuhao.dao;

import com.wuhao.domain.User;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

/**
 * @Description: user的mapper
 * @CreateDate: 2020-06-04 19:38
 * @Author: wuhao
 */
@Mapper
@Repository
public interface UserMapper {

    User queryUserById(Long id);

    int addUser(User user);

    int modifyUser(User user);

    int deleteUserById(Long id);

}

3.10 provider服务sql的xml文件, userMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

    <mapper namespace="com.wuhao.dao.UserMapper">

    <!--按id查询用户-->
    <select id="queryUserById" resultType="com.wuhao.domain.User">
        select * from `user` where id = #{id}
    </select>

    <!--用户更新-->
    <update id="modifyUser" parameterType="com.wuhao.domain.User" >
        update `user` set username=#{username},birthday=#{birthday},sex=#{sex}, address=#{address} where id=#{id}
    </update>

    <!--删除用户-->
    <delete id="deleteUserById" parameterType="long">
        delete from `user` where id=#{id}
    </delete>

    <!--用户添加-->
    <insert id="addUser" parameterType="com.wuhao.domain.User">
        insert into `user` (username,birthday,sex,address)
             values(#{username},#{birthday},#{sex},#{address})
    </insert>


</mapper>

4. consumer消费者服务创建
4.1 consumer服务目录结构
在这里插入图片描述
4.2 consumer服务的pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wuhao.demo</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--rocketMq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4.3 consumer服务的application.properties

server.port=8083

logging.level.org.springframework=DEBUG

#Rocketmq consumer
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
rocketmq.consumer.groupName=ConsumerGroup
rocketmq.consumer.topic=topic2020
rocketmq.consumer.tag=test
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64

4.4 consumer服务的启动类

package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

4.5 consumer服务的实体类, User

package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

/**
 * @Description: User实体
 * @CreateDate: 2020-02-04 19:25
 * @Author: wuhao
 */
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {


    @Id
    @GeneratedValue
    private Long id;

    @Column(name = "username")
    private String username;

    @Column(name = "birthday")
    private String birthday;

    @Column(name = "sex")
    private String sex;

    @Column(name = "address")
    private String address;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", birthday='" + birthday + '\'' +
                ", sex='" + sex + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

}

4.6 consumer服务的监听器, MessageListen

package com.wuhao.mq;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author wuhao
 * @Title: MessageListen
 * @Description: 消息监听类
 * @date 2020/4/17 17:28
 */
@Component
public class MessageListen implements MessageListenerConcurrently {

    @Autowired
    private MessageProcessor messageProcessor;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt ext = list.get(0);
        boolean result = messageProcessor.handle(ext);
        if (!result) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

4.7 consumer服务的mq消息处理接口

package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;

/**
 * @author wuhao
 * @Title: MessageProcessor
     * @Description: mq消息处理接口
 * @date 2020/4/17 17:24
 */
public interface MessageProcessor {

    boolean handle(MessageExt messageExt);
}

4.8 consumer服务的消息处理类

package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

/**
 * @author wuhao
 * @Title: MessageProcessorImpl
 * @Description: 消息处理类
 * @date 2020/4/17 17:27
 */
@Service
public class MessageProcessorImpl implements MessageProcessor {
    @Override
    public boolean handle(MessageExt messageExt) {
        // 收到的body(消息体),字节类型,需转为String
        String result = new String(messageExt.getBody());
        System.out.println("监听到了消息,消息为:"+ result);
        return true;
    }
}

4.9 consumer服务的mq消费者配置类

package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author wuhao
 * @Title: RocketMQConsumer
 * @Description: Mq消费者
 * @date 2020/4/17 17:36
 */
@Configuration
@Slf4j
public class RocketMQConsumer {

    @Autowired
    private MessageListen messageListen;

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.groupName}")
    private String groupName;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value("${rocketmq.consumer.tag}")
    private String tag;

    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;

    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer getRocketMQConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setVipChannelEnabled(false);
        // 我们自己实现的监听类
        consumer.registerMessageListener(messageListen);
        try {
            consumer.subscribe(topic, tag);
            log.info("================>消费者创建完成,ConsumerGroupName{}<================", groupName);
            log.info("============>消费者监听开始,groupName:{},topic:{}<============", groupName, topic);
        } catch (MQClientException e) {
            log.error("消费者启动失败");
            e.printStackTrace();
        }
        return consumer;
    }

}

演示效果

编写完成后, 按顺序启动provider服务, consumer服务
1 浏览器上执行http://localhost:8082/testSend, 开始发送消息
2 查看rocketMq可视化工具, mq里已经有消息了.
在这里插入图片描述3查看consumer服务的控制台日志, 已经有消息输出了
在这里插入图片描述

遇到过的问题

安装启动rocketMq时 ,windows环境下一直报运行环境内存不足,修改内存也不起作用, 不知道是够是个人电脑问题, 后来换成mac环境, 成功安装和启动了rocketMq.

参考博客

https://www.jianshu.com/p/38a3596beea6

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值