一、常见问题

例如,我们在做任务分发的时候,一般都是由master节点进行分发,而且是这个master节点上的某一个线程进行具体的分发工作。如果是在一个分布式的集群里面,我们要如何做到这一点呢?也就是我们要在这个集群里做到每次任务分发只有某台机器上的某个线程去做,不能出现并发的情况。

这里就需要应用到分布式锁的技术。

实现分布式锁的方式很多,一般主要有以下几种:

  • MySql
  • ZooKeeper
  • Redis
  • 自研分布式锁,如谷歌的Chubby

这篇文章我们主要介绍通过Redis的方式来实现分布式锁,其他几种实现方式和原理,可以参考这篇文章^ 1

二、分布式锁的特点

为了实现分布式锁,我们要做到以下三点[^ 2 ] :

  • 互斥性:在任意时刻,只有一个客户端能够持有锁(需要保证在集群中不同节点的不同线程之间互斥)。
  • 不会死锁:即使一个客户端在持有锁的时候发生crash,最终其他客户端也可以获得锁。
  • 容错性:只要Redis集群中大部分的节点正常运行,客户端就可以进行加锁和解锁操作。

三、分布式锁的原理和实现

下面的内容主要是基于Redis的方式来实现分布式锁,我会通过一个个的问题来循序渐进地引导。

一般加锁和解锁的流程如下:

Step1::获取锁

Step2:执行其他操作

Step3:释放锁

1、setnx^ 3

在第二节内容中,我们提到分布式锁的其中一个特点就是要保证互斥性。

实际在Redis中,已经提供了一个set`命令,通过参数组合的`setnx这个命令是原子性的,可以保证互斥性(即只有一个客户端可以持有锁)。

setnx命令的意思是:如果key不存在则创建。

命令:

set key value NX 

2、如何处理死锁

在上面的内容,我们已经通过setnx命令的互斥特性获得锁,但是这里存在一个问题:

如果在执行Step2的过程中,客户端出现crash,这时候会导致锁没有释放,从而出现死锁的问题,这个也是我们在第二部分内容中介绍分布式锁的特点---不会死锁,需要解决的其中一个问题。

为了保证不会发生死锁,在使用setnx命令的时候给锁加一个过期时间即可。这样即使客户端在获取锁的时候发生crash,锁最终也会因为过期时间而释放。

命令:

set key value NX [EX seconds|PX milliseconds]

3、如何处理超时时,锁被其他客户端释放的问题

如下图所示,我们再考虑一种情况。如果一个客户端在获取锁之后设置了超时时间,然后开始执行其他操作,但是这个操作很耗时从而超过了设定的超时时间。这时候锁被释放了,然后另一个客户端获得了锁并进行其他操作。这是,第一个客户端执行完以后,它还以为自己持有锁,就会进行解锁操作,实际上这时候它释放的是第二个客户端持有的锁,最终导致了问题发生。

img

这里,我们分析可以发现有2个问题需要解决:

  • 超时时间的设置
  • 如何保证加锁和解锁是同一个客户端

(1)超时时间的设置

对于这个问题,一般有两种方式解决:

  • 将过期时间设置的足够长,确保代码逻辑在锁释放之前能够执行完成,具体多长看自己的业务来定
  • 为获取锁的线程增加一个守护线程,为将要过期但是未释放的锁增加有效时间

(2)如何保证加锁和解锁是同一个客户端

还是setnx命令,我们每次加锁的时候给这个锁分配一个唯一的id,然后解锁的时候就释放这个id即可,这样就可以保证加锁和解锁都是同一个客户端。实现流程如下(伪代码):

# 加锁
uuid = UUID.RandomUUID().ToString()
set key uuid NX EX 60

# 业务逻辑

# 解锁
if redis.get(key) == uuid
    return del key
else
    return 0

在释放锁之前,我们查看锁的id是否是加锁时的id,如果不是,说明已经超时释放了,就不再进行解锁操作;否则就解锁。这样就实现了实现了加锁和解锁都是同一个客户端。

那么到这里是不是就完事了呢?还没!!请继续看下面的内容。

4、解锁操作(读+删除)如何保证原子性

在上一节里面,当进行解锁操作时,它是分两步执行的:

Step1:获取锁的id并判断是否是当前客户端设置的id

Step2:删除锁

这里面就有一个问题,判断id和删除锁的操作不是原子性的。如果客户端在执行Step1的时候,在删除锁之前发生了超时,后面删除锁释放的就是其他客户端获取的锁,这样也是没法保证加锁和解锁是同一个客户端的特性。

解决思路是保证Step1和Step2的操作在一个事务中进行,保证其原子性。

这里就可以用到lua脚本,我们把Step1和Step2的Get、判断、Del操作放在一个lua脚本中执行,如下:

    $script = '
if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end
    ';
    $token = uniqid(mt_rand(), true);
    if ($redis->set('my:lock', $token, ['NX', 'EX' => 10])) {
        # todo

        $redis->eval($script, ["my:lock", $token], 1);
    } else {
        echo 'get lock failed!';
    }

到这里,我们就把在Redis单机情况下的分布式锁给实现了。但是这里由于Redis是单机模式,如果机器挂了,就直接无法提供服务了。所以很多时候我们会搭建一个主从的架构,来提高Redis集群的容灾能力。Redis会把操作的指令记录在本地内存buffer中,然后将buffer中的指令同步到从节点。

5、看门狗模式^10

在上一步中,我们会发现还有一个问题,那就是 todo`里面的业务逻辑还没有执行完,但是锁已经过期释放了。简单粗暴的方法,可以把过期时间设置的长一点。当然我们还有另一个思路,就是可以给获得锁的线程,另外开一个定时守护线程,每隔10s中就去判断锁有没有释放,如果没有释放就对锁的过期时间延长(使用`set key uniq_id ex timeout,没有使用NX)。

当前开源的Redisson解决了这个问题,它的底层原理如下图所示:

图片

到这里,我们给出完整的go实现代码:

package main

import (
    "github.com/gomodule/redigo/redis"
    "log"
    "sync"
    "time"
)

//====================================
// redis分布式锁+看门狗模式
//====================================
type RedisDistributedLock struct {
    key     string
    token   string
    conn    redis.Conn
    timeout int // 秒
}

func NewRedisDistributedLock(key, token string, conn redis.Conn, timeout int) *RedisDistributedLock {
    return &RedisDistributedLock{
        key:     key,
        token:   token,
        conn:    conn,
        timeout: timeout,
    }
}

func (lock *RedisDistributedLock) watchDog(interval time.Duration) {
    defer func() {
        log.Println("end watchDog:", lock.key, lock.token)
    }()
    log.Println("start watchDog:", lock.key, lock.token)

    for {
        // 对于未释放的锁,延长它的过期时间
        time.Sleep(interval)
        token, err := redis.String(lock.conn.Do("GET", lock.key))
        if err != nil {
            log.Println("GET", lock.key, "error:", err.Error())
            return
        }

        // token已经释放(被其他线程获取锁并设置其他唯一token值)
        if token != lock.token {
            log.Println(lock.key, lock.token, "release lock")
            return
        }

        // 延长过期时间
        _, err = redis.String(lock.conn.Do("SET", lock.key, lock.token, "EX", lock.timeout))
        if err != nil {
            log.Println("SET", lock.key, lock.token, "EX", lock.timeout, "error:", err.Error())
            return
        } else {
            log.Println("extend=>", "SET", lock.key, lock.token, "EX", lock.timeout)
        }
    }
}

func (lock *RedisDistributedLock) tryLock() (ok bool, err error) {
    var result string
    // set key value ex 10 nx,操作成功会返回"OK"字符串,失败返回nil
    result, err = redis.String(lock.conn.Do("SET", lock.key, lock.token, "EX", lock.timeout, "NX"))
    if result == "OK" {
        go lock.watchDog(10 * time.Second)
        return true, nil
    } else {
        return false, err
    }
}

func (lock *RedisDistributedLock) Unlock() (err error) {
    //====================================
    //1、保证加锁和解锁是同一个线程(防止去解别的线程的锁)
    //2、保证获取锁和删除锁是原子的
    //====================================
    code := `if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end`
    lua := redis.NewScript(1, code)

    cnt, err := redis.Int(lua.Do(lock.conn, lock.key, lock.token))
    if err != nil || cnt == 0 {
        log.Printf("Unlock key:%s, token:%s error:%v, cnt:%d\n", lock.key, lock.token, err, cnt)
    } else {
        log.Printf("Unlock key:%s, token:%s success, cnt:%d\n", lock.key, lock.token, cnt)
    }

    return
}

func main() {
    redisAddress := "127.0.0.1:6379"
    RedisPasswd := "test:test"
    RdsPool := &redis.Pool{
        MaxIdle:     300,
        MaxActive:   0, //设置MaxActive,设MaxActive=0(表示无限大)或者足够大
        IdleTimeout: 10 * time.Second,
        Wait:        true, //设置Wait=true,当程序执行get(),无法获得可用连接时,将会暂时阻塞
        Dial: func() (conn redis.Conn, e error) {
            conn, err := redis.Dial("tcp", redisAddress)
            if err != nil {
                return nil, err
            }

            _, err = conn.Do("AUTH", RedisPasswd)
            if err != nil {
                conn.Close()
                return nil, err
            }
            return conn, err
        },
    }

    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func(wg *sync.WaitGroup, idx int) {
            log.Printf("[%d]start...\n", idx)
            defer wg.Done()

            // redis连接
            conn := RdsPool.Get()
            defer conn.Close()

            // 创建分布式锁
            v := NewRedisDistributedLock("master", "hdhsksk", conn, 10)
            if ok, err := v.tryLock(); ok != true || err != nil {
                log.Printf("[%d] get lock failed, err:%v\n", idx, err)
                return
            }
            log.Printf("[%d] get lock success\n", idx)
            time.Sleep(20 * time.Second)
            v.Unlock()
        }(&wg, i)
    }

    wg.Wait()
}

但是这种主从模式,为分布式锁也带来了另一个问题。

6、如何解决Redis集群下failover的情况

如果在Redis主从架构中,主节点挂掉,系统会切换到从节点(failover)。但是偶遇Redis的主从复制是异步的,这可能导致在failover过程中丧失锁的互斥性,下面我们考虑一种情况:

  • 客户端A成功获取锁,
  • 主节点挂掉,但是指令(存储锁的key)还没有同步到从节点。
  • 从节点成为了新的主节点,客户端B从新从新的主节点获取锁。

这个时候客户端A和B同时持有了同一个资源的锁,锁的互斥性被打破。

像这种Redis集群的情况,要实现分布式锁,要考虑的问题就比前面单节点Redis复杂多了。

但是不用担心,Redis的创始人已经提供了一个官方的解决方案:RedLock算法^ 4 ,见下面内容。

7、RedLock算法原理

它基于N个完全独立的Redis节点,原理如下^ 8

加锁步骤:

  • 获取当前时间(毫秒数)

  • 按顺序依次向N个Redis节点执行加锁操作。这个加锁操作跟前面单节点Redis的加锁操作一样,包含超时时间和唯一的字符串id。

    这里为了保证在某个Redis节点不可用的时候算法能够正常运行,这个加锁操作还有一个超时时间,它要远小于锁的超时时间(几十毫秒)。客户端在向某个Redis节点获取锁失败后,就立即开始尝试下一个Redis节点。这里的失败包括Redis节点不可用,也包括该Redis节点上的锁已经被其他客户端持有等等情况。

  • 计算整个加锁过程总共消耗的时间,计算方法是用当前时间减去第1步记录的时间。如果客户端从大多数Redis节点(>= N/2+1)成功获取到锁,并且获取锁总共消耗的时间没有超过锁的超时时间,那么这时才认为客户端最终加锁成功;否则,认为最终加锁失败。

  • 如果最终加锁成功,那么这个锁的超时时间应该重新计算,它等于最初的锁的超时时间减去上一步计算出来的加锁消耗的时间。

  • 如果最终加锁失败(可能由于获取到锁的Redis节点个数小于N/2+1,也可能是整个获取锁的过程消耗的时间超过了锁最初的超时时间),那么客户端应该立即想所有Redis节点发起解锁操作(见下面)。

解锁步骤:

  • 解锁过程很简单,想所有Redis发起解锁操作,不用管这些节点当时在加锁的时候是成功还是失败,解锁过程跟前面介绍的单节点Redis解锁过程一样。

这一块的算法,官方已经提供了各种语言的实现,不需要我们重复去造轮子:

当然RedLock的算法在网上也有很多争论,想要了解具体内容的可以自己上网查[^ 5 ],这里就不做展开。

四、参考

[^ 2 ]: Safety and Liveness guarantees

[^ 5 ]: how to do distributed locking

打赏

发表评论

您的电子邮箱地址不会被公开。