[TOC]
什么是SKU?
我们在提到一个产品时经常会说SKU,而且只要涉及到实物交易和库存管理就会有SKU的概念,这个词到底是什么意思呢?
SKU英文全称为Stock Keeping Unit,简称SKU,**是产品入库后一种编码归类方法,也是库存控制的最小单位。**可以是以件,盒,托盘等为单位,每种产品均对应唯一的SKU号,SKU号包含一种产品的品牌、型号、配置、等级、包装容量、单位、生产日期、保质期、用途、价格、产地等属性,一件产品的属性与其他产品都不一样,这样的商品就是一个单品。
简单来说,电商行业一件商品的信息就是一个SKU
带货流程梳理
直播间展示货物 -> 用户可以点击商品logo进行购买
关于商品带货这块的产品设计,我们可以在直播间中设计一块浮动的tab栏,用于展示相关的产品列表信息。
当用户进入直播间以后,可以通过产品列表去点击查看商品详情,然后使用虚拟币去进行购买即可。
业务功能点分析
- 商品信息表的设计(sku作为基本单位,类目,库存的设计),提供商品信息的基础接口,商品id的查询,商品信息的更新,商品信息的录入,商品信息的批量查询
- 手机(一级类目) -> 三星,华为,小米,苹果,vivo,oppo(二级类目)-> note 荣耀系列 .... 机型,内存,cpu等(三级类目)
- sku单位,名字细化到具体的商品名称(小米note11系列 最新手机 曲面屏 黑色系列),category
- 直播带货配置表(按照主播id为维度,去配置每个主播可以带货的sku信息),在直播间配置接口中返回主播是否有带货的权限
- 提供接口,查看主播本次带货的商品列表信息
- 提供接口,查看商品详情信息
- 商品下单的功能(加入购物车,待支付(如果没填写收货地址,需要填写收货地址),下单)
- 库存扣减的功能(如果订单超时未支付成功,我们得考虑库存回滚的问题)
按理说带货功能应该单独放到一个模块中,但是当前我们的模块数量已经很多了,所以我们还是将其放到gift模块中
创建数据表:
-- Create syntax for TABLE 't_sku_info'
-- SKU master data: one row per SKU with its price, code, display name,
-- thumbnail / original image URLs, description, shelf status (0 = off shelf, 1 = on shelf)
-- and the id of the category it belongs to.
CREATE TABLE `t_sku_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`sku_id` int unsigned NOT NULL DEFAULT '0' COMMENT 'sku id',
`sku_price` int unsigned NOT NULL DEFAULT '0' COMMENT 'sku价格',
`sku_code` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'sku编码',
`name` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '商品名称',
`icon_url` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '缩略图',
`original_icon_url` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '原图',
`remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '商品描述',
`status` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '状态(0下架,1上架)',
`category_id` int NOT NULL COMMENT '类目id',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品sku信息表';
-- Create syntax for TABLE 't_sku_order_info'
-- SKU order table: the ordered SKU ids are stored as text in sku_id_list,
-- together with the buying user id, the live room id, an integer order status
-- and an optional free-text remark (extra).
CREATE TABLE `t_sku_order_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`sku_id_list` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`user_id` bigint unsigned NOT NULL DEFAULT '0' COMMENT '用户id',
`room_id` int unsigned NOT NULL DEFAULT '0' COMMENT '直播id',
`status` int unsigned NOT NULL DEFAULT '0' COMMENT '状态',
`extra` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '备注',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=23 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品订单表';
-- Create syntax for TABLE 't_sku_stock_info'
-- SKU stock table: stock quantity per sku_id, a validity status (0 = invalid, 1 = valid)
-- and a nullable version column intended for optimistic locking.
CREATE TABLE `t_sku_stock_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`sku_id` int unsigned NOT NULL DEFAULT '0' COMMENT 'sku id',
`stock_num` int unsigned NOT NULL DEFAULT '0' COMMENT 'sku库存',
`status` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '状态(0无效,1有效)',
`version` int unsigned DEFAULT NULL COMMENT '乐观锁',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='sku库存表';
-- Create syntax for TABLE 't_anchor_shop_info'
-- Anchor selling-permission table: one row per (anchor_id, sku_id) pair with a
-- validity status (0 = invalid, 1 = valid; defaults to 1).
CREATE TABLE `t_anchor_shop_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`anchor_id` int unsigned NOT NULL DEFAULT '0' COMMENT '主播id',
`sku_id` int unsigned NOT NULL DEFAULT '0' COMMENT '商品sku id',
`status` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '有效(0无效,1有效)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='带货主播权限配置表';
-- Create syntax for TABLE 't_category_info'
-- Category table: each row has a level, a parent_id pointing at its parent category,
-- a name and a validity status (0 = invalid, 1 = valid).
CREATE TABLE `t_category_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`level` int unsigned NOT NULL DEFAULT '0' COMMENT '类目级别',
`parent_id` int unsigned NOT NULL DEFAULT '0' COMMENT '父类目id',
`category_name` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '类目名称',
`status` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '状态(0无效,1有效)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='类目表';
实体类对象的准备:
qiyu-live-api:
/**
* Request parameters of the SKU detail endpoint: the SKU id to look up and the anchor id.
*/
@Data
public class SkuInfoReqVO {
private Long skuId;
private Long anchorId;
}
/**
* View object for one SKU in a product list: id, price, code, name, image URLs and description.
*/
@Data
public class SkuInfoVO {
private Long skuId;
private Integer skuPrice;
private String skuCode;
private String name;
private String iconUrl;
private String originalIconUrl;
private String remark;
}
/**
* View object for the SKU detail page; currently carries the same fields as {@code SkuInfoVO}.
*/
@Data
public class SkuDetailInfoVO {
private Long skuId;
private Integer skuPrice;
private String skuCode;
private String name;
private String iconUrl;
private String originalIconUrl;
private String remark;
// plus other, more complex data
}
/**
* Request parameters of the shopping-cart endpoints: the SKU id and the live room id.
*/
@Data
public class ShopCarReqVO {
private Long skuId;
private Integer roomId;
}
/**
* Response of the "get cart info" endpoint: owner user id, live room id and the cart items.
*/
@Data
public class ShopCarRespVO {
private Long userId;
private Integer roomId;
private List<ShopCarItemRespDTO> shopCarItemRespDTOS;
}
/**
* Request parameters of the pre-order endpoint: user id and live room id.
*/
@Data
public class PrepareOrderVO {
private Long userId;
private Integer roomId;
}
qiyu-live-gift-interface:
/**
* Serializable SKU data transferred over RPC; it is also the value type stored in the
* per-anchor SKU hash in Redis by {@code SkuInfoRpcImpl}.
*/
@Data
public class SkuInfoDTO implements Serializable {
@Serial
private static final long serialVersionUID = 6682046584901973187L;
private Long id;
private Long skuId;
private Integer skuPrice;
private String skuCode;
private String name;
private String iconUrl;
private String originalIconUrl;
private Integer status;
private String remark;
}
/**
* Serializable SKU detail data returned by the RPC layer.
*/
@Data
public class SkuDetailInfoDTO implements Serializable {
@Serial
private static final long serialVersionUID = -1279033925266285109L;
private Long skuId;
private Integer skuPrice;
private String skuCode;
private String name;
private String iconUrl;
private String originalIconUrl;
private String remark;
// plus other, more complex data
}
/**
* RPC request for shopping-cart operations. The all-args constructor takes
* (userId, skuId, roomId) in field declaration order.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ShopCarReqDTO implements Serializable {
@Serial
private static final long serialVersionUID = -341133016477720753L;
private Long userId;
private Long skuId;
private Integer roomId;
}
/**
* One shopping-cart line: the SKU data and the quantity of that SKU in the cart.
*/
@Data
public class ShopCarItemRespDTO implements Serializable {
@Serial
private static final long serialVersionUID = 7247175817439564893L;
private Integer count;
private SkuInfoDTO skuInfoDTO;
}
/**
* RPC response describing a user's cart in one live room.
* NOTE(review): the item list property is named {@code skuCarItemRespDTODTOS}, which differs from
* {@code ShopCarRespVO.shopCarItemRespDTOS}; a name-based bean copy will presumably not map it.
*/
@Data
public class ShopCarRespDTO implements Serializable {
@Serial
private static final long serialVersionUID = 7147830236451419334L;
private Long userId;
private Integer roomId;
private List<ShopCarItemRespDTO> skuCarItemRespDTODTOS;
}
/**
* RPC request used to insert a SKU order or update its status. The all-args constructor takes
* (id, userId, roomId, status, skuIdList) in field declaration order.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SkuOrderInfoReqDTO implements Serializable {
@Serial
private static final long serialVersionUID = -9220028624463964600L;
private Long id;
private Long userId;
private Integer roomId;
private Integer status;
private List<Long> skuIdList;
}
/**
 * RPC response describing one SKU order row.
 *
 * <p>{@code skuIdList} is the textual SKU id list as stored in the order table.
 * The all-args constructor takes (id, skuIdList, userId, roomId, status, extra).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SkuOrderInfoRespDTO implements Serializable {
    @Serial
    private static final long serialVersionUID = 2916280620499166681L;
    // Lower-camel-case like every other DTO/PO; Lombok still generates getId()/setId().
    private Long id;
    private String skuIdList;
    private Long userId;
    private Integer roomId;
    private Integer status;
    private String extra;
}
/**
* Carries the user id and order id of an order whose stock may need to be rolled back.
* NOTE(review): unlike the neighbouring DTOs this class is not Serializable — confirm how it is transported.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RollBackStockBO {
private Long userId;
private Long orderId;
}
/**
* RPC request for the pre-order step: user id and live room id.
*/
@Data
public class PrepareOrderReqDTO implements Serializable {
@Serial
private static final long serialVersionUID = 1742445784431200306L;
private Long userId;
private Integer roomId;
}
/**
 * Status values of a SKU order, each with a numeric code and a description.
 */
public enum SkuOrderInfoEnum {
    /** Order created and waiting for payment. */
    PREPARE_PAY(0, "待支付状态"),
    /** Order has been paid. */
    HAS_PAY(1, "已支付状态"),
    /** Order has been cancelled. */
    CANCEL(2, "取消订单状态");

    // Enum constants are shared singletons, so their state must be immutable.
    private final int code;
    private final String desc;

    SkuOrderInfoEnum(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /**
     * @return the numeric status code
     */
    public int getCode() {
        return code;
    }

    /**
     * @return the human-readable description of the status
     */
    public String getDesc() {
        return desc;
    }
}
/**
* Result of the pre-order step: the total price and the list of items being ordered.
*/
@Data
public class SkuPrepareOrderInfoDTO implements Serializable {
@Serial
private static final long serialVersionUID = -8683020132073931910L;
private Integer totalPrice;
private List<ShopCarItemRespDTO> skuPrepareOrderItemInfoDTOS;
}
qiyu-live-gift-provider:
/**
* Persistent object mapped to table {@code t_sku_info}.
* NOTE(review): the table declares {@code category_id} as NOT NULL without a default, but this
* class has no matching field — confirm rows are never inserted through this PO.
*/
@Data
@TableName("t_sku_info")
public class SkuInfoPO {
@TableId(type = IdType.AUTO)
private Long id;
private Long skuId;
private Integer skuPrice;
private String skuCode;
private String name;
private String iconUrl;
private String originalIconUrl;
private Integer status;
private String remark;
private Date createTime;
private Date updateTime;
}
/**
* Persistent object mapped to table {@code t_anchor_shop_info}: one anchor/SKU pairing with a status.
*/
@Data
@TableName("t_anchor_shop_info")
public class AnchorShopInfoPO {
@TableId(type = IdType.AUTO)
private Integer id;
private Long anchorId;
private Long skuId;
private Integer status;
private Date createTime;
private Date updateTime;
}
/**
* Persistent object mapped to table {@code t_sku_stock_info}.
* NOTE(review): the table's {@code version} (optimistic lock) column is not mapped here.
*/
@Data
@TableName("t_sku_stock_info")
public class SkuStockInfoPO {
@TableId(type = IdType.AUTO)
private Integer id;
private Long skuId;
private Integer stockNum;
private Integer status;
private Date createTime;
private Date updateTime;
}
/**
 * Persistent object mapped to table {@code t_category_info}.
 *
 * <p>The auto-increment primary key is {@code id}; {@code level} and {@code parentId}
 * describe the position of the category in the category tree.
 */
@Data
@TableName("t_category_info")
public class CategoryInfoPO {
    // The primary key column is `id`; @TableId must not sit on `level`.
    @TableId(type = IdType.AUTO)
    private Integer id;
    private Integer level;
    private String categoryName;
    private Integer parentId;
    private Integer status;
    private Date createTime;
    private Date updateTime;
}
/**
* Persistent object mapped to table {@code t_sku_order_info}; {@code skuIdList} holds the
* ordered SKU ids as a single string.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_sku_order_info")
public class SkuOrderInfoPO {
@TableId(type = IdType.AUTO)
private Long id;
private String skuIdList;
private Long userId;
private Integer roomId;
private Integer status;
private String extra;
private Date createTime;
private Date updateTime;
}
/**
* MyBatis-Plus mapper for {@code SkuInfoPO}; only the inherited {@code BaseMapper} operations are used.
*/
@Mapper
public interface ISkuInfoMapper extends BaseMapper<SkuInfoPO> {
}
/**
* MyBatis-Plus mapper for {@code AnchorShopInfoPO}; only the inherited {@code BaseMapper} operations are used.
*/
@Mapper
public interface IAnchorShopInfoMapper extends BaseMapper<AnchorShopInfoPO> {
}
/**
* MyBatis-Plus mapper for {@code SkuStockInfoPO}; this first version declares no custom statements.
*/
@Mapper
public interface ISkuStockInfoMapper extends BaseMapper<SkuStockInfoPO> {
}
/**
* MyBatis-Plus mapper for {@code CategoryInfoPO}; only the inherited {@code BaseMapper} operations are used.
*/
@Mapper
public interface ICategoryInfoMapper extends BaseMapper<CategoryInfoPO> {
}
/**
* MyBatis-Plus mapper for {@code SkuOrderInfoPO}; only the inherited {@code BaseMapper} operations are used.
*/
@Mapper
public interface ISkuOrderInfoMapper extends BaseMapper<SkuOrderInfoPO> {
}
redis-starter:
@Configuration
@Conditional(RedisKeyLoadMatch.class)
public class GiftProviderCacheKeyBuilder extends RedisKeyBuilder {
...
// Key segments of the shop-related Redis keys; declared final so they cannot be reassigned at runtime.
private static final String SKU_DETAIL_INFO_MAP = "sku_detail_info_map";
private static final String SHOP_CAR = "shop_car";
private static final String SKU_STOCK = "sku_stock";
private static final String STOCK_SYNC_LOCK = "stock_sync_lock";
/**
* Builds the Redis key used as the lock for the scheduled stock synchronisation:
* the inherited prefix followed by the stock-sync-lock segment.
*/
public String buildStockSyncLock() {
return super.getPrefix() + STOCK_SYNC_LOCK;
}
/**
* Builds the Redis key that holds the cached stock of one SKU:
* prefix + stock segment + separator + skuId.
*/
public String buildSkuStock(Long skuId) {
return super.getPrefix() + SKU_STOCK + super.getSplitItem() + skuId;
}
/**
 * Builds the Redis key of a user's shopping cart in one live room:
 * prefix + shop-car segment + separator + userId + separator + roomId.
 *
 * <p>The cart gets its own {@code SHOP_CAR} segment so that cart keys do not live in the
 * same key space as the per-anchor SKU detail hash.
 *
 * @param userId id of the cart owner
 * @param roomId id of the live room the cart belongs to
 * @return the cart key
 */
public String buildUserShopCar(Long userId, Integer roomId) {
    return super.getPrefix() + SHOP_CAR + super.getSplitItem() + userId + super.getSplitItem() + roomId;
}
/**
* Builds the Redis key of the hash that caches the SKU information of one anchor:
* prefix + sku-detail-map segment + separator + anchorId.
*/
public String buildSkuDetailInfoMap(Long anchorId) {
return super.getPrefix() + SKU_DETAIL_INFO_MAP + super.getSplitItem() + anchorId;
}
前端接口qiyu-live-api:
/**
 * HTTP endpoints under {@code /shop} for browsing the SKUs an anchor is selling.
 */
@RestController
@RequestMapping("/shop")
public class ShopInfoController {

    @Resource
    private IShopInfoService shopInfoService;

    /**
     * Lists the SKUs configured for the given anchor.
     */
    @PostMapping("/listSkuInfo")
    public WebResponseVO listSkuInfo(Long anchorId) {
        List<SkuInfoVO> skuInfoList = shopInfoService.queryByAnchorId(anchorId);
        return WebResponseVO.success(skuInfoList);
    }

    /**
     * Returns the detail of one SKU identified by the request's skuId and anchorId.
     */
    @PostMapping("detail")
    public WebResponseVO detail(SkuInfoReqVO reqVO) {
        SkuDetailInfoVO detailInfo = shopInfoService.detail(reqVO);
        return WebResponseVO.success(detailInfo);
    }
}
/**
* API-layer service for reading SKU information.
*/
public interface IShopInfoService {
/**
* Queries the SKU list by anchorId.
*/
List<SkuInfoVO> queryByAnchorId(Long anchorId);
/**
* Queries the detail information of a SKU by skuId.
*/
SkuDetailInfoVO detail(SkuInfoReqVO skuInfoReqVO);
}
/**
 * API-layer implementation that delegates to the SKU RPC service and converts DTOs to view objects.
 */
@Service
public class ShopInfoServiceImpl implements IShopInfoService {

    @DubboReference
    private ISkuInfoRpc skuInfoRpc;

    /**
     * Fetches the anchor's SKU list over RPC and converts every element to a {@code SkuInfoVO}.
     */
    @Override
    public List<SkuInfoVO> queryByAnchorId(Long anchorId) {
        return ConvertBeanUtils.convertList(skuInfoRpc.queryByAnchorId(anchorId), SkuInfoVO.class);
    }

    /**
     * Fetches one SKU's detail over RPC and converts it to a {@code SkuDetailInfoVO}.
     */
    @Override
    public SkuDetailInfoVO detail(SkuInfoReqVO skuInfoReqVO) {
        Long skuId = skuInfoReqVO.getSkuId();
        Long anchorId = skuInfoReqVO.getAnchorId();
        SkuDetailInfoDTO detailDTO = skuInfoRpc.queryBySkuId(skuId, anchorId);
        return ConvertBeanUtils.convert(detailDTO, SkuDetailInfoVO.class);
    }
}
qiyu-live-gift-provider:
/**
* RPC contract for reading SKU information.
*/
public interface ISkuInfoRpc {
/**
* Queries the SKU list by anchorId.
*/
List<SkuInfoDTO> queryByAnchorId(Long anchorId);
/**
* Queries the detail of one SKU; anchorId selects the per-anchor cache the lookup goes through.
*/
SkuDetailInfoDTO queryBySkuId(Long skuId, Long anchorId);
}
/**
 * RPC implementation for SKU queries, backed by a per-anchor Redis hash
 * (field = skuId as string, value = {@code SkuInfoDTO}) in front of the database.
 */
@DubboService
public class SkuInfoRpcImpl implements ISkuInfoRpc {

    /** Hash field under which the "no valid SKU" placeholder is stored. */
    private static final String EMPTY_PLACEHOLDER_FIELD = "-1";

    @Resource
    private ISkuInfoService skuInfoService;
    @Resource
    private IAnchorShopInfoService anchorShopInfoService;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private GiftProviderCacheKeyBuilder cacheKeyBuilder;

    /**
     * Returns the SKUs configured for an anchor.
     *
     * <p>Reads the per-anchor hash first; on a miss loads the SKU ids of the anchor, queries the
     * SKU rows and caches them for 30 minutes. When the anchor has ids but no valid SKU rows, an
     * empty placeholder is cached for 1 minute so repeated calls do not hit the database.
     *
     * @param anchorId id of the anchor
     * @return the anchor's SKU list, or an empty list when there is none
     */
    @Override
    public List<SkuInfoDTO> queryByAnchorId(Long anchorId) {
        String cacheKey = cacheKeyBuilder.buildSkuDetailInfoMap(anchorId);
        List<Object> cachedValues = redisTemplate.opsForHash().values(cacheKey);
        if (!CollectionUtils.isEmpty(cachedValues)) {
            // Drop the empty placeholder (skuId == null) and anything that is not a SkuInfoDTO,
            // so a placeholder entry can never cause a ClassCastException or leak to callers.
            return cachedValues.stream()
                    .filter(x -> x instanceof SkuInfoDTO)
                    .map(x -> (SkuInfoDTO) x)
                    .filter(dto -> dto.getSkuId() != null)
                    .collect(Collectors.toList());
        }
        List<Long> skuIdList = anchorShopInfoService.querySkuIdsByAnchorId(anchorId);
        if (CollectionUtils.isEmpty(skuIdList)) {
            return Collections.emptyList();
        }
        List<SkuInfoDTO> skuInfoDTOS = ConvertBeanUtils.convertList(skuInfoService.queryBySkuIds(skuIdList), SkuInfoDTO.class);
        if (CollectionUtils.isEmpty(skuInfoDTOS)) {
            // The placeholder must have the same value type and a String field like real entries.
            redisTemplate.opsForHash().put(cacheKey, EMPTY_PLACEHOLDER_FIELD, new SkuInfoDTO());
            redisTemplate.expire(cacheKey, 1L, TimeUnit.MINUTES);
            return Collections.emptyList();
        }
        // Cache in Redis; on duplicate skuId rows keep the first instead of failing the request.
        Map<String, SkuInfoDTO> skuInfoMap = skuInfoDTOS.stream()
                .collect(Collectors.toMap(x -> String.valueOf(x.getSkuId()), x -> x, (first, second) -> first));
        redisTemplate.opsForHash().putAll(cacheKey, skuInfoMap);
        redisTemplate.expire(cacheKey, 30L, TimeUnit.MINUTES);
        return skuInfoDTOS;
    }

    /**
     * Returns the detail of one SKU, looking it up in the anchor's cached hash first and falling
     * back to the database.
     *
     * @param skuId    id of the SKU
     * @param anchorId id of the anchor whose cache is consulted
     * @return the SKU detail converted from the cached or loaded SKU data
     */
    @Override
    public SkuDetailInfoDTO queryBySkuId(Long skuId, Long anchorId) {
        String cacheKey = cacheKeyBuilder.buildSkuDetailInfoMap(anchorId);
        SkuInfoDTO skuInfoDTO = (SkuInfoDTO) redisTemplate.opsForHash().get(cacheKey, String.valueOf(skuId));
        if (skuInfoDTO != null) {
            return ConvertBeanUtils.convert(skuInfoDTO, SkuDetailInfoDTO.class);
        }
        skuInfoDTO = ConvertBeanUtils.convert(skuInfoService.queryBySkuId(skuId), SkuInfoDTO.class);
        // Only write back into an existing hash: creating the hash here would leave a key with
        // no TTL holding a single SKU, which queryByAnchorId would then return as the full list.
        if (skuInfoDTO != null && Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            redisTemplate.opsForHash().put(cacheKey, String.valueOf(skuId), skuInfoDTO);
        }
        return ConvertBeanUtils.convert(skuInfoDTO, SkuDetailInfoDTO.class);
    }
}
public interface ISkuInfoService {
/**
* 使用skuIdList进行批量查询
*/
List<SkuInfoPO> queryBySkuIds(List<Long> skuIdList);
/**
* 直接将SkuInfo当成SkuDetailInfo,根据skuId查询Info
*/
SkuInfoPO queryBySkuId(Long skuId);
}
/**
 * Database access for SKU rows; only rows with the valid status are returned.
 */
@Service
public class SkuInfoServiceImpl implements ISkuInfoService {

    @Resource
    private ISkuInfoMapper skuInfoMapper;

    /**
     * Batch-queries valid SKU rows by id.
     *
     * @param skuIdList ids to look up; may be null or empty
     * @return the matching rows, or an empty list when no ids were given
     */
    @Override
    public List<SkuInfoPO> queryBySkuIds(List<Long> skuIdList) {
        // An empty IN list would be rendered as "IN ()", which is not valid SQL.
        if (skuIdList == null || skuIdList.isEmpty()) {
            return List.of();
        }
        LambdaQueryWrapper<SkuInfoPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.in(SkuInfoPO::getSkuId, skuIdList);
        queryWrapper.eq(SkuInfoPO::getStatus, CommonStatusEnum.VALID_STATUS.getCode());
        return skuInfoMapper.selectList(queryWrapper);
    }

    /**
     * Queries a single valid SKU row by id.
     *
     * @param skuId id of the SKU
     * @return the row, or whatever {@code selectOne} yields when nothing matches
     */
    @Override
    public SkuInfoPO queryBySkuId(Long skuId) {
        LambdaQueryWrapper<SkuInfoPO> queryWrapper = new LambdaQueryWrapper<>();
        // Single value: an equality condition states the intent directly.
        queryWrapper.eq(SkuInfoPO::getSkuId, skuId);
        queryWrapper.eq(SkuInfoPO::getStatus, CommonStatusEnum.VALID_STATUS.getCode());
        queryWrapper.last("limit 1");
        return skuInfoMapper.selectOne(queryWrapper);
    }
}
购物车接口的开发:
- 用户进入直播间,查看到商品列表
- 用户查看商品详情
- 用户把感兴趣的商品,加入待支付的购物车中(购物车的概念)-> 购物车的基本存储结构(按照直播间为维度去设计购物车),直播间的购物车是独立的,不会存在数据跨直播间存在的情况
- 购物车的添加,移除
- 购物车的内容展示
- 购物车的清空
购物车已经塞满了,下边的逻辑是怎样的?
- 预下单,(手机产品100台,库存的预锁定操作)
- 如果下单成功(库存就正常扣减了)
- 如果到达一定时间限制没有下单(100台手机,100台库存锁定,不支付,支付倒计时,库存回滚,订单状态会变成支付超时状态)
qiyu-live-api:
/**
* Shopping-cart endpoints of the {@code /shop} controller (other members elided by "...").
*/
@RestController
@RequestMapping("/shop")
public class ShopInfoController {
...
// Shopping-cart endpoints:
// - the user enters the live room and sees the product list
// - the user views a product's details
// - the user adds products of interest to a pending-payment cart; the cart is stored per live room, so carts are independent and never share data across rooms
// - add to / remove from the cart
// - show the cart contents
// - clear the cart
@PostMapping("/addCar")
public WebResponseVO addCar(ShopCarReqVO reqVO) {
return WebResponseVO.success(shopInfoService.addCar(reqVO));
}
@PostMapping("/removeFromCar")
public WebResponseVO removeFromCar(ShopCarReqVO reqVO) {
return WebResponseVO.success(shopInfoService.removeFromCar(reqVO));
}
@PostMapping("/getCarInfo")
public WebResponseVO getCarInfo(ShopCarReqVO reqVO) {
return WebResponseVO.success(shopInfoService.getCarInfo(reqVO));
}
@PostMapping("/clearCar")
public WebResponseVO clearCar(ShopCarReqVO reqVO) {
return WebResponseVO.success(shopInfoService.clearShopCar(reqVO));
}
// Once the cart has been filled, what happens next?
// - pre-order (e.g. 100 phones: the stock is pre-locked)
// - if the order succeeds, the stock is deducted normally
// - if no order is placed within the time limit (100 phones locked, unpaid, payment countdown), the stock is rolled back and the order becomes payment-timed-out
}
/**
* Shopping-cart operations of the API-layer shop service (other members elided by "...").
*/
public interface IShopInfoService {
...
/**
* Adds a SKU to the shopping cart.
*/
Boolean addCar(ShopCarReqVO reqVO);
/**
* Removes a SKU from the shopping cart.
*/
Boolean removeFromCar(ShopCarReqVO reqVO);
/**
* Clears the shopping cart.
*/
Boolean clearShopCar(ShopCarReqVO reqVO);
/**
* Changes the quantity of one SKU in the shopping cart.
*/
Boolean addCarItemNum(ShopCarReqVO reqVO);
/**
* Returns the contents of the shopping cart.
*/
ShopCarRespVO getCarInfo(ShopCarReqVO reqVO);
}
/**
 * Shopping-cart part of the API-layer shop service: every call is forwarded to the cart RPC
 * on behalf of the current request's user (other members elided by "...").
 */
@Service
public class ShopInfoServiceImpl implements IShopInfoService {
    ...

    /** Adds the requested SKU to the current user's cart in the given room. */
    @Override
    public Boolean addCar(ShopCarReqVO reqVO) {
        return shopCarRpc.addCar(buildShopCarReqDTO(reqVO));
    }

    /** Removes the requested SKU from the current user's cart. */
    @Override
    public Boolean removeFromCar(ShopCarReqVO reqVO) {
        return shopCarRpc.removeFromCar(buildShopCarReqDTO(reqVO));
    }

    /** Clears the current user's cart in the given room. */
    @Override
    public Boolean clearShopCar(ShopCarReqVO reqVO) {
        return shopCarRpc.clearShopCar(buildShopCarReqDTO(reqVO));
    }

    /** Increases the quantity of the requested SKU in the current user's cart. */
    @Override
    public Boolean addCarItemNum(ShopCarReqVO reqVO) {
        return shopCarRpc.addCarItemNum(buildShopCarReqDTO(reqVO));
    }

    /**
     * Returns the current user's cart.
     *
     * <p>The item list is copied explicitly because its property name differs between
     * {@code ShopCarRespDTO} ({@code skuCarItemRespDTODTOS}) and {@code ShopCarRespVO}
     * ({@code shopCarItemRespDTOS}), so the bean conversion alone would drop it.
     */
    @Override
    public ShopCarRespVO getCarInfo(ShopCarReqVO reqVO) {
        ShopCarRespDTO carInfo = shopCarRpc.getCarInfo(buildShopCarReqDTO(reqVO));
        ShopCarRespVO respVO = ConvertBeanUtils.convert(carInfo, ShopCarRespVO.class);
        if (carInfo != null && respVO != null) {
            respVO.setShopCarItemRespDTOS(carInfo.getSkuCarItemRespDTODTOS());
        }
        return respVO;
    }

    /** Builds the RPC request from the web request and the id of the user making the call. */
    private ShopCarReqDTO buildShopCarReqDTO(ShopCarReqVO reqVO) {
        return new ShopCarReqDTO(QiyuRequestContext.getUserId(), reqVO.getSkuId(), reqVO.getRoomId());
    }
}
qiyu-live-gift-provider:
//gift-interface中
/**
* RPC contract for shopping-cart operations.
*/
public interface IShopCarRpc {
/**
* Adds a SKU to the shopping cart.
*/
Boolean addCar(ShopCarReqDTO shopCarReqDTO);
/**
* Removes a SKU from the shopping cart.
*/
Boolean removeFromCar(ShopCarReqDTO shopCarReqDTO);
/**
* Clears the shopping cart.
*/
Boolean clearShopCar(ShopCarReqDTO shopCarReqDTO);
/**
* Changes the quantity of one SKU in the shopping cart.
*/
Boolean addCarItemNum(ShopCarReqDTO shopCarReqDTO);
/**
* Returns the contents of the shopping cart.
*/
ShopCarRespDTO getCarInfo(ShopCarReqDTO shopCarReqDTO);
}
/**
 * Dubbo-exposed implementation of {@code IShopCarRpc}; every method delegates to
 * {@code IShopCarService}.
 */
@DubboService
public class ShopCarRpcImpl implements IShopCarRpc {

    @Resource
    private IShopCarService shopCarService;

    @Override
    public Boolean addCar(ShopCarReqDTO shopCarReqDTO) {
        return shopCarService.addCar(shopCarReqDTO);
    }

    @Override
    public Boolean removeFromCar(ShopCarReqDTO shopCarReqDTO) {
        return shopCarService.removeFromCar(shopCarReqDTO);
    }

    @Override
    public Boolean clearShopCar(ShopCarReqDTO shopCarReqDTO) {
        return shopCarService.clearShopCar(shopCarReqDTO);
    }

    @Override
    public Boolean addCarItemNum(ShopCarReqDTO shopCarReqDTO) {
        return shopCarService.addCarItemNum(shopCarReqDTO);
    }

    /**
     * Returns the cart contents. The signature matches {@code IShopCarRpc#getCarInfo}; the
     * VO conversion belongs to the API layer, not to this provider class.
     */
    @Override
    public ShopCarRespDTO getCarInfo(ShopCarReqDTO shopCarReqDTO) {
        return shopCarService.getCarInfo(shopCarReqDTO);
    }
}
/**
* Provider-side service for shopping-cart operations.
*/
public interface IShopCarService {
/**
* Adds a SKU to the shopping cart.
*/
Boolean addCar(ShopCarReqDTO shopCarReqDTO);
/**
* Removes a SKU from the shopping cart.
*/
Boolean removeFromCar(ShopCarReqDTO shopCarReqDTO);
/**
* Clears the shopping cart.
*/
Boolean clearShopCar(ShopCarReqDTO shopCarReqDTO);
/**
* Changes the quantity of one SKU in the shopping cart.
*/
Boolean addCarItemNum(ShopCarReqDTO shopCarReqDTO);
/**
* Returns the contents of the shopping cart.
*/
ShopCarRespDTO getCarInfo(ShopCarReqDTO shopCarReqDTO);
}
/**
 * Redis-backed shopping cart. Each (user, room) pair owns one hash whose fields are SKU ids
 * (as strings) and whose values are the quantities.
 */
@Service
public class ShopCarServiceImpl implements IShopCarService {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private GiftProviderCacheKeyBuilder cacheKeyBuilder;
    @Resource
    private ISkuInfoService skuInfoService;

    /**
     * The cart is scoped to a live room, so it is kept in the cache only and not persisted.
     * Stores the SKU with quantity 1.
     */
    @Override
    public Boolean addCar(ShopCarReqDTO shopCarReqDTO) {
        String skuField = String.valueOf(shopCarReqDTO.getSkuId());
        redisTemplate.opsForHash().put(cartKeyOf(shopCarReqDTO), skuField, 1);
        return true;
    }

    /** Deletes the SKU's field from the cart hash. */
    @Override
    public Boolean removeFromCar(ShopCarReqDTO shopCarReqDTO) {
        String skuField = String.valueOf(shopCarReqDTO.getSkuId());
        redisTemplate.opsForHash().delete(cartKeyOf(shopCarReqDTO), skuField);
        return true;
    }

    /** Deletes the whole cart hash. */
    @Override
    public Boolean clearShopCar(ShopCarReqDTO shopCarReqDTO) {
        redisTemplate.delete(cartKeyOf(shopCarReqDTO));
        return true;
    }

    /** Increments the SKU's quantity in the cart hash by 1. */
    @Override
    public Boolean addCarItemNum(ShopCarReqDTO shopCarReqDTO) {
        String skuField = String.valueOf(shopCarReqDTO.getSkuId());
        redisTemplate.opsForHash().increment(cartKeyOf(shopCarReqDTO), skuField, 1);
        return true;
    }

    /**
     * Reads the cart hash, loads the SKU rows for its fields and pairs each row with its quantity.
     * Returns an empty response object when the cart hash has no entries.
     */
    @Override
    public ShopCarRespDTO getCarInfo(ShopCarReqDTO shopCarReqDTO) {
        Map<Object, Object> cartEntries = redisTemplate.opsForHash().entries(cartKeyOf(shopCarReqDTO));
        if (CollectionUtils.isEmpty(cartEntries)) {
            return new ShopCarRespDTO();
        }
        Map<Long, Integer> countBySkuId = new HashMap<>(cartEntries.size());
        cartEntries.forEach((field, quantity) ->
                countBySkuId.put(Long.valueOf((String) field), (Integer) quantity));

        List<SkuInfoPO> skuRows = skuInfoService.queryBySkuIds(new ArrayList<>(countBySkuId.keySet()));
        List<ShopCarItemRespDTO> cartItems = new ArrayList<>();
        for (SkuInfoPO skuRow : skuRows) {
            ShopCarItemRespDTO cartItem = new ShopCarItemRespDTO();
            cartItem.setSkuInfoDTO(ConvertBeanUtils.convert(skuRow, SkuInfoDTO.class));
            cartItem.setCount(countBySkuId.get(skuRow.getSkuId()));
            cartItems.add(cartItem);
        }

        ShopCarRespDTO response = new ShopCarRespDTO();
        response.setRoomId(shopCarReqDTO.getRoomId());
        response.setUserId(shopCarReqDTO.getUserId());
        response.setSkuCarItemRespDTODTOS(cartItems);
        return response;
    }

    /** Builds the Redis key of the cart addressed by the request's user id and room id. */
    private String cartKeyOf(ShopCarReqDTO shopCarReqDTO) {
        return cacheKeyBuilder.buildUserShopCar(shopCarReqDTO.getUserId(), shopCarReqDTO.getRoomId());
    }
}
背景
在直播带货场景中,用户可能会在带货过程中出现抢购情况。例如10000个在线观看直播的用户,于同一时刻参与到抢购行为当中。而在我们的商品下单流程中,库存是一个非常重要的因素。
通常来说,我们建议在以下几个步骤进行库存校验:
- 加入购物车的时候(真正有购买行为的产生)
- 生成待支付订单的时候(库存校验:距离加入购物车的时间间隔可能比较大,需要确认这批货物仍然是可以购买的)
一般来说,大多数情况都是选择在生成待支付订单的时候才会预先扣减库存。而简单的加入购物车其实并不算用户的真实购买行为,所以只需要校验基本库存即可
- Mysql 2core 4gb的机器,写流量,抗500+tps,读1000+qps(极限)(支撑的并发度不高,成本很高)
- Redis缓存数据库(读1w+ qps,机器配置高,5w+ qps,redis cluster )
- 顺序写数据库,写日志类型的方案(redo log)
基于Redis + MySQL + MQ的方案
抢购活动组成:活动开始前宣传(主播在准备参与抢购活动之前,需要先点击预热按钮) -> 抢购 -> 抢购结束
将库存的数目存在于redis中,依靠decr指令去扣减库存。
- 被动同步:定时同步,定时任务,每隔一段时间,将我们的库存同步到db中
- 主动同步:每次访问redis之后,异步线程刷新同步到db中,对db的压力会比较大
同时需要依靠一个定时任务,定时将redis中的库存数目同步到mysql表中。但是这里需要注意以下几点:
- Redis是内存数据库,内存占用到一定阈值的时候,可能会回收部分数据。
- 不一定要等到ttl清零才回收Redis,有些回收策略,会对ttl进行判断
- 单独拉一台Redis也是可以的
- 库存存入Redis后的有效期要足够长,避免说这份缓存数据在抢购活动还没结束前被回收了。(甚至可以考虑单独拉一台redis,然后不设置过期时间,等带货高峰期过后,全部清空)
- 库存扣减前需要先校验是否充足,然后再记录扣减库存流水,最后才执行库存扣减(这里有多步操作,为保证原子性可以考虑使用Lua脚本)(流水id临时记录在redis的string中,用于识别重复请求)
- 最后库存扣减成功后,需要发送RMQ去告知下游系统,例如执行支付倒计时,系统通知,购物车清空等操作
- 支付成功,利用支付中台(qiyu-live-bank)的RMQ通知下游系统,订单状态修改,商品发货(物流系统打交道),系统通知等操作
库存回滚问题
如果购物车下单后,一直处于待支付状态,那么就会导致我们的库存一直被锁住了,所以通常来说,这里会做一层回滚的策略。
定时任务回滚
需要将处于待支付状态的信息记录到一张表中,表中需要记录订单的生成时间,然后由一个定时任务每隔15秒扫描一次,进行校验,如果订单超时则修改其状态,回滚对应的库存。
基于延时消息
RMQ本身基于时间轮做了一套延时队列,我们可以利用该特性去实现下单30min后消息反向通知的功能。
/**
* RPC contract for SKU stock operations.
*/
public interface ISkuStockInfoRpc {
/**
* Decrements the stock of the SKU identified by skuId by num.
*/
boolean decrStockNumBySkuId(Long skuId, Integer num);
/**
* Warms up stock information: loads the stock of the anchor's SKUs into Redis.
*/
boolean prepareStockInfo(Long anchorId);
/**
* Reads the cached stock value of a SKU from Redis.
*/
Integer queryStockNum(Long skuId);
/**
* Synchronises the stock data of the anchor's SKUs to MySQL.
*/
boolean syncStockNumToMySQL(Long anchor);
}
/**
 * RPC implementation for SKU stock: database decrement, stock warm-up into Redis and
 * reading the cached stock value.
 */
@DubboService
public class SkuStockInfoRpcImpl implements ISkuStockInfoRpc {

    @Resource
    private ISkuStockInfoService skuStockInfoService;
    @Resource
    private IAnchorShopInfoService anchorShopInfoService;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private GiftProviderCacheKeyBuilder cacheKeyBuilder;

    /** Decrements the stock of a SKU through the stock service. */
    @Override
    public boolean decrStockNumBySkuId(Long skuId, Integer num) {
        return skuStockInfoService.decrStockNumBySkuId(skuId, num);
    }

    /**
     * Loads the stock of every SKU configured for the anchor into Redis with a one-day TTL.
     *
     * @param anchorId id of the anchor whose SKUs are warmed up
     * @return {@code true} when stock values were written, {@code false} when the anchor has
     *         no SKUs or no valid stock rows (nothing is written in that case)
     */
    @Override
    public boolean prepareStockInfo(Long anchorId) {
        List<Long> skuIdList = anchorShopInfoService.querySkuIdsByAnchorId(anchorId);
        // Without ids the batch query would be issued with an empty IN list.
        if (skuIdList == null || skuIdList.isEmpty()) {
            return false;
        }
        List<SkuStockInfoPO> skuStockInfoPOS = skuStockInfoService.queryBySkuIds(skuIdList);
        // An empty map cannot be sent as a multi-set command.
        if (skuStockInfoPOS == null || skuStockInfoPOS.isEmpty()) {
            return false;
        }
        // Keep the first row when a SKU has several stock rows instead of failing on the duplicate key.
        Map<String, Integer> cacheKeyMap = skuStockInfoPOS.stream()
                .collect(Collectors.toMap(
                        skuStockInfoPO -> cacheKeyBuilder.buildSkuStock(skuStockInfoPO.getSkuId()),
                        SkuStockInfoPO::getStockNum,
                        (first, second) -> first));
        redisTemplate.opsForValue().multiSet(cacheKeyMap);
        // The multi-set carries no TTL, so the expirations are applied afterwards in one pipeline.
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
                for (String key : cacheKeyMap.keySet()) {
                    operations.expire((K) key, 1L, TimeUnit.DAYS);
                }
                return null;
            }
        });
        return true;
    }

    /**
     * Reads the cached stock of a SKU.
     *
     * @return the cached value, or {@code null} when the key is absent
     */
    @Override
    public Integer queryStockNum(Long skuId) {
        String cacheKey = cacheKeyBuilder.buildSkuStock(skuId);
        Object stockObj = redisTemplate.opsForValue().get(cacheKey);
        if (stockObj == null) {
            return null;
        }
        // The deserialised type depends on the configured value serializer; accept any numeric
        // or textual representation rather than assuming Integer.
        if (stockObj instanceof Number) {
            return ((Number) stockObj).intValue();
        }
        return Integer.valueOf(String.valueOf(stockObj));
    }

    /** Not implemented in this version; always reports success. */
    @Override
    public boolean syncStockNumToMySQL(Long anchor) {
        return true;
    }
}
/**
* Provider-side service for SKU stock.
*/
public interface ISkuStockInfoService {
/**
* Sets the stock value of the SKU identified by skuId.
*/
boolean updateStockNum(Long skuId, Integer stockNum);
/**
* Decrements the stock value of the SKU identified by skuId by num.
*/
boolean decrStockNumBySkuId(Long skuId, Integer num);
/**
* Decrements the cached stock value with a Lua script.
*/
boolean decrStockNumBySkuIdByLua(Long skuId, Integer num);
/**
* Queries the stock row by skuId.
*/
SkuStockInfoPO queryBySkuId(Long skuId);
/**
* Batch-queries stock rows by a list of skuIds.
*/
List<SkuStockInfoPO> queryBySkuIds(List<Long> skuIdList);
}
/**
 * Database access for SKU stock rows.
 */
@Service
public class SkuStockInfoServiceImpl implements ISkuStockInfoService {

    @Resource
    private ISkuStockInfoMapper skuStockInfoMapper;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private GiftProviderCacheKeyBuilder cacheKeyBuilder;

    /**
     * Overwrites the stock value of a SKU.
     *
     * @return {@code true} when at least one row was updated
     */
    @Override
    public boolean updateStockNum(Long skuId, Integer stockNum) {
        LambdaUpdateWrapper<SkuStockInfoPO> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(SkuStockInfoPO::getSkuId, skuId);
        SkuStockInfoPO skuStockInfoPO = new SkuStockInfoPO();
        skuStockInfoPO.setStockNum(stockNum);
        return skuStockInfoMapper.update(skuStockInfoPO, updateWrapper) > 0;
    }

    /** Decrements the stock of a SKU through the mapper's conditional update statement. */
    @Override
    public boolean decrStockNumBySkuId(Long skuId, Integer num) {
        return skuStockInfoMapper.decrStockNumBySkuId(skuId, num);
    }

    /** Queries the single valid stock row of a SKU. */
    @Override
    public SkuStockInfoPO queryBySkuId(Long skuId) {
        LambdaQueryWrapper<SkuStockInfoPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SkuStockInfoPO::getSkuId, skuId);
        queryWrapper.eq(SkuStockInfoPO::getStatus, CommonStatusEnum.VALID_STATUS.getCode());
        queryWrapper.last("limit 1");
        return skuStockInfoMapper.selectOne(queryWrapper);
    }

    /**
     * Batch-queries valid stock rows.
     *
     * @param skuIdList ids to look up; may be null or empty
     * @return the matching rows, or an empty list when no ids were given
     */
    @Override
    public List<SkuStockInfoPO> queryBySkuIds(List<Long> skuIdList) {
        // An empty IN list would be rendered as "IN ()", which is not valid SQL.
        if (skuIdList == null || skuIdList.isEmpty()) {
            return List.of();
        }
        LambdaQueryWrapper<SkuStockInfoPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.in(SkuStockInfoPO::getSkuId, skuIdList);
        queryWrapper.eq(SkuStockInfoPO::getStatus, CommonStatusEnum.VALID_STATUS.getCode());
        return skuStockInfoMapper.selectList(queryWrapper);
    }
}
/**
* MyBatis-Plus mapper for {@code SkuStockInfoPO} with a custom conditional decrement.
*/
@Mapper
public interface ISkuStockInfoMapper extends BaseMapper<SkuStockInfoPO> {
/**
* Subtracts num from stock_num of the given SKU, but only when stock_num >= num,
* so the stock cannot go below zero through this statement.
* NOTE(review): the boolean result presumably reflects whether a row was updated — confirm.
*/
@Update("update t_sku_stock_info set stock_num = stock_num - #{num} where sku_id = #{skuId} and stock_num >= #{num}")
boolean decrStockNumBySkuId(@Param("skuId") Long skuId, @Param("num") Integer num);
}
因为我们要保证判断库存和库存扣减操作是原子的,所以常用的实现方式有两种:
- 分布式锁
- lua脚本
resources下新建secKill.lua:
-- Atomically checks and deducts cached stock.
-- KEYS[1]: stock key of the SKU; ARGV[1]: quantity to deduct.
-- Returns the remaining stock after the deduction, or -1 when the key does not exist,
-- a value is not numeric, or the stock is insufficient.
if redis.call('exists', KEYS[1]) == 1 then
    local currentStock = tonumber(redis.call('get', KEYS[1]))
    local num = tonumber(ARGV[1])
    if currentStock ~= nil and num ~= nil and currentStock >= num then
        return redis.call('decrby', KEYS[1], num)
    end
end
-- Always return a number: falling off the end of the script would yield nil to the caller.
return -1
@Service
public class SkuStockInfoServiceImpl implements ISkuStockInfoService {
...
/** Stock-deduction script loaded from the classpath resource {@code secKill.lua}; it yields a Long. */
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
    DefaultRedisScript<Long> script = new DefaultRedisScript<>();
    script.setResultType(Long.class);
    script.setLocation(new ClassPathResource("secKill.lua"));
    SECKILL_SCRIPT = script;
}
/**
 * Deducts cached stock atomically by running the seckill Lua script against the SKU's stock key.
 *
 * @param skuId id of the SKU
 * @param num   quantity to deduct
 * @return {@code true} when the script reports a non-negative remaining stock; {@code false}
 *         when it reports failure or returns no result
 */
@Override
public boolean decrStockNumBySkuIdByLua(Long skuId, Integer num) {
    Long remaining = redisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.singletonList(cacheKeyBuilder.buildSkuStock(skuId)),
            num
    );
    // execute may return null; comparing the boxed value directly would throw on unboxing.
    return remaining != null && remaining >= 0;
}
我们前面只使用lua脚本在Redis中做了库存扣减操作,但是我们MySQL中的库存信息还没有同步,这里我们也有两种常见方法:
- 定时(每隔15秒)拉取Redis数据批量同步到MySQL
- 每次使用lua脚本扣减库存成功后,使用MQ异步的同步每次更新的数据到MySQL
/**
 * Stock-synchronisation part of the stock RPC implementation (other members elided by "...").
 */
@DubboService
public class SkuStockInfoRpcImpl implements ISkuStockInfoRpc {
    ...

    /**
     * Copies the cached stock of each of the anchor's SKUs into the database; SKUs without a
     * cached value are skipped. Always returns {@code true}.
     */
    @Override
    public boolean syncStockNumToMySQL(Long anchor) {
        List<Long> anchorSkuIds = anchorShopInfoService.querySkuIdsByAnchorId(anchor);
        for (Long skuId : anchorSkuIds) {
            Integer cachedStock = this.queryStockNum(skuId);
            if (cachedStock == null) {
                continue;
            }
            skuStockInfoService.updateStockNum(skuId, cachedStock);
        }
        return true;
    }
}
/**
 * Scheduled job that pushes cached stock values to the database.
 */
@Configuration
@EnableScheduling
public class RefreshStockNumConfig {

    @Resource
    private ISkuStockInfoRpc skuStockInfoRpc;
    @Resource
    private IAnchorShopInfoService anchorShopInfoService;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private GiftProviderCacheKeyBuilder cacheKeyBuilder;

    /**
     * Runs on the 15-second cron. Takes the stock-sync lock key with set-if-absent and a
     * 15-second expiry; only when the lock was obtained does it sync every valid anchor's stock.
     */
    @Scheduled(cron = "*/15 * * * * ? ")
    public void refreshStockNum() {
        Boolean lockAcquired = redisTemplate.opsForValue()
                .setIfAbsent(cacheKeyBuilder.buildStockSyncLock(), 1, 15L, TimeUnit.SECONDS);
        if (!Boolean.TRUE.equals(lockAcquired)) {
            return;
        }
        List<Long> validAnchorIds = anchorShopInfoService.queryAllValidAnchorId();
        for (Long validAnchorId : validAnchorIds) {
            skuStockInfoRpc.syncStockNumToMySQL(validAnchorId);
        }
    }
}
这里我们要添加方法anchorShopInfoService.queryAllValidAnchorId():
public interface IAnchorShopInfoService {
    ...
    /**
     * Returns the ids of all anchors that have at least one valid selling configuration.
     * Each anchor id appears once.
     */
    List<Long> queryAllValidAnchorId();
}

@Service
public class AnchorShopInfoServiceImpl implements IAnchorShopInfoService {
    ...
    /**
     * Selects all valid anchor/SKU rows and extracts the anchor ids.
     *
     * <p>The table holds one row per anchor/SKU pair, so the ids are de-duplicated; otherwise
     * callers that iterate the result would process the same anchor once per SKU.
     */
    @Override
    public List<Long> queryAllValidAnchorId() {
        LambdaQueryWrapper<AnchorShopInfoPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(AnchorShopInfoPO::getStatus, CommonStatusEnum.VALID_STATUS.getCode());
        return anchorShopInfoMapper.selectList(queryWrapper).stream()
                .map(AnchorShopInfoPO::getAnchorId)
                .distinct()
                .collect(Collectors.toList());
    }
}
/**
* RPC contract for SKU orders.
*/
public interface ISkuOrderInfoRpc {
/**
* Queries order information by userId and roomId.
*/
SkuOrderInfoRespDTO queryByUserIdAndRoomId(Long userId, Integer roomId);
/**
* Inserts one order.
*/
boolean insertOne(SkuOrderInfoReqDTO skuOrderInfoReqDTO);
/**
* Updates the status of an order.
*/
boolean updateOrderStatus(SkuOrderInfoReqDTO skuOrderInfoReqDTO);
}
/**
 * Dubbo-exposed implementation of {@code ISkuOrderInfoRpc}; delegates to {@code ISkuOrderInfoService}.
 */
@DubboService
public class SkuOrderInfoRpcImpl implements ISkuOrderInfoRpc {

    @Resource
    private ISkuOrderInfoService skuOrderInfoService;

    /** Returns the order found by the service for the given user and room. */
    @Override
    public SkuOrderInfoRespDTO queryByUserIdAndRoomId(Long userId, Integer roomId) {
        return skuOrderInfoService.queryByUserIdAndRoomId(userId, roomId);
    }

    /**
     * Inserts one order.
     *
     * @return {@code true} when the service returned the inserted row object
     */
    @Override
    public boolean insertOne(SkuOrderInfoReqDTO skuOrderInfoReqDTO) {
        return skuOrderInfoService.insertOne(skuOrderInfoReqDTO) != null;
    }

    /**
     * Updates the status of an order through the service.
     */
    @Override
    public boolean updateOrderStatus(SkuOrderInfoReqDTO skuOrderInfoReqDTO) {
        // Must delegate to the service; calling this method itself would recurse without end.
        return skuOrderInfoService.updateOrderStatus(skuOrderInfoReqDTO);
    }
}
/**
* Provider-side service for SKU orders.
*/
public interface ISkuOrderInfoService {
/**
* Queries order information by userId and roomId.
*/
SkuOrderInfoRespDTO queryByUserIdAndRoomId(Long userId, Integer roomId);
/**
* Inserts one order.
*/
SkuOrderInfoPO insertOne(SkuOrderInfoReqDTO skuOrderInfoReqDTO);
/**
* Updates the status of an order.
*/
boolean updateOrderStatus(SkuOrderInfoReqDTO skuOrderInfoReqDTO);
/**
* Queries order information by order id.
*/
SkuOrderInfoRespDTO queryByOrderId(Long orderId);
}
/**
 * Database access for SKU orders.
 */
@Service
public class SkuOrderInfoServiceImpl implements ISkuOrderInfoService {

    @Resource
    private ISkuOrderInfoMapper skuOrderInfoMapper;

    /**
     * Returns the most recent order (highest id) of a user in a live room.
     */
    @Override
    public SkuOrderInfoRespDTO queryByUserIdAndRoomId(Long userId, Integer roomId) {
        LambdaQueryWrapper<SkuOrderInfoPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SkuOrderInfoPO::getUserId, userId);
        queryWrapper.eq(SkuOrderInfoPO::getRoomId, roomId);
        queryWrapper.orderByDesc(SkuOrderInfoPO::getId);
        queryWrapper.last("limit 1");
        return ConvertBeanUtils.convert(skuOrderInfoMapper.selectOne(queryWrapper), SkuOrderInfoRespDTO.class);
    }

    /** Returns the order with the given primary key. */
    @Override
    public SkuOrderInfoRespDTO queryByOrderId(Long orderId) {
        return ConvertBeanUtils.convert(skuOrderInfoMapper.selectById(orderId), SkuOrderInfoRespDTO.class);
    }

    /**
     * Inserts one order; the SKU id list is stored as a comma-joined string.
     *
     * @return the inserted persistent object
     */
    @Override
    public SkuOrderInfoPO insertOne(SkuOrderInfoReqDTO skuOrderInfoReqDTO) {
        // StrUtil comes from the hutool toolkit.
        String skuIdListStr = StrUtil.join(",", skuOrderInfoReqDTO.getSkuIdList());
        SkuOrderInfoPO skuOrderInfoPO = ConvertBeanUtils.convert(skuOrderInfoReqDTO, SkuOrderInfoPO.class);
        skuOrderInfoPO.setSkuIdList(skuIdListStr);
        skuOrderInfoMapper.insert(skuOrderInfoPO);
        return skuOrderInfoPO;
    }

    /**
     * Updates the status of the order identified by the request's id.
     *
     * @return {@code true} when a row was updated, {@code false} otherwise
     */
    @Override
    public boolean updateOrderStatus(SkuOrderInfoReqDTO skuOrderInfoReqDTO) {
        SkuOrderInfoPO skuOrderInfoPO = new SkuOrderInfoPO();
        skuOrderInfoPO.setStatus(skuOrderInfoReqDTO.getStatus());
        skuOrderInfoPO.setId(skuOrderInfoReqDTO.getId());
        // Report the real outcome; the previous hard-coded false told callers every update failed.
        return skuOrderInfoMapper.updateById(skuOrderInfoPO) > 0;
    }
}
qiyu-live-api
@RestController
@RequestMapping("/shop")
public class ShopInfoController {
    ...
    /**
     * Pre-order endpoint, called once the shopping cart has been filled.
     * <p>
     * Intended flow: the pre-order locks stock in advance (e.g. 100 phones lock 100 units);
     * a successful payment makes the deduction final; if the order is not paid within the
     * time limit, the stock is rolled back and the order moves to a pay-timeout state.
     */
    @PostMapping("/prepareOrder")
    public WebResponseVO prepareOrder(PrepareOrderVO prepareOrderVO) {
        boolean prepared = shopInfoService.prepareOrder(prepareOrderVO);
        return WebResponseVO.success(prepared);
    }

    /**
     * Triggers stock preparation for the given anchor.
     */
    @PostMapping("/prepareStock")
    public WebResponseVO prepareStock(Long anchorId) {
        boolean stockPrepared = shopInfoService.prepareStock(anchorId);
        return WebResponseVO.success(stockPrepared);
    }
}
public interface IShopInfoService {
...
/**
 * Performs the pre-order operation.
 *
 * @param prepareOrderVO the pre-order request
 * @return the outcome of the pre-order
 */
boolean prepareOrder(PrepareOrderVO prepareOrderVO);
/**
 * Prepares the stock of the given anchor in Redis.
 *
 * @param anchorId id of the anchor whose stock should be prepared
 * @return whether the preparation succeeded
 */
boolean prepareStock(Long anchorId);
}
@Service
public class ShopInfoServiceImpl implements IShopInfoService {
    ...
    /**
     * Builds a pre-order request for the current user and forwards it to the order RPC.
     *
     * @param prepareOrderVO the pre-order request; its room id is forwarded
     * @return currently always {@code false}
     */
    @Override
    public boolean prepareOrder(PrepareOrderVO prepareOrderVO) {
        PrepareOrderReqDTO reqDTO = new PrepareOrderReqDTO();
        // Take the room from the incoming request and the user from the request context.
        // The previous code copied reqDTO's own (still null) fields onto itself.
        reqDTO.setRoomId(prepareOrderVO.getRoomId());
        reqDTO.setUserId(QiyuRequestContext.getUserId());
        skuOrderInfoRpc.prepareOrder(reqDTO);
        // NOTE(review): the RPC result is discarded and false is always returned;
        // confirm what this method should report once the RPC return type is settled.
        return false;
    }

    /**
     * Asks the stock RPC to prepare the stock information of the given anchor.
     *
     * @param anchorId id of the anchor whose stock should be prepared
     * @return the result reported by the stock RPC
     */
    @Override
    public boolean prepareStock(Long anchorId) {
        return skuStockInfoRpc.prepareStockInfo(anchorId);
    }
}
- 进行准备下单的逻辑编写
qiyu-live-gift-provider:
public interface ISkuOrderInfoRpc {
    ...
    /**
     * Generates a pre-pay order.
     * <p>
     * The return type matches the {@code @Override} in {@code SkuOrderInfoRpcImpl}, which
     * returns the pre-order details; declaring {@code boolean} here did not compile
     * against that implementation.
     *
     * @param reqDTO the pre-order request
     * @return details of the generated pre-pay order
     */
    SkuPrepareOrderInfoDTO prepareOrder(PrepareOrderReqDTO reqDTO);
}
@DubboService
public class SkuOrderInfoRpcImpl implements ISkuOrderInfoRpc {
...
@Override
public SkuPrepareOrderInfoDTO prepareOrder(PrepareOrderReqDTO reqDTO) {
ShopCarReqDTO shopCarReqDTO = ConvertBeanUtils.convert(reqDTO, ShopCarReqDTO.class);
ShopCarRespDTO carInfo = shopCarService.getCarInfo(shopCarReqDTO);
List<ShopCarItemRespDTO> carItemList = carInfo.getSkuCarItemRespDTODTOS();
if (CollectionUtils.isEmpty(carItemList)) {
return new SkuPrepareOrderInfoDTO();
}
List<Long> skuIdList = carItemList.stream().map(item -> item.getSkuInfoDTO().getSkuId()).collect(Collectors.toList());
Iterator<Long> iterator = skuIdList.iterator();
// 进行商品库存的扣减
while (iterator.hasNext()) {
Long skuId = iterator.next();
boolean isSuccess = skuStockInfoService.decrStockNumBySkuIdByLua(skuId, 1);
if (!isSuccess) iterator.remove();
}
SkuOrderInfoPO skuOrderInfoPO = skuOrderInfoService.insertOne(new SkuOrderInfoReqDTO(
null, reqDTO.getUserId(), reqDTO.getRoomId(), SkuOrderInfoEnum.PREPARE_PAY.getCode(), skuIdList));
// 清空购物车
shopCarService.clearShopCar(shopCarReqDTO);
// 发送延时MQ:若订单未支付,进行库存回滚
RollBackStockBO rollBackStockBO = new RollBackStockBO(reqDTO.getUserId(), skuOrderInfoPO.getId());
CompletableFuture<SendResult<String, String>> sendResult = kafkaTemplate.send(GiftProviderTopicNames.ROLL_BACK_STOCK, JSON.toJSONString(rollBackStockBO));
System.out.println(sendResult);
// 封装返回对象
SkuPrepareOrderInfoDTO respDTO = new SkuPrepareOrderInfoDTO();
List<ShopCarItemRespDTO> itemList = carItemList.stream().filter(item -> skuIdList.contains(item.getSkuInfoDTO().getSkuId())).collect(Collectors.toList());
respDTO.setSkuPrepareOrderItemInfoDTOS(itemList);
respDTO.setTotalPrice(itemList.stream().map(item -> item.getSkuInfoDTO().getSkuPrice()).reduce(Integer::sum).orElse(0));
return respDTO;
}
实现延时任务的常见思路:
- DelayQueue:缺点:单机保存在内存中,宕机后任务丢失
- 订阅Redis TTL过期:缺点:回调消息可能丢失,没有重试机制
- 发送延时MQ:如RocketMQ、RabbitMQ等
这里采用 Kafka + DelayQueue 的方式来模拟 RocketMQ 的延时消息(这种做法并不规范:DelayQueue 中的任务只保存在内存里,服务宕机后任务会丢失)
- MQ处理超时未支付订单:
public class GiftProviderTopicNames {
...
/**
 * Topic for rolling back the stock of unpaid orders.
 */
public static final String ROLL_BACK_STOCK = "rollback-stock";
/**
 * Topic for syncing sku stock into Redis when a live stream starts.
 */
public static final String START_LIVING_ROOM = "start-living-room";
}
/**
 * A delayed task: a {@link Runnable} paired with the wall-clock time at which it
 * becomes due, for use as an element of a delay queue.
 */
public class DelayedTask implements Delayed {
    /**
     * Time at which the task is due, in milliseconds as reported by
     * {@link System#currentTimeMillis()}.
     */
    private final long executeTime;
    /**
     * The work to run.
     */
    private final Runnable task;

    /**
     * @param delay how long from now, in milliseconds, the task becomes due
     * @param task  the work to run when {@link #execute()} is called
     */
    public DelayedTask(long delay, Runnable task) {
        this.executeTime = System.currentTimeMillis() + delay;
        this.task = task;
    }

    /**
     * Reports how long remains until this task is due.
     *
     * @param unit the unit to express the result in
     * @return the remaining delay; zero or negative once the task is due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by ascending due time, which is the order a delay queue needs.
     *
     * @param o the other delayed element
     * @return negative, zero or positive when this task is due before, at the same time
     *         as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof DelayedTask) {
            return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
        }
        // Another Delayed implementation: compare remaining delays instead of casting,
        // which used to throw ClassCastException.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Runs the wrapped task on the calling thread.
     */
    public void execute() {
        task.run();
    }
}
/**
 * Consumes stock-rollback messages and runs each rollback after a fixed delay.
 * <p>
 * Each Kafka message is wrapped in a {@link DelayedTask} and put on an in-memory
 * {@link DelayQueue}; a worker submitted at startup takes due tasks off the queue
 * and executes them.
 */
@Component
public class StockRollBackConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(StockRollBackConsumer.class);

    /**
     * Delay between receiving a rollback message and running the rollback: 30 minutes.
     */
    private static final long ROLL_BACK_DELAY_MILLIS = 30 * 60 * 1000L;

    @Resource
    private ISkuStockInfoService skuStockInfoService;

    private static final DelayQueue<DelayedTask> DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService DELAY_QUEUE_THREAD_POOL = new ThreadPoolExecutor(
            3, 10,
            10L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100)
    );

    /**
     * Starts the worker that drains the delay queue.
     */
    @PostConstruct()
    private void init() {
        DELAY_QUEUE_THREAD_POOL.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    DelayedTask task = DELAY_QUEUE.take();
                    task.execute();
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker
                    Thread.currentThread().interrupt();
                    LOGGER.warn("[StockRollBackConsumer] delay queue worker interrupted, stopping", e);
                } catch (Exception e) {
                    // A failing task must not end the loop: this is the only worker
                    // draining the queue, so every later rollback would be lost.
                    LOGGER.error("[StockRollBackConsumer] delayed rollback task failed", e);
                }
            }
        });
    }

    /**
     * Schedules a stock rollback for the order described by the message.
     *
     * @param rollBackStockBoStr JSON form of a {@code RollBackStockBO}
     */
    @KafkaListener(topics = GiftProviderTopicNames.ROLL_BACK_STOCK, groupId = "stock-roll-back")
    public void stockRollBack(String rollBackStockBoStr) {
        RollBackStockBO rollBackStockBO = JSON.parseObject(rollBackStockBoStr, RollBackStockBO.class);
        DELAY_QUEUE.offer(new DelayedTask(ROLL_BACK_DELAY_MILLIS,
                () -> skuStockInfoService.stockRollBackHandler(rollBackStockBO)));
        // The rollback has only been queued here, so the log must not claim it succeeded
        LOGGER.info("[StockRollBackConsumer] rollback task scheduled, rollbackInfo is {}", rollBackStockBO);
    }
}
public interface ISkuStockInfoService {
...
/**
 * Rolls back stock.
 *
 * @param rollBackStockBO describes the rollback to perform
 */
void stockRollBackHandler(RollBackStockBO rollBackStockBO);
}
@Service
public class SkuStockInfoServiceImpl implements ISkuStockInfoService {
    ...
    /**
     * Cancels an order that is still waiting for payment and gives its stock back.
     * <p>
     * Does nothing when the order cannot be found or is no longer in
     * {@code PREPARE_PAY} status. Otherwise the order is set to {@code CANCEL} and the
     * Redis stock of every sku on the order is incremented by one.
     *
     * @param rollBackStockBO carries the id of the order to roll back
     */
    @Override
    public void stockRollBackHandler(RollBackStockBO rollBackStockBO) {
        SkuOrderInfoRespDTO respDTO = skuOrderInfoService.queryByOrderId(rollBackStockBO.getOrderId());
        if (respDTO == null || !respDTO.getStatus().equals(SkuOrderInfoEnum.PREPARE_PAY.getCode())) {
            return;
        }
        SkuOrderInfoReqDTO skuOrderInfoReqDTO = new SkuOrderInfoReqDTO();
        skuOrderInfoReqDTO.setStatus(SkuOrderInfoEnum.CANCEL.getCode());
        skuOrderInfoReqDTO.setId(rollBackStockBO.getOrderId());
        // Move the order to the cancelled state
        skuOrderInfoService.updateOrderStatus(skuOrderInfoReqDTO);
        // Roll the stock back
        String skuIds = respDTO.getSkuIdList();
        if (skuIds == null || skuIds.isEmpty()) {
            // No skus on the order: splitting "" would yield one empty token and
            // Long.valueOf("") would throw NumberFormatException.
            return;
        }
        // Plain loop instead of parallelStream: a handful of blocking Redis calls gain
        // nothing from the shared common pool.
        for (String skuIdStr : skuIds.split(",")) {
            Long skuId = Long.valueOf(skuIdStr);
            // Only the Redis stock is updated here; a scheduled job syncs it to MySQL
            redisTemplate.opsForValue().increment(cacheKeyBuilder.buildSkuStock(skuId), 1);
        }
    }
}
找到我们之前写的开播逻辑代码:LivingRoomServiceImpl中的startLivingRoom()方法,在这个方法的最后加上发送MQ消息的逻辑,用于异步同步商品库存:
@Service
public class LivingRoomServiceImpl implements ILivingRoomService {
...
@Override
public Integer startLivingRoom(LivingRoomReqDTO livingRoomReqDTO) {
LivingRoomPO livingRoomPO = BeanUtil.copyProperties(livingRoomReqDTO, LivingRoomPO.class);
livingRoomPO.setStatus(CommonStatusEnum.VALID_STATUS.getCode());
livingRoomPO.setStartTime(new Date());
livingRoomMapper.insert(livingRoomPO);
String cacheKey = cacheKeyBuilder.buildLivingRoomObj(livingRoomPO.getId());
// 防止之前有空值缓存,这里做移除操作
redisTemplate.delete(cacheKey);
// 发送mq进行异步商品库存加载
kafkaTemplate.send(GiftProviderTopicNames.START_LIVING_ROOM, String.valueOf(livingRoomReqDTO.getAnchorId()));
return livingRoomPO.getId();
}
然后在我们的gift-provider模块中新建一个consumer:
/**
 * Consumes "live stream started" messages and asks the stock RPC to prepare the
 * anchor's stock information.
 */
@Component
public class StartLivingRoomConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(StartLivingRoomConsumer.class);

    @Resource
    private ISkuStockInfoRpc skuStockInfoRpc;

    /**
     * Handles one message from the {@code START_LIVING_ROOM} topic.
     *
     * @param anchorIdStr the anchor id in decimal string form
     */
    @KafkaListener(topics = GiftProviderTopicNames.START_LIVING_ROOM, groupId = "start-living-room-consumer")
    public void startLivingRoom(String anchorIdStr) {
        Long anchorId;
        try {
            anchorId = Long.valueOf(anchorIdStr);
        } catch (NumberFormatException e) {
            // A payload that is not a number (e.g. the text "null") can never succeed;
            // log and drop it instead of throwing out of the listener.
            LOGGER.error("[StartLivingRoomConsumer] 非法的主播id消息:{}", anchorIdStr, e);
            return;
        }
        boolean isSuccess = skuStockInfoRpc.prepareStockInfo(anchorId);
        if (isSuccess) {
            LOGGER.info("[StartLivingRoomConsumer] 同步库存到Redis成功,主播id:{}", anchorId);
        } else {
            // Failures used to be silent; make them visible
            LOGGER.warn("[StartLivingRoomConsumer] 同步库存到Redis失败,主播id:{}", anchorId);
        }
    }
}
使用平台旗鱼币进行支付(若是使用第三方平台进行支付则在回调方法中添加以下编写的逻辑,然后删除扣减旗鱼币的逻辑即可)
@RestController
@RequestMapping("/shop")
public class ShopInfoController {
    ...
    /**
     * Payment endpoint: forwards the request to the shop service and wraps the
     * outcome in a success response.
     */
    @PostMapping("/payNow")
    public WebResponseVO payNow(PrepareOrderVO prepareOrderVO) {
        boolean paid = shopInfoService.payNow(prepareOrderVO);
        return WebResponseVO.success(paid);
    }
}
public interface IShopInfoService {
...
/**
 * Pays for the user's order.
 *
 * @param prepareOrderVO the payment request
 * @return whether the payment succeeded
 */
boolean payNow(PrepareOrderVO prepareOrderVO);
}
@Service
public class ShopInfoServiceImpl implements IShopInfoService {
    ...
    /**
     * Pays the current user's order in the room named by the request.
     *
     * @param prepareOrderVO the payment request; only its room id is read
     * @return the result reported by the order RPC
     */
    @Override
    public boolean payNow(PrepareOrderVO prepareOrderVO) {
        Long currentUserId = QiyuRequestContext.getUserId();
        Integer targetRoomId = prepareOrderVO.getRoomId();
        return skuOrderInfoRpc.payNow(currentUserId, targetRoomId);
    }
}
public interface ISkuOrderInfoRpc {
...
/**
 * Pays for the user's order.
 *
 * @param userId id of the paying user
 * @param roomId id of the living room the order belongs to
 * @return whether the payment succeeded
 */
boolean payNow(Long userId, Integer roomId);
}
@DubboService
public class SkuOrderInfoRpcImpl implements ISkuOrderInfoRpc {
    ...
    /**
     * Pays the order found for the user in the given room with platform currency.
     * <p>
     * The order must exist and be in {@code PREPARE_PAY} status. The total is the sum of
     * the prices of the skus returned for the order's sku ids; when the balance covers
     * it, the balance is reduced and the order is set to {@code HAS_PAY}.
     *
     * @param userId id of the paying user
     * @param roomId id of the living room the order belongs to
     * @return {@code true} when the order was paid; {@code false} when there is no
     *         payable order, the order has no skus, or the balance is missing or too low
     */
    @Override
    public boolean payNow(Long userId, Integer roomId) {
        SkuOrderInfoRespDTO skuOrderInfo = skuOrderInfoService.queryByUserIdAndRoomId(userId, roomId);
        // No order at all, or not waiting for payment: nothing to pay
        if (skuOrderInfo == null || !skuOrderInfo.getStatus().equals(SkuOrderInfoEnum.PREPARE_PAY.getCode())) {
            return false;
        }
        // Read the sku ids stored on the order
        String skuIds = skuOrderInfo.getSkuIdList();
        if (skuIds == null || skuIds.isEmpty()) {
            // Splitting "" yields one empty token and Long.valueOf("") would throw
            return false;
        }
        List<Long> skuIdList = Arrays.stream(skuIds.split(",")).map(Long::valueOf).collect(Collectors.toList());
        List<SkuInfoPO> skuInfoPOS = skuInfoService.queryBySkuIds(skuIdList);
        // Total price of the skus
        Integer totalPrice = skuInfoPOS.stream().map(SkuInfoPO::getSkuPrice).reduce(Integer::sum).orElse(0);
        // Fetch the balance and make sure it covers the total
        Integer balance = qiyuCurrencyAccountRpc.getBalance(userId);
        if (balance == null || balance < totalPrice) {
            return false;
        }
        // Deduct the balance
        // NOTE(review): the balance check and the deduction are separate calls and the
        // result of decr is not inspected; confirm decr itself guards against overdraft.
        qiyuCurrencyAccountRpc.decr(userId, totalPrice);
        // Mark the order as paid
        SkuOrderInfoReqDTO reqDTO = ConvertBeanUtils.convert(skuOrderInfo, SkuOrderInfoReqDTO.class);
        reqDTO.setStatus(SkuOrderInfoEnum.HAS_PAY.getCode());
        skuOrderInfoService.updateOrderStatus(reqDTO);
        return true;
    }
}