您的当前位置:首页正文

Redis之分布式锁

2024-11-30 来源:个人技术集锦

1. 什么是分布式锁

我们直接在并发编程阶段学习过synchronized lock这些都是单机锁。什么是锁呢? 就是我们有一些互斥资源,不能并行执行,需要一个东西来保证是串行执行的   

单机锁的弊端

 如图,有3个独立的订单服务,4个用户并发从客端访问,用户12访问订单服务1,由于单机锁,所以可以有一个用户能够进行支付操作,用户3 用户4分别进入订单服务2 订单服务3 也能够进行支付操作,因此我们想让4个用户在一个时间内只有一个用户能够进行支付操作,单机锁就无法做到

所以就引入了分布式锁

分布式锁是把锁放在第三方,而不是在单机服务中,用户1-4 获取锁都到第三方服务中去获取,这样就可以保证分布式服务下的支付操作时是串行执行的了 

2. 自己设计一个锁,如何实现

需求: 

a.只能有一个线程能同时执行互斥的资源

b.其它的线程执行的时候,有线程在执行的话,要么等待,要么报错

需要实现这个需求的3个重要条件

3. Redis中怎么做分布式锁

a. 标记 redis是k-v结构 key作为一个标记,这个key存在的话说明有了标记,key不存在,说明没有标记

b.标记的可见性  单线程命令执行,  必须要set完 你才能get

c. 保证原子性  setnx 单线执行 

setnx其实等价于两个命令(exists 和 set),假如我不想setnx,那么流程就等价于:

if(exists(order:111)==0) //没锁
{
    set(order:111,1)
}

 但是如果这样写,就会发现一个问题,这两个指令不是原子性的,就会导致同时进入判断中,得到没锁的场景

 因此我们应该思考的是怎么保证多个指令的原子性?

3.1 Redis事务

redis的开启提交回滚命令分别为multi、exec、discard

但是事务的特点是

1.命令是原子的 在执行事务中的指令时,服务器阻塞,不能执行其他指令
2.不能拿到事务中间的指令的结果 来决定后面指令的逻辑。

因此,不能够通过判断而决定后面的逻辑,事务不能够满足我们上述的需求,因而采用Lua脚本 

3.2 Lua脚本

lua语言是一个轻量级的脚本语言,能够完美的融入redis中。

redis调用lua脚本 采用eval指令

预填充键值 如果传入的键是1 返回第一个参数 否则返回第二个参数

127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]
end return ARGV[2] " 1 1 'HUIHUI' 'XIAOHUIHUI'
"HUIHUI"
127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]
end return ARGV[2] " 1 2 'HUIHUI' 'XIAOHUIHUI'
"XIAOHUIHUI"

 既然可以传入键值对,我们能不能在lua脚本里面再去执行redis的指令,并且可以根据返回的结果继续执行后续的逻辑。

判断一个key是不是存在,如果不存在,则调用set的redis指令,在lua脚本调用redis指令,用redis.call

127.0 . 0.1 : 6379 > eval "local
key=redis.call('exists',KEYS[1]) if key==0 then return
redis.call('set',KEYS[1],ARGV[1]) end return 1" 1 name
HUIHUI
OK
127.0 . 0.1 : 6379 > get name
"HUIHUI"

Lua脚本中的指令是原子的,在执行lua脚本期间,其它指令阻塞,必须等待lua脚本的指令执行完毕。 所以单个lua脚本的指令不宜有太多指令。

lua脚本的使用场景

1.需要原子性地执行多个命令
2.需要中间值来组合后面的命令
3.需要中间值来编排后面的命令

3.3 Demo

我们使用lua脚本来保证多个指令的原子性,实现我们的分布式锁。

private static String script = "" +
    "local lockSet = redis.call('exists', KEYS[1])\n"
    + "if lockSet == 0 then\n" +
    "redis.call('set', KEYS[1], ARGV[2])\n" +
    //设置过期时间,防止死锁
    "redis.call('expire', KEYS[1], ARGV[2])\n" +
    "end\n" +
    "return lockSet\n";

 加锁方法

public String pay(Long orderId) {
    try {
        Long lock = (Long)
        redisTemplate.execute(RedisScript.of(script, Long.class),
        Arrays.asList("pay:" + orderId), "1", 30);
        if (lock == 0) {
            //模拟支付业务代码执行10s
            Thread.sleep(10000);
            //处理完业务逻辑删除锁 异常了
            redisTemplate.delete("lock");
        }
        return lock == 0 ? "正常支付完毕" : "请稍等,已经有人在支付!!";
    } catch (Exception exception) {
        redisTemplate.delete("lock" + orderId);
        return "系统异常";
    }
}

对外接口

@RequestMapping(value = "/pay",method = RequestMethod.GET)
@ResponseBody
public String pay(@RequestParam("id") Long id) throws InterruptedException {
    return productService.pay(id);
}

 思考:

  • 做重入锁怎么做? 同一个线程能加锁加多次 (redis中的hash)
    • 互斥的key  (大key)
    • 知道线程信息  (field)
    • 保存重入次数 value  hincrby 线程安全的
  • 假如锁30s会过期,但是业务没有执行完,锁就会失效

4. Redission锁原理源码分析

4.1 Redission使用

a. 导入redission客户端包

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.5.4</version>
</dependency>

b.初始化RedissionClient,将RedissionClient交给Spring容器管理

@Bean
RedissonClientgetRedissonClient(){
    Configconfig=newConfig();
    config.useClusterServers()    //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
        .addNodeAddress("redis://192.168.8.127:6381")
        .addNodeAddress("redis://192.168.8.128:6381")
        .addNodeAddress("redis://192.168.8.129:6381")
        .addNodeAddress("redis://192.168.8.127:6380")
        .addNodeAddress("redis://192.168.8.128:6380")
        .addNodeAddress("redis://192.168.8.129:6380");
    return Redisson.create(config);
}

c. 调用Redission的分布式锁

public String payRedisson(Long orderId)throws InterruptedException{
    RLockr Lock1=redissonClient.getLock("order_lock"+orderId);
    if(rLock1.tryLock(-1,-1,TimeUnit.SECONDS)){
        rLock1.tryLock(-1,-1,TimeUnit.SECONDS);
        System.out.println("获取锁成功");
        Thread.sleep(500000);
        rLock1.unlock();
        return"处理完成";
    } else {
        System.out.println("获取锁失败");    
        return"请稍等,已经有人在支付!!";
}    

 d. 暴露接口

@RequestMapping(value="/pay_lock",method= RequestMethod.GET)
@ResponseBody
public String payRedisson(@RequestParam("id") Long id) throws InterruptedException{
    return product Service.payRedisson(id);
}

 4.2 Reddission分布式流程图

 4.3 Reddison源码解析

4.3.1 初始化锁对象

RLockr Lock1=redissonClient.getLock("order_lock"+orderId);

主要初始化3个方法: RedissonLock、RedissonBaseLock、RedissonObject 

主要是一些任务执行器、大key,UUID以及过期时间、订阅服务的一些初始化

4.3.2 加锁方法

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);  // 锁等待时间
        long current = System.currentTimeMillis();  // 当前时间
        final long threadId = Thread.currentThread().getId();  // 当前线程
        Long ttl = this.tryAcquire(leaseTime, unit, threadId); // 抢占锁
        if (ttl == null) {   // 抢占成功 执行业务逻辑
            return true;
        } else {   // 抢占失败的逻辑处理
            time -= System.currentTimeMillis() - current;    // 看当前时间是否已经过了锁等待时间
            if (time <= 0L) {   // 如果过了 直接失败
                this.acquireFailed(threadId);
                return false;
            } else {    // 如果没过锁等待时间
                current = System.currentTimeMillis();
                // 当前线程的订阅  也就是说在锁等待时间内 如果锁释放了 则就进行直接订阅
                final RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                    // 订阅失败 获取锁失败
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                            public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                                if (subscribeFuture.isSuccess()) {
                                    RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                                }

                            }
                        });
                    }

                    this.acquireFailed(threadId);
                    return false;
                } else {
                    // 表示订阅成功了  就是监听到了锁释放  开始进行抢占锁
                    boolean var14;
                    try {
                        time -= System.currentTimeMillis() - current;
                        if (time > 0L) {
                            boolean var16;
                            do {
                                long currentTime = System.currentTimeMillis();
                                ttl = this.tryAcquire(leaseTime, unit, threadId);
                                if (ttl == null) {   // 表示抢占成功
                                    var16 = true;
                                    return var16;
                                }

                                time -= System.currentTimeMillis() - currentTime;
                                if (time <= 0L) {   // 已过锁等待时间  抢占失败
                                    this.acquireFailed(threadId);
                                    var16 = false;
                                    return var16;
                                }
                                                                      
                                currentTime = System.currentTimeMillis();
                                // 如果没抢到锁,并且也还没到锁的释放时间 使用Semaphore进行阻塞,阻塞时间 : 剩余等待时间ttl< 锁释放时间time? 剩余等待时间: 锁释放时间
                                if (ttl >= 0L && ttl < time) { 
                                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                    this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }

                                time -= System.currentTimeMillis() - currentTime;
                            } while(time > 0L);

                            this.acquireFailed(threadId);
                            var16 = false;
                            return var16;
                        }

                        this.acquireFailed(threadId);
                        var14 = false;
                    } finally {
                        this.unsubscribe(subscribeFuture, threadId);
                    }

                    return var14;
                }
            }
        }
    }

整个逻辑的实现就是我们4.2的左半部分 

下面我们首先看一个Semaphore的demo 知道是怎么进行阻塞的

Semaphoresemaphore=newSemaphore(0);
semaphore.release(); //释放semaphore才能后面判断为true if(semaphore.tryAcquire(10,TimeUnit.SECONDS))
{
    System.out.println("10s过后");
}
System.out.println("1");

 接着我画了一个上述代码的时间轴图 可帮助理解

 4.3.2.1 tryAcquire(可重入锁)

tracquire就是重入锁抢占的一个实现,我们主要看一下lua脚本的实现

其中KEYS[1]是我的大key: order_lock111 ARGV[1]=30000 ARGV[2]:UUID+线程ID

if(redis.call('exists',KEYS[1])==0)  //判断我的锁是否存在 =0为不存在没人抢占锁
then
    redis.call('hincrby',KEYS[1],ARGV[2],1); //把我的小key+1
    redis.call('pexpire',KEYS[1],ARGV[1]); //设置过期时间30s
eturnnil;
end;
if(redis.call('hexists',KEYS[1],ARGV[2])==1) //进入该逻辑说明有线程抢占了锁继续判断是否同一个线程==1为同一线程
then 
    redis.call('hincrby', KEYS[1],ARGV[2],1); 把我的小key+1 代表重入次数
    redis.call('pexpire',KEYS[1],ARGV[1]);  //设置过期时间30s
returnnil;
end;
return redis.call('pttl',KEYS[1]); //前面2个if都没进,说明有人抢占并且不是同一线程,直接返回还有多少ms过期

5. watchDog看门狗机制之时间轮算法

watchDog看门够机制主要是为了解决锁时间到了,但是业务还没执行完,我希望我这个锁一直被我占有,我业务没执行完,就给我续上。怎么做?无非就是开个定时任务,定时的去判断下这个锁还存不存在,如果存在,续期,如果不存在,不续期。只有分布式锁中默认的30s时间才会触发看门狗机制

定时调度任务的方式有很多,比如es-job ssl-job  rabbitmq死信队列等分布式的,还有Netty下面的时间轮算法,redis分布式就是采用的时间轮算法。

5.1 hash轮定时器demo

    public static void main(String[] args) {
        Timer timer = new HashedWheelTimer();  // 创建一个HashedWheelTimer hash轮定时器
        // 提交一个任务 让它在10s后执行
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("10s 后执行该任务1");
            }
        }, 10, TimeUnit.SECONDS);
        // 再提交一个任务,让它在20s后执行
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("20s 后执行该任务1");
            }
        }, 20, TimeUnit.SECONDS);

        // 提交一个任务 让它在10s后执行
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("5s 后执行该任务");
            }
        }, 5, TimeUnit.SECONDS);
    }

执行结果:

 

可以看到 hash轮定时器是只会延时执行一次。 

5.1时间轮算法原理

有家足浴店,他们的技师是这么安排的!
总共有 8 个技师,分别是技师 1.2.3.4.5.6.7.8 号。
技师 1 :负责 0-3 点的顾客
技师 2 :负责 3-6 点的顾客
技师 3 :负责 6-9 点的顾客
技师 4 :负责 9-12 点的顾客
技师 5 :负责 12-15 点的顾客
技师 6 :负责 15-18 点的顾客
技师 7 :负责 18-21 点的顾客
技师 8 :负责 21-24 点的顾客
 
那么,假如哪天放假,现在是早上 6 点,我想 9 个小时候去按摩,会是哪个技师为我服务?
早点 6 点,一天已经过了 6 个小时,那么我服务的绝对时间是 6+9=15 个小时之后。,每个技师是3 小时,那么前面总共要 5 个技师,到了 15 以后就得技师6 来负责。
那么,哪天晚上我累了,我想去按摩,假如现在是 21 点,我想预约 6 个小时后的按摩,就是6 个小时后我需要按摩,那么哪个技师为我按摩?
技师 2 我们已知一天会有 8 个技师轮询,一天已经过了 21 点,而我是 6个小 时,那么假如从按摩店的角度去看的话,我要执行的决定时间 21+6 也就是 27 小时, 1 个技师的服务时间是 3 个小时,所以我前面 27 个小时需要 9 个技 师服务,那么你是 27 小时候,就得第 10 个技师,因为是每天轮询的,所以 第十个是技师 2

时间轮算法流程图: https://www.processon.com/view/link/62b2d1751e08530728b1f954

5.2 Redisson中的看门狗机制

在抢占锁的时候,如果抢占成功,才会触发看门狗机制

 

我们主要看newTimeout部分

            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                    future.addListener(new FutureListener<Boolean>() {
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                            if (!future.isSuccess()) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                            } else {
                                if ((Boolean)future.getNow()) {
                                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                                }

                            }
                        }
                    });
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);

 run方法里面为任务,this.internalLockLeaseTime / 3L 代表是多长时间执行1次,等于10s,future中的任务是判断主线程中的业务key是否需要续期,并且重置为默认时间30s; 也就是说,10s会去监听一下执行该延时任务,如果该业务key还在,就续期。那么该延时任务究竟是怎么执行的呢?

newTimeout源码 执行start方法,该start方法会另起一个守护线程,执行我们的task

 守护线程的真正执行方法在Work的run方法中

      public void run() {
            // 得到当前的系统时间
            HashedWheelTimer.this.startTime = System.nanoTime();
            if (HashedWheelTimer.this.startTime == 0L) {
                HashedWheelTimer.this.startTime = 1L;
            }
            // 得到了当前的系统时间后,主线程可以正常执行
            HashedWheelTimer.this.startTimeInitialized.countDown();

            int idx;
            HashedWheelBucket bucket;
            do {
               // 获取指针下一次走到的时间,如果没到下一次时间,就等待
                long deadline = this.waitForNextTick();
                if (deadline > 0L) {
                    // 根据tick与轮的大小取模 得到当前tick所在的环的下标
                    idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
                    // 移除已经取消了的任务
                    this.processCancelledTasks();
                    // 根据idx下标 得到轮的hash桶
                    bucket = HashedWheelTimer.this.wheel[idx];
                     // 将队列的任务放到相关的hash桶,如果一个数组有多个任务,则采用链表的形式
                    this.transferTimeoutsToBuckets();
                    // 去执行hash桶下的任务
                    bucket.expireTimeouts(deadline);
                    ++this.tick;
                }
            } while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);

            HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
            int var2 = var5.length;

            for(idx = 0; idx < var2; ++idx) {
                bucket = var5[idx];
                bucket.clearTimeouts(this.unprocessedTimeouts);
            }

            while(true) {
                HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
                if (timeout == null) {
                    this.processCancelledTasks();
                    return;
                }

                if (!timeout.isCancelled()) {
                    this.unprocessedTimeouts.add(timeout);
                }
            }
        }

5.2.1  waitForNextTick方法(获取指针走到下一次时间)

     private long waitForNextTick() {
            // 100ms走一次,得到下一次得deadLine
            long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);

            while(true) {
                // 得到当前时间跟开始时间的差值(时间轮走了多少时间)
                long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
                //sleepTimeMs如果大于0,说明还没走到指针该走的时间,睡眠
                long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
                //如果到了时间,直接返回
                if (sleepTimeMs <= 0L) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -9223372036854775807L;
                    }

                    return currentTime;
                }

                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10L * 10L;
                    if (sleepTimeMs == 0L) {
                        sleepTimeMs = 1L;
                    }
                }

                try {
                    //如果没到指针时间,睡眠
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException var8) {
                    if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

5.2.2  transferTimeoutsToBuckets方法(给队列中的任务找对应的hash桶)

       private void transferTimeoutsToBuckets() {
            //从队列中获取任务,每次最多只能获取100000次
            for(int i = 0; i < 100000; ++i) {
                //从队列中获取task,任务会在主线程中添加到队列
                HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
                if (timeout == null) {
                    break;
                }

                if (timeout.state() != 1) {
                    // 任务的deadline/每次指针时间
                    // 从第一个starttime开始得到需要多少指针
                    long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
                    /轮数 默认512的环的大小,如果大于,则需要在第二次环的时候才会执行
                    timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
                    // 取2个的最大值,确保不会把时间安排在过去
                    long ticks = Math.max(calculated, this.tick);
                    //取模得到我的指针跳到哪个hash环执行
                    int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
                    //放到对应的hashWheeld的下标中
                    HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
                    //如果这个hash下标中有值,则添加至链表后面
                    bucket.addTimeout(timeout);
                }
            }

        }

5.2.3  expireTimeouts方法(限期执行hash桶下的任务)

      public void expireTimeouts(long deadline) {
            HashedWheelTimeout next;
             // 循环hash环中链表
            for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
                next = timeout.next;
                // 是不是当前轮应该执行的任务
                if (timeout.remainingRounds <= 0L) {
                    /从链表清除
                    next = this.remove(timeout);
                    // 判断是否到了执行时间
                    if (timeout.deadline > deadline) {
                        throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                    //调用timeout.expire方法,执行task
                    timeout.expire();
                } else if (timeout.isCancelled()) {
                    next = this.remove(timeout);
                } else {
                    --timeout.remainingRounds;
                }
            }

        }

expire执行task任务

5.3 手动释放分布式锁

KEYS[1] 为大 key KEYS[2] 为加锁的时候订阅的 channelName
ARGV[1]=UNLOCK_MESSAGE , ARGV[2]=释放时间 ARGV[3]= 重入的线程信息
if(redis.call('hexists',KEYS[1],ARGV[3])==0) //如果小key不存在  返回空
    then
    return nil;
    end;
local counter=redis.call('hincrby',KEYS[1],ARGV[3],-1); //如果存在 可重入次数-1
if(counter>0)  //如果大于0,说明加锁次数大于释放锁次数,不能释放锁
    then redis.call('pexpire',KEYS[1],ARGV[2]); //设置锁的过期时间
    return0;//返回0 
else
    redis.call('del',KEYS[1]); //删除大key
    redis.call('publish',KEYS[2],ARGV[1]);//往订阅的频道发送message,发送UNLOCK_MESSAGE
    return1;
    end;
returnnil;

5.4 联锁

Redis做分布式遇到的问题?

因为 Redis 属于 CAP 中的 AP, 为了优先保证高可用性,所以会牺牲一定的数据一致性。比如主从方案中,如果主库挂的话,从库是不管数据有没有同步完主库的数据,都会自动升级主。
那么这样就会出现一种情况:加锁返回是成功的,但是由于发生了主库挂的,从库切换的时候,没有同步到这个锁,从而导致锁失效。
那么这个怎么解决?解决不了,能做的是我尽可能的去减少这种情况!
怎么解决?既然你一个主从实例,会导致锁丢失,那么假如说我把这种风险分担,同时在5 台机器加锁,这 5 台机器只要有一半以上的锁能成功获取,这样就尽可能的减少了锁丢失的场景。
就相当于你家的门钥匙,假如钥匙只有一把,你丢了就丢了,但是把这个钥匙给你们家里面所有的人,那么你丢了家门还是能进去。因为家里其他人还有。

 实现方案: 给你同时往多个主节点+同一个锁 只要超过一半的节点能够去加锁成功,就成功

显示全文