分布式系统中,操作共享资源的时候,需要用到分布式锁,来保证数据的一致性。常用的分布式锁有三种实现方式:

  1. 基于数据库实现;
  2. 基于Redis的实现;
  3. 基于Zookeeper实现。

基于数据库实现分布式锁

共享资源大部分情况下是数据库中的数据,比如商品的增减,账户金额的增减。因此,可以在sql层面上,利用数据库的锁来实现分布式锁。

悲观锁

悲观锁,认为任何一条操作都可能存在冲突,例如MySQL提供了sql语句 + for update手动加上排它锁,避免了多个服务对数据库的更新。
sql语句中的条件加上索引,否则会锁整个表。

乐观锁

类似Java并发中的CAS操作,在数据库表中添加也给额外的version字段,每次更新操作,version字段加一,更新的时候会比对数据库中version与查出的version是否一致,不一致则不能更新。

基于Redis实现分布式锁

简单的API方式

Jedis中提供了一个setnx的API,当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
expire设置key的过期时间,del删除指定的key。
所以操作资源前获取锁,用完之后释放锁,整个流程如下:

其中有许多需要注意的点:比如几个对于redis的操作要做到原子性,过期时间的设置也需要根据实际业务的操作时间进行设置(不太好把握),异常情况下key的删除。
优点:使用简单。
缺点:容易出现并发性能瓶颈,死锁问题,比如设置完key后,服务死机,则容易照成key无法释放。

Redission实现

Redission是Redis官方推荐的客户端,提供了一个RLock的锁,RLock继承自juc的Lock接口,提供了中断,超时,尝试获取锁等操作,支持可重入,互斥等特性。

原理:
RLock底层使用Redis的Hash作为存储结构,其中Hash的key用于存储锁的名字,Hash的filed用于存储客户端id,filed对应的value是线程重入次数。
加锁的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// KEYS[1]:锁的名称;ARGV[1]:过期时间;ARGV[2]:线程的id
// 锁不存在
if (redis.call('exists', KEYS[1]) == 0) then
// 重入数设置为1,过期时间设置
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 该线程已经存在了,重入数加1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 锁存在,并且不是调用线程所有,返回过期时间
return redis.call('pttl', KEYS[1]);

过期时间:启动一个看门狗线程,用来续期锁的过期时间,除非程序挂,或者自己主动释放,不然,看门狗程序一直存在,保证锁内的业务执行完成。

使用:
依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.2.0</version>
</dependency>

初始化一个Redisson客户端:

1
2
3
4
5
6
7
8
9
10
11
Config config = new Config();           config.useSingleServer().setAddress(host+":"+port).setDatabase(0)
.setConnectionMinimumIdleSize(10).setConnectionPoolSize(50);
RedissonClient redissonClient = Redisson.create(config);
// 锁
RLock rLock = redissonClient.getLock("redisson_LOCK");
// 尝试获取锁
rLock.tryLock(10, TimeUnit.SECONDS);
//业务

// 释放
rLock.unlock();

基于Zookeeper实现分布式锁

基于zookeeper的临时节点特性,大致思想是:
每个客户端需要加锁的时候,去zookeeper中创建一个目录,并生成一个瞬时有序节点,判断是否能获取锁的方式很简单,自己的节点是否是最小的一个。当使用完成后删除自己的临时节点,然后通知。

获取锁


客户端1和客户端2同时想获取锁,1先创建一个节点lock1,那么它可以成功获取锁,进行操作;
客户端2没获取到锁,它也创建一个节点lock2,并在节点lock1上注册一个监听器,当lock1变化时触发;
触发的时候客户端2会判断自己的节点是否是最小的,如果是最小的,那么可以获取到锁。

释放锁

主动释放:业务完成后,主动删除自己拥有的临时节点;
被动删除:还没完成业务,异常退出,断开了与zookeeper的通信,那么zookeeper可以帮着清除该临时节点。

使用

zookeeper的第三方库Curator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建也给客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
// 分布式锁
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
try
{
// do some work inside of the critical section here
}
finally
{
lock.release();
}
}