• 已删除用户
Administrator
发布于 2018-07-24 / 5 阅读
0

Redis 分布式锁实现

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于 Redis 的分布式锁;3. 基于 ZooKeeper 的分布式锁。

我们最先做电商+零售的库存业务时,选用的是数据库乐观锁,但是在并发较大时性能不太乐观。后来重构考虑使用 Redis 来实现分布式锁,采用了 Spring 的 RedisLockRegistry。在压测时发现单一销售业务下性能比较稳定,但是在采购、批发等业务里,动则一个单据就要给 2000-3000 款商品加锁,高并发下(重点指混合场景下的并发)RedisLockRegistry 在此场景的的缺点就暴露出来了:不可批量获取和释放锁,我们订单下的所有商品加锁,会消耗大量的网络资源且获取整单商品锁的时间和不可控制。

于是我们在参照 RedisLockRegistry 的技术上,写了 RedisBatchLockRegistry。再压测时,我们原先场景并发提高 10 倍的基础上,性能依然满足要求。

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;

/**
 *
 * ClassName: RedisBatchLockRegistry
 * @Description: Redis分布式锁,支持批量获取和释放锁
 *
 * org.springframework.integration.redis.util.RedisLockRegistry 存在两个问题:
 * 1.不支持锁的批量获取和释放
 * 2.锁的重试时间只能为100毫秒
 *
 * 该类参考自RedisLockRegistry,与之对比不同点为:
 * 1.增加了锁的批量获取和释放
 * 2.去除了本地锁实现(RedisLockRegistry的方式实现实际上为Redis锁+本地锁双重锁)
 *
 * @author zhangqin
 * @date 2017年5月24日
 *
 * =================================================================================================
 *     Task ID			  Date			     Author		      Description
 * ----------------+----------------+-------------------+-------------------------------------------
 *
 */
@Component
public class RedisBatchLockRegistry {

	/**
	 * Logger
	 */
	private static final Logger LOG = LoggerFactory.getLogger(RedisBatchLockRegistry.class);

	/**
	 * Redis模版
	 */
	@Autowired
	private RedisTemplate<String, String> redisTemplate;

	public RedisBatchLockRegistry(){}
	/**
	 *
	 * <p>Title: 初始化</p>
	 * <p>Description: </p>
	 * @param connectionFactory
	 */
	public RedisBatchLockRegistry(RedisConnectionFactory connectionFactory) {
	}

	/**
	 *
	 * @Description: 尝试批量获取锁
	 * @param list
	 * @param maxWait
	 * @return boolean
	 * @author zhangq
	 * @throws InterruptedException
	 * @date 2017年5月27日
	 */
	@SuppressWarnings("rawtypes")
	public boolean tryLock(final Set<String> keys, long maxWait, long timeout) {
		try {
			LOG.info("尝试获取锁:{}", keys);

			// 需要获取的锁
			final List<String> needLocking = new CopyOnWriteArrayList<String>();
			needLocking.addAll(keys);

			// 已经获取的锁
			List<String> locked = new CopyOnWriteArrayList<String>();

			// 获取等待时间的最后时间段
			long expireAt = System.currentTimeMillis() + maxWait;

			// 循环判断锁是否一直存在
			while (System.currentTimeMillis() < expireAt) {

				// 通过管道批量获取锁
				LOG.info("进入tryLock while,needLocking:{}", needLocking);
				List<Object> results = redisTemplate.executePipelined(new RedisCallback() {

					@Override
					public Object doInRedis(RedisConnection connection) throws DataAccessException {
						LOG.info("connection:{}, needLocking:{}", connection, needLocking);
						String value = System.currentTimeMillis() + "";
						for (String key : needLocking) {
							LOG.info("key :{}", key);
							// 如果不存在则 SET。SET if Not eXists。
							connection.setNX(key.getBytes(), value.getBytes());
							// LOG.info("pipelined setNX result:{}, key:{},
							// value:{}", flag, key, key);
						}
						return null;
					}
				});

				// 提交redis执行计数
				LOG.info("needLocking:{},results:{}", needLocking, results);
				for (int i = 0; i < results.size(); i++) {
					// 是否执行成功
					Boolean success = (Boolean) results.get(i);
					// 锁的KEY
					String key = needLocking.get(i);

					// setnx成功,获得锁
					if (success) {
						// 设置锁的过期时间
						redisTemplate.expire(key, timeout, TimeUnit.MILLISECONDS);
						locked.add(key);
					}
				}

				// 移除已锁定资源
				needLocking.removeAll(locked);

				// 是否锁住全部资源
				if (CollectionUtils.isEmpty(needLocking)) {
					// 全部资源均已锁住,返回成功true
					return true;
				} else {
					// 补偿处理,防止异常情况下(宕机/重启/连接超时等)导致的锁永不过期
					List<String> exceptionLock = new CopyOnWriteArrayList<String>();
					for (String key : needLocking) {
						String value = redisTemplate.opsForValue().get(key);
						// 当前时间 > 上锁时间 + 超时时间 + 2秒(经验时间),表示为该过期却未过期的数据,即异常数据
						if (null != value && System.currentTimeMillis() > Long.parseLong(value) + timeout + 2000) {
							exceptionLock.add(key);
						}
					}
					// 删除所有异常的 KEY
					if (CollectionUtils.isNotEmpty(exceptionLock)) {
						unLock(exceptionLock);
					}

					// 部分资源未能锁住,间隔N毫秒后重试
					Thread.sleep(10);
				}
			}

			// 仍有资源未被锁住(needLocking不为空),释放已锁定的资源,并返回失败false
			if (!CollectionUtils.isEmpty(needLocking)) {
				LOG.info("can not get the lock, keys:{}", needLocking);
				unLock(locked);
			}
		} catch (Exception e) {
			LOG.info("尝试获取锁失败:{}", e);
		}

		return false;
	}

	/**
	 *
	 * @Description: 批量释放锁
	 * @param list
	 * @return void
	 * @author zhangq
	 * @date 2017年5月27日
	 */
	public void unLock(Collection<String> keys) {
		try{
			// 防止多个服务器处理同一个key需要watch操作(相当于是禁止了其他client处理这个key)
			//redisTemplate.watch(keys);

			// 批量释放锁
			redisTemplate.delete(keys);

			// 锁已经过期了,释放watch操作
			//redisTemplate.unwatch();
		}catch (Exception e){
			LOG.error("批量释放锁失败!",e);
		}finally {
			RedisConnectionUtils.unbindConnection(redisTemplate.getConnectionFactory());
		}
	}
}