分布式锁的关键在于对单一资源的竞争。获得资源的实例将继续执行,其余实例要么退出(互斥锁),要么等待(阻塞锁)。
实现分布式锁的方案有很多,既可以直接使用MySQL作为分布式锁(例如xxl-job),也可以利用ZooKeeperRedis等。
在基于Spring Cloud的业务系统中,一般都会引入Redis作为分布式缓存中间件,因此更多的人会选择使用Redis来实现分布式锁。本文将介绍使用Redis作为分布式锁时常见的问题和解决方法。

1. 没有使用原子操作指令

错误写法

    Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue);
    stringRedisTemplate.expire(lockKey, Duration.ofSeconds(expireTime));
    if (!tryLock) {
        return;
    }

上述操作通常出现在新手阶段,在写入锁对象时,没有考虑到原子性问题。在Redis中有提供SET NX PX指令,支持在设置锁的同时指定过期时间,并且支持原子性判断key是否已存在。

NX 和 PX 是 Redis 命令中用于设置 key 的两个选项。

  • NX: 当指定 NX 选项时,只有在 key 不存在的情况下才会设置 key 的值。如果 key 已经存在,则不进行任何操作。
  • PX: PX 选项用于设置 key 的过期时间(以毫秒为单位)。例如,PX 10000 表示在 10 秒后将 key 设置为过期状态。

正确写法:

Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);

2. 释放了别人的锁

错误写法

try {
    Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
    if (!tryLock) {
        return;
    }

    // do something
    } finally {
    stringRedisTemplate.delete(lockKey);
    }

在加锁的过程中,没有设定唯一值作为Value存储到Redis中,在释放时,不判断直接对锁进行释放。其二,将获取锁的代码放在了try代码块中。

在上述代码中存在两个问题:

  1. 不该执行到finlly代码块:A请求获得了锁正在执行业务代码,而B请求没有获得锁,但是因为获取锁的代码在try代码块中,导致finally一定会执行,B请求就会将A请求的锁释放,而如果A请求依旧未执行完毕,此时C请求过来时,则C请求错误的拿到了锁
  2. 不该删除别人的锁:在删除锁时,应该判断自己是否是上锁人,由于多次执行Redis指令不具备原子性,所以一般是交由LUA脚本来实现的。
if redis.call('get', KEYS[1]) == ARGV[1]
    then
        return redis.call('del', KEYS[1])
    else
        return 0
end

正确写法

  1. 提前将LUA脚本载入到Redis服务端
script = new DefaultRedisScript<>();
script.setResultType(Long.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("release_lock.lua")));
  1. 获取和释放锁示例
Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
if (!tryLock) {
    return;
}
try {

    // do something
} finally {
    ArrayList<String> keys = new ArrayList<>();
    keys.add(context.getLockKey());
    stringRedisTemplate.execute(this.script, keys, context.getLockValue());
}

3. 事务未提交锁就释放了

错误代码

/**
    * 事务内获取分布式锁
    */
@Transactional(rollbackFor = Exception.class)
public void saveUserWithDistributedLock(String name) {

    String lockKey = "lock_key:" + name;
    RedisLock.LockContext lockContext = redisLock.tryLock(lockKey, 10000L);
    if (!lockContext.getTryLock()) {
        // printLog("没拿到锁");
        return;
    }

    printLog("拿到锁了" + lockKey);

    try {
        this.save(name);
    } finally {
        redisLock.release(lockContext);
        printLog("释放锁了");
    }
}

MySQL常规情况下是RR的隔离级别,只有等到事务提交数据才对其他事务可见,存在**“读视图”,在上述的代码中,A请求拿到了锁执行了业务代码,执行到redisLock.release时将锁释放了,但Spring的@Transactional依赖的是AOP,其需要等到方法执行完毕才会提交事务,在这个临界点,B请求可以正常拿到锁,但是A请求的事务还未提交,B请求的读视图**中还未查询到A请求提交的数据,最终造成了数据的不一致性。

正确代码

正确的情况是在另一个方法中获取到锁之后,再调用包含事务的业务代码。此时需要注意SpringAOP在本方法内代理失效的问题,通常需要新建一个Service来处理。

业务代码执行超过锁过期时间

错误代码

// Domain-Service
public void save(String name) {

    String lockKey = "lock_key:" + name;
    RedisLock.LockContext lockContext = redisLock.tryLock(lockKey, 10000L);
    if (!lockContext.getTryLock()) {
        printLog("没拿到锁");
        return;
    }

    printLog("拿到锁了" + lockKey);
    try {
        userService.save(name);
    } finally {
        redisLock.release(lockContext);
        printLog("释放锁了");
    }
}

// UserService
@Transactional(rollbackFor = Exception.class)
public void save(String name) {

    List<User> users = userRepository.findUsersByName(name);
    if (CollUtil.isNotEmpty(users)) {
        printLog("已经写入, 不再写入" + users);
        return;
    }

    // 业务保存模拟执行很慢
    TimeUnit.SECONDS.sleep(70);
    
}

上述代码中,锁对象只有10s的时间,但是业务代码执行却需要70s,A请求虽然拿到了锁,此时后续10秒其他请求均无法获取锁,但是从第11秒开始的请求将可以拿到锁,而此时A请求还未执行完毕,此时开始出现错误的获取锁,最终造成数据的不一致。

正确写法

参考RedissonWatchDog机制,另外开辟线程每隔 10s 就给还未执行完毕的 Key 自动续期 30s,保证业务代码能够安全的执行完毕再自行释放锁对象。

示例代码:

// watch dog
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
    if (!LOCK_CONTEXTS.isEmpty()) {
        for (LockContext lockContext : LOCK_CONTEXTS) {
            // 如果执行线程还未释放锁, 续期30s(模拟Redisson)
            stringRedisTemplate.expire(lockContext.getLockKey(), Duration.ofSeconds(30));
            Long expire = stringRedisTemplate.getExpire(lockContext.getLockKey());
            log.info("WatchDog, expire 30s, lockKey={}, ttl={}", lockContext.getLockKey(), expire);
        }
    }
        }, 0,
        // 10秒检测一次
        10, TimeUnit.SECONDS);

后记

分布式锁的错误还有很多,本篇主要是自己在工作过程中遇到的一些坑,着重介绍新手阶段在编写分布式锁时遇到的比较基础的问题,后面有空再进行其他场景的逐个介绍。

本文参考:聊聊redis分布式锁的8大坑
本文代码:redis-lua-distributed-lock