Redis 如何实现库存扣减操作和防止被超卖?
迪丽瓦拉
2025-06-01 00:39:43
0

本文已经收录到Github仓库,该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点,欢迎star~

Github地址:https://github.com/Tyson0314/Java-learning


电商当项目经验已经非常普遍了,不管你是包装的还是真实的,起码要能讲清楚电商中常见的问题,比如库存的操作怎么防止商品被超卖

解决方案:

  • 基于数据库单库存
  • 基于数据库多库存
  • 基于redis

基于redis实现扣减库存的具体实现

  • 初始化库存回调函数(IStockCallback)
  • 扣减库存服务(StockService)
  • 调用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-STapA2Ti-1679409278382)(http://img.topjavaer.cn/img/库存扣减1.png)]


在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

  1. 使用mysql数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。
  2. 还是使用数据库,但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。
  3. 将库存放到redis使用redis的incrby特性来扣减库存。

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。

基于数据库单库存

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

  • 用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:
update number set x=x-1 where x > 0
  • MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。
  • 当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

基于redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

基于redis实现扣减库存的具体实现

  • 我们使用redis的lua脚本来实现扣减库存
  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

初始化库存回调函数(IStockCallback )

/*** 获取库存回调*/
public interface IStockCallback {/*** 获取库存* @return*/int getStock();
}

扣减库存服务(StockService)

/*** 扣库存**/
@Service
public class StockService {Logger logger = LoggerFactory.getLogger(StockService.class);/*** 不限库存*/public static final long UNINITIALIZED_STOCK = -3L;/*** Redis 客户端*/@Autowiredprivate RedisTemplate redisTemplate;/*** 执行扣库存的脚本*/public static final String STOCK_LUA;static {/**** @desc 扣减库存Lua脚本* 库存(stock)-1:表示不限库存* 库存(stock)0:表示没有库存* 库存(stock)大于0:表示剩余库存** @params 库存key* @return*   -3:库存未初始化*   -2:库存不足*   -1:不限库存*   大于等于0:剩余库存(扣减之后剩余的库存)*      redis缓存的库存(value)是-1表示不限库存,直接返回1*/StringBuilder sb = new StringBuilder();sb.append("if (redis.call('exists', KEYS[1]) == 1) then");sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");sb.append("    local num = tonumber(ARGV[1]);");sb.append("    if (stock == -1) then");sb.append("        return -1;");sb.append("    end;");sb.append("    if (stock >= num) then");sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");sb.append("    end;");sb.append("    return -2;");sb.append("end;");sb.append("return -3;");STOCK_LUA = sb.toString();}/*** @param key           库存key* @param expire        库存有效时间,单位秒* @param num           扣减数量* @param stockCallback 初始化库存回调函数* @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存*/public long stock(String key, long expire, int num, IStockCallback stockCallback) {long stock = stock(key, num);// 初始化库存if (stock == UNINITIALIZED_STOCK) {RedisLock redisLock = new RedisLock(redisTemplate, key);try {// 获取锁if (redisLock.tryLock()) {// 双重验证,避免并发时重复回源到数据库stock = stock(key, num);if (stock == UNINITIALIZED_STOCK) {// 获取初始化库存final int initStock = stockCallback.getStock();// 将库存设置到redisredisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);// 调一次扣库存的操作stock = stock(key, num);}}} catch (Exception e) {logger.error(e.getMessage(), e);} finally {redisLock.unlock();}}return stock;}/*** 加库存(还原库存)** @param key    库存key* @param num    库存数量* @return*/public long addStock(String key, int num) {return addStock(key, null, num);}/*** 加库存** @param key    库存key* @param expire 过期时间(秒)* @param num    库存数量* @return*/public long addStock(String key, Long expire, int num) {boolean hasKey = redisTemplate.hasKey(key);// 判断key是否存在,存在就直接更新if (hasKey) {return redisTemplate.opsForValue().increment(key, num);}Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");RedisLock redisLock = new RedisLock(redisTemplate, key);try {if (redisLock.tryLock()) {// 获取到锁后再次判断一下是否有keyhasKey = redisTemplate.hasKey(key);if (!hasKey) {// 初始化库存redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);}}} catch (Exception e) {logger.error(e.getMessage(), e);} finally {redisLock.unlock();}return num;}/*** 获取库存** @param key 库存key* @return -1:不限库存; 大于等于0:剩余库存*/public int getStock(String key) {Integer stock = (Integer) redisTemplate.opsForValue().get(key);return stock == null ? -1 : stock;}/*** 扣库存** @param key 库存key* @param num 扣减库存数量* @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】*/private Long stock(String key, int num) {// 脚本里的KEYS参数List keys = new ArrayList<>();keys.add(key);// 脚本里的ARGV参数List args = new ArrayList<>();args.add(Integer.toString(num));long result = redisTemplate.execute(new RedisCallback() {@Overridepublic Long doInRedis(RedisConnection connection) throws DataAccessException {Object nativeConnection = connection.getNativeConnection();// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行// 集群模式if (nativeConnection instanceof JedisCluster) {return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);}// 单机模式else if (nativeConnection instanceof Jedis) {return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);}return UNINITIALIZED_STOCK;}});return result;}}

调用

@RestController
public class StockController {@Autowiredprivate StockService stockService;@RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public Object stock() {// 商品IDlong commodityId = 1;// 库存IDString redisKey = "redis_key:stock:" + commodityId;long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));return stock >= 0;}/*** 获取初始的库存** @return*/private int initStock(long commodityId) {// TODO 这里做一些初始化库存的操作return 1000;}@RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public Object getStock() {// 商品IDlong commodityId = 1;// 库存IDString redisKey = "redis_key:stock:" + commodityId;return stockService.getStock(redisKey);}@RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public Object addStock() {// 商品IDlong commodityId = 2;// 库存IDString redisKey = "redis_key:stock:" + commodityId;return stockService.addStock(redisKey, 2);}
}

最后给大家分享一个Github仓库,上面有大彬整理的300多本经典的计算机书籍PDF,包括C语言、C++、Java、Python、前端、数据库、操作系统、计算机网络、数据结构和算法、机器学习、编程人生等,可以star一下,下次找书直接在上面搜索,仓库持续更新中~

Github地址:https://github.com/Tyson0314/java-books

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...