柚子快報(bào)邀請(qǐng)碼778899分享:數(shù)據(jù)庫(kù) Redis-分布式鎖
柚子快報(bào)邀請(qǐng)碼778899分享:數(shù)據(jù)庫(kù) Redis-分布式鎖
Redis-分布式鎖
如何使用分布式鎖
正常在一個(gè)java服務(wù)中使用sync鎖或lock鎖完全可以滿足線程安全問題的,但是在部署集群的情況下,不同的jvm不能鎖同一個(gè)方法,因此需要分布式鎖用來保護(hù)線程安全問題。
分布式鎖實(shí)現(xiàn)
常見的分布式鎖解決方案:
Mysql:自帶悲觀鎖,但是不太好維護(hù)redis:利用setnx實(shí)現(xiàn)互斥,操作方便,推薦使用zookeeper:利用節(jié)點(diǎn)實(shí)現(xiàn)互斥
本章主要采用redis的方式進(jìn)行實(shí)現(xiàn)
public interface ILock {
/**
* 分布式-互斥鎖
*/
boolean tryLock(String name, Long time, TimeUnit unit);
/**
* 分布式-釋放互斥鎖
*/
void unLock(String name);
}
/**
* 分布式鎖實(shí)現(xiàn)
*/
@Component
public class DistributedLock implements ILock{
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 分布式鎖key */
private final String DISTRIBUTED_LOCK = "distributed_lock:";
/**
* 分布式互斥鎖
*/
@Override
public boolean tryLock(String name, Long time, TimeUnit unit) {
// value
String value = Thread.currentThread().getId() + "";
// key
String key = DISTRIBUTED_LOCK + name;
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
// 防止自動(dòng)拆箱空指針
return BooleanUtil.isTrue(aBoolean);
}
/**
* 分布式釋放鎖
*/
@Override
public void unLock(String name) {
String key = DISTRIBUTED_LOCK + name;
stringRedisTemplate.delete(key);
}
}
分布式鎖誤刪問題
在設(shè)置互斥鎖的時(shí)候?yàn)榱私鉀Qredis宕機(jī)導(dǎo)致互斥鎖永久失效的情況下,加了一個(gè)過期時(shí)間。此時(shí)如果緩存重建的時(shí)間比過期時(shí)間更長(zhǎng),會(huì)導(dǎo)致多個(gè)線程釋放不同的鎖資源導(dǎo)致分布式鎖誤刪問題。 解決誤刪問題:
需要在獲取鎖時(shí)存入線程表示(uuid + 線程id)的方式在釋放鎖時(shí)需要先獲取鎖中的線程標(biāo)識(shí),判斷是否與當(dāng)前線程標(biāo)識(shí)一致
如果一致則釋放鎖如果不一致則不釋放鎖
更新代碼:
@Component
public class DistributedLock implements ILock{
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 分布式鎖key */
private final String DISTRIBUTED_LOCK = "distributed_lock:";
/** UUID */
private String uuid = UUID.randomUUID(true).toString();
/**
* 分布式互斥鎖
*/
@Override
public boolean tryLock(String name, Long time, TimeUnit unit) {
// value
String value = uuid + Thread.currentThread().getId();
// key
String key = DISTRIBUTED_LOCK + name;
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
// 防止自動(dòng)拆箱空指針
return BooleanUtil.isTrue(aBoolean);
}
/**
* 分布式釋放鎖
*/
@Override
public void unLock(String name) {
String key = DISTRIBUTED_LOCK + name;
String value = uuid + Thread.currentThread().getId();
// 獲取互斥鎖中的值
String redisValue = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.equals(value,redisValue)){
stringRedisTemplate.delete(key);
}
}
}
Redisson入門
正常使用setnx實(shí)現(xiàn)分布式鎖存在以下幾種問題
不可重入鎖:同一現(xiàn)成無法多次獲取同一把鎖不可重試:獲取鎖只嘗試一次就返回,無法重試超時(shí)釋放:業(yè)務(wù)執(zhí)行耗時(shí)較長(zhǎng),會(huì)導(dǎo)致鎖釋放主從一致性:集群的情況下主節(jié)點(diǎn)宕機(jī)后同步數(shù)據(jù)過程種,導(dǎo)致鎖失效
Redisson 是一個(gè) Java 高級(jí) Redis 客戶端,提供了基于 Redis 的分布式和可擴(kuò)展的 Java 數(shù)據(jù)結(jié)構(gòu),如并發(fā)集合(Concurrent Collections)、同步器(Synchronizers)、分布式服務(wù)(Distributed Services)等。Redisson 構(gòu)建于 Jedis 之上,旨在簡(jiǎn)化 Redis 的使用,尤其對(duì)于分布式環(huán)境中的應(yīng)用程序而言,它提供了一種易于使用的 API 來處理 Redis 中的數(shù)據(jù),并實(shí)現(xiàn)了多種分布式鎖和其他高級(jí)功能。Redisson底層采用的是Netty 框架
案例:每個(gè)用戶對(duì)一件商品只能下一單。
配置文件
redisson:
# redis key前綴
keyPrefix:
# 線程池?cái)?shù)量
threads: 4
# Netty線程池?cái)?shù)量
nettyThreads: 8
# 單節(jié)點(diǎn)配置
singleServerConfig:
# 客戶端名稱
clientName: ${ruoyi.name}
# 最小空閑連接數(shù)
connectionMinimumIdleSize: 8
# 連接池大小
connectionPoolSize: 32
# 連接空閑超時(shí),單位:毫秒
idleConnectionTimeout: 10000
# 命令等待超時(shí),單位:毫秒
timeout: 3000
# 發(fā)布和訂閱連接池大小
subscriptionConnectionPoolSize: 50
/**
* Redisson 配置屬性
*
*/
@Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {
/**
* redis緩存key前綴
*/
private String keyPrefix;
/**
* 線程池?cái)?shù)量,默認(rèn)值 = 當(dāng)前處理核數(shù)量 * 2
*/
private int threads;
/**
* Netty線程池?cái)?shù)量,默認(rèn)值 = 當(dāng)前處理核數(shù)量 * 2
*/
private int nettyThreads;
/**
* 單機(jī)服務(wù)配置
*/
private SingleServerConfig singleServerConfig;
/**
* 集群服務(wù)配置
*/
private ClusterServersConfig clusterServersConfig;
@Data
@NoArgsConstructor
public static class SingleServerConfig {
/**
* 客戶端名稱
*/
private String clientName;
/**
* 最小空閑連接數(shù)
*/
private int connectionMinimumIdleSize;
/**
* 連接池大小
*/
private int connectionPoolSize;
/**
* 連接空閑超時(shí),單位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超時(shí),單位:毫秒
*/
private int timeout;
/**
* 發(fā)布和訂閱連接池大小
*/
private int subscriptionConnectionPoolSize;
}
@Data
@NoArgsConstructor
public static class ClusterServersConfig {
/**
* 客戶端名稱
*/
private String clientName;
/**
* master最小空閑連接數(shù)
*/
private int masterConnectionMinimumIdleSize;
/**
* master連接池大小
*/
private int masterConnectionPoolSize;
/**
* slave最小空閑連接數(shù)
*/
private int slaveConnectionMinimumIdleSize;
/**
* slave連接池大小
*/
private int slaveConnectionPoolSize;
/**
* 連接空閑超時(shí),單位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超時(shí),單位:毫秒
*/
private int timeout;
/**
* 發(fā)布和訂閱連接池大小
*/
private int subscriptionConnectionPoolSize;
/**
* 讀取模式
*/
private ReadMode readMode;
/**
* 訂閱模式
*/
private SubscriptionMode subscriptionMode;
}
}
/**
* redis配置
*
*/
@Slf4j
@AutoConfiguration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {
@Autowired
private RedissonProperties redissonProperties;
@Autowired
private ObjectMapper objectMapper;
@Bean
public RedissonAutoConfigurationCustomizer redissonCustomizer() {
return config -> {
ObjectMapper om = objectMapper.copy();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化輸入的類型,類必須是非final修飾的。序列化時(shí)將對(duì)象全類名一起保存下來
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
TypedJsonJacksonCodec jsonCodec = new TypedJsonJacksonCodec(Object.class, om);
// 組合序列化 key 使用 String 內(nèi)容使用通用 json 格式
CompositeCodec codec = new CompositeCodec(StringCodec.INSTANCE, jsonCodec, jsonCodec);
config.setThreads(redissonProperties.getThreads())
.setNettyThreads(redissonProperties.getNettyThreads())
// 緩存 Lua 腳本 減少網(wǎng)絡(luò)傳輸(redisson 大部分的功能都是基于 Lua 腳本實(shí)現(xiàn))
.setUseScriptCache(true)
.setCodec(codec);
RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();
if (ObjectUtil.isNotNull(singleServerConfig)) {
// 使用單機(jī)模式
config.useSingleServer()
//設(shè)置redis key前綴
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(singleServerConfig.getTimeout())
.setClientName(singleServerConfig.getClientName())
.setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize())
.setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize())
.setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());
}
// 集群配置方式 參考下方注釋
RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();
if (ObjectUtil.isNotNull(clusterServersConfig)) {
config.useClusterServers()
//設(shè)置redis key前綴
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(clusterServersConfig.getTimeout())
.setClientName(clusterServersConfig.getClientName())
.setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize())
.setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize())
.setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize())
.setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize())
.setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize())
.setReadMode(clusterServersConfig.getReadMode())
.setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
}
log.info("初始化 redis 配置");
};
}
@RequiredArgsConstructor
@Service
public class BookOrderServiceImpl implements IBookOrderService {
private final BookOrderMapper baseMapper;
private final SysUserMapper sysUserMapper;
private final BooksMapper booksMapper;
private final BookOrderDetailMapper bookOrderDetailMapper;
/** 自定義分布式鎖 */
private final DistributedLock distributedLock;
/** redission */
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 模擬庫(kù)存扣減并發(fā)問題
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void inventory(String bookId,Long userId) {
// 一人一單校驗(yàn)
Long aLong = bookOrderDetailMapper.selectCount(
Wrappers.lambdaQuery(BookOrderDetail.class).eq(BookOrderDetail::getNumber, userId)
.eq(BookOrderDetail::getBookId,bookId)
);
if (aLong > 0){
throw new ServiceException("下單失敗");
}
// 自定義獲取鎖
// boolean piker = distributedLock.tryLock("PIKER", 10L, TimeUnit.SECONDS);
// redisClient分布式鎖
RLock lock = CLIENT.getLock("lock:order:");
// 默認(rèn)失敗不等待鎖時(shí)間,鎖過期時(shí)間30秒
boolean piker = lock.tryLock();
if (piker){
try {
// 訂單業(yè)務(wù)
placingOrder(bookId, userId);
}finally {
// 自定義釋放鎖
// distributedLock.unLock("PIKER");
// redisson 釋放鎖
lock.unlock();
}
}
}
/**
* 業(yè)務(wù)操作
*/
private void placingOrder(String bookId, Long userId) {
// 1.減少庫(kù)存
Books books = booksMapper.selectById(bookId);
books.setStockQuantity(books.getStockQuantity() - 1);
booksMapper.updateById(books);
// 2.增加訂單
BookOrderDetail bookOrder = new BookOrderDetail();
bookOrder.setBookId(Long.parseLong(bookId));
bookOrder.setNumber(userId.intValue());
bookOrderDetailMapper.insert(bookOrder);
}
}
Redisson-分布式鎖實(shí)現(xiàn)原理
1.可重入鎖
方法1{
獲取鎖
調(diào)用方法2
}
方法2{
獲取鎖
}
以上這種情況下使用自定義的setnx方式就會(huì)造成死鎖的情況,比較經(jīng)典的重入鎖。
Rdisson使用Lua腳本來實(shí)現(xiàn)可重入鎖的。
2.重試機(jī)制,超時(shí)釋放
重試機(jī)制:在設(shè)置互斥鎖時(shí)有兩個(gè)線程A,B。A線程先獲取鎖資源,之后B在獲取鎖就會(huì)一直失敗,因?yàn)殒i的互斥性,沒有重試的機(jī)制。
超時(shí)釋放:給鎖設(shè)置一個(gè)過期時(shí)間,防止redis宕機(jī)情況下鎖一直沒有辦法被釋放導(dǎo)致死鎖情況,或者因?yàn)闃I(yè)務(wù)原因?qū)е戮彺嬷亟〞r(shí)間大于鎖過期時(shí)間導(dǎo)致數(shù)據(jù)丟失
注意:redisson不同版本的代碼不同,但是整體流程是大差不大的,下面是結(jié)合黑馬程序猿老師結(jié)合總結(jié)的流程圖。 如果自己設(shè)置失效時(shí)間的話,鎖過期時(shí)間就不是-1因此就不會(huì)觸發(fā)看門狗機(jī)制了。
獲取鎖:
釋放鎖:
有了以上的機(jī)制可以實(shí)現(xiàn):我有三個(gè)線程 A,B 設(shè)置等待時(shí)間3秒,線程A先獲取到鎖,由于業(yè)務(wù)原因進(jìn)行阻塞,此時(shí)線程2開始獲取鎖。線程A業(yè)務(wù)執(zhí)行了4秒,那么首先線程2獲取鎖失敗。如果線程A執(zhí)行業(yè)務(wù)在3秒內(nèi)完成,那么線程2可以成功獲取鎖。
柚子快報(bào)邀請(qǐng)碼778899分享:數(shù)據(jù)庫(kù) Redis-分布式鎖
文章鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。