Skip to content

Latest commit

 

History

History
878 lines (726 loc) · 30.8 KB

5-直播间红包雨功能的实现.md

File metadata and controls

878 lines (726 loc) · 30.8 KB

start

[TOC]

1 直播间红包雨功能的设计与分析

  1. 生成红包逻辑
    1. 红包数据的存储位置如何设计?
      • 临时性数据,持久化存在MySQL,内存当中,读写性能非常好。通常做法是本地内存存储红包雨数据 或者 redis存储红包雨数据。 主播id -> 红包雨配置(唯一的code)
      • 1.使用Redis的List集合存储红包数据(key包含code)-> pop接口领取红包(单个List存储1k个红包)(redis的分片存储,100个list)
        • 缺点:每个List存储的数据量不能太大,否则有大key风险,但是可以利用 分片思路进行存储
      • 2.本地内存存储红包数据。(500台tomcat,每台tomcat的本地内存里存储一定量的红包数据。20000个红包,一共是1000w个红包,2亿元红包,1kw人瓜分)(结合dns访问所在地最近的一台clb,clb-> nginx集群 ->gateway -> tomcat)
        • 缺点:500台tomcat,假设其中的某一台挂了,导致这台tomcat的红包没被领完,但是对于平台来说并不亏
    2. 红包的领取是怎么一个过程?
      • Redis + List存储红包数据(每场直播对应一个List,1000个红包)
        • 如果按照主播id为维度去生成红包数据是怎么存储?
      • List(一定要有一个唯一的标识,key最好有一个唯一的code识别)
        • A主播开了一场直播,生成了一场红包雨的数据,但是没有领完,然后下播了。此时A主播下播后立马又获取了下一场直播的红包雨奖励。
        • list:(id) -> (上一场没有领完的红包数据)
        • list:(id) -> (往旧的list集合中塞入了红包数据)
    3. 生成红包的流程是怎样的?
      • 主播点击前端的一个按钮去生成的。
      • 如何防止红包重复生成?
        • 设置红包雨配置信息的状态,并进行限流防止重复点击
    4. 如何保证红包生成的均匀性?如何防止不超出预期金额限制?
  2. 抽红包逻辑
    1. 领取红包的流程如何设计?(redis的List的pop接口领取红包,如果是本地内存的话,queue也有pop功能)
    2. 如何防止恶意刷接口调用领取红包?(A直播间,恶意请求可以领取到B,C,D等多个直播间的数据)
      • 通过随机生成的红包雨code为标识进行抢红包,而不是以roomId或anchorId为标识
    3. 如何通知全场观众开始参与抢红包行为?(点击生成红包按钮,im异步通知主播,红包初始化完成,主播才有权限点击下一步,开始抢红包
    4. 直播间之间的领取红包接口不能直接互通,如何考虑接口安全性?

总结一下整个红包雨开始的流程:

主播点击准备开始红包雨 --> 后台根据红包雨配置生成红包金额列表(存储到Redis或者本地内存) --> 通知主播数据准备好 --> 开始抢红包

2 红包雨功能的实现

2.1 红包雨配置对象基本操作的实现

在之前qiyu-live-gift-provider操作的数据库中新建红包雨配置表格:

-- Create syntax for TABLE 't_red_packet_config'
CREATE TABLE `t_red_packet_config` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `anchor_id` bigint NOT NULL DEFAULT '0' COMMENT '主播id',
  `start_time` datetime DEFAULT NULL COMMENT '红包雨活动开始时间',
  `total_get` int NOT NULL DEFAULT '0' COMMENT '一共领取数量',
  `total_get_price` int NOT NULL DEFAULT '0' COMMENT '一共领取金额',
  `max_get_price` int NOT NULL DEFAULT '0' COMMENT '最大领取金额',
  `status` tinyint NOT NULL DEFAULT '1' COMMENT '(1 待准备,2已准备,3已发送)',
  `total_price` int NOT NULL DEFAULT '0' COMMENT '红包雨总金额数',
  `total_count` int unsigned NOT NULL DEFAULT '0' COMMENT '红包雨总红包数',
  `config_code` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '唯一code',
  `remark` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '备注',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='直播间红包雨配置';


INSERT INTO `qiyu_live_user`.`t_red_packet_config` 
(`id`, `anchor_id`, `start_time`, `total_get`, `total_get_price`, `max_get_price`, 
 `status`, `total_price`, `total_count`, `config_code`, `remark`, `create_time`, `update_time`) 
 VALUES (3, 285608972927369200, NULL, 0, 0, 0, 1, 10000, 1000, '1', '默认红包雨配置', 
'2024-02-24 22:28:47', '2024-02-24 22:28:47');

qiyu-live-gift-provider:

@Data
@TableName("t_red_packet_config")
public class RedPacketConfigPO {
    
    @TableId(type = IdType.AUTO)
    private Integer id;
    private Long anchorId;
    private Date startTime;
    private Integer totalGet;
    private Integer totalGetPrice;
    private Integer maxGetPrice;
    private Integer status;
    private Integer totalPrice;
    private Integer totalCount;
    private String configCode;
    private String remark;
    private Date createTime;
    private Date updateTime;
}
@Data
public class RedPacketConfigReqDTO implements Serializable {

    @Serial
    private static final long serialVersionUID = 5117539613836783248L;
    private Integer id;
    private Integer roomId;
    private Integer status;
    private Long userId;
    private String redPacketConfigCode;
    private Integer totalPrice;
    private Integer totalCount;
    private String remark;
}
//gift-interface中
@Data
public class RedPacketConfigRespDTO implements Serializable {

    @Serial
    private static final long serialVersionUID = 5117539613836783248L;
    private Long anchorId;
    private Integer totalPrice;
    private Integer totalCount;
    private String configCode;
    private String remark;
}
//gift-interface中
@Data
@AllArgsConstructor
public class RedPacketReceiveDTO implements Serializable {
    @Serial
    private static final long serialVersionUID = -5916608127876611063L;
    
    private Integer price;
    private String notifyMsg;
}
//gift-interface中
public enum RedPacketStatusEnum {
    
    WAIT(1,"待准备"),
    IS_PREPARED(2, "已准备"),
    IS_SEND(3, "已发送");

    int code;
    String desc;

	RedPacketStatusEnum(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public int getCode() {
        return code;
    }

    public String getDesc() {
        return desc;
    }
}
//gift-interface中
public interface IRedPacketConfigRpc {

    /**
     * 根据主播id查询有无发放红包雨的特权
     */
    RedPacketConfigRespDTO queryByAnchorId(Long anchorId);
    
    /**
     * 新增红包雨配置
     */
    boolean addOne(RedPacketConfigReqDTO redPacketConfigReqDTO);

    /**
     * 准备生成红包金额列表
     */
    boolean prepareRedPacket(Long anchorId);

    /**
     * 直播间用户领取红包
     */
    RedPacketReceiveDTO receiveRedPacket(RedPacketConfigReqDTO redPacketConfigReqDTO);

    /**
     * 开始红包雨
     */
    Boolean startRedPacket(RedPacketConfigReqDTO reqDTO);
    
}
@DubboService
public class RedPacketConfigRpcImpl implements IRedPacketConfigRpc {
    
    @Resource
    private IRedPacketConfigService redPacketConfigService;

    @Override
    public RedPacketConfigRespDTO queryByAnchorId(Long anchorId) {
        return ConvertBeanUtils.convert(redPacketConfigService.queryByAnchorId(anchorId), RedPacketConfigRespDTO.class);
    }

    @Override
    public boolean addOne(RedPacketConfigReqDTO redPacketConfigReqDTO) {
        return redPacketConfigService.addOne(ConvertBeanUtils.convert(redPacketConfigReqDTO, RedPacketConfigPO.class));
    }

    @Override
    public boolean prepareRedPacket(Long anchorId) {
        return redPacketConfigService.prepareRedPacket(anchorId);
    }

    @Override
    public RedPacketReceiveDTO receiveRedPacket(RedPacketConfigReqDTO redPacketConfigReqDTO) {
        return redPacketConfigService.receiveRedPacket(redPacketConfigReqDTO);
    }

    @Override
    public Boolean startRedPacket(RedPacketConfigReqDTO reqDTO) {
        return redPacketConfigService.startRedPacket(reqDTO);
    }
}
public interface IRedPacketConfigService {

    /**
     * 根据主播id查询有无发放红包雨的特权
     */
    RedPacketConfigPO queryByAnchorId(Long anchorId);

    /**
     * 根据code查询已准备的红包雨配置信息
     */
    RedPacketConfigPO queryByConfigCode(String code);

    /**
     * 新增红包雨配置
     */
    boolean addOne(RedPacketConfigPO redPacketConfigPO);

    /**
     * 更新红包雨配置
     */
    boolean updateById(RedPacketConfigPO redPacketConfigPO);

    /**
     * 主播开始准备红包雨
     */
    boolean prepareRedPacket(Long anchorId);

    /**
     * 直播间用户领取红包
     */
    RedPacketReceiveDTO receiveRedPacket(RedPacketConfigReqDTO redPacketConfigReqDTO);

    /**
     * 开始红包雨
     */
    Boolean startRedPacket(RedPacketConfigReqDTO reqDTO);
@Service
public class RedPacketConfigServiceImpl implements IRedPacketConfigService {

    @Resource
    private IRedPacketConfigMapper redPacketConfigMapper;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private GiftProviderCacheKeyBuilder cacheKeyBuilder;

    @Override
    public RedPacketConfigPO queryByAnchorId(Long anchorId) {
        LambdaQueryWrapper<RedPacketConfigPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(RedPacketConfigPO::getAnchorId, anchorId);
        queryWrapper.eq(RedPacketConfigPO::getStatus, RedPacketStatusEnum.WAIT.getCode());
        queryWrapper.orderByDesc(RedPacketConfigPO::getCreateTime);
        queryWrapper.last("limit 1");
        return redPacketConfigMapper.selectOne(queryWrapper);
    }

    @Override
    public RedPacketConfigPO queryByConfigCode(String code) {
        LambdaQueryWrapper<RedPacketConfigPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(RedPacketConfigPO::getConfigCode, code);
        queryWrapper.eq(RedPacketConfigPO::getStatus, RedPacketStatusEnum.IS_PREPARED.getCode());
        queryWrapper.orderByDesc(RedPacketConfigPO::getCreateTime);
        queryWrapper.last("limit 1");
        return redPacketConfigMapper.selectOne(queryWrapper);
    }

    @Override
    public boolean addOne(RedPacketConfigPO redPacketConfigPO) {
        redPacketConfigPO.setConfigCode(UUID.randomUUID().toString());
        return redPacketConfigMapper.insert(redPacketConfigPO) > 0;
    }

    @Override
    public boolean updateById(RedPacketConfigPO redPacketConfigPO) {
        return redPacketConfigMapper.updateById(redPacketConfigPO) > 0;
    }

    @Override
    public boolean prepareRedPacket(Long anchorId) {
        return true;
    }
    
    @Override
    public RedPacketReceiveDTO receiveRedPacket(String code) {
        return null;
    }
    
    @Override
    public Boolean startRedPacket(RedPacketConfigReqDTO reqDTO) {
        return true;
    }
}

2.2 红包雨金额列表的生成和存储

  • 准备:

redis的keyBuilder添加新的keyBuilder:

@Configuration
@Conditional(RedisKeyLoadMatch.class)
public class GiftProviderCacheKeyBuilder extends RedisKeyBuilder {

    ...
    private static String RED_PACKET_LIST = "red_packet_list";
    private static String RED_PACKET_INIT_LOCK = "red_packet_init_lock";
    private static String RED_PACKET_TOTAL_GET_COUNT = "red_packet_total_get_count";
    private static String RED_PACKET_TOTAL_GET_PRICE = "red_packet_total_get_price";
    private static String RED_PACKET_MAX_GET_PRICE = "red_packet_max_get_price";
    private static String USER_TOTAL_GET_PRICE_CACHE = "red_packet_user_total_get_price";
    private static String RED_PACKET_PREPARE_SUCCESS = "red_packet_prepare_success";
    private static String RED_PACKET_NOTIFY = "red_packet_notify";

    ...

    public String buildRedPacketList(String code) {
        return super.getPrefix() + RED_PACKET_LIST + super.getSplitItem() + code;
    }

    public String buildRedPacketInitLock(String code) {
        return super.getPrefix() + RED_PACKET_INIT_LOCK + super.getSplitItem() + code;
    }

    public String buildRedPacketTotalGetCount(String code) {
        return super.getPrefix() + RED_PACKET_TOTAL_GET_COUNT + super.getSplitItem() + (Math.abs(code.hashCode()) % 100);
    }

    public String buildRedPacketTotalGetPrice(String code) {
        return super.getPrefix() + RED_PACKET_TOTAL_GET_PRICE + super.getSplitItem() + (Math.abs(code.hashCode()) % 100);
    }

    public String buildRedPacketMaxGetPrice(String code) {
        return super.getPrefix() + RED_PACKET_MAX_GET_PRICE + super.getSplitItem() + (Math.abs(code.hashCode()) % 100);
    }
    
    public String buildUserTotalGetPrice(Long userId) {
        return super.getPrefix() + USER_TOTAL_GET_PRICE_CACHE + super.getSplitItem() + userId;
    }

    public String buildRedPacketPrepareSuccess(String code) {
        return super.getPrefix() + RED_PACKET_PREPARE_SUCCESS + super.getSplitItem() + code;
    }

    public String buildRedPacketNotify(String code) {
        return super.getPrefix() + RED_PACKET_NOTIFY + super.getSplitItem() + code;
    }
}

写一个工具类ListUtils:可以将大列表拆分为多个子列表

/**
 * 将List拆分为小的List,用于Redis中list的存入,避免 Redis的输入输出缓冲区 和 网络 发生堵塞
 */
public class ListUtils {

    /**
     * 将一个大List集合拆分为多个子list集合(每个子集合有subNum个元素)
     */
    public static <T> List<List<T>> splistList(List<T> list, int subNum) {
        List<List<T>> resultList = new ArrayList<>();
        int priIndex = 0;
        int lastIndex = 0;
        int insertTime = list.size() / subNum;
        List<T> subList;
        for (int i = 0; i <= insertTime; i++) {
            priIndex = subNum * i;
            lastIndex = priIndex + subNum;
            if (i != insertTime) {
                subList = list.subList(priIndex, lastIndex);
            } else {
                subList = list.subList(priIndex, list.size());
            }
            if (subList.size() > 0) {
                resultList.add(subList);
            }
        }
        return resultList;
    }
}
  • 开始书写逻辑:
  • 我们要用到上面写的ListUtils中的拆分列表功能,用于将生成的红包金额列表拆分为小列表,再往redis中存,避免Redis的输入输出缓冲区堵塞
  • 我们生成红包金额列表 使用的算法是 二倍均值法
@Service
public class RedPacketConfigServiceImpl implements IRedPacketConfigService {

    ...

    @Override
    public boolean prepareRedPacket(Long anchorId) {
        // 防止重复生成,以及错误参数传递情况
        RedPacketConfigPO redPacketConfigPO = this.queryByAnchorId(anchorId);
        if (redPacketConfigPO == null) {
            return false;
        }
        // 加锁保证原子性:仿重
        Boolean isLock = redisTemplate.opsForValue().setIfAbsent(cacheKeyBuilder.buildRedPacketInitLock(redPacketConfigPO.getConfigCode()), 1, 3L, TimeUnit.SECONDS);
        if (Boolean.FALSE.equals(isLock)) {
            return false;
        }
        Integer totalPrice = redPacketConfigPO.getTotalPrice();
        Integer totalCount = redPacketConfigPO.getTotalCount();
        List<Integer> priceList = this.createRedPacketPriceList(totalPrice, totalCount);
        String cacheKey = cacheKeyBuilder.buildRedPacketList(redPacketConfigPO.getConfigCode());
        // 将红包数据拆分为子集合进行插入到Redis,避免 Redis输入输出缓冲区 被填满
        List<List<Integer>> splitPriceList = ListUtils.splistList(priceList, 100);
        for (List<Integer> priceItemList : splitPriceList) {
            redisTemplate.opsForList().leftPushAll(cacheKey, priceItemList.toArray());
        }
        // 更改红包雨配置状态,防止重发
        redPacketConfigPO.setStatus(RedPacketStatusEnum.IS_PREPARED.getCode());
        this.updateById(redPacketConfigPO);
        // Redis中设置该红包雨已经准备好的标记
        redisTemplate.opsForValue().set(cacheKeyBuilder.buildRedPacketPrepareSuccess(redPacketConfigPO.getConfigCode()), 1, 1L, TimeUnit.DAYS);
        return true;
    }

    /**
     * 二倍均值法:
     * 创建红包雨的每个红包金额数据
     */
    private List<Integer> createRedPacketPriceList(Integer totalPrice, Integer totalCount) {
        List<Integer> redPacketPriceList = new ArrayList<>();
        for (int i = 0; i < totalCount; i++) {
            if (i + 1 == totalCount) {
                // 如果是最后一个红包
                redPacketPriceList.add(totalPrice);
                break;
            }
            int maxLimit = (totalPrice / (totalCount - i)) * 2;// 最大限额为平均值的两倍
            int currentPrice = ThreadLocalRandom.current().nextInt(1, maxLimit);
            totalPrice -= currentPrice;
            redPacketPriceList.add(currentPrice);
        }
        return redPacketPriceList;
    }
}

2.3 开始红包雨逻辑

主播接收到红包数据准备完毕后,就可以点击开始红包雨了

  • 准备:
public enum ImMsgBizCodeEnum {
    
    ...
    RED_PACKET_CONFIG(5560, "开启红包雨活动");
  • 开始书写逻辑:

需要用到SendGiftConsumer里封装的batchSendImMsg()方法,把它拷贝过来 很多地方都用到了这个方法,其实我们可以把它抽为一个单独的utils工具类,放在router-interface中

@Service
public class RedPacketConfigServiceImpl implements IRedPacketConfigService {

    ...
    @DubboReference
    private ImRouterRpc routerRpc;
    @DubboReference
    private ILivingRoomRpc livingRoomRpc;

    ...

    @Override
    public Boolean startRedPacket(RedPacketConfigReqDTO reqDTO) {
        String code = reqDTO.getRedPacketConfigCode();
        // 红包没有准备好,则返回false
        if (Boolean.FALSE.equals(redisTemplate.hasKey(cacheKeyBuilder.buildRedPacketPrepareSuccess(code)))) {
            return false;
        }
        // 红包已经开始过(有别的线程正在通知用户中),返回false
        String notifySuccessCacheKey = cacheKeyBuilder.buildRedPacketNotify(code);
        if (Boolean.TRUE.equals(redisTemplate.hasKey(notifySuccessCacheKey))) {
            return false;
        }
        redisTemplate.opsForValue().set(notifySuccessCacheKey, 1, 1L, TimeUnit.DAYS);
        // 广播通知直播间所有用户开始抢红包了
        RedPacketConfigPO redPacketConfigPO = this.queryByConfigCode(code);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("redPacketConfig", JSON.toJSONString(redPacketConfigPO));
        LivingRoomReqDTO livingRoomReqDTO = new LivingRoomReqDTO();
        livingRoomReqDTO.setRoomId(reqDTO.getRoomId());
        livingRoomReqDTO.setAppId(AppIdEnum.QIYU_LIVE_BIZ.getCode());
        List<Long> userIdList = livingRoomRpc.queryUserIdsByRoomId(livingRoomReqDTO);
        if (CollectionUtils.isEmpty(userIdList)) return false;
        this.batchSendImMsg(userIdList, ImMsgBizCodeEnum.RED_PACKET_CONFIG.getCode(), jsonObject);
        // 更改红包雨配置的状态为已发送
        redPacketConfigPO.setStatus(RedPacketStatusEnum.IS_SEND.getCode());
        this.updateById(redPacketConfigPO);
        return true;
    }

    /**
     * 批量发送im消息
     */
    private void batchSendImMsg(List<Long> userIdList, Integer bizCode, JSONObject jsonObject) {
        List<ImMsgBody> imMsgBodies = new ArrayList<>();

        userIdList.forEach(userId -> {
            ImMsgBody imMsgBody = new ImMsgBody();
            imMsgBody.setAppId(AppIdEnum.QIYU_LIVE_BIZ.getCode());
            imMsgBody.setBizCode(bizCode);
            imMsgBody.setData(jsonObject.toJSONString());
            imMsgBody.setUserId(userId);
            imMsgBodies.add(imMsgBody);
        });
        routerRpc.batchSendMsg(imMsgBodies);
    }
}

2.4 领红包逻辑

//gift-interface中

/**
 * 用户红包雨抢红包后发送的mq消息体
 */
@Data
public class SendRedPacketBO implements Serializable {
    
    @Serial
    private static final long serialVersionUID = 1829802295999336708L;
    
    private Integer price;
    private RedPacketConfigReqDTO reqDTO;
}
public class GiftProviderTopicNames {

	...
    /**
     * 用户红包雨抢红包消息topic
     */
    public static final String RECEIVE_RED_PACKET = "receive-red-packet";
}
@Service
public class RedPacketConfigServiceImpl implements IRedPacketConfigService {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedPacketConfigServiceImpl.class);
	...

    @Override
    public RedPacketReceiveDTO receiveRedPacket(RedPacketConfigReqDTO redPacketConfigReqDTO) {
        String code = redPacketConfigReqDTO.getRedPacketConfigCode();
        // 从Redis中领取一个红包金额
        String cacheKey = cacheKeyBuilder.buildRedPacketList(code);
        Object priceObj = redisTemplate.opsForList().rightPop(cacheKey);
        if (priceObj == null) {
            return null;
        }
        Integer price = (Integer) priceObj;
        // 发送mq消息进行异步信息的统计,以及用户余额的增加
        SendRedPacketBO sendRedPacketBO = new SendRedPacketBO();
        sendRedPacketBO.setPrice(price);
        sendRedPacketBO.setReqDTO(redPacketConfigReqDTO);
        CompletableFuture<SendResult<String, String>> sendResult = kafkaTemplate.send(GiftProviderTopicNames.RECEIVE_RED_PACKET, JSON.toJSONString(sendRedPacketBO));
        try {
            sendResult.whenComplete((v, e) -> {
                if (e == null) {
                    LOGGER.info("[RedPacketConfigServiceImpl] user {} receive a redPacket, send success", redPacketConfigReqDTO.getUserId());
                }
            }).exceptionally(e -> {
                LOGGER.error("[RedPacketConfigServiceImpl] send error, userId is {}, price is {}", redPacketConfigReqDTO.getUserId(), price);
                throw new RuntimeException(e);
            });
        } catch (Exception e) {
            return new RedPacketReceiveDTO(null, "抱歉,红包被人抢走了,再试试");
        }
        return new RedPacketReceiveDTO(price, "恭喜领取到红包:" + price + "旗鱼币!");
    }


    ...
}

刚刚我们发送了mq消息去做数据库工作:

在gift-provider中编写一个consumer来消费刚刚的mq消息:

/**
 * 处理抢红包mq消息的消费者
 */
@Component
public class ReceiveRedPacketConsumer {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveRedPacketConsumer.class);
    @Resource
    private IRedPacketConfigService redPacketConfigService;
    
    @KafkaListener(topics = GiftProviderTopicNames.RECEIVE_RED_PACKET, groupId = "receive-red-packet")
    public void receiveRedPacket(String sendRedPacketBOStr) {
        try {
            SendRedPacketBO sendRedPacketBO = JSON.parseObject(sendRedPacketBOStr, SendRedPacketBO.class);
            redPacketConfigService.receiveRedPacketHandler(sendRedPacketBO.getReqDTO(), sendRedPacketBO.getPrice());
            LOGGER.info("[ReceiveRedPacketConsumer] receiveRedPacket success");
        } catch (Exception e) {
            LOGGER.error("[ReceiveRedPacketConsumer] receiveRedPacket error, mqBody is {}", sendRedPacketBOStr);
        }
    }
}

里面调用了redPacketConfigService.receiveRedPacketHandler(),现在来进行编写该方法

public interface IRedPacketConfigService {
	...

    /**
     * 接收到抢红包的消息过后,进行异步处理的handler
     */
    void receiveRedPacketHandler(RedPacketConfigReqDTO reqDTO, Integer price);
}
@Service
public class RedPacketConfigServiceImpl implements IRedPacketConfigService {

    ...

    @Override
    public void receiveRedPacketHandler(RedPacketConfigReqDTO reqDTO, Integer price) {
        String code = reqDTO.getRedPacketConfigCode();
        String totalGetCountCacheKey = cacheKeyBuilder.buildRedPacketTotalGetCount(code);
        String totalGetPriceCacheKey = cacheKeyBuilder.buildRedPacketTotalGetPrice(code);
        // 记录该用户总共领取了多少金额的红包
        redisTemplate.opsForValue().increment(cacheKeyBuilder.buildUserTotalGetPrice(reqDTO.getUserId()), price);
        redisTemplate.opsForHash().increment(totalGetCountCacheKey, code, 1);
        redisTemplate.expire(totalGetCountCacheKey, 1L, TimeUnit.DAYS);
        redisTemplate.opsForHash().increment(totalGetPriceCacheKey, code, price);
        redisTemplate.expire(totalGetPriceCacheKey, 1L, TimeUnit.DAYS);
        // 往用户的余额里增加金额
        qiyuCurrencyAccountRpc.incr(reqDTO.getUserId(), price);
        // 持久化红包雨的totalGetCount和totalGetPrice
        redPacketConfigMapper.incrTotalGetPrice(code, price);
        redPacketConfigMapper.incrTotalGetCount(code);
    }
  • 这里我们的取出红包的逻辑,因为我们的totalGetCountCacheKey和 totalGetPriceCacheKey使用的是Hash类型,所以我们之后红包雨结束后,记得remove掉code对应的键值对
  • 可以看到我们在用户每次抢红包时,就把金额incr持久化到了MySQL
    • 但是:前面在Redis中又累加了每个用户抢到的金额,但是却没有用到
    • 并且:前面在Redis中的totalGetCount和totalGetPrice都没有用到
    • 所以:我推荐:
      • 方法一:最后不做那3行持久化到MySQL的操作,应该最后红包雨完成后,由前端发起一个红包雨结束的请求,然后使用mq异步的处理MySQl持久化,包括 1、从Redis读取totalGetCount和totalGetPrice的进行MySQL持久化;2、从Redis读取每个用户的累加金额进行MySQL数据库的持久化;3、移除Redis中的数据
      • 方法二:不做Redis的操作,避免掉冗余代码

2.5 API模块调用逻辑

qiyu-live-api:

  • 准备工作:
@Data
public class LivingRoomInitVO {

    private Long anchorId;
    private Long userId;
    private String nickName;
    private String anchorImg;
    private String roomName;
    private boolean isAnchor;
    private String redPacketConfigCode;
    private String avatar;
    private Integer roomId;
    private String watcherNickName;
    private String anchorNickName;
    //观众头像
    private String watcherAvatar;
    //默认背景图,为了方便讲解使用
    private String defaultBgImg;
    private Long pkObjId;
}
@Data
public class RedPacketReceiveVO {
    
    private Integer price;
    private String msg;
}
@Data
public class LivingRoomReqVO {
    private Integer type;
    private int page;
    private int pageSize;
    private Integer roomId;
    private String redPacketConfigCode;
}
  • 开始逻辑的编写:

设置好我们的@RequestLimit进行限流,防止重复点击

@RestController
@RequestMapping("/living")
public class LivingRoomController {

    ...

    @RequestLimit(limit = 1, second = 10, msg = "正在初始化红包数据,请稍等")
    @PostMapping("/prepareRedPacket")
    public WebResponseVO prepareRedPacket(LivingRoomReqVO livingRoomReqVO) {
        return WebResponseVO.success(livingRoomService.prepareRedPacket(QiyuRequestContext.getUserId(), livingRoomReqVO.getRoomId()));
    }

    @RequestLimit(limit = 1, second = 10, msg = "正在广播直播间用户,请稍等")
    @PostMapping("/startRedPacket")
    public WebResponseVO startRedPacket(LivingRoomReqVO livingRoomReqVO) {
        return WebResponseVO.success(livingRoomService.startRedPacket(QiyuRequestContext.getUserId(), livingRoomReqVO.getRedPacketConfigCode()));
    }

    @RequestLimit(limit = 1, second = 7, msg = "")
    @PostMapping("/getRedPacket")
    public WebResponseVO getRedPacket(LivingRoomReqVO livingRoomReqVO) {
        return WebResponseVO.success(livingRoomService.getRedPacket(QiyuRequestContext.getUserId(), livingRoomReqVO.getRedPacketConfigCode()));
    }
    
}
public interface ILivingRoomService {
    ...

    /**
     * 主播点击开始准备红包雨金额
     */
    Boolean prepareRedPacket(Long userId, Integer roomId);

    /**
     * 主播开始红包雨
     */
    Boolean startRedPacket(Long userId, String code);

    /**
     * 根据红包雨code领取红包
     */
    RedPacketReceiveVO getRedPacket(Long userId, String redPacketConfigCode);
}

下一段代码LivingRoomServiceImpl里会调用livingRoomRpc.queryByAnchorId(userId),但是我们的living-provider里还没有写这个方法,我们需要使用在其中添加这个方法,LivingRoomServiceImpl的代码如下,RPC接口和service接口的代码请自己补充:

@Service
public class LivingRoomServiceImpl implements ILivingRoomService {

    ...

    @Override
    public LivingRoomRespDTO queryByAnchorId(Long anchorId) {
        LambdaQueryWrapper<LivingRoomPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(LivingRoomPO::getAnchorId, anchorId);
        queryWrapper.eq(LivingRoomPO::getStatus, CommonStatusEnum.VALID_STATUS.getCode());
        queryWrapper.last("limit 1");
        return ConvertBeanUtils.convert(livingRoomMapper.selectOne(queryWrapper), LivingRoomRespDTO.class);
    }
@Service
public class LivingRoomServiceImpl implements ILivingRoomService {

    ...

    @Override
    public Boolean prepareRedPacket(Long userId, Integer roomId) {
        LivingRoomRespDTO livingRoomRespDTO = livingRoomRpc.queryByRoomId(roomId);
        ErrorAssert.isNotNull(livingRoomRespDTO, BizBaseErrorEnum.PARAM_ERROR);
        ErrorAssert.isTure(userId.equals(livingRoomRespDTO.getAnchorId()), BizBaseErrorEnum.PARAM_ERROR);
        return redPacketConfigRpc.prepareRedPacket(userId);
    }

    @Override
    public Boolean startRedPacket(Long userId, String code) {
        RedPacketConfigReqDTO reqDTO = new RedPacketConfigReqDTO();
        reqDTO.setUserId(userId);
        reqDTO.setRedPacketConfigCode(code);
        LivingRoomRespDTO livingRoomRespDTO = livingRoomRpc.queryByAnchorId(userId);
        ErrorAssert.isNotNull(livingRoomRespDTO, BizBaseErrorEnum.PARAM_ERROR);
        reqDTO.setRoomId(livingRoomRespDTO.getId());
        return redPacketConfigRpc.startRedPacket(reqDTO);
    }

    @Override
    public RedPacketReceiveVO getRedPacket(Long userId, String code) {
        RedPacketConfigReqDTO reqDTO = new RedPacketConfigReqDTO();
        reqDTO.setUserId(userId);
        reqDTO.setRedPacketConfigCode(code);
        RedPacketReceiveDTO receiveDTO = redPacketConfigRpc.receiveRedPacket(reqDTO);
        RedPacketReceiveVO respVO = new RedPacketReceiveVO();
        if (receiveDTO == null) {
            respVO.setMsg("红包已派发完毕");
        } else {
            respVO.setPrice(receiveDTO.getPrice());
            respVO.setMsg(receiveDTO.getNotifyMsg());
        }
        return respVO;
    }
}

end