redis学习

redis学习

Posted by DYC on March 15, 2024

redis学习

redis基础

NoSQL

非关系型数据库

  1. 结构化与非结构化

    传统关系型数据库是结构化数据,每张表都有严格的约束信息,插入数据必须遵守这些约束

    而NoSQL则对数据库格式没有严格约束,往往形式松散,比如像键值型数据库、文档型等

  2. 关联与非关联

    传统数据库的表与表之间往往存在关联,比如使用外键等

    非关系型数据库不存在关联关系,要维护关系要么靠代码中的业务逻辑,要么靠数据之间的耦合

  3. 查询方式

    SQL有固定的语法查询格式,有统一标准

    不同的非关系数据库查询语法差异极大

    image-20240514110537755

  4. 事务

    关系型数据库支持事务,支持ACID原则,非关系型数据库不支持事务

  5. 扩展性

    关系型数据库集群模式一般是主从,主从数据一致,起到数据备份的作用,称为垂直扩展

    非关系型数据库将数据拆分,存储在不同机器上,可以保存海量数据,称为水平扩展

redis安装
  1. 将安装包放到一个位置,可以是/usr/local/src 目录

  2. 解压

    1
    
    tar -xzf redis-6.2.6.tar.gz
    
  3. 进入解压目录

    1
    
    cd redis-6.2.6
    
  4. 运行编译

    1
    
    make && make install
    
  5. 设置后台配置

    如果文件是只读模式,可以使用sudo vi的模式进行修改

    在配置文件中想找某个关键词,可以在esc模式下,用:/keyword进行检索

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
    # 允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
    bind 0.0.0.0
    # 守护进程,修改为yes后即可后台运行
    daemonize yes 
    # 密码,设置后访问Redis必须输入密码
    requirepass 123321
    # 监听的端口
    port 6379
    # 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
    dir .
    # 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
    databases 1
    # 设置redis能够使用的最大内存
    maxmemory 512mb
    # 日志文件,默认为空,不记录日志,可以指定日志文件名
    logfile "redis.log"
    
    1
    2
    3
    4
    
    # 进入redis安装目录 
    cd /usr/local/src/redis-6.2.6
    # 启动
    redis-server redis.conf
    
  6. 运行

    1
    
    redis-server
    
  7. 关闭redis

    1
    2
    3
    4
    5
    6
    
    #找到进程号,接着kill掉进程
    lsof -i :6379
    #或者
    ps aux | grep redis-server
    #接着
    kill -9 pid
    
  8. 连接客户端

    Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:

    1
    
    redis-cli [options] [commonds]
    

    其中常见的options有:

    • -h 127.0.0.1:指定要连接的redis节点的IP地址,默认是127.0.0.1
    • -p 6379:指定要连接的redis节点的端口,默认是6379
    • -a 123321:指定redis的访问密码

    其中的commonds就是Redis的操作命令,例如:

    • ping:与redis服务端做心跳测试,服务端正常会返回pong
    1
    
    redis-cli -h 127.0.0.1 -p 6379 -a 123321
    

查找redis的安装目录

1
which redis-server

查找目标端口正在运行的进程

1
2
#找到进程号,接着kill掉进程
lsof -i :6379

如果使用可视化客户端,远程连接失败的话,可以考虑设置防火墙

1
firewall-cmd --zone=public --add-port=6379/tcp --permanent
redis数据类型

String类型

String类型,其value是字符串,不过根据字符串的格式不同,又可以分为3类:

  • string:普通字符串
  • int:整数类型,可以做自增、自减操作
  • float:浮点类型,可以做自增、自减操作

不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m.

key的结构

由于Redis没有表的概念,为了能更好的区分不同类型的key(比如两个物品id都是1),可以对key添加前缀进行区分

Redis的key允许有多个单词形成层级结构,多个单词之间用’:’隔开,格式如下:

1
	项目名:业务名:类型:id

例如项目名称叫 heima,有user和product两种不同类型的数据,我们可以这样定义key:

  • user相关的key:heima:user:1

  • product相关的key:heima:product:1

如果Value是一个Java对象,例如一个User对象,则可以将对象序列化为JSON字符串后存储:

KEY VALUE
heima:user:1 {“id”:1, “name”: “Jack”, “age”: 21}
heima:product:1 {“id”:1, “name”: “小米11”, “price”: 4999}

Hash类型

考虑到对string类型进行修改时,其value类型修改起来只能覆盖,所以使用hash将对象中的每个字段独立存储,可以针对单个字段做CRUD

hash类型修改value时相对于string更灵活

image-20240515111912754

List类型

类似于双向链表的结构,既支持正向检索,也支持反向检索

保存对先后顺序有要求的结构,常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。

阻塞式获取(BLPOP和BRPOP):会设置一个等待时间,在这个时间内有的话就返回。与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil

Set类型

Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:

  • 无序

  • 元素不可重复

  • 查找快

  • 支持交集、并集、差集等功能

Set的常见命令有:

单个集合操作

  • SADD key member … :向set中添加一个或多个元素
  • SREM key member … : 移除set中的指定元素
  • SCARD key: 返回set中元素的个数
  • SISMEMBER key member:判断一个元素是否存在于set中
  • SMEMBERS:获取set中的所有元素

多个集合操作

  • SINTER key1 key2 … :求key1与key2的交集
  • SDIFF key1 key2 … :求key1与key2的差集
  • SUNION key1 key2 … :求key1与key2的并集

SortedSet

因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能

所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可

添加元素时,是有score和元素的

SortedSet的常见命令有:

  • ZADD key score member:添加一个或多个元素到sorted set ,如果已经存在则更新其score值
  • ZREM key member:删除sorted set中的一个指定元素
  • ZSCORE key member : 获取sorted set中的指定元素的score值
  • ZRANK key member:获取sorted set 中的指定元素的排名
  • ZCARD key:获取sorted set中的元素个数
  • ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
  • ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
  • ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
  • ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
  • ZDIFF、ZINTER、ZUNION:求差集、交集、并集

注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可,例如:

  • 升序获取sorted set 中的指定元素的排名:ZRANK key member

  • 降序获取sorted set 中的指定元素的排名:ZREVRANK key memeber

Jedis客户端
  • Jedis和Lettuce:这两个主要是提供了Redis命令对应的API,方便我们操作Redis,而SpringDataRedis又对这两种做了抽象和封装,因此我们后期会直接以SpringDataRedis来学习。
  • Redisson:是在Redis基础上实现了分布式的可伸缩的java数据结构,例如Map、Queue等,而且支持跨进程的同步机制:Lock、Semaphore等待,比较适合用来实现特殊的功能需求

连接池

Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用Jedis连接池代替Jedis的直连方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.heima.jedis.util;

import redis.clients.jedis.*;

public class JedisConnectionFactory {

    private static JedisPool jedisPool;

    static {
        // 配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
      	//最大连接数
        poolConfig.setMaxTotal(8);
      	//最大空闲数
        poolConfig.setMaxIdle(8);
      	//最小空闲连接
        poolConfig.setMinIdle(0);
      	//连接池没有连接时最长等待时间
        poolConfig.setMaxWaitMillis(1000);
        // 创建连接池对象,参数:连接池配置、服务端ip、服务端端口、超时时间、密码
        jedisPool = new JedisPool(poolConfig, "192.168.150.101", 6379, 1000, "123321");
    }

    public static Jedis getJedis(){
        return jedisPool.getResource();
    }
}
SpringDataRedis客户端

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis

  • 提供了对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群
  • 支持基于Lettuce的响应式编程
  • 支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化
  • 支持基于Redis的JDKCollection实现

SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:

image-20240515150852817

使用

  1. 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.5.7</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.heima</groupId>
        <artifactId>redis-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>redis-demo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <!--redis依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <!--common-pool-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
            </dependency>
            <!--Jackson依赖-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
       
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <excludes>
                            <exclude>
                                <groupId>org.projectlombok</groupId>
                                <artifactId>lombok</artifactId>
                            </exclude>
                        </excludes>
                    </configuration>
                </plugin>
            </plugins>
        </build>
       
    </project>
    
  2. 编写配置

    spring默认使用lettuce,如果要用jedis就要再单独引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    spring:
      redis:
        host: 192.168.150.101
        port: 6379
        password: 123321
        lettuce:
          pool:
            max-active: 8
            max-idle: 8
            min-idle: 0
            max-wait: 100ms
    
  3. 注入

    1
    2
    3
    4
    5
    6
    
    @SpringBootTest
    class RedisStringTests {
       
        @Autowired
        private RedisTemplate redisTemplate;
    }
    
  4. 编写测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    @SpringBootTest
    class RedisStringTests {
       
        @Autowired
        private RedisTemplate edisTemplate;
       
        @Test
        void testString() {
            // 写入一条String数据
            redisTemplate.opsForValue().set("name", "虎哥");
            // 获取string数据
            Object name = stringRedisTemplate.opsForValue().get("name");
            System.out.println("name = " + name);
        }
    }
    

自定义序列化

RedisTemplate可以接收任意Object作为值写入Redis,其原理是讲得到的object对象序列化为字节形式,默认采用JDK序列化,所以得到下面的存储效果

image-20240515152101959

这样的方式可读性差,内存占用大

所以可以自定义RedisTemplate的序列化方式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
        // 创建RedisTemplate对象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置连接工厂
        template.setConnectionFactory(connectionFactory);
        // 创建JSON序列化工具
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = 
            							new GenericJackson2JsonRedisSerializer();
        // 设置Key的序列化
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        // 设置Value的序列化
        template.setValueSerializer(jsonRedisSerializer);
        template.setHashValueSerializer(jsonRedisSerializer);
        // 返回
        return template;
    }
}

上面是通过采用了JSON序列化来代替默认的JDK序列化方式,将Java对象自动的序列化为JSON字符串,并且查询时能自动把JSON反序列化为Java对象。不过,其中记录了序列化时对应的class名称,目的是为了查询时实现自动反序列化。这会带来额外的内存开销。

1
2
3
4
5
6
@SpringBootTest
class RedisStringTests {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
}

但这样的方式存在额外内存消耗的问题,多了下面这个class

image-20240515153703273

这是因为为了反序列化的时候知道对象类型,JSON序列化器会将class类型写入json中,存入redis,带来额外的内存花销

StringRedisTemplate

为了节省内存空间,我们可以不使用JSON序列化器来处理value,而是统一使用String序列化器,要求只能存储String类型的key和value。当需要存储Java对象时,手动完成对象的序列化和反序列化。

使用StringRedisTemplate,它的key和value的序列化方式默认就是String方式如果value是对象的话就需要手动序列化,省去了我们自定义RedisTemplate的序列化方式的步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Autowired
private StringRedisTemplate stringRedisTemplate;
// JSON序列化工具
private static final ObjectMapper mapper = new ObjectMapper();

@Test
void testSaveUser() throws JsonProcessingException {
    // 创建对象
    User user = new User("虎哥", 21);
    // 手动序列化
    String json = mapper.writeValueAsString(user);
    // 写入数据
    stringRedisTemplate.opsForValue().set("user:200", json);

    // 获取数据
    String jsonUser = stringRedisTemplate.opsForValue().get("user:200");
    // 手动反序列化
    User user1 = mapper.readValue(jsonUser, User.class);
    System.out.println("user1 = " + user1);
}

redis项目

手机或者app端发起请求,请求我们的nginx服务器,nginx基于七层模型走的事HTTP协议,可以实现基于Lua直接绕开tomcat访问redis,也可以作为静态资源服务器,轻松扛下上万并发, 负载均衡到下游tomcat服务器,打散流量,我们都知道一台4核8G的tomcat,在优化和处理简单业务的加持下,大不了就处理1000左右的并发, 经过nginx的负载均衡分流后,利用集群支撑起整个项目,同时nginx在部署了前端项目后,更是可以做到动静分离,进一步降低tomcat服务的压力,这些功能都得靠nginx起作用,所以nginx是整个项目中重要的一环。

在tomcat支撑起并发流量后,我们如果让tomcat直接去访问Mysql,根据经验Mysql企业级服务器只要上点并发,一般是16或32 核心cpu,32 或64G内存,像企业级mysql加上固态硬盘能够支撑的并发,大概就是4000起~7000左右,上万并发, 瞬间就会让Mysql服务器的cpu,硬盘全部打满,容易崩溃,所以我们在高并发场景下,会选择使用mysql集群,同时为了进一步降低Mysql的压力,同时增加访问的性能,我们也会加入Redis,同时使用Redis集群使得Redis对外提供更好的服务。

image-20240515165236101

环境搭建

  1. 导入后端项目,修改properites文件

    修改之后在services中添加一个springboot的启动器,进行启动,接着访问http://localhost:8081/shop-type/list

  2. 导入前端项目

    1. 通过brew info nginx找到nginx的

      image-20240515193127857

    2. 将前端文件的html文件夹放入安装目录中,再将前端文件的conf方法下载位置中,进行覆盖

      可以打开目标文件夹

      1
      
      open xxxx
      
    3. 接着输入sudo nginx即可运行

    4. 如果运行失败,考虑是否端口被占用

      1
      2
      3
      4
      5
      6
      
      #看目标端口哪个进程在用
      lsof -i:8081
      #看哪些进程在用nginx
      ps -ef|grep nginx
      #找到进程号之后就可以kill掉
      kill -9 pid
      

输入链接验证

短信登录

基于session实现登录

image-20240515211103953

发送短信验证码

controller层

1
2
3
4
5
6
    @PostMapping("code")
    public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {

        // TODO 发送短信验证码并保存验证码
        return userService.sendCode(phone,session);
    }

service层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 @Override
    public Result sendCode(String phone, HttpSession session) {

        // 1.校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        // 3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);

        // 4.保存验证码到 session
        session.setAttribute("code",code);
        // 5.发送验证码
        log.debug("发送短信验证码成功,验证码:{}",code);
        // 返回ok
        return Result.ok();
    }

登录

service层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 1.校验手机号
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        // 3.校验验证码
        Object cacheCode = session.getAttribute("code");
        String code = loginForm.getCode();
        if(cacheCode == null || !cacheCode.toString().equals(code)){
             //3.不一致,报错
            return Result.fail("验证码错误");
        }
        //一致,根据手机号查询用户
        User user = query().eq("phone", phone).one();

        //5.判断用户是否存在
        if(user == null){
            //不存在,则创建
            user =  createUserWithPhone(phone);
        }
        //7.保存用户信息到session中
        session.setAttribute("user",user);

        return Result.ok();
    }

	    private User createUserWithPhone(String phone) {
        //创建user对象
        User user=new User();
        user.setPhone(phone);
        user.setNickName("user_"+RandomUtil.randomString(10));
        //保存user对象到数据库中,使用mybatis-plus的方法
        save(user);
        return user;
    }

到这里算登录成功了,但由于设置拦截功能,所以还需要实现登录拦截,当用户进行访问的时候,检验用户的session进行验证

统一拦截

使用拦截器进行统一拦截,如果通过拦截则进行放行,由于是在拦截器上进行登录处理的,为了能存储用户信息,所以使用threadlocal

不使用session是因为HttpSession是基于Servlet容器的会话管理机制,在每个请求处理线程中,都会使用相同的HttpSession实例。如果多个线程同时操作同一个HttpSession对象,可能会导致数据混乱或者并发访问异常。

在utlis文件夹中创建一个拦截器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class loginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //1.获取session
        HttpSession session = request.getSession();
        //2.获取session中的用户
        Object user = session.getAttribute("user");
        //3.判断用户是否存在
        if(user == null){
            //4.不存在,拦截,返回401状态码
            response.setStatus(401);
            return false;
        }
      
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        //5.存在,保存用户信息到Threadlocal
        UserHolder.saveUser(userDTO);
        //6.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        //移除用户信息
        UserHolder.removeUser();
    }
}

在config中创建一个mvcconfig类,用于让拦截器生效,并且设置要拦截哪些路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 登录拦截器
        registry.addInterceptor(new loginInterceptor())
                .excludePathPatterns(
                        "/shop/**",
                        "/voucher/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/blog/hot",
                        "/user/code",
                        "/user/login"
                ).order(1);
        // token刷新的拦截器
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
    }
}

整体过程:

首先会进行登录验证—-登录验证成功后将user存入session中,接着返回前端—-前端就会访问me接口—-进入拦截器—-通过拦截器将userdto存入thradlocal中—-访问mecontroller将threadlocal中userdto拿出返回给前端进行查询

这里要注意因为threadlocal用的对象是userdto,如果强转则前端无法通过,需要使用BeanUtil.copyProperties进行属性复制,存入threadlocal中,通过拦截器后mecontroller会从thradlocal中拿出userdto返回给前端,前端拿出userdto信息进行查询显示,如果失败就转到login界面,可以从源头login那里加入session的时候就改成userDto

基于Redis实现登录

session共享问题

问题:因为后续可能会有多个tomcat实现负载均衡,多个tomcat之间的session是不共享的,这样会造成的问题是当用户登录的时候用的第一个tomcat,将用户信息保存在了第一个tomcat的session中,但后面的请求用的其他的tomcat,但其他tomcat的session没有保存用户信息,就会登录失败

image-20240516150946916

之前的解决方案是通过session拷贝的方式,但存在服务器压力大,有延迟的问题,所以后续方案采用基于redis来完成,我们把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了

Redis代替session

使用string结构

之前使用session时,是用户请求时有cookie,cookie中有session id,通过session id找到对应的session进行拦截登录

现在则需要使用token存储用户信息进行访问,登录的时候就会创建token保存在redis中,登录拦截的时候就比较token,判断是否存在该对象,如果存在就放到threadlocal

image-20240516160917799

当1结束后,前端接收到token,会将token保存到sessionStorage,以后每次访问就会带着token进行访问

首先修改login方法,将其修改成保存在redis中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
    // 1.校验手机号
    String phone = loginForm.getPhone();
    if (RegexUtils.isPhoneInvalid(phone)) {
        // 2.如果不符合,返回错误信息
        return Result.fail("手机号格式错误!");
    }
    // 3.因为在sendcode方法中就会把code放到redis中,从redis获取验证码并校验
    String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
    String code = loginForm.getCode();
    if (cacheCode == null || !cacheCode.equals(code)) {
        // 不一致,报错
        return Result.fail("验证码错误");
    }

    // 4.一致,根据手机号查询用户 select * from tb_user where phone = ?
    User user = query().eq("phone", phone).one();

    // 5.判断用户是否存在
    if (user == null) {
        // 6.不存在,创建新用户并保存
        user = createUserWithPhone(phone);
    }

    // 7.保存用户信息到 redis中
    // 7.1.随机生成token,作为登录令牌
    String token = UUID.randomUUID().toString(true);
    // 7.2.将User对象转为HashMap存储
    UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
            CopyOptions.create()
                    .setIgnoreNullValue(true)
                    .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
    // 7.3.存储
    String tokenKey = LOGIN_USER_KEY + token;
    stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
    // 7.4.设置token有效期
  	// 考虑到如果期间有操作那过期时间就应该重置,所以需要在拦截器那里重新修改
    stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

    // 8.返回token
    return Result.ok(token);
}

接着就可以修改拦截器,拦截器中需要每次操作时就重置一次token的过期时间,但考虑到最开始设置的拦截器是拦截部分功能请求,但这种重置应该是对于所有功能请求都有效的

所以要再添加一个拦截器,第一个拦截器就获取toekn,查询用户,刷新token有效期,第二个拦截器来判断用户是否存在进行放行

image-20240516202147762

RefreshTokenInterceptor

这个拦截器需要获取请求头中的token,基于token判断是否在redis中存在,如果存在将用户信息保存到threadlocal中,并且刷新token时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RefreshTokenInterceptor implements HandlerInterceptor {
    private StringRedisTemplate stringRedisTemplate;

    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }
        // 2.基于TOKEN获取redis中的用户
        String key  = LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        // 3.判断用户是否存在
        if (userMap.isEmpty()) {
            return true;
        }
        // 5.将查询到的hash数据转为UserDTO
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 6.存在,保存用户信息到 ThreadLocal
        UserHolder.saveUser(userDTO);
        // 7.刷新token有效期
        stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
        // 8.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用户
        UserHolder.removeUser();
    }
}

LoginInterceptor

因为上一个拦截器会将用户存到threadlocal中,所以这个拦截器只需要判断threadlocal中是否含有该用户即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LoginInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1.判断是否需要拦截(ThreadLocal中是否有用户)
        if (UserHolder.getUser() == null) {
            // 没有,需要拦截,设置状态码
            response.setStatus(401);
            // 拦截
            return false;
        }
        // 有用户,则放行
        return true;
    }
}

MvcConfig

order用于设置拦截器先后顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 登录拦截器 order用于设置拦截器先后顺序
        registry.addInterceptor(new loginInterceptor())
                .excludePathPatterns(
                        "/shop/**",
                        "/voucher/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/blog/hot",
                        "/user/code",
                        "/user/login"
                ).order(1);
        // token刷新的拦截器
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
    }
}

商品查询缓存

缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码

添加商品缓存

添加商品缓存

查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Result queryById(Long id) {
        String key="cache:shop:"+id;
        //1.从redis查询缓存
        String shopJson=stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(shopJson)){
            //3.存在返回
            Shop shop= JSONUtil.toBean(shopJson,Shop.class);
            return Result.ok(shop);
        }
        //4.不存在从数据库中查
        Shop shop=getById(id);
        //5.数据库不存在报错
        if(shop==null){
            return Result.fail("店铺不存在");
        }
        //6.写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
        //7.返回
        return Result.ok(shop);
    }

添加商品类型缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Result queryList() {
        String key="type";
        //1.从redis查询缓存
        String typeListJson=stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(typeListJson)){
            //3.存在返回
            List<ShopType> typeList = JSONUtil.toList(JSONUtil.parseArray(typeListJson), ShopType.class);
            return Result.ok(typeList);
        }
        //4.不存在从数据库中查
        List<ShopType> typeList = query().orderByAsc("sort").list();
        //5.数据库不存在报错
        if(typeList==null){
            return Result.fail("店铺不存在");
        }
        //6.写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(typeList));
        //7.返回
        return Result.ok(typeList);
    }
缓存更新策略

主动更新有下面三种策略

  • Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
  • Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理
  • Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致

对于第一种方案,考虑到每次操作数据库后,都操作缓存,但是中间如果没有人查询,那么这个更新动作实际上只有最后一次生效,中间的更新动作意义并不大,我们可以把缓存删除,等待再次查询时,将缓存中的数据加载出来

image-20240516234050459

先操作数据库,再删除缓存这种模式出现问题的概率低一些

修改之前的代码

修改重点代码1:修改ShopServiceImpl的queryById方法

设置redis缓存时添加过期时间

image-20240516235011855

修改重点代码2

代码分析:通过之前的淘汰,我们确定了采用删除策略,来解决双写问题,当我们修改了数据之后,然后把缓存中的数据进行删除,查询时发现缓存中没有数据,则会从mysql中加载最新的数据,从而避免数据库和缓存不一致的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
		@Override
    @Transactional
    public Result update(Shop shop) {
        Long id=shop.getId();
        if(id==null){
            return Result.fail("店铺id不能为空");
        }
        //更新数据库
        updateById(shop);
        //删除缓存
        stringRedisTemplate.delete("cache:shop:"+shop.getId());
        return Result.ok();
    }
缓存穿透

常见的解决方案有两种:

  • 缓存空对象

    如果发生穿透,创建一个null对象存到缓存,可以设置一个ttl,在这个ttl时间内如果访问可以直接redis返回

    • 优点:实现简单,维护方便
    • 缺点:
      • 额外的内存消耗
      • 可能造成短期的不一致
  • 布隆过滤

    • 优点:内存占用较少,没有多余key
    • 缺点:
      • 实现复杂
      • 存在误判可能
  • 增强id的复杂度,避免被猜测id规律,就可以做好数据的基础格式校验

  • 做好热点参数的限流

使用缓存空对象解决缓存穿透

需要进行两步操作

  1. 数据库中不存在时,设置空字符串到redis中
  2. 判断如果redis该值为空,但不为null,就是之前设置的空字符串,所以就直接返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Shop queryWithPassThrough(Long id){
        String key="cache:shop:"+id;
        //1.从redis查询缓存
        String shopJson=stringRedisTemplate.opsForValue().get(key);
        //2.判断是否存在
        if(StrUtil.isNotBlank(shopJson)){
            //3.存在返回
            return JSONUtil.toBean(shopJson,Shop.class);
        }
        if(shopJson!=null){
            //为空,但不是null,则一定是空字符串,即设置的空缓存对象
            return null;
        }
        //4.不存在从数据库中查
        Shop shop=getById(id);
        //5.数据库不存在,则发生了缓存穿透
        if(shop==null){
            //将空值写入redis
            stringRedisTemplate.opsForValue().set(key,"",2L, TimeUnit.MINUTES);
            return null;
        }
        //6.写入redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),30L, TimeUnit.MINUTES);
        //7.返回
        return shop;
    }
缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存
缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

解决方案

  1. 利用互斥锁

    假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题

    image-20240517104304616

  2. 逻辑过期

    设置缓存数据永不过期,如果发现逻辑过期,通过异步操作去更新缓存数据,这个期间其他线程来就用旧数据

    image-20240517104651807

    问题是返回的是脏数据

互斥锁方案

进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

image-20240517111821471

利用redis的setnx方法来表示获取锁,该方法含义是

  • redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true

  • 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false

我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程

首先编写加锁和解锁方法

1
2
3
4
5
6
7
8
9
    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        //最后不要直接返回,拆箱可能造成空值
        return BooleanUtil.isTrue(flag);
    }

    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }

操作代码

Try lock+double check的操作

就是在之前缓存穿透的代码上在要获取数据库的时候进行获取互斥锁的判断,只有获取了互斥锁才能操作数据库,存入redis中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Shop queryWithMutex(Long id)  {
        String key = CACHE_SHOP_KEY + id;
        // 1、从redis中查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get("key");
        // 2、判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 存在,直接返回
            return JSONUtil.toBean(shopJson, Shop.class);
        }
        //判断命中的值是否是空值
        if (shopJson != null) {
            //返回一个错误信息
            return null;
        }
        // 4.实现缓存重构
        //4.1 获取互斥锁
        String lockKey = "lock:shop:" + id;
        Shop shop = null;
        try {
            boolean isLock = tryLock(lockKey);
            // 4.2 判断否获取成功
            if(!isLock){
                //4.3 失败,则休眠重试
                Thread.sleep(50);
                return queryWithMutex(id);
            }
            //获取锁成功之后可以在检查一下redis是否有该数据,做一个double check
            //4.4 成功,根据id查询数据库
            shop = getById(id);
            // 5.不存在,返回错误
            if(shop == null){
                //将空值写入redis
                stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
                //返回错误信息
                return null;
            }
            //6.写入redis
            stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES);

        }catch (Exception e){
            throw new RuntimeException(e);
        }
        finally {
            //7.释放互斥锁
            unlock(lockKey);
        }
        return shop;
    }

逻辑过期方案

思路分析:

  1. 当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库
  2. 而一旦命中后,将value取出,判断value中的过期时间是否满足
  3. 如果没有过期,则直接返回redis中的数据
  4. 如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

步骤一:

因为现在redis中存储的数据的value需要带上过期时间,所以需要创建一个新的实体类

1
2
3
4
5
@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}

步骤二:

因为考虑的是一定命中的情况,所以首先需要对redis进行预热,将shop数据先加入到redis中

1
2
3
4
5
6
7
    public void saveShopto2Redis(Long id,Long expireSeconds){
        Shop shop=getById(id);
        RedisData redisData=new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
    }

这里需要提前写个测试类,存一个这个redis信息进去,因为代码中考虑的是如果redis差不多就返回空

接着就可以写逻辑过期正式代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
		//定义线程池
		private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
		//定义逻辑过期查询方法
    public Shop queryWithLogicalExpire(Long id){
        String key = CACHE_SHOP_KEY + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isBlank(json)) {
            // 3.存在,直接返回
            return null;
        }
        // 4.命中,需要先把json反序列化为对象,因为要看过期时间
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        //获取店铺
        Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
        //获取过期时间
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5.判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())) {
            // 5.1.未过期,直接返回店铺信息
            return shop;
        }
        // 5.2.已过期,需要缓存重建
        // 6.缓存重建
        // 6.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2.判断是否获取锁成功
        if (isLock){
            //异步重建缓存,用缓存池来实现
            CACHE_REBUILD_EXECUTOR.submit( ()->{
                try{
                    //重建缓存
                    this.saveShopto2Redis(id,20L);
                }catch (Exception e){
                    throw new RuntimeException(e);
                }finally {
                    unlock(lockKey);
                }
            });
        }
        // 6.4.返回过期的商铺信息
        return shop;
    }
封装redis工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓

存击穿问题

  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

工具类编写,主要是因为不知道data类型所以需要使用泛型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@Slf4j
@Component
public class CacheClient {

    private final StringRedisTemplate stringRedisTemplate;

    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    //设置缓存
    public void set(String key, Object value, Long time, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }

    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
        // 设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        // 写入Redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    //解决缓存穿透,因为返回值不确定,所以用泛型,Class<R> type泛型的推断类别
    //Function<ID, R> dbFallback是数据库查询逻辑,因为具体不知道怎么查数据库,所以让调用者来写
    //Long time, TimeUnit unit是设置空对象过期的时间和单位
    public <R,ID> R queryWithPassThrough(
            String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(json)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(json, type);
        }
        // 判断命中的是否是空值
        if (json != null) {
            // 返回一个错误信息
            return null;
        }

        // 4.不存在,根据id查询数据库
        R r = dbFallback.apply(id);
        // 5.不存在,返回错误
        if (r == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回错误信息
            return null;
        }
        // 6.存在,写入redis
        this.set(key, r, time, unit);
        return r;
    }

    //通过逻辑过期解决缓存击穿
    public <R, ID> R queryWithLogicalExpire(
            String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isBlank(json)) {
            // 3.存在,直接返回
            return null;
        }
        // 4.命中,需要先把json反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5.判断是否过期
        if(expireTime.isAfter(LocalDateTime.now())) {
            // 5.1.未过期,直接返回店铺信息
            return r;
        }
        // 5.2.已过期,需要缓存重建
        // 6.缓存重建
        // 6.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2.判断是否获取锁成功
        if (isLock){
            // 6.3.成功,开启独立线程,实现缓存重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 查询数据库
                    R newR = dbFallback.apply(id);
                    // 重建缓存
                    this.setWithLogicalExpire(key, newR, time, unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    // 释放锁
                    unlock(lockKey);
                }
            });
        }
        // 6.4.返回过期的商铺信息
        return r;
    }

    //通过互斥锁解决缓存击穿
    public <R, ID> R queryWithMutex(
            String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(shopJson, type);
        }
        // 判断命中的是否是空值
        if (shopJson != null) {
            // 返回一个错误信息
            return null;
        }

        // 4.实现缓存重建
        // 4.1.获取互斥锁
        String lockKey = LOCK_SHOP_KEY + id;
        R r = null;
        try {
            boolean isLock = tryLock(lockKey);
            // 4.2.判断是否获取成功
            if (!isLock) {
                // 4.3.获取锁失败,休眠并重试
                Thread.sleep(50);
                return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
            }
            // 4.4.获取锁成功,根据id查询数据库
            r = dbFallback.apply(id);
            // 5.不存在,返回错误
            if (r == null) {
                // 将空值写入redis
                stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
                // 返回错误信息
                return null;
            }
            // 6.存在,写入redis
            this.set(key, r, time, unit);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            // 7.释放锁
            unlock(lockKey);
        }
        // 8.返回
        return r;
    }

    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}

接着就可以在ShopServiceImpl 中调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Resource
private CacheClient cacheClient;

 @Override
    public Result queryById(Long id) {
        // 解决缓存穿透
        Shop shop = cacheClient
                .queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

        // 互斥锁解决缓存击穿
        // Shop shop = cacheClient
        //         .queryWithMutex(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

        // 逻辑过期解决缓存击穿
        // Shop shop = cacheClient
        //         .queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, 20L, TimeUnit.SECONDS);

        if (shop == null) {
            return Result.fail("店铺不存在!");
        }
        // 7.返回
        return Result.ok(shop);
    }

优惠卷秒杀

全局唯一ID

用户抢购后,会会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显
  • 受单表数据量的限制

对于第二条就可能造成如果表过大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。

所以需要用Redis来全局生成ID

  • ID的组成部分:符号位:1bit,永远为0

  • 时间戳:31bit,以秒为单位,可以使用69年

  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

image-20240517161514424

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class RedisIdWorker {
    //定义初始时间戳
    private static final long BEGIN_TIMESTAMP = 1640995200L;
    private static final short COUNT_BITS = 32;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public Long nextId(String keyPrefix){
        //生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        //生成序列号
        // 2.1.获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2.自增长
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
        //拼接并返回  
        // 因为如果直接拼接就是字符串,为了拼接成数字,所以首先需要时间戳左移32位,在以或运算拼接序列号
        return timestamp << COUNT_BITS | count;
    }
}

接着可以用测试类进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 @Resource
    private RedisIdWorker redisIdWorker;
    private ExecutorService executorService= Executors.newFixedThreadPool(500);

    @Test
    void testIdWorker(){
      	//因为想统计时间,但由于线程是异步的,所以用CountDownLatch当其他线程等一下
        CountDownLatch latch = new CountDownLatch(300);
        Runnable task=()->{
            for(int i=0;i<100;i++){
                long id=redisIdWorker.nextId("order");
                System.out.println("id:"+id);
            }
            latch.countDown();
        };
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 300; i++) {
            executorService.submit(task);
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        long end = System.currentTimeMillis();
        System.out.println("time = " + (end - begin));
    }

countdownlatch

countdownlatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题

我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch

CountDownLatch 中有两个最重要的方法

  1. countDown
  2. await

await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。

添加优惠卷

针对特价卷,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段

**新增普通卷代码: **VoucherController

1
2
3
4
5
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
    voucherService.save(voucher);
    return Result.ok(voucher.getId());
}

新增秒杀卷代码:

VoucherController

1
2
3
4
5
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
    voucherService.addSeckillVoucher(voucher);
    return Result.ok(voucher.getId());
}

VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀库存到Redis中
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
实现秒杀下单

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

VoucherOrderServiceImpl类下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
		@Resource
    private SeckillVoucherServiceImpl seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        // 3.判断秒杀是否已经结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
        }
        // 4.判断库存是否充足
        if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
        }
        //5,扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId).update();
        if (!success) {
            //扣减库存
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 6.1.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 6.2.用户id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        // 6.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        return Result.ok(orderId);
    }

库存超卖问题

image-20240517234914762

使用乐观锁来解决超卖问题

  1. 版本号机制
  2. CAS

版本号机制

  1. 线程查询资源的时候有库存和版本号
  2. 当线程要对资源进行操作的时候会判断当前的版本号之后是之前那个
    1. 如果是之前那个版本号就进行操作,并且将版本号加1
    2. 如果版本号不是之前那个就不能对库存进行操作

在这个应用中由于库存和版本号是一样的,所以用库存代替版本号进行操作

思路一:设置乐观锁版本号机制

image-20240518000630950

但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败

思路二:对上一个进行的修改,没必要要一定相等,只要库存大于0即可

1
2
3
4
5
 boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)    
                .update();

这种方式就可以解决超卖问题

优惠劵秒杀-一人一单

之前的代码对于每个人能抢的优惠劵数量是没有限制的,所以应该加一层逻辑让每个用户只能下一单,需要根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

所以在VoucherOrderServiceImpl的seckillVoucher加入下面逻辑

1
2
3
4
5
6
7
8
9
    // 5.一人一单逻辑
    // 5.1.用户id
    Long userId = UserHolder.getUser().getId();
    int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
    // 5.2.判断是否存在
    if (count > 0) {
        // 用户已经购买过了
        return Result.fail("用户已经购买过一次!");
    }

但这个代码也存在之前说的多线程问题,即可以同一个id的多个线程都先进行查询,然后都查到是没有的,然后都就通过这段代码,然后进行后面的扣减库存

所以只能用悲观锁进行判断,首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {

	Long userId = UserHolder.getUser().getId();
         // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            return Result.fail("用户已经购买过一次!");
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            return Result.fail("库存不足!");
        }

        // 7.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 7.1.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 7.2.用户id
        voucherOrder.setUserId(userId);
        // 7.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        // 7.返回订单id
        return Result.ok(orderId);
}

但这种方式虽然可以避免问题,但锁的粒度太粗了,导致效率太低,所以最好是对userId加锁

重点:这里需要注意的是synchronized是锁的对象,即如果是相同对象,才会被锁住,userId.toString()的底层是返回的new了一个对象,所以是不会被锁住的,这里需要加一个intern() ,intern() 是从常量池拿到数据,就可以保证是相同的对象,就可以锁住

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Transactional
public  Result createVoucherOrder(Long voucherId) {
	Long userId = UserHolder.getUser().getId();
	synchronized(userId.toString().intern()){
         // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            return Result.fail("用户已经购买过一次!");
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            return Result.fail("库存不足!");
        }

        // 7.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 7.1.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 7.2.用户id
        voucherOrder.setUserId(userId);
        // 7.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        // 7.返回订单id
        return Result.ok(orderId);
    }
}

但这个解法还是有问题,即这里锁的范围是在事务里面,这样的话如果一个线程扣减库存后,释放了锁,但是还没提交事务,这个时候另一个线程就开始获取锁查询订单,这样查询的就是没有提交事务前的订单,这样还是错误的,所以需要将锁放在事务外面,就查询id锁userid,同一种id,同一时间只能有一个线程进入

所以将锁移到seckillVoucher方法里面,这样就是锁覆盖事务了

image-20240518224241937

重点:但这里还是有问题,这里牵扯到spring事务失效的问题,在上面代码中调用createVoucherOrder方法是用this调用的,是当前的VoucherOrderServiceImpl对象,不是他的代理对象,而想要事务生效,是需要利用代理生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务

1
2
3
4
5
        Long userId = UserHolder.getUser().getId();
        synchronized(userId.toString().intern()){
            IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }

但为了让AopContext.currentProxy()生效,需要下面两个操作

  1. 添加依赖

    1
    2
    3
    4
    
            <dependency>
                <groupId>org.aspectj</groupId>
                <artifactId>aspectjweaver</artifactId>
            </dependency>
    
  2. 添加注解

    image-20240518230006486

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了

有关锁失效原因分析

由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm

那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的

但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥

这就是 集群环境下,多个JVM存在,每个JVM都有自己的锁,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

分布式锁

syn只能保证单个JVM内,对于同一个对象的线程互斥

JVM内部因为只有一个锁监视器,所以只能有一个线程获取锁,当有多个JVM就有多个锁监视器

为了让多个JVM用一个锁监视器,需要使用分布式锁

分布式锁

  1. 可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
  2. 互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
  3. 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
  4. 高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
  5. 安全性:安全也是程序中必不可少的一环

常见的分布式锁有三种

Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见

Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:利用节点的唯一性和有序性实现互斥,利用业务有序性来创建互斥,比如让节点最小的获取锁,释放的话可以直接删除节点,即可断开连接

自己实现分布式锁
利用redis实现分布式锁

我们利用redis 的setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的线程,等待一定时间后重试即可

实现

锁的基本接口

1
2
3
4
5
6
7
public interface ILock {

    boolean tryLock(long timeoutSec);

    void unLock();

}

定义SimpleRedisLock类实现ILock

利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SimpleRedisLock implements ILock{

    private String name;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX="lock:";

    public SimpleRedisLock(String name) {
        this.name = name;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        long threadId = Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        //这里涉及到了拆箱的问题
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unLock() {
      	//释放锁
        stringRedisTemplate.delete(KEY_PREFIX+name);
    }
}

修改业务代码

对VoucherOrderServiceImpl类中的seckillVoucher进行修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public Result seckillVoucher(Long voucherId) {
        // 1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        // 3.判断秒杀是否已经结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
        }
        // 4.判断库存是否充足
        if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
        }
        Long userId = UserHolder.getUser().getId();
  
        //使用redis分布式锁
        String name="order:"+userId;
        SimpleRedisLock simpleRedisLock=new SimpleRedisLock(name,stringRedisTemplate);
        boolean isLock=simpleRedisLock.tryLock(1200);
        if(!isLock){
            //获取锁失败,说明已经有了
            return Result.fail("没人只运行下一单");
        }
        try{
            //因为有可能会发生异常,所以把锁放在finally中
            IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }finally {
            simpleRedisLock.unLock();
        }
    }

上面这种用redis作为分布式锁也存在一些问题

Redis锁误删问题

比如:同一个用户有多个线程ABC

  1. 持有锁的线程A在锁的内部出现了阻塞,redis设置的时间超时了,这个时候redis会自动释放锁
  2. 这个时候线程B过来获取了redis锁
  3. 此时线程A反应过来,然后删除锁,因为是同一个user,所以可以删除
  4. 这个时候线程二的锁就被删了,那同一个用户的其他线程就可以进入获取锁,都进入了这个逻辑中

所以在设置锁的时候需要存入线程标示(可以用UUID表示)在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除

这里不能直接使用线程id,因为多个JVM创建线程id的算法都是递增的,这样可能造成冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleRedisLock implements ILock{

    private String name;
    private StringRedisTemplate stringRedisTemplate;
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    private static final String KEY_PREFIX="lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString() + "-";
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    public void unLock() {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标示
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断标示是否一致
        if(threadId.equals(id)) {
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

Redis锁原子性问题

image-20240519225652827

所以必须判断锁和释放锁这两个动作是原子操作

使用Lua脚本解决多条命令原子性

Redis提供的调用函数,语法如下:

1
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

1
2
# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

image-20240519230639854

例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:

image-20240519230737746

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

image-20240519230822337

释放锁的业务流程是这样的

​ 1、获取锁中的线程标识

​ 2、判断是否与指定的标识(当前线程标识)一致

​ 3、如果一致则释放锁(删除)

​ 4、如果不一致则什么都不做

如果用Lua脚本来表示则是这样的:

最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
  -- 一致,则删除锁
  return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

利用Java代码调用Lua脚本

RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图

image-20240520100625733

重写SimpleRedisLock的unlock方法 这个是不支持可重入锁的,所以容易造成死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SimpleRedisLock implements ILock{

    private String name;
    private StringRedisTemplate stringRedisTemplate;
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    private static final String KEY_PREFIX="lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString() + "-";

    //创建一个RedisScript
    private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT=new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    //基于lua脚本进行解锁
    public void unLock() {
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }}
分布式锁redission

上面的代码还存在下面的问题:

重入问题:是指同一个线程在持有锁的情况下,可以再次获取该锁而不会被阻塞。Java中的synchronized和Lock(如ReentrantLock)都是可重入的,这意味着一个线程在持有锁后可以再次进入同一锁保护的代码块而不会造成死锁。

如果锁不可重入,当一个线程在持有锁后再次尝试获取同一锁时,它会被阻塞,导致死锁。这种情况在嵌套方法调用中非常常见。例如,在Hashtable中,方法之间的相互调用可能会导致死锁。

计数器机制: 可重入锁内部通常有一个计数器和一个所有者标识。计数器记录了锁被获取的次数,所有者标识是当前持有锁的线程。

​ • 当一个线程第一次获取锁时,计数器设为1,所有者设为该线程。

​ • 如果同一线程再次获取锁,计数器递增。

​ • 线程在释放锁时,计数器递减。如果计数器为0,锁被完全释放。

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redission使用

引入依赖:

1
2
3
4
5
<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.13.6</version>
</dependency>

配置Redisson客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
      	//这里用自己的redis地址和密码
        config.useSingleServer().setAddress("redis://192.168.150.101:6379")
            .setPassword("123321");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

在 VoucherOrderServiceImpl注入RedissonClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
    public Result seckillVoucher(Long voucherId) {
        // 1.查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2.判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        // 3.判断秒杀是否已经结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
        }
        // 4.判断库存是否充足
        if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
        }
        Long userId = UserHolder.getUser().getId();
        //使用redis分布式锁
        String name="order:"+userId;

        //使用redisson分布式锁
      	//1.利用redissonClient创建锁
        RLock lock=redissonClient.getLock("Lock:order:"+userId);
      	//2.获取锁
        boolean isLock=lock.tryLock();
        if(!isLock){
            //获取锁失败,说明已经有了
            return Result.fail("没人只运行下一单");
        }
        try{
            //因为有可能会发生异常,所以把锁放在finally中
            IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }finally {
          	//3.释放锁
            lock.unlock();
        }
    }
redission可重入锁原理

借助于底层的一个voaltile的一个state变量来记录重入的状态的

比如

  1. 当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1
  2. 如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一
  3. 释放一次就-1
  4. 直到减少成0 时,表示当前这把锁没有被人持有,即释放锁

所以要实现的话,需要使用hash结构,存储线程id,和state的值

所以在判断锁存在的时候,还要判断锁是不是自己的,如果是自己的就可重入,锁计数+1

之前是解锁的时候要判断锁是不是自己的,现在加锁的时候也要判断,并且释放锁的时候只是state-1,再判断state是否等于0再释放

image-20240520121308063

这系列步骤也需要用lua脚本来写,保证整体的原子性

1
2
3
4
5
6
7
8
9
10
11
"if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);"
redission锁重试原理

redission获取锁源码

如果使用的是没有参数的构造函数,那么waitTime默认是-1,leaseTime根据watchdog的默认时间是30s,时间单位是毫秒。

waitTime是0表示如果获取不到就直接返回,就会再重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
  			//当前时间
        long current = System.currentTimeMillis();
  			//线程id
        long threadId = Thread.currentThread().getId();
  			//根据watchdog的默认值是30s,即默认超时时间是30s
  			//tryAcquire如果获取锁成功就返回null,如果不成功就会返回锁剩余有效期
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        if (ttl == null) {
          	//即获取锁成功
            return true;
        } else {
          	//获取锁消耗时间,再用等待时间相减
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {
              	//如果将等待时间消耗完
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                current = System.currentTimeMillis();
              	//subscribe订阅别人释放锁的信号,因为目前是知道有线程还在的
              	//所以没必要立即再去获取锁,通过subscribe订阅别人释放锁的信号
                RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
              	//用await等待剩余time的时间
                if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                  	//这里说明已经超过等待时间了,就删除订阅,返回false
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                                this.unsubscribe(subscribeFuture, threadId);
                            }

                        });
                    }
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } else {
                    boolean var14;          
                    try {
                      	//再用当前时间减去用sub等待的时间
                        time -= System.currentTimeMillis() - current;
                        if (time > 0L) {
                          	//大于0说明还有等待时间
                            boolean var16;
                          	//这里是一个do-while循环,条件是time > 0
                          	//会在循环里面一直进行尝试获取锁,直到获取锁成功或者等待时间为0
                            do {
                                long currentTime = System.currentTimeMillis();
                                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                                if (ttl == null) {
                                    var16 = true;
                                    return var16;
                                }

                                time -= System.currentTimeMillis() - currentTime;
                                if (time <= 0L) {
                                    this.acquireFailed(waitTime, unit, threadId);
                                    var16 = false;
                                    return var16;
                                }

                                currentTime = System.currentTimeMillis();
                                if (ttl >= 0L && ttl < time) {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }

                                time -= System.currentTimeMillis() - currentTime;
                            } while(time > 0L);

                            this.acquireFailed(waitTime, unit, threadId);
                            var16 = false;
                            return var16;
                        }

                        this.acquireFailed(waitTime, unit, threadId);
                        var14 = false;
                    } finally {
                        this.unsubscribe(subscribeFuture, threadId);
                    }

                    return var14;
                }
            }
        }
    }

下面的2、3、4步解决重试问题

总体流程:

如果是有参的构造函数:

  1. 尝试获取锁,如果获取锁失败(已经有锁,并且不是自己的id)
  2. 就看waittime剩余多少,如果没剩就返回false
  3. 如果有剩,会使用subscribe订阅别人释放锁的信号
  4. 如果在time时间内得到了信号,就会再次尝试去获取锁,这里会进入一个do-while循环,条件是time > 0,会在循环里面一直进行尝试获取锁,直到获取锁成功或者等待时间为0

如果是无参的构造函数:

只会尝试获取锁一次,如果失败了,就返回false,不会重试

redisson锁超时原理

之前对simpleredisLock进行实现的时候,虽然用了lua脚本对unlock进行原子操作,保证了不存在误删锁的问题

但是如果线程1获取redis锁操作的时候,阻塞了,锁过期了,但任务还没完成,线程2这个时候获取了锁,那这个时候也会存在线程安全问题,所以必须要确保锁是因为业务完成释放,而不是因为阻塞释放

这里redisson使用了一个watchdog机制来解决超时问题,会有一个自动更新,续约的操作

原理如下:

这一段是在获取锁成功之后,启用watchdog不断更新有效期

因为锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,他就去进行续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,完成不停的续约

释放锁操作中就会取消续约任务

注意点:

  1. 这里续约还会进行判断是不是重入的,重入的是不会续约的
  2. 是否开启watchdog取决于leaseTime是否为默认值,如果刚开始没参数传递,使用默认值30s,才会开启watchdog

image-20240520171327997

redission锁的主从一致原理

过程:我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了,这个时候新的主机是没有锁的,就会出现问题

为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

具体实施:当我们去设置了多个锁时,redission会将多个锁添加到一个list集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.

image-20240520172932066

秒杀优化

方案

当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤

直接在redis中判断秒杀资格,如果有资格就直接返回成功,再再后台开一个线程,这个线程就慢慢执行queue中的信息,在数据库中创建订单

image-20240520181612563

但这里存在几个难点:

  1. 怎么在redis中进行校验一人一单(设置一个set集合存储userid),和库存判断(设置一个value值记录
  2. 由于校验和下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成(我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了)

为了解决上面的问题,思路如下:

1.校验

  1. 当用户下单之后,判断库存是否充足,只需要到redis中去根据key找对应的value是否大于0即可
  2. 如果不充足,则直接结束
  3. 如果充足,继续在redis中判断用户是否可以下单
  4. 如果set集合中没有这条数据(用于保证唯一性),说明他可以下单,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作

2.执行订单

  1. 判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单
  2. 将之前说的信息存入到到queue中去,然后返回
  3. 新创建线程异步的下单
  4. 前端可以通过返回的订单id来判断是否下单成功。

image-20240520183124331

实现
  1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中

    VoucherServiceImpl类中修改添加优惠劵方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        // 保存秒杀库存到Redis中
        //SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
        //private static final String SECKILL_STOCK_KEY ="seckill:stock:"
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    }
    
  2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

    完整lua表达式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
    -- 1.参数列表
    -- 1.1.优惠券id
    local voucherId = ARGV[1]
    -- 1.2.用户id
    local userId = ARGV[2]
    -- 1.3.订单id
    local orderId = ARGV[3]
       
    -- 2.数据key
    -- 2.1.库存key
    local stockKey = 'seckill:stock:' .. voucherId
    -- 2.2.订单key
    local orderKey = 'seckill:order:' .. voucherId
       
    -- 3.脚本业务
    -- 3.1.判断库存是否充足 get stockKey
    if(tonumber(redis.call('get', stockKey)) <= 0) then
        -- 3.2.库存不足,返回1
        return 1
    end
    -- 3.2.判断用户是否下单 SISMEMBER orderKey userId
    if(redis.call('sismember', orderKey, userId) == 1) then
        -- 3.3.存在,说明是重复下单,返回2
        return 2
    end
    -- 3.4.扣库存 incrby stockKey -1
    redis.call('incrby', stockKey, -1)
    -- 3.5.下单(保存用户)sadd orderKey userId
    redis.call('sadd', orderKey, userId)
    -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
    redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
    return 0
    
  3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
        static {
            SECKILL_SCRIPT=new DefaultRedisScript<>();
            SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
            SECKILL_SCRIPT.setResultType(Long.class);
        }
       
        //秒杀优化版本
        @Override
        public Result seckillVoucher(Long voucherId) {
            //获取用户
            Long userId = UserHolder.getUser().getId();
            long orderId = redisIdWorker.nextId("order");
            // 1.执行lua脚本
            Long result = stringRedisTemplate.execute(
                    SECKILL_SCRIPT,
                    Collections.emptyList(),
                    voucherId.toString(), userId.toString(), String.valueOf(orderId)
            );
            int r = result.intValue();
            // 2.判断结果是否为0
            if (r != 0) {
                // 2.1.不为0 ,代表没有购买资格
                return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
            }
            //TODO 保存阻塞队列
            // 3.返回订单id
            return Result.ok(orderId);
        }
    
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    
    rivate static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
       
        //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
        //让类初始化后,就开始执行VoucherOrderHandler任务
        @PostConstruct
        private void init() {
            SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
        }
       
        // 用于线程池处理的任务
        // 当初始化完毕后,就会去从对列中去拿信息
        private class VoucherOrderHandler implements Runnable {
            @Override
            public void run() {
                //不断从队列中尝试获取
                while (true) {
                    try {
                        // 1.获取队列中的订单信息
                        VoucherOrder voucherOrder = orderTasks.take();
                        // 2.创建订单
                        handleVoucherOrder(voucherOrder);
                    } catch (Exception e) {
                        log.error("处理订单异常", e);
                    }
                }
            }
        }
       
        private IVoucherOrderService proxy;
       
        //因为这是一个全新线程,所以不能从threadlocal中获取userId
        private void handleVoucherOrder(VoucherOrder voucherOrder) {
            //1.获取用户
            Long userId = voucherOrder.getUserId();
            // 2.创建锁对象
            RLock redisLock = redissonClient.getLock("lock:order:" + userId);
            // 3.尝试获取锁
            boolean isLock = redisLock.tryLock();
            // 4.判断是否获得锁成功
            if (!isLock) {
                // 获取锁失败,直接返回失败或者重试
                log.error("不允许重复下单!");
                return;
            }
            try {
                //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
                //所以这里需要在主线程中获取代理对象proxy,为了能在这使用,定义为成员变量
                //以前传入的是id,现在传入对象
                proxy.createVoucherOrder(voucherOrder);
            } finally {
                // 释放锁
                redisLock.unlock();
            }
        }
    

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private SeckillVoucherServiceImpl seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

  	//获取lua脚本
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //创建一个阻塞队列
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
  
  	//秒杀优化版本
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3.订单id
        voucherOrder.setId(orderId);
        // 2.4.用户id
        voucherOrder.setUserId(userId);
        // 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6.放入阻塞队列
        orderTasks.add(voucherOrder);
        //3.异步下单,需要准备线程池和线程任务
        // 获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        //4.返回订单id
        return Result.ok(orderId);
    }
  
    //创建线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
    //让类初始化后,就开始执行VoucherOrderHandler任务
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    // 用于线程池处理的任务
    // 当初始化完毕后,就会去从对列中去拿信息
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            //不断从队列中尝试获取
            while (true) {
                try {
                    // 1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    // 2.创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                }
            }
        }
    }

 		//定义成成员变量方便调用
    private IVoucherOrderService proxy;

    //这个方法就是作用是加锁,因为不能在create里面加,所以在这个方法中加锁,调用create方法
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //因为这是一个全新线程,所以不能从threadlocal中获取userId
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁
        boolean isLock = redisLock.tryLock();
        // 4.判断是否获得锁成功
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }
        try {
            //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
            //所以这里需要在主线程中获取代理对象proxy,为了能在这使用,定义为成员变量
            //以前传入的是id,现在传入对象
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }

    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        //同样的由于是新的线程所以不能通过threadlocal获取
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("用户已经购买过一次!");
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            log.error("用户已经购买过一次!");
        }

        save(voucherOrder);
    }
}

上面代码基本完成业务需求,但仍存在两个问题

  • 内存限制问题:因为使用的是jdk的阻塞队列,所以占用jvm内存,如果创建对象过多,会导致内存溢出问题
  • 数据安全问题:如果突然内存宕机,这个时候订单还没存到数据库中就会产生安全问题
Redis消息队列

常用的消息队列有RabbitMQKafka

1. Apache Kafka

​ • 特点:高吞吐量、低延迟、水平扩展、持久化存储。

​ • 用途:实时数据流处理、日志收集、事件源、流分析。

​ • 优点

​ • 高性能:每秒处理数百万条消息。

​ • 可持久化:消息持久化到磁盘,支持故障恢复。

​ • 高可用:支持集群部署,分区和复制保证数据高可用性。

2. RabbitMQ

​ • 特点:基于AMQP协议、消息确认机制、灵活的路由功能。

​ • 用途:任务队列、消息路由、微服务通信。

​ • 优点

​ • 灵活的路由:通过交换机(Exchange)和队列(Queue)实现复杂的消息路由。

​ • 支持多种协议:AMQP、STOMP、MQTT等。

​ • 消息确认和重发:确保消息可靠传递。

基于List模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

  1. 消息可回溯:消息持久化
  2. 消息可以多个消费者读取
  3. 可以阻塞读取
  4. 有消息漏读的风险:连发多条,但只读最新一条

消费者组

多个生产者将日志消息写入Stream,消费者组并行处理这些日志,进行存储、分析和展示

将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

image-20240520232531740

从消费者组读取消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID

”>”:从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

读取一个消息之后要确认一条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有消息,继续下一次循环
                    continue;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                //处理异常消息
                handlePendingList();
            }
        }
    }

    private void handlePendingList() {
        while (true) {
            try {
                // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create("stream.orders", ReadOffset.from("0"))
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有异常消息,结束循环
                    break;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理pendding订单异常", e);
                try{
                    Thread.sleep(20);
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

image-20240520232818286

探店

发布探店笔记

发布笔记

上传接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {

    @PostMapping("blog")
    public Result uploadImage(@RequestParam("file") MultipartFile image) {
        try {
            // 获取原始文件名称
            String originalFilename = image.getOriginalFilename();
            // 生成新文件名
            String fileName = createNewFileName(originalFilename);
            // 保存文件
            image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
            // 返回结果
            log.debug("文件上传成功,{}", fileName);
            return Result.ok(fileName);
        } catch (IOException e) {
            throw new RuntimeException("文件上传失败", e);
        }
    }

}

需要修改SystemConstants.IMAGE_UPLOAD_DIR 的地址,修改到nginx-html-images

提交博客接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/blog")
public class BlogController {

    @Resource
    private IBlogService blogService;

    @PostMapping
    public Result saveBlog(@RequestBody Blog blog) {
        //获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUpdateTime(user.getId());
        //保存探店博文
        blogService.saveBlog(blog);
        //返回id
        return Result.ok(blog.getId());
    }
}

上传笔记

首先在BlogController层编写

1
2
3
4
    @GetMapping("/{id}")
    public Result queryById(@PathVariable("id") Long id){
        return blogService.queryBlogById(id);
    }

接着在service层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
  
    @Resource
    private IUserService userService;

    @Override
    public Result queryBlogById(Long id) {
         Blog blog=getById(id);
        if (blog == null) {
            return Result.fail("笔记不存在!");
        }
        // 2.查询blog有关的用户
        queryBlogUser(blog);
        return Result.ok(blog);
    }
    
    private void queryBlogUser(Blog blog){
        Long user_id=blog.getUserId();
        User user = userService.getById(user_id);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }
}

点赞(Set)

需求:

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

实现步骤:

  • 给Blog类中添加一个isLike字段,标示是否被当前用户点赞
  • 修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
  • 修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
  • 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段

具体实现

1、在Blog 添加一个字段

1
2
@TableField(exist = false)
private Boolean isLike;

2、修改代码

首先在redis中判断是否点赞过,如果有就取消点赞,删除数据库record并从redis中删除,如果没有点赞过就在两者中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

    @Resource
    private IUserService userService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //实现点赞功能
    @Override
    public Result likeBlog(Long id) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + id;
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        //isMember是包装类
        if(BooleanUtil.isFalse(isMember)){
            //3.如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的set集合
            if(isSuccess){
                stringRedisTemplate.opsForSet().add(key,userId.toString());
            }
        }else{
            //4.如果已点赞,取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的set集合移除
            if(isSuccess){
                stringRedisTemplate.opsForSet().remove(key,userId.toString());
            }
        }
        return Result.ok();
    }

    @Override
    public Result queryBlogById(Long id) {
         Blog blog=getById(id);
        if (blog == null) {
            return Result.fail("笔记不存在!");
        }
        // 2.查询blog有关的用户
        queryBlogUser(blog);
      	//3.查询是否点赞过,在blog中记录,用于前端显示高亮
        isBlogLiked(blog);
        return Result.ok(blog);
    }
  
  	//查询是否点赞过,在blog中记录,用于前端显示高亮
    private void isBlogLiked(Blog blog) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + blog.getId();
        Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        blog.setIsLike(BooleanUtil.isTrue(isMember));
    }

  	//在blog中记录用户信息,便于前端显示
    private void queryBlogUser(Blog blog){
        Long user_id=blog.getUserId();
        User user = userService.getById(user_id);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }
}
点赞排行(ZSet)

在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:

之前的点赞是放到set集合,但是set集合是不能排序的,所以这个时候,咱们可以采用一个可以排序的set集合,就是咱们的sortedSet,其中的score用时间戳即可

所以需要重新修改上个业务的代码,将set改为zset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

    @Resource
    private IUserService userService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    //实现点赞功能
    @Override
    public Result likeBlog(Long id) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + id;
      	//这里对zset进行查询,返回的score
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        //isMember是包装类
        if(score==null){
            //3.如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的ZSet集合
            if(isSuccess){
                stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
            }
        }else{
            //4.如果已点赞,取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的Zset集合移除
            if(isSuccess){
                stringRedisTemplate.opsForZSet().remove(key,userId.toString());
            }
        }
        return Result.ok();
    }

    private void isBlogLiked(Blog blog) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + blog.getId();
      	//这里用zset
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score!=null);
    }
}

编写点赞列表查询接口

BlogController

1
2
3
4
5
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {

    return blogService.queryBlogLikes(id);
}

BlogService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public Result queryBlogLikes(Long id) {
    String key = BLOG_LIKED_KEY + id;
    // 1.查询top5的点赞用户 zrange key 0 4
    Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if (top5 == null || top5.isEmpty()) {
        return Result.ok(Collections.emptyList());
    }
    // 2.解析出其中的用户id
    List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
    String idStr = StrUtil.join(",", ids);
    // 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)
    List<UserDTO> userDTOS = userService.query()
            .in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
            .stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .collect(Collectors.toList());
    // 4.返回
    return Result.ok(userDTOS);
}

完整代码

这里的zset查询前5可以学习一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

    @Resource
    private IUserService userService;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    //实现点赞功能
    @Override
    public Result likeBlog(Long id) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + id;
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        //isMember是包装类
        if(score==null){
            //3.如果未点赞,可以点赞
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到Redis的ZSet集合
            if(isSuccess){
                stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
            }
        }else{
            //4.如果已点赞,取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 把用户从Redis的set集合移除
            if(isSuccess){
                stringRedisTemplate.opsForZSet().remove(key,userId.toString());
            }
        }
        return Result.ok();
    }

    @Override
    public Result queryByLikes(Long id) {
        String key = BLOG_LIKED_KEY + id;
        // 1.查询top5的点赞用户 用ZRange key 0 4
        Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
        if (top5 == null || top5.isEmpty()) {
            return Result.ok(Collections.emptyList());
        }
        // 2.解析出其中的用户id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        // 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)
        //在数据库中查询的时候用in,他不会按照给定id顺序进行查询,会默认id顺讯进行查询,
        //所以这里需要手动进行排序,"ORDER BY FIELD(id," + idStr + ")"
        //所以需要提前把字符串进行拼接
        String idStr = StrUtil.join(",", ids);
        List<UserDTO> userDTOS = userService.query()
                .in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        // 4.返回
        return Result.ok(userDTOS);
    }

    @Override
    public Result queryBlogById(Long id) {
         Blog blog=getById(id);
        if (blog == null) {
            return Result.fail("笔记不存在!");
        }
        // 2.查询blog有关的用户
        queryBlogUser(blog);
        isBlogLiked(blog);
        return Result.ok(blog);
    }

    //这里有一个问题,就是未登录用户也可以访问,所以第一步的从threadlocal中查id是不对的
    //所以首先要进行判断
    private void isBlogLiked(Blog blog) {
        UserDTO userDTO=UserHolder.getUser();
        if(userDTO==null){
            return;
        }
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + blog.getId();
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score!=null);
    }

    @Override
    public Result queryHotBlogById(Integer current) {
        // 根据用户查询
        Page<Blog> page = query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog -> {
            this.isBlogLiked(blog);
            this.queryBlogUser(blog);
        });
        return Result.ok(records);
    }

    private void queryBlogUser(Blog blog){
        Long user_id=blog.getUserId();
        User user = userService.getById(user_id);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }
}

好友关注

关注

实现思路:

需求:基于该表数据结构,实现两个接口:

  • 关注和取关接口
  • 判断是否已关注的接口

首先编写FollowController层

1
2
3
4
5
6
7
8
9
10
//关注
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) {
    return followService.follow(followUserId, isFollow);
}
//取消关注
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId) {
      return followService.isFollow(followUserId);
}

FollowService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {

    //关注
    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        String key = "follows:" + userId;
        // 1.判断到底是关注还是取关
        if (isFollow) {
            // 2.关注,新增数据
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            save(follow);

        } else {
            // 3.取关,删除 delete from tb_follow where user_id = ? and follow_user_id = ?
            remove(new QueryWrapper<Follow>()
                    .eq("user_id", userId).eq("follow_user_id", followUserId));

        }
        return Result.ok();
    }

    @Override
    public Result isFollow(Long followUserId) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.查询是否关注 select count(*) from tb_follow where user_id = ? and follow_user_id = ?
        Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
        // 3.判断
        return Result.ok(count > 0);
    }
}
共同关注(Set)

首先需要能进入关注者的个人主页,这里会有两个请求

1、去查询用户的详情

2、去查询用户的笔记

对于1,需要在UserController中实现

1
2
3
4
5
6
7
8
9
10
11
12
// UserController 根据id查询用户
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){
	// 查询详情
	User user = userService.getById(userId);
	if (user == null) {
		return Result.ok();
	}
	UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
	// 返回
	return Result.ok(userDTO);
}

对于2,需要在BlogController中实现

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/of/user")
public Result queryBlogByUserId(
		@RequestParam(value = "current", defaultValue = "1") Integer current,
		@RequestParam("id") Long id) {
	// 根据用户查询
	Page<Blog> page = blogService.query()
			.eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
	// 获取当前页数据
	List<Blog> records = page.getRecords();
	return Result.ok(records);
}

接下来开始看共同关注

这里需要用到Redis中的set集合,set集合有个intersect可以实现求多个key的交集

所以这里首先需要在之前写的关注用户的代码中,将一个用户的follow对象都保存在一个set中

FollowServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Result follow(Long followUserId, Boolean isFollow) {
    // 1.获取登录用户
    Long userId = UserHolder.getUser().getId();
    String key = "follows:" + userId;
    // 1.判断到底是关注还是取关
    if (isFollow) {
        // 2.关注,新增数据
        Follow follow = new Follow();
        follow.setUserId(userId);
        follow.setFollowUserId(followUserId);
        boolean isSuccess = save(follow);
        if (isSuccess) {
            // 把关注用户的id,放入redis的set集合 sadd userId followerUserId
            stringRedisTemplate.opsForSet().add(key, followUserId.toString());
        }
    } else {
        // 3.取关,删除 delete from tb_follow where user_id = ? and follow_user_id = ?
        boolean isSuccess = remove(new QueryWrapper<Follow>()
                .eq("user_id", userId).eq("follow_user_id", followUserId));
        if (isSuccess) {
            // 把关注用户的id从Redis集合中移除
            stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
        }
    }
    return Result.ok();
}

实现共同关注

FollowServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
    public Result followCommons(Long id) {
        // 1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        String key = "follows:" + userId;
        // 2.求交集
        String key2 = "follows:" + id;
        Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
        if (intersect == null || intersect.isEmpty()) {
            // 无交集
            return Result.ok(Collections.emptyList());
        }
        // 3.解析id集合
        List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
        // 4.查询用户
        List<UserDTO> users = userService.listByIds(ids)
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        return Result.ok(users);
    }
关注推送(Feed流)
Feed流

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

Feed流产品有两种常见模式: Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈

  • 优点:信息全面,不会有缺失。并且实现也相对简单
  • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户

  • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
  • 缺点:如果算法不精准,可能起到反作用

本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

  • 拉模式
  • 推模式
  • 推拉结合

拉模式

该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序

优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清除。

缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。

推模式

推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了

优点:时效快,不用临时拉取

缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

拉推模式

也叫做读写混合,兼具推和拉两种模式的优点。

推拉模式是一个折中的方案

站在发件人角度

  • 如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力
  • 如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去

现在站在收件人角度

  • 如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来
  • 而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。

image-20240521170413302

image-20240521170605539

推模式

推送到粉丝收件箱

需求:

  • 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
  • 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  • 查询收件箱数据时,可以实现分页查询
滚动分页

feed流分页问题

这个有个问题,就是feed流不能使用传统的分页,因为数据在一直变化的

image-20240521172311018

这里只能使用滚动分页

我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据

举个例子:我们从t1时刻开始,拿第一页数据,拿到了10~6,然后记录下当前最后一次拿取的记录,就是6,t2时刻发布了新的记录,此时这个11放到最顶上,但是不会影响我们之前记录的6,此时t3时刻来拿第二页,第二页这个时候拿数据,还是从6后一点的5去拿,就拿到了5-1的记录。我们这个地方可以采用sortedSet来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了

image-20240521172500463

首先要重新编写发布博客方法,新增博客之后,要逐一将粉丝id+博客id保存redis中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//发布博客
    @Override
    public Result saveBlog(Blog blog) {
        // 1.获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        // 2.保存探店笔记
        boolean isSuccess = save(blog);
        if(!isSuccess){
            return Result.fail("新增笔记失败!");
        }
        // 3.查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id = ?
        List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
        // 4.推送笔记id给所有粉丝
        for (Follow follow : follows) {
            // 4.1.获取粉丝id
            Long userId = follow.getUserId();
            // 4.2.推送
            String key = FEED_KEY + userId;
            stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
        }
        // 5.返回id
        return Result.ok(blog.getId());
    }

实现分页查询

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息

按score排序,每次查询记录最后一个score的大小,下一次查询就从这一次记录的score开始查询

但是这还是会有一个问题,就是如果score相同的情况

对于这个问题,每次查询的时候要记录本次查询最小score的相同的个数,作为下次查询的偏移量,用于跳过

具体操作如下:

1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件

2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据

image-20240521202808595

所以第一步需要定义一个返回值的实体类

在dto文件夹下定义ScrollResult

1
2
3
4
5
6
@Data
public class ScrollResult {
    private List<?> list;
    private Long minTime;
    private Integer offset;
}

BlogController

注意:RequestParam 表示接受url地址栏传参的注解,当方法上参数的名称和url地址栏不相同时,可以通过RequestParam 来进行指定

1
2
3
4
5
@GetMapping("/of/follow")
public Result queryBlogOfFollow(
    @RequestParam("lastId") Long max, @RequestParam(value = "offset", defaultValue = "0") Integer offset){
    return blogService.queryBlogOfFollow(max, offset);
}

BlogServiceImpl编写queryBlogOfFollow方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
        // 1.获取当前用户
        Long userId = UserHolder.getUser().getId();
        // 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
        String key = FEED_KEY + userId;
        //滚动分页查询,max是上一次的最后一个,offset是偏移量,2表示每页查两条
        Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
                .reverseRangeByScoreWithScores(key, 0, max, offset, 2);
        // 3.非空判断
        if (typedTuples == null || typedTuples.isEmpty()) {
            return Result.ok();
        }
        // 4.解析数据:blogId、minTime(时间戳)、offset
        List<Long> ids = new ArrayList<>(typedTuples.size());
        //minTime,os在遍历过程中更新,为下一次分页查询做准备
        long minTime = 0; // 2
        int os = 1; // 2
        for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2
            // 4.1.获取id
            ids.add(Long.valueOf(tuple.getValue()));
            // 4.2.获取分数(时间戳)
            long time = tuple.getScore().longValue();
            if(time == minTime){
                os++;
            }else{
                minTime = time;
                os = 1;
            }
        }
      //如果是最小的分数,说明就没有了,就不必再偏移了
        os = minTime == max ? os : os + offset;
        // 5.根据id查询blog
        //这里和之前的一样的问题,因为sql中in会改变顺序,所以得ORDER BY FIELD
        String idStr = StrUtil.join(",", ids);
        List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();

        //这里还需要对于每个blog查询他是否被点赞了和有关的用户
        //这里会调用这些方法进行set相关的变量
        for (Blog blog : blogs) {
            // 5.1.查询blog有关的用户
            queryBlogUser(blog);
            // 5.2.查询blog是否被点赞
            isBlogLiked(blog);
        }

        // 6.封装并返回
        ScrollResult r = new ScrollResult();
        r.setList(blogs);
        r.setOffset(os);
        r.setMinTime(minTime);

        return Result.ok(r);
    }

附近商户

GEO数据结构

GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:

  • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  • GEODIST:计算指定的两个点之间的距离并返回
  • GEOHASH:将指定member的坐标转为hash字符串形式并返回
  • GEOPOS:返回指定member的坐标
  • GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
  • GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
  • GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
导入店铺数据到GEO

image-20240521225150027

这里考虑到多种排序方式,一个是距离,一个是商户类型,所以可以用商户类型作为key,把用一个类型的商店放到一个GEO中,GEO中value就是店铺id,score就是地址

image-20240521231954727

创建一个测试类,将数据库中的店铺信息和店址存入到redis的Geo中

一共三个步骤

  1. 查询所有店铺信息放到一个list中
  2. 将list按照typeId分组,放到一个hashmap中
  3. 遍历hashmap,对于每一个组的店铺就放到redis的一个key中

注意:这里用了一个巧妙的方法创建了一个

List<RedisGeoCommands.GeoLocation> locations,将同一个类别的所有shop都放进去了,一次插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
    void loadShopData() {
        // 1.查询店铺信息
        List<Shop> list = shopService.list();
        // 2.把店铺分组,按照typeId分组,typeId一致的放到一个集合
        Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
        // 3.分批完成写入Redis
        for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
            // 3.1.获取类型id
            Long typeId = entry.getKey();
            String key = SHOP_GEO_KEY + typeId;
            // 3.2.获取同类型的店铺的集合
            List<Shop> value = entry.getValue();
            //创建这个集合,将所有shop信息放进去,一次请求
            List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
            // 3.3.写入redis GEOADD key 经度 纬度 member
            for (Shop shop : value) {
                //每一个shop都请求一次效率太低了,所以创建locations集合,直接一次请求
                // stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
                locations.add(new RedisGeoCommands.GeoLocation<>(
                        shop.getId().toString(),
                        new Point(shop.getX(), shop.getY())
                ));
            }
            stringRedisTemplate.opsForGeo().add(key, locations);
        }
    }
实现附近商户功能

第一步更新依赖,之前的依赖太老了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
				<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.data</groupId>
                    <artifactId>spring-data-redis</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>6.1.9.RELEASE</version>
        </dependency>

第二步:

修改shopController,之前的缺了x,y两个参数

required = false因为用户不一定有这个需求,所以前端不一定会传过来,所以这里把required = false

1
2
3
4
5
6
7
8
9
@GetMapping("/of/type")
public Result queryShopByType(
        @RequestParam("typeId") Integer typeId,
        @RequestParam(value = "current", defaultValue = "1") Integer current,
        @RequestParam(value = "x", required = false) Double x,
        @RequestParam(value = "y", required = false) Double y
) {
   return shopService.queryShopByType(typeId, current, x, y);
}

修改ShopServiceImpl的queryShopByType方法

总体思路:

  1. 判断是否需要根据坐标查询
  2. 计算分页参数
  3. 查询redis、按照距离排序、分页
  4. 解析出id和距离
  5. 根据id查询Shop

这里面难点一个是从redis中提取数据要解析出来进行处理,另一个就是分页,因为redis GEO不支持从from-end的分页,只支持从0-end的分页,所以要自己手动截取from-end这一块的分页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
        // 1.判断是否需要根据坐标查询
        if (x == null || y == null) {
            // 不需要坐标查询,按数据库查询
            Page<Shop> page = query()
                    .eq("type_id", typeId)
                    .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
            // 返回数据
            return Result.ok(page.getRecords());
        }

        // 2.计算分页参数
        int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
        int end = current * SystemConstants.DEFAULT_PAGE_SIZE;

        // 3.查询redis、按照距离排序、分页。结果:shopId、distance
        String key = SHOP_GEO_KEY + typeId;
        GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE
                .search(
                        key,
                        //圆心
                        GeoReference.fromCoordinate(x, y),
                        //指定半径的值
                        new Distance(5000),
                        //includeDistance结果带上距离  ,limit表示分页查询最后一个到哪
                        RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
                );
        // 4.解析出id
        if (results == null) {
            return Result.ok(Collections.emptyList());
        }
        List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
      //因为后面是跳过的,所以这里也要判断是否超过了
        if (list.size() <= from) {
            // 没有下一页了,结束
            return Result.ok(Collections.emptyList());
        }
        // 4.1.截取 from ~ end的部分
        //因为前面从redis查询的时候,只能查询到0-end,所以这里要截取从from-end
        List<Long> ids = new ArrayList<>(list.size());
        //这里为了让店铺id和distance匹配,所以要建立一个map
        Map<String, Distance> distanceMap = new HashMap<>(list.size());
        //通过stream从from开始
        //这里需要遍历,从list中查找到店铺id和距离
        list.stream().skip(from).forEach(result -> {
            // 4.2.获取店铺id
            String shopIdStr = result.getContent().getName();
            ids.add(Long.valueOf(shopIdStr));
            // 4.3.获取距离
            Distance distance = result.getDistance();
            distanceMap.put(shopIdStr, distance);
        });
        // 5.根据id查询Shop
        String idStr = StrUtil.join(",", ids);
        List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
        //给每个shop设置距离
        for (Shop shop : shops) {
            shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
        }
        // 6.返回
        return Result.ok(shops);
    }

签到功能

BitMap

用数据库存储太耗内存了,所以使用redis的bitmap实现

我们按月来统计用户签到信息,签到记录为1,未签到则记录为0.

把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示

Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。

image-20240522113018967

BitMap的操作命令有:

  • SETBIT:向指定位置(offset)存入一个0或1
  • GETBIT :获取指定位置(offset)的bit值
  • BITCOUNT :统计BitMap中值为1的bit位的数量
  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  • BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
  • BITOP :将多个BitMap的结果做位运算(与 、或、异或)
  • BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
签到功能

UserController

1
2
3
4
 @PostMapping("/sign")
 public Result sign(){
    return userService.sign();
 }

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public Result sign() {
    // 1.获取当前登录用户
    Long userId = UserHolder.getUser().getId();
    // 2.获取日期
    LocalDateTime now = LocalDateTime.now();
    // 3.拼接key
    String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
    String key = USER_SIGN_KEY + userId + keySuffix;
    // 4.获取今天是本月的第几天
    int dayOfMonth = now.getDayOfMonth();
    // 5.写入Redis SETBIT key offset 1
    stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
    return Result.ok();
}
签到统计

统计当前已连续签到次数,即从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数

为了实现这个功能,首先要解决两个问题

  1. 如果得到本月到今天所有签到数据

    BITFIELD key GET u[dayOfMonth] 0

    获取当前这一天的位数,就可以这这一天以前的所有数据进行计算即可

  2. 如何从后往前遍历每个bit位

    因为bitMap返回的数据是十进制,统计1的数量,可以与1做与运算,如果返回零则当前为0,如果返回1,则将签到结果右移一位,继续与运算

具体实现

UserController

1
2
3
4
@GetMapping("/sign/count")
public Result signCount(){
    return userService.signCount();
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 @Override
    public Result signCount() {
        //获取当前的用户
        Long user_id= UserHolder.getUser().getId();
        //获取日期
        LocalDateTime dateTime=LocalDateTime.now();
        //拼接得到key
        String key_suffix=dateTime.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key=USER_SIGN_KEY+user_id+key_suffix;
        //获取今天是本月第几天
        int dayOfMonth = dateTime.getDayOfMonth();
        //从redis中获取的签到记录,返回的是10进制数字
        // 5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
        List<Long> result = stringRedisTemplate.opsForValue().bitField(
                key,
                BitFieldSubCommands.create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
        );
        if (result == null || result.isEmpty()) {
            // 没有任何签到结果
            return Result.ok(0);
        }
        Long num = result.get(0);
        if (num == null || num == 0) {
            return Result.ok(0);
        }
        // 6.循环遍历
        int count = 0;
        while (true) {
            // 6.1.让这个数字与1做与运算,得到数字的最后一个bit位  
          	// 判断这个bit位是否为0
            if ((num & 1) == 0) {
                // 如果为0,说明未签到,结束
                break;
            }else {
                // 如果不为0,说明已签到,计数器+1
                count++;
            }
            // 把数字右移一位,抛弃最后一个bit位,继续下一个bit位
            num >>>= 1;
        }
        return Result.ok(count);

    }

UV统计

HyperLogLog
  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

从上面定义可以看出来,UV是统计网站用户量,PV可以看出网站的用户凝度

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?

Hyperloglog

  • 能够使用极少的内存来统计巨量的数据,在 Redis 中实现的 HyperLogLog,只需要12K内存就能统计2^64个数据。
  • 计数存在一定的误差,误差率整体较低。标准误差为 0.81% 。
  • 误差可以被设置辅助计算因子进行降低。

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值

Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

测试百万数据的统计
1
2
3
4
5
6
7
8
9
10
11
12
13
    @Test
    void TestHyperLogLog(){
        String[]nums=new String[1000];
        int index=0;
        for(int i=1;i<1000000;i++){
            nums[index++]="user"+i;
            if(i%1000==0){
                index=0;
                stringRedisTemplate.opsForHyperLogLog().add("hl2",nums);
            }
        }
        System.out.println(stringRedisTemplate.opsForHyperLogLog().size());
    }

Hyperloglog原理

  1. 哈希函数

HyperLogLog使用哈希函数将输入数据(集合中的每个元素)映射为一个大范围的哈希值。哈希函数的目的是将输入数据均匀分布到一个大的整数范围内,通常是0, 2^m-1。

  1. 哈希值的二进制表示

对于每个哈希值,HyperLogLog关注其二进制表示中前导零的数量。前导零的数量越多,表示该哈希值越大(因为哈希值在大的整数范围内均匀分布)。

  1. 分桶

将哈希值分为多个桶(buckets)。假设有 2^b 个桶,其中 b 是一个参数。哈希值的前 b 位用于确定桶的编号,剩余的位用于计算前导零的数量。

  1. 记录最大前导零数量

对于每个桶,记录其中出现的最大前导零数量。这样,桶中的值可以表示为该桶所见到的最“大”的哈希值。

  1. 估算基数

通过桶中记录的最大前导零数量,使用以下公式估算集合的基数: image-20240527212015308 其中:

1
2
3
4
•	 E  是估算的基数
•	 \alpha_m  是一个常数,与桶的数量  m  相关
•	 m  是桶的数量,即  2^b 
•	 M[j]  是第  j  个桶中记录的最大前导零数量
  1. 偏差校正

对于小基数和大的基数,算法分别进行校正:

1
2
•	对于小基数,使用线性计数器进行校正。
•	对于大的基数,使用以下公式进行校正: ![image-20240527212054301](https://cdn.jsdelivr.net/gh/ddyycc123/imageloader@main/image-20240527212054301.png) 其中  V  是桶中值为零的数量。

关键点总结

1
2
3
4
•	哈希映射:将数据映射为均匀分布的哈希值。
•	前导零数量:计算并记录每个哈希值的前导零数量。
•	分桶记录:将哈希值分桶,并记录每个桶中出现的最大前导零数量。
•	基数估算:使用记录的前导零数量估算集合基数,并进行偏差校正。