企业项目管理、ORK、研发管理与敏捷开发工具平台

网站首页 > 精选文章 正文

分布式锁—3.Redisson的公平锁二(redisson分布式锁缺陷)

wudianyun 2025-03-19 03:15:51 精选文章 2 ℃

大纲

1.Redisson公平锁RedissonFairLock概述

2.公平锁源码之加锁和排队

3.公平锁源码之可重入加锁

4.公平锁源码之新旧版本对比

5.公平锁源码之队列重排

6.公平锁源码之释放锁

7.公平锁源码之按顺序依次加锁


4.公平锁源码之新旧版本对比

(1)新版本再次加锁失败不会刷新排队分数(等待超时的时间点timeout)

(2)旧版本再次加锁失败会刷新排队分数(等待超时的时间点timeout)


当客户端线程尝试加公平锁失败处于排队状态时,会进入while循环。在while循环中,每次都会等待一段时间,再重新进行尝试加公平锁。

public class RedissonLock extends RedissonBaseLock {
    ...
    //加锁
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //线程ID,用来生成设置Hash的值
        long threadId = Thread.currentThread().getId();
        //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        //ttl为null说明加锁成功
        if (ttl == null) {
            return;
        }
      
        //加锁失败时的处理
        CompletableFuture future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                //再次尝试获取锁
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                //返回的ttl为null,获取到锁,就退出while循环
                if (ttl == null) {
                    break;
                }
                //返回的ttl不为null,则说明其他客户端或线程还持有锁
                //那么就利用同步组件Semaphore进行阻塞等待一段ttl的时间
                if (ttl >= 0) {
                    try {
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        commandExecutor.getNow(future).getLatch().acquire();
                    } else {
                        commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(future), threadId);
        }
    }
    ...
}

假设第二个客户端线程第一次加锁是在10:00:00,然后在10:00:15该客户端线程再次发起请求尝试进行加锁,但第一个客户端线程在10:00:00~10:00:15之间一直持有这把锁,此时第二个客户端线程的再次加锁流程如下:


(1)新版本再次加锁失败不会刷新排队分数(等待超时的时间点timeout)

步骤一:进入while循环,移除等待超时的线程。执行命令"lindex redisson_lock_queue:{myLock} 0",获取队列排第一元素。此时获取到UUID2:ThreadID2,代表着第二个客户端线程正在队列里排队


继续执行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",从有序集合中获取UUID2:ThreadID2对应的分数,比如获取到的timeout = 10:05:20。根据当前时间是10:00:15,那么timeout <= 10:00:15的这个条件不成立,于是退出while循环。


步骤二:判断当前线程现在能否尝试获取锁,发现不能通过。因为执行命令"exists myLock"时,发现锁已经存在


步骤三:判断锁是否已经被当前线程持有。由于第二个客户端线程的UUID + 线程ID必然不等于第一个客户端线程,所以此时执行命令"hexists myLock UUID2:ThreadID2",发现不存在,所以此处的可重入锁的判断条件也不成立


步骤四:判断当前获取锁失败的线程是否已经在队列中排队。由于当前线程是第二次尝试获取锁,所以判断通过。然后返回第二个客户端线程等待获取锁时,还剩多少时间就超时,不会刷新排队分数

//Redisson的3.16.8版本
if (command == RedisCommands.EVAL_LONG) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
        //步骤一:remove stale threads,移除等待超时的线程
        "while true do " +
            //获取队列中的第一个元素
            //KEYS[2]是一个用来对线程排队的队列的名字
            "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
            "if firstThreadId2 == false then " +
                "break;" +
            "end;" +
            //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间
            //KEYS[3]是一个用来对线程排序的有序集合的名字
            "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
            //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除
            //ARGV[4]是当前时间
            "if timeout <= tonumber(ARGV[4]) then " +
                //从有序集合 + 队列中移除这个线程
                "redis.call('zrem', KEYS[3], firstThreadId2);" +
                "redis.call('lpop', KEYS[2]);" +
            "else " +
                "break;" +
            "end;" +
        "end;" +
 
        //步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
        //情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;
        //情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;
        "if (redis.call('exists', KEYS[1]) == 0) " +
            "and ((redis.call('exists', KEYS[2]) == 0) " +
                "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
            //步骤三:当前线程执行获取锁的操作
            //弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素
            "redis.call('lpop', KEYS[2]);" +
            "redis.call('zrem', KEYS[3], ARGV[2]);" +
      
            //递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间
            //zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员
            "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
            "for i = 1, #keys, 1 do " +
                //对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])
                //ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟
                "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
            "end;" +

            //hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功
            "redis.call('hset', KEYS[1], ARGV[2], 1);" +
            "redis.call('pexpire', KEYS[1], ARGV[1]);" +
            "return nil;" +
        "end;" +

        //步骤四:判断锁是否已经被当前线程持有(可重入锁),KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;
        "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
            "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
            "redis.call('pexpire', KEYS[1], ARGV[1]);" +
            "return nil;" +
        "end;" +

        //步骤五:判断当前获取锁失败的线程是否已经在队列中排队
        //KEYS[3]是对线程排序的有序集合,ARGV[2]是当前线程的UUID + ThreadID;
        "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
        "if timeout ~= false then " +
            //如果当前获取锁失败的线程已经在队列中排队
            //那么就返回该线程等待获取锁时,还剩多少时间就超时了,外部代码拿到这个时间会阻塞等待这个时间
            //ARGV[3]是当前线程获取锁时可以等待的时间,ARGV[4]是当前时间
            "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
        "end;" +

        //步骤六:对获取锁失败的线程进行排队处理
        "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
        "local ttl;" +
        //如果在队列中排队的最后一个元素不是当前线程
        "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
            //lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID + 线程ID,ARGV[4]是当前时间
            //因为拥有最大过期时间的线程在队列中是排最后的
            //所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间
            //从而保证新加入队列和有序集合的线程的过期时间是最大的
            //下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间 
            //这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间
            "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
        "else " +
            //下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间
            "ttl = redis.call('pttl', KEYS[1]);" +
        "end;" +
        //计算当前线程在排队等待锁时的过期时间
        "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
        //把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间
        //然后再把当前线程作为一个元素插入队列尾部
        "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
            "redis.call('rpush', KEYS[2], ARGV[2]);" +
        "end;" +
        "return ttl;",
        Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
        unit.toMillis(leaseTime),
        getLockName(threadId),
        wait,//默认是5分钟
        currentTime
    );
}

(2)旧版本再次加锁失败会刷新排队分数(等待超时的时间点timeout)

旧版本公平锁的lua脚本如下所示,当第二个客户端线程再次加锁时会再次进入排队逻辑。


首先会出计算队列中的第一个元素还有多少时间就超时,即ttl。然后根据ttl + 传入的等待时间,计算当前线程等待锁的超时时间timeout


接着执行命令"zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2",刷新有序集合中的同名元素的分数为timeout。客户端线程每次重复尝试加锁,都会将其对应的过期时间往后延长,也就是刷新了排队的分数


zadd命令在添加存在的元素时,会返回0,但会更新该元素的分数。

//Redisson的3.8.1版本
if (command == RedisCommands.EVAL_LONG) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
        //步骤一:移除等待超时的线程
        "while true do " +
            //获取队列中的第一个元素
            //KEYS[2]是一个用来对线程排队的队列的名字
            "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
            "if firstThreadId2 == false then " +
                "break;" +
            "end; " +
            //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间
            //KEYS[3]是一个用来对线程排序的有序集合的名字
            "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
            //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除
            //ARGV[4]是当前时间
            "if timeout <= tonumber(ARGV[4]) then " +
                //从有序集合 + 队列中移除这个线程
                "redis.call('zrem', KEYS[3], firstThreadId2); " +
                "redis.call('lpop', KEYS[2]); " +
            "else " +
                "break;" +
            "end; " +
        "end;" +
 
        //步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断
        //情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;
        //情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID+ThreadID;
        "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " +
            "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
            //步骤三:当前线程执行获取锁的操作
            //弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素
            "redis.call('lpop', KEYS[2]); " +
            "redis.call('zrem', KEYS[3], ARGV[2]); " +
            //hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功
            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
     
        //步骤四:判断锁是否已经被当前线程持有,KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
     
        //步骤五:对获取锁失败的线程进行排队处理
        "local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
        "local ttl; " +
        //如果在队列中排队的第一个元素不是当前线程
        "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
            //计算队列中第一个元素还有多少时间就超时了
            "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
        "else " +
            "ttl = redis.call('pttl', KEYS[1]);" +
        "end; " +
        //计算当前线程等待锁的超时时间
        "local timeout = ttl + tonumber(ARGV[3]);" +
        //把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间
        //然后再把当前线程作为一个元素插入队列尾部
        "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
            "redis.call('rpush', KEYS[2], ARGV[2]);" +
        "end; " +
        "return ttl;",
        Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1]、KEYS[2]、KEYS[3]
        internalLockLeaseTime,//ARGV[1]
        getLockName(threadId),//ARGV[2]
        currentTime + threadWaitTime,//ARGV[3] = 当前时间 + 5秒
        currentTime//ARGV[4]
    );
}

注意:如果仅仅使用有序集合是不行的,因为有序集合的分数在lua脚本执行过程中也会发生变化。旧版本中,客户端线程每次尝试加锁,有序集合中的分数会更新。新版本中,当前线程可以尝试获取锁时,也会遍历更新有序集合中的分数。


此外,有序集合获取第一个元素的时间复杂度比队列要高。如果仅仅使用队列也是不行的,因为需要管理排队线程的等待超时时间。如果没有有序集合,那么就不能移除在队列中排队已超时的线程。当然,为了管理线程的等待超时时间,将有序集合换成两层Hash值也可以。


5.公平锁源码之队列重排

(1)新版本在5分钟后尝试再次加锁才会队列重排

(2)旧版本在5秒后尝试再次加锁就会队列重排

(3)导致队列重排的是lua脚本的步骤一(移除等待超时的线程)


(1)新版本在5分钟后尝试再次加锁才会队列重排

新版本的公平锁中,获取锁失败的线程默认会进入队列最多等待5分钟。


在这5分钟内,该线程不管再次加锁多少次,都不会刷新队列排序和分数。


在这5分钟内,该线程没有进行再次加锁尝试,就会被移出队列和有序集合。所以5分钟后,该线程才尝试再次加锁,那么会重新入队,导致队列重排


(2)旧版本在5秒后尝试再次加锁就会队列重排

旧版本的公平锁中,获取锁失败的线程默认会进入队列最多等待5秒钟。


在这5秒钟内,该线程只要重新尝试进行加锁,那么就会延长其最多等待时间,也就是刷新有序集合中的排队分数。


在这5秒钟内,该线程没有进行再次加锁尝试,就会被移出队列和有序集合。所以5秒钟后,该线程才尝试再次加锁,那么会重新入队,导致队列重排


(3)导致队列重排的是lua脚本的步骤一(移除等待超时的线程)

也就是公平锁lua脚本中while循环的作用


当客户端线程使用RedissonLock的tryAcquire()方法尝试获取公平锁,并且指定了一个获取锁的超时时间时。比如指定客户端线程在队列里排队超过了20秒,就不再尝试获取锁了。如果获取锁的超时时间没有指定,新版本是默认5分钟超时,旧版本是默认5秒后超时。


此时由于这些等待获取锁已超时线程元素还存在队列有序集合里,所以可以通过while循环的逻辑来清除这些不再尝试获取锁的客户端线程


在新版本,随着时间推移,这些等待获取锁超时的线程就会被移出队列。在旧版本,随着时间推移,这些等待获取锁超时的线程只要不再尝试加锁,那么其等待获取锁的超时时间就不会更新被不断延长,就会被移除队列


如果客户端宕机了,那么客户端就不会重新尝试获取锁。在新版本中,随着时间推移,宕机的客户端线程就会被移出队列。在旧版本中,就不会刷新和延长有序集合中的超时时间分数,这样while循环的逻辑就会将这些宕机的客户端线程从队列中移出。


新版本中,最多5分钟后,宕机的客户端线程会被移出队列。在旧版本中,最多5秒钟后,宕机的客户端线程就会被移出队列


因为网络延迟等原因,可能会导致客户端线程等待锁时间过长,从而触发各个客户端线程的排队顺序的重排序。有的客户端如果在队列里等待时间过长,可能就会触发一次队列的重排序。新版本触发重排序的频率是每5分钟旧版本触发重排序的频率是每5秒

//步骤一:移除等待超时的线程
"while true do " +
    //获取队列中的第一个元素
    //KEYS[2]是一个用来对线程排队的队列的名字
    "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
    "if firstThreadId2 == false then " +
        "break;" +
    "end; " +
    //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间
    //KEYS[3]是一个用来对线程排序的有序集合的名字
    "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
    //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除
    //ARGV[4]是当前时间
    "if timeout <= tonumber(ARGV[4]) then " +
        //从有序集合 + 队列中移除这个线程
        "redis.call('zrem', KEYS[3], firstThreadId2); " +
        "redis.call('lpop', KEYS[2]); " +
    "else " +
        "break;" +
    "end; " +
"end;" +


6.公平锁源码之释放锁

(1)释放公平锁的流程

(2)释放公平锁的lua脚本分析


(1)释放公平锁的流程

释放公平锁首先调用的还是RedissonLock的unlock()方法


在RedissonLock的unlock()方法中,会调用get(unlockAsync())。也就是首先调用RedissonBaseLock的unlockAsync()方法,然后调用RedissonObject的get()方法


其中个RedissonBaseLock的unlockAsync()方法是异步化执行的方法,释放锁的操作是异步执行的。而RedisObject的get()方法会通过RFuture同步等待获取异步执行的结果。所以,可以将get(unlockAsync())理解为异步转同步


RedissonBaseLock的unlockAsync()方法中,就会调用公平锁RedissonFairLock的unlockInnerAsync()方法进行释放锁。然后当完成释放锁的处理后,会通过异步去取消定时调度任务

public class Application {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        //获取公平的可重入锁
        RLock fairLock = redisson.getFairLock("myLock");
        fairLock.lock();
        fairLock.unlock();
        ...
    }
}

public class RedissonLock extends RedissonBaseLock {
    ...
    @Override
    public void unlock() {
        ...
        //异步转同步
        //首先调用的是RedissonBaseLock的unlockAsync()方法
        //然后调用的是RedissonObject的get()方法
        get(unlockAsync(Thread.currentThread().getId()));
        ...
    }
    ...
}

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    ...
    @Override
    public RFuture unlockAsync(long threadId) {
        //异步执行释放锁的lua脚本
        RFuture future = unlockInnerAsync(threadId);
        CompletionStage f = future.handle((opStatus, e) -> {
            //取消定时调度任务
            cancelExpirationRenewal(threadId);
            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }
            return null;
        });
        return new CompletableFutureWrapper<>(f);
    }
    protected abstract RFuture unlockInnerAsync(long threadId);
    ...
}

public class RedissonFairLock extends RedissonLock implements RLock {
    private final long threadWaitTime;
    private final CommandAsyncExecutor commandExecutor;
    private final String threadsQueueName;
    private final String timeoutSetName;
    
    public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
        this(commandExecutor, name, 60000*5);
    }
    
    public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.threadWaitTime = threadWaitTime;
        threadsQueueName = prefixName("redisson_lock_queue", name);
        timeoutSetName = prefixName("redisson_lock_timeout", name);
    }
    
    @Override
    protected RFuture unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //步骤一:移除等待超时的线程
            "while true do " +
                //获取队列中的第一个元素
                //KEYS[2]是一个用来对线程排队的队列的名字
                "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                "if firstThreadId2 == false then " +
                    "break;" +
                "end; " +
                //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间
                //KEYS[3]是一个用来对线程排序的有序集合的名字
                "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除
                //ARGV[4]是当前时间
                "if timeout <= tonumberargv4 then redis.callzrem keys3 firstthreadid2 redis.calllpop keys2 else break end end keyhash if redis.callexists keys1='= 0)' then local nextthreadid='redis.call('lindex',' keys2 0 if nextthreadid then argv1 redis.callpublish keys4 .. : .. nextthreadid argv1 end return 1 end keyuuididhash if redis.callhexists keys1 argv3='= 0)' then return nil end keyuuididhash1 local counter='redis.call('hincrby',' keys1 argv3 -1 if counter> 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "end; " +
                
            "redis.call('del', KEYS[1]); " +
            "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
            "if nextThreadId ~= false then " +
                //发布一个事件给在队列中排第一的线程
                "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
            "end; " +
            "return 1; ",
            Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
            LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
            internalLockLeaseTime, 
            getLockName(threadId), 
            System.currentTimeMillis()
        );
    }
    ...
}

(2)释放公平锁的lua脚本分析

步骤一:移除等待超时的线程

首先也会进入while循环,移除等待超时的线程。即获取队列中排第一的线程,判断该线程的过期时间是否已小于当前时间。如果小于当前时间,那么就说明该线程在队列中的排队已经过期,于是便将该线程从有序集合 + 队列中移除。后续如果该线程再次尝试加锁,那么会重新排序 + 重新入队


步骤二:判断锁是否还存在

如果key为锁名的Hash值已不存在,那么先获取队列中排第一的线程,然后发布一个事件给该线程对应的客户端让其获取锁


如果key为锁名的Hash值还存在,那么判断field为UUID + 线程ID的映射是否存在。如果field为UUID + 线程ID的映射不存在,那么表示锁已经被释放了,直接返回nil。如果field为UUID + 线程ID的映射存在,那么在key为锁名的Hash值中,对field为UUID + 线程ID的value值递减1。也就是调用Redis的hincrby命令,进行递减1处理。


步骤三:对递减1后的结果进行如下判断处理

如果递减1后的结果大于0,表示线程还在持有锁。对应于持有锁的线程多次重入锁,此时需要重置锁的过期时间


如果递减1后的结果小于0,表示线程不再持有锁,则删除锁对应的key,并且发布一个事件给在队列中排第一的线程所对应的客户端。


7.公平锁源码之按顺序依次加锁

(1)锁被释放后,排第二的客户端线程先来加锁

(2)锁被释放后,排第一的客户端线程再来加锁


假设客户端A先持有锁,而客户端B在队列里面是排在客户端C的后面。那么如果客户端A释放了锁后,客户端B和C是如何按顺序加锁的。


(1)锁被释放后,排第二的客户端线程先来加锁

锁被客户端A释放掉,锁key被删除之后,客户端B先来进行尝试加锁。此时客户端B执行的lua脚本步骤二的逻辑:

//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
    "and ((redis.call('exists', KEYS[2]) == 0) " +
    "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
    ...
"end;"

首先,执行判断"exists myLock = 0",由于当前锁存在,所以条件不成立。


然后,执行判断"exists redisson_lock_queue:{myLock} = 0",由于队列存在,所以条件不成立。


接着,执行判断"lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2",由于队列存在,但是在队列中排第一的不是客户端B而是客户端C,所以条件不成立,客户端B无法加锁。


由此可见:即使锁释放掉后,多个客户端来尝试加锁也只认队列中排第一的客户端。从而实现按队列的顺序依次获取锁,保证了公平性。


(2)锁被释放后,排第一的客户端线程再来加锁

当在队列中排第一的客户端C此时过来尝试加锁时,就会执行如下步骤三的尝试加锁逻辑:

//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
    "and ((redis.call('exists', KEYS[2]) == 0) " +
        "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
    //步骤三:当前线程执行获取锁的操作
    //remove this thread from the queue and timeout set
    //弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素
    "redis.call('lpop', KEYS[2]);" +
    "redis.call('zrem', KEYS[3], ARGV[2]);" +

    //decrease timeouts for all waiting in the queue
    //递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间
    //zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员
    "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
    "for i = 1, #keys, 1 do " +
        //对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])
        //ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟
        "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
    "end;" +

    //acquire the lock and set the TTL for the lease
    //hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功
    "redis.call('hset', KEYS[1], ARGV[2], 1);" +
    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
    "return nil;" +
"end;"

首先,执行命令"lpop redisson_lock_queue:{myLock}",将队列中的第一个元素弹出来


然后,执行命令"zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3",将有序集合中客户端C的线程对应的元素给删除掉


接着,执行"hset myLock UUID3:ThreadID3 1"进行加锁设置field为UUID + 线程ID的value值为1


最后,执行命令"pexpire myLock 30000",设置key为锁名的Hash值的过期时间为30000毫秒。


客户端C完成加锁后,客户端C就会从队列中出队,此时排在队头的就是客户端B。

Tags:

最近发表
标签列表