第1章:走进分布式中间件

分布式概念:多个程序运行在不同的机器上,共同完成一个功能,而用户感知不到集群的存在。

发展历程:

1、单点集中式Web应用–数据库、War包以及文件等都在同一个机器上

2、应用与文件服务及数据库单独拆分

3、引入缓存与集群–Redis、Nginx+Lvs,多台应用服务器构成负载均衡

4、数据库读写分离,并提供反向代理及CDN加速访问服务–抢票高峰,静态资源放到CDN中,加入反向代理的配置,减少访问网站时直接去服务器读取静态数据

5、分布式文件系统与分布式数据库–减少DB的压力,进行分库分表,根据业务来拆分数据库

第2章:搭建微服务项目

Spring Boot项目搭建规范:

在这里插入图片描述

依赖层级关系:server依赖model,model依赖api

Spring Boot项目搭建流程:

  1. File-New Project

  2. Maven-选择SDK-Next

  3. GroupId为包名,ArtifactId为项目名称

  4. 选择路径,不要出现中文或特殊符号路径

  5. pom.xml文件:父模块的依赖配置文件,指定项目资源的编码以及JDK版本

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.debug.middleware</groupId>
        <artifactId>middleware</artifactId>
        <packaging>pom</packaging>
        <version>1.0.1</version>
        <modules>
            <module>api</module>
            <module>model</module>
            <module>server</module>
        </modules>
    
        <properties>
            <!--定义项目整体资源的编码为UTF-8,JDK的版本为1.8-->
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
        </properties>
    
    </project>
  6. 创建子模块api以及相关依赖配置(整个项目都共用的依赖配置)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>middleware</artifactId>
            <groupId>com.debug.middleware</groupId>
            <version>1.0.1</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>api</artifactId>
    
        <properties>
            <lombok.version>1.16.10</lombok.version>
            <jackson-annotations-version>2.6.5</jackson-annotations-version>
        </properties>
    
        <dependencies>
            <!--Lombok-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
    
            <!--jackson-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>${jackson-annotations-version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    
    </project>
  7. 创建子模块model,添加api子模块以及Spring-MyBatis依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>middleware</artifactId>
            <groupId>com.debug.middleware</groupId>
            <version>1.0.1</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>model</artifactId>
    
        <properties>
            <mybatis-spring-boot.version>1.1.1</mybatis-spring-boot.version>
            <mybatis-pagehelper.version>4.1.2</mybatis-pagehelper.version>
        </properties>
    
        <dependencies>
            <!--api-->
            <dependency>
                <groupId>com.debug.middleware</groupId>
                <artifactId>api</artifactId>
                <version>${project.parent.version}</version>
            </dependency>
    
            <!--spring-mybatis-->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>${mybatis-spring-boot.version}</version>
            </dependency>
        </dependencies>
    
    </project>
  8. 创建子模块server,添加Spring Boot依赖,日志log4j及MySQL,Druid等依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>middleware</artifactId>
            <groupId>com.debug.middleware</groupId>
            <version>1.0.1</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>server</artifactId>
    
        <packaging>jar</packaging>
        <properties>
            <start-class>com.debug.middleware.server.MainApplication</start-class>
    
            <spring-boot.version>1.5.6.RELEASE</spring-boot.version>
            <spring-session.version>1.2.0.RELEASE</spring-session.version>
            <mysql.version>5.1.37</mysql.version>
            <druid.version>1.0.16</druid.version>
            <guava.version>19.0</guava.version>
    
            <skipTests>true</skipTests>
        </properties>
    
        <!-- 依赖管理 -->
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-dependencies</artifactId>
                    <version>${spring-boot.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
    
        <dependencies>
            <!--日志-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-log4j</artifactId>
                <version>1.3.3.RELEASE</version>
            </dependency>
    
            <!--model-->
            <dependency>
                <groupId>com.debug.middleware</groupId>
                <artifactId>model</artifactId>
                <version>${project.parent.version}</version>
            </dependency>
    
            <!--guava-->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${guava.version}</version>
            </dependency>
    
            <!--mysql-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
    
            <!--druid-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>${druid.version}</version>
            </dependency>
    
            <!--spring-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>${spring-boot.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>ch.qos.logback</groupId>
                        <artifactId>logback-classic</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>log4j-over-slf4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- redis -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-redis</artifactId>
                <version>1.3.3.RELEASE</version>
            </dependency>
    
            <!--rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>1.3.3.RELEASE</version>
            </dependency>
    
            <!--zookeeper-->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.6</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-log4j12</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.10.0</version>
            </dependency>
    
    
            <!--redisson-->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.10.6</version>
            </dependency>
    
            <!--由于redisson底层是采用基于nio的netty框架进行通信,故而需要加入依赖-->
            <!--<dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.32.Final</version>
            </dependency>-->
    
            <!--for test-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <!--<scope>test</scope>-->
            </dependency>
        </dependencies>
    
        <build>
            <finalName>book_middleware_${project.parent.version}</finalName>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring-boot.version}</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>2.4</version>
                    <configuration>
                        <failOnMissingWebXml>false</failOnMissingWebXml>
                    </configuration>
                </plugin>
            </plugins>
    
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                </resource>
            </resources>
        </build>
    </project>
  9. 对MainApplication进行修改

    @SpringBootApplication
    @MapperScan(basePackages = "com.debug.middleware.model")
    public class MainApplication extends SpringBootServletInitializer {
    
        @Override
        protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
            return super.configure(builder);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(MainApplication.class,args);
        }
    }
  10. application.properties配置文件

    #profile
    #spring.profiles.active=productions
    #spring.profiles.active=local
    #指定应用访问的上下文及端口
    server.context-path=/middleware
    server.port=8087
    #logging日志配置
    logging.path=/srv/dubbo/middleware/logs
    logging.file=middleware
    logging.level.org.springframework=info
    logging.level.com.fasterxml.jackson=info
    logging.level.debug.middleware=debug
    #json日期格式化
    spring.jackson.date-format=yyyy-mm-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8;
    spring.datasource.initialize=false
    spring.jmx.enabled=false
    #数据库访问配置
    spring.datasource.url=jdbc:mysql://localhost:3306/db_middleware?useUnicode=true&amp;characterEncoding=utf-8
    spring.datasource.username=root
    spring.datasource.password=root
    #MyBatis配置
    mybatis.config-location=classpath:mybatis-config.xml
    mybatis.check-config-location=true
    mybatis.mapper-locations=classpath:mappers/*.xml
    #Redis连接配置
    spring.redis.host=127.0.0.1
    spring.redis.port=6379
  11. mybatis配置文件mybatis-config.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-config.dtd">
    <configuration>
        <settings>
            <!--允许使用缓存配置-->
            <setting name="cacheEnabled" value="true"/>
            <!--SQL执行语句的默认响应超时时间-->
            <setting name="defaultStatementTimeout" value="3000"/>
            <!--允许驼峰命令的配置-->
            <setting name="mapUnderscoreToCamelCase" value="true"/>
            <!--允许执行完SQL插入语句后返回主键配置-->
            <setting name="useGeneratedKeys" value="true"/>
            <!--设置控制台打印SQL-->
            <!--<setting name="logImpl" value="stdout_logging"/>-->
        </settings>
    </configuration>
  12. 日志配置文件log4j.properties

    #Console Log
    log4j.rootLogger=INFO,console,debug.info,warn,error
    LOG_PATTERN=[%d{yyyy-MM-dd HH:mm:ss.sss}] boot%X{context} - %5p [%t] ---%c{1}: %m%n
    #打印日志到Console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.Threshold=DEBUG
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=${LOG_PATTERN}
    
    log4j.appender.info=org.apache.log4jDailyRollingFileAppender
    log4j.appender.info.Threshold=INFO
    log4j.appender.info.File=${LOG_PATH}/${LOG_FILE}_info.log
    log4j.appender.info.DatePattern='.'yyyy-MM-dd
    log4j.appender.info.layout=org.apache.lgo4j.PatternLayout
    log4j.appender.info.layout.ConversionPattern=${LOG_PATTERN}
    
    log4j.appender.error=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.error.Threshold=ERROR
    log4j.appender.error.File=${LOG_PATH}/${LOG_FILE}error.log
    log4j.appender.error.DatePattern='.'yyyy-MM-dd
    log4j.appender.error.layout=org.apache.log4j.PatternLayout
    log4j.appender.error.layout.ConversionPattern=${LOG_PATTERN}
    
    log4j.appender.debug=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.debug.Threshold=DEBUG
    log4j.appender.debug.File=${LOG_PATH}/${LOG_FILE}debug.log
    log4j.appender.debug.DatePattern='.'yyyy-MM-dd
    log4j.appender.debug.layout=org.apache.log4j.PatternLayout
    log4j.appender.debug.layout.ConversionPattern=${LOG_PATTERN}
    
    log4j.appender.warn=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.warn.Threshold=WARN
    log4j.appender.warn.File=${LOG_PATH}/${LOG_FILE}warn.log
    log4j.appender.warn.DatePattern='.'yyyy-MM-dd
    log4j.appender.warn.layout=org.apache.log4j.PatternLayout
    log4j.appender.warn.layout.ConversionPattern=${LOG_PATTERN}
  1. 最终结构

在这里插入图片描述

Hello World小试牛刀

在server模块创建controller包和entity包

controller包下新建一个BookController类,用于处理请求,entity包下新建Book类,封装书的字段信息

@Data
public class Book {
    private Integer bookNo;
    private String name;
}
@RestController
@RequestMapping("/book")
public class BookController {
    private static final Logger log = LoggerFactory.getLogger(BookController.class);
    /***获取书籍对象信息*/
    @RequestMapping(value="info",method = RequestMethod.GET)
    public Book info(Integer bookNo,String bookName){
        Book book = new Book();
        book.setBookNo(bookNo);
        book.setName(bookName);
        return book;
    }
}

运行程序

在这里插入图片描述

第3章:缓存中间件Redis

Windows下启动Redis:双击redis-server.exe

Windows下运行Redis命令行窗口:双击redis-cli.exe

常用命令:

  • 查看Redis缓存中存储的所有Key:key *
  • 缓存中创建一个名为redis:order:no:10011,值为10011的key:set redis:order:no:10011 10011
  • 查看缓存中指定key的值:get redis:order:no:10011
  • 删除缓存中指定的key:del redis:order:no:10011

Spring Boot整合Redis

1、加入Redis的依赖jar

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.3.3.RELEASE</version>
</dependency>

2、在配置文件application.properties中加入Redis的连接配置

#Redis连接配置
spring.redis.host=127.0.0.1
spring.redis.port=6379

Redis自定义注入Bean组件配置

//com.coding.fight.server.config.CommonConfig
@Configuration
public class CommonConfig {
    /**redis链接工厂*/
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    /**缓存操作组件RedisTemplate的自定义配置*/
    @Bean
    public RedisTemplate<String,Object>redisTemplate(){
        //定义RedisTemplate实例
        RedisTemplate<String,Object>redisTemplate = new RedisTemplate<>();
        //设置Redis的链接工厂
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        //TODO:指定大key序列化策略为String序列化,value为JDK自带的序列化策略
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        //TODO:指定hashKey序列化策略为String序列化-针对hash散列存储
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }
    /**缓存操作组件StringRedisTemplate*/
    @Bean
    public StringRedisTemplate stringRedisTemplate(){
        //采用默认配置即可,后续有自定义配置时则在此处添加即可
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
        return stringRedisTemplate;
    }
}

RedisTemplate实战

目标一:采用RedisTemplate将字符串写入缓存中,并读取出来展示到控制台上

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RedisTest {
    //定义日志
    private static final Logger log = LoggerFactory.getLogger(RedisTest.class);
    //定义RedisTemplate操作组件
    @Autowired
    private RedisTemplate redisTemplate;
    //定义JSON序列化和反序列化框架类
    @Autowired
    private ObjectMapper objectMapper;

    @Test
    public void one() {
        log.info("---开始RedisTemplate操作组件实战---");
        //定义字符串内容及存入缓存的key
        final String content = "RedisTemplate实战字符串信息";
        final String key = "redis:template:one:string";
        //Redis通用的操作组件
        ValueOperations valueOperations = redisTemplate.opsForValue();
        //将字符串信息写入缓存中
        log.info("写入缓存中的内容:{}", content);
        valueOperations.set(key, content);
        //从缓存中读取内容
        Object result = valueOperations.get(key);
        log.info("读取出来的内容:{}", result);
    }
}

效果:

[2020-12-30 23:38:29.029] boot -  INFO [main] ---RedisTest: ---开始RedisTemplate操作组件实战---
[2020-12-30 23:38:29.029] boot -  INFO [main] ---RedisTest: 写入缓存中的内容:RedisTemplate实战字符串信息
[2020-12-30 23:38:29.029] boot -  INFO [main] ---RedisTest: 读取出来的内容:RedisTemplate实战字符串信息

目标二:采用RedisTemplate将对象信息序列化为JSON格式的字符串后写入缓存中,然后将其读取出来,最后反序列化解析其中的内容并展示在控制台

@Test
    public void two() throws IOException {
        log.info("-----开始RedisTemplate操作组件实战----");
        //构造对象信息
        User user = new User(1, "debug", "阿修罗");
        //Redis通用的操作组件
        ValueOperations valueOperations = redisTemplate.opsForValue();
        //将序列化后的信息写入缓存中
        final String key = "redis:template:two:object";
        final String content = objectMapper.writeValueAsString(user);
        valueOperations.set(key, content);
        log.info("写入缓存对象的信息:{}", user);
        //从缓存中读取内容
        Object result = valueOperations.get(key);
        if (result != null) {
            User resultUser = objectMapper.readValue(result.toString(),User.class);
            log.info("读取缓存内容并反序列化后的结果:{}",resultUser);
        }
    }

效果:

[2021-01-03 15:05:14.014] boot -  INFO [main] ---RedisTest: -----开始RedisTemplate操作组件实战----
[2021-01-03 15:05:15.015] boot -  INFO [main] ---RedisTest: 写入缓存对象的信息:User(id=1, userName=debug, name=阿修罗)
[2021-01-03 15:05:15.015] boot -  INFO [main] ---RedisTest: 读取缓存内容并反序列化后的结果:User(id=1, userName=debug, name=阿修罗)

Redis常见数据结构实战

字符串

业务场景:将用户个人信息存储至缓存中,实现每次前端请求获取用户个人详情时直接从缓存中读取

Person类:

@Data
@ToString
public class Person implements Serializable {
    private Integer id;
    private Integer age;
    private String name;
    private String userName;
    private String location;

    public Person() {
    }

    public Person(Integer id, Integer age, String name, String userName, String location) {
        this.id = id;
        this.age = age;
        this.name = name;
        this.userName = userName;
        this.location = location;
    }
}

测试类

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RedisTest2 {
    //定义日志
    private static final Logger log = LoggerFactory.getLogger(RedisTest2.class);
    //定义RedisTemplate操作组件
    @Autowired
    private RedisTemplate redisTemplate;
    //JSON序列化与反序列化框架类
    @Autowired
    private ObjectMapper objectMapper;

    //单元测试方法
    @Test
    public void one() throws IOException {
        //构造用户个人实体对象
        Person p = new Person(10013, 23, "修罗", "debug", "火星");
        //定义key与即将存入缓存中的value
        final String key = "redis:test:1";
        String value = objectMapper.writeValueAsString(p);
        //写入缓存中
        log.info("存入缓存中的用户实体对象信息为:{}", p);
        redisTemplate.opsForValue().set(key, value);
        //从缓存中获取用户实体信息
        Object res = redisTemplate.opsForValue().get(key);
        if (res != null) {
            Person resP = objectMapper.readValue(res.toString(), Person.class);
            log.info("从缓存中读取信息:{}", resP);
        }
    }
}

效果

[2021-01-03 17:59:55.055] boot -  INFO [main] ---RedisTest2: 存入缓存中的用户实体对象信息为:Person(id=10013, age=23, name=修罗, userName=debug, location=火星)
[2021-01-03 17:59:55.055] boot -  INFO [main] ---RedisTest2: 从缓存中读取信息:Person(id=10013, age=23, name=修罗, userName=debug, location=火星)

列表

案例:将一组已经排好序的用户对象列表存储在缓存中,按照排名的先后顺序获取出来并输出打印到控制台

@Test
public void two() {
    //构造已经排好序的用户对象列表
    List<Person> list = new ArrayList<>();
    list.add(new Person(1, 21, "修罗", "debug", "火星"));
    list.add(new Person(2, 22, "大圣", "jack", "水帘洞"));
    list.add(new Person(3, 23, "盘古", "Lee", "上古"));
    log.info("构造已经排好序的用户对象列表:{}", list);
    //将列表数据存储至Redis的List中
    final String key = "redis:test:2";
    ListOperations listOperations = redisTemplate.opsForList();
    for (Person p : list) {
        //往列表中添加数据-从队尾中添加
        listOperations.leftPush(key, p);
    }
    //获取Redis中List的数据-从队头中遍历获取,直到没有元素为止
    log.info("--获取Redis中List的数据-从队头中获取---");
    Object res = listOperations.rightPop(key);
    Person resP;
    while (res != null) {
        resP = (Person) res;
        log.info("当前数据:{}", resP);
        res = listOperations.rightPop(key);
    }
}

效果

[2021-01-03 18:23:53.053] boot -  INFO [main] ---RedisTest2: 构造已经排好序的用户对象列表:[Person(id=1, age=21, name=修罗, userName=debug, location=火星), Person(id=2, age=22, name=大圣, userName=jack, location=水帘洞), Person(id=3, age=23, name=盘古, userName=Lee, location=上古)]
[2021-01-03 18:23:53.053] boot -  INFO [main] ---RedisTest2: --获取Redis中List的数据-从队头中获取---
[2021-01-03 18:23:53.053] boot -  INFO [main] ---RedisTest2: 当前数据:Person(id=1, age=21, name=修罗, userName=debug, location=火星)
[2021-01-03 18:23:53.053] boot -  INFO [main] ---RedisTest2: 当前数据:Person(id=2, age=22, name=大圣, userName=jack, location=水帘洞)
[2021-01-03 18:23:53.053] boot -  INFO [main] ---RedisTest2: 当前数据:Person(id=3, age=23, name=盘古, userName=Lee, location=上古)

集合

用于存储具有相同类型或特征的不重复的数据

需求:给定一组用户姓名列表,要求剔除具有相同姓名的人员并组成新的集合,存放至缓存中并用于前端访问

核心代码:

@Test
    public void three() {
        //构造一组用户姓名列表
        List<String> userList = new ArrayList<>();
        userList.add("debug");
        userList.add("jack");
        userList.add("修罗");
        userList.add("大圣");
        userList.add("debug");
        userList.add("jack");
        userList.add("steadyheart");
        userList.add("修罗");
        userList.add("大圣");
        log.info("待处理的用户姓名列表:{}", userList);
        //遍历访问,剔除相同姓名的用户并塞入集合中,最终存入缓存中
        final String key = "redis:test:3";
        SetOperations setOperations = redisTemplate.opsForSet();
        for (String s : userList) {
            setOperations.add(key,s);
        }
        //从缓存中获取用户对象集合
        Object res = setOperations.pop(key);
        while (res != null) {
            log.info("从缓存中获取的用户集合-当前用户:{}", res);
            res = setOperations.pop(key);
        }
    }

效果:

[2021-01-04 13:02:05.005] boot -  INFO [main] ---RedisTest2: 待处理的用户姓名列表:[debug, jack, 修罗, 大圣, debug, jack, steadyheart, 修罗, 大圣]
[2021-01-04 13:02:05.005] boot -  INFO [main] ---RedisTest2: 从缓存中获取的用户集合-当前用户:大圣
[2021-01-04 13:02:05.005] boot -  INFO [main] ---RedisTest2: 从缓存中获取的用户集合-当前用户:steadyheart
[2021-01-04 13:02:05.005] boot -  INFO [main] ---RedisTest2: 从缓存中获取的用户集合-当前用户:debug
[2021-01-04 13:02:05.005] boot -  INFO [main] ---RedisTest2: 从缓存中获取的用户集合-当前用户:jack
[2021-01-04 13:02:05.005] boot -  INFO [main] ---RedisTest2: 从缓存中获取的用户集合-当前用户:修罗

有序集合

与sort 的不同之处在于可以通过底层的Score值对数据进行排序

需求:找出一个星期内手机话费单次金额前6名的用户列表,并要求按照金额从大到小进行排序

代码:

@Data
@ToString
public class PhoneUser implements Serializable {
    private String phone;
    private Double fare;

    public PhoneUser() {
    }

    public PhoneUser(String phone, Double fare) {
        this.phone = phone;
        this.fare = fare;
    }

    /**
     * 手机号相同,代表充值记录重复,所以需要重写equals和hashCode方法
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PhoneUser phoneUser = (PhoneUser) o;
        return phone != null ? phone.equals(phoneUser.phone) : phoneUser.phone == null;
    }

    @Override
    public int hashCode() {
        return phone != null ? phone.hashCode() : 0;
    }
}
@Test
    public void four() {
        //构造一组无序的用户手机充值对象列表
        List<PhoneUser> list = new ArrayList<>();
        list.add(new PhoneUser("103", 130.0));
        list.add(new PhoneUser("101", 120.0));
        list.add(new PhoneUser("105", 110.0));
        list.add(new PhoneUser("104", 100.0));
        list.add(new PhoneUser("106", 150.0));
        list.add(new PhoneUser("102", 160.0));
        log.info("构造一组无序的用户手机充值对象列表:{}", list);
        //遍历访问充值对象雷暴,将信息塞入Redis的有序集合中
        final String key = "redis:test:4";
        //因为zSet在add元素进入缓存后,下次就不能进行更新了,因而为了测试方便,进入操作之前先清空该缓存
        redisTemplate.delete(key);
        //获取有序集合SortedSet操作组件zSetOperations
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        for (PhoneUser user : list) {
            zSetOperations.add(key, user, user.getFare());
        }
        //前端获取访问充值排名靠前的用户列表
        Long size = zSetOperations.size(key);
        //从小到大排序
        //Set<PhoneUser> resSet = zSetOperations.range(key,0L,size);
        //从大到小排序
        Set<PhoneUser> resSet = zSetOperations.reverseRange(key, 0L, size);
        //遍历获取有序集合中的元素
        for (PhoneUser u : resSet) {
            log.info("从缓存中读取手机充值记录排序列表,当前记录:{}", u);
        }
    }

效果:

[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 构造一组无序的用户手机充值对象列表:[PhoneUser(phone=103, fare=130.0), PhoneUser(phone=101, fare=120.0), PhoneUser(phone=105, fare=110.0), PhoneUser(phone=104, fare=100.0), PhoneUser(phone=106, fare=150.0), PhoneUser(phone=102, fare=160.0)]
[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 从缓存中读取手机充值记录排序列表,当前记录:PhoneUser(phone=102, fare=160.0)
[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 从缓存中读取手机充值记录排序列表,当前记录:PhoneUser(phone=106, fare=150.0)
[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 从缓存中读取手机充值记录排序列表,当前记录:PhoneUser(phone=103, fare=130.0)
[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 从缓存中读取手机充值记录排序列表,当前记录:PhoneUser(phone=101, fare=120.0)
[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 从缓存中读取手机充值记录排序列表,当前记录:PhoneUser(phone=105, fare=110.0)
[2021-01-04 13:24:38.038] boot -  INFO [main] ---RedisTest2: 从缓存中读取手机充值记录排序列表,当前记录:PhoneUser(phone=104, fare=100.0)

哈希Hash存储

Redis的哈希存储底层数据结构是由Key-value组成的映射表,value由Filed-Value对构成

需求:对学生对象列表和水果对象列表进行存储

核心代码:

@Data
@ToString
public class Student implements Serializable {
    private String id;
    private String userName;
    private String name;

    public Student() {
    }

    public Student(String id, String userName, String name) {
        this.id = id;
        this.userName = userName;
        this.name = name;
    }
}

@Data
@ToString
public class Fruit implements Serializable {
    private String name;
    private String color;

    public Fruit() {
    }

    public Fruit(String name, String color) {
        this.name = name;
        this.color = color;
    }
}
@Test
public void five() {
    //构造学生对象和水果对象列表
    List<Student> students = new ArrayList<>();
    List<Fruit> fruits = new ArrayList<>();
    //往学生集合中添加学生对象
    students.add(new Student("10010", "debug", "大圣"));
    students.add(new Student("10011", "jack", "修罗"));
    students.add(new Student("10012", "sam", "上古"));
    //往水果集合中添加水果对象
    fruits.add(new Fruit("apple", "红色"));
    fruits.add(new Fruit("orange", "橙色"));
    fruits.add(new Fruit("banana", "黄色"));
    //分别遍历不同的对象列表,并采用Hash存储至缓存中
    final String sKey = "redis:test:5";
    final String fKey = "redis:test:6";
    //获取Hash存储操作组件HashOperations,遍历获取集合中的对象并添加进缓存中
    HashOperations hashOperations = redisTemplate.opsForHash();
    for (Student student : students) {
        hashOperations.put(sKey, student.getId(), student);
    }
    for (Fruit fruit : fruits) {
        hashOperations.put(fKey, fruit.getName(), fruit);
    }
    //获取学生对象列表与水果对象列表
    Map<String, Student> sMap = hashOperations.entries(sKey);
    log.info("获取学生对象列表:{}", sMap);
    Map<String, Fruit> fMap = hashOperations.entries(fKey);
    log.info("获取水果对象列表:{}", fMap);
    //获取指定的学生对象
    String SField = "10012";
    Student s = (Student) hashOperations.get(sKey, SField);
    log.info("获取指定的学生对象:{} -> {}", SField, s);

    //获取指定的水果对象
    String fField = "orange";
    Fruit f = (Fruit) hashOperations.get(fKey, fField);
    log.info("获取指定的水果对象:{} -> {}", fField, f);
}

效果

[2021-01-04 13:50:14.014] boot -  INFO [main] ---RedisTest2: 获取学生对象列表:{10010=Student(id=10010, userName=debug, name=大圣), 10011=Student(id=10011, userName=jack, name=修罗), 10012=Student(id=10012, userName=sam, name=上古)}
[2021-01-04 13:50:14.014] boot -  INFO [main] ---RedisTest2: 获取水果对象列表:{apple=Fruit(name=apple, color=红色), banana=Fruit(name=banana, color=黄色), orange=Fruit(name=orange, color=橙色)}
[2021-01-04 13:50:14.014] boot -  INFO [main] ---RedisTest2: 获取指定的学生对象:10012 -> Student(id=10012, userName=sam, name=上古)
[2021-01-04 13:50:14.014] boot -  INFO [main] ---RedisTest2: 获取指定的水果对象:orange -> Fruit(name=orange, color=橙色)

Key失效与判断是否存在

在Redis缓存体系中,Delete与Expire操作都可以用于清理缓存中的Key,Delete需要手动触发,Expire只需提供一个TTL(过期时间),就能实现Key的自动失效。

下面代码演示如何使缓存中的Key失效:

方法一:调用SETEX方法中指定Key的过期时间

@Test
 public void six() throws InterruptedException {
     //构造key与Redis操作组件ValueOperations
     final String key1 = "redis:test:6";
     ValueOperations valueOperations = redisTemplate.opsForValue();
     //第一种方法:在往缓存中set数据时,提供一个TTL,表示ttl时间一到,缓存中的key将自动失效,即被清理
     valueOperations.set(key1, "expire操作", 10L, TimeUnit.SECONDS);
     //等待5秒-判断key是否还存在
     Thread.sleep(5000);
     Boolean expireKey1 = redisTemplate.hasKey(key1);
     Object value = valueOperations.get(key1);
     log.info("等待5秒-判断key是否还存在:{}对应的值:{}", expireKey1, value);
     //再等待5秒,判断key是否还存在
     Thread.sleep(5000);
     expireKey1 = redisTemplate.hasKey(key1);
     value = valueOperations.get(key1);
     log.info("再等待5秒-再判断key是否还存在:{}对应的值:{}", expireKey1, value);
 }

效果

[2021-01-04 14:02:16.016] boot -  INFO [main] ---RedisTest2: 等待5秒-判断key是否还存在:true对应的值:expire操作
[2021-01-04 14:02:21.021] boot -  INFO [main] ---RedisTest2: 再等待5秒-再判断key是否还存在:false对应的值:null

方法二:采用RedisTemplate操作组件的Expire()方法指定失效的Key

@Test
  public void seven() throws InterruptedException {
      //构造key和redis操作组件
      final String key2 = "redis:test:7";
      ValueOperations valueOperations = redisTemplate.opsForValue();
      //第二种方法:在往缓存中set数据后,采用redisTemplate的expire方法使该key失效
      valueOperations.set(key2,"expire操作-2");
      redisTemplate.expire(key2,10L,TimeUnit.SECONDS);
      //等待5秒-判断key是否还存在
      Thread.sleep(5000);
      Boolean expireKey = redisTemplate.hasKey(key2);
      Object value = valueOperations.get(key2);
      log.info("等待5秒-判断key是否还存在:{}对应的值:{}", expireKey, value);
      //再等待5秒,判断key是否还存在
      Thread.sleep(5000);
      expireKey = redisTemplate.hasKey(key2);
      value = valueOperations.get(key2);
      log.info("再等待5秒-再判断key是否还存在:{}对应的值:{}", expireKey, value);
  }

通过RedisTemplate.hasKey()方法来判断缓存中的Key是否存在。

Redis实战场景之缓存穿透

缓存穿透:前端频繁请求数据库中不存在的数据

解决方案:当查询数据库时如果没有查询到数据,则将Null返回给前端用户,同时将该Null 数据塞入缓存中,并对对应的Key设置一定的过期时间。

实战:以”商城用户访问某个热销的商品“为案例

  1. 在数据库中建立数据库表,命令为item

    DROP TABLE IF EXISTS `item`;
    CREATE TABLE `item` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `code` varchar(255) DEFAULT NULL COMMENT '商品编号',
      `name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品名称',
      `create_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='商品信息表';

    在数据库表创建一条记录,code取值为“book_10010”,name取值为”分布式中间件实战“,create_time取值为当前时间。

  2. 采用MyBatis的逆向工程生成该实体类对应的Model,Mapper和Mapper.xml,并将这三个文件存放至model模块下不同的目录中

    public class Item {
        private Integer id;
        private String code;
        private String name;
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
        private Date createTime;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getCode() {
            return code;
        }
    
        public void setCode(String code) {
            this.code = code;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Date getCreateTime() {
            return createTime;
        }
    
        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }
    }
    public interface ItemMapper {
        /**
         * 此为MyBatis逆向工程自动生成的方法-增、删、改、查
         */
        int deleteByPrimaryKey(Integer id);
    
        int insert(Item record);
    
        int insertSelective(Item record);
    
        Item selectByPrimaryKey(Integer id);
    
        int updateByPrimaryKeySelective(Item record);
    
        int updateByPrimaryKey(Item record);
    
        /**
         * 根据商品编码,查询商品详情
         */
        Item selectByCode(@Param("code") String code);
    
    }
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="com.debug.middleware.model.mapper.ItemMapper" >
        <resultMap id="BaseResultMap" type="com.debug.middleware.model.entity.Item" >
            <id column="id" property="id" jdbcType="INTEGER" />
            <result column="code" property="code" jdbcType="VARCHAR" />
            <result column="name" property="name" jdbcType="VARCHAR" />
            <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
        </resultMap>
        <sql id="Base_Column_List" >
            id, code, name, create_time
        </sql>
        <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
            select
            <include refid="Base_Column_List" />
            from item
            where id = #{id,jdbcType=INTEGER}
        </select>
        <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
            delete from item
            where id = #{id,jdbcType=INTEGER}
        </delete>
        <insert id="insert" parameterType="com.debug.middleware.model.entity.Item" >
            insert into item (id, code, name,
                              create_time)
            values (#{id,jdbcType=INTEGER}, #{code,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR},
                    #{createTime,jdbcType=TIMESTAMP})
        </insert>
        <insert id="insertSelective" parameterType="com.debug.middleware.model.entity.Item" >
            insert into item
            <trim prefix="(" suffix=")" suffixOverrides="," >
                <if test="id != null" >
                    id,
                </if>
                <if test="code != null" >
                    code,
                </if>
                <if test="name != null" >
                    name,
                </if>
                <if test="createTime != null" >
                    create_time,
                </if>
            </trim>
            <trim prefix="values (" suffix=")" suffixOverrides="," >
                <if test="id != null" >
                    #{id,jdbcType=INTEGER},
                </if>
                <if test="code != null" >
                    #{code,jdbcType=VARCHAR},
                </if>
                <if test="name != null" >
                    #{name,jdbcType=VARCHAR},
                </if>
                <if test="createTime != null" >
                    #{createTime,jdbcType=TIMESTAMP},
                </if>
            </trim>
        </insert>
        <update id="updateByPrimaryKeySelective" parameterType="com.debug.middleware.model.entity.Item" >
            update item
            <set >
                <if test="code != null" >
                    code = #{code,jdbcType=VARCHAR},
                </if>
                <if test="name != null" >
                    name = #{name,jdbcType=VARCHAR},
                </if>
                <if test="createTime != null" >
                    create_time = #{createTime,jdbcType=TIMESTAMP},
                </if>
            </set>
            where id = #{id,jdbcType=INTEGER}
        </update>
        <update id="updateByPrimaryKey" parameterType="com.debug.middleware.model.entity.Item" >
            update item
            set code = #{code,jdbcType=VARCHAR},
                name = #{name,jdbcType=VARCHAR},
                create_time = #{createTime,jdbcType=TIMESTAMP}
            where id = #{id,jdbcType=INTEGER}
        </update>
    
        <!--根据商品编码查询-->
        <select id="selectByCode" resultType="com.debug.middleware.model.entity.Item">
            select
            <include refid="Base_Column_List" />
            from item
            where code = #{code}
        </select>
    
    </mapper>
  3. 在启动类MainApplication加上@MapperScan注解,用于扫描MyBatis动态SQL对应的Mapper接口所在的包。

    @MapperScan(basePackages = "com.debug.middleware.model")
  4. 建立对应的控制层Controller与实际业务逻辑处理层Service

    @RestController
    public class CachePassController {
        private static final Logger log = LoggerFactory.getLogger(CachePassController.class);
        private static final String prefix = "cache/pass";
        /**
         * 定义缓存穿透处理服务类
         */
        @Autowired
        private CachePassService cachePassService;
    
        /**
         * 获取热销商品信息
         */
        @RequestMapping(value = prefix + "/item/info", method = RequestMethod.GET)
        public Map<String, Object> getItem(@RequestParam String itemCode) {
            //定义接口返回的格式,主要包括code,msg和data
            Map<String, Object> resMap = new HashMap<>();
            resMap.put("code", 0);
            resMap.put("msg", "成功");
            try {
                //调用缓存穿透处理服务类得到返回结果,并将其添加进结果Map中
                resMap.put("data", cachePassService.getItemInfo(itemCode));
            } catch (Exception e) {
                resMap.put("code", -1);
                resMap.put("msg", "失败" + e.getMessage());
            }
            return resMap;
        }
    }
    @Service
    public class CachePassService {
        private static final Logger log = LoggerFactory.getLogger(CachePassService.class);
        /**
         * 定义Mapper
         */
        @Autowired
        private ItemMapper itemMapper;
        /**
         * 定义Redis操作组件RedisTemplate
         */
        @Autowired
        private RedisTemplate redisTemplate;
    
        /**
         * 定义JSON序列化与反序列化框架
         */
        @Autowired
        private ObjectMapper objectMapper;
        /**
         * 定义缓存中key命名的前缀
         */
        private static final String keyPrefix = "item:";
    
        /**
         * 获取商品详情,如果缓存有,则从缓存中取,如果没有,则从数据库中查询,并将查询结果塞入缓存中
         */
        public Item getItemInfo(String itemCode) throws IOException {
            //定义商品对象
            Item item = null;
            //定义缓存中真正的key:由前缀和商品编码组成
            final String key = keyPrefix + itemCode;
            //定义Redis的操作组件ValueOperations
            ValueOperations valueOperations = redisTemplate.opsForValue();
            if (redisTemplate.hasKey(key)) {
                log.info("---获取商品详情-缓存中存在该商品---商品编号为:{}", itemCode);
                //从缓存中查询该商品详情
                Object res = valueOperations.get(key);
                if (res != null && !Strings.isNullOrEmpty(res.toString())) {
                    //如果可以找到该商品,则进行JSON反序列化解析
                    item = objectMapper.readValue(res.toString(), Item.class);
                }
            }else {
                //如果缓存中没有找到该商品
                log.info("---获取商品详情-缓存中不存在该商品-从数据库中查询---商品编号为“{}", itemCode);
                //从数据库中获取该商品详情
                item = itemMapper.selectByCode(itemCode);
                if (item != null) {
                    //如果数据库中查得到该商品,则将其序列化后写入缓存中
                    valueOperations.set(key, objectMapper.writeValueAsString(item));
                } else {
                    //过期失效时间TTL设置为30分钟,“ ”很关键
                    valueOperations.set(key, " ", 30L, TimeUnit.MINUTES);
                }
            }
            return item;
        }
    }
  5. 运行

    这里出现异常,原因是数据库依赖版本与实际数据库不一致,这里进行如下修改:

    #数据库访问配置
    #spring.datasource.url=jdbc:mysql://localhost:3306/db_middleware?useUnicode=true&amp;characterEncoding=utf-8
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/db_middleware?useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
    <mysql.version>8.0.19</mysql.version>

    最终效果:在浏览器中输入http://localhost:8087/middleware/cache/pass/item/info?itemCode=book_10010,运行窗口第一次打印数据是从数据库中查询得到,第二次是从缓存中获取。

    [2021-01-05 11:19:15.015] boot -  INFO [http-nio-8087-exec-1] ---CachePassService: ---获取商品详情-缓存中不存在该商品-从数据库中查询---商品编号为“book_10010
    [2021-01-05 11:19:23.023] boot -  INFO [http-nio-8087-exec-2] ---CachePassService: ---获取商品详情-缓存中存在该商品---商品编号为:book_10010

    如果查找一个不存在的商品,第一次会从数据库中查询,后面都是从缓存中查询

    [2021-01-05 11:29:50.050] boot -  INFO [http-nio-8087-exec-1] ---CachePassService: ---获取商品详情-缓存中不存在该商品-从数据库中查询---商品编号为“book_10012
    [2021-01-05 11:29:56.056] boot -  INFO [http-nio-8087-exec-2] ---CachePassService: ---获取商品详情-缓存中存在该商品---商品编号为:book_10012
    [2021-01-05 11:30:00.000] boot -  INFO [http-nio-8087-exec-3] ---CachePassService: ---获取商品详情-缓存中存在该商品---商品编号为:book_10012

其他典型问题介绍

缓存雪崩:缓存中的Key集体失效,解决办法:为这些Key设置不同的,随机的过期时间

缓存击穿:热点事件,解决办法:让这个Key永不失效

第4章:Redis典型应用场景实战之抢红包系统

4.1 整体业务流程介绍

系统整体业务流程主要由两大业务组成:发红包和抢红包,其中抢红包可以分为用户点红包和用户拆红包。

抢红包系统整体业务模块划分:

  • 发红包模块:主要包括接受并处理用户发红包请求的逻辑处理
  • 抢红包模块:主要包括用户点红包和拆红包请求的逻辑处理
  • 数据操作DB模块:主要包括系统整体业务逻辑处理过程中的数据记录
  • 缓存中间件Redis模块:主要用于缓存红包个数以及红包随机金额

4.2 数据库表设计与环境搭建

4.2.1 数据库表设计

发红包记录表

DROP TABLE IF EXISTS `red_record`;
CREATE TABLE `red_record` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL COMMENT '用户id',
  `red_packet` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '红包全局唯一标识串',
  `total` int(11) NOT NULL COMMENT '人数',
  `amount` decimal(10,2) DEFAULT NULL COMMENT '总金额(单位为分)',
  `is_active` tinyint(4) DEFAULT '1',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8 COMMENT='发红包记录';

红包随机金额明细表

DROP TABLE IF EXISTS `red_detail`;
CREATE TABLE `red_detail` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `record_id` int(11) NOT NULL COMMENT '红包记录id',
  `amount` decimal(8,2) DEFAULT NULL COMMENT '金额(单位为分)',
  `is_active` tinyint(4) DEFAULT '1',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=133 DEFAULT CHARSET=utf8 COMMENT='红包明细金额';

抢红包记录表

DROP TABLE IF EXISTS `red_rob_record`;
CREATE TABLE `red_rob_record` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL COMMENT '用户账号',
  `red_packet` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '红包标识串',
  `amount` decimal(8,2) DEFAULT NULL COMMENT '红包金额(单位为分)',
  `rob_time` datetime DEFAULT NULL COMMENT '时间',
  `is_active` tinyint(4) DEFAULT '1',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=118 DEFAULT CHARSET=utf8 COMMENT='抢红包记录';

4.2.2 开发环境搭建

1、开发三个实体类

发红包记录实体类

@Data
public class RedRecord {
    private Integer id;
    private Integer userId;
    private String redPacket;
    private Integer total;
    private BigDecimal amount;
    private Byte isActive;
    private Date createTime;
}

红包金额明细实体类

@Data
public class RedDetail {
    private Integer id;
    private Integer recordId;
    private BigDecimal amount;
    private Byte isActive;
    private Date createTime;
}

抢到红包时金额等相关信息记录表

@Data
public class RedRobRecord {
    private Integer id;
    private Integer userId;
    private String redPacket;
    private BigDecimal amount;
    private Date robTime;
    private Byte isActive;
}

2、开发3个Mapper接口

public interface RedRecordMapper {
    /**
     * 根据主键id删除
     */
    int deleteByPrimaryKey(Integer id);

    /**
     * 插入数据记录
     */
    int insert(RedRecord record);

    /**
     * 插入数据记录
     */
    int insertSelective(RedRecord record);

    /**
     * 根据主键id查询记录
     */
    RedRecord selectByPrimaryKey(Integer id);

    /**
     * 更新数据记录
     */
    int updateByPrimaryKeySelective(RedRecord record);

    /**
     * 更新数据记录
     */
    int updateByPrimaryKey(RedRecord record);
}
public interface RedDetailMapper {
    int deleteByPrimaryKey(Integer id);
    int insert(RedDetail record);
    int insertSelective(RedDetail record);
    RedDetail selectByPrimaryKey(Integer id);
    int updateByPrimaryKeySelective(RedDetail record);
    int updateByPrimaryKey(RedDetail record);
}

4.2.3 开发流程介绍

发红包:系统后端根据用户输入金额和个数生成对应的红包随机金额列表,并将红包的总个数以及对应的随机金额列表缓存至Redis中,同时将红包的总金额、随机金额列表和红包全局唯一标识串等信息异步记录到相应的数据库中。

抢红包:先对用户身份进行验证,然后处理用户“点红包”的逻辑,主要是从缓存中获取当前剩余红包个数,如果剩余红包个数大于0,则执行拆红包的逻辑。

为了保证系统开发了流程的规范性、可扩展性以及接口的健壮性,这里约定了处理用户请求信息后将返回统一的响应格式。

public enum  StatusCode {
    Success(0,"成功"),
    Fail(-1,"失败"),
    InvalidParams(201,"非法的参数!"),
    InvalidGrantType(202,"非法的授权类型");
    
    private Integer code;
    private String msg;
    StatusCode(Integer code,String msg){
        this.code = code;
        this.msg = msg;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
public class BaseResponse<T> {
    private Integer code;
    private String msg;
    private T data;

    public BaseResponse(Integer code, String msg) {
        this.code = code;
        this.msg = msg;
    }
    
    public BaseResponse(StatusCode statusCode){
        this.code = statusCode.getCode();
        this.msg = statusCode.getMsg();
    }

    public BaseResponse(Integer code, String msg, T data) {
        this.code = code;
        this.msg = msg;
        this.data = data;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}

4.3 “红包金额”随机生成算法实战

执行逻辑在于不断地更新总金额M和剩余人数N,并根据M和N组成一个随机区间,并在这个区间内产生一个随机金额,如此不断的进行循环迭代,直至N-1为0,此时剩余的金额即为最后一个随机金额。

源代码:

package com.debug.middleware.server.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @Author: gavin
 * @GitHub: https://github.com/gavin-yyj
 * @Date: Created in 14:04 2021/1/6
 * @Description: 二倍均值法的代码实战-封装成工具类
 */

public class RedPacketUtil {
    /**
     * 发红包算法,金额参数以分为单位
     *
     * @param totalAmount    红包总金额-单位为分
     * @param totalPeopleNum 总人数
     * @return
     */
    public static List<Integer> divideRedPackage(Integer totalAmount, Integer totalPeopleNum) {
        //用于存储每次产生的小红包随机金额List -金额单位为分
        List<Integer> amountList = new ArrayList<>();
        //判断总金额和总个数参数的合法性
        if (totalAmount > 0 && totalPeopleNum > 0) {
            //记录剩余的总金额-初始化时即为红包的总金额
            Integer restAmount = totalAmount;
            //记录剩余的总人数-初始化即为指定的总人数
            Integer restPeopleNum = totalPeopleNum;
            //定义产生随机数的实例对象
            Random random = new Random();
            //不断循环遍历,迭代更新地产生随机金额,直到N-1>0
            for (int i = 0; i < totalPeopleNum-1; i++) {
                //随机范围:[1,剩余人均金额的两倍),左闭右开-amount即为产生的随机金额R-单位为分
                int amount = random.nextInt(restAmount / restPeopleNum * 2 - 1) + 1;
                //更新剩余的总金额M=M-R
                restAmount -= amount;
                //更新剩余的总人数N=N-1
                restPeopleNum--;
                //将产生的随机金额添加进列表List中
                amountList.add(amount);
            }
            //循环完毕,剩余的金额即为最后一个随机金额,也需要将其添加进列表中
            amountList.add(restAmount);
        }
        //将最终产生的随机金额列表返回
        return amountList;
    }
}

测试:

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class RedPacketTest {
    //定义日志
    private static final Logger log = LoggerFactory.getLogger(RedPacketTest.class);

    //二倍均值法自测方法
    @Test
    public void one() {
        //总金额单位为分,在这里假设总金额为1000分,即10元
        Integer amout = 1000;
        //总人数即红包总个数,在这里假设为10个
        Integer total = 10;
        //调用二倍均值法工具类产生随机金额列表的方法得到小红包随机金额列表
        List<Integer> list = RedPacketUtil.divideRedPackage(amout, total);
        log.info("总金额={}分,总人数={}个", amout, total);
        //用于统计生成的随机金额之和是否等于总金额
        Integer sum = 0;
        //遍历输出每个随机金额
        for (Integer i : list) {
            //输出随机金额时包括单位为分和单位为元的信息
            log.info("随机金额为:{}分,即{}元", i,new BigDecimal(i.toString()).divide(new BigDecimal(100)));
            sum += i;
        }
        log.info("所有随机金额叠加之和={}分", sum);
    }
}

效果:(代码有误,因此这里有11份红包,已更正)

[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 总金额=1000分,总人数=10个
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:164分,即1.64元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:23分,即0.23元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:196分,即1.96元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:119分,即1.19元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:49分,即0.49元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:94分,即0.94元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:18分,即0.18元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:186分,即1.86元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:105分,即1.05元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:35分,即0.35元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 随机金额为:11分,即0.11元
[2021-01-25 10:17:24.024] boot -  INFO [main] ---RedPacketTest: 所有随机金额叠加之和=1000

4.4 “发红包”模块实战

系统后端接口需要根据红包个数N和总金额M采用二倍均值法拆分成多个随机金额,并生成红包的全局唯一标识串返回给前端,前端用户发起抢红包请求时将带上这个标识串参数,从而实现后续的“点红包”“拆红包”流程。

4.4.1 业务模块分析

后端接口在接收到前端用户发红包的请求时,将采用当前的时间戳(纳秒级别)作为红包全局唯一标识串,并将这一标识串返回给前端,后续用户发起“抢红包”的请求时,将会带上这一参数,目的是为了给发出的红包打标记,并根据这一标记去缓存中查询红包个数和随机金额列表等数据。

在处理“发红包”的请求时,后端接口需要接收红包总金额和总个数等参数,因而将其封装成实体对象RedPacketDto,源代码如下:

@Data
@ToString
public class RedPacketDto {
    private Integer userId;
    @NotNull
    private Integer total;
    @NotNull
    private Integer amount;
}

本系统采用MVCM模式进行开发,最后的M指Middleware(中间件层,即采用中间件辅助处理业务逻辑的服务类)

4.4.2 整体流程实战

1、处理发红包请求的RedPacketController,主要用于接收前端用户请求的参数并执行响应的判断处理逻辑,源代码:

@RestController
public class RedPacketController {
    private static final Logger log = LoggerFactory.getLogger(RedPacketController.class);
    /**
     * 定义请求路径的前缀
     */
    private static final String prefix = "red/packet";
    /**
     * 注入红包业务逻辑处理接口服务
     */
    @Autowired
    private IRedPacketService redPacketService;

    /**
     * 发红包请求-请求方法为post,请求参数采用JSON格式进行交互
     */
    @RequestMapping(value = prefix + "/hand/out", method = RequestMethod.POST,
            consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse handOut(@Validated @RequestBody RedPacketDto dto, BindingResult result) {
        //参数校验
        if (result.hasErrors()) {
            return new BaseResponse(StatusCode.InvalidParams);
        }
        BaseResponse response = new BaseResponse(StatusCode.Success);
        try {
            //核心业务逻辑处理服务-最终返回红包全局唯一标识串
            String redId = redPacketService.handOut(dto);
            //将红包全局唯一标识串返回给前端
            response.setData(redId);
        } catch (Exception e) {
            //如果报异常则打印日志并返回相应的错误信息
            log.error("发红包发生异常:dto={}", dto, e.fillInStackTrace());
            response = new BaseResponse(StatusCode.Fail.getCode(), e.getMessage());
        }
        return response;
    }
}

2、IRedPacketService为红包业务逻辑处理接口,主要包括对“发红包”与“抢红包”逻辑的处理,代码如下:

public interface IRedPacketService {
    /**发红包核心业务逻辑的实现*/
    String handOut(RedPacketDto dto);
    /**抢红包*/
    BigDecimal rob(Integer userId,String redId);
}

IRedPacketService的实现类RedPacketService主要用于处理真正的业务逻辑,源代码如下:

@Service
public class RedPacketService implements IRedPacketService {
    private static final Logger log = LoggerFactory.getLogger(RedPacketService.class);
    /**
     * 存储至缓存系统Redis时定义的Key前缀
     */
    private static final String keyPrefix = "redis:red:packet:";
    /**
     * 定义Redis操作Bean组件
     */
    @Autowired
    private RedisTemplate redisTemplate;
    /**
     * 自动注入红包业务逻辑处理过程数据记录接口服务
     */
    @Autowired
    private IRedService redService;

    /**
     * 发红包
     */
    @Override
    public String handOut(RedPacketDto dto) throws Exception {
        //判断参数的合法性
        if (dto.getTotal() > 0 && dto.getAmount() > 0) {
            //采用二倍均值法生成随机金额列表,在上一节已经采用代码实现了二倍均值法
            List<Integer> list = RedPacketUtil.divideRedPackage(dto.getAmount(), dto.getTotal());
            //生成红包全局唯一标识串
            String timestamp = String.valueOf(System.nanoTime());
            //根据缓存key的前缀与其他信息拼接成一个新的用于存储随机金额列表的Key
            String redId = new StringBuffer(keyPrefix).append(dto.getUserId()).append(":").append(timestamp).toString();
            //将随机金额列表存入缓存List中
            redisTemplate.opsForList().leftPushAll(redId, list);
            //根据缓存Key的前缀与其他信息拼接成一个新的用于存储红包总数的Key
            String redTotalKey = redId + ":total";
            //将红包总数存入缓存中
            redisTemplate.opsForValue().set(redTotalKey, dto.getTotal());
            //异步记录红包的全局唯一标识串,红包个数与随机金额列表信息至数据库中
            redService.recordRedPacket(dto, redId, list);
            //将红包的全局唯一标识串返回给前端
            return redId;
        } else {
            throw new Exception("系统异常-分发红包-参数不合法!");
        }
    }

    /**
     * 抢红包处理逻辑
     */
    @Override
    public BigDecimal rob(Integer userId, String redId) {
        return null;
    }
}

3、“红包业务逻辑处理过程数据记录接口”服务IRedService,主要用于将发红包时红包的相关信息与抢红包时用户抢到的红包金额等信息记入数据库中,源代码如下:

public interface IRedService {
    /**记录发红包时的全局唯一标识串,随机金额列表和个数等信息入数据库*/
    void recordRedPacket(RedPacketDto dto, String redId, List<Integer> list);
    
    /**记录抢红包时用户抢到的红包金额等信息入数据库*/
    void recordRobRedPacket(Integer userId, String redId, BigDecimal amount);
}

实现类RedService源代码如下:

@Service
@EnableAsync
public class RedService implements IRedService {
    private static final Logger log = LoggerFactory.getLogger(RedService.class);
    /**发红包时红包全局唯一标识串等信息操作接口Mapper*/
    @Autowired
    private RedRecordMapper redRecordMapper;
    /**发红包时随机数算法生成的随机金额列表等信息操作接口Mapper*/
    @Autowired
    private RedDetailMapper redDetailMapper;
    /**抢红包时相关数据信息操作接口Mapper*/
    @Autowired
    private RedRobRecordMapper redRobRecordMapper;

    /**
     * 发红包记录-异步方式
     * @param dto 红包总金额+个数
     * @param redId 红包全局唯一标识串
     * @param list 红包随机金额列表
     */
    @Override
    @Async
    @Transactional(rollbackFor = Exception.class)
    public void recordRedPacket(RedPacketDto dto, String redId, List<Integer> list) {
        //定义实体类对象
        RedRecord redRecord = new RedRecord();
        //设置字段的取值信息
        redRecord.setUserId(dto.getUserId());
        redRecord.setRedPacket(redId);
        redRecord.setTotal(dto.getTotal());
        redRecord.setAmount(BigDecimal.valueOf(dto.getAmount()));
        redRecord.setCreateTime(new Date());
        //将对象信息插入数据库
        redRecordMapper.insertSelective(redRecord);
        //定义红包随机金额明细实体类对象
        RedDetail detail;
        //遍历随机金额列表,将金额等信息设置到相应的字段中
        for (Integer i : list) {
            detail = new RedDetail();
            detail.setRecordId(redRecord.getId());
            detail.setAmount(BigDecimal.valueOf(i));
            detail.setCreateTime(new Date());
            //将对象信息插入数据库
            redDetailMapper.insertSelective(detail);
        }
    }

    /**
     * 抢红包记录
     * @param userId
     * @param redId
     * @param amount
     */
    @Override
    public void recordRobRedPacket(Integer userId, String redId, BigDecimal amount) {
    
    }
}

4.4.3 业务模块自测

使用Postman工具测试抢红包系统发红包业务的整体流程,打开Postman,在地址栏中输入“发红包”请求对应的链接,即http://127.0.0.1:8087/middleware/red/packet/hand/out,选择请求方式Post,并在请求体中输入请求参数,如下:

{
    "userId":10010,
    "total":10,
    "amount":1000
}

4.5 “抢红包”模块实战

4.5.1 业务模块分析

  • 业务角度:点红包+拆红包
  • 系统架构:低耦合和服务的高内聚
  • 技术角度:抢红包业务模块对应的后端接口需要频繁访问缓存系统Redis,用于获取红包剩余个数和随机金额列表,进而判断用户点红包、拆红包是否成功。用户每次成功抢到红包之后,后端进行数据更新,并将相应的信息记入数据库中。

前端用户发起抢红包请求,需要带上红包全局唯一标识串resId和当前用户账户Id到系统后端接口。

4.5.2 整体流程

在RedPacketController类中添加一个处理抢红包请求的方法,源代码:

/**
     * 处理抢红包请求;接受当前用户账号id和红包全局唯一标识串参数
     */
    @RequestMapping(value = prefix+"/rob",method = RequestMethod.GET)
    public BaseResponse rob(@RequestParam Integer userId,@RequestParam String redId){
        //定义响应对象
        BaseResponse response = new BaseResponse(StatusCode.Success);
        try {
            //调用红包业务逻辑处理接口中的抢红包方法,最终返回抢到的红包金额,单位为元(Null表示抢完了)
            BigDecimal result = redPacketService.rob(userId, redId);
            if(result != null){
                //将抢到的红包金额返回到前端
                response.setData(result);
            }else{
                //没有抢到红包,即已经被抢完了
                response = new BaseResponse(StatusCode.Fail.getCode(),"红包已被抢完!");
            }
        }catch (Exception e){
            //处理过程发生异常,则打印异常信息并返回给前端
            log.error("抢红包发生异常:userId={} redId={}",userId,redId,e.fillInStackTrace());
            response = new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        //返回处理结果给前端
        return response;
    }

抢红包方法rob():

/**
    * 抢红包实际业务逻辑处理
    * @param userId 当前用户id-抢红包者
    * @param redId 红包全局唯一标识串
    * @return 返回抢到的红包金额或者抢不到红包金额的Null
    */
   @Override
   public BigDecimal rob(Integer userId, String redId) throws Exception {
       //定义Redis操作组件的值操作方法
       ValueOperations valueOperations = redisTemplate.opsForValue();
       //在处理用户抢红包之前,需要先判断一下当前用户是否已经抢过该红包
       //如果抢过,则直接返回红包金额,并在前端显示出来
       Object obj = valueOperations.get(redId + userId + ":rob");
       if(obj!=null){
           return new BigDecimal(obj.toString());
       }
       //"点红包"业务逻辑-主要用于判断缓存系统中是否仍然有红包,即剩余红包个数是否大于0
       Boolean res = click(redId);
       if(res){
           //res为true,则可以进入“拆红包”业务逻辑的处理,从小红包随机金额列表中弹出一个随机金额
           Object value = redisTemplate.opsForList().rightPop(redId);
           if(value != null){
               //当前用户抢到一个红包,则进入后续的更新缓存,并将信息记入数据库
               String redTotalKey = redId + ":total";
               //更新缓存系统中剩余的红包个数,即红包个数减1
               Integer currTotal = valueOperations.get(redTotalKey)!=null?
                       (Integer)valueOperations.get(redTotalKey):0;
               //将红包金额返回给用户前,这里金额的单位设置为“元”
               //如果你不想设置,则可以直接返回value,但是前端需要作除以100的操作
               BigDecimal result = new BigDecimal(value.toString()).divide(new BigDecimal(100));
               //将抢到红包时用户的账号信息以及抢到的金额等信息记入数据库
               redService.recordRobRedPacket(userId,redId,new BigDecimal(value.toString()));
               //将当前抢到红包的用户设置进缓存系统中,用于表示当前用户已经抢过红包了
               valueOperations.set(redId+userId+":rob",result,24L, TimeUnit.HOURS);
               //打印当前用户抢到红包的记录信息
               log.info("当前用户抢到红包了:userId={} key={} 金额={}",userId,redId,result);
               return result;
           }
           return null;
       }
   }

   private Boolean click(String redId) {
       //定义Redis的Bean操作组件-值操作组件
       ValueOperations valueOperations = redisTemplate.opsForValue();
       //定义用于查询缓存系统中红包剩余个数的Key
       //这在发红包业务模块中已经指定过了
       String redTotalKey = redId + ":total";
       //获取缓存系统Redis中农红包剩余个数
       Object total = valueOperations.get(redTotalKey);
       //判断红包剩余个数total是否大于0,如果大于0,则返回true,代表还有红包
       if(total!=null && Integer.valueOf(total.toString())>0){
           return true;
       }
       return false;
   }

去缓存系统中查询红包剩余个数Total和随机金额列表时需要提供Key,这些Key 是在“发红包”业务模块代码中定义的。

3、当用户抢到红包后,需要将当前用户的账号信息及抢到的金额等信息记入数据库中,这一实现逻辑是通过调用redService实现类的recordRobRedPacket()方法实现的,源代码如下:

/**
    * 抢红包记录
    * @param userId
    * @param redId
    * @param amount
    * @throws Exception
    */
   @Override
   @Async
   public void recordRobRedPacket(Integer userId, String redId, BigDecimal amount) throws Exception {
       RedRobRecord redRobRecord=new RedRobRecord();
       redRobRecord.setUserId(userId);
       redRobRecord.setRedPacket(redId);
       redRobRecord.setAmount(amount);
       redRobRecord.setRobTime(new Date());
       //将实体对象信息插入数据库中
       redRobRecordMapper.insertSelective(redRobRecord);
   }

4.5.3 业务模块自测

以“发红包”业务模块自测中发出的红包作为测试数据,redId取值为redis:red:packet:10010:371054244610400

1、打开Postman,在地址栏中输入http://127.0.0.1:8087/middleware/red/packet/rob?userId=10010&redId=redis:red:packet:10010:371054244610400,查看控制台输出结果

[2021-01-26 19:35:06.006] boot -  INFO [http-nio-8087-exec-1] ---RedPacketService: 当前用户抢到红包了:userId=10010 key=redis:red:packet:10010:371054244610400 金额=0.76

2、对同一个链接进行多次发送,系统并不会处理该请求

4.6 Jmeter压力测试高并发抢红包

Apache Jmeter是基于Java的压力测试工具,能够模拟实际生产环境中高并发产生的巨大负载,从而对应用服务器,网络或对象整体性能进行测试,并对产生的测试结果进行分析和反馈。

下载测试工具包:http://jmeter.apache.org/download_jmeter.cgi

找到bin文件目录,然后双击jmeter.sh文件

右击“文件”选项,新建一个测试计划,然后在该测试计划下新建线程组,最后在线程组下新建“HTTP请求””CSV数据文件设置”“查看结果树”

打开Postman,发起发红包请求,参数如下:

{
    "userId":10030,
    "total":10,
    "amount":500
}

抢红包请求设置如下:

在这里插入图片描述

用户抢红包账号等设置如下:共有10030~10035六个用户

在这里插入图片描述

运行后查看结果树:

在这里插入图片描述

运行栏数据:

[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-90] ---RedPacketService: 当前用户抢到红包了:userId=10031 key=redis:red:packet:10030:426373717637600 金额=0.28
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-212] ---RedPacketService: 当前用户抢到红包了:userId=10031 key=redis:red:packet:10030:426373717637600 金额=0.57
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-33] ---RedPacketService: 当前用户抢到红包了:userId=10030 key=redis:red:packet:10030:426373717637600 金额=0.79
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-74] ---RedPacketService: 当前用户抢到红包了:userId=10033 key=redis:red:packet:10030:426373717637600 金额=0.82
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-223] ---RedPacketService: 当前用户抢到红包了:userId=10035 key=redis:red:packet:10030:426373717637600 金额=0.54
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-204] ---RedPacketService: 当前用户抢到红包了:userId=10035 key=redis:red:packet:10030:426373717637600 金额=0.44
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-210] ---RedPacketService: 当前用户抢到红包了:userId=10030 key=redis:red:packet:10030:426373717637600 金额=0.08
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-215] ---RedPacketService: 当前用户抢到红包了:userId=10031 key=redis:red:packet:10030:426373717637600 金额=0.02
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-195] ---RedPacketService: 当前用户抢到红包了:userId=10032 key=redis:red:packet:10030:426373717637600 金额=0.67
[2021-01-27 09:45:43.043] boot -  INFO [http-nio-8087-exec-211] ---RedPacketService: 当前用户抢到红包了:userId=10033 key=redis:red:packet:10030:426373717637600 金额=0.79

异常现象:有用户抢到多个红包!!!

4.7 问题分析与优化方案

问题所在:同一时刻多个并发的线程对共享资源进行了访问操作。

优化方案

传统方案:在核心业务逻辑代码中加锁操作(Synchronized)。在分布式系统架构下的服务一般部署在不同的节点(服务器)下,从而当出现高并发请求时,Synchronized无法解决。

主流方案:分布式锁

Redis分布式锁实战

通过Redis的院子操作selfAbsent()方法对该业务逻辑加分布式锁,表示“如果当前的Key不存在于缓存中,则设置其对应的Value,该方法的操作结果返回true;如果当前的Key已经存在于缓存中,则设置其对应的Value失败,即该方法的操作结果将返回false,由于该方法具备原子性,故当多个并发的线程同一时刻调用该方法时Redis的底层会将线程加入”队列“排队处理。

改造后的rob()方法如下:

@Override
    public BigDecimal rob(Integer userId, String redId) throws Exception {
        //定义Redis操作组件的值操作方法
        ValueOperations valueOperations = redisTemplate.opsForValue();
        //在处理用户抢红包之前,需要先判断一下当前用户是否已经抢过该红包
        //如果抢过,则直接返回红包金额,并在前端显示出来
        Object obj = valueOperations.get(redId + userId + ":rob");
        if(obj!=null){
            return new BigDecimal(obj.toString());
        }
        //"点红包"业务逻辑-主要用于判断缓存系统中是否仍然有红包,即剩余红包个数是否大于0
        Boolean res = click(redId);
        if(res){
            //上分布式锁,一个红包每个人只能抢到一次随机金额,即要永远保证一对一的关系
            //构造缓存中的key
            final String lockKey = redId + userId + "-lock";
            //调用setIfAbsent()方法,其实就是间接实现了分布式锁
            Boolean lock = valueOperations.setIfAbsent(lockKey, redId);
            //设定该分布式锁的过期时间为24小时
            redisTemplate.expire(lockKey,24L,TimeUnit.HOURS);
            try {
                //表示当前线程获取到了该分布式锁
                if(lock){

效果:

[2021-01-27 10:50:30.030] boot -  INFO [http-nio-8087-exec-10] ---RedPacketService: 当前用户抢到红包了:userId=10033 key=redis:red:packet:10050:430694037472700 金额=1.71
[2021-01-27 10:50:30.030] boot -  INFO [http-nio-8087-exec-2] ---RedPacketService: 当前用户抢到红包了:userId=10034 key=redis:red:packet:10050:430694037472700 金额=0.03
[2021-01-27 10:50:30.030] boot -  INFO [http-nio-8087-exec-7] ---RedPacketService: 当前用户抢到红包了:userId=10030 key=redis:red:packet:10050:430694037472700 金额=0.67
[2021-01-27 10:50:30.030] boot -  INFO [http-nio-8087-exec-5] ---RedPacketService: 当前用户抢到红包了:userId=10032 key=redis:red:packet:10050:430694037472700 金额=1.29
[2021-01-27 10:50:30.030] boot -  INFO [http-nio-8087-exec-6] ---RedPacketService: 当前用户抢到红包了:userId=10035 key=redis:red:packet:10050:430694037472700 金额=0.55
[2021-01-27 10:50:30.030] boot -  INFO [http-nio-8087-exec-9] ---RedPacketService: 当前用户抢到红包了:userId=10031 key=redis:red:packet:10050:430694037472700 金额=1.46

第5章 消息中间件RabbitMQ

问题:传统应用在系统接口和服务处理模块层面仍然沿用“高耦合”和“同步”的处理方式,导致接口由于线程阻塞而延长了整体响应时间,即所谓的“高延迟”。

5.1 RabbitMQ简介

RabbitMQ的作用:异步通信、服务解耦、接口限流、消息分发、延迟处理

5.1.3 RabbitMQ后端控制台介绍

安装Erlang以及去官网下载RabbitMQ安装包,安装完成后在浏览器中输入http://127.0.0.1:15672

5.1.4 基于Spring的事件驱动模型实战

Spring事件驱动模型主要由3部分组成,包括发送消息的生产者、消息(或事件)和监听消息的消费者

用户登录成功后的事件实体类LoginEvent

@Data
@ToString
public class LoginEvent extends ApplicationEvent implements Serializable {
    private String userName;
    private String loginTime;
    private String ip;
    /**构造方法一*/
    public LoginEvent(Object source) {
        super(source);
    }
    /**构造方法二*/
    public LoginEvent(Object source,String userName,String loginTime,String ip) {
        super(source);
        this.userName = userName;
        this.loginTime = loginTime;
        this.ip = ip;
    }
}

开发监听消息的消费者Consumer类:

@Component
@EnableAsync
public class Consumer implements ApplicationListener<LoginEvent> {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    /**
     * 监听消费信息
     * @param loginEvent
     */
    @Override
    @Async
    public void onApplicationEvent(LoginEvent loginEvent) {
        //打印日志信息
        log.info("Spring事件驱动模型-接收消息:{}",loginEvent);
    }
}

用于发送消息或产生事件的生产者Producter

@Component
public class Publisher {
    private static final Logger log = LoggerFactory.getLogger(Publisher.class);
    @Autowired
    private ApplicationEventPublisher publisher;
    /**
     * 发送消息的方法
     */
    public void sendMsg() {
        //构造登录成功后用户的实体信息
        LoginEvent event = new LoginEvent(this, "debug", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), "127.0.0.1");
        //发送消息
        publisher.publishEvent(event);
        log.info("Spring事件驱动模型-发送消息:{}", event);
    }
}

测试类:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class EventTest {
    @Autowired
    private Publisher publisher;
    @Test
    public void test1(){
        publisher.sendMsg();
    }
}

效果:

[2021-01-28 16:33:32.032] boot -  INFO [SimpleAsyncTaskExecutor-1] ---Consumer: Spring事件驱动模型-接收消息:LoginEvent(userName=debug, loginTime=2021-01-28 16:33:32, ip=127.0.0.1)
[2021-01-28 16:33:32.032] boot -  INFO [main] ---Publisher: Spring事件驱动模型-发送消息:LoginEvent(userName=debug, loginTime=2021-01-28 16:33:32, ip=127.0.0.1)

5.2 SpringBoot项目整合RabbitMQ

5.2.1 RabbitMQ相关词汇介绍

  • 生产者
  • 消费者
  • 消息
  • 队列:消息的暂存区或者是存储区
  • 交换机:消息的中转站,用于首次接收和分发消息
  • 路由:相当于秘钥、地址或者“第三者”,将消息路由到指定的队列

5.2.2 Spring Boot项目整合RabbitMQ

1、加入RabbitMQ依赖

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.3.3.RELEASE</version>
</dependency>

2、加入RabbitMQ相关配置,包括RabbitMQ服务器所在的Host、端口号、用户名和密码等配置

#RabbitMQ配置
spring.rabbitmq.virtual-host=/
#RabbitMQ服务器所在的host,这里连接本地即可
spring.rabbitmq.host=127.0.0.1
#5672为RabbitMQ提供服务时的端口
spring.rabbitmq.port=5672
#guest和guest为连接到RabbitMQ服务器的账号名和密码
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#这里是自定义变量,表示本地开发环境
mq.env=local

5.2.3 自定义注入配置Bean相关组件

@Configuration
public class RabbitmqConfig {
    private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
    /**自动装配RabbitMQ的链接工厂实例*/
    @Autowired
    private CachingConnectionFactory connectionFactory;
    /**自动装配消息监听器所在的容器工厂配置类实例*/
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 为单一消费者实例的配置
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量,这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量,这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    /**
     * 多个消费者实例的配置,主要针对高并发业务场景的配置
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factoryConfigurer.configure(factory,connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置消息的确认消费模式,在这里为NONE,表示不需要确认消费
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        //设置并发消费者实例的初始数量,这里为10个
        factory.setConcurrentConsumers(10);
        //设置并发消费者实例的最大数量,这里为15个
        factory.setMaxConcurrentConsumers(15);
        //设置并发消费者实例中每个实例拉取的消息数量-在这里为10个
        factory.setPrefetchCount(10);
        return factory;
    }
    /**自定义配置RabbitMQ发送消息的操作组件RabbitTemplate*/
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置“发送消息后进行确认”
        connectionFactory.setPublisherConfirms(true);
        //设置"发送消息后返回确认信息"
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件实例对象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("“消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        //最终返回RabbitMQ的操作组件实例RabbitTemplate
        return rabbitTemplate;
    }
}

当RabbitMQ需要处理高并发业务场景时,可以通过配置“多消费实例”的方式来实现;而在正常情况下,对消息不需要并发监听消费处理时,则只需要配置“单一消费者实例”的容器工厂即可。

5.2.4 RabbitMQ发送、接收消息实战

以“生产者发送一串简单的字符串信息到基本的消息模型中,并由消费者进行监听消费处理”为案例进行代码演练

1、在RabbitmqConfig类中创建队列,交换机,路由及其绑定

/**
* 定义读取配置文件的环境变量实例
*/
@Autowired
private Environment env;
/**创建简单消息模型:队列、交换机和路由 **/
@Bean(name = "basicQueue")
public Queue basicQueue(){
	return new Queue(env.getProperty("mq.basic.info.queue.name"),true);
}
@Bean
public DirectExchange basicExchange(){
	return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false);
}
@Bean
public Binding basicBinding(){
    return 	BindingBuilder.bind(basicQueue()).to(basicExchange())
    .with(env.getProperty("mq.basic.info.routing.key.name"));
}

其中,环境变量实例env读取的相关变量是在配置文件application.properties

#定义基本消息模型中队列、交换机和路由的名称
mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue
mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange
mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key

2、开发发送消息的生产者BasicPublisher,在这里指定待发送的消息为一串字符串值,代码如下:

@Component
public class BasicPublisher {
    private static final Logger log = LoggerFactory.getLogger(BasicPublisher.class);
    /**
     * 定义JSON序列化和反序列化实例
     */
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 定义RabbitMQ消息操作组件RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 定义环境变量读取实例
     */
    @Autowired
    private Environment env;

    /**
     * 发送消息
     *
     * @param message 待发送的消息,即一串字符串值
     */
    public void sendMsg(String message) {
        //判断字符串值是否为空
        if (!Strings.isNullOrEmpty(message)) {
            try {
                //定义消息传输的格式为JSON字符串格式
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //指定消息模型中的交换机
                rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
                //指定消息模型中的路由
                rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
                //将字符串值转化为待发送的消息,即一串二进制的数据流
                Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build();
                //转化并发消息
                rabbitTemplate.convertAndSend(msg);
                //打印日志信息
                log.info("基本消息模型-生产者-发送消息:{}", message);
            } catch (Exception e) {
                log.error("基本消息模型-生产者-发送消息发生异常:{}", message, e.fillInStackTrace());
            }
        }
    }
}

3、开发监听并接收消费处理信息的消费者实例BasicConsumer

@Component
public class BasicConsumer {
    private static final Logger log = LoggerFactory.getLogger(BasicConsumer.class);
    /**
     * 定义JSON序列化和反序列化实例
     */
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 监听并接收消费队列中的消息-在这里采用单一容器工厂实例即可
     */
    @RabbitListener(queues = "${mq.basic.info.queue.name}", containerFactory = "singleListenerContainer")
    /**由于消息本质上是一串二进制数据流,因而监听接收的消息采用字节数据接收*/
    public void consumeMsg(@Payload byte[] msg) {
        try {
            //将字节数组的消息转化为字符串并打印
            String message = new String(msg, "utf-8");
            log.info("基本消息模型-消费者-监听消费到消息:{}", message);
        } catch (Exception e) {
            log.error("基本消息模型-消费者-发生异常:", e.fillInStackTrace());
        }
    }
}

4、测试类

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqTest {
    private static final Logger log = LoggerFactory.getLogger(RabbitmqTest.class);
    /**定义JSON序列化和反序列化实例*/
    private ObjectMapper objectMapper;
    /**定义基本消息模型中发送消息的生产者*/
    @Autowired
    private BasicPublisher basicPublisher;
    /**用于发送消息的测试方法*/
    @Test
    public void test(){
        String msg = "~~~~这是一串字符串消息~~~";
        basicPublisher.sendMsg(msg);
    }
}

5.2.5 其他发送接收消息方式实战

RabbitMQ在实际的应用系统中,可以通过发送字节型(通过getBytes()方法或者序列化方法)的消息和采用@RabbitListener接收字节数组类型的消息之外,还可以通过发送、接收“对象类型”的方式实现消息的发送和接收。

1、在server模块下建立一个用于RabbitMQ操作的消息对应的对象实体包目录:com.debug.middleware.server.rabbitmq.entity,并在该包目录下建立Person类:

@Data
@ToString
public class Person {
    private Integer id;
    private String name;
    private String userName;

    public Person() {
    }

    public Person(Integer id, String name, String userName) {
        this.id = id;
        this.name = name;
        this.userName = userName;
    }
}

2、在RabbitmqConfig配置类中创建用于发送对象类型消息的队列、交换机、路由及其绑定:

/**创建简单消息模型-对象类型:队列、交换机和路由*/
   @Bean(name="objectQueue")
   public Queue objectQueue(){
       return new Queue(env.getProperty("mq.object.info.queue.name"),true);
   }
   @Bean
   public DirectExchange objectExchange(){
       return new DirectExchange(env.getProperty("mq.object.info.exchange.name"),true,false);
   }
   @Bean
   public Binding objectBinding(){
       return BindingBuilder.bind(objectQueue()).to(objectExchange())
               .with(env.getProperty("mq.object.info.routing.key.name"));
   }

参数配置

#基本消息模型-对象消息
mq.object.info.queue.name=${mq.env}.middleware.mq.object.info.queue
mq.object.info.exchange.name=${mq.env}.middleware.mq.object.info.exchange
mq.object.info.routing.key.name=${mq.env}.middleware.mq.object.info.routing.key

3、生产者

/**发送对象类型的消息*/
    public void sendObjectMsg(Person p){
        //判断对象是否为空
        if(p != null){
            try {
                //设置消息在传输过程中的格式,这里指定为JSON格式
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //指定发送消息时对应的交换机
                rabbitTemplate.setExchange(env.getProperty("mq.object.info.exchange.name"));
                //指定发送消息时对应的路由
                rabbitTemplate.setRoutingKey(env.getProperty("mq.object.info.routing.key.name"));
                //采用convertAndSend方法即可发送消息
                rabbitTemplate.convertAndSend(p, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //获取消息的属性
                        MessageProperties messageProperties = message.getMessageProperties();
                        //设置消息的持久化模式
                        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置消息的类型,这里指定为Person类型
                        messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Person.class);
                        //返回消息实例
                        return message;
                    }
                });
                //打印日志信息
                log.info("基本消息模型-生产者-发送对象类型的消息:{}",p);
            }catch (Exception e){
                log.error("基本消息模型-生产者-发送对象类型的消息发生异常:{}",p,e.fillInStackTrace());
            }
        }

如果需要发送对象类型的消息,则需要借助RabbitTemplate的convertAndSend方法,通过MessagePostProcessor的实现类直接指定待发送消息的类型。

4、开发用于监听消费处理消息的消费者功能

@RabbitListener(queues = "${mq.object.info.queue.name}",containerFactory = "singleListenerContainer")
   public void consumeObjectMsg(@Payload Person person){
       try {
           log.info("基本消息模型-监听消费处理对象信息-消费者-监听消费到消息:{}",person);
       }catch (Exception e){
           log.error("基本消息模型-监听消费处理对象信息-消费者-发生异常:",e.fillInStackTrace());
       }
   }

5、测试

@Test
public void test2(){
    Person p = new Person(1,"大圣","debug");
    basicPublisher.sendObjectMsg(p);
}

5.3 RabbitMQ多种消息模型实战

RabbitMQ的核心组件体系中,主要有4中典型的消息模型:即基于HeadersExchange的消息模型、基于fanoutExchange的消息模型、基于DirectExchange的消息模型以及基于TopicExchange的消息模型。

5.3.1 基于FanoutExchange的消息模型实战

交换机的一种,具有“广播消息”的作用,当消息进入交换机这个“中转站”时,交换机会检查哪个队列跟自己是绑定在一起的,找到相应的队列后,将消息传输到相应的绑定队列中,并最终由队列对应的消费者进行监听消费。

1、在RabbitmqConfig中创建交换机、多条队列及其绑定

/**创建消息模型-fanoutExchange*/
  @Bean(name="fanoutQueueOne")
  public Queue fanoutQueueOne(){
      return new Queue(env.getProperty("mq.fanout.queue.one.name"),true);
  }
  @Bean(name = "fanoutQueueTwo")
  public Queue fanoutQueueTwo(){
      return new Queue(env.getProperty("mq.fanout.queue.two.name"),true);
  }
  @Bean
  public FanoutExchange fanoutExchange(){
      return new FanoutExchange(env.getProperty("mq.fanout.exchange.name"),true,false);
  }
  @Bean
  public Binding fanoutBindingOne(){
      return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
  }
  @Bean
  public Binding fanoutBindingTwo(){
      return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
  }

环境变量实例env读取的变量所在配置文件

#消息模型-fanoutExchange
mq.fanout.queue.one.name=${mq.env}.middleware.mq.fanout.one.queue
mq.fanout.queue.two.name=${mq.env}.middleware.mq.fanout.two.queue
mq.fanout.exchange.name=${mq.env}.middleware.mq.fanout.exchange

2、创建对象实体信息EventInfo

@Data
@ToString
public class EventInfo implements Serializable {
    private Integer id;
    private String module;
    private String name;
    private String desc;

    public EventInfo() {
    }

    public EventInfo(Integer id, String module, String name, String desc) {
        this.id = id;
        this.module = module;
        this.name = name;
        this.desc = desc;
    }
}

3、开发生产消息的生产者ModelPublisher类

@Component
public class ModelPublisher {
    private static final Logger log = LoggerFactory.getLogger(ModelPublisher.class);
    /**JSON序列化和反序列化组件*/
    @Autowired
    private ObjectMapper objectMapper;
    /**定义发送消息的操作组件RabbitTemplate*/
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**定义读取环境变量的实例*/
    @Autowired
    private Environment env;
    
    public void sendMsg(EventInfo info){
        if(info != null){
            try {
                //定义消息的传输格式为JSON
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //设置广播式交换机FanoutExchange
                rabbitTemplate.setExchange(env.getProperty("mq.fanout.exchange.name"));
                //创建消息实例
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info)).build();
                //发送消息
                rabbitTemplate.convertAndSend(msg);
                //打印日志
                log.info("消息模型fanoutExchange-生产者-发送消息:{}",info);
            } catch (Exception e) {
                log.error("消息模型fanoutExchange-生产者-发送消息发生异常:{}",info,e.fillInStackTrace());
            }
        }
    }
}

4、开发用于监听接收消费处理消息的消费者ModelConsumer,开发两条队列分别对应的监听消费方法

@Component
public class ModelConsumer {
    private static final Logger log = LoggerFactory.getLogger(ModelConsumer.class);
    @Autowired
    public ObjectMapper objectMapper;

    /**
     * 监听消费队列中的消息-fanoutExchange-one-这是第一条队列对应的消费者
     * @param msg
     */
    @RabbitListener(queues = "${mq.fanout.queue.one.name}",containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgOne(@Payload byte[] msg){
        try {
            //监听消费队列中的消息,并进行解析处理
            EventInfo info = objectMapper.readValue(msg, EventInfo.class);
            log.info("消息模型fanoutExchange-one-消费者-监听消费到消息:{}",info);
        }catch (Exception e){
            log.error("消息模型-消费者-发生异常:",e.fillInStackTrace());
        }
    }
    /**
     * 监听消费队列中的消息-fanoutExchange-two-这是第二条队列对应的消费者
     * @param msg
     */
    @RabbitListener(queues = "${mq.fanout.queue.two.name}",containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgTwo(@Payload byte[] msg){
        try {
            //监听消费队列中的消息,并进行解析处理
            EventInfo info = objectMapper.readValue(msg, EventInfo.class);
            log.info("消息模型fanoutExchange-two-消费者-监听消费到消息:{}",info);
        }catch (Exception e){
            log.error("消息模型-消费者-发生异常:",e.fillInStackTrace());
        }
    }
}

5、测试类

@Autowired
   private ModelPublisher modelPublisher;
   @Test
   public void test3(){
       //创建对象实例
       EventInfo info = new EventInfo(1, "增删改查模块", "基于fanoutExchange的消息模型", "这是基于fanoutExchange的消息模型");
       //触发生产者发送消息
       modelPublisher.sendMsg(info);
   }

效果:

[2021-02-01 10:16:11.011] boot -  INFO [SimpleAsyncTaskExecutor-1] ---ModelConsumer: 消息模型fanoutExchange-two-消费者-监听消费到消息:EventInfo(id=1, module=增删改查模块, name=基于fanoutExchange的消息模型, desc=这是基于fanoutExchange的消息模型)
[2021-02-01 10:16:11.011] boot -  INFO [SimpleAsyncTaskExecutor-1] ---ModelConsumer: 消息模型fanoutExchange-one-消费者-监听消费到消息:EventInfo(id=1, module=增删改查模块, name=基于fanoutExchange的消息模型, desc=这是基于fanoutExchange的消息模型)

由于该FanoutExchange绑定了两条队列,因而两条队列分别对应的消费者将监听接收到相应的消息。

总结:

基于FanoutExchange消费模型主要的核心组件是交换机和队列,一个交换机可以对应并绑定多个队列,从而对应多个消费者!

此种消费模型适用于“业务数据需要广播传输”的场景,比如“用户操作写日志”。

5.3.2 基于DirectExchange的消息模型实战

交换机,具有“直连传输消息”的作用,即当消息进入交换机这个“中转站”时,交换机会检查哪个路由跟自己绑定在一起,并根据生产者发送消息指定的路由进行匹配,如果能找到对应的绑定模型,则将消息直接路由传输到指定的队列,最终由队列对应的消费者进行监听消费。

业务场景:将实体对象信息当做消息,并发送到基于DirectExchange构成的消息模型中,根据绑定的路由,将消息路由至对应绑定的队列中,最终由对应的消费者进行监听消费处理。

1、在RabbitmqConfig配置文件中创建一个交换机、两个路由和对应绑定的两条队列。

/**创建消息模型-DirectExchange*/
   @Bean
   public DirectExchange directExchange(){
       return new DirectExchange(env.getProperty("mq.direct.exchange.name"),true,false);
   }
   @Bean(name = "directQueueOne")
   public Queue directQueueOne(){
       return new Queue(env.getProperty("mq.direct.queue.one.name"),true);
   }
   @Bean(name = "directQueueTwo")
   public Queue directQueueTwo(){
       return new Queue(env.getProperty("mq.direct.queue.two.name"),true);
   }
   @Bean
   public Binding directBindOne(){
       return BindingBuilder.bind(directQueueOne()).to(directExchange())
               .with(env.getProperty("mq.direct.routing.key.one.name"));
   }
   @Bean
   public Binding directBindTwo(){
       return BindingBuilder.bind(directQueueTwo()).to(directExchange())
               .with(env.getProperty("mq.direct.routing.key.two.name"));
   }

配置文件中变量取值如下:

#消息模型DirectExchange
mq.direct.exchange.name=${mq.env}.middleware.mq.direct.exchange
mq.direct.routing.key.one.name=${mq.env}.middleware.mq.direct.routing.key.one
mq.direct.routing.key.two.name=${mq.env}.middleware.mq.direct.routing.key.two
mq.direct.queue.one.name=${mq.env}.middleware.mq.direct.one.queue
mq.direct.queue.two.name=${mq.env}.middleware.mq.direct.two.queue

2、生产者,开发两个用于发送消息的生产者方法

/**
     * 发送消息-基于DirectExchange消息模型-one
     */
    public void sendMsgDirectOne(EventInfo info){
        if(info != null){
            try {
                //设置消息传输的格式为JSON
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //设置交换机
                rabbitTemplate.setExchange(env.getProperty("mq.direct.exchange.name"));
                //设置路由1
                rabbitTemplate.setRoutingKey(env.getProperty("mq.direct.routing.key.one.name"));
                //创建消息
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info)).build();
                //发送消息
                rabbitTemplate.convertAndSend(msg);
                //打印日志
                log.info("消息模型DirectExchange-one-生产者-发送消息:{}",info);
            }catch (Exception e){
                log.error("消息模型DirectExchange-one-生产者-发送消息发生异常:{}",info,e.fillInStackTrace());
            }
        }
    }

    /**
     * 发送消息-基于DirectExchange消息模型-two
     * @param info
     */
    public void sendMsgDirectTwo(EventInfo info){
        if (info != null){
            try {
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                rabbitTemplate.setExchange(env.getProperty("mq.direct.exchange.name"));
                rabbitTemplate.setRoutingKey(env.getProperty("mq.direct.routing.key.two.name"));
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info)).build();
                rabbitTemplate.convertAndSend(msg);
                log.info("消息模型DirectExchange-two-生产者-发送消息:{}",info);
            }catch (Exception e){
                log.error("消息模型DirectExchange-two-生产者-发送消息发生异常:{}",info,e.fillInStackTrace());
            }
        }
    }

3、两个消费者方法,用于监听不同队列中的消息

/**
    * 第一个路由绑定的对应队列的消费者方法
    * @param msg
    */
   @RabbitListener(queues = "${mq.direct.queue.one.name}",containerFactory = "singleListenerContainer")
   public void consumeDirectMsgOne(@Payload byte[] msg){
       try {
           //监听消费信息并进行JSON反序列化解析
           EventInfo info = objectMapper.readValue(msg, EventInfo.class);
           //打印日志消息
           log.info("消息模型DirectExchange-one-消费者-监听消费到消息:{}",info);
       }catch (Exception e){
           log.error("消息模型DirectExchange-one-消费者-监听消费发生异常:",e.fillInStackTrace());
       }
   }
   /**
    * 第二个路由绑定的对应队列的消费者方法
    * @param msg
    */
   @RabbitListener(queues = "${mq.direct.queue.two.name}",containerFactory = "singleListenerContainer")
   public void consumeDirectMsgTwo(@Payload byte[] msg) {
       try {
           //监听消费信息并进行JSON反序列化解析
           EventInfo info = objectMapper.readValue(msg, EventInfo.class);
           //打印日志消息

           log.info("消息模型DirectExchange-two-消费者-监听消费到消息:{}", info);
       } catch (Exception e) {
           log.error("消息模型DirectExchange-two-消费者-监听消费发生异常:", e.fillInStackTrace());
       }
   }

4、测试类

@Test
   public void test4(){
       //创建对象实例
       EventInfo info = new EventInfo(
               1,
               "增删改查模块-1",
               "基于directExchange的消息模型-1",
               "这是基于directExchange的消息模型-1");
       //触发生产者发送消息
       modelPublisher.sendMsgDirectOne(info);
       info = new EventInfo(
               2,
               "增删改查模块-2",
               "基于directExchange的消息模型-2",
               "这是基于directExchange的消息模型-2");
       modelPublisher.sendMsgDirectTwo(info);
   }

效果:各自的消费者都监听消费到了各自的消息

5.3.3 基于TopicExchange的消息模型实战

“发布-主题-订阅”式的交换机,可以通过为路由的名称指定特定的通配符“”和“#”,从而绑定到不同的队列中。“\“表示一个特定的单词,“#”表示任意的单词(可以是一个,多个,也可以没有)

业务场景:将一串字符串信息当做消息,并发送到基于TopicExchange构成的消息模型中,根据绑定的路由将消息路由至绑定的队列中,最终由对应的消费者进行监听消费处理。

1、在RabbitmqConfig配置文件中创建基于TopicExchange的消息模型,这里创建一个交换机、两个分别包含通配符“*”和“#”的路由及队列绑定

/**创建绑定-通配符为*的路由*/
   @Bean
   public Binding topicBindingOne(){
       return BindingBuilder.bind(topicQueueOne()).to(topicExchange())
               .with(env.getProperty("mq.topic.routing.key.one.name"));
   }
   /**创建绑定-通配符为#的路由*/
   @Bean
   public Binding topicBindingTwo(){
       return BindingBuilder.bind(topicQueueTwo()).to(topicExchange())
               .with(env.getProperty("mq.topic.routing.key.two.name"));
   }

环境变量实例env读取的变量取值:

#消息模型-TopicExchange
mq.topic.exchange.name=${mq.env}.middleware.mq.topic.exchange
mq.topic.routing.key.one.name=${mq.env}.middleware.mq.topic.routing.*.key
mq.topic.routing.key.two.name=${mq.env}.middleware.mq.topic.routing.#.key
mq.topic.queue.one.name=${mq.env}.middleware.mq.topic.one.queue
mq.topic.queue.two.name=${mq.env}.middleware.mq.topic.two.queue

2、生产者,这里将“路由”参数开放出来,供调用者调用时指定该参数值,目的是用于测试两个通配符所起的作用。

public void sendMsgTopic(String msg,String routingKey){
       if(!Strings.isNullOrEmpty(msg) && !Strings.isNullOrEmpty(routingKey)){
           try {
               //设置消息的传输格式为JSON
               rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
               //指定交换机
               rabbitTemplate.setExchange(env.getProperty("mq.topic.exchange.name"));
               //指定路由的实际取值,根据不同取值,RabbitMQ将自行进行匹配通配符,从而路由到不同的队列中
               rabbitTemplate.setRoutingKey(routingKey);
               //创建消息
               Message message = MessageBuilder.withBody(msg.getBytes("UTF-8")).build();
               //发送消息
               rabbitTemplate.convertAndSend(message);
               //打印日志
               log.info("消息模型TopicExchange-生产者-发送消息:{} 路由:{} ",msg,routingKey);
           }catch (Exception e){
               log.error("消息模型TopicExchange-生产者-发送消息发生异常:{}",msg,e.fillInStackTrace());
           }
       }

3、开发用于监听消费消息的消费者

/**
     * 监听消费队列中的消息-TopicExchange-*通配符
     * @param msg
     */
    @RabbitListener(queues = "${mq.topic.queue.one.name}",containerFactory = "singleListenerContainer")
    public void consumeTopicMsgOne(@Payload byte[] msg){
        try {
            String message = new String(msg, "utf-8");
            log.info("消息模型-TopicExchange-*-消费者-监听消费到消息:{}",message);
        }catch (Exception e){
            log.error("消息模型-TopicExchange-*-消费者-监听消息发生异常:",e.fillInStackTrace());
        }
    }
    /**
     * 监听消费队列中的消息-TopicExchange-#通配符
     * @param msg
     */
    @RabbitListener(queues = "${mq.topic.queue.two.name}",containerFactory = "singleListenerContainer")
    public void consumeTopicMsgTwo(@Payload byte[] msg){
        try {
            String message = new String(msg, "utf-8");
            log.info("消息模型-TopicExchange-#-消费者-监听消费到消息:{}",message);
        }catch (Exception e){
            log.error("消息模型-TopicExchange-#-消费者-监听消息发生异常:",e.fillInStackTrace());
        }
    }

4、测试类:

  @Test
    public void test5(){
        String msg = "这是TopicExchange消息模型的消息";
        String routingKeyOne = "local.middleware.mq.topic.routing.java.key";
        String routingKeyTwo = "local.middleware.mq.topic.routing.php.python.key";
        String routingKeyThree = "local.middleware.mq.topic.routing.key";
//        modelPublisher.sendMsgTopic(msg,routingKeyOne);
        modelPublisher.sendMsgTopic(msg,routingKeyTwo);
//        modelPublisher.sendMsgTopic(msg,routingKeyThree);
    }

TopicExchange消息模型适用于“发布订阅主题式”的场景,具有普适性的特点。

5.4 RabbitMQ确认消费机制

RabbitMQ在消息的发送、传输和接收过程中,可以保证消息成功发送、不会丢失,以及被确认消费。

5.4.1 消息高可用和确认消费

1、RabbitMQ会在生产者在发送完消息之后进行“发送确认”,当确认成功时即代表消息一经成功发送出去了,相关代码如下:

/**自定义配置RabbitMQ发送消息的操作组件RabbitTemplate*/
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置“发送消息后进行确认”
        connectionFactory.setPublisherConfirms(true);
        //设置"发送消息后返回确认信息"
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件实例对象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("“消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        //最终返回RabbitMQ的操作组件实例RabbitTemplate
        return rabbitTemplate;
    }

2、保证RabbitMQ队列中的消息“不丢失”,是在创建队列、交换机时设置其持久化参数为true,即durable参数取值为true。

在这里插入图片描述

在创建消息时,RabbitMQ要求设置消息的持久化模式为“持久化”,从而保证RabbitMQ服务器出现崩溃并执行重启操作之后,队列、交换机仍旧存在而且消息不会丢失。

public Message postProcessMessage(Message message) throws AmqpException {
                        //获取消息的属性
                        MessageProperties messageProperties = message.getMessageProperties();
                        //设置消息的持久化模式
                        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置消息的类型,这里指定为Person类型
                        messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Person.class);
                        //返回消息实例
                        return message;
                    }

3、保证消息能够被准确消费、不重复消费,RabbitMQ提供了“消息确认机制”,只有当消息被确认消费后,消息才会从队列中被移除。

public enum AcknowledgeMode {
    NONE,
    MANUAL,
    AUTO;

    private AcknowledgeMode() {
    }

    public boolean isTransactionAllowed() {
        return this == AUTO || this == MANUAL;
    }

    public boolean isAutoAck() {
        return this == NONE;
    }

    public boolean isManual() {
        return this == MANUAL;
    }
}

5.4.2 常见的确认消费模式介绍

三种模式:NONE、AUTO、MANUAL

NONE:无需确认,即生产者将消息发送至队列,消费者监听到该消息时,无需发送任何反馈信息给RabbitMQ服务器。

AUTO:自动确认,即生产者将消息发送至队列,消费者监听到该消息时,需要发送一个AUTO ACK的反馈信息给RabbitMQ服务器,之后该消息将在RabbitMQ的队列中被移除。

MANUAL:人为手动确认消费,即生产者将消息发送至队列,消费者监听到该消息时需要手动地“以代码的形式”发送一个ACK的反馈信息给RabbitMQ服务器,之后该消息将在RabbitMQ的队列中被移除,同时告知生产者,消息已经成功发送并且成功被消费者监听消费了。

5.4.3 基于自动确认消费模式实战

1、KnowledgeInfo类充当“消息”进行传输

@Data
@ToString
public class KnowledeInfo {
    private Integer id;
    private String mode;
    private String code;
}

2、在RabbitmqConfig配置类中创建相应的队列,交换机,路由及其绑定,同时再创建一个基于自动确认消费模式的“监听器容器工厂实例”,目的是用于指定特定的队列对应的消费者采用“自动确认消费”的模式。

/**单一消费者-确认模式为AUTO*/
  @Bean(name = "singleListenerContainerAuto")
  public SimpleRabbitListenerContainerFactory listenerContainerAuto(){
      //创建消息监听器所在的容器工厂实例
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      //容器工厂实例设置链接工厂
      factory.setConnectionFactory(connectionFactory);
      //设置消息在传输中的格式
      factory.setMessageConverter(new Jackson2JsonMessageConverter());
      //设置消费者并发实例,在这里采用单一的模式
      factory.setConcurrentConsumers(1);
      //设置消费者并发最大数量的实例
      factory.setMaxConcurrentConsumers(1);
      //设置消费者每个并发的实例预拉取的消息数据量
      factory.setPrefetchCount(1);
      //设置确认消费模式为自动确认消费AUTO
      factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
      //返回监听器容器工厂实例
      return factory;
  }
  /**创建队列*/
  @Bean
  public Queue autoQueue(){
      return new Queue(env.getProperty("mq.auto.knowledge.queue.name"),true);
  }
  /**创建交换机*/
  @Bean
  public DirectExchange autoExchange(){
      return new DirectExchange(env.getProperty("mq.auto.knowledge.exchange.name"),true,false);
  }
  /**创建绑定*/
  @Bean
  public Binding autoBinding(){
      return BindingBuilder.bind(autoQueue()).to(autoExchange())
              .with(env.getProperty("mq.auto.knowledge.routing.ken.name"));
  }

相关配置文件中的变量:

#确认消费模式为自动确认机制
mq.auto.knowledge.queue.name=${mq.env}.middleware.auto.knowledge.queue
mq.auto.knowledge.exchange.name=${mq.env}.middleware.auto.knowledge.exchange
mq.auto.knowledge.routing.ken.name=${mq.env}.middleware.auto.knowledge.routing.key

3、生产者

@Component
public class KnowledgePublisher {
    private static final Logger log = LoggerFactory.getLogger(KnowledgePublisher.class);
    /**定义JSON序列化和反序列化组件实例*/
    @Autowired
    private ObjectMapper objectMapper;
    /**定义读取环境变量的实例*/
    @Autowired
    private Environment env;
    /**定义RabbitMQ操作组件实例*/
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 基于AUTO机制-生产者发送消息
     */
    public void sendAutoMsg(KnowledgeInfo info){
        try {
            if(info != null){
                //设置消息的传输格式
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //设置交换机
                rabbitTemplate.setExchange(env.getProperty("mq.auto.knowledge.exchange.name"));
                //设置路由
                rabbitTemplate.setRoutingKey(env.getProperty("mq.auto.knowledge.routing.key.name"));
                //创建消息,并对消息进行持久化策略设置
                Message message = MessageBuilder
                        .withBody(objectMapper.writeValueAsBytes(info))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                        .build();
                //发送消息
                rabbitTemplate.convertAndSend(message);
                log.info("基于AUTO机制-生产者发送消息-内容为:{}",info);
            }
        }catch (Exception e){
            log.error("基于AUTO机制-生产者发送消息-发生异常:{}",info,e.fillInStackTrace());
        }
    }
}

4、开发用于监听消费消息的消费者:

@Component
public class KnowledgeConsumer {
    private static final Logger log = LoggerFactory.getLogger(KnowledgeConsumer.class);
    @Autowired
    private ObjectMapper objectMapper;
    @RabbitListener(queues = "${mq.auto.knowledge.queue.name}",containerFactory = "singleListenerContainerAuto")
    public void consumeAutoMsg(@Payload byte[] msg){
        try {
            //监听消费解析消息体
            KnowledgeInfo info = objectMapper.readValue(msg, KnowledgeInfo.class);
            log.info("基于AUTO的确认消费模式-消费者监听消费信息-内容为:{}",info);
        }catch (Exception e){
            log.error("基于AUTO的确认消费模式-消费者监听消费消息-发生异常:",e.fillInStackTrace());
        }
    }
}

5、测试方法

@Autowired
    public KnowledgePublisher knowledgePublisher;
    @Test
    public void test6(){
        KnowledgeInfo info = new KnowledgeInfo();
        info.setId(10010);
        info.setCode("auto");
        info.setMode("基于AUTO的消息确认消费模式");
        knowledgePublisher.sendAutoMsg(info);
    }

5.4.4 基于手动确认消费模式实战

1、在RabbitmqConfig配置类中创建相关组件,并创建“基于MANUAL手动确认消费模式”对应的消费者所在的监听器容器工厂实例。

/**
     * 单一消费者-确认模式为MANUAL
     */
    @Bean(name = "manualQueue")
    public Queue manualQueue(){
        return new Queue(env.getProperty("mq.manual.knowledge.queue.name"),true);
    }
    @Bean
    public TopicExchange manualExchange(){
        return new TopicExchange(env.getProperty("mq.manual.knowledge.exchange.name"),true,false);
    }
    @Bean
    public Binding manualBinding(){
        return BindingBuilder.bind(manualQueue()).to(manualExchange())
                .with(env.getProperty("mq.manual.knowledge.routing.key.name"));
    }
    /**定义手动确认消费模式对应的消费者监听器实例*/
    @Autowired
    private KnowledgeManualConsumer knowledgeManualConsumer;
    /**
     * 创建消费者监听器容器工厂实例-确认模式为MANUAL,并指定监听的队列和消费者
     */
    @Bean(name = "simpleContainerManual")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            @Qualifier("manualQueue") Queue manualQueue){
        //创建消息监听器容器实例
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        //设置链接工厂
        container.setConnectionFactory(connectionFactory);
        //设置消息的传输格式-JSON
        container.setMessageConverter(new Jackson2JsonMessageConverter());
        //单一消费者实例配置
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setPrefetchCount(1);
        //设置消息的确认模式,采用手动确认消费机制
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueues(manualQueue);
        //指定该容器中消息监听器,即消费者
        container.setMessageListener(knowledgeManualConsumer);
        //返回容器工厂实例
        return container;
    }
#确认消费模式为手动确认机制
mq.manual.knowledge.queue.name=${mq.env}.middleware.manual.knowledge.queue
mq.manual.knowledge.exchange.name=${mq.env}.middleware.manual.knowledge.exchange
mq.manual.knowledge.routing.key.name=${mq.env}.middleware.manual.knowledge.routing.key

2、消费者,需要实现“RabbitMQ通道确认消息监听器“,即ChannelAwareMessageListener接口,并实现onMessage()方法。

@Component("knowledgeManualConsumer")
public class KnowledgeManualConsumer implements ChannelAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(KnowledgeManualConsumer.class);
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 监听消费消息
     * @param message 消息实体
     * @param channel 通道实例
     * @throws Exception
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //获取消息属性
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息分发时的全局唯一标识
        long deliveryTag = messageProperties.getDeliveryTag();
        try {
            //获得消息体
            byte[] msg = message.getBody();
            //解析消息体
            KnowledgeInfo info = objectMapper.readValue(msg, KnowledgeInfo.class);
            log.info("确认消费模式-人为手动确认消费-监听器监听消费消息-内容为:{}",info);
            //执行完业务逻辑后,手动进行确认消费,其中第一个参数为:消息的分发标识;第二个参数;是否允许批量确认消费
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            log.info("确认消费模式-人为手动确认消费-监听器监听消费消息-发生异常:",e.fillInStackTrace());
            //如果在处理消息的过程中发生了异常,则依旧需要人为手动确认消费掉该消息,
            // 否则该消息将一直留在队列中,从而导致消息的重复消费
            channel.basicReject(deliveryTag,false);
        }
    }
}

3、生产者

@Component
public class KnowLedgeManualPublisher {
    private static final Logger log = LoggerFactory.getLogger(KnowLedgeManualPublisher.class);
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private Environment env;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 基于MANUAL机制-生产者发送消费
     * @param info
     */
    public void sendAutoMsg(KnowledgeInfo info){
        try {
            if(info != null){
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                rabbitTemplate.setExchange(env.getProperty("mq.manual.knowledge.exchange.name"));
                rabbitTemplate.setRoutingKey(env.getProperty("mq.manual.knowledge.routing.key.name"));
                //创建消息
                Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                        .build();
                rabbitTemplate.convertAndSend(message);
                log.info("基于MANUAL机制-生产者发送消息-内容为:{}",info);
            }
        }catch (Exception e){
            log.error("基于MANUAL机制-生产者发送消息-发生异常:{}",info,e.fillInStackTrace());
        }
    }

4、测试方法

@Autowired
    private KnowLedgeManualPublisher knowLedgeManualPublisher;
    @Test
    public void test7(){
        KnowledgeInfo info = new KnowledgeInfo();
        info.setId(10011);
        info.setCode("manual");
        info.setMode("基于MANUAL的消息确认消费模式");
        knowLedgeManualPublisher.sendAutoMsg(info);
    }

5.5 典型应用场景实战之用户登录成功写日志

5.5.1 需求分析

将每次用户登录成功之后的登录信息记入数据库,以便用于相关的日志分析

整体业务流程包含“登录模块”和“日志记录模块”。

5.5.2 数据库表设计

1、进行用户信息表的设计,主要包含用户id,用户名和登录密码等字段。

CREATE TABLE `user`(
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id',
  `user_name` varchar(255) NOT NULL COMMENT '用户名',
  `password` varchar(255) NOT NULL COMMENT '密码',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_user_name` (`user_name`) USING BTREE 
)ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET = utf8 COMMENT = '用户信息表';

2、日志记录表

CREATE TABLE `sys_log`(
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT '0' COMMENT '用户id',
  `module` varchar(255) DEFAULT NULL COMMENT '所属操作模块',
  `data` varchar(5000) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '操作数据',
  `memo` varchar(5000) CHARACTER SET  utf8mb4 DEFAULT NULL COMMENT '备注',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
)ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET = utf8 COMMENT = '日志记录表';

3、对应的实体类,Mapper操作接口以及Mapper.xml

User类:

public class User {
    private Integer id;

    private String userName;

    private String password;

    private Date createTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName == null ? null : userName.trim();
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password == null ? null : password.trim();
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
}

SysLog类

public class SysLog {
    private Integer id;

    private Integer userId;

    private String module;

    private String data;

    private String memo;

    private Date createTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getModule() {
        return module;
    }

    public void setModule(String module) {
        this.module = module == null ? null : module.trim();
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data == null ? null : data.trim();
    }

    public String getMemo() {
        return memo;
    }

    public void setMemo(String memo) {
        this.memo = memo == null ? null : memo.trim();
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
}

User类对应的Mapper接口:

public interface UserMapper {
    int deleteByPrimaryKey(Integer id);

    int insert(User record);

    int insertSelective(User record);

    User selectByPrimaryKey(Integer id);

    int updateByPrimaryKeySelective(User record);

    int updateByPrimaryKey(User record);

    User selectByUserNamePassword(@Param("userName") String userName, @Param("password") String password);
}

UserMapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.debug.middleware.model.mapper.UserMapper" >
    <resultMap id="BaseResultMap" type="com.debug.middleware.model.entity.User" >
        <id column="id" property="id" jdbcType="INTEGER" />
        <result column="user_name" property="userName" jdbcType="VARCHAR" />
        <result column="password" property="password" jdbcType="VARCHAR" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    </resultMap>
    <sql id="Base_Column_List" >
        id, user_name, password, create_time
    </sql>
    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
        select
        <include refid="Base_Column_List" />
        from user
        where id = #{id,jdbcType=INTEGER}
    </select>
    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
        delete from user
        where id = #{id,jdbcType=INTEGER}
    </delete>
    <insert id="insert" parameterType="com.debug.middleware.model.entity.User" >
        insert into user (id, user_name, password,
                          create_time)
        values (#{id,jdbcType=INTEGER}, #{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
                #{createTime,jdbcType=TIMESTAMP})
    </insert>
    <insert id="insertSelective" parameterType="com.debug.middleware.model.entity.User" >
        insert into user
        <trim prefix="(" suffix=")" suffixOverrides="," >
            <if test="id != null" >
                id,
            </if>
            <if test="userName != null" >
                user_name,
            </if>
            <if test="password != null" >
                password,
            </if>
            <if test="createTime != null" >
                create_time,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
            <if test="id != null" >
                #{id,jdbcType=INTEGER},
            </if>
            <if test="userName != null" >
                #{userName,jdbcType=VARCHAR},
            </if>
            <if test="password != null" >
                #{password,jdbcType=VARCHAR},
            </if>
            <if test="createTime != null" >
                #{createTime,jdbcType=TIMESTAMP},
            </if>
        </trim>
    </insert>
    <update id="updateByPrimaryKeySelective" parameterType="com.debug.middleware.model.entity.User" >
        update user
        <set >
            <if test="userName != null" >
                user_name = #{userName,jdbcType=VARCHAR},
            </if>
            <if test="password != null" >
                password = #{password,jdbcType=VARCHAR},
            </if>
            <if test="createTime != null" >
                create_time = #{createTime,jdbcType=TIMESTAMP},
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
    <update id="updateByPrimaryKey" parameterType="com.debug.middleware.model.entity.User" >
        update user
        set user_name = #{userName,jdbcType=VARCHAR},
            password = #{password,jdbcType=VARCHAR},
            create_time = #{createTime,jdbcType=TIMESTAMP}
        where id = #{id,jdbcType=INTEGER}
    </update>

    <!--根据用户名、密码查询-->
    <select id="selectByUserNamePassword" resultType="com.debug.middleware.model.entity.User">
        SELECT <include refid="Base_Column_List"/>
        FROM user WHERE user_name=#{userName} AND password=#{password}
    </select>
</mapper>

SysLog对应的Mapper操作接口:

public interface SysLogMapper {
    int deleteByPrimaryKey(Integer id);

    int insert(SysLog record);

    int insertSelective(SysLog record);

    SysLog selectByPrimaryKey(Integer id);

    int updateByPrimaryKeySelective(SysLog record);

    int updateByPrimaryKey(SysLog record);
}

SysLogMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!--xml头部定义-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!--SysLogMapper操作接口所在的命名空间-->
<mapper namespace="com.debug.middleware.model.mapper.SysLogMapper" >
    <!--查询得到的结果集映射配置-->
    <resultMap id="BaseResultMap" type="com.debug.middleware.model.entity.SysLog" >
        <id column="id" property="id" jdbcType="INTEGER" />
        <result column="user_id" property="userId" jdbcType="INTEGER" />
        <result column="module" property="module" jdbcType="VARCHAR" />
        <result column="data" property="data" jdbcType="VARCHAR" />
        <result column="memo" property="memo" jdbcType="VARCHAR" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    </resultMap>
    <!--定义查询的SQL片段-->
    <sql id="Base_Column_List" >
        id, user_id, module, data, memo, create_time
    </sql>
    <!--根据主键id查询日志记录-->
    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
        select
        <include refid="Base_Column_List" />
        from sys_log
        where id = #{id,jdbcType=INTEGER}
    </select>
    <!--根据主键id删除日志记录-->
    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
        delete from sys_log
        where id = #{id,jdbcType=INTEGER}
    </delete>
    <!--插入日志记录-->
    <insert id="insert" parameterType="com.debug.middleware.model.entity.SysLog" >
        insert into sys_log (id, user_id, module,
                             data, memo, create_time
                )
        values (#{id,jdbcType=INTEGER}, #{userId,jdbcType=INTEGER}, #{module,jdbcType=VARCHAR},
                #{data,jdbcType=VARCHAR}, #{memo,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP}
                       )
    </insert>
    <!--插入日志记录-->
    <insert id="insertSelective" parameterType="com.debug.middleware.model.entity.SysLog" >
        insert into sys_log
        <trim prefix="(" suffix=")" suffixOverrides="," >
            <if test="id != null" >
                id,
            </if>
            <if test="userId != null" >
                user_id,
            </if>
            <if test="module != null" >
                module,
            </if>
            <if test="data != null" >
                data,
            </if>
            <if test="memo != null" >
                memo,
            </if>
            <if test="createTime != null" >
                create_time,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
            <if test="id != null" >
                #{id,jdbcType=INTEGER},
            </if>
            <if test="userId != null" >
                #{userId,jdbcType=INTEGER},
            </if>
            <if test="module != null" >
                #{module,jdbcType=VARCHAR},
            </if>
            <if test="data != null" >
                #{data,jdbcType=VARCHAR},
            </if>
            <if test="memo != null" >
                #{memo,jdbcType=VARCHAR},
            </if>
            <if test="createTime != null" >
                #{createTime,jdbcType=TIMESTAMP},
            </if>
        </trim>
    </insert>
    <!--更新日志记录-->
    <update id="updateByPrimaryKeySelective" parameterType="com.debug.middleware.model.entity.SysLog" >
        update sys_log
        <set >
            <if test="userId != null" >
                user_id = #{userId,jdbcType=INTEGER},
            </if>
            <if test="module != null" >
                module = #{module,jdbcType=VARCHAR},
            </if>
            <if test="data != null" >
                data = #{data,jdbcType=VARCHAR},
            </if>
            <if test="memo != null" >
                memo = #{memo,jdbcType=VARCHAR},
            </if>
            <if test="createTime != null" >
                create_time = #{createTime,jdbcType=TIMESTAMP},
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
    <!--更新日志记录-->
    <update id="updateByPrimaryKey" parameterType="com.debug.middleware.model.entity.SysLog" >
        update sys_log
        set user_id = #{userId,jdbcType=INTEGER},
            module = #{module,jdbcType=VARCHAR},
            data = #{data,jdbcType=VARCHAR},
            memo = #{memo,jdbcType=VARCHAR},
            create_time = #{createTime,jdbcType=TIMESTAMP}
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>

5.5.3 开发环境搭建

1、定义用户单击登录按钮时,后端需要接收的用户登录信息

@Data
@ToString
public class UserLoginDto implements Serializable {
    @NotBlank
    private String userName;
    @NotBlank
    private String password;
    private Integer userId;
}

2、开发“接收并处理用户登录实体信息”的控制器UserController,主要用于校验前端用户提交的相关登录信息,并将校验通过后的数据传递给Service层进行处理:

@RestController
public class UserController {
    private static final Logger log = LoggerFactory.getLogger(UserController.class);
    //前端请求前缀
    private static final String prefix = "user";
    /**
     * 注入用户操作Service层实例
     */
    @Autowired
    private UserService userService;
    @RequestMapping(value = prefix+"/login",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse login(@RequestBody @Validated UserLoginDto dto, BindingResult result){
        //校验前端用户提交的用户登录信息的合法性
        if(result.hasErrors()){
            return new BaseResponse(StatusCode.InvalidParams);
        }
        //定义返回结果实例
        BaseResponse response = new BaseResponse(StatusCode.Success);
        try {
            //调用Service层方法真正处理用户登录逻辑
            Boolean res = userService.login(dto);
            if(res){
                response = new BaseResponse(StatusCode.Success.getCode(),"登录成功");
            }else {
                response = new BaseResponse(StatusCode.Fail.getCode(),"登录失败-账户名密码不匹配");
            }
        }catch (Exception e){
            //表示处理过程发生异常
            response = new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        return response;
    }
}

3、开发用户登录服务UserService类相关功能

@Service
public class UserService {
    private static final Logger log = LoggerFactory.getLogger(UserService.class);
    /**
     * 注入用户实体类对应的Mapper操作接口
     */
    @Autowired
    private UserMapper userMapper;
    /**
     * 日志生产者-用于发送登录成功后相关用户消息
     */
    @Autowired
    private LogPublisher logPublisher;
    public Boolean login(UserLoginDto dto){
        //根据用户名和密码查询用户实体记录
        User user = userMapper.selectByUserNamePassword(dto.getUserName(),dto.getPassword());
        if(user!=null){
            dto.setUserId(user.getId());
            logPublisher.sendLogMsg(dto);
            return true;
        }else {
            return false;
        }
    }
}

5.5.4 基于TopicExchange构建日志消息模型

用户登录成功之后将相关的用户登录信息通过RabbitMQ的消息队列异步写入数据库中,相关配置如下:

/**
     * 用户登录成功写日志消息模型创建
     */
    @Bean(name = "loginQueue")
    public Queue loginQueue(){
        return new Queue(env.getProperty("mq.login.queue.name"),true);
    }
    @Bean
    public TopicExchange loginExchange(){
        return new TopicExchange(env.getProperty("mq.login.exchange.name"),true,false);
    }
    @Bean
    public Binding loginBinding(){
        return BindingBuilder.bind(loginQueue()).to(loginExchange())
                .with(env.getProperty("mq.login.routing.key.name"));
    }

取值:

#用户登录成功写日志消息模型
mq.login.queue.name=${mq.env}.middleware.login.queue
mq.login.exchange.name=${mq.env}.middleware.login.exchange
mq.login.routing.key.name=${mq.env}.middleware.login.routing.key

5.5.5 异步发送接收登录日志消息实战

用户登录成功后将相关登录信息异步写入数据库,即在LogPublisher类中开发相应的方法,用于将相关消息发送给RabbitMQ队列,并被相应的消费者监听消费。

@Component
public class LogPublisher {
    private static final Logger log = LoggerFactory.getLogger(LogPublisher.class);
    /**
     * 定义RabbitMQ操作组件
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**定义环境变量读取实例env*/
    @Autowired
    private Environment env;
    /**定义JSON序列化和反序列化组件*/
    @Autowired
    private ObjectMapper objectMapper;
    public void sendLogMsg(UserLoginDto loginDto){
        try {
            //设置消息传输格式-JSON
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //设置交换机
            rabbitTemplate.setExchange(env.getProperty("mq.login.exchange.name"));
            //设置路由
            rabbitTemplate.setRoutingKey(env.getProperty("mq.login.routing.key.name"));
            //发送消息
            rabbitTemplate.convertAndSend(loginDto, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //获取消息属性
                    MessageProperties messageProperties = message.getMessageProperties();
                    //设置消息的持久化模式为持久化
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //设置消息头,表示传输的消息直接指定为某个类实例,
                    // 消费者在监听消费时可以直接定义该类对象参数进行接收即可
                    messageProperties.setHeader(AbstractJavaTypeMapper
                                      .DEFAULT_CONTENT_CLASSID_FIELD_NAME,UserLoginDto.class);
                    return message;
                }
            });
            log.info("系统日志记录-生产者-将登陆成功后的用户相关信息发送给队列-内容:{}",loginDto);
        }catch (Exception e){
            log.error("系统日志记录-生产者-将登陆成功后的用户相关信息发送给队列-发生异常:{}",loginDto,e.fillInStackTrace());
        }
    }
}

开发“用户登录成功写日志”队列对应的消费者的相关功能

@Component
public class LogConsumer {
    private static final Logger log = LoggerFactory.getLogger(LogConsumer.class);
    //定义系统日志服务实例
    @Autowired
    private SysLogService sysLogService;
    /**
     * 监听消费并处理用户登录成功后的消息
     */
    @RabbitListener(queues = "${mq.login.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMsg(@Payload UserLoginDto loginDto){
        try {
            log.info("系统日志记录-消费者-监听消费用户登录成功后的消息=内容:{}",loginDto);
            //调用日志记录服务-用于记录用户登录成功后将相关登录信息记入数据库
            sysLogService.recordLog(loginDto);
        }catch (Exception e){
            log.error("系统日志记录-消费者-监听消费用户登录成功后的的消息-发生异常:{}",loginDto,e.fillInStackTrace());
        }
    }
}

SysLogService类为记录系统日志服务,主要是将相关日志信息记入数据库:

@Service
@EnableAsync
public class SysLogService {
    private static final Logger log = LoggerFactory.getLogger(SysLogService.class);
    /**定义系统日志操作接口Mapper*/
    @Autowired
    private SysLogMapper sysLogMapper;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 将用户登录成功的信息记入数据库
     */
    public void recordLog(UserLoginDto dto) {
        try {
            //定义系统日志对象,并设置相应字段的取值
            SysLog entity = new SysLog();
            entity.setUserId(dto.getUserId());
            entity.setModule("用户登录模块");
            entity.setData(objectMapper.writeValueAsString(dto));
            entity.setMemo("用户登录成功记录相关登录信息");
            entity.setCreateTime(new Date());
            //插入数据库
            sysLogMapper.insertSelective(entity);
        }catch (Exception e){
            log.error("系统日志服务-记录用户登录成功的信息入数据库-发生异常:{}",dto,e.fillInStackTrace());
        }
    }
}

5.5.6 整体业务模块自测实战

1、打开Postman,在地址栏输入http://127.0.0.1:8087/middleware/user/login,请求方法为POST,选择Body为JSON,请求体中 参数userName:jack,password:123456

2、将用户名和登录密码等信息提前在数据库表user中输入,充当“测试数据”。

第6章 死信队列/延迟队列实战

6.1 死信队列概述

6.1.1 死信队列简介与作用

死信队列又称延迟队列、延时队列,即进入该队列中的消息会被延迟消费的队列。

RabbitMQ的引入主要是替代了传统处理流程的“定时器”处理逻辑,利用死信队列来进行处理。

6.2 RabbitMQ死信队列实战

死信队列占用系统资源少,不需要轮询数据库获取数据,减少DB层面资源的消耗,人为干预很少,只需要搭建好死信队列模型即可,而且会自动消费处理。

6.2.1 死信队列专有词汇介绍

  • DLX:死信交换机
  • DLK:死信路由
  • TTL:进入死信队列中的消息可以存活的时间

6.2.2 死信队列消息模型实战

生产者生产的消息首先到达第一个中转站(即基本交换机),由于基本交换机和基本路由绑定,并对应到指定的“死信队列”,因而消息将进入第一个暂存区,即“死信队列”中,当存活时间一到,消息将进入第二个中转站,即“真正的消息模型”中的死信交换机,然后直接被路由到第二个暂存区,即“真正的队列”中最终该消息被“真正的队列”对应的消费者监听消费。

基本交换机和基本路由绑定死信队列,死信交换机和死信路由绑定真正的队列。

1、DeadInfo实体类充当“消息”

2、在RabbitmqConfig配置类中创建包含死信队列的基本消息模型和包含真正队列的真正消息模型:

/**
     * 死信队列消息模型构建
     */
    /**创建死信队列*/
    @Bean
    public Queue basicDeadQueue() {
        //创建死信队列的组成成分map,用于存放组成成分的相关成员
        Map<String, Object> args = new HashMap<>();
        //创建死信交换机
        args.put("x-dead-letter-exchange",env.getProperty("mq.dead.exchange.name"));
        //创建死信路由
        args.put("x-dead-letter-routing-key",env.getProperty("mq.dead.routing.key.name"));
        //设定TTL,单位为ms,在这里指10s
        args.put("x-message-ttl",10000);
        //创建并返回死信队列实例
        return new Queue(env.getProperty("mq.dead.queue.name"),true,false,false,args);
    }
    /**创建“基本消息模型”的基本交换机-面向生产者*/
    @Bean
    public TopicExchange basicProducerExchange(){
        //创建并返回基本交换机实例
        return new TopicExchange(env.getProperty("mq.producer.basic.exchange.name"),true,false);
    }
    /**创建“基本消息模型”的基本绑定-基本交换机+基本路由 -面向生产者*/
    @Bean
    public Binding basicProducerBingding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange())
                .with(env.getProperty("mq.producer.basic.routing.key.name"));
    }
    /**创建真正的队列-面向消费者*/
    @Bean
    public Queue realConsumerQueue(){
        return new Queue(env.getProperty("mq.consumer.real.queue.name"),true);
    }
    /**创建死信交换机*/
    @Bean
    public TopicExchange basicDeadExchange(){
        return new TopicExchange(env.getProperty("mq.dead.exchange.name"),true,false);
    }
    /**创建死信路由及其绑定*/
    @Bean
    public Binding basicDeadBinding(){
        return BindingBuilder.bind(realConsumerQueue()).to(basicDeadExchange())
                .with(env.getProperty("mq.dead.routing.key.name"));
    }

3、环境变量实例env取值如下:

#死信队列消息模型
#定义死信队列的名称
mq.dead.queue.name=${mq.env}.middleware.dead.queue
#定义死信交换机的名称
mq.dead.exchange.name=${mq.env}.middleware.dead.exchange
#定义死信路由的名称
mq.dead.routing.key.name=${mq.env}.middleware.dead.routing.key
#定义“基本消息模型”中的基本交换机的名称
mq.producer.basic.exchange.name=${mq.env}.middleware.producer.basic.exchange
#定义“基本消息模型”中的基本路由名称
mq.producer.basic.routing.key.name=${mq.env}.middleware.producer.baic.routing.key
#定义面向消费者真正的队列名称
mq.consumer.real.queue.name=${mq.env}.middleware.consumer.real.queue

4、运行项目,在浏览器地址栏中输入http://127.0.0.1:15672并回车,输入账号和密码后,查看死信队列和真正的队列。

6.2.3 死信队列延迟发送消息实战

1、开发用于生产、发送消息的生产者DeadPublisher类,主要是将实体对象充当消息发送到“基本消息模型’中

@Component
public class DeadPublisher {
    private static final Logger log = LoggerFactory.getLogger(DeadPublisher.class);
    /**定义环境变量*/
    @Autowired
    private Environment env;
    /**定义RabbitMQ操作组件*/
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**定义JSON序列化和反序列化组件实例*/
    @Autowired
    private ObjectMapper objectMapper;
    /**发送对象类型的消息给死信队列*/
    public void sendMsg(DeadInfo info){
        try {
            //设置消息的传输格式为JSON
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //设置基本的交换机
            rabbitTemplate.setExchange(env.getProperty("mq.producer.basic.exchange.name"));
            //设置基本路由
            rabbitTemplate.setRoutingKey(env.getProperty("mq.producer.basic.routing.key.name"));
            //发送对象类型的消息
            rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //获取消息属性对象
                    MessageProperties messageProperties = message.getMessageProperties();
                    //设置消息的持久化策略
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //设置消息头,即直接指定发送的消息所属的对象类型
                    messageProperties.setHeader(AbstractJavaTypeMapper
                                                .DEFAULT_CONTENT_CLASSID_FIELD_NAME,DeadInfo.class);
                    //设置消息的TTL,当消息和队列都设置了TTL时,则取较短时间的值
                    //messageProperties.setExpiration(String.valueOf(10000));
                    //返回消息实例
                    return message;
                }
            });
            //打印日志
            log.info("死信队列实例-发送对象类型的消息入死信队列-内容为:{}",info);
        }catch (Exception e){
            log.error("死信队列实例-发送对象类型的消息入死信队列-发生异常:{}",info,e.fillInStackTrace());
        }
    }
}

2、消费者

@Component
public class DeadConsumer {
    private static final Logger log = LoggerFactory.getLogger(DeadConsumer.class);
    @Autowired
    private ObjectMapper objectMapper;
    @RabbitListener(queues = "${mq.consumer.real.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMsg(@Payload DeadInfo info){
        try {
            log.info("死信队列实战-监听真正的队列-消费队列中的消息,监听到消息内容为:{}",info);
            //TODO:用于执行后续的相关业务逻辑
            
        }catch (Exception e){
            log.error("死信队列实战-监听真正的队列-消费队列中的消息 - 面向消费者发生异常:{}",info,e.fillInStackTrace());
        }
    }
}

3、测试

/**定义死信队列消息模型生产者实例*/
   @Autowired
   private DeadPublisher deadPublisher;
   @Test
   public void test8() throws InterruptedException {
       DeadInfo info = new DeadInfo(1,"~~我是第一则消息~~");
       deadPublisher.sendMsg(info);
       info = new DeadInfo(2,"~~我是第二则消息~~");
       deadPublisher.sendMsg(info);
       Thread.sleep(30000);
   }

6.3 典型应用场景实战之商城平台订单支付超时

6.3.1 整体业务场景介绍

实现“自动检测用户下单记录是否已经超过了支付时间”

6.3.2 整体业务流程分析

该业务场景主要由三大核心业务流程组成:

  • 用户下单:将用户下单记录存储至数据库,设置支付状态为“已保存”
  • 死信队列发送和延迟监听下单记录:实现自动监听超时支付的消息记录
  • 更新用户下单记录状态:当监听到消息记录时查询并决定是否需要更新下单记录状态,如果状态仍为“已保存”,则说明未付款,将该状态更新为“已失效”。

6.3.3 数据库设计

用户下单记录表,用来记录用户的下单历史记录

CREATE TABLE `user_order` (
  `id`          int(11)      NOT NULL AUTO_INCREMENT,
  `order_no`    varchar(255) NOT NULL
  COMMENT '订单编号',
  `user_id`     int(11)      NOT NULL
  COMMENT '用户id',
  `status`      int(11)               DEFAULT NULL
  COMMENT '状态(1=已保存,2=已付款,3=已取消)',
  `is_active`   int(255)              DEFAULT '1'
  COMMENT '是否有效(1=有效,0=失效)',
  `create_time` datetime              DEFAULT NULL,
  `update_time` datetime              DEFAULT NULL,
  PRIMARY KEY (`id`)
)
  ENGINE = InnoDB
  DEFAULT CHARSET = utf8
  COMMENT = '用户下单记录表'

RabbitMQ失效下单记录的历史记录表

CREATE TABLE `mq_order` (
  `id`            int(11) NOT NULL AUTO_INCREMENT,
  `order_id`      int(11) NOT NULL
  COMMENT '下单记录id',
  `business_time` datetime         DEFAULT NULL
  COMMENT '失效下单记录的时间',
  `memo`          varchar(255)     DEFAULT NULL
  COMMENT '备注信息',
  PRIMARY KEY (`id`)
)
  ENGINE = InnoDB
  DEFAULT CHARSET = utf8
  COMMENT = 'RabbitMQ失效下单记录的历史记录表'

1、UserOrder,用户下单实体类

public class UserOrder {
    private Integer id;

    private String orderNo;

    private Integer userId;

    private Integer status;

    private Integer isActive;

    private Date createTime;

    private Date updateTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo == null ? null : orderNo.trim();
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public Integer getIsActive() {
        return isActive;
    }

    public void setIsActive(Integer isActive) {
        this.isActive = isActive;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    @Override
    public String toString() {
        return "UserOrder{" +
                "id=" + id +
                ", orderNo='" + orderNo + '\'' +
                ", userId=" + userId +
                ", status=" + status +
                ", isActive=" + isActive +
                ", createTime=" + createTime +
                ", updateTime=" + updateTime +
                '}';
    }
}

MqOrder类,RabbitMQ死信队列更新失效订单的状态实体

public class MqOrder {
    private Integer id; //主键id
    private Integer orderId; //下单记录id
    private Date businessTime; //失效下单记录状态的业务时间
    private String memo; //备注信息

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public Date getBusinessTime() {
        return businessTime;
    }

    public void setBusinessTime(Date businessTime) {
        this.businessTime = businessTime;
    }

    public String getMemo() {
        return memo;
    }

    public void setMemo(String memo) {
        this.memo = memo == null ? null : memo.trim();
    }
}

2、MqOrderMapper

public interface MqOrderMapper {
    /**根据主键id删除记录*/
    int deleteByPrimaryKey(Integer id);
    /**插入记录*/
    int insert(MqOrder record);
    /**插入记录*/
    int insertSelective(MqOrder record);
    /**根据主键id查询记录*/
    MqOrder selectByPrimaryKey(Integer id);
    /**更新记录*/
    int updateByPrimaryKeySelective(MqOrder record);
    /**更新记录*/
    int updateByPrimaryKey(MqOrder record);
}

对应的MqOrderMapper.xml代码如下:

<?xml version="1.0" encoding="UTF-8" ?>
<!--xml文档类型定义-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!--Mapper操作接口-->
<mapper namespace="com.debug.middleware.model.mapper.MqOrderMapper" >
  <!--Mapper查询结果集映射-->
  <resultMap id="BaseResultMap" type="com.debug.middleware.model.entity.MqOrder" >
    <id column="id" property="id" jdbcType="INTEGER" />
    <result column="order_id" property="orderId" jdbcType="INTEGER" />
    <result column="business_time" property="businessTime" jdbcType="TIMESTAMP" />
    <result column="memo" property="memo" jdbcType="VARCHAR" />
  </resultMap>
  <!--Mapper查询SQL片段定义-->
  <sql id="Base_Column_List" >
    id, order_id, business_time, memo
  </sql>
  <!--根据主键id查询-->
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select 
    <include refid="Base_Column_List" />
    from mq_order
    where id = #{id,jdbcType=INTEGER}
  </select>
  <!--根据主键id删除-->
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
    delete from mq_order
    where id = #{id,jdbcType=INTEGER}
  </delete>
  <!--插入记录-->
  <insert id="insert" parameterType="com.debug.middleware.model.entity.MqOrder" >
    insert into mq_order (id, order_id, business_time, 
      memo)
    values (#{id,jdbcType=INTEGER}, #{orderId,jdbcType=INTEGER}, #{businessTime,jdbcType=TIMESTAMP}, 
      #{memo,jdbcType=VARCHAR})
  </insert>
  <!--插入记录-->
  <insert id="insertSelective" parameterType="com.debug.middleware.model.entity.MqOrder" >
    insert into mq_order
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        id,
      </if>
      <if test="orderId != null" >
        order_id,
      </if>
      <if test="businessTime != null" >
        business_time,
      </if>
      <if test="memo != null" >
        memo,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        #{id,jdbcType=INTEGER},
      </if>
      <if test="orderId != null" >
        #{orderId,jdbcType=INTEGER},
      </if>
      <if test="businessTime != null" >
        #{businessTime,jdbcType=TIMESTAMP},
      </if>
      <if test="memo != null" >
        #{memo,jdbcType=VARCHAR},
      </if>
    </trim>
  </insert>
  <!--更新记录-->
  <update id="updateByPrimaryKeySelective" parameterType="com.debug.middleware.model.entity.MqOrder" >
    update mq_order
    <set >
      <if test="orderId != null" >
        order_id = #{orderId,jdbcType=INTEGER},
      </if>
      <if test="businessTime != null" >
        business_time = #{businessTime,jdbcType=TIMESTAMP},
      </if>
      <if test="memo != null" >
        memo = #{memo,jdbcType=VARCHAR},
      </if>
    </set>
    where id = #{id,jdbcType=INTEGER}
  </update>
  <!--更新记录-->
  <update id="updateByPrimaryKey" parameterType="com.debug.middleware.model.entity.MqOrder" >
    update mq_order
    set order_id = #{orderId,jdbcType=INTEGER},
      business_time = #{businessTime,jdbcType=TIMESTAMP},
      memo = #{memo,jdbcType=VARCHAR}
    where id = #{id,jdbcType=INTEGER}
  </update>
</mapper>

3、UserOrderMapper操作接口和对应的xml配置文件

public interface UserOrderMapper {
    /**根据主键id删除记录*/
    int deleteByPrimaryKey(Integer id);
    /**插入记录*/
    int insert(UserOrder record);
    /**插入记录*/
    int insertSelective(UserOrder record);
    /**根据主键id查询记录*/
    UserOrder selectByPrimaryKey(Integer id);
    /**更新记录*/
    int updateByPrimaryKeySelective(UserOrder record);
    /**更新记录*/
    int updateByPrimaryKey(UserOrder record);
    /**根据下单记录Id和支付状态查询*/
    UserOrder selectByIdAndStatus(@Param("id") Integer id, @Param("status") Integer status);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!--xml文档类型说明-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!--UserOrderMapper操作接口所在的命名空间定义-->
<mapper namespace="com.debug.middleware.model.mapper.UserOrderMapper" >
  <!--查询得到的结果集映射-->
  <resultMap id="BaseResultMap" type="com.debug.middleware.model.entity.UserOrder" >
    <id column="id" property="id" jdbcType="INTEGER" />
    <result column="order_no" property="orderNo" jdbcType="VARCHAR" />
    <result column="user_id" property="userId" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="INTEGER" />
    <result column="is_active" property="isActive" jdbcType="INTEGER" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <!--查询的SQL片段-->
  <sql id="Base_Column_List" >
    id, order_no, user_id, status, is_active, create_time, update_time
  </sql>
  <!--根据下单记录Id查询记录-->
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select 
    <include refid="Base_Column_List" />
    from user_order
    where id = #{id,jdbcType=INTEGER}
  </select>
  <!--根据主键id删除记录-->
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
    delete from user_order
    where id = #{id,jdbcType=INTEGER}
  </delete>
  <!--插入用户下单记录-->
  <insert id="insert" useGeneratedKeys="true" keyProperty="id" parameterType="com.debug.middleware.model.entity.UserOrder" >
    insert into user_order (id, order_no, user_id, 
      status, is_active, create_time, 
      update_time)
    values (#{id,jdbcType=INTEGER}, #{orderNo,jdbcType=VARCHAR}, #{userId,jdbcType=INTEGER}, 
      #{status,jdbcType=INTEGER}, #{isActive,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, 
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <!--插入用户下单记录-->
  <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id" parameterType="com.debug.middleware.model.entity.UserOrder" >
    insert into user_order
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        id,
      </if>
      <if test="orderNo != null" >
        order_no,
      </if>
      <if test="userId != null" >
        user_id,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="isActive != null" >
        is_active,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="id != null" >
        #{id,jdbcType=INTEGER},
      </if>
      <if test="orderNo != null" >
        #{orderNo,jdbcType=VARCHAR},
      </if>
      <if test="userId != null" >
        #{userId,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=INTEGER},
      </if>
      <if test="isActive != null" >
        #{isActive,jdbcType=INTEGER},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <!--根据下单记录id更新记录-->
  <update id="updateByPrimaryKeySelective" parameterType="com.debug.middleware.model.entity.UserOrder" >
    update user_order
    <set >
      <if test="orderNo != null" >
        order_no = #{orderNo,jdbcType=VARCHAR},
      </if>
      <if test="userId != null" >
        user_id = #{userId,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=INTEGER},
      </if>
      <if test="isActive != null" >
        is_active = #{isActive,jdbcType=INTEGER},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where id = #{id,jdbcType=INTEGER}
  </update>
  <!--根据下单记录id更新记录-->
  <update id="updateByPrimaryKey" parameterType="com.debug.middleware.model.entity.UserOrder" >
    update user_order
    set order_no = #{orderNo,jdbcType=VARCHAR},
      user_id = #{userId,jdbcType=INTEGER},
      status = #{status,jdbcType=INTEGER},
      is_active = #{isActive,jdbcType=INTEGER},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where id = #{id,jdbcType=INTEGER}
  </update>
  <!--根据下单记录Id和支付状态查询-->
  <select id="selectByIdAndStatus" resultType="com.debug.middleware.model.entity.UserOrder">
    SELECT
        <include refid="Base_Column_List"/>
    FROM
        user_order
    WHERE
        is_active
    AND `status` = #{status}
    AND id = #{id}
  </select>

</mapper>

6.3.4 构建RabbitMQ死信队列消息模型

假设用户下单之后超时支付的时间为“10秒”

1、在RabbitmqConfig配置类中构建“用户下单支付超时”对应的死信队列消息模型

/**用户下单支付超时-RabbitMQ死信队列消息模型构建**/

    //创建死信队列
    @Bean
    public Queue orderDeadQueue() {
        Map<String, Object> args = new HashMap();
        args.put("x-dead-letter-exchange", env.getProperty("mq.order.dead.exchange.name"));
        args.put("x-dead-letter-routing-key", env.getProperty("mq.order.dead.routing.key.name"));

        //设定TTL,单位为ms,在这里为了测试方便,设置为10s,当然实际业务场景可能为1h或者更长
        args.put("x-message-ttl", 10000);
        return new Queue(env.getProperty("mq.order.dead.queue.name"), true, false, false, args);
    }

    //创建“基本消息模型”的基本交换机 - 面向生产者
    @Bean
    public TopicExchange orderProducerExchange() {
        return new TopicExchange(env.getProperty("mq.producer.order.exchange.name"), true, false);
    }

    //创建“基本消息模型”的基本绑定-基本交换机+基本路由 - 面向生产者
    @Bean
    public Binding orderProducerBinding() {
        return BindingBuilder.bind(orderDeadQueue()).to(orderProducerExchange())
            .with(env.getProperty("mq.producer.order.routing.key.name"));
    }

    //创建真正队列 - 面向消费者
    @Bean
    public Queue realOrderConsumerQueue() {
        return new Queue(env.getProperty("mq.consumer.order.real.queue.name"), true);
    }

    //创建死信交换机
    @Bean
    public TopicExchange basicOrderDeadExchange() {
        return new TopicExchange(env.getProperty("mq.order.dead.exchange.name"), true, false);
    }

    //创建死信路由及其绑定
    @Bean
    public Binding basicOrderDeadBinding() {
        return BindingBuilder.bind(realOrderConsumerQueue()).to(basicOrderDeadExchange())
            .with(env.getProperty("mq.order.dead.routing.key.name"));
    }

读取的变量取值如下:

#用户下单支付超时-死信队列消息模型
#定义死信队列名称
mq.order.dead.queue.name=${mq.env}.middleware.order.dead.queue
#定义交换机名称
mq.order.dead.exchange.name=${mq.env}.middleware.order.dead.exchange
#定义死信路由名称
mq.order.dead.routing.key.name=${mq.env}.middleware.order.dead.routing.key
#定义基本交换机名称
mq.producer.order.exchange.name=${mq.env}.middleware.order.basic.exchange
#定义基本路由名称
mq.producer.order.routing.key.name=${mq.env}.middleware.order.basic.routing.key
#定义真正队列名称
mq.consumer.order.real.queue.name=${mq.env}.middleware.consumer.order.real.queue

2、运行项目,在浏览器地址栏中输入http://127.0.0.1.15672,查看该消息模型对应的“死信队列”和“真正的队列”信息

6.3.5 Controller层开发用户下单及订单失效功能

Controller层指应用系统针对指定的业务模块开发相关的业务逻辑处理代码,包括接收用户前端请求的控制层、处理核心业务逻辑的服务层及提供额外服务的中间件处理层。

1、开发用于接受并处理用户前端请求的控制层UserOrderController,主要用于接收前端用户下单时的相关信息

@RestController
public class UserOrderController {
    private static final Logger log = LoggerFactory.getLogger(UserOrderController.class);
    //定义请求前缀
    private static final String prefix = "user/order";
    /**
     * 用户下单处理服务实例
     */
    @Autowired
    private DeadUserOrderService deadUserOrderService;
    /**
     * 用户下单请求的接收和处理
     */
    @RequestMapping(value = prefix+"/push",method = RequestMethod.POST,
    consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse login(@RequestBody @Validated UserOrderDto dto, BindingResult result){
        //判断请求参数的合法性
        if(result.hasErrors()){
            return new BaseResponse(StatusCode.InvalidParams);
        }
        //定义响应结果实例
        BaseResponse response = new BaseResponse(StatusCode.Success);
        try {
            //调用Service层真正处理用户下单的业务逻辑
            deadUserOrderService.pushUserOrder(dto);
        }catch (Exception e){
            //Service层在处理的过程中如果发生异常,
            //则抛出异常并被Controller层捕获返回给前端用户
            response = new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        return response;
    }
}

2、UserOrderDto实体类用于接收前端用户下单时提交的信息。

@Data
@ToString
public class UserOrderDto {
    @NotBlank
    private String orderNo;
    @NotNull
    private Integer userId;
}

3、DeadUserOrderService类,主要包含两大核心功能,即“用户下单”功能和“更新用户下单记录的状态”功能

@Service
public class DeadUserOrderService {
    private static final Logger log = LoggerFactory.getLogger(DeadUserOrderService.class);
    /**定义用户下单操作Mapper*/
    @Autowired
    private UserOrderMapper userOrderMapper;
    /**定义更新失效用户下单记录状态Mapper*/
    @Autowired
    private MqOrderMapper mqOrderMapper;
    /**
     * 用户下单-将生成的下单记录id压入死信队列中等待延迟处理
     * @param userOrderDto
     */
    public void pushUserOrder(UserOrderDto userOrderDto){
        //创建用户下单实例
        UserOrder userOrder = new UserOrder();
        //复制userOrderDto对应的字段取值到新的实例对象userOrder中
        BeanUtils.copyProperties(userOrderDto,userOrder);
        //设置支付状态为已保存
        userOrder.setStatus(1);
        //设置下单时间
        userOrder.setCreateTime(new Date());
        //插入用户下单记录
        userOrderMapper.insertSelective(userOrder);
        log.info("用户成功下单,下单信息为“{}",userOrder);
    }

    /**
     * 更新用户下单记录的状态
     */
    public void updateUserOrderRecord(UserOrder userOrder){
        try {
            //判断用户下单记录实体是否为null
            if(userOrder != null){
                //更新失效用户下单记录
                userOrder.setIsActive(0);
                //设置失效时进行更新的时间
                userOrder.setUpdateTime(new Date());
                //更新下单记录实体信息
                userOrderMapper.updateByPrimaryKeySelective(userOrder);
                
                //记录“失效用户下单记录”的历史
                //定义Rabbitmq死信队列历史失效记录实例
                MqOrder mqOrder = new MqOrder();;
                //设置失效时间
                mqOrder.setBusinessTime(new Date());
                //设置备注信息
                mqOrder.setMemo("更新失效当前用户下单记录Id,orderId="+userOrder.getId());
                //设置下单记录id
                mqOrder.setOrderId(userOrder.getId());
                //插入失效记录
                mqOrderMapper.insertSelective(mqOrder);
            }
        }catch (Exception e){
            log.error("用户下单支付超时-处理服务-更新用户下单记录的状态发生异常:",e.fillInStackTrace());
        }
    }
}

4、运行项目,打开Postman,在地址栏中输入“访问用户下单”的请求链接http://127.0.0.1:8087/middleware/user/order/push,请求方式为Post,并选择请求体的数据格式为JSON,请求体的数据如下:

{
	"orderNo":201190411001,
	"userId":10010
}

此时Postman得到了“成功”的响应结果,并且用户下单记录已经成功存储至数据表中。

6.3.6 “用户下单支付超时”延迟发送接收实战

1、开发生产者

@Component
public class DeadOrderPublisher {
    private static final Logger log = LoggerFactory.getLogger(DeadOrderPublisher.class);
    /**定义读取环境变量实例*/
    @Autowired
    private Environment env;
    /**定义RabbitMQ操作组件*/
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**定义JSON序列化和反序列化组件实例*/
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 将用户下单记录id充当消息发送给死信队列
     */
    public void sendMsg(Integer orderId){
        try {
            //设置消息的传输格式-JSON
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //设置基本交换机
            rabbitTemplate.setExchange(env.getProperty("mq.producer.order.exchange.name"));
            //设置基本路由
            rabbitTemplate.setRoutingKey(env.getProperty("mq.producer.order.routing.key.name"));
            //发送对象类型的消息
            rabbitTemplate.convertAndSend(orderId, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //获取消息属性对象
                    MessageProperties messageProperties = message.getMessageProperties();
                    //设置消息的持久化策略
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //设置消息头,即直接指定发送的消息所属的对象类型
                    messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Integer.class);
                    //返回消息实例
                    return message;
                }
            });
            //打印日志
            log.info("用户下单支付超时-发送用户下单记录id的消息入死信队列-内容为:orderId={} ",orderId);
        }catch (Exception e){
            log.error("用户下单支付超时-发送用户下单记录id的消息入死信队列-发生异常:orderId={} ",orderId,e.fillInStackTrace());
        }
    }
}

2、开发消费者

@Component
public class DeadOrderConsumer {
    private static final Logger log = LoggerFactory.getLogger(DeadOrderConsumer.class);
    /**定义JSON序列化和反序列化组件*/
    @Autowired
    private ObjectMapper objectMapper;
    /**定义用户下单操作Mapper*/
    @Autowired
    private UserOrderMapper userOrderMapper;
    /**用户下单支付超时-处理服务实例*/
    @Autowired
    private DeadUserOrderService deadUserOrderService;
    /**用户下单支付超时消息模型-监听真正的队列*/
    @RabbitListener(queues = "${mq.consumer.order.real.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMsg(@Payload Integer orderId){
        try {
            log.info("用户下单支付超时消息模型-监听真正的队列-监听到消息内容为:orderId={}",orderId);
            //TODO:接下来是执行核心的业务逻辑
            //查询该用户下单记录id对应的支付状态是否为“已保存”
            UserOrder userOrder = userOrderMapper.selectByIdAndStatus(orderId,1);
            if(userOrder != null){
                //不等于null,则代表该用户下单记录仍然为“已保存”状态,即该用户已经超时,
                // 没支付该笔订单,因而需要失效该笔下单价记录
                deadUserOrderService.updateUserOrderRecord(userOrder);
            }
        }catch (Exception e){
            log.error("用户下单支付超时消息模型-监听真正队列-发生异常:orderId={}"
                    ,orderId,e.fillInStackTrace());
        }
    }
}

3、在用户下单服务DeadUserOrderService类的pushUserOrder方法中,加入死信队列生产者“发送订单记录id”的方法调用

public void pushUserOrder(UserOrderDto userOrderDto){
        //创建用户下单实例
        UserOrder userOrder = new UserOrder();
        //复制userOrderDto对应的字段取值到新的实例对象userOrder中
        BeanUtils.copyProperties(userOrderDto,userOrder);
        //设置支付状态为已保存
        userOrder.setStatus(1);
        //设置下单时间
        userOrder.setCreateTime(new Date());
        //插入用户下单记录
        userOrderMapper.insertSelective(userOrder);
        log.info("用户成功下单,下单信息为“{}",userOrder);
        
        //生成用户下单记录id
        Integer orderId = userOrder.getId();
        //将生成的用户下单记录id压入死信队列中等待延迟处理
        deadOrderPublisher.sendMsg(orderId);
    }

6.3.7 “用户下单支付超时”整体功能自测

1、打开postman,在地址栏中输入“访问用户下单”的请求链接http://127.0.0.1:8087/middleware/user/order/push,请求方式为Post,并选择请求体的数据格式为JSON,请求体的数据如下:

{
	"orderNo":201190412001,
	"userId":10011
}

表示当前用户id为10011,购物车对应的订单编号为20190412001,单击Send按钮。

第7章 分布式锁实战

7.1 分布式锁概念

集群、分布式部署的服务实例一般是部署在不同机器上,此种资源共享将不再是传统的线程共享,而是跨JVM进程之间资源的共享。

7.1.1 锁机制

共享资源:指可以被多个线程、进程同时访问并进行操作的数据或者代码块。

public class LockOne {
    private static final Logger log = LoggerFactory.getLogger(LockOne.class);

    public static void main(String[] args) {
        //创建存钱线程实例
        Thread tAdd = new Thread(new LockThread(100));
        //创建取钱线程实例
        Thread tSub = new Thread(new LockThread(-100));
        //开启存钱线程的操作
        tAdd.start();
        //开启取钱线程的操作
        tSub.start();
    }
}
class LockThread implements Runnable{
    private static final Logger log = LoggerFactory.getLogger(LockThread.class);
    /**定义成员变量-用于接收线程初始化时提供的金额-代表取/存的金额*/
    private int count;

    public LockThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                //通过传进来的金额(可正、可负)执行叠加操作
                SysConstant.amount = SysConstant.amount + count;
                log.info("此时账户余额为:{}",SysConstant.amount);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

其中,SysConstant为存放共享变量的系统常量类

public class SysConstant {
    public static Integer amount = 500;
}

未完…待定~


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

Git常用指令 Previous
程序员高效开发手册 Next