我们直接在并发编程阶段学习过synchronized lock这些都是单机锁。什么是锁呢? 就是我们有一些互斥资源,不能并行执行,需要一个东西来保证是串行执行的 锁
单机锁的弊端
如图,有3个独立的订单服务,4个用户并发从客端访问,用户12访问订单服务1,由于单机锁,所以可以有一个用户能够进行支付操作,用户3 用户4分别进入订单服务2 订单服务3 也能够进行支付操作,因此我们想让4个用户在一个时间内只有一个用户能够进行支付操作,单机锁就无法做到
所以就引入了分布式锁
分布式锁是把锁放在第三方,而不是在单机服务中,用户1-4 获取锁都到第三方服务中去获取,这样就可以保证分布式服务下的支付操作时是串行执行的了
需求:
a.只能有一个线程能同时执行互斥的资源
b.其它的线程执行的时候,有线程在执行的话,要么等待,要么报错
需要实现这个需求的3个重要条件
a. 标记 redis是k-v结构 key作为一个标记,这个key存在的话说明有了标记,key不存在,说明没有标记
b.标记的可见性 单线程命令执行, 必须要set完 你才能get
c. 保证原子性 setnx 单线执行
setnx其实等价于两个命令(exists 和 set),假如我不想setnx,那么流程就等价于:
if(exists(order:111)==0) //没锁
{
set(order:111,1)
}
但是如果这样写,就会发现一个问题,这两个指令不是原子性的,就会导致同时进入判断中,得到没锁的场景
因此我们应该思考的是怎么保证多个指令的原子性?
redis的开启提交回滚命令分别为multi、exec、discard
但是事务的特点是
1.命令是原子的 在执行事务中的指令时,服务器阻塞,不能执行其他指令2.不能拿到事务中间的指令的结果 来决定后面指令的逻辑。
因此,不能够通过判断而决定后面的逻辑,事务不能够满足我们上述的需求,因而采用Lua脚本
lua语言是一个轻量级的脚本语言,能够完美的融入redis中。
redis调用lua脚本 采用eval指令
预填充键值 如果传入的键是1 返回第一个参数 否则返回第二个参数
127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]end return ARGV[2] " 1 1 'HUIHUI' 'XIAOHUIHUI'"HUIHUI"127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]end return ARGV[2] " 1 2 'HUIHUI' 'XIAOHUIHUI'"XIAOHUIHUI"
既然可以传入键值对,我们能不能在lua脚本里面再去执行redis的指令,并且可以根据返回的结果继续执行后续的逻辑。
判断一个key是不是存在,如果不存在,则调用set的redis指令,在lua脚本调用redis指令,用redis.call
127.0 . 0.1 : 6379 > eval "localkey=redis.call('exists',KEYS[1]) if key==0 then returnredis.call('set',KEYS[1],ARGV[1]) end return 1" 1 nameHUIHUIOK127.0 . 0.1 : 6379 > get name"HUIHUI"
Lua脚本中的指令是原子的,在执行lua脚本期间,其它指令阻塞,必须等待lua脚本的指令执行完毕。 所以单个lua脚本的指令不宜有太多指令。
lua脚本的使用场景
1.需要原子性地执行多个命令2.需要中间值来组合后面的命令3.需要中间值来编排后面的命令
我们使用lua脚本来保证多个指令的原子性,实现我们的分布式锁。
private static String script = "" +
"local lockSet = redis.call('exists', KEYS[1])\n"
+ "if lockSet == 0 then\n" +
"redis.call('set', KEYS[1], ARGV[2])\n" +
//设置过期时间,防止死锁
"redis.call('expire', KEYS[1], ARGV[2])\n" +
"end\n" +
"return lockSet\n";
加锁方法
public String pay(Long orderId) {
try {
Long lock = (Long)
redisTemplate.execute(RedisScript.of(script, Long.class),
Arrays.asList("pay:" + orderId), "1", 30);
if (lock == 0) {
//模拟支付业务代码执行10s
Thread.sleep(10000);
//处理完业务逻辑删除锁 异常了
redisTemplate.delete("lock");
}
return lock == 0 ? "正常支付完毕" : "请稍等,已经有人在支付!!";
} catch (Exception exception) {
redisTemplate.delete("lock" + orderId);
return "系统异常";
}
}
对外接口
@RequestMapping(value = "/pay",method = RequestMethod.GET)
@ResponseBody
public String pay(@RequestParam("id") Long id) throws InterruptedException {
return productService.pay(id);
}
思考:
a. 导入redission客户端包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.5.4</version>
</dependency>
b.初始化RedissionClient,将RedissionClient交给Spring容器管理
@Bean
RedissonClientgetRedissonClient(){
Configconfig=newConfig();
config.useClusterServers() //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
.addNodeAddress("redis://192.168.8.127:6381")
.addNodeAddress("redis://192.168.8.128:6381")
.addNodeAddress("redis://192.168.8.129:6381")
.addNodeAddress("redis://192.168.8.127:6380")
.addNodeAddress("redis://192.168.8.128:6380")
.addNodeAddress("redis://192.168.8.129:6380");
return Redisson.create(config);
}
c. 调用Redission的分布式锁
public String payRedisson(Long orderId)throws InterruptedException{
RLockr Lock1=redissonClient.getLock("order_lock"+orderId);
if(rLock1.tryLock(-1,-1,TimeUnit.SECONDS)){
rLock1.tryLock(-1,-1,TimeUnit.SECONDS);
System.out.println("获取锁成功");
Thread.sleep(500000);
rLock1.unlock();
return"处理完成";
} else {
System.out.println("获取锁失败");
return"请稍等,已经有人在支付!!";
}
d. 暴露接口
@RequestMapping(value="/pay_lock",method= RequestMethod.GET)
@ResponseBody
public String payRedisson(@RequestParam("id") Long id) throws InterruptedException{
return product Service.payRedisson(id);
}
RLockr Lock1=redissonClient.getLock("order_lock"+orderId);
主要初始化3个方法: RedissonLock、RedissonBaseLock、RedissonObject
主要是一些任务执行器、大key,UUID以及过期时间、订阅服务的一些初始化
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); // 锁等待时间
long current = System.currentTimeMillis(); // 当前时间
final long threadId = Thread.currentThread().getId(); // 当前线程
Long ttl = this.tryAcquire(leaseTime, unit, threadId); // 抢占锁
if (ttl == null) { // 抢占成功 执行业务逻辑
return true;
} else { // 抢占失败的逻辑处理
time -= System.currentTimeMillis() - current; // 看当前时间是否已经过了锁等待时间
if (time <= 0L) { // 如果过了 直接失败
this.acquireFailed(threadId);
return false;
} else { // 如果没过锁等待时间
current = System.currentTimeMillis();
// 当前线程的订阅 也就是说在锁等待时间内 如果锁释放了 则就进行直接订阅
final RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
// 订阅失败 获取锁失败
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
RedissonLock.this.unsubscribe(subscribeFuture, threadId);
}
}
});
}
this.acquireFailed(threadId);
return false;
} else {
// 表示订阅成功了 就是监听到了锁释放 开始进行抢占锁
boolean var14;
try {
time -= System.currentTimeMillis() - current;
if (time > 0L) {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) { // 表示抢占成功
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) { // 已过锁等待时间 抢占失败
this.acquireFailed(threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
// 如果没抢到锁,并且也还没到锁的释放时间 使用Semaphore进行阻塞,阻塞时间 : 剩余等待时间ttl< 锁释放时间time? 剩余等待时间: 锁释放时间
if (ttl >= 0L && ttl < time) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(threadId);
var16 = false;
return var16;
}
this.acquireFailed(threadId);
var14 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var14;
}
}
}
}
整个逻辑的实现就是我们4.2的左半部分
下面我们首先看一个Semaphore的demo 知道是怎么进行阻塞的
Semaphoresemaphore=newSemaphore(0);
semaphore.release(); //释放semaphore才能后面判断为true if(semaphore.tryAcquire(10,TimeUnit.SECONDS))
{
System.out.println("10s过后");
}
System.out.println("1");
接着我画了一个上述代码的时间轴图 可帮助理解
tracquire就是重入锁抢占的一个实现,我们主要看一下lua脚本的实现
其中KEYS[1]是我的大key: order_lock111 ARGV[1]=30000 ARGV[2]:UUID+线程ID
if(redis.call('exists',KEYS[1])==0) //判断我的锁是否存在 =0为不存在没人抢占锁
then
redis.call('hincrby',KEYS[1],ARGV[2],1); //把我的小key+1
redis.call('pexpire',KEYS[1],ARGV[1]); //设置过期时间30s
eturnnil;
end;
if(redis.call('hexists',KEYS[1],ARGV[2])==1) //进入该逻辑说明有线程抢占了锁继续判断是否同一个线程==1为同一线程
then
redis.call('hincrby', KEYS[1],ARGV[2],1); 把我的小key+1 代表重入次数
redis.call('pexpire',KEYS[1],ARGV[1]); //设置过期时间30s
returnnil;
end;
return redis.call('pttl',KEYS[1]); //前面2个if都没进,说明有人抢占并且不是同一线程,直接返回还有多少ms过期
watchDog看门够机制主要是为了解决锁时间到了,但是业务还没执行完,我希望我这个锁一直被我占有,我业务没执行完,就给我续上。怎么做?无非就是开个定时任务,定时的去判断下这个锁还存不存在,如果存在,续期,如果不存在,不续期。只有分布式锁中默认的30s时间才会触发看门狗机制
定时调度任务的方式有很多,比如es-job ssl-job rabbitmq死信队列等分布式的,还有Netty下面的时间轮算法,redis分布式就是采用的时间轮算法。
public static void main(String[] args) {
Timer timer = new HashedWheelTimer(); // 创建一个HashedWheelTimer hash轮定时器
// 提交一个任务 让它在10s后执行
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("10s 后执行该任务1");
}
}, 10, TimeUnit.SECONDS);
// 再提交一个任务,让它在20s后执行
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("20s 后执行该任务1");
}
}, 20, TimeUnit.SECONDS);
// 提交一个任务 让它在10s后执行
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("5s 后执行该任务");
}
}, 5, TimeUnit.SECONDS);
}
执行结果:
可以看到 hash轮定时器是只会延时执行一次。
有家足浴店,他们的技师是这么安排的!总共有 8 个技师,分别是技师 1.2.3.4.5.6.7.8 号。技师 1 :负责 0-3 点的顾客技师 2 :负责 3-6 点的顾客技师 3 :负责 6-9 点的顾客技师 4 :负责 9-12 点的顾客技师 5 :负责 12-15 点的顾客技师 6 :负责 15-18 点的顾客技师 7 :负责 18-21 点的顾客技师 8 :负责 21-24 点的顾客那么,假如哪天放假,现在是早上 6 点,我想 9 个小时候去按摩,会是哪个技师为我服务?早点 6 点,一天已经过了 6 个小时,那么我服务的绝对时间是 6+9=15 个小时之后。,每个技师是3 小时,那么前面总共要 5 个技师,到了 15 以后就得技师6 来负责。那么,哪天晚上我累了,我想去按摩,假如现在是 21 点,我想预约 6 个小时后的按摩,就是6 个小时后我需要按摩,那么哪个技师为我按摩?技师 2 我们已知一天会有 8 个技师轮询,一天已经过了 21 点,而我是 6个小 时,那么假如从按摩店的角度去看的话,我要执行的决定时间 21+6 也就是 27 小时, 1 个技师的服务时间是 3 个小时,所以我前面 27 个小时需要 9 个技 师服务,那么你是 27 小时候,就得第 10 个技师,因为是每天轮询的,所以 第十个是技师 2 。
时间轮算法流程图: https://www.processon.com/view/link/62b2d1751e08530728b1f954
在抢占锁的时候,如果抢占成功,才会触发看门狗机制
我们主要看newTimeout部分
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
run方法里面为任务,this.internalLockLeaseTime / 3L 代表是多长时间执行1次,等于10s,future中的任务是判断主线程中的业务key是否需要续期,并且重置为默认时间30s; 也就是说,10s会去监听一下执行该延时任务,如果该业务key还在,就续期。那么该延时任务究竟是怎么执行的呢?
newTimeout源码 执行start方法,该start方法会另起一个守护线程,执行我们的task
守护线程的真正执行方法在Work的run方法中
public void run() {
// 得到当前的系统时间
HashedWheelTimer.this.startTime = System.nanoTime();
if (HashedWheelTimer.this.startTime == 0L) {
HashedWheelTimer.this.startTime = 1L;
}
// 得到了当前的系统时间后,主线程可以正常执行
HashedWheelTimer.this.startTimeInitialized.countDown();
int idx;
HashedWheelBucket bucket;
do {
// 获取指针下一次走到的时间,如果没到下一次时间,就等待
long deadline = this.waitForNextTick();
if (deadline > 0L) {
// 根据tick与轮的大小取模 得到当前tick所在的环的下标
idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
// 移除已经取消了的任务
this.processCancelledTasks();
// 根据idx下标 得到轮的hash桶
bucket = HashedWheelTimer.this.wheel[idx];
// 将队列的任务放到相关的hash桶,如果一个数组有多个任务,则采用链表的形式
this.transferTimeoutsToBuckets();
// 去执行hash桶下的任务
bucket.expireTimeouts(deadline);
++this.tick;
}
} while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
int var2 = var5.length;
for(idx = 0; idx < var2; ++idx) {
bucket = var5[idx];
bucket.clearTimeouts(this.unprocessedTimeouts);
}
while(true) {
HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
if (timeout == null) {
this.processCancelledTasks();
return;
}
if (!timeout.isCancelled()) {
this.unprocessedTimeouts.add(timeout);
}
}
}
private long waitForNextTick() {
// 100ms走一次,得到下一次得deadLine
long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);
while(true) {
// 得到当前时间跟开始时间的差值(时间轮走了多少时间)
long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
//sleepTimeMs如果大于0,说明还没走到指针该走的时间,睡眠
long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
//如果到了时间,直接返回
if (sleepTimeMs <= 0L) {
if (currentTime == Long.MIN_VALUE) {
return -9223372036854775807L;
}
return currentTime;
}
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10L * 10L;
if (sleepTimeMs == 0L) {
sleepTimeMs = 1L;
}
}
try {
//如果没到指针时间,睡眠
Thread.sleep(sleepTimeMs);
} catch (InterruptedException var8) {
if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
return Long.MIN_VALUE;
}
}
}
}
private void transferTimeoutsToBuckets() {
//从队列中获取任务,每次最多只能获取100000次
for(int i = 0; i < 100000; ++i) {
//从队列中获取task,任务会在主线程中添加到队列
HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
if (timeout == null) {
break;
}
if (timeout.state() != 1) {
// 任务的deadline/每次指针时间
// 从第一个starttime开始得到需要多少指针
long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
/轮数 默认512的环的大小,如果大于,则需要在第二次环的时候才会执行
timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
// 取2个的最大值,确保不会把时间安排在过去
long ticks = Math.max(calculated, this.tick);
//取模得到我的指针跳到哪个hash环执行
int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
//放到对应的hashWheeld的下标中
HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
//如果这个hash下标中有值,则添加至链表后面
bucket.addTimeout(timeout);
}
}
}
public void expireTimeouts(long deadline) {
HashedWheelTimeout next;
// 循环hash环中链表
for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
next = timeout.next;
// 是不是当前轮应该执行的任务
if (timeout.remainingRounds <= 0L) {
/从链表清除
next = this.remove(timeout);
// 判断是否到了执行时间
if (timeout.deadline > deadline) {
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
//调用timeout.expire方法,执行task
timeout.expire();
} else if (timeout.isCancelled()) {
next = this.remove(timeout);
} else {
--timeout.remainingRounds;
}
}
}
expire执行task任务
KEYS[1] 为大 key KEYS[2] 为加锁的时候订阅的 channelNameARGV[1]=UNLOCK_MESSAGE , ARGV[2]=释放时间 ARGV[3]= 重入的线程信息
if(redis.call('hexists',KEYS[1],ARGV[3])==0) //如果小key不存在 返回空
then
return nil;
end;
local counter=redis.call('hincrby',KEYS[1],ARGV[3],-1); //如果存在 可重入次数-1
if(counter>0) //如果大于0,说明加锁次数大于释放锁次数,不能释放锁
then redis.call('pexpire',KEYS[1],ARGV[2]); //设置锁的过期时间
return0;//返回0
else
redis.call('del',KEYS[1]); //删除大key
redis.call('publish',KEYS[2],ARGV[1]);//往订阅的频道发送message,发送UNLOCK_MESSAGE
return1;
end;
returnnil;
Redis做分布式遇到的问题?
因为 Redis 属于 CAP 中的 AP, 为了优先保证高可用性,所以会牺牲一定的数据一致性。比如主从方案中,如果主库挂的话,从库是不管数据有没有同步完主库的数据,都会自动升级主。那么这样就会出现一种情况:加锁返回是成功的,但是由于发生了主库挂的,从库切换的时候,没有同步到这个锁,从而导致锁失效。那么这个怎么解决?解决不了,能做的是我尽可能的去减少这种情况!怎么解决?既然你一个主从实例,会导致锁丢失,那么假如说我把这种风险分担,同时在5 台机器加锁,这 5 台机器只要有一半以上的锁能成功获取,这样就尽可能的减少了锁丢失的场景。就相当于你家的门钥匙,假如钥匙只有一把,你丢了就丢了,但是把这个钥匙给你们家里面所有的人,那么你丢了家门还是能进去。因为家里其他人还有。
实现方案: 给你同时往多个主节点+同一个锁 只要超过一半的节点能够去加锁成功,就成功