前言
随着互联网技术的不断发展,用户量的不断增加,越来越多的业务场景需要用到分布式系统。而在分布式系统中访问共享资源就需要一种互斥机制,来防止彼此之间的互相干扰,以保证一致性,这个时候就需要使用分布式锁。
业界常用解决方案
- 基于 MySql 等数据库的唯一索引
- 基于 ZooKeeper 临时有序节点
- 基于 Redis 的
NX EX
参数
本文主要讲解基于 Redis 实现的分布式锁
分布式锁的特点
- 互斥性。在任意时刻,只有一个客户端能持有锁
- 锁超时。即使一个客户端持有锁的期间崩溃而没有主动释放锁,也需要保证后续其他客户端能够加锁成功
- 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给释放了。
实现
版本一
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
import uuid import math import time
from redis import WatchError
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2): """ 基于 Redis 实现的分布式锁 :param conn: Redis 连接 :param lock_name: 锁的名称 :param acquire_timeout: 获取锁的超时时间,默认 3 秒 :param lock_timeout: 锁的超时时间,默认 2 秒 :return: """
identifier = str(uuid.uuid4()) lockname = f'lock:{lock_name}' lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end: if conn.setnx(lockname, identifier): conn.expire(lockname, lock_timeout) return identifier elif conn.ttl(lockname) == -1: conn.expire(lockname, lock_timeout)
time.sleep(0.001)
return False
def release_lock(conn, lockname, identifier): """ 释放锁 :param conn: Redis 连接 :param lockname: 锁的名称 :param identifier: 锁的标识 :return: """ with conn.pipeline() as pipe: lockname = 'lock:' + lockname
while True: try: pipe.watch(lockname) iden = pipe.get(lockname) if iden and iden.decode('utf-8') == identifier: pipe.multi() pipe.delete(lockname) pipe.execute() return True
pipe.unwatch() break except WatchError: pass return False
|
加锁过程
- 首先需要为锁生成一个唯一的标识,这里使用 uuid;
- 然后使用
setnx
设置锁,如果该锁名之前不存在其他客户端的锁则加锁成功,接着设置锁的过期时间防止发生死锁并返回锁的唯一标示;
- 如果设置失败先判断一下锁名所在的锁是否有过期时间,因为
setnx
和 expire
两个命令执行不是原子性的,可能会出现加锁成功但是设置超时时间失败出现死锁。如果不存在就给锁重新设置过期时间,存在就不断循环知道加锁时间超时加锁失败。
解锁过程
- 首先整个解锁操作需要在一个 Redis 的事务中进行;
- 使用
watch
监听锁,防止解锁时出现删除其他人的锁;
- 查询锁名所在的标识是否与本次解锁的标识相同;
- 如果相同则在事务中删除这个锁,如果删除过程中锁自动失效过期又被其他客户端拿到,因为设置了
watch
就会删除失败,这样就不会出现删除了其他客户端锁的情况。
版本二
如果你使用的 Redis 版本大于等于 2.6.12
版本,加锁的过程就可以进行简化。因为这个版本以后的 Redis set
操作支持 EX
和 NX
参数,是一个原子性的操作。
- EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。
- NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
import uuid import math import time
from redis import WatchError
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2): """ 基于 Redis 实现的分布式锁 :param conn: Redis 连接 :param lock_name: 锁的名称 :param acquire_timeout: 获取锁的超时时间,默认 3 秒 :param lock_timeout: 锁的超时时间,默认 2 秒 :return: """
identifier = str(uuid.uuid4()) lockname = f'lock:{lock_name}' lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end: if conn.set(lockname, identifier, ex=lock_timeout, nx=True): return identifier
time.sleep(0.001)
return False
def release_lock(conn, lockname, identifier): """ 释放锁 :param conn: Redis 连接 :param lockname: 锁的名称 :param identifier: 锁的标识 :return: """ with conn.pipeline() as pipe: lockname = 'lock:' + lockname
while True: try: pipe.watch(lockname) iden = pipe.get(lockname) if iden and iden.decode('utf-8') == identifier: pipe.multi() pipe.delete(lockname) pipe.execute() return True
pipe.unwatch() break except WatchError: pass return False
|
版本三
可能你也发现了解锁过程在代码逻辑上稍微有点复杂,别着急,我们可以使用 Lua
脚本实现原子性操作从而简化解锁过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
import uuid import math import time
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2): """ 基于 Redis 实现的分布式锁 :param conn: Redis 连接 :param lock_name: 锁的名称 :param acquire_timeout: 获取锁的超时时间,默认 3 秒 :param lock_timeout: 锁的超时时间,默认 2 秒 :return: """
identifier = str(uuid.uuid4()) lockname = f'lock:{lock_name}' lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end: if conn.set(lockname, identifier, ex=lock_timeout, nx=True): return identifier
time.sleep(0.001)
return False
def release_lock(conn, lock_name, identifier): """ 释放锁 :param conn: Redis 连接 :param lockname: 锁的名称 :param identifier: 锁的标识 :return: """ unlock_script = """ if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end """ lockname = f'lock:{lock_name}' unlock = conn.register_script(unlock_script) result = unlock(keys=[lockname], args=[identifier]) if result: return True else: return False
|
后续
截至到目前,我们已经有较好的方法获取锁和释放锁。基于Redis单实例,假设这个单实例总是可用,这种方法已经足够安全。但是如果 Redis 主节点挂了就会出现一些问题,比如主节点加锁后没有同步到从节点,从节点升为主节点,就会出现锁的丢失。如果你想要使用更加安全的 Redis 分布式锁实现可以参考一下 Redlock 的实现。
参考
- 《Redis 实战》中分布式锁的实现
- SETNX with TTL · Issue #387 · andymccurdy/redis-py · GitHub
- 万字长文!不为人知的分布式锁实现,全都在这里了!