本章节,构造分布式锁的目的既不是给同一个进程中的多个线程使用,也不是给同一台机器上的多个进程使用,而是由不同机器上的不同Redis客户端进行获取和释放的。
回忆一下之前学的Redis事物章节中模拟的市场购物场景,购买商品的时候使用了watch去监视市场以及买家的的个人信息来保证购买流程的正常进行。引用书上的性能测试数据来说明性能扩展的必要性,下表将展示市场在重负载情况下运行60秒的结果:
| 卖家、买家数 | 上架商品数量 | 买入商品数量 | 购买重试次数 | 每次购买的平均等待时间 |
|---|---|---|---|---|
| 1个卖家,1个买家 | 145000 | 27000 | 8000 | 14ms |
| 5个卖家,1个买家 | 331000 | <200 | 5000 | 150ms |
| 5个卖家,5个买家 | 206000 | <600 | 161000 | 498ms |
可以很明显看出watch的性能不具扩展性,下面将使用锁来保证市场在任意时刻只能上架或销售一件商品,看看性能会怎么样。
接下来我们学习下第一版的锁实现,这个版本要锁的事就是正确地实现基本的加锁功能,而之后会学习如何处理过期的锁以及因为持有者崩溃而无法释放的锁。为了对数据进行排他访问程序首先要做的就是获取锁,setnx命令天生就适合用来实现锁的获取功能。
//获取锁
publicStringacquireLock(String lockName){
long acquireTimeout = 10000;
//随机128位UUID作为键的值
String identifier = UUID.randomUUID().toString();
final byte[] rawKey = stringRedisTemplate.getStringSerializer().serialize("lock:" + lockName);
final byte[] rawVal = stringRedisTemplate.getStringSerializer().serialize(identifier);
long end = System.currentTimeMillis() + acquireTimeout;
//10秒内获取不到锁就返回
while (System.currentTimeMillis() < end){
RedisCallback<Boolean> redisCallback = redisConnection ->
/**
* setnx命令的语义是将key的值设为value,当且仅当key不存在
* 若key存在,不做任何动作,返回0(false)
*/
redisConnection.setNX(rawKey, rawVal);
if(stringRedisTemplate.execute(redisCallback).booleanValue()){
return identifier;
}
}
return null;
}
锁的创建已经就绪,使用很简单,我们改造下之前的purchaseItem方法:
publicbooleanpurchaseItemWithLock(String buyerId, String itemId, String sellerId){
String locked = acquireLock("market");
if(locked == null){
return false;
}else{
try{
System.out.println("purchasing....");
return true;
}finally {
//release lock
System.out.println("release lock");
releaseLock("market", locked);
}
}
}
在这个版本中锁没有超时功能,使用完毕一定要主动释放:
publicbooleanreleaseLock(String lockName, String identifier){
String lockKey = "lock:" + lockName;
SessionCallback<List<Object>> sessionCallback = new SessionCallback<List<Object>>() {
publicList<Object>execute(RedisOperations operations)throwsDataAccessException{
operations.watch(lockKey);
if(identifier.equals(operations.opsForValue().get(lockKey))){
operations.multi();
operations.delete(lockKey);
return operations.exec();
}
//若取出来的不是想要释放的锁,不作任何操作,返回空
operations.unwatch();
return null;
}
};
List<Object> results = stringRedisTemplate.execute(sessionCallback);
return results != null ? true : false;
}
至此,已使用锁替代watch重新实现商品购买操作,下表依然引用书上的数据来展示锁替代的效果:
| 卖家、买家数 | 上架商品数量 | 买入商品数量 | 购买重试次数 | 每次购买的平均等待时间 |
|---|---|---|---|---|
| 1个卖家,1个买家 | 51000 | 50000 | 0 | 1ms |
| 5个卖家,1个买家 | 68000 | 13000 | <10 | 5ms |
| 5个卖家,5个买家 | 21000 | 20500 | 0 | 14ms |
与之前的watch实现相比,锁实现的上架数量虽然有所减少,但是在买入商品时却不需要重试,并且上架商品数量和买入商品数量之间的比率,也跟卖家数量和买家数量之间的比率接近。可想而知,上架和买卖商品操作都需要获取锁来锁住市场,两个进程之间的竞争限制了商品买卖操作性能的进一步提升。要解决这个问题其实也很容易,就是使用细粒度锁。到目前为止我们考虑的只是如何实现与watch命令力度相同的锁,这种锁可以把整个市场都锁住。因为我们是自己手动来构建锁实现,并且我们关心的不是整个市场,而是市场里面谋面商品是否存在,所以我们实际上可以将加锁的力度变得更细一些。通过只锁住被买卖的商品而不是整个市场,可以减少锁竞争出现的几率并提升程序的性能。
前面提到过,目前的锁实现在持有者崩溃的时候不会自动释放,这将导致一直处于已被获取的状态。为了解决这个问题,在这一节中,将为锁加上超时功能。
publicStringacquireLockWithTimeout(String lockName){
//10秒内获取不到锁就返回
long acquireTimeout = 10000;
//20秒后锁超时
long lockTimeout = 20000;
String identifier = UUID.randomUUID().toString();
final byte[] rawKey = stringRedisTemplate.getStringSerializer().serialize("lock:" + lockName);
final byte[] rawVal = stringRedisTemplate.getStringSerializer().serialize(identifier);
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
RedisCallback<Boolean> redisCallback = redisConnection ->{
//成功获取锁之后设置锁超时时间
if(redisConnection.setNX(rawKey, rawVal)){
redisConnection.expire(rawKey, lockTimeout);
return true;
}
/**
* 锁获取失败之后检测锁是否有超时时间,如果没有则设置超时时间
* 这是为了防止程序在setNX和expire之间崩溃
*/
if(redisConnection.ttl(rawKey) == -1){
redisConnection.expire(rawKey, lockTimeout);
return false;
}
return false;
};
if(stringRedisTemplate.execute(redisCallback).booleanValue()){
return identifier;
}
}
return null;
}