Redis:缓存原理与高并发实战

Redis基础

初识Redis

认识NoSQL

SQL:结构化查询语言 => 关系型数据库

NoSQL:非关系型数据库

SQL与NoSQL的差异:

  1. 数据结构
    1. SQL结构化:表的信息依赖于表的结构
    2. NoSQL非结构化:存储的信息为KV形式
  2. 数据关联
    1. SQL关联:可以建立并维护表与表之间的关系(外键)
    2. NoSQL无关联:需要自行维护数据关联
  3. 查询方式
    1. SQL:使用SQL语句查询
    2. NoSQL:没有统一标准(Redis、MongoDB)
  4. 事物特性
    1. SQL:满足ACID
    2. NoSQL:基本可用BASE
  5. 存储方式
    1. SQL:磁盘
    2. NoSQL:内存(速度快)
  6. 扩展性
    1. SQL:垂直(本机)
    2. NoSQL:水平(支持分布式)

Redis特点

  1. KV结构
  2. 单线程,命令执行具备原子性
  3. 低延迟、性能快(基于内存、IO多路复用、C语言编写)
  4. 支持数据持久化
  5. 支持主从集群(扩展速度)、分片集群(扩展存储量)
  6. 支持多语言客户端

Redis安装

Redis官方只提供了Linux版本

Redis常见命令

Redis通用命令

KEYS:查看符合模板的所有key

不建议在生产环境(或集群的主节点)上使用。Redis单线程,会导致线程阻塞。

shell
KEYS *name*

DEL:删除key

shell
DEL username

DEL username password

EXISTS:判断key存在

shell
EXISTS username

EXPIRE:给key设置有效期,单位是秒(到期自动删除)

TTL:查看有效期(-2表示不存在、-1表示永不过期)

shell
EXPIRE name 20

TTL name

Redis数据结构

  1. String
  2. Hash(类似JSON)
  3. List(列表)
  4. Set(无重复元素的集合)
  5. SortedSet(有序集合)
  6. GEO(经纬度坐标)
  7. BitMap
  8. HyperLog
String

字符串,最简单的数据类型。上限是512M。

String的类型

  • string:字符串
  • int:整型,可以自增自减
  • float:浮点类型,可以自增自减

常见命令

  1. SET:添加、修改KV
  2. GET:通过K查询V
  3. MSET:批量添加
  4. MGET:批量查询
  5. INCR:整型i++(INCR age:age++)
  6. INCRBY:整型i+=n(INCRBY age 2:age += 2)
  7. INCRBYFLOAT:浮点i+=n(INCRBYFLOAT level 2:level += 2)
  8. SETNX:新增KV(若存在K则不添加)
  9. SETEX:添加KV,并指定有效期

key的结构:层级存储。用于key名称的设置,避免了key重复的问题

项目名:业务名:类型:id

hpan:user:username:1
hpan:admin:username:1
Hash

无序字典,类似JSON。

相比于序列化JSON,以字符串形式保存。Hash结构可以更方便地修改。

常见命令

  1. HSET key field value:添加、修改key的field值(HSET user:1 username wmh
  2. HGET key field:查询key的field值(HSET user:1 username
  3. HMSET、HMGET:批量
  4. HGETALL:查询所有的field和value
  5. HKEYS、HVALS:查询所有的field或value
  6. HINCRBY:整型value+=n(HINCRBY user:4 age 2:user[4].age+=2)
  7. HSETNX:新增field-value(若存在K则不添加)
List

双向链表,类似于LinkedList。可以正向、反向查找。

  1. LPUSH/RPUSH key value:左/右侧插入元素
  2. LPOP/RPOP key:左/右侧弹出元素
  3. LRANGE key i j:从i到j遍历元素
  4. BLPOP、BRPOP:阻塞式获取(有元素就取、没有元素就等)
  • 模拟栈:入口出口在同一边
  • 模拟队列:入口出口不在同一边
  • 模拟阻塞队列:入口出口不在同一边,出队使用BLPOP、BRPOP
Set

无序、元素不可重复、查找速度快、支持交集、并集、差集等。

单个Set:

  1. SADD key item...:添加item
  2. SREM key item...:移除item
  3. SCARD key:查询元素个数
  4. SISMEMBER key item:判断item是否存在
  5. SMEMBERS:获取所有元素

Set之间:

  1. SINTER k1 k2:交集
  2. SDIFF k1 k2:差集
  3. SUNION k1 k2:并集
SortedSet

每个元素都带一个score属性,根据score排序。可排序、元素不重复、查询速度快。

  1. ZADD key score item...:添加item
  2. ZREM key item...:移除item
  3. ZSCORE key item:查询item的score
  4. ZRANK key item:获取item的排名
  5. ZCARD key:查询元素个数
  6. ZCOUNT key min max:获取分数在min到max的元素个数
  7. ZRANGE key i j:获取排名为i到j的元素
  8. ZRANGEBYSCORE key min max:获取分数为min到max的元素
  9. ZINCRBY key n item:item.score+=n
  10. ZDIFF、ZINTER、ZUNION:差集交集并集

排名默认为升序,降序排名需要加上REV:ZRANK → ZREVRANK

Redis的Java客户端

客户端介绍

  1. Jedis:方法命名和Redis命令相似。线程不安全,多线程需要连接池。
  2. letture:基于Netty,支持同步、异步和响应式编程方式,线程安全。支持哨兵模式、集群模式和管道模式。
  3. Redisson:实现了分布式、可伸缩的Java数据结构。和使用原生集合一样使用Redis集合。

SpringDataRedis:整合了Jedis、letture

Jedis

redis/jedis: Redis Java client (github.com)

Jedis使用:

1、引入依赖

xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>5.0.0</version>
</dependency>

2、建立连接

java
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("123.com");
jedis.set("name", "wmhs");
jedis.close();

多线程 → 使用Jedis连接池

SpringDataRedis

简介

Spring Data Redis

  • 整合Lettuce和jedis
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型、哨兵、集群
  • 支持基于Lettuce的响应式编程
  • 支持基于JDK、JSON、宇符串、Spring对象的数据序列化及反序列化
  • 支持基于Redis的DKCollection实现

SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。

不同数据类型的操作API封装到了不同的类型中

自定义RedisSerializer

RedisTemplate可以接收Object作为值写入Redis。

But,采用JDK序列化Object为字节形式。占用内存大、可读性差。

So,可以自行编写RedisSerializer实现(使用jackson)。

But,写入的对象有@class属性,造成了额外开销。

StringRedisTemplate

Spring默认提供了一个StringRedisTemplate类,KV的序列化方式默认就是String方式。

如何存对象? → 自行序列化和反序列化

常见的JSON序列化库:

  1. gson(google 的)
  2. fastjson alibaba(阿里,快但是有Bug)
  3. jackson
  4. kryo

使用步骤:

1、导入pom.xml配置

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>    
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2、配置application.yml

yaml
spring:
  data:
    redis:
      port: 6379
      host: 127.0.0.1
      password: 123456
      database: 0
      timeout: 10000
      lettuce:
        pool:
          enabled: true
          max-active: 100
          max-wait: 10000
          max-idle: 10
          min-idle: 1

3、编写测试类

不连接数据库启动SpringBoot项目

java
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
java
@Test
void StringRedisTemplateTest() {
    ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
    // name
    ops.set("name", "wmh");
    System.out.println("name: " + ops.get("name"));
    // yx
    ops.set("yx", gson.toJson(new User("yx", 18)));
    String yx = ops.get("yx");
    System.out.println("yx: " + gson.fromJson(yx, User.class));
}

## 基于Token单点登录

获取验证码

流程图

mermaid
graph LR
	1([begin])
	2([end])
	a[提交手机号]
	b{手机号合法校验}
	e[保存到Redis]
	f[发送验证码]
	
	1 --> a
    a --> b
    b -->|"合法"| e --> f --> 2
    b -->|"不合法"| 1

业务实现

使用 Hutools工具类 生成随机验证码

java
String code = RandomUtil.randomNumbers(6);

将验证码保存到Redis

java
redisTemplate.opsForValue().set(
    REDIS_CODE_PRE + phone, code, 2, TimeUnit.MINUTES
);

登录

流程图

登录拦截和Token刷新

流程图

使用拦截器实现登录拦截和Token刷新

业务实现

拦截器配置类
java
@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Autowired
    private StringRedisTemplate template;
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new TokenInterceptor(template)).addPathPatterns(
            "/**"
        ).order(0);
        registry.addInterceptor(new LoginInterceptor()).addPathPatterns(
            "/user"
        ).order(2);
    }
}
Token刷新拦截器

拦截所有请求

用于刷新Token有效期,并将用户信息保存到ThreadLocal。之后使用ThreadLocal获取用户信息。

mermaid
graph LR
	1([请求])
	2([Controller])
	
	a[提交Token]
	b[在Redis中查询用户]
	
	1 --> a --> b
	b -->|不存在| 2
	b -->|存在| 查询用户信息 --> 保存到ThreadLocal --> Token续期 --> 2

在请求头上获取Token

java
String token = request.getHeader("authorization"));

使用ThreadLocal保存用户信息

java
public class UserHolder {
    private static final ThreadLocal<UserDTO> tl = new ThreadLocal<UserDTO>();

    public static void saveUser(UserDTO user) {
        tl.set(user);
    }

    public static UserDTO getUser() {
        return tl.get();
    }

    public static void removeUser() {
        tl.remove();
    }
}
登录拦截器

拦截需要登录才能访问的接口(/user 个人信息)

用于判断是否登录,通过ThreadLocal判空操作获取登录信息。

mermaid
graph LR
	1([请求])
	2([Controller])
	3(["❎ 401"])
	a[从ThreadLoacl中获取用户信息]
	1 -->a -->|存在| 2
	a -->|不存在| 3
java
@Data
public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        if (UserHolder.getUser() == null) {
            response.setStatus(401);
            return false;
        }
        return true;
    }
}

缓存实现及其问题

认识缓存

缓存就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。

缓存的作用:

  1. 降低后端负载
  2. 提高读写效率,降低响应时间

缓存的成本:

  1. 数据一致性成本
  2. 代码维护成本
  3. 运维成本

添加缓存

缓存作用模型

查询商铺缓存的流程

添加缓存业务代码

java
@Override
public List<UserDTO> getUserlist() {
    Gson gson = new Gson();
    // 1. 查询redis缓存
    String cache = redisTemplate.opsForValue().get(CACHE_LIST_PRE);
    // 2.1. 存在缓存
    if (StrUtil.isNotBlank(cache)) {
        // 3. 反序列化
        return gson.fromJson(cache, new TypeToken<List<UserDTO>>() {}.getType());
    }
    // 2.2. 不存在缓存
    // 3. 查询数据库
    List<User> userList = list();
    // 4. 信息脱敏
    ArrayList<UserDTO> userDTOList = new ArrayList<>();
    for (User user : userList) {
        UserDTO dto = new UserDTO();
        BeanUtil.copyProperties(user, dto);
        dto.setPhone(DesensitizedUtil.mobilePhone(dto.getPhone()));
        userDTOList.add(dto);
    }
    // 5. 保存到Redis
    redisTemplate.opsForValue()
        .set(CACHE_LIST_PRE, gson.toJson(userDTOList), 2, TimeUnit.MINUTES);
    return userDTOList;
}

缓存更新

缓存更新策略

内存淘汰超时剔除主动更新
说明利用Redis的内存淘汰机制,内存不足时自动淘汰部分数据。给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。编写业务逻辑,在修改数据库的同时,更新缓存。
一致性一般
维护成本
  • 低一致性需求:使用内存淘汰机制
  • 高一致性需求:主动更新 + 超时剔除

主动更新策略

  • 读操作:
    • 缓存命中则直接返回
    • 缓存未命中则查询数据库,并写入缓存,设定超时时间
  • 写操作:
    • 先操作数据库,然后再删除缓存
    • 要确保数据库与缓存操作的原子性(事物/分布式事物)

缓存穿透

什么是缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

缓存空对象

优点:实现简单,维护方便

缺点:额外的内存消耗、可能造成短期的不一致

业务实现

java
// 缓存空对象
@Override
public UserDTO getInfoById(Long id) {
    // 缓存查询
    String userString = redisTemplate.opsForValue()
        .get(CACHE_USER_PRE + id);
    if (StrUtil.isNotBlank(userString)) {
        // 有缓存 => 真实数据
        return gson.fromJson(userString, UserDTO.class);
    }
    if (userString != null) {
        // 有缓存 => 空对象
        throw new BusinessException(404, "用户不存在");
    }
    // 数据库查询
    User user = getById(id);
    if (user == null) {
        // 缓存空对象
        redisTemplate.opsForValue()
            .set(CACHE_USER_PRE + id, "", 2, TimeUnit.MINUTES);
        throw new BusinessException(404, "用户不存在");
    }
    // 信息脱敏
    UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    // 缓存真实数据
    redisTemplate.opsForValue()
        .set(CACHE_USER_PRE + id, gson.toJson(userDTO), 2, TimeUnit.MINUTES);
    return userDTO;
}

布隆过滤器

优点:内存占用较少,没有多余key

缺点:实现复杂、存在误判可能

image-20240117202714944

缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案:

  1. 给不同的Key的TTL添加随机值
  2. 利用Redis集群提高服务的可用性
  3. 给缓存业务添加降级限流策略
  4. 给业务添加多级缓存

缓存击穿/热点Key

什么是缓存击穿

缓存击穿问题也叫热点Key问题:一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

互斥锁

优点:没有额外的内存消耗、保证一致性、实现简单

缺点:线程需要等待,性能受影响、可能有死锁风险

互斥锁流程图

互斥锁业务代码

java
// 热点key-互斥锁
@Override
public List<UserDTO> getUserlist() throws InterruptedException {
    // 1. 查询redis缓存
    String cache = redisTemplate.opsForValue().get(CACHE_LIST_PRE);
    // 2.1. 存在缓存
    if (StrUtil.isNotBlank(cache)) {
        // 3. 反序列化
        return gson.fromJson(cache, new TypeToken<List<UserDTO>>() {}.getType());
    }
    // ⭐️ 获取互斥锁
    String lock = REDIS_LOCK_PRE + "userlist";
    Boolean flag = redisTemplate.opsForValue()
        .setIfAbsent(lock, "1", 30, TimeUnit.SECONDS);
    if (BooleanUtil.isFalse(flag)) {
        // ⭐️ 获取锁失败了 => 休眠 + 递归
        Thread.sleep(200);
        return getUserlist();
    }

    ArrayList<UserDTO> userDTOList = new ArrayList<>();
    try {
        // 2.2. 不存在缓存
        // 3. 查询数据库
        List<User> userList = list();
        log.info("查询数据库");
        // 4. 信息脱敏
        for (User user : userList) {
            UserDTO dto = new UserDTO();
            BeanUtil.copyProperties(user, dto);
            dto.setPhone(DesensitizedUtil.mobilePhone(dto.getPhone()));
            userDTOList.add(dto);
        }
        // 5. 保存到Redis
        redisTemplate.opsForValue()
            .set(CACHE_LIST_PRE, gson.toJson(userDTOList), 10, TimeUnit.SECONDS);
    } catch (Exception ignored) {
    } finally {
        // ⭐️ 释放锁
        redisTemplate.delete(lock);
    }
    return userDTOList;
}

逻辑过期

优点:线程无需等待,性能较好

缺点:不保证一致性、有额外内存消耗、实现复杂

逻辑过期流程图

逻辑过期业务代码

1、LogicalExpiration逻辑过期实体类,使用泛型使其通用化

java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LogicalExpiration<T> {
    private T value;
    private Date date;
}

2、核心业务代码

java
public UserDTO getUserById(Long id) {
    // 1. 查询缓存
    String userString = redisTemplate.opsForValue().get(CACHE_USER_PRE + id);
    if (StrUtil.isBlank(userString)) {
        // 没有缓存 -> 查询数据
        User user = getById(id);
        // 没有数据 -> 报错
        if (user == null) {
            throw new BusinessException(500, "用户不存在");
        }
        // 数据脱敏
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        // 新建缓存
        Date date = new Date();
        date.setTime(System.currentTimeMillis() + 2 * 60 * 1000);
        redisTemplate.opsForValue().set(CACHE_USER_PRE + id, gson.toJson(
            new LogicalExpiration<>(userDTO, date)
        ));
        // 返回数据
        return userDTO;
    }
    // 存在缓存 => 反序列化拿到对象
    LogicalExpiration<UserDTO> logicalExpiration = gson.fromJson(
        userString, new TypeToken<LogicalExpiration<UserDTO>>() {}.getType());
    // 判断缓存是否过期
    if (logicalExpiration.getDate().before(new Date())) {
        // 已经过期 => 新建线程进行更新缓存
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            UserDTO userDTO = BeanUtil.copyProperties(getById(id), UserDTO.class);
            Date date = new Date();
            // 设置TTL为2min
            date.setTime(System.currentTimeMillis() + 2 * 60 * 1000);
            redisTemplate.opsForValue().set(CACHE_USER_PRE + id, gson.toJson(
                new LogicalExpiration<>(userDTO, date)
            ));
        });
    }
    // 返回数据
    return logicalExpiration.getValue();
}

购物秒杀及分布式锁

全局唯一ID

Redis自增ID策略

ID构造是:时间戳 + 计数器

每天一个key,方便统计订单量

业务实现

获取指定时间的秒数

java
LocalDateTime timeBegin = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
long second = timeBegin.toEpochSecond(ZoneOffset.UTC);

获取当前时间的秒数

java
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);

全局唯一ID业务代码

java
public Long getID(String key) {
    // 获取时间戳
    long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
    long timestamp = now - TIMESTAMP_BEGIN;

    // 利用redis实现自增长
    String date = new SimpleDateFormat("yyyyMMdd").format(new Date());
    Long inc = redisTemplate.opsForValue().increment("ID:" + key + ":" + date);

    // 拼接并返回
    return timestamp << 32 | inc;
}

实现秒杀下单

秒杀下单逻辑流程

业务实现

mermaid
graph LR
	begin(begin) 
	a{判断库存}
	b{判断存在}
	e(end)
	
	begin --> 提交商品id --> b -->|存在| a
	b -->|不存在| 返回异常
	a -->|充足| 扣减库存 --> 创建订单 --> 返回订单id --> e
	a -->|不足| 返回异常 --> e
java
@Transactional
public String getProduct(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    // 库存 - 1
    pro.setNum(num - 1);
    update(pro, new QueryWrapper<>());
    
    // 下订单
    Ordertable order = new Ordertable();
    order.setOrderID(idUtil.getID("order").toString());
    order.setUserID(123);
    ordertableService.save(order);
    return order.getOrderID();
}

“超卖”问题

问题复现

在多线程并发会产生问题

使用5000个线程进行测试

20个库存卖了210个订单

原因分析

锁的类型

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁

悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。

  • 例如Synchronized、Lock都属于悲观锁

乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。

  • 如果没有修改则认为是安全的,自己才更新数据。
  • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁

乐观锁的关键是判断之前查询得到的数据是否有被修改过。(CAS法)

代码实现

mermaid
graph LR
	begin(begin) 
	a{判断库存}
	b{判断存在}
	c{"⭐️判断库存<br/>是否被修改"}
	e(end)
	
	begin --> 提交商品id --> b -->|存在| a
	a -->|充足| c -->|没被修改| 扣减库存 --> 创建订单 --> 返回订单id --> e
	b -->|不存在| 返回异常
	a -->|不足| 返回异常
    c -->|被修改| 返回异常
    返回异常 --> e
java
@Transactional
public String getProduct(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    // 下订单
    Ordertable order = new Ordertable();
    order.setOrderID(idUtil.getID("order").toString());
    order.setUserID(111);
    // 库存 - 1
    pro.setNum(num - 1);
    boolean success = update(pro, new QueryWrapper<Product>()
                             .eq("num", num));
    if (!success) {
        throw new BusinessException(400, "下单失败");
    }
    // 保存订单
    ordertableService.save(order);
    return order.getOrderID();
}

经过多线程测试,成功解决超卖问题。

“一人一单”问题

问题分析

新要求:一个用户只能下一单

解决并发问题:使用 synchronized 锁机制。

业务实现

mermaid
graph LR
	begin(begin) 
	a{判断库存}
	b{判断存在}
	c{"判断库存<br/>是否被修改"}
	d{"⭐️判断是否<br/>为重复订单"}
	e(end)
	
	begin --> 提交商品id --> b -->|存在| a
	a -->|充足| c -->|没被修改| 扣减库存 --> d -->|"非重复订单"| 创建订单 --> 返回订单id --> e
	d -->|重复订单| 返回异常
	b -->|不存在| 返回异常
	a -->|不足| 返回异常
    c -->|被修改| 返回异常
    返回异常 --> e
java
@Transactional
public String getProductOnlyOne(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    synchronized (UserHolder.getUser().getPhone()) {
        // ⭐️ 判断是否有重复订单
        long cnt = ordertableService.count(new QueryWrapper<Ordertable>().eq("userID", UserHolder.getUser().getPhone()).eq("productID", id));

        if (cnt != 0) {
            // ⭐️ 存在重复订单
            throw new BusinessException(400, "不允许重复下单");
        }
        // 下单
        Ordertable newOrder = new Ordertable();
        newOrder.setOrderID(idUtil.getID("order").toString());
        newOrder.setUserID(UserHolder.getUser().getPhone());
        newOrder.setProductID(Math.toIntExact(id));
        // 库存 - 1
        pro.setNum(num - 1);
        boolean success = update(pro, new QueryWrapper<Product>().eq("num", num));
        if (!success) {
            throw new BusinessException(400, "下单失败");
        }
        // 保存订单
        ordertableService.save(newOrder);
        return newOrder.getOrderID();
    }
}

分布式锁

单机锁的问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

每个锁监视器只对当前JVM有效,集群模式依旧会产生并发安全问题。

什么是分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

使用一个独立于JVM的锁监视器(分布式锁),让所有集群的JVM共享一个锁监视器,使得只有一个线程运行。

分布式锁的实现方式

分布式锁的核心是实现多进程之间互斥操作

MySQLRedisZookeeper
互斥利用mysql本身的互斥锁机制利用setnx互斥命令利用节点的唯一性和有序性实现互斥
高可用
高性能一般一般
安全性断开连接,自动释放锁利用锁超时时间,到期释放临时节点,断开连接自动释放

Redis分布式锁(1.0)

使用setnx互斥命令

sh
## 添加锁,NX是互斥、EX是设置超时时间
SET lock thread1 NX EX 10

## 释放锁
DEL key

Redis实现分布式锁流程图

image-20240120203619920

业务实现:

java
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final UUID uuid;

    static {
        uuid = UUID.randomUUID(true);
    }

    // 获取锁
    public boolean trylock(String key, Thread thread) {
        String value = uuid.toString() + thread.getId();
        Boolean success = redisTemplate
            .opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    // 删除锁
    public void unlock(String key) {
        redisTemplate.delete(key);
    }
}
java
@Transactional
@Override
public String getProductOnlyOneByRedis(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    
    String lock_key = REDIS_LOCK_PRE + UserHolder.getUser().getPhone();
    try {
        // 获取分布式锁(给用户上锁)
        boolean lock = redisLock.trylock(lock_key, Thread.currentThread());
        if (!lock) {
            throw new BusinessException(400, "下单失败");
        }
        // 判断是否有重复订单
        long cnt = ordertableService.count(
            new QueryWrapper<Ordertable>()
            .eq("userID", UserHolder.getUser().getPhone())
            .eq("productID", id));

        if (cnt != 0) {
            // 存在重复订单
            throw new BusinessException(400, "不允许重复下单");
        }
        // 下单
        Ordertable newOrder = new Ordertable();
        newOrder.setOrderID(idUtil.getID("order").toString());
        newOrder.setUserID(UserHolder.getUser().getPhone());
        newOrder.setProductID(Math.toIntExact(id));
        // 库存 - 1
        pro.setNum(num - 1);
        boolean success = update(pro, new QueryWrapper<Product>().eq("num", num));
        if (!success) {
            throw new BusinessException(400, "下单失败");
        }
        // 保存订单
        ordertableService.save(newOrder);
        return newOrder.getOrderID();
    } finally {
        // 释放锁
        redisLock.unlock(lock_key);
    }
}

Redis的Lua脚本

使用Lua脚本:在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。

lua
-- 执行 set name jack
redis.call('set', 'name', 'jack')
lua
-- 先执行 set name jack
redis.call('set', 'name', 'jack')
-- 再执行 get name
local name = redis.call('get', 'name')
-- 返回
return name

需要用Redis命令来调用脚本:

shell
## 调用脚本 0: 脚本需要的key类型的参数个数
EVAL "return redis.call('set', 'name', 'jack')"  0

脚本中的key、value可以作为参数传递。

key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数。

sh
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

Redis分布式锁(2.0)

Redis分布式锁(1.0)存在的问题:线程阻塞会导致锁过期,其他线程抢占锁后,之前线程会释放掉不属于自己的锁。

解决办法:释放锁时检查释放是自己的锁。

新问题:检查自己的锁时,锁过期,被其他线程抢占锁,前一个线程再次释放掉其他线程的锁。

新解决办法:释放锁时检查释放是自己的锁(且需要保证原子性)

使用lua脚本操作redis,保证操作的原子性。

基于Redis的分布式锁实现思路:

  1. 利用set nx ex获取锁,并设置过期时间,保存线程标示
  2. 释放锁时先判断线程标示是否与自己一致,一致则删除锁

lua脚本逻辑:

  1. 获取锁中的线程标示
  2. 判断是否与指定的标示(当前线程标示)一致
  3. 如果一致则释放锁(删除)
  4. 如果不一致则什么都不做
lua
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
    -- 一致,则删除锁
    return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

使用java调用lua脚本

业务代码:

java
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final UUID uuid = UUID.randomUUID(true);

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

    static {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setLocation(new ClassPathResource("lua/unlock.lua"));
        UNLOCK_SCRIPT = redisScript;
    }

    // 获取锁
    public boolean trylock(String key, Thread thread) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
            key, uuid.toString() + thread.getId(), 10, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    // 删除锁
    public void unlock(String key) {
        redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(key),
                uuid.toString() + Thread.currentThread().getId());
    }
}

Redis分布式锁(2.0)存在的问题:

  1. 不可重入:同一个线程无法多次获取同一把锁
  2. 不可重试:获取锁只尝试一次就返回 false,没有重试机制
  3. 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  4. 主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现

⭐️ Redisson

Redisson功能介绍

Redisson是一个分布式操作 Redis 的 Java 客户端(分布式 Redis 数据网格),可以像在使用本地的集合一样操作 Redis。Redisson还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

redisson/redisson: Redisson (github.com)

Redisson 官网

image-20240120223739525

Redisson使用入门

1、引入依赖

xml
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.19.1</version>
</dependency>

2、配置Redisson

java
@Configuration
@ConfigurationProperties(prefix = "spring.data.redis")
@Data
public class RedissonConfig {
    private String host;
    private String port;
    private String password;

    @Bean
    public RedissonClient redissonClient() {
        // 1. 创建配置
        Config config = new Config();
        String redisAddress = String.format("redis://%s:%s", host, port);
        // 使用单个Redis
        config.useSingleServer()
            .setAddress(redisAddress).setPassword(password).setDatabase(0);
        // 2. 创建实例
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

可重入锁业务实现

解决同一个线程无法多次获取同一把锁的问题。

java
@Override
public String getProductOnlyOneByRedisson(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }

    String lock_key = REDIS_LOCK_PRE + UserHolder.getUser().getPhone();
    RLock lock = redissonClient.getLock(lock_key);
    try {
        // ⭐️ 使用Redisson自带的可重入锁(给用户上锁)
        if (!lock.tryLock()) {
            throw new BusinessException(400, "下单失败");
        }
        // 判断是否有重复订单
        long cnt = ordertableService.count(
            new QueryWrapper<Ordertable>()
            .eq("userID", UserHolder.getUser().getPhone())
            .eq("productID", id));

        if (cnt != 0) {
            // 存在重复订单
            throw new BusinessException(400, "不允许重复下单");
        }
        // 下单
        Ordertable newOrder = new Ordertable();
        newOrder.setOrderID(idUtil.getID("order").toString());
        newOrder.setUserID(UserHolder.getUser().getPhone());
        newOrder.setProductID(Math.toIntExact(id));
        // 库存 - 1
        pro.setNum(num - 1);
        boolean success = update(pro, new QueryWrapper<Product>()
                                 .eq("num", num));
        if (!success) {
            throw new BusinessException(400, "下单失败");
        }
        // 保存订单
        ordertableService.save(newOrder);
        return newOrder.getOrderID();
    } finally {
        // ⭐️ 释放锁
        lock.unlock();
    }
}

可重入锁原理

image-20240121170553645

锁重试和看门狗机制

Redisson分布式锁原理:

  1. 可重入:利用hash结构记录线程id和重入次数
  2. 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  3. 锁过期问题:watchDog看门狗机制,每隔一段时间(releaseTime / 3),重置超时时间

锁重试和看门狗机制流程图

看门狗原理:

  1. 监听当前线程,默认过期时间是 30 秒,每 10 秒续期一次(补到 30 秒)
  2. 如果线程挂掉(注意 debug 模式也会被它当成服务器宕机),则不会续期
java
@Test
void testWatchDog() {
    RLock lock = redissonClient.getLock("lock");
    try {
        // 只有一个线程能获取到锁(超时时间设置为-1)
        if (lock.tryLock(0, -1, TimeUnit.MILLISECONDS)) {
            // todo 实际要执行的代码
            Thread.sleep(300000);
            System.out.println("getLock: " + Thread.currentThread().getId());
        }
    } catch (InterruptedException e) {
        System.out.println(e.getMessage());
    } finally {
        // 只能释放自己的锁
        if (lock.isHeldByCurrentThread()) {
            System.out.println("unLock: " + Thread.currentThread().getId());
            lock.unlock();
        }
    }
}

分布式锁主从一致性

联锁机制:获取所有的节点都获取到锁才算成功。

业务实现:

Redis优化秒杀

优化秒杀思路

优化流程:使用异步操作提高吞吐量。

  1. 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  2. 再将下单业务放入阻塞队列,利用独立线程异步下单

使用Redis存储秒杀库存数量和“一人一单”的订单信息,使用Lua脚本实现原子操作。

使用Lua脚本实现秒杀

使用lua脚本实现原子性

lua
-- 参数
local productID = ARGV[1]
local userID = ARGV[2]

-- 数据
local productKey = 'product:num:' .. productID
local orderKey = "product:order" .. productID

-- 业务
-- 库存判断
if tonumber(redis.call('get', productKey)) <= 0 then
    return 1
end
-- 是否下单过
if redis.call('sismember', orderKey, userID) == 1 then
    return 2
end

redis.call('incrby', productKey, -1)
redis.call('sadd', orderKey, userID)

return 0

执行lua脚本完成下单

java
@Override
public String getProductOnlyOneByRedis2(Long id) {
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
    redisScript.setLocation(new ClassPathResource("lua/shop.lua"));
    Long success = redisTemplate
        .execute(redisScript, Collections.emptyList(), id, 
                 UserHolder.getUser().getPhone());
    if (success == null || success != 0) {
        throw new BusinessException(400, "下单失败");
    }
    Long orderID = idUtil.getID("order");
    // todo 保存到阻塞队列
    return orderID.toString();
}

模拟创建订单

java
@Test
void addProduct() {
    Product pro = new Product();
    pro.setProductName("yixuan");
    pro.setNum(10);
    boolean success = productService.save(pro);
    String key = "RedisSessionDemo:productNum:" + pro.getId();
    String value = pro.getNum().toString();
    if (success) {
        stringRedisTemplate.opsForValue().set(key, value);
    }
    Assertions.assertNotNull(stringRedisTemplate.opsForValue().get(key));
}

基于阻塞队列实现异步下单

java
private static final DefaultRedisScript<Long> redisScript;

static {
    redisScript = new DefaultRedisScript<>();
    redisScript.setLocation(new ClassPathResource("lua/shop.lua"));
    redisScript.setResultType(Long.class);
}

private BlockingQueue<Ordertable> orderTasks = 
    new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SHOP_ORDER_EXECUTOR = 
    Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
    SHOP_ORDER_EXECUTOR.submit(new OrderHandler());
}

private class OrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                // 获取队列
                Ordertable order = orderTasks.take();
                // 库存 - 1
                boolean success = update().setSql("num = num - 1")
                    .eq("id", order.getProductID())
                    .gt("num", 0)
                    .update();
                if (!success) {
                    throw new BusinessException(400, "下单失败");
                }
                // 保存订单
                ordertableService.save(order);
            } catch (Exception e) {
                log.error("", e);
            }
        }
    }
}

@Override
public String getProductOnlyOneByRedis2(Long id) {
    // 执行lua脚本
    String userPhone = UserHolder.getUser().getPhone();
    Long success = redisTemplate.execute(
        redisScript,Collections.emptyList(), id.toString(), userPhone);
    if (success == null || success != 0) {
        throw new BusinessException(400, "下单失败");
    }

    // 创建订单
    Long orderID = idUtil.getID("order");
    Ordertable order = new Ordertable();
    order.setOrderID(orderID.toString());
    order.setProductID(Math.toIntExact(id));
    order.setUserID(userPhone);

    // 保存到阻塞队列
    orderTasks.add(order);
    return orderID.toString();
}

Redis消息队列

什么是消息队列

消息队列(Message Queue):存放消息的队列。

最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

使用List模拟MQ

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。

当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的MQ

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

sh
## 订阅一个或多个频道
SUBSCRIBE channel [channel] 

## 向一个频道发送消息
PUBLISH channel msg

## 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern[pattern]

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的MQ

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:XADD

读取消息:XREAD

① 读取第一个消息

image-20240130125054472image-20240130125054472

② 阻塞方式读取消息

image-20240130125106814image-20240130125106814

当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

STREAM类型消息队列的XREAD命令特点:

  1. 消息可回溯
  2. 一个消息可以被多个消费者读取
  3. 可以阻塞读取
  4. 有消息漏读的风险

基于Stream消费者组的MQ

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

特点:消息分流、消息标示、消息确认。

创建消费者组:

sh
XGROUP CREATE key groupName ID [MKSTREAM]
## 1. key:队列名称
## 2. groupName:消费者组名称
## 3. ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
## 4. MKSTREAM:队列不存在时自动创建队列

其它常见命令:

sh
## 删除指定的消费者组
XGROUP DESTORY key groupName

## 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

## 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

sh
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
## 1. group:消费组名称
## 2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
## 3. count:本次查询的最大数量
## 4. BLOCK milliseconds:当没有消息时最长等待时间
## 5. NOACK:无需手动ACK,获取到消息后自动确认
## 6. STREAMS key:指定队列名称
## 7. ID:获取消息的起始ID:
## 7.1. ">":从下一个未消费的消息开始
## 7.2. 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路

java
while (true) {
    //尝试监听队列,使用阻塞模式,最长等待2000毫秒
    Object msg = redis.call(
        "XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
    if (msg == null) {
        //null说明设有消息,继续下一次
        continue;
    }
    try {
        //处理消息,完成后一定要ACK
        handleMessage(msg);
    } catch (Exception e) {
        while (true) {
            Object msg2 = redis.call(
                "XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
            if (msg2 == null) {
                //null说明没有异常消息,所有消息都已确以,结束循环
                break;
            }
            try {
                //说明有异常消息,再次处理
                handleMessage(msg2);
            } catch (Exception e) {
                //再次出现异常,记录日志,继续循环
                continue;
            }
        }
    }
}

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

三种MQ实现方式对比

ListPubSubStream
消息持久化支持不支持支持
阻塞读取支持支持支持
消息堆积处理受限于内存空间,可以利用多消费者加快处理受限于消费者缓冲区受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制不支持不支持支持
消息回溯不支持不支持支持

使用基于Stream的MQ优化秒杀

修改Lua脚本创建stream消息队列

lua
-- 参数
local productID = ARGV[1]
local userID = ARGV[2]
local orderID = ARGV[3]

-- 数据
local productKey = 'RedisSessionDemo:productNum:' .. productID
local orderKey = "RedisSessionDemo:productOrder:" .. productID

-- 业务
-- 库存判断
if tonumber(redis.call('get', productKey)) <= 0 then
    return 1
end
-- 是否下单过
if redis.call('sismember', orderKey, userID) == 1 then
    return 2
end

redis.call('incrby', productKey, -1)
redis.call('sadd', orderKey, userID)

-- 下单
redis.call('xadd', 'stream.orders', '*',
        'orderID', orderID, 'userID', userID, 'productID', productID)

return 0

执行Lua脚本下单

java
private static final DefaultRedisScript<Long> StreamScript;

static {
    StreamScript = new DefaultRedisScript<>();
    StreamScript.setLocation(new ClassPathResource("lua/shop-stream.lua"));
    StreamScript.setResultType(Long.class);
}

@Override
public String getProductOnlyOneByStream(Long id) {
    String userPhone = UserHolder.getUser().getPhone();
    Long orderID = idUtil.getID("order");
    Long success = redisTemplate.execute(
        StreamScript, Collections.emptyList(), id, userPhone, orderID);
    if (success == null || success != 0) {
        throw new BusinessException(400, "下单失败");
    }
    return orderID.toString();
}

使用多线程读取MQ的信息

java
private static final ExecutorService SHOP_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
    SHOP_ORDER_EXECUTOR.submit(new StreamHandler());
}

private class StreamHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 获取stream
                // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
                List<MapRecord<String, Object, Object>> list = 
                    redisTemplate.opsForStream().read(Consumer.from("g1", "c1"),           StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.order", ReadOffset.lastConsumed())
                );
                if (list == null || list.isEmpty()) {
                    // 没有消息
                    continue;
                }
                // 解析消息
                Map<Object, Object> values = list.get(0).getValue();
                Ordertable order = BeanUtil.fillBeanWithMap(
                    values, new Ordertable(), true);
                // 库存 - 1
                boolean success = update().setSql("num = num - 1")
                    .eq("id", order.getProductID())
                    .gt("num", 0)
                    .update();
                if (!success) {
                    throw new BusinessException(400, "下单失败");
                }
                // 保存订单
                ordertableService.save(order);
                // ACK 确认
                redisTemplate.opsForStream().acknowledge(
                    "stream.order","g1", list.get(0).getId());
            } catch (Exception e) {
                while (true) {
                    try {
                        // 读取 pending-list
                        List<MapRecord<String, Object, Object>> pendingList = redisTemplate.opsForStream()
                            .read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(
                                "stream.order", ReadOffset.from("0"))
                        );
                        if (pendingList == null) {
                            // pendingList 为空 -> 结束循环
                            break;
                        }
                        // 解析消息
                        Map<Object, Object> values = 
                            pendingList.get(0).getValue();
                        Ordertable order = BeanUtil.fillBeanWithMap(
                            values, new Ordertable(), true);
                        // 库存 - 1
                        boolean success = update().setSql("num = num - 1")
                            .eq("id", order.getProductID())
                            .gt("num", 0)
                            .update();
                        if (!success) {
                            throw new BusinessException(400, "下单失败");
                        }
                        // 保存订单
                        ordertableService.save(order);
                        // ACK 确认
                        redisTemplate.opsForStream()
                            .acknowledge("stream.order","g1", 
                                         pendingList.get(0).getId());
                    } catch (Exception excp) {
                        log.error("", excp);
                    }
                }
            }
        }
    }

好友关注

关注与取关

数据库设计

创建数据库表,保存关注信息

sql
create table follow
(
    id          int auto_increment
    primary key,
    userID      int null comment '用户id',
    followID    int                                 null comment '博主id',
    idDeleted   int       default 0                 null,
    createdTime timestamp default CURRENT_TIMESTAMP null,
    constraint id
    unique (id)
);

查询关注状态

java
public Boolean getFollow(Long followId) {
    String phone = UserHolder.getUser().getPhone();
    User user = userMapper.selectOne(new QueryWrapper<User>().eq("phone", phone));
    Integer userID = user.getId();
    return followMapper.exists(
        new QueryWrapper<Follow>().eq("followID", followId).eq("userID", userID)
    );
}

关注与取关

java
public Boolean setFollow(Long id) {
    String phone = UserHolder.getUser().getPhone();
    User user = userMapper
        .selectOne(new QueryWrapper<User>().eq("phone", phone));
    Integer userID = user.getId();
    boolean isFollowed = followMapper.exists(
        new QueryWrapper<Follow>()
        .eq("followID", id).eq("userID", userID)
    );
    if (isFollowed) {
        // 取关
        followMapper.delete(new QueryWrapper<Follow>()
                            .eq("followID", id).eq("userID", userID));
        return false;
    }
    // 关注
    Follow f = new Follow();
    f.setUserID(userID);
    f.setFollowID(Math.toIntExact(id));
    followMapper.insert(f);
    return true;
}

共同关注

优化关注

关注过程中,在redis维护一个set集合存放关注列表。通过set取交集的方式查询共同关注。

java
public Boolean setFollow(Long id) {
    String phone = UserHolder.getUser().getPhone();
    User user = userMapper.selectOne(new QueryWrapper<User>().eq("phone", phone));
    Integer userID = user.getId();
    boolean isFollowed = followMapper.exists(
        new QueryWrapper<Follow>().eq("followID", id).eq("userID", userID)
    );
    if (isFollowed) {
        // 取关
        followMapper.delete(new QueryWrapper<Follow>().eq("followID", id).eq("userID", userID));
        redisTemplate.opsForSet().remove("RedisSessionDemo:follow:" + userID, String.valueOf(id));
        return false;
    }
    // 关注
    Follow f = new Follow();
    f.setUserID(userID);
    f.setFollowID(Math.toIntExact(id));
    followMapper.insert(f);
    redisTemplate.opsForSet().add("RedisSessionDemo:follow:" + userID, String.valueOf(id));
    return true;
}

共同关注

java
public List<UserDTO> getCommonFollow(Long followID) {
    String phone = UserHolder.getUser().getPhone();
    User user = userMapper.selectOne(new QueryWrapper<User>().eq("phone", phone));
    Integer userID = user.getId();
    Set<String> commonIds = redisTemplate.opsForSet().intersect("RedisSessionDemo:follow:" + userID, "RedisSessionDemo:follow:" + followID);
    if (commonIds == null) {
        return null;
    }
    List<User> commonUser = userMapper.selectBatchIds(commonIds);
    ArrayList<UserDTO> results = new ArrayList<>();
    commonUser.forEach(u -> results.add(BeanUtil.copyProperties(u, UserDTO.class)));
    return results;
}

关注推送

Feed流

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

Feed流实现方案

拉模式

拉模式:也叫做读扩散。

image-20240215232926781image-20240215232926781

推模式

推模式:也叫做写扩散。

image-20240215232956590image-20240215232956590

推拉结合模式

推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。

image-20240215233032063image-20240215233032063

实现方式对比
拉模式推模式推拉结合
写比例
读比例
用户读取延迟
实现难度复杂简单很复杂
使用场景很少使用用户量少、没有大V过千万的用户量,有大V

签到点赞和UV统计

点赞

点赞功能分析

需求:

  1. 同一个用户只能点赞一次,再次点击则取消点赞
  2. 如果当前用户已经点赞,则点赞按钮高亮显示(前端判断字段isLike属性)

实现步骤:

  1. 利用Redis的set集合判断是否点赞过,将用户id保存到set中
  2. 判断当前登录用户是否点赞过,赋值给isLike字段
  3. 通过Redis的set集合中Scard命令获取成员个数,即点赞次数

业务实现

LikedDTO

java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LikedDTO {
    /**
     * 点赞数量
     */
    long likedSum;
    /**
     * 用户是否点过赞
     */
    Boolean isLiked;
}

点赞操作

java
// 点赞操作
@Override
public String doLike() {
    String key = "RedisSessionDemo:liked";
    String phone = UserHolder.getUser().getPhone();
    // 查询是否点赞过
    Boolean isLiked = redisTemplate.opsForSet().isMember(key, phone);
    if (BooleanUtil.isTrue(isLiked)) {
        // 点赞过 -> 取消点赞
        redisTemplate.opsForSet().remove(key, phone);
        return "取消点赞成功";
    }
    // 没点赞过 -> 点赞
    redisTemplate.opsForSet().add(key, phone);
    return "点赞成功";
}

获取点赞数据

java
// 获取点赞数据
@Override
public LikedDTO getLiked() {
    String key = "RedisSessionDemo:liked";
    Long likedNum = redisTemplate.opsForSet().size(key);
    if (likedNum == null) {
        likedNum = 0L;
    }
    UserDTO user = UserHolder.getUser();
    Boolean isLiked = false;
    if (user != null) {
        isLiked = redisTemplate.opsForSet().isMember(key, user.getPhone());
    }
    return new LikedDTO(likedNum, isLiked);
}

点赞排行

功能分析

点赞排行:类似朋友圈的点赞列表,按照点赞的先后顺序展示头像等信息。

使用 sorted set 结构,将点赞的时间戳作为分数值记录。

功能实现

修改点赞函数

java
// 获取点赞数据
@Override
public LikedDTO getLiked2() {
    String key = "RedisSessionDemo:liked";
    Long likedNum = redisTemplate.opsForZSet().size(key);
    if (likedNum == null) {
        likedNum = 0L;
    }
    UserDTO user = UserHolder.getUser();
    boolean isLiked = false;
    if (user != null) {
        Double score = redisTemplate.opsForZSet().score(key, user.getPhone());
        isLiked = (score != null && score > 0);
    }
    return new LikedDTO(likedNum, isLiked);
}

// 点赞操作
@Override
public String doLike2() {
    String key = "RedisSessionDemo:liked";
    String phone = UserHolder.getUser().getPhone();
    // 查询是否点赞过
    Double isLiked = redisTemplate.opsForZSet().score(key, phone);
    if (isLiked != null && isLiked > 0) {
        // 点赞过 -> 取消点赞
        redisTemplate.opsForZSet().remove(key, phone);
        return "取消点赞成功";
    }
    // 没点赞过 -> 点赞
    redisTemplate.opsForZSet().add(key, phone, System.currentTimeMillis());
    return "点赞成功";
}

获取点赞列表

java
// 获取点赞列表
@Override
public List<String> getLikedList() {
    String key = "RedisSessionDemo:liked";
    // 获取所有元素
    Set<String> set = redisTemplate.opsForZSet().range(key, 0, -1);
    if (set != null) {
        return new ArrayList<>(set);
    }
    return Collections.emptyList();
}

用户签到

BitMap用法

我们按月来统计用户签到信息,签到记录为1,未签到则记录为0。

把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。

Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 232 个bit位。

BitMap的操作命令有:

  1. SETBIT:向指定位置(offset)存入一个0或1
  2. GETBIT :获取指定位置(offset)的bit值
  3. BITCOUNT :统计BitMap中值为1的bit位的数量
  4. BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  5. BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
  6. BITOP :将多个BitMap的结果做位运算(与 、或、异或)
  7. BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

实现签到功能

因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。

java
public Boolean sign() {
    String phone = UserHolder.getUser().getPhone();
    Date date = new Date();
    String yearAndMonth = new SimpleDateFormat("yyyy:MM").format(date);
    String key = "RedisSessionDemo:user:sign:" + phone + ":" + yearAndMonth;
    int day = Integer.parseInt(new SimpleDateFormat("DD").format(date));
    // 实现签到
    redisTemplate.opsForValue().setBit(key, day, true);
    return true;
}

签到统计

连续签到:从最后一次签到开始向前统计,直到遇到第一次未签到为止的签到次数

封装SignData类

java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SignData {
    // 月签到次数
    Integer MonthTimes;
    // 月连续签到次数
    Integer ContinuousTimes;
}

业务实现

java
@Override
public SignData signdata() {
    // 获取 bitmap
    String phone = UserHolder.getUser().getPhone();
    Date date = new Date();
    String yearAndMonth = new SimpleDateFormat("yyyy:MM").format(date);
    String key = "RedisSessionDemo:user:sign:" + phone + ":" + yearAndMonth;
    int day = Integer.parseInt(new SimpleDateFormat("DD").format(date));
    List<Long> list = redisTemplate.opsForValue().bitField(key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day + 1)).valueAt(0));
    if (list == null || list.isEmpty()) {
        return new SignData(0, 0);
    }
    Long sign = list.get(0);
    if (sign == null) {
        return new SignData(0, 0);
    }

    // 统计计算
    int MonthTimes = 0;
    int ContinuousTimes = 0;
    boolean isContinuous = true;
    while (sign != 0) {
        // 连续签到
        if (isContinuous) {
            if ((sign & 1) == 1) {
                ContinuousTimes++;
            } else {
                isContinuous = false;
            }
        }
        // 月签到次数
        if ((sign & 1) == 1) {
            MonthTimes++;
        }
        sign = sign >> 1;
    }
    return new SignData(MonthTimes, ContinuousTimes);
}

UV统计

HyperLogLog

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法

Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!

作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

  1. 作用:做海量数据的统计工作
  2. 优点:内存占用极低、性能非常好
  3. 缺点:有一定的误差

业务实现

java
@Test
void hyperlogTest() {
    for (int i = 0; i < 100; i++) {
        stringRedisTemplate.opsForHyperLogLog().add("hyperlogTest", "user-" + i);
    }
    Long size = stringRedisTemplate.opsForHyperLogLog().size("hyperlogTest");
    System.out.println(size);
}

位置相关操作

GEO数据结构

GEO就是Geolocation的简写形式,代表地理坐标。

常见的命令有:

  1. GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  2. GEODIST:计算指定的两个点之间的距离并返回
  3. GEOHASH:将指定member的坐标转为hash字符串形式并返回
  4. GEOPOS:返回指定member的坐标
  5. GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。
  6. GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。

与北京站之间的距离

向redis中添加北京站坐标

java
@Test
void test1() {
    // 北京站: 116.42803 39.903738
    double x = 116.42803;
    double y = 39.903738;
    String key = "RedisSessionDemo:geo";
    Long isSuccess = stringRedisTemplate.opsForGeo()
        .add(key, new Point(x, y), "beijingStation");
    System.out.println(isSuccess);
}

计算与北京站之间的距离

java
public Double getDistance(Double x, Double y) {
    String key = "RedisSessionDemo:geo";
    stringRedisTemplate.opsForGeo().add(key, new Point(x, y), "temp");
    Distance distance = stringRedisTemplate.opsForGeo()
        .distance(key, "temp", "beijingStation");
    stringRedisTemplate.opsForGeo().remove(key, "temp");
    if (distance == null) {
        return -1.0;
    }
    return distance.getValue();
}

查看附近的火车站

预先添加北京三个火车站坐标

java
@Test
void test3() {
    String key = "RedisSessionDemo:geo";
    // 北京站: 116.42803 39.903738
    stringRedisTemplate.opsForGeo()
        .add(key, new Point(116.42803, 39.903738), "北京站");
    // 北京南站: 116.378248 39.865275
    stringRedisTemplate.opsForGeo()
        .add(key, new Point(116.378248, 39.865275), "北京南站");
    // 北京西站: 116.322287 39.893729
    stringRedisTemplate.opsForGeo()
        .add(key, new Point(116.322287, 39.893729), "北京西站");
}

查询附近的火车站

java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GEODTO {
    String name;
    double distance;
}
java
@Override
public List<GEODTO> nearbyStation(Double x, Double y) {
    String key = "RedisSessionDemo:geo";
    stringRedisTemplate.opsForGeo().add(key, new Point(x, y), "temp");
    GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults = stringRedisTemplate.opsForGeo().search(key, GeoReference.fromCoordinate(x, y), new Distance(100 * 1000), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance());
    List<GEODTO> result = new ArrayList<>();
    if (geoResults != null) {
        geoResults.getContent().stream().skip(1).forEach(item -> {
            String name = item.getContent().getName();
            double distance = item.getDistance().getValue();
            result.add(new GEODTO(name, distance));
        });
    }
    return result;
}

image-20240201213817837## 分布式缓存

单点 Redis 的问题

  1. 数据丢失(持久化)
  2. 并发能力不如集群(主从集群、读写分离)
  3. Redis宕机导致服务不可用(Redis哨兵)
  4. 存储能力差(分片集群)

Redis 持久化

RDB 持久化

什么是RDB

RDB (Redis Database Backup file):数据快照

默认保存在运行目录

sh
## 主进程保存快照(阻塞)
save

## 子进程保存快照
bgsave

Redis 停机时会执行一次RDB。

RDB 触发机制

在 redis.conf 中配置

sh
## 900秒内,如果至少有1个key被修改,则执行bgsave
## 如果是 save "" 则表示禁用RDB
save 900 1  
save 300 10  
save 60 10000 

## 是否压缩:建议不开启,压缩也会消耗cpu,磁盘的话不值钱
rdbcompression yes

## RDB文件名称
dbfilename dump.rdb  

## 文件保存的路径目录
dir ./
RDB 原理
  1. fork主进程得到一个子进程,共享内存空间
  2. 子进程读取内存数据并写入新的RDB文件
  3. 用新RDB文件替换旧的RDB文件。

fork采用的是copy-on-write技术:

  • 当主进程执行读操作时,访问共享内存;
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

AOF 持久化

什么是AOF

AOF (Append Only File):命令日志

AOF 配置

AOF默认是关闭的,需要在 redis.conf 中配置

sh
## 是否开启AOF功能,默认是no
appendonly yes
## AOF文件的名称
appendfilename "appendonly.aof"

在 redis.conf 中配置命令记录的频率(刷盘时机)

sh
## 表示每执行一次写命令,立即记录到AOF文件
appendfsync always 
## 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
appendfsync everysec 
## 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no
配置项刷盘时机优点缺点
Always同步刷盘可靠性高,几乎不丢数据性能影响大
everysec每秒刷盘性能适中最多丢失1秒数据
no操作系统控制性能最好可靠性较差,可能丢失大量数据

配置重写AOF文件:执行bgrewriteaof命令,可以让AOF文件执行重写功能

AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义

sh
## AOF文件比上次文件 增长超过多少百分比则触发重写
auto-aof-rewrite-percentage 100

## AOF文件体积最小多大以上才触发重写 
auto-aof-rewrite-min-size 64mb

持久化方式对比

持久化方式RDBAOF
数据完整性不完整,两次备份之间会丢失相对完整,取决于刷盘策略
文件大小会有压缩,文件体积小记录命令,文件体积很大
宕机恢复速度很快
数据恢复优先级低,因为数据完整性不如AOF高,因为数据完整性更高
系统资源占用高,大量CPU和内存消耗低,主要是磁盘IO资源,
但AOF重写时会占用大量CPU和内存资源
使用场景可以容忍数分钟的数据丢失,追求更快的启动速度对数据安全性要求较高常见

Redis 主从集群

搭建主从架构

主从集群:提高并发能力,实现读写分离。

集群结构

共包含三个节点,一个主节点,两个从节点。

这里我们会在同一台虚拟机中开启3个redis实例,模拟主从集群,信息如下:

IPPORT角色
192.168.150.1017001master
192.168.150.1017002slave
192.168.150.1017003slave
准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

1)创建目录

我们创建三个文件夹,名字分别叫7001、7002、7003:

sh
## 进入/tmp目录
cd /tmp
## 创建目录
mkdir 7001 7002 7003

如图:

image-20210630113929868image-20210630113929868

2)恢复原始配置

修改redis-6.2.4/redis.conf文件,将其中的持久化模式改为默认的RDB模式,AOF保持关闭状态。

properties
## 开启RDB
## save ""
save 3600 1
save 300 100
save 60 10000

## 关闭AOF
appendonly no

3)拷贝配置文件到每个实例目录

然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):

sh
## 方式一:逐个拷贝
cp redis-6.2.4/redis.conf 7001
cp redis-6.2.4/redis.conf 7002
cp redis-6.2.4/redis.conf 7003

## 方式二:管道组合命令,一键拷贝
echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf

4)修改每个实例的端口、工作目录

修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令):

sh
sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf

5)修改每个实例的声明IP

虚拟机本身有多个IP,为了避免将来混乱,我们需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下:

properties
## redis实例的声明 IP
replica-announce-ip 192.168.150.101

每个目录都要改,我们一键完成修改(在/tmp目录执行下列命令):

sh
## 逐一执行
sed -i '1a replica-announce-ip 192.168.150.101' 7001/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7002/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7003/redis.conf

## 或者一键修改
printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 192.168.150.101' {}/redis.conf
启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

sh
## 第1个
redis-server 7001/redis.conf
## 第2个
redis-server 7002/redis.conf
## 第3个
redis-server 7003/redis.conf

启动后:

image-20210630183914491image-20210630183914491

如果要一键停止,可以运行下面命令:

sh
printf '%s\n' 7001 7002 7003 | xargs -I{} -t redis-cli -p {} shutdown
开启主从关系

现在三个实例还没有任何关系,要配置主从可以使用replicaof 或者slaveof(5.0以前)命令。

有临时和永久两种模式:

  • 修改配置文件(永久生效)

    • 在redis.conf中添加一行配置:slaveof <masterip> <masterport>
  • 使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效):

    sh
    slaveof <masterip> <masterport>

注意:在5.0以后新增命令replicaof,与salveof效果一致。

这里我们为了演示方便,使用方式二。

通过redis-cli命令连接7002,执行下面命令:

sh
## 连接 7002
redis-cli -p 7002
## 执行slaveof
slaveof 192.168.150.101 7001

通过redis-cli命令连接7003,执行下面命令:

sh
## 连接 7003
redis-cli -p 7003
## 执行slaveof
slaveof 192.168.150.101 7001

然后连接 7001节点,查看集群状态:

sh
## 连接 7001
redis-cli -p 7001
## 查看状态
info replication

结果:

image-20210630201258802image-20210630201258802

测试

执行下列操作以测试:

  • 利用redis-cli连接7001,执行set num 123

  • 利用redis-cli连接7002,执行get num,再执行set num 666

  • 利用redis-cli连接7003,执行get num,再执行set num 888

可以发现,只有在7001这个master节点上可以执行写操作,7002和7003这两个slave节点只能执行读操作。

数据主从同步原理

全量同步

主从第一次同步是全量同步

slave 做数据同步,必须向master声明自己的 Replid 和offset,master才可以判断到底需要同步哪些数据

  • Replication Id是数据集的身份标识,每个主节点(master)都有一个唯一的标识,而从节点(slave)会继承主节点的标识。通过比较 Replid 可以确定是否同步的是同一个数据集。
  • 偏移量(offset)是一个数字,随着数据在备份日志(repl_baklog)中的增加而逐渐增大。从节点在完成数据同步时也会记录当前的偏移量。如果从节点的偏移量小于主节点的偏移量,说明从节点的数据落后于主节点,需要进行数据更新同步。

简述全量同步的流程:

  1. slave节点请求增量同步
  2. master节点判断replid,发现不一致,拒绝增量同步
  3. master将完整内存数据生成RDB,发送RDB到slave
  4. slave清空本地数据,加载master的RDB
  5. master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
  6. slave执行接收到的命令,保持与master之间的同步
增量同步

如果slave重启后同步,则执行增量同步

repl_baklog大小有上限,写满后会覆盖最早的数据。如果slave断开时间过久,导致尚未备份的数据被覆盖,则无法基于log做增量同步,只能再次全量同步。

主从集群优化

  1. 在master中配置repl-diskless-sync yes启用无磁盘复制RDB,避免全量同步时的磁盘IO。
  2. Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO
  3. 适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
  4. 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力

Redis 哨兵机制

哨兵作用和原理

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。

哨兵的作用
  • 监控:Sentinel 会不断检查您的master和slave是否按预期工作
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
  • 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端

服务状态监控

Sentinel基于心跳机制监测服务状态。

每隔1秒发送一次ping命令,如果超过一定时间没有相向则认为是主观下线

如果大多数sentinel都认为实例主观下线,则判定服务客观下线

quorum值最好超过Sentinel实例数量的一半。

image-20240506174805745
选举新的master

一旦发现master故障,sentinel需要在salve中选择一个作为新的master

  1. 首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该slave节点
  2. 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
  3. 如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
  4. 最后是判断slave节点的运行id大小 (随机数),越小优先级越高。
故障转移
  1. 首先选定一个slave作为新的master,执行slaveof no one
  2. 然后让所有节点都执行slaveof 新master
  3. 修改故障节点,执行slaveof 新master
image-20240506175021137

搭建哨兵集群

集群结构

这里我们搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集群。如图:

image-20210701215227018

三个sentinel实例信息如下:

节点IPPORT
s1192.168.150.10127001
s2192.168.150.10127002
s3192.168.150.10127003
准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

我们创建三个文件夹,名字分别叫s1、s2、s3:

sh
## 进入/tmp目录
cd /tmp
## 创建目录
mkdir s1 s2 s3

如图:

image-20210701215534714image-20210701215534714

然后我们在s1目录创建一个sentinel.conf文件,添加下面的内容:

ini
port 27001
sentinel announce-ip 192.168.150.101
sentinel monitor mymaster 192.168.150.101 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"

解读:

  • port 27001:是当前sentinel实例的端口
  • sentinel monitor mymaster 192.168.150.101 7001 2:指定主节点信息
    • mymaster:主节点名称,自定义,任意写
    • 192.168.150.101 7001:主节点的ip和端口
    • 2:选举master时的quorum值

然后将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):

sh
## 方式一:逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
## 方式二:管道组合命令,一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf

修改s2、s3两个文件夹内的配置文件,将端口分别修改为27002、27003:

sh
sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

sh
## 第1个
redis-sentinel s1/sentinel.conf
## 第2个
redis-sentinel s2/sentinel.conf
## 第3个
redis-sentinel s3/sentinel.conf

启动后:

image-20210701220714104image-20210701220714104

测试

尝试让master节点7001宕机,查看sentinel日志:

image-20210701222857997image-20210701222857997

查看7003的日志:

image-20210701223025709image-20210701223025709

查看7002的日志:

image-20210701223131264image-20210701223131264

RedisTemplate 哨兵模式

1、在pom文件中引入redis的starter依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、在application.yml中指定sentinel

yaml
spring:
	redis:
		sentinel:
		master: mymaster ## 指定master名称
		nodes: ## 指定redis-sentinel集群信息 
			- 192.168.150.101:27001
			- 192.168.150.101:27002
			- 192.168.150.101:27003

3、配置主从读写分离

java
@Bean
public LettuceClientConfigurationBuilderCustomizer configurationr(){
    return configBuilder -> configBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

这里的ReadFrom是配置Redis的读取策略,是一个枚举,包括下面选择:

  1. MASTER:从主节点读取
  2. MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
  3. REPLICA:从slave(replica)节点读取
  4. REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master

Redis 分片集群

搭建分片集群

集群结构

分片集群需要的节点数量较多,这里我们搭建一个最小的分片集群,包含3个master节点,每个master包含一个slave节点,结构如下:

image-20210702164116027image-20210702164116027

这里我们会在同一台虚拟机中开启6个redis实例,模拟分片集群,信息如下:

IPPORT角色
192.168.150.1017001master
192.168.150.1017002master
192.168.150.1017003master
192.168.150.1018001slave
192.168.150.1018002slave
192.168.150.1018003slave
准备实例和配置

删除之前的7001、7002、7003这几个目录,重新创建出7001、7002、7003、8001、8002、8003目录:

sh
## 进入/tmp目录
cd /tmp
## 删除旧的,避免配置干扰
rm -rf 7001 7002 7003
## 创建目录
mkdir 7001 7002 7003 8001 8002 8003

在/tmp下准备一个新的redis.conf文件,内容如下:

ini
port 6379
## 开启集群功能
cluster-enabled yes
## 集群的配置文件名称,不需要我们创建,由redis自己维护
cluster-config-file /tmp/6379/nodes.conf
## 节点心跳失败的超时时间
cluster-node-timeout 5000
## 持久化文件存放目录
dir /tmp/6379
## 绑定地址
bind 0.0.0.0
## 让redis后台运行
daemonize yes
## 注册的实例ip
replica-announce-ip 192.168.150.101
## 保护模式
protected-mode no
## 数据库数量
databases 1
## 日志
logfile /tmp/6379/run.log

将这个文件拷贝到每个目录下:

sh
## 进入/tmp目录
cd /tmp
## 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf

修改每个目录下的redis.conf,将其中的6379修改为与所在目录一致:

sh
## 进入/tmp目录
cd /tmp
## 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf
启动

因为已经配置了后台启动模式,所以可以直接启动服务:

sh
## 进入/tmp目录
cd /tmp
## 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf

通过ps查看状态:

sh
ps -ef | grep redis

发现服务都已经正常启动:

image-20210702174255799image-20210702174255799

如果要关闭所有进程,可以执行命令:

sh
ps -ef | grep redis | awk '{print $2}' | xargs kill

或者(推荐这种方式):

sh
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown
创建集群

虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联。

我们需要执行命令来创建集群,在Redis5.0之前创建集群比较麻烦,5.0之后集群管理命令都集成到了redis-cli中。

1)Redis5.0之前

Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境。

sh
## 安装依赖
yum -y install zlib ruby rubygems
gem install redis

然后通过命令来管理集群:

sh
## 进入redis的src目录
cd /tmp/redis-6.2.4/src
## 创建集群
./redis-trib.rb create --replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

2)Redis5.0以后

我们使用的是Redis6.2.4版本,集群管理以及集成到了redis-cli中,格式如下:

sh
redis-cli --cluster create --cluster-replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

命令说明:

  • redis-cli --cluster或者./redis-trib.rb:代表集群操作命令
  • create:代表是创建集群
  • --replicas 1或者--cluster-replicas 1 :指定集群中每个master的副本个数为1,此时节点总数 ÷ (replicas + 1) 得到的就是master的数量。因此节点列表中的前n个就是master,其它节点都是slave节点,随机分配到不同master

运行后的样子:

image-20210702181101969image-20210702181101969

这里输入yes,则集群开始创建:

image-20210702181215705image-20210702181215705

通过命令可以查看集群状态:

sh
redis-cli -p 7001 cluster nodes

image-20210702181922809image-20210702181922809

测试

尝试连接7001节点,存储一个数据:

sh
## 连接
redis-cli -p 7001
## 存储数据
set num 123
## 读取数据
get num
## 再次存储
set a 1

结果悲剧了:

image-20210702182343979image-20210702182343979

集群操作时,需要给redis-cli加上-c参数才可以:

sh
redis-cli -c -p 7001

这次可以了:

image-20210702182602145image-20210702182602145

散列插槽

Redis 会把每一个master节点映射到0~16383共16384个插槽(hash slot)上

数据key不是与节点绑定,而是与插槽绑定。(方便集群伸缩)

  • key中包含"{}",且“{}”中至少包含1个字符,“{}”中的部分是有效部分
  • key中不包含“{}”,整个key都是有效部分

连接集群任何一个节点即可,不同节点会重定向。

如何将同一类数据固定的保存在同一个 Redis 实例?

这一类数据使用相同的有效部分,例如key都以{typeId}为前缀

集群伸缩

1、添加节点

sh
redis-cli --cluster add-node 192.168.150.101:7004 192.168.150.101:7001

2、分配插槽

sh
redis-cli --cluster reshard 192.168.150.101:7001

故障转移

  1. 该实例与其它实例失去连接
  2. 疑似宕机
  3. 确定下线,自动提升一个slave为新的master

数据迁移

利用cluster failover命令可以手动让集群中的某个master宕机,切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。

  • 缺省:默认的流程,如图1~6步
  • force:省略了对offset的一致性校验
  • takeover:直接执行第5步,忽略数据一致性、忽略master状态和其它master的意见

RedisTemplate 访问分片集群

  1. 引入redis的starter依赖
  2. 配置分片集群地址(指定分片集群的每一个节点信息)
  3. 配置读写分离

多级缓存

什么是多级缓存

传统缓存的问题

传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库。

存在下面的问题:

  1. 请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈
  2. Redis缓存失效时,会对数据库产生冲击

多级缓存方案

用作缓存的Nginx是业务Nginx,需要部署为集群,再有专门的Nginx用来做反向代理。

JVM进程缓存 Caffeine

初识Caffeine

Caffeine是一个基于Java8开发的,提供了近乎最佳命中率的高性能的本地缓存库。

ben-manes/caffeine: A high performance caching library for Java (github.com)

实例代码

java
@Test
void testBasicOps() {
    // 创建缓存对象
    Cache<String, String> cache = Caffeine.newBuilder().build();

    // 存数据
    cache.put("gf", "迪丽热巴");

    // 取数据,不存在则返回null
    String gf = cache.getIfPresent("gf");
    System.out.println("gf = " + gf); 

    // 取数据,包含两个参数:
    // 参数一:缓存的key
    // 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
    // 优先根据key查询缓存,如果未命中,则执行参数二的Lambda表达式
    String defaultGF = cache.get("defaultGF", key -> {
        // 这里可以去数据库根据 key查询value
        return "柳岩";
    });
    System.out.println("defaultGF = " + defaultGF);
}

缓存驱逐策略

1、基于容量

java
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
    .maximumSize(1) // 设置缓存大小上限为 1 
    .build();

2、基于时间

java
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
    // 设置缓存有效期为 10 秒,从最后一次写入开始计时 
    .expireAfterWrite(Duration.ofSeconds(10)) 
    .build();

3、基于引用:利用GC来回收缓存数据(不推荐)

在默认情况下,当一个缓存元素过期的时候,Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。

实现缓存

1、声明Bean

java
@Configuration
public class CaffeineConfig {

    @Bean
    public Cache<Long, Item> itemCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }

    @Bean
    public Cache<Long, ItemStock> stockCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }
}

2、使用Cache对象

java
@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;

// ...

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id) {
    return itemCache.get(id, key -> itemService.query()
                         .ne("status", 3).eq("id", key)
                         .one()
                        );
}

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id) {
    return stockCache.get(id, key -> stockService.getById(key));
}

Lua语法入门

The Programming Language Lua

变量和循环

数据类型:

数据类型描述
nil这个最简单,只有值nil属于该类,表示一个无效值(在条件表达式中相当于false)。
boolean包含两个值:false和true
number表示双精度类型的实浮点数
string字符串由一对双引号或单引号来表示
function由 C 或 Lua 编写的函数
tableLua 中的表(table)其实是一个"关联数组"(associative arrays),数组的索引可以是数字、字符串或表类型。在 Lua 里,table 的创建是通过"构造表达式"来完成,最简单构造表达式是{},用来创建一个空表。

利用type函数测试给定变量或者值的类型

Lua声明变量的时候,并不需要指定数据类型

lua
-- 声明字符串
local str = 'hello'
-- 字符串拼接可以使用 ..
local str2 = 'hello' .. 'world'
-- 声明数字
local num = 21
-- 声明布尔类型
local flag = true
-- 声明数组 key为索引的 table
local arr = {'java', 'python', 'lua'}
-- 声明table,类似java的map
local map =  {name='Jack', age=21}

-- 访问数组,lua数组的角标从1开始
print(arr[1])
-- 访问table
print(map['name'])
print(map.name)

数组、table都可以利用for循环来遍历

lua
-- 声明数组 key为索引的 table
local arr = {'java', 'python', 'lua'}
-- 遍历数组
for index,value in ipairs(arr) do
    print(index, value) 
end


-- 声明map,也就是table
local map = {name='Jack', age=21}
-- 遍历table
for key,value in pairs(map) do
    print(key, value) 
end

条件控制、函数

定义函数的语法:

lua
function 函数名(argument1, argument2, ..., argumentn)
    -- 函数体
    return 返回值
end

if、else语法:

lua
if(布尔表达式)
    then
    --[ 布尔表达式为 true 时执行该语句块 --]
else
    --[ 布尔表达式为 false 时执行该语句块 --]
end

逻辑运算符:

操作符描述实例
and逻辑与操作符。若 A 为 false,则返回 A,否则返回 B。(A and B) 为 false。
or逻辑或操作符。若 A 为 true,则返回 A,否则返回 B。(A or B) 为 true。
not逻辑非操作符。与逻辑运算结果相反,如果条件为 true,逻辑非为 false。not(A and B) 为 true。

多级缓存

初识 OpenResty

OpenResty® - 开源官方站

基于 Nginx的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。

  • 具备Nginx的完整功能
  • 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
  • 允许使用Lua自定义业务逻辑、自定义库

OpenResty 的安装

安装

首先你的Linux虚拟机必须联网

1)安装开发库

首先要安装OpenResty的依赖开发库,执行命令:

sh
yum install -y pcre-devel openssl-devel gcc --skip-broken

2)安装OpenResty仓库

你可以在你的 CentOS 系统中添加 openresty 仓库,这样就可以便于未来安装或更新我们的软件包(通过 yum check-update 命令)。运行下面的命令就可以添加我们的仓库:

yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

如果提示说命令不存在,则运行:

yum install -y yum-utils

然后再重复上面的命令

3)安装OpenResty

然后就可以像下面这样安装软件包,比如 openresty

bash
yum install -y openresty

4)安装opm工具

opm是OpenResty的一个管理工具,可以帮助我们安装一个第三方的Lua模块。

如果你想安装命令行工具 opm,那么可以像下面这样安装 openresty-opm 包:

bash
yum install -y openresty-opm

5)目录结构

默认情况下,OpenResty安装的目录是:/usr/local/openresty

image-20200310225539214image-20200310225539214

看到里面的nginx目录了吗,OpenResty就是在Nginx基础上集成了一些Lua模块。

6)配置nginx的环境变量

打开配置文件:

sh
vi /etc/profile

在最下面加入两行:

sh
export NGINX_HOME=/usr/local/openresty/nginx
export PATH=${NGINX_HOME}/sbin:$PATH

NGINX_HOME:后面是OpenResty安装目录下的nginx的目录

然后让配置生效:

source /etc/profile
启动和运行

OpenResty底层是基于Nginx的,查看OpenResty目录的nginx目录,结构与windows中安装的nginx基本一致:

image-20210811100653291image-20210811100653291

所以运行方式与nginx基本一致:

sh
## 启动nginx
nginx
## 重新加载配置
nginx -s reload
## 停止
nginx -s stop

nginx的默认配置文件注释太多,影响后续我们的编辑,这里将nginx.conf中的注释部分删除,保留有效部分。

修改/usr/local/openresty/nginx/conf/nginx.conf文件,内容如下:

nginx
#user  nobody;
worker_processes  1;
error_log  logs/error.log;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    server {
        listen       8081;
        server_name  localhost;
        location / {
            root   html;
            index  index.html index.htm;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

在Linux的控制台输入命令以启动nginx:

sh
nginx

然后访问页面:http://192.168.150.101:8081,注意ip地址替换为你自己的虚拟机IP:

OpenResty快速入门

1、在nginx.conf的http下面,添加对OpenResty的Lua模块的加载

nginx
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块     
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

2、在nginx.conf的server下面,添加对/api/item这个路径的监听

nginx
location /api/item {
    ## 响应类型,这里返回json
    default_type application/json;
    ## 响应数据由 lua/item.lua这个文件来决定
    content_by_lua_file lua/item.lua;
}

3、在nginx目录创建文件夹:lua,在lua文件夹下,新建文件:item.lua

lua
-- 返回假数据,这里的ngx.say()函数,就是写数据到Response中
ngx.say('{"id":10001,"data":"hello"}')

4、重新加载配置

sh
nginx -s reload

请求参数处理

路径占位符 /item/1001
nginx
# 1.正则表达式匹配: 
location ~ /item/(\d+) {
    content_by_lua_file lua/item.lua;
}
lua
-- 2. 匹配到的参数会存入ngx.var数组中,
-- 可以用角标获取
local id = ngx.var[1]
请求头 id: 1001
lua
-- 获取请求头,返回值是table类型
local headers = ngx.req.get_headers()
Get请求参数 ?id=1001
lua
-- 获取GET请求参数,返回值是table类型
local getParams = ngx.req.get_uri_args()
Post表单参数 id=1001
lua
-- 读取请求体
ngx.req.read_body()
-- 获取POST表单参数,返回值是table类型
local postParams = ngx.req.get_post_args()
JSON参数
lua
-- 读取请求体
ngx.req.read_body()
-- 获取body中的json参数,返回值是string类型
local jsonBody = ngx.req.get_body_data()

查询Tomcat

nginx内部发送Http请求

nginx提供了内部API用以发送http请求

lua
local resp = ngx.location.capture("/path",{
        method = ngx.HTTP_GET,   -- 请求方式
        args = {a=1,b=2},  -- get方式传参数
        body = "c=3&d=4" -- post方式传参数
    })

注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。

但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理:

封装http查询的函数

把http查询的请求封装为一个函数,放到OpenResty函数库中

在/usr/local/openresty/lualib目录下创建common.lua文件

lua
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
            method = ngx.HTTP_GET,
            args = params,
        })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http not found, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end

-- 将方法导出
local _M = {  
    read_http = read_http
}  
return _M
使用Http函数查询数据

修改item.lua文件

lua
-- 引入自定义工具模块
local common = require("common")
local read_http = common.read_http

-- 获取路径参数
local id = ngx.var[1]

-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)
JSON结果处理

OpenResty提供了一个cjson的模块用来处理JSON的序列化和反序列化。

openresty/lua-cjson: Lua CJSON is a fast JSON encoding/parsing module for Lua (github.com)

lua
-- 引入cjson模块
local cjson = require "cjson"

-- 序列化
local obj = {
    name = 'jack',
    age = 21
}
local json = cjson.encode(obj)

-- 反序列化
local json = '{"name": "jack", "age": 21}'
local obj = cjson.decode(json);
print(obj.name)
Tomcat集群的负载均衡
nginx
# 反向代理配置,将/item路径的请求代理到tomcat集群        
location /item {
    proxy_pass 	http://tomcat-cluster;
}

# tomcat集群配置
upstream tomcat-cluster{
    ## 对请求的 URI 进行哈希处理
    hash $request_uri;
    server 192.168.150.1:8081;
    server 192.168.150.1:8082;
}

Redis缓存预热

冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。

java
@Component
public class RedisHandler implements InitializingBean {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Override
    public void afterPropertiesSet() throws Exception { 
        // 初始化缓存 ... 
    }
}

查询Redis缓存

OpenResty的Redis模块

OpenResty提供了操作Redis的模块

lua
-- 引入redis模块
local redis = require("resty.redis")
-- 初始化Redis对象
local red = redis:new()
-- 设置Redis超时时间
red:set_timeouts(100010001000)

封装函数,用来释放Redis连接,其实是放入连接池

lua
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)  
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒  
    local pool_size = 100 --连接池大小  
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)  
    if not ok then  
        ngx.log(ngx.ERR"放入Redis连接池失败: ", err)  
    end  
end

封装函数,从Redis读数据并返回

lua
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

业务实现

lua
-- 封装函数,先查询redis,再查询http
local function read_data(key, path, params)
    -- 查询redis
    local resp = read_redis("127.0.0.1"6379, key)
    -- 判断redis是否命中
    if not resp then
        -- Redis查询失败,查询http
        resp = read_http(path, params)
    end
    return resp
end

Nginx本地缓存

开启共享词典,在nginx.conf的http下添加配置:

nginx
## 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
lua_shared_dict item_cache 150m;

操作共享字典

lua
-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key''value'1000)
-- 读取
local val = item_cache:get('key')

修改后的查询逻辑

lua
-- 封装函数,先查询本地缓存,再查询redis,再查询http
local function read_data(key, expire, path, params)
    -- 读取本地缓存
    local val = item_cache:get(key)
    if not val then
        -- 缓存未命中,记录日志
        ngx.log(ngx.ERR"本地缓存查询失败, key: ", key , ", 尝试redis查询")
        -- 查询redis
        val = read_redis("127.0.0.1"6379, key)
        -- 判断redis是否命中
        if not val then
            ngx.log(ngx.ERR"Redis缓存查询失败, key: ", key , ", 尝试http查询")
            -- Redis查询失败,查询http
            val = read_http(path, params)
        end
    end
    -- 写入本地缓存
    item_cache:set(key, val, expire)
    return val
end
-- 根据id查询商品
local itemJSON = read_data('item:id:' .. id, 1800"/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_data('item:stock:id:' .. id, 60"/item/stock/".. id, nil)
mermaid
graph 
    A[开始] --> B{本地缓存命中?}
    B -->|是| C[返回缓存数据]
    B -->|否| D[记录日志: 本地缓存查询失败]
    D --> E[尝试redis查询]
    E --> F{Redis命中?}
    F -->|是| G[返回Redis数据]
    F -->|否| H[记录日志: Redis缓存查询失败]
    H --> I[尝试http查询]
    I --> J[返回http数据]
    J --> K[写入本地缓存]
    K --> L[返回数据]

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#f66,stroke-width:2px
    style D fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#f9f,stroke:#333,stroke-width:2px
    style F fill:#f9f,stroke:#333,stroke-width:2px
    style G fill:#ccf,stroke:#f66,stroke-width:2px
    style H fill:#f9f,stroke:#333,stroke-width:2px
    style I fill:#f9f,stroke:#333,stroke-width:2px
    style J fill:#ccf,stroke:#f66,stroke-width:2px
    style K fill:#ccf,stroke:#f66,stroke-width:2px
    style L fill:#ccf,stroke:#f66,stroke-width:2px

缓存同步策略

缓存同步策略

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高
  • 场景:对一致性、时效性要求较高的缓存数据

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

基于MQ的异步通知:

基于Canal的异步通知:

初始Canal

canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅和消费。Canal是基于mysql的主从同步来实现的。

image-20240508081104032

Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

image-20240508081237132

安装和配置 Canal

下面我们就开启mysql的主从同步机制,让Canal来模拟salve

Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

这里以之前用Docker运行的mysql为例:

开启binlog

打开mysql容器挂载的日志文件,我的在/tmp/mysql/conf目录:

image-20210813153241537image-20210813153241537

修改文件:

sh
vi /tmp/mysql/conf/my.cnf

添加内容:

ini
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima

配置解读:

  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库

最终效果:

ini
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
设置用户权限

接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对heima这个库的操作权限。

sql
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

重启mysql容器即可

docker restart mysql

测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:

show master status;

image-20200327094735948image-20200327094735948

创建网络

我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:

sh
docker network create heima

让mysql加入这个网络:

sh
docker network connect heima mysql
安装Canal

通过命令导入canal的镜像压缩包:

docker load -i canal.tar

然后运行命令创建Canal容器:

sh
docker run -p 11111:11111 --name canal \
-e canal.destinations=heima \
-e canal.instance.master.address=mysql:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=heima\\..* \
--network heima \
-d canal/canal-server:v1.1.5

说明:

  • -p 11111:11111:这是canal的默认监听端口
  • -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id来查看
  • -e canal.instance.dbUsername=canal:数据库用户名
  • -e canal.instance.dbPassword=canal :数据库密码
  • -e canal.instance.filter.regex=:要监听的表名称

表名称监听支持的语法:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2

监听Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

使用第三方开源的canal-starter

NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client (github.com)

xml
<!--canal-->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId> 
    <version>1.2.1-RELEASE</version>
</dependency>
yaml
canal:
    destination: heima ## canal实例名称,要跟canal-server运行时设置的destination一致
    server: 192.168.150.101:11111 ## canal地址
java
package com.heima.item.canal;

// 指定要监听的表
@CanalTable("tb_item")
@Component
// 指定表关联的实体类
public class ItemHandler implements EntryHandler<Item> {

    // 监听到数据库的增、改、删 的消息
    
    @Override
    public void insert(Item item) {
        // 新增数据到redis
    }
    @Override
    public void update(Item before, Item after) {
        // 更新redis数据
        // 更新本地缓存
    }
    @Override
    public void delete(Item item) {
        // 删除redis数据
        // 清理本地缓存
    }
}
java
@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    // 标记表中的id字段
    @Id
    private Long id;
    // 标记表中与属性名不一致的字段
    @Column(name = "name")
    private String name;
    private Date updateTime;
    @TableField(exist = false)
    // 标记不属于表中的字段
    @Transient
    private Integer stock;
    @TableField(exist = false)
    // 标记不属于表中的字段
    @Transient
    private Integer sold;
}