1、分布式的特点

分布式系统由独立的服务器通过网络松散耦合组成。在这个系统中每个服务器都是一台独立的主机,服务器之间通过内部网络连接。分布式系统有以下几个特点:

  • 可扩展性:可通过横向水平扩展提高系统的性能和吞吐量。
  • 高可靠性:高容错,即使系统中一台或几台故障,系统仍可提供服务。
  • 高并发性:各机器并行独立处理和计算。
  • 廉价高效:多台小型机而非单台高性能机。

2、分布式锁条件

  1. 可重入:线程中的可重入,指的是外层函数获得锁之后,内层也可以获得锁,ReentrantLock和synchronized都是可重入锁;衍生到分布式环境中,一般仍然指的是线程的可重入,在绝大多数分布式环境中,都要求分布式锁是可重入的。
  2. 惊群效应(Herd Effect):在分布式锁中,惊群效应指的是,在有多个请求等待获取锁的时候,一旦占有锁的线程释放之后,如果所有等待的方都同时被唤醒,尝试抢占锁。但是这样的情况会造成比较大的开销,那么在实现分布式锁的时候,应该尽量避免惊群效应的产生。
  3. 公平锁和非公平锁:不同的需求,可能需要不同的分布式锁。非公平锁普遍比公平锁开销小。但是业务需求如果必须要锁的竞争者按顺序获得锁,那么就需要实现公平锁。
  4. 阻塞锁和自旋锁:针对不同的使用场景,阻塞锁和自旋锁的效率也会有所不同。阻塞锁会有上下文切换,如果并发量比较高且临界区的操作耗时比较短,那么造成的性能开销就比较大了。但是如果临界区操作耗时比较长,一直保持自旋,也会对CPU造成更大的负荷。

3、分布式锁的实现方式

  • 基于Redis的分布式锁
  • MySQL数据库乐观锁
  • 基于ZooKeeper的分布式锁

3.1、Redis分布式锁的实现原理

  • Redis分布式锁的基本原理:用一个状态值表示锁,对锁的占用和释放通过状态值来标识。
  • Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。redis的SETNX命令可以方便的实现分布式锁。
  • 安全属性(互斥):不管任何时候,只有一个客户端能持有同一个锁。
  • 效率属性
    • 不会死锁,最终一定会得到锁,就算一个持有锁的客户端宕掉或者发生网络分区。
    • 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。
  • 多个进程执行以下Redis命令
SETNX lock.foo < current Unix time + lock timeout + 1>
- 如果 SETNX 返回1,说明该进程获得锁,SETNX 将键 lock.foo 的值设置为锁的超时时间(当前时间 + 锁的有效时间)。
- 如果 SETNX 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX 操作,以获得锁。
  • Redis分布式锁实现的关键
    • 原子命令加锁:SET key random_value NX PX 30000
    • 设置值random_value是随机的,为了更安全的释放锁。例如:使用uuid
    • 释放锁的时候需要检查 key 是否存在,且 key 对应的值是否和指定的值相等,相等才能释放锁。为了保障原子性,需要用 lua 脚本

3.1.1、利用SETNX和SETEX实现

public class Setexnx {
 
    private static Logger logger = Logger.getLogger(Setexnx.class.getName());
 
    //最长时间锁为1小时
    private final static int maxExpireTime = 1 * 60 * 60;
 
    //系统时间偏移量15秒,服务器间的系统时间差不可以超过15秒,避免由于时间差造成错误的解锁
    private final static int offsetTime = 15;
 
    //分布式锁的实现
    public static boolean Lock(String key, String value, int waitTime, int expire) {
 
        long start = System.currentTimeMillis();
        String lock_key = key + "_lock";
        logger.info("开始获取分布式锁 key:" + key + " lock_key:" + lock_key + " value:" + value);
 
        do {
            try {
                Thread.sleep(1);
                long ret = CacheUtils.Setnx(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key, System.currentTimeMillis() + "undefined" + value,
                        (expire > maxExpireTime) ? maxExpireTime : expire);
                if (ret == 1) {
                    logger.info("成功获得分布式锁 key:" + key + " value:" + value);
                    return Boolean.TRUE;
                } else { // 存在锁,并对死锁进行修复
                    String desc = CacheUtils.GSetnx(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key);
 
                    // 首次锁检测
                    if (desc.indexOf("undefined") > 0) {
                        // 上次锁时间
                        long lastLockTime = NumberUtils.toLong(desc.split("[undefined]")[0]);
                        // 明确死锁,利用Setex复写,再次设定一个合理的解锁时间让系统正常解锁
                        if (System.currentTimeMillis() - lastLockTime > (expire + offsetTime) * 1000) {
                            // 原子操作,只需要一次,会发生小概率事件,多个服务同时发现死锁同时执行此行代码(并发),
                            // 为什么设置解锁时间为expire(而不是更小的时间),防止在解锁发送错乱造成新锁解锁
                            CacheUtils.Setex(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key, value, expire);
                            logger.warn("发现死锁【" + expire + "秒后解锁】key:" + key + " desc:" + desc);
                        } else {
                            logger.info("当前锁key:" + key + " desc:" + desc);
                        }
                    } else {
                        logger.warn("死锁解锁中key:" + key + " desc:" + desc);
                    }
 
                }
                if (waitTime == 0) {
                    break;
                }
                Thread.sleep(500);
            }
            catch (Exception ex) {
                logger.error(Trace.GetTraceStackDetails("获取锁失败", ex));
            }
        }
        while ((System.currentTimeMillis() - start) < waitTime * 1000);
        logger.warn("获取分布式锁失败 key:" + key + " value:" + value);
        return Boolean.FALSE;
    }
 
    //解锁
    public static boolean UnLock(String key) {
        String lock_key = key + "_lock";
        try {
            CacheUtils.Del(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key);
        }
        catch (Exception ex) {
            logger.error(Trace.GetTraceStackDetails("解锁锁失败key:" + key + " lock_key:" + lock_key, ex));
        }
        return Boolean.FALSE;
    }
 
 
    public Long Setnx(String key, String value, int expireTime) throws Exception {
        ShardedJedis jedis = null;
 
        try {
            jedis = pool.getResource();
 
            Long ret = jedis.setnx(key, value);
            if (ret == 1 && expireTime > 0) {
                jedis.expire(key, expireTime);
            }
            return ret;
        }
        catch (Exception e) {
            throw e;
        }
        finally {
            if (pool != null && jedis != null) {
                pool.returnResourceObject(jedis);
            }
        }
    }
 
    public String GSetnx(String key) throws Exception {
        ShardedJedis jedis = null;
        try {
            jedis = pool.getResource();
            return jedis.get(key);
        }
        catch (Exception e) {
            throw e;
        }
        finally {
            if (pool != null && jedis != null) {
                pool.returnResourceObject(jedis);
            }
        }
    }
}

存在的问题:

  1. 需要设置睡眠时间
    1. 为了减少对Redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的Redis的QPS,加上持锁处理时间等进行合理计算。
    2. 最好使用随机时间,可以防止饥饿进程的出现,
    3. 当同时到达多个进程,只会有一个进程获得锁,其他的进程都用同样的频率进行尝试申请锁,这将可能导致前面来的锁得不到满足.
    4. 使用随机的等待时间可以一定程度上保证公平性
  2. setnx和expire是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。之所以这样实现,是因为低版本的jedis并不支持多参数的set()方法。
  3. 锁不具备拥有者标识,解锁时没有预先判断锁的拥有者而直接解锁,会导致任何客户端都可以随时进行解锁,不管这把锁是不是它的。

3.1.2、只用SETNX命令实现分布式锁

public class RedisLockSetNx {
 
    private static Logger logger = Logger.getLogger(RedisLockSetNx.class.getName());
    private RedisTemplate redisTemplate;
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
    private String lockKey;
    //锁超时时间,防止线程在入锁以后,无限的执行等待
    private int expireMsecs = 60 * 1000;
    //锁等待时间,防止线程饥饿
    private int timeoutMsecs = 10 * 1000;
    private volatile boolean locked = false;
 
    //Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }
 
    //Detailed constructor with default lock expiration of 60000 msecs.
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
        this(redisTemplate, lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }
 
    //Detailed constructor.
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
        this(redisTemplate, lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }
 
    //获取lockkey
    public String getLockKey() {
        return lockKey;
    }
 
    //获取键的值
    private String get(final String key) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] data = connection.get(serializer.serialize(key));
                    connection.close();
                    if (data == null) {
                        return null;
                    }
                    return serializer.deserialize(data);
                }
            });
        } catch (Exception e) {
            logger.error("get redis error, key : {}", key);
        }
        return obj != null ? obj.toString() : null;
    }
 
    //设置键的值
    private boolean setNX(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return success;
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (Boolean) obj : false;
    }
 
    //获取并设置键的值,并返回旧值
    private String getSet(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return serializer.deserialize(ret);
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (String) obj : null;
    }
 
    //实现分布式锁
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            if (this.setNX(lockKey, expiresStr)) {
                // 获取锁
                locked = true;
                return true;
            }
            //redis系统的时间
            String currentValueStr = this.get(lockKey);
            //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
 
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                String oldValueStr = this.getSet(lockKey, expiresStr);
 
                //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //分布式的情况下:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // 获取锁
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
            //延迟100 毫秒,
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
        }
        return false;
    }
 
    //释放锁
    public static void wrongReleaseLock(Jedis jedis, String lockKey, String requestId) {
        // 判断加锁与解锁是不是同一个客户端
        if (requestId.equals(jedis.get(lockKey))) {
            // 若在此时,这把锁突然不是这个客户端的,则会误解锁
            jedis.del(lockKey);
        }
    }
}

存在的问题

  1. 需要保证时间同步 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。
  2. 过期时间被覆盖 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。
  3. 解锁时误删锁 若调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。例如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。
    image.png

3.1.3、使用多参数的set方法实现单机的分布式锁

public class RedisTool {
 
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;
 
    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return LOCK_SUCCESS.equals(result);
    }
 
    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        return RELEASE_SUCCESS.equals(result);
    }
 
}

分析

  1. 加锁set()方法的五个形参解释
    1. key:使用key来当锁,因为key是唯一的。
    2. value:传入requestId,因为分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,就能知道这把锁是哪个请求加的,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
    3. nxxx:传入NX,即SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
    4. expx:传入PX,给key加一个过期的设置,具体时间由参数time决定。
    5. time:与expx参数对应,代表key的过期时间。
  2. 解锁使用lua脚本
    1. 使用lua脚本获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。
    2. 解锁lua脚本以确保该操作是原子性的,在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。

存在的问题

  • 单机部署的情况,宕机后不可用。使用Redlock解决。
  • 客户端持有锁超时,使用Redssion 解决。
    image.png

3.1.4、Redis本身的分布式锁的实现

Redlock

  • 在 Redis 的分布式环境中,我们假设有 N 个 Redis Master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。
  • 假设有 5 个完全独立的 Redis Master 节点,他们分别运行在 5 台服务器中,可以保证他们不会同时宕机。客户端获取锁的5个步骤:
    1. 获取当前 Unix 时间,以毫秒为单位。
    2. 依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。
    3. 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
    4. 如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
    5. 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。
  • 只要大多数的节点可以正常工作,就可以保证 Redlock 的正常工作。就解决了单点 Redis 宕机,由于集群异步通信,导致锁失效的问题。
  • Redlock存在的问题:
    • 对时钟依赖性太强, 若N个节点中的某个节点发生时间跳跃,也可能会引此而引发锁安全性问题。
    • 某个节点故障重启后可能一把锁被多个客户端持有。延迟重启解决方法会导致系统在TTL时间内任何锁都将无法加锁成功。所以不能用

Redisson

  • 使用 Redssion 做分布式锁,不需要明确指定 value ,框架会生成一个由 UUID 和 加锁操作的线程的 threadId 用冒号拼接起来的字符串。

  • 锁名称用的是hash类型,而不是string。因为加锁使用的hincrby 命令,支持可重入锁

  • 源码(版本3.15.0)剖析:

    • 加锁的过期时间默认是 30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。当一个 key 加锁成功或者当一个锁重入成功后都会返回空,只有加锁失败的情况下会返回当前锁剩余的时间。
        private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            if (leaseTime != -1) {
                return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            }
            RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e != null) {
                    return;
                }
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            });
            return ttlRemainingFuture;
        }
        // 加锁成功进入该方法,即将进入看门狗逻辑
        protected void scheduleExpirationRenewal(long threadId) {
            ExpirationEntry entry = new ExpirationEntry();
            ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
            if (oldEntry != null) {
                oldEntry.addThreadId(threadId);
            } else {
                entry.addThreadId(threadId);
                renewExpiration();
            }
        }
    
    • 当加锁成功或者重入成功后进入看门狗逻辑:定时任务每 internalLockLeaseTime/3ms 后执行一次。而 internalLockLeaseTime 默认为 30000。所以该任务每 10s 执行一次。定时任务(Timeout)基于 netty 的时间轮HashedWheelTimer

          private void renewExpiration() {
              ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
              if (ee == null) {
                  return;
              }
              Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                  @Override
                  public void run(Timeout timeout) throws Exception {
                      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                      if (ent == null) {
                          return;
                      }
                      Long threadId = ent.getFirstThreadId();
                      if (threadId == null) {
                          return;
                      }
                      RFuture<Boolean> future = renewExpirationAsync(threadId);
                      future.onComplete((res, e) -> {
                          if (e != null) {
                              log.error("Can't update lock " + getName() + " expiration", e);
                              EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                              return;
                          }
                          if (res) {
                              // reschedule itself
                              renewExpiration();
                          }
                      });
                  }
              }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
              ee.setTimeout(task);
          }
      
    • 释放锁: 有个counter 的判断,如果减一后小于等于 0。就执行 del key 的操作,之后publish发布事件(tryAcquire 加锁失败订阅的事件)

    • 解锁完成后通过cancelExpirationRenewal方法取消看门狗定时任务

          public RFuture<Void> unlockAsync(long threadId) {
              RPromise<Void> result = new RedissonPromise<>();
              RFuture<Boolean> future = unlockInnerAsync(threadId);
       
              future.onComplete((opStatus, e) -> {
                  // 取消看门狗定时任务(响应式编程)
                  cancelExpirationRenewal(threadId);
       
                  if (e != null) {
                      result.tryFailure(e);
                      return;
                  }
       
                  if (opStatus == null) {
                      IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                              + id + " thread-id: " + threadId);
                      result.tryFailure(cause);
                      return;
                  }
       
                  result.trySuccess(null);
              });
       
              return result;
          }
      

image.png

  • 通过方法可以设置过期时间,就不用启动看门狗了

  • Redisson加锁解锁流程图
    image.png

3.2、MySQL数据库锁

基于数据库表
最简单的就是新建一张表,然后通过操作该表中的数据来实现。当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录

基于数据库排他锁
除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。

基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:

    public boolean lock(){
        connection.setAutoCommit(false)
        while(true){
            try{
                result = select * from methodLock where method_name=xxx for update;
                if(result==null){
                    return true;
                }
            }catch(Exception e){
    
            }
            sleep(1000);
        }
        return false;
    }

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁(这里再多提一句,InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。

通过connection.commit()操作来释放锁

这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了

还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆

基于数据库乐观锁
和排它锁类似,使用version进行判断就可以了。具体参考mysql数据库的锁这篇文章

3.3、基于ZooKeeper的分布式锁

zookeeper主要利用节点无法重复创建 以及节点的监听通知机制来实现分布式锁的,当一个线程在zookeeper中创建了一个/test节点后,其他线程再创建这个/test节点会提示创建失败!因为zookeeper内部执行命令也像redis一样是单线程,多线程下的操作节点请求会排队执行,当发现节点已存在,则提示节点已存在!

当前线程加完锁,逻辑代码执行完毕后,还需要删除节点释放锁,供其他线程争抢。其他线程利用zookeeper的节点监听特性,一旦节点被修改(删除),就会受到来自服务端的消息,表示自己可以抢锁了,如果没有抢到继续 监听/等待 此节点。

zookeeper根据这两个特性可以做分布式锁,分布式锁又分为公平锁、非公平锁、读写锁等等。zookeeper对其都做了实现!

zookeeper分布式锁和redis分布式锁的区别?

redis分布式锁中向master节点setnx存储数据成功就算成功,当数据同步时,master节点突然挂掉,集群剩余节点会选举出新的master节点,当新的master节点执行setnx命令时,旧的已挂掉master节点中还存在未同步的setnx数据,这就会使分布式锁失效!出现数据一致性问题。
而zookeeper分布式锁采用leader-follwer模式,在创建/lock节点作为分布式锁时,只有半数以上的节点执行成功才算成功,就算leader节点挂掉,其他zookeeper集群节点中已经存在/lock节点,后续创建/lock行为不会执行成功,保证了强一致性。但也正是因为这点,相比于redis分布式锁,性能较低!

zookeeper实现非公平锁

可以使用 持久化节点 和 临时节点 来实现。推荐使用临时节点,因为持久化节点还需要手动删除,人工维护大量无用节点。临时节点session关闭就会过期,zookeeper内部线程自动删除!

非公平锁加锁原理:
image.png
如上非公平锁的实现方式在高并发的场景下,性能会下降的比较厉害。主要原因是:所有的连接都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争,竞争失败再次等待。这就是惊群效应。惊群效应需要大量通知其他连接,而且不止一次,会造成一定的资源浪费!为了避免惊群效应,可以采用公平锁的方式实现分布式锁!

zookeeper实现公平锁

公平锁的实现需要保证节点的顺序性,可以使用zookeeper的 持久化顺序节点 或 临时顺序节点,推荐使用临时顺序节点,原因同上。

公平锁实现原理图:
image.png
/lock节点可以用 Container节点类型,该节点类型规定,如果Container节点下面没有子节点,则Container节点在未来会被Zookeeper自动清除,定时任务默认60s 检查一次。使用 临时顺序子节点代表锁,在session关闭时,也会自动清理,进而触发/lock节点的自动删除,减少人工维护成本

如上借助于容器Container节点 和 临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁,是公平锁的具体实现。

问题一:当前线程怎么判断自己能不能获取锁?

答:在公平锁的实现中,只有顺序最小的节点才能获取分布式锁,每当有一个线程进来就会对/lock节点下所有的子节点进行排序,并比较自己的序号是不是列表中最小的,如果是就获取锁,如果不是,就对序号比自己小的上一个节点进行监听。

问题二:排队中的某个子节点如果挂掉,整个监听序列是否会出问题?

答:公平锁中,临时顺序节点中的每一个节点都监听着前一个序号比它小的节点。以 A - B -C 为例 ,A为最小的节点。如果B节点挂掉(或被删除),此时A节点仍在处理业务逻辑,并未释放,那么整个序列将变为不连续的。但是B挂掉后(或被删除),会对C发送通知机制,C接收到B已挂掉的通知,要去竞争锁啊,所以C会获取/lock节点下所有的子节点,排序并比较自己是不是最小的节点,结果发现并不是,因为A节点还在,然后B会对序号比它小上一个节点A进行监听,自动跳过了意外挂掉的B节点,这样就完成了监听关系的自动维护!

问题三:顺序子节点已创建,但服务端响应失败,造成节点多次创建怎么办?

答:客户端发送节点创建命令,服务器接收到并创建成功,但是给客户端响应的时候,服务器闪断,又在session超时时间内连上了。此时节点已创建成功,但客户端并没有收到节点创建成功的消息,它认为节点创建失败,由于客户端重试机制存在,会重新发起创建,这样就导致节点多次创建!!导致部分顺序节点一直存在服务器中,不会被释放,被称为僵尸节点。可以通过Curator客户端的protection 模式来解决僵尸节点问题。protection 模式会通过一个uuid来检测服务器节点,重试时如果uuid已存在,则表示已创建,就不会再次创建节点!

zookeeper实现读写锁(共享锁)

zookeeper的公平锁和非公平锁的实现有一个共同的特质,就是都是互斥锁,同一时间只能有一个请求占用。如果是大量的并发上来,无论读写,所有的请求都得加锁,性能是会急剧下降的。那是不是真的所有的请求都需要加锁呢?答案是否定的!如果数据没有任何写操作只有读操作的话,是不需要加锁的,

如果读数据的请求还没读完,这个时候来了一个写请求,怎么办呢?有人已经在读数据了,这个时候是不能写数据的,不然数据就不正确了。直到前面读锁全部释放掉以后,写请求才能执行,所以需要给所有的读请求加一个读标识(读锁),让写请求知道,这个时候是不能修改数据的。不然数据就不一致了。

如果已经有人在写数据了,再来一个请求写数据,也是不允许的,这样也会导致数据的不一致,所以所有的写请求都需要加一个写标识(写锁),是为了避免同时对共享数据进行写操作。

zookeeper对读写锁的实现原理如下:
image.png
①:read请求,如果前面的请求都是read,则直接获取锁(读读共享),如果前面的请求有write请求,则该read请求不能直接获取锁(读写互斥),需要对前面的write请求节点进行监听。如果前面有多个write请求,则对离当前最近的write请求进行监听

②:write请求,无论前面请求是read还是write请求,都会对其监听,和公平锁和非公平锁性质一致,对其他行为互斥!

Curator客户端也实现了zookeeper读写锁,底层与实现公平锁不同的是:读写锁增加了读、写标识,在获取锁时,根据读写标识区分不同的读写逻辑

  • 写请求:与Curator客户端实现的公平锁逻辑一致
  • 读请求:遍历容器节点中所有子节点后,如果当前读节点前面都是读请求,则直接获取锁,执行业务逻辑;如果当前读节点前面有写请求,则记录所有写请求节点位置后,找到自己前边最近的节点,进行监听。等待这个写请求释放,自己再获取锁

4、Redis分布式锁存在的问题

4.1 主备切换

  • 为了保证 Redis 的可用性,一般采用主从方式部署。主从数据同步有异步和同步两种方式,Redis 将指令记录在本地内存 buffer 中,然后异步将 buffer 中的指令同步到从节点,从节点一边执行同步的指令流来达到和主节点一致的状态,一边向主节点反馈同步情况。
  • 在包含主从模式的集群部署方式中,当主节点挂掉时,从节点会取而代之,但客户端无明显感知。当客户端 A 成功加锁,但是数据还没被同步到 Salve,此时主节点挂掉,从节点提升为主节点,新的主节点没有锁的数据,当客户端 B 加锁时就会成功,出现一把锁被拿到了两次的场景。
    image.png

我认为这两个问题都属于分布式系统中的一致性问题,Redis集群是一个AP系统,所以解决这两个问题的办法是使用Redis提供的redlock,或者使用CP系统的Zookeeper分布式锁解决

4.2 集群脑裂

  • 集群脑裂指因为网络问题,导致 Redis master 节点跟 slave 节点和 sentinel 集群处于不同的网络分区,因为 sentinel 集群无法感知到 master 的存在,所以将 slave 节点提升为 master 节点,此时存在两个不同的 master 节点。Redis Cluster 集群部署方式同理。
  • 当不同的客户端连接不同的 master 节点时,两个客户端可以同时拥有同一把锁。如下:
    image.png

参考:
Redis分布式锁的实现
分布式系统互斥性与幂等性问题的分析与解决
分布式锁的实现之 redis 篇
面试时遇到『看门狗』脖子上挂着『时间轮』
Redis锁从面试连环炮聊到神仙打架
分布式锁的几种实现方式
zookeeper专题:使用zookeeper实现分布式锁