admin管理员组

文章数量:1030001

RocketMQ实战—10.营销系统代码优化二

大纲

1.营销系统引入MQ实现异步化来进行性能优化

2.基于MQ释放优惠券提升系统扩展性

3.基于Redis实现重复促销活动去重

4.基于促销活动创建事件实现异步化

5.推送任务分片和分片消息batch合并发送实现

6.推送系统与用户群体查询逻辑解耦

7.查询用户数据以及批量发送推送消息

8.线程池封装以及推送系统多线程推送

9.推送系统的千万级消息多线程推送

10.千万级用户惰性发券代码实现

11.指定用户群体发券的代码实现

12.分片消息的batch合并算法重构实现

13.百万画像群体爆款商品推送代码实现

14.生产环境百万级用户PUSH全链路压测

10.千万级用户惰性发券代码实现

(1)给全量用户发放优惠券的初版实现

(2)给全量用户惰性发放优惠券的优化实现

(1)给全量用户发放优惠券的初版实现

首先营销系统对全量用户发放优惠券的任务进行分片,然后将分片的消息发送到如下Topic。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制
@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
    //优惠活动service
    @Autowired
    private CouponService couponService;
    
    //新增一个优惠券活动
    @PostMapping
    public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request) {
        try {
            log.info("新增一条优惠券:{}", JSON.toJSONString(request));
            SaveOrUpdateCouponDTO dto = couponService.saveOrUpdateCoupon(request);
            return JsonResult.buildSuccess(dto);
        } catch (BaseBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
    ...
    //保存/修改优惠券活动方法
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
        couponDO.setCouponReceivedCount(0);
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
      
        //为所有用户发放优惠券
        sendPlatformCouponMessage(couponDO);
      
        SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
        dto.setCouponName(request.getCouponName());
        dto.setRule(request.getCouponRule());
        dto.setSuccess(true);
        return dto;
    }
    ...
    //为所有用户发放优惠券
    private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) {
        //桶的大小
        final int userBucketSize = 1000;
        final int messageBatchSize = 100;

        //1.查询出库里面最大的userId,作为用户的总数量
        JsonResult<Long> maxUserIdJsonResult = accountApi.queryMaxUserId();
        if (maxUserIdJsonResult.getSuccess()) {
            throw new BaseBizException(maxUserIdJsonResult.getErrorCode(), maxUserIdJsonResult.getErrorMessage());
        }
        Long maxUserId = maxUserIdJsonResult.getData();

        //2.分成m个桶,每个桶里面有n个用户,每个桶发送一条"批量发送优惠券用户桶消息"
        Map<Long, Long> userBuckets = new LinkedHashMap<>();
        AtomicBoolean flagRef = new AtomicBoolean(true);
        long startUserId = 1L;
        while (flagRef.get()) {
            if (startUserId > maxUserId) {
                flagRefpareAndSet(true, false);
            }
            userBuckets.put(startUserId, startUserId + userBucketSize);
            startUserId += userBucketSize;
        }

        //3.批量发送消息
        //例:userBucketCount = 1000; messageBatchSize = 100
        //批量发送次数 = 10次,经过两次分桶,这里发送消息的次数从100w次降到10次
        int handledBucketCount = 0;
        List<String> jsonMessageBatch = new ArrayList<>(messageBatchSize);
        for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
            handledBucketCount++;
            PlatformCouponUserBucketMessage message = PlatformCouponUserBucketMessage.builder()
                .startUserId(userBucket.getKey())
                .endUserId(userBucket.getValue())
                .informType(promotionCouponDO.getInformType())
                .couponId(promotionCouponDO.getId())
                .activityStartTime(promotionCouponDO.getActivityStartTime())
                .activityEndTime(promotionCouponDO.getActivityEndTime())
                .couponType(promotionCouponDO.getCouponType())
                .build();
            String jsonMessage = JsonUtil.object2Json(message);
            jsonMessageBatch.add(jsonMessage);

            if (jsonMessageBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, jsonMessageBatch, "平台发放优惠券用户桶消息");
                jsonMessageBatch.clear();
            }
        }
    }
}

接着营销系统监听如下Topic消费分片后的发放优惠券任务。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC

此时有两种处理方法:一.直接使用线程池进行发送发放优惠券消息到MQ。二.合并batch后再使用线程池发送MQ。

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    //平台发放优惠券用户桶消费者
    @Bean("platformCouponUserBucketReceiveTopicConsumer")
    public DefaultMQPushConsumer receiveCouponUserBucketConsumer(@Qualifier("platformCouponUserBucketReceiveTopicConsumer")PlatFormCouponUserBucketListener platFormCouponUserBucketListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, "*");
        consumer.registerMessageListener(platFormCouponUserBucketListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormCouponUserBucketListener implements MessageListenerConcurrently {
    //账户服务
    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;

    //发送消息共用的线程池
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    //RocketMQ生产者
    @Autowired
    private DefaultProducer defaultProducer;

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                //1.反序列化消息
                String messageString = new String(messageExt.getBody());
                log.debug("执行平台发送优惠券用户桶消息逻辑,消息内容:{}", messageString);
                PlatformCouponUserBucketMessage message = JSON.parseObject(messageString, PlatformCouponUserBucketMessage.class);

                //2.查询桶内的用户信息
                Long startUserId = message.getStartUserId();
                Long endUserId = message.getEndUserId();
                JsonResult<List<MembershipAccountDTO>> accountBucketResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
                if (!accountBucketResult.getSuccess()) {
                    throw new BaseBizException(accountBucketResult.getErrorCode(), accountBucketResult.getErrorMessage());
                }
                List<MembershipAccountDTO> accountBucket = accountBucketResult.getData();
                if (CollectionUtils.isEmpty(accountBucket)) {
                    log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
                    continue;
                }

                //3.每个用户发送一条"平台发送优惠券消息"
                //方法一:直接使用线程池进行发送发放优惠券消息到MQ;
                //这里是并行消费的,以上逻辑已经是并行执行的了,而且有查库的操作
                //accountBucket 默认是 1000 个用户,要为每一个用户都发送一条"平台发送优惠券消息",也就是1000条消息
                //下面我们使用线程池来并行发送这1000条消息(ps:另一种也可以像发送优惠券用户桶消息一样用批量发送)
                PlatformCouponMessage couponMessage = PlatformCouponMessage.builder()
                    .couponId(message.getCouponId())
                    .activityStartTime(message.getActivityStartTime())
                    .activityEndTime(message.getActivityEndTime())
                    .couponType(message.getCouponType())
                    .build();
                for (MembershipAccountDTO account : accountBucket) {
                    sharedSendMsgThreadPool.execute(() -> {
                        couponMessage.setUserAccountId(account.getId());
                        String jsonMessage = JSON.toJSONString(couponMessage);
                        defaultProducer.sendMessage(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, jsonMessage, "平台发送优惠券消息");
                    });
                }

                //方法二:合并batch后再使用线程池发送MQ
                /*List<String> messages = new ArrayList<>(100);
                for (MembershipAccountDTO account : accountBucket) {
                    couponMessage.setUserAccountId(account.getId());
                    messages.add(JSON.toJSONString(couponMessage));
                    if (messages.size() == 100) {
                        sharedSendMsgThreadPool.execute(() -> {
                            defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, messages, "平台发送优惠券消息");
                        });
                        messages.clear();
                    }
                }
                //最后剩下的也批量发出
                if (!CollectionUtils.isEmpty(messages)) {
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, messages, "平台发送促销活动消息");
                    });
                    messages.clear();
                }*/
            }
        } catch (Exception e){
            log.error("consume error,平台优惠券消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

最后营销系统监听如下Topic,然后通过线程池对每个用户发放优惠券。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_PROMOTION_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    //平台发放优惠券领取消费者
    @Bean("platformCouponReceiveTopicConsumer")
    public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
        consumer.registerMessageListener(platFormCouponListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormCouponListener implements MessageListenerConcurrently {
    //优惠券服务service
    @Autowired
    private CouponItemService couponItemService;

    //测试completableFuture使用commonPool的时是不需要初始化业务ThreadPoolExecutor的
    //这里用supplier懒加载让测试completableFuture使用commonPool时不要初始化线程池
    //只有当使用completableFuture使用自定义的线程时才初始化线程池
    private static final int PERMITS = 30;
    private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
    private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
    private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
        if (initializedRefpareAndSet(false, true)) {
            THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), NamedDaemonThreadFactory.getInstance("consumeCouponMsg"), new ThreadPoolExecutor.CallerRunsPolicy());
        }
        return THREAD_POOL_EXECUTOR;
    };

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            //方式一:使用默认的commonPool来处理任务
            //supplyAsync(Supplier<U> supplier) API
            //默认使用的是 ForkJoinPoolmonPool() 这个线程池
            //该线程池在jvm内是唯一的,默认的线程数量是cpu的核数减1
            //如果觉得线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPoolmon.parallelism 的值调整commonPool的并行度,或者采用方式二
            List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
                .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                .collect(Collectors.toList());

            //方式二:使用自定的业务线程池来处理任务
            //List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
            //     .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
            //     .collect(Collectors.toList());

            List<SalesPromotionCouponItemDTO> couponItemDTOList = futureList.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

            //优惠券保存到数据库
            couponItemService.saveCouponBatch(couponItemDTOList);
        } catch (Exception e) {
            log.error("consume error,平台优惠券消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    public SalesPromotionCouponItemDTO handleMessageExt(MessageExt messageExt) {
        log.debug("执行平台发放优惠券消费消息逻辑,消息内容:{}", messageExt.getBody());
        String msg = new String(messageExt.getBody());
        PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class);
        log.info("开始发放平台优惠券,couponId:{}", platformCouponMessage.getCouponId());

        //幂等逻辑防止重复消费
        JsonResult<Long> result = couponItemService.selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(), platformCouponMessage.getCouponId());
        //如果已经存在,直接跳过循环,不再执行优惠券保存操作
        if (result.getSuccess()) {
            return null;
        }

        SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO();
        itemDTO.setCouponId(platformCouponMessage.getCouponId());
        itemDTO.setCouponType(platformCouponMessage.getCouponType());
        itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId());
        itemDTO.setIsUsed(0);
        itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime());
        itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime());
        return itemDTO;
    }
}

(2)给全量用户惰性发放优惠券的优化实现

一.首先需要配置好要使用的Redis相关Bean

代码语言:javascript代码运行次数:0运行复制
@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.timeout}")
    private int timeout;

    @Bean
    @ConditionalOnClass(RedisConnectionFactory.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    @ConditionalOnClass(RedissonClient.class)
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://" + host + ":" + port)
            .setPassword(password)
            .setConnectionMinimumIdleSize(10)
            .setConnectionPoolSize(100)
            .setIdleConnectionTimeout(600000)
            .setSubscriptionConnectionMinimumIdleSize(10)
            .setSubscriptionConnectionPoolSize(100)
            .setTimeout(timeout);

        config.setCodec(new StringCodec());
        config.setThreads(5);
        config.setNettyThreads(5);

        RedissonClient client = Redisson.create(config);
        return client;
    }

    @Bean
    @ConditionalOnClass(RedisConnectionFactory.class)
    public RedisCache redisCache(RedisTemplate redisTemplate) {
        return new RedisCache(redisTemplate);
    }
}

二.然后实现基于Redisson分布式锁维护优惠券缓存列表,以及惰性优惠券过期缓存清理

其实就是新增优惠券时,将优惠券信息写入Redis,并检查优惠券是否过期,如果过期就进行删除。

代码语言:javascript代码运行次数:0运行复制
//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
    //Redis客户端工具
    @Autowired
    private RedisCache redisCache;
  
    @Autowired
    private RedissonClient redissonClient;
    
    ...
    //保存/修改优惠券活动方法
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
        couponDO.setCouponReceivedCount(0);
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);

        //判断优惠券类型
        if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())) {
            //一.如果是系统发放类型,则针对所有用户,发送优惠券到MQ
            writeCouponToRedis(couponDO);
        } else {
            //二.如果是自己领取类型
            //TODO
        }

        SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
        dto.setCouponName(request.getCouponName());
        dto.setRule(request.getCouponRule());
        dto.setSuccess(true);
        return dto;
    }
    
    private void writeCouponToRedis(SalesPromotionCouponDO coupon) {
        //首先需要用Redisson基于Redis做一个分布式锁的加锁PROMOTION_COUPON_ID_LIST_LOCK
        //再去维护一个"一共发出去了多少张券"的数据结构PROMOTION_COUPON_ID_LIST
        RLock lock = redissonClient.getLock(RedisKey.PROMOTION_COUPON_ID_LIST_LOCK);
        try {
            //进行加锁,超时时间为60s释放
            lock.lock(60, TimeUnit.SECONDS);
            List<Long> couponIds = null;

            String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
            if (couponIdsJSON == null || couponIdsJSON.equals("")) {
                couponIds = new ArrayList<>();
            } else {
                couponIds = JSON.parseObject(couponIdsJSON, List.class);
            }

            //检查每个优惠券时间是否过期了,如果过期或者已经发完了券,则把它从List里删除,以及从Redis里删除
            //如果是全量发券,则不会发完,因此可以给所有人发,如果超过了时间才不能发券
            if (couponIds.size() > 0) {
                Iterator<Long> couponIdIterator = couponIds.iterator();
                while (couponIdIterator.hasNext()) {
                    Long tempCouponId = couponIdIterator.next();
                    String tempCouponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                    SalesPromotionCouponDO tempCoupon = JSON.parseObject(tempCouponJSON, SalesPromotionCouponDO.class);

                    Date now = new Date();
                    if (now.after(tempCoupon.getActivityEndTime())) {
                        couponIdIterator.remove();
                        redisCache.delete(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                    }
                }
            }

            couponIds.add(coupon.getId());
            couponIdsJSON = JsonUtil.object2Json(couponIds);
            redisCache.set(RedisKey.PROMOTION_COUPON_ID_LIST, couponIdsJSON, -1);

            String couponJSON = JsonUtil.object2Json(coupon);
            redisCache.set(RedisKey.PROMOTION_COUPON_KEY + "::" + coupon.getId(), couponJSON, -1);
        } finally {
            lock.unlock();
        }
    }
}

三.接着实现会员系统发布用户登录事件 + 营销系统在用户登录后的惰性发券

会员系统在用户登录时会发送消息到MQ的USER_LOGINED_EVENT_TOPIC:

代码语言:javascript代码运行次数:0运行复制
@RestController
@RequestMapping("/demo/membership")
public class MembershipController {
    @Autowired
    private DefaultProducer defaultProducer;

    @Autowired
    private MembershipAccountService accountService;

    //触发用户登录
    @PostMapping("/triggerUserLoginEvent")
    public JsonResult<Boolean> triggerUserLoginEvent(Long accountId) {
        try {
            List<MembershipAccountDTO> accounts = accountService.queryAccountByIdRange(accountId, accountId);
            if (accounts != null && accounts.size() > 0) {
                MembershipAccountDTO account = accounts.get(0);
                UserLoginedEvent userLoginedEvent = new UserLoginedEvent();
                userLoginedEvent.setAccount(account);

                String userLoginedEventJSON = JsonUtil.object2Json(userLoginedEvent);
                defaultProducer.sendMessage(RocketMqConstant.USER_LOGINED_EVENT_TOPIC, userLoginedEventJSON, "用户登录事件发生了");
            }
            return JsonResult.buildSuccess(true);
        } catch (BaseBizException e) {
            log.error("biz error: request={}", accountId, e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", accountId, e);
            return JsonResult.buildError(e.getMessage());
        }
    }
}

营销系统监听USER_LOGINED_EVENT_TOPIC对用户登录时发送的登录事件消息进行惰性发券:

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("userLoginedEventListener")
    public DefaultMQPushConsumer userLoginedEventListener(UserLoginedEventListener userLoginedEventListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(USER_LOGINED_EVENT_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(USER_LOGINED_EVENT_TOPIC, "*");
        consumer.registerMessageListener(userLoginedEventListener);
        consumer.start();
        return consumer;
    }
}

//用户登录事件监听器
@Component
public class UserLoginedEventListener implements MessageListenerConcurrently {
    @Autowired
    private RedisCache redisCache;

    @Autowired
    private SalesPromotionCouponItemDAO couponItemDAO;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            //从Redis缓存里查询所有的优惠券
            String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
            List<Long> couponIds = JSON.parseObject(couponIdsJSON, List.class);
            List<SalesPromotionCouponDO> coupons = new ArrayList<>();

            for (Long couponId : couponIds) {
                String couponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + couponId);
                SalesPromotionCouponDO coupon = JSON.parseObject(couponJSON, SalesPromotionCouponDO.class);
                Date now = new Date();
                if (now.after(coupon.getActivityStartTime()) && now.before(coupon.getActivityEndTime())) {
                    coupons.add(coupon);
                }
            }

            for (MessageExt messageExt : list) {
                //这个代码就可以拿到一个刚刚登陆成功的用户
                String message = new String(messageExt.getBody());
                UserLoginedEvent userLoginedEvent = JSON.parseObject(message, UserLoginedEvent.class);
                MembershipAccountDTO account = userLoginedEvent.getAccount();

                //遍历每一个优惠券,检查这个优惠券是否有效,是否还可以继续进行发券,以及当前用户是否发过券,然后才给用户进行发券
                for (SalesPromotionCouponDO coupon : coupons) {
                    String receiveCouponFlag = redisCache.get(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId());
                    if (receiveCouponFlag == null || receiveCouponFlag.equals("")) {
                        SalesPromotionCouponItemDO couponItem = new SalesPromotionCouponItemDO();
                        couponItem.setActivityEndTime(coupon.getActivityEndTime());
                        couponItem.setActivityStartTime(coupon.getActivityStartTime());
                        couponItem.setCouponId(coupon.getId());
                        couponItem.setCouponType(coupon.getCouponType());
                        couponItem.setCreateTime(new Date());
                        couponItem.setCreateUser(account.getId());
                        couponItem.setIsUsed(0);
                        couponItem.setUpdateTime(new Date());
                        couponItem.setUpdateUser(account.getId());
                        couponItem.setUserAccountId(account.getId());
                        couponItemDAO.receiveCoupon(couponItem);
                        redisCache.set(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId(), "true", -1);
                    }
                }
            }
        } catch(Exception e) {
            log.error("consume error, 用户登录事件处理异常", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

11.指定用户群体发券的代码实现

(1)千万级用户推送和发放优惠券的方案总结

(2)指定用户群体发券实现

(1)千万级用户推送和发放优惠券的方案总结

创建促销活动和发放优惠券时,都不是直接进行推送和发放的,而是利用RocketMQ进行多次中转、异步化处理。

营销系统首先会查出用户总数,然后进行任务分片,接着把分片任务消息通过batch合并发送到MQ(异步化提升性能)。

营销系统会消费这些分片任务消息,并查询出用户和封装好每个用户的消息,然后发到MQ(解耦会员系统和推送系统)。

推送系统最后会消费每个用户的推送消息,并基于线程池采用多线程并发进行推送。

(2)指定用户群体发券实现

典型的例子就是为激活百万不活跃用户发放优惠券。

一.营销系统创建指定用户群体发券的入口代码

代码语言:javascript代码运行次数:0运行复制
@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
    ...
    @RequestMapping("/send")
    public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request) {
        try {
            log.info("发送优惠券给指定用户群体:{}", JSON.toJSONString(request));
            SendCouponDTO dto = couponService.sendCouponByConditions(request);
            return JsonResult.buildSuccess(dto);
        } catch (BaseBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
}

@Service
public class CouponServiceImpl implements CouponService {
    @DubboReference(version = "1.0.0")
    private MessagePushApi messagePushApi;
    ...
    
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {
        //保存优惠券信息
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest);
        couponDO.setCouponReceivedCount(0);
        couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode());
        couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode());
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);

        //分片和批量发送发放优惠券消息
        shardBatchSendCouponMessage(sendCouponRequest);

        SendCouponDTO sendCouponDTO = new SendCouponDTO();
        sendCouponDTO.setSuccess(Boolean.TRUE);
        sendCouponDTO.setCouponName(sendCouponRequest.getCouponName());
        sendCouponDTO.setRule(sendCouponRequest.getCouponRule());

        //TODO 发放数量
        sendCouponDTO.setSendCount(0);
        return sendCouponDTO;
    }
}

二.营销系统分片和批量发送发券消息的代码

其中要去画像系统获取用户信息,然后根据用户数量进行分片,接着进行批量发送,发送到MQ的如下Topic。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Service
public class CouponServiceImpl implements CouponService {
    ...
    //分片和批量发送发放优惠券消息
    private void shardBatchSendCouponMessage(SendCouponRequest sendCouponRequest) {
        //1.到画像系统获取当前条件下的count值
        MembershipFilterDTO membershipFilterDTO = sendCouponRequest.getMembershipFilterDTO();
        PersonaFilterConditionDTO conditionDTO = conditionConverter.convertFilterCondition(membershipFilterDTO);
        JsonResult<Integer> countResult = personaApi.countByCondition(conditionDTO);
        if (!countResult.getSuccess()) {
            throw new BaseBizException(countResult.getErrorCode(), countResult.getErrorMessage());
        }

        //2.根据count值分片
        //分成m个分片,每个分片中包含:(1)分片ID;(2)用户个数;
        //例:maxUserId = 100w; userBucketSize=1000
        //userBucket1 = [1, 1000)
        //userBucket2 = [2, 1000)
        //userBucket2 = [n, 756),最后一个分片可能数量不足1000
        //userBucketCount = 1000
        Integer count = countResult.getData();
        Map<Integer, Integer> userBuckets = new LinkedHashMap<>();
        AtomicBoolean flagRef = new AtomicBoolean(true);
        Integer shardId = 1;
        while (flagRef.get()) {
            if (USER_BUCKET_SIZE > count) {
                userBuckets.put(shardId, USER_BUCKET_SIZE);
                flagRefpareAndSet(true, false);
            }
            userBuckets.put(shardId, USER_BUCKET_SIZE);
            shardId += 1;
            count -= USER_BUCKET_SIZE;
        }

        //3.批量发送消息
        //例:userBucketCount = 1000; messageBatchSize = 100
        List<String> messages = new ArrayList<>();
        PlatformPromotionConditionUserBucketMessage message = PlatformPromotionConditionUserBucketMessage.builder().personaFilterCondition(JSON.toJSONString(conditionDTO)).build();
        for (Map.Entry<Integer, Integer> userBucket : userBuckets.entrySet()) {
            message.setShardId(userBucket.getKey());
            message.setBucketSize(userBucket.getValue());
            String jsonMessage = JsonUtil.object2Json(message);
            messages.add(jsonMessage);
        }
        log.info("本次推送消息数量,{}",messages.size());

        ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
        while (splitter.hasNext()) {
            List<String> sendBatch = splitter.next();
            log.info("本次批次消息数量,{}",sendBatch.size());
            sharedSendMsgThreadPool.execute(() -> {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, sendBatch, "部分用户优惠活动用户桶消息");
            });
        }
    }
}

三.营销系统对指定用户群体分片消息的处理和推送代码

首先,营销系统会消费MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC

然后,营销系统会把领取优惠券的消息会发送到MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("platFormConditionCouponUserBucketConsumer")
    public DefaultMQPushConsumer platFormConditionCouponUserBucketConsumer(PlatFormConditionCouponUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, "*");
        consumer.registerMessageListener(platFormPromotionUserBucketListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormConditionCouponUserBucketListener implements MessageListenerConcurrently {
    //用户画像服务
    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    //发送消息共用的线程池
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    //RocketMQ生产者
    @Autowired
    private DefaultProducer defaultProducer;

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                //1.反序列化消息
                String messageString = new String(messageExt.getBody());
                log.debug("部分用户领取优惠券用户桶消息逻辑,消息内容:{}", messageString);
                PlatformPromotionConditionUserBucketMessage message = JSON.parseObject(messageString, PlatformPromotionConditionUserBucketMessage.class);

                //2.查询桶内的用户信息
                Integer shardId = message.getShardId();

                //根据分片id,和分片数量大小,计算出本次分片的起始userId
                Long startUserId = (shardId.longValue() - 1) * 1000;
                Integer bucketSize = message.getBucketSize();
                String personaFilterCondition = message.getPersonaFilterCondition();
                PersonaFilterConditionDTO personaFilterConditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);

                //封装查询用户id的条件
                PersonaConditionPage page = PersonaConditionPage.builder()
                    .memberPoint(personaFilterConditionDTO.getMemberPoint())
                    .memberLevel(personaFilterConditionDTO.getMemberLevel())
                    .offset(startUserId)
                    .limit(bucketSize)
                    .build();

                //从用户画像系统查询用户账号id
                JsonResult<List<Long>> accountIdsResult = personaApi.getAccountIdsByIdLimit(page);
                if (!accountIdsResult.getSuccess()) {
                    throw new BaseBizException(accountIdsResult.getErrorCode(), accountIdsResult.getErrorMessage());
                }

                List<Long> accountIds = accountIdsResult.getData();
                if (CollectionUtils.isEmpty(accountIds)) {
                    log.info("根据用户桶内的分片信息没有查询到用户, shardId={}", shardId);
                    continue;
                }

                //3.每个用户发送一条领取优惠券的消息通知
                PlatformMessagePushMessage pushMessage = PlatformMessagePushMessage.builder()
                    .message("恭喜您获得优惠券领取资格,点击www.wjunt进入活动页面")
                    .mainMessage("获得优惠券领取资格")
                    .informType(InformTypeEnum.APP.getCode())
                    .build();

                List<String> messages = new ArrayList<>();
                for (Long accountId : accountIds) {
                    pushMessage.setUserAccountId(accountId);
                    messages.add(JSON.toJSONString(pushMessage));
                }
                log.info("本次推送消息数量,{}",messages.size());

                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while (splitter.hasNext()) {
                    List<String> sendBatch = splitter.next();
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_TOPIC, sendBatch, "平台发送优惠券消息");
                    });
                }
            }
        } catch (Exception e){
            log.error("consume error,消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

四.推送系统会监听如下Topic消费消息推送领取优惠券的消息

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("platFormConditionCouponConsumer")
    public DefaultMQPushConsumer platFormConditionCouponConsumer(PlatFormConditionCouponListener platFormPromotionListener) throws MQClientException {
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_CONSUMER_GROUP);
      consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
      consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_TOPIC, "*");
      consumer.registerMessageListener(platFormPromotionListener);
      consumer.start();
      return consumer;
    }
}

@Component
public class PlatFormConditionCouponListener implements MessageListenerConcurrently {
    //消息推送工厂提供者
    @Autowired
    private FactoryProducer factoryProducer;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            //方式二:使用自定的业务线程池来处理任务
            List<CompletableFuture<AltResult>> futureList = msgList.stream()
                .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                .collect(Collectors.toList());

            List<Throwable> resultList = futureList.stream()
                .map(CompletableFuture::join)
                .filter(e -> e.ex != null)
                .map(e -> e.ex)
                .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("consume error,消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private AltResult handleMessageExt(MessageExt messageExt) {
        try {
            log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
            String msg = new String(messageExt.getBody());
            PlatformMessagePushMessage message = JSON.parseObject(msg , PlatformMessagePushMessage.class);

            //幂等控制
            if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId()))) {
                return new AltResult(null);
            }

            //获取消息服务工厂
            MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());

            //消息发送服务组件
            MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();

            //构造消息
            PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
                .informType(message.getInformType())
                .mainMessage(message.getMainMessage())
                .userAccountId(message.getUserAccountId())
                .message(message.getMessage())
                .build();

            MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
            messageSendService.send(messageSendDTO);

            //发送成功之后把已经发送成功记录到redis
            redisTemplate.opsForValue().set(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId(), UUID.randomUUID().toString());
            log.info("消息推送完成,messageSendDTO:{}", messageSendDTO);

            return new AltResult(null);
        } catch (Exception e) {
            return new AltResult(e);
        }
    }

    //completableFuture的返回结果,适用于无返回值的情况
    //ex字段为null表示任务执行成功
    //ex字段不为null表示任务执行失败,并把异常设置为ex字段
    private static class AltResult {
        final Throwable ex;
        public AltResult(Throwable ex) {
            this.ex = ex;
        }
    }
}

12.分片消息的batch合并算法重构实现

之前的batch合并是按照条数进行合并的,现在重构为按照合并后的大小不超过800KB和不超过100条进行合并。

代码语言:javascript代码运行次数:0运行复制
public class ListSplitter implements Iterator<List<String>> {
    //设置每一个batch最多不超过800k,因为RocketMQ官方推荐,一条消息不建议长度超过1MB
    //而封装一个RocketMQ的message,包括了MessageBody, Topic,Addr等数据,所以设置小一点
    private int sizeLimit = 800 * 1024;
    private final List<String> messages;
    private int currIndex;
    private int batchSize = 100;

    public ListSplitter(List<String> messages, Integer batchSize) {
        this.messages = messages;
        this.batchSize = batchSize;
    }

    public ListSplitter(List<String> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    //每次从list中取一部分
    @Override
    public List<String> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            String message = messages.get(nextIndex);
            //获取每条message的长度
            int tmpSize = message.length();
            if (tmpSize > sizeLimit) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit || (nextIndex - currIndex) == batchSize ) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<String> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

13.百万画像群体爆款商品推送代码实现

(1)营销系统的定时推送任务

(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ

(3)推送系统消费具体用户的推送消息进行真正推送

(1)营销系统的定时推送任务

代码语言:javascript代码运行次数:0运行复制
@Component
public class ScheduleSendMessageJobHandler {
    ...
    //执行定时任务,筛选热门商品和用户发送给MQ
    @XxlJob("hotGoodsPushHandler")
    public void hotGoodsPushHandler() {
        log.info("hotGoodsPushHandler 开始执行");

        //获取热门商品和用户画像,业务先简化为一对一关系
        List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date());
        log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));

        //找出每个热门商品对应画像所匹配的用户
        for (HotGoodsCrontabDO crontabDO : crontabDOs) {
            log.info("自动分片逻辑, 当前任务:crontabDO:{}", JsonUtil.object2Json(crontabDO));
            if (StringUtils.isEmpty(crontabDO.getPortrayal())) {
                continue;
            }

            //热门商品对应的画像实体
            MembershipPointDTO membershipPointDTO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDTO.class);
            if (Objects.isNull(membershipPointDTO)) {
                continue;
            }

            //获取匹配画像的用户实体
            MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDTO);
            PersonaFilterConditionDTO personaFilterConditionDTO = conditionConverter.convertFilterCondition(conditionDTO);
            log.info("用户查询条件:{}", personaFilterConditionDTO);

            //获取画像用户匹配的用户ID最大最小值
            JsonResult<Long> accountMaxIdResult = personaApi.queryMaxIdByCondition(personaFilterConditionDTO);
            log.info("获取最大ID,result:{}", JsonUtil.object2Json(accountMaxIdResult));
            if (!accountMaxIdResult.getSuccess()) {
                log.info("获取最大ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                throw new BaseBizException(accountMaxIdResult.getErrorCode(), accountMaxIdResult.getErrorMessage());
            }
            JsonResult<Long> accountMinIdResult = personaApi.queryMinIdByCondition(personaFilterConditionDTO);
            log.info("获取最小ID,result:{}", JsonUtil.object2Json(accountMinIdResult));
            if (!accountMinIdResult.getSuccess()) {
                log.info("获取最小ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                throw new BaseBizException(accountMinIdResult.getErrorCode(), accountMinIdResult.getErrorMessage());
            }

            //需要执行推送的用户起始ID
            //注意:这是一个预估值,因为最小ID到最大ID中间会有很多不符合条件的用户
            //针对这些用户,需要在下一层的业务逻辑中,用选人条件过滤掉
            Long minUserId = accountMinIdResult.getData();
            Long maxUserId = accountMaxIdResult.getData();

            //bucket就是一个用户分片,对应的是一个startUserId -> endUserId,用户ID范围
            //可以根据一定的算法,把千万级用户推送任务分片,比如一个分片后的推送任务包含1000个用户/2000个用户
            //userBuckets就有上万条key-value对,每个key-value对就是一个startUserId -> endUserId的推送任务分片
            final int userBucketSize = 1000;
            Map<Long, Long> userBuckets = new LinkedHashMap<>();
            AtomicBoolean doSharding = new AtomicBoolean(true);
            long startUserId = minUserId;
            log.info("开始对任务人群进行分片,startId:{}",minUserId);
            while (doSharding.get()) {
                if ((maxUserId -minUserId) < userBucketSize) {
                    userBuckets.put(startUserId, maxUserId);
                    doShardingpareAndSet(true, false);
                    break;
                }
                userBuckets.put(startUserId, startUserId + userBucketSize);
                startUserId += userBucketSize;
                maxUserId -= userBucketSize;
            }

            //把可能成千上万的分片推送任务进行RocketMQ消息的batch合并,以batch模式的发送任务到MQ,减少跟RocketMQ网络通信的耗时
            List<String> hotProductPushTasks = new ArrayList<>();
            HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO);
            PlatformHotProductUserBucketMessage bucketMessage = PlatformHotProductUserBucketMessage.builder()
                .hotGoodsVO(JSON.toJSONString(hotGoodsVO))
                .personaFilterConditionDTO(JSON.toJSONString(personaFilterConditionDTO))
                .build();
            for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
                bucketMessage.setEndUserId(userBucket.getValue());
                bucketMessage.setStartUserId(userBucket.getKey());

                String promotionPushTaskJSON = JsonUtil.object2Json(bucketMessage);
                log.info("用户桶构建侧选人条件:{}",bucketMessage.getPersonaFilterConditionDTO());
                hotProductPushTasks.add(promotionPushTaskJSON);
            }

            //MESSAGE_BATCH_SIZE指的是消息batch大小,RocketMQ的每个batch消息包含了100个推送任务
            //这样可以将1万个推送任务消息合并为100个batch消息,进行100次网络通信发给RocketMQ,可以大幅度降低发送消息的耗时
            ListSplitter splitter = new ListSplitter(hotProductPushTasks, MESSAGE_BATCH_SIZE);
            while (splitter.hasNext()) {
                List<String> sendBatch = splitter.next();
                log.info("本次批次消息数量,{}",sendBatch.size());
                sharedSendMsgThreadPool.execute(() -> {
                    defaultProducer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
                });
            }
        }
    }
}

(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ

监听MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("PlatFormHotProductUserBucketConsumer")
    public DefaultMQPushConsumer PlatFormHotProductUserBucketConsumer(PlatFormHotProductUserBucketListener platFormHotProductUserBucketListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, "*");
        consumer.registerMessageListener(platFormHotProductUserBucketListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormHotProductUserBucketListener implements MessageListenerConcurrently {
    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    @Autowired
    private DefaultProducer producer;

    //公用的消息推送线程池
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;
    
    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                String msg = new String(messageExt.getBody());
                PlatformHotProductUserBucketMessage hotProductMessagePushTask = JSON.parseObject(msg , PlatformHotProductUserBucketMessage.class);
                log.info("执行热门商品推送用户桶消息逻辑,消息内容:{}", hotProductMessagePushTask);

                //1.获取本次热门商品推送任务分片中的数据
                String hotGoodString = hotProductMessagePushTask.getHotGoodsVO();
                String personaFilterCondition = hotProductMessagePushTask.getPersonaFilterConditionDTO();
                HotGoodsVO hotGoodsVO = JSON.parseObject(hotGoodString, HotGoodsVO.class);
                log.info("选人条件,内容:{}", personaFilterCondition);
                if (Objects.isNull(personaFilterCondition) || Objects.isNull(hotGoodsVO)) {
                    continue;
                }

                PersonaFilterConditionDTO conditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);
                Long startUserId = hotProductMessagePushTask.getStartUserId();
                Long endUserId = hotProductMessagePushTask.getEndUserId();

                //分页查询条件
                PersonaConditionWithIdRange page = PersonaConditionWithIdRange.builder()
                    .memberLevel(conditionDTO.getMemberLevel())
                    .memberPoint(conditionDTO.getMemberPoint())
                    .startId(startUserId)
                    .endId(endUserId)
                    .build();

                //2.查询本次推送任务分片对应的用户群体
                //注意:查询的时候,传入查询条件过滤掉不符合条件的用户id

                JsonResult<List<Long>> queryAccountIdsResult = personaApi.getAccountIdsByIdRange(page);
                List<Long> accountIds = queryAccountIdsResult.getData();

                PlatformHotProductMessage hotMessage = PlatformHotProductMessage.builder()
                    .goodsName(hotGoodsVO.getGoodsName())
                    .goodsDesc(hotGoodsVO.getGoodsDesc())
                    .keyWords(hotGoodsVO.getKeyWords())
                    .build();
                int handledBucketCount = 0;
                List<String> messages = new ArrayList<>();
                for (Long accountId : accountIds) {
                    handledBucketCount++;
                    hotMessage.setAccountId(accountId);
                    log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage);
                    messages.add(JSON.toJSONString(hotMessage));
                }
                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while (splitter.hasNext()) {
                    List<String> sendBatch = splitter.next();
                    log.info("本次批次消息数量,{}", sendBatch.size());
                    sharedSendMsgThreadPool.execute(() -> {
                        producer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
                    });
                }
            }
        } catch (Exception e) {
            log.error("consume error,热门商品通知消费失败", e);
            //这边因为是推送任务,个别失败也可以直接丢弃
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

(3)推送系统消费具体用户的推送消息进行真正推送

监听MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_HOT_PRODUCT_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("platformHotProductSendTopicConsumer")
    public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*");
        consumer.registerMessageListener(platFormHotProductListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormHotProductListener implements MessageListenerConcurrently {
    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
                String msg = new String(messageExt.getBody());
                HashMap hotProductMessage = JSON.parseObject(msg , HashMap.class);

                //推送通知
                informByPush(hotProductMessage);
            }
        } catch (Exception e) {
            log.error("consume error,平台优惠券消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    //第三方平台推送消息到app
    private void informByPush(HashMap message){
        String messageBody = "速戳!精致小物件,"
            + message.get("keywords")+"!"
            + message.get("goodsName")
            + message.get("goodsDesc");
        log.info("消息推送中:消息内容:{}", messageBody);
    }
}

14.生产环境百万级用户PUSH全链路压测

百万级用户Push生产环境的部署:营销系统部署了5台2核4G机器、推送系统部署了5台2核4G机器、会员系统1台、画像系统1台。

假设会员系统⼀共有150w的⽤户数据,现在开启⼀次全员推送通知的优惠活动,总计MQ的数据量⼤概就是150w。开启⼀个全员活动的时间,⼤概⽤时27分钟左右,即完成了全部消息的推送,整体效率还是算⽐较⾼的。所以如果是千万级别的推送,⼤概也就是需要27 * 5⼤概是3个⼩时左右,和我们的预期是⽐较相似的。

150万的用户推送任务,按1000用户进行分片,那么总共会有1500个分片任务,每个分片任务需处理1000条消息。这1500个人分片任务通过batch合并发送到MQ,也非常快,假设每100个分片合并成一个batch,才15个batch消息。5台推送机器,每台机器开启30个线程,那么总共有150个线程。假设一个线程完成一条消息的推送需要200ms,那么每个线程每秒能推送5条消息,5台机器每秒能推送750条消息。150万 / 750 = 2000s = 30分钟,这就是通过计算预估的结果,和实际的27分钟相差不大。

由于有些场景下,这种全员性质的消息推送,是不需要接收推送结果的。如果直接放弃推送结果的获取操作,效率还能稍微有所提升。

RocketMQ实战—10.营销系统代码优化二

大纲

1.营销系统引入MQ实现异步化来进行性能优化

2.基于MQ释放优惠券提升系统扩展性

3.基于Redis实现重复促销活动去重

4.基于促销活动创建事件实现异步化

5.推送任务分片和分片消息batch合并发送实现

6.推送系统与用户群体查询逻辑解耦

7.查询用户数据以及批量发送推送消息

8.线程池封装以及推送系统多线程推送

9.推送系统的千万级消息多线程推送

10.千万级用户惰性发券代码实现

11.指定用户群体发券的代码实现

12.分片消息的batch合并算法重构实现

13.百万画像群体爆款商品推送代码实现

14.生产环境百万级用户PUSH全链路压测

10.千万级用户惰性发券代码实现

(1)给全量用户发放优惠券的初版实现

(2)给全量用户惰性发放优惠券的优化实现

(1)给全量用户发放优惠券的初版实现

首先营销系统对全量用户发放优惠券的任务进行分片,然后将分片的消息发送到如下Topic。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制
@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
    //优惠活动service
    @Autowired
    private CouponService couponService;
    
    //新增一个优惠券活动
    @PostMapping
    public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request) {
        try {
            log.info("新增一条优惠券:{}", JSON.toJSONString(request));
            SaveOrUpdateCouponDTO dto = couponService.saveOrUpdateCoupon(request);
            return JsonResult.buildSuccess(dto);
        } catch (BaseBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
    ...
    //保存/修改优惠券活动方法
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
        couponDO.setCouponReceivedCount(0);
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
      
        //为所有用户发放优惠券
        sendPlatformCouponMessage(couponDO);
      
        SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
        dto.setCouponName(request.getCouponName());
        dto.setRule(request.getCouponRule());
        dto.setSuccess(true);
        return dto;
    }
    ...
    //为所有用户发放优惠券
    private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) {
        //桶的大小
        final int userBucketSize = 1000;
        final int messageBatchSize = 100;

        //1.查询出库里面最大的userId,作为用户的总数量
        JsonResult<Long> maxUserIdJsonResult = accountApi.queryMaxUserId();
        if (maxUserIdJsonResult.getSuccess()) {
            throw new BaseBizException(maxUserIdJsonResult.getErrorCode(), maxUserIdJsonResult.getErrorMessage());
        }
        Long maxUserId = maxUserIdJsonResult.getData();

        //2.分成m个桶,每个桶里面有n个用户,每个桶发送一条"批量发送优惠券用户桶消息"
        Map<Long, Long> userBuckets = new LinkedHashMap<>();
        AtomicBoolean flagRef = new AtomicBoolean(true);
        long startUserId = 1L;
        while (flagRef.get()) {
            if (startUserId > maxUserId) {
                flagRefpareAndSet(true, false);
            }
            userBuckets.put(startUserId, startUserId + userBucketSize);
            startUserId += userBucketSize;
        }

        //3.批量发送消息
        //例:userBucketCount = 1000; messageBatchSize = 100
        //批量发送次数 = 10次,经过两次分桶,这里发送消息的次数从100w次降到10次
        int handledBucketCount = 0;
        List<String> jsonMessageBatch = new ArrayList<>(messageBatchSize);
        for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
            handledBucketCount++;
            PlatformCouponUserBucketMessage message = PlatformCouponUserBucketMessage.builder()
                .startUserId(userBucket.getKey())
                .endUserId(userBucket.getValue())
                .informType(promotionCouponDO.getInformType())
                .couponId(promotionCouponDO.getId())
                .activityStartTime(promotionCouponDO.getActivityStartTime())
                .activityEndTime(promotionCouponDO.getActivityEndTime())
                .couponType(promotionCouponDO.getCouponType())
                .build();
            String jsonMessage = JsonUtil.object2Json(message);
            jsonMessageBatch.add(jsonMessage);

            if (jsonMessageBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, jsonMessageBatch, "平台发放优惠券用户桶消息");
                jsonMessageBatch.clear();
            }
        }
    }
}

接着营销系统监听如下Topic消费分片后的发放优惠券任务。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC

此时有两种处理方法:一.直接使用线程池进行发送发放优惠券消息到MQ。二.合并batch后再使用线程池发送MQ。

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    //平台发放优惠券用户桶消费者
    @Bean("platformCouponUserBucketReceiveTopicConsumer")
    public DefaultMQPushConsumer receiveCouponUserBucketConsumer(@Qualifier("platformCouponUserBucketReceiveTopicConsumer")PlatFormCouponUserBucketListener platFormCouponUserBucketListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, "*");
        consumer.registerMessageListener(platFormCouponUserBucketListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormCouponUserBucketListener implements MessageListenerConcurrently {
    //账户服务
    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;

    //发送消息共用的线程池
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    //RocketMQ生产者
    @Autowired
    private DefaultProducer defaultProducer;

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                //1.反序列化消息
                String messageString = new String(messageExt.getBody());
                log.debug("执行平台发送优惠券用户桶消息逻辑,消息内容:{}", messageString);
                PlatformCouponUserBucketMessage message = JSON.parseObject(messageString, PlatformCouponUserBucketMessage.class);

                //2.查询桶内的用户信息
                Long startUserId = message.getStartUserId();
                Long endUserId = message.getEndUserId();
                JsonResult<List<MembershipAccountDTO>> accountBucketResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
                if (!accountBucketResult.getSuccess()) {
                    throw new BaseBizException(accountBucketResult.getErrorCode(), accountBucketResult.getErrorMessage());
                }
                List<MembershipAccountDTO> accountBucket = accountBucketResult.getData();
                if (CollectionUtils.isEmpty(accountBucket)) {
                    log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
                    continue;
                }

                //3.每个用户发送一条"平台发送优惠券消息"
                //方法一:直接使用线程池进行发送发放优惠券消息到MQ;
                //这里是并行消费的,以上逻辑已经是并行执行的了,而且有查库的操作
                //accountBucket 默认是 1000 个用户,要为每一个用户都发送一条"平台发送优惠券消息",也就是1000条消息
                //下面我们使用线程池来并行发送这1000条消息(ps:另一种也可以像发送优惠券用户桶消息一样用批量发送)
                PlatformCouponMessage couponMessage = PlatformCouponMessage.builder()
                    .couponId(message.getCouponId())
                    .activityStartTime(message.getActivityStartTime())
                    .activityEndTime(message.getActivityEndTime())
                    .couponType(message.getCouponType())
                    .build();
                for (MembershipAccountDTO account : accountBucket) {
                    sharedSendMsgThreadPool.execute(() -> {
                        couponMessage.setUserAccountId(account.getId());
                        String jsonMessage = JSON.toJSONString(couponMessage);
                        defaultProducer.sendMessage(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, jsonMessage, "平台发送优惠券消息");
                    });
                }

                //方法二:合并batch后再使用线程池发送MQ
                /*List<String> messages = new ArrayList<>(100);
                for (MembershipAccountDTO account : accountBucket) {
                    couponMessage.setUserAccountId(account.getId());
                    messages.add(JSON.toJSONString(couponMessage));
                    if (messages.size() == 100) {
                        sharedSendMsgThreadPool.execute(() -> {
                            defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, messages, "平台发送优惠券消息");
                        });
                        messages.clear();
                    }
                }
                //最后剩下的也批量发出
                if (!CollectionUtils.isEmpty(messages)) {
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, messages, "平台发送促销活动消息");
                    });
                    messages.clear();
                }*/
            }
        } catch (Exception e){
            log.error("consume error,平台优惠券消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

最后营销系统监听如下Topic,然后通过线程池对每个用户发放优惠券。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_PROMOTION_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    //平台发放优惠券领取消费者
    @Bean("platformCouponReceiveTopicConsumer")
    public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
        consumer.registerMessageListener(platFormCouponListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormCouponListener implements MessageListenerConcurrently {
    //优惠券服务service
    @Autowired
    private CouponItemService couponItemService;

    //测试completableFuture使用commonPool的时是不需要初始化业务ThreadPoolExecutor的
    //这里用supplier懒加载让测试completableFuture使用commonPool时不要初始化线程池
    //只有当使用completableFuture使用自定义的线程时才初始化线程池
    private static final int PERMITS = 30;
    private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
    private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
    private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
        if (initializedRefpareAndSet(false, true)) {
            THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), NamedDaemonThreadFactory.getInstance("consumeCouponMsg"), new ThreadPoolExecutor.CallerRunsPolicy());
        }
        return THREAD_POOL_EXECUTOR;
    };

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            //方式一:使用默认的commonPool来处理任务
            //supplyAsync(Supplier<U> supplier) API
            //默认使用的是 ForkJoinPoolmonPool() 这个线程池
            //该线程池在jvm内是唯一的,默认的线程数量是cpu的核数减1
            //如果觉得线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPoolmon.parallelism 的值调整commonPool的并行度,或者采用方式二
            List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
                .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                .collect(Collectors.toList());

            //方式二:使用自定的业务线程池来处理任务
            //List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
            //     .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
            //     .collect(Collectors.toList());

            List<SalesPromotionCouponItemDTO> couponItemDTOList = futureList.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

            //优惠券保存到数据库
            couponItemService.saveCouponBatch(couponItemDTOList);
        } catch (Exception e) {
            log.error("consume error,平台优惠券消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    public SalesPromotionCouponItemDTO handleMessageExt(MessageExt messageExt) {
        log.debug("执行平台发放优惠券消费消息逻辑,消息内容:{}", messageExt.getBody());
        String msg = new String(messageExt.getBody());
        PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class);
        log.info("开始发放平台优惠券,couponId:{}", platformCouponMessage.getCouponId());

        //幂等逻辑防止重复消费
        JsonResult<Long> result = couponItemService.selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(), platformCouponMessage.getCouponId());
        //如果已经存在,直接跳过循环,不再执行优惠券保存操作
        if (result.getSuccess()) {
            return null;
        }

        SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO();
        itemDTO.setCouponId(platformCouponMessage.getCouponId());
        itemDTO.setCouponType(platformCouponMessage.getCouponType());
        itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId());
        itemDTO.setIsUsed(0);
        itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime());
        itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime());
        return itemDTO;
    }
}

(2)给全量用户惰性发放优惠券的优化实现

一.首先需要配置好要使用的Redis相关Bean

代码语言:javascript代码运行次数:0运行复制
@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.timeout}")
    private int timeout;

    @Bean
    @ConditionalOnClass(RedisConnectionFactory.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    @ConditionalOnClass(RedissonClient.class)
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://" + host + ":" + port)
            .setPassword(password)
            .setConnectionMinimumIdleSize(10)
            .setConnectionPoolSize(100)
            .setIdleConnectionTimeout(600000)
            .setSubscriptionConnectionMinimumIdleSize(10)
            .setSubscriptionConnectionPoolSize(100)
            .setTimeout(timeout);

        config.setCodec(new StringCodec());
        config.setThreads(5);
        config.setNettyThreads(5);

        RedissonClient client = Redisson.create(config);
        return client;
    }

    @Bean
    @ConditionalOnClass(RedisConnectionFactory.class)
    public RedisCache redisCache(RedisTemplate redisTemplate) {
        return new RedisCache(redisTemplate);
    }
}

二.然后实现基于Redisson分布式锁维护优惠券缓存列表,以及惰性优惠券过期缓存清理

其实就是新增优惠券时,将优惠券信息写入Redis,并检查优惠券是否过期,如果过期就进行删除。

代码语言:javascript代码运行次数:0运行复制
//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
    //Redis客户端工具
    @Autowired
    private RedisCache redisCache;
  
    @Autowired
    private RedissonClient redissonClient;
    
    ...
    //保存/修改优惠券活动方法
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
        couponDO.setCouponReceivedCount(0);
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);

        //判断优惠券类型
        if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())) {
            //一.如果是系统发放类型,则针对所有用户,发送优惠券到MQ
            writeCouponToRedis(couponDO);
        } else {
            //二.如果是自己领取类型
            //TODO
        }

        SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
        dto.setCouponName(request.getCouponName());
        dto.setRule(request.getCouponRule());
        dto.setSuccess(true);
        return dto;
    }
    
    private void writeCouponToRedis(SalesPromotionCouponDO coupon) {
        //首先需要用Redisson基于Redis做一个分布式锁的加锁PROMOTION_COUPON_ID_LIST_LOCK
        //再去维护一个"一共发出去了多少张券"的数据结构PROMOTION_COUPON_ID_LIST
        RLock lock = redissonClient.getLock(RedisKey.PROMOTION_COUPON_ID_LIST_LOCK);
        try {
            //进行加锁,超时时间为60s释放
            lock.lock(60, TimeUnit.SECONDS);
            List<Long> couponIds = null;

            String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
            if (couponIdsJSON == null || couponIdsJSON.equals("")) {
                couponIds = new ArrayList<>();
            } else {
                couponIds = JSON.parseObject(couponIdsJSON, List.class);
            }

            //检查每个优惠券时间是否过期了,如果过期或者已经发完了券,则把它从List里删除,以及从Redis里删除
            //如果是全量发券,则不会发完,因此可以给所有人发,如果超过了时间才不能发券
            if (couponIds.size() > 0) {
                Iterator<Long> couponIdIterator = couponIds.iterator();
                while (couponIdIterator.hasNext()) {
                    Long tempCouponId = couponIdIterator.next();
                    String tempCouponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                    SalesPromotionCouponDO tempCoupon = JSON.parseObject(tempCouponJSON, SalesPromotionCouponDO.class);

                    Date now = new Date();
                    if (now.after(tempCoupon.getActivityEndTime())) {
                        couponIdIterator.remove();
                        redisCache.delete(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                    }
                }
            }

            couponIds.add(coupon.getId());
            couponIdsJSON = JsonUtil.object2Json(couponIds);
            redisCache.set(RedisKey.PROMOTION_COUPON_ID_LIST, couponIdsJSON, -1);

            String couponJSON = JsonUtil.object2Json(coupon);
            redisCache.set(RedisKey.PROMOTION_COUPON_KEY + "::" + coupon.getId(), couponJSON, -1);
        } finally {
            lock.unlock();
        }
    }
}

三.接着实现会员系统发布用户登录事件 + 营销系统在用户登录后的惰性发券

会员系统在用户登录时会发送消息到MQ的USER_LOGINED_EVENT_TOPIC:

代码语言:javascript代码运行次数:0运行复制
@RestController
@RequestMapping("/demo/membership")
public class MembershipController {
    @Autowired
    private DefaultProducer defaultProducer;

    @Autowired
    private MembershipAccountService accountService;

    //触发用户登录
    @PostMapping("/triggerUserLoginEvent")
    public JsonResult<Boolean> triggerUserLoginEvent(Long accountId) {
        try {
            List<MembershipAccountDTO> accounts = accountService.queryAccountByIdRange(accountId, accountId);
            if (accounts != null && accounts.size() > 0) {
                MembershipAccountDTO account = accounts.get(0);
                UserLoginedEvent userLoginedEvent = new UserLoginedEvent();
                userLoginedEvent.setAccount(account);

                String userLoginedEventJSON = JsonUtil.object2Json(userLoginedEvent);
                defaultProducer.sendMessage(RocketMqConstant.USER_LOGINED_EVENT_TOPIC, userLoginedEventJSON, "用户登录事件发生了");
            }
            return JsonResult.buildSuccess(true);
        } catch (BaseBizException e) {
            log.error("biz error: request={}", accountId, e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", accountId, e);
            return JsonResult.buildError(e.getMessage());
        }
    }
}

营销系统监听USER_LOGINED_EVENT_TOPIC对用户登录时发送的登录事件消息进行惰性发券:

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("userLoginedEventListener")
    public DefaultMQPushConsumer userLoginedEventListener(UserLoginedEventListener userLoginedEventListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(USER_LOGINED_EVENT_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(USER_LOGINED_EVENT_TOPIC, "*");
        consumer.registerMessageListener(userLoginedEventListener);
        consumer.start();
        return consumer;
    }
}

//用户登录事件监听器
@Component
public class UserLoginedEventListener implements MessageListenerConcurrently {
    @Autowired
    private RedisCache redisCache;

    @Autowired
    private SalesPromotionCouponItemDAO couponItemDAO;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            //从Redis缓存里查询所有的优惠券
            String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
            List<Long> couponIds = JSON.parseObject(couponIdsJSON, List.class);
            List<SalesPromotionCouponDO> coupons = new ArrayList<>();

            for (Long couponId : couponIds) {
                String couponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + couponId);
                SalesPromotionCouponDO coupon = JSON.parseObject(couponJSON, SalesPromotionCouponDO.class);
                Date now = new Date();
                if (now.after(coupon.getActivityStartTime()) && now.before(coupon.getActivityEndTime())) {
                    coupons.add(coupon);
                }
            }

            for (MessageExt messageExt : list) {
                //这个代码就可以拿到一个刚刚登陆成功的用户
                String message = new String(messageExt.getBody());
                UserLoginedEvent userLoginedEvent = JSON.parseObject(message, UserLoginedEvent.class);
                MembershipAccountDTO account = userLoginedEvent.getAccount();

                //遍历每一个优惠券,检查这个优惠券是否有效,是否还可以继续进行发券,以及当前用户是否发过券,然后才给用户进行发券
                for (SalesPromotionCouponDO coupon : coupons) {
                    String receiveCouponFlag = redisCache.get(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId());
                    if (receiveCouponFlag == null || receiveCouponFlag.equals("")) {
                        SalesPromotionCouponItemDO couponItem = new SalesPromotionCouponItemDO();
                        couponItem.setActivityEndTime(coupon.getActivityEndTime());
                        couponItem.setActivityStartTime(coupon.getActivityStartTime());
                        couponItem.setCouponId(coupon.getId());
                        couponItem.setCouponType(coupon.getCouponType());
                        couponItem.setCreateTime(new Date());
                        couponItem.setCreateUser(account.getId());
                        couponItem.setIsUsed(0);
                        couponItem.setUpdateTime(new Date());
                        couponItem.setUpdateUser(account.getId());
                        couponItem.setUserAccountId(account.getId());
                        couponItemDAO.receiveCoupon(couponItem);
                        redisCache.set(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId(), "true", -1);
                    }
                }
            }
        } catch(Exception e) {
            log.error("consume error, 用户登录事件处理异常", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

11.指定用户群体发券的代码实现

(1)千万级用户推送和发放优惠券的方案总结

(2)指定用户群体发券实现

(1)千万级用户推送和发放优惠券的方案总结

创建促销活动和发放优惠券时,都不是直接进行推送和发放的,而是利用RocketMQ进行多次中转、异步化处理。

营销系统首先会查出用户总数,然后进行任务分片,接着把分片任务消息通过batch合并发送到MQ(异步化提升性能)。

营销系统会消费这些分片任务消息,并查询出用户和封装好每个用户的消息,然后发到MQ(解耦会员系统和推送系统)。

推送系统最后会消费每个用户的推送消息,并基于线程池采用多线程并发进行推送。

(2)指定用户群体发券实现

典型的例子就是为激活百万不活跃用户发放优惠券。

一.营销系统创建指定用户群体发券的入口代码

代码语言:javascript代码运行次数:0运行复制
@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
    ...
    @RequestMapping("/send")
    public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request) {
        try {
            log.info("发送优惠券给指定用户群体:{}", JSON.toJSONString(request));
            SendCouponDTO dto = couponService.sendCouponByConditions(request);
            return JsonResult.buildSuccess(dto);
        } catch (BaseBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
}

@Service
public class CouponServiceImpl implements CouponService {
    @DubboReference(version = "1.0.0")
    private MessagePushApi messagePushApi;
    ...
    
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {
        //保存优惠券信息
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest);
        couponDO.setCouponReceivedCount(0);
        couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode());
        couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode());
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);

        //分片和批量发送发放优惠券消息
        shardBatchSendCouponMessage(sendCouponRequest);

        SendCouponDTO sendCouponDTO = new SendCouponDTO();
        sendCouponDTO.setSuccess(Boolean.TRUE);
        sendCouponDTO.setCouponName(sendCouponRequest.getCouponName());
        sendCouponDTO.setRule(sendCouponRequest.getCouponRule());

        //TODO 发放数量
        sendCouponDTO.setSendCount(0);
        return sendCouponDTO;
    }
}

二.营销系统分片和批量发送发券消息的代码

其中要去画像系统获取用户信息,然后根据用户数量进行分片,接着进行批量发送,发送到MQ的如下Topic。

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Service
public class CouponServiceImpl implements CouponService {
    ...
    //分片和批量发送发放优惠券消息
    private void shardBatchSendCouponMessage(SendCouponRequest sendCouponRequest) {
        //1.到画像系统获取当前条件下的count值
        MembershipFilterDTO membershipFilterDTO = sendCouponRequest.getMembershipFilterDTO();
        PersonaFilterConditionDTO conditionDTO = conditionConverter.convertFilterCondition(membershipFilterDTO);
        JsonResult<Integer> countResult = personaApi.countByCondition(conditionDTO);
        if (!countResult.getSuccess()) {
            throw new BaseBizException(countResult.getErrorCode(), countResult.getErrorMessage());
        }

        //2.根据count值分片
        //分成m个分片,每个分片中包含:(1)分片ID;(2)用户个数;
        //例:maxUserId = 100w; userBucketSize=1000
        //userBucket1 = [1, 1000)
        //userBucket2 = [2, 1000)
        //userBucket2 = [n, 756),最后一个分片可能数量不足1000
        //userBucketCount = 1000
        Integer count = countResult.getData();
        Map<Integer, Integer> userBuckets = new LinkedHashMap<>();
        AtomicBoolean flagRef = new AtomicBoolean(true);
        Integer shardId = 1;
        while (flagRef.get()) {
            if (USER_BUCKET_SIZE > count) {
                userBuckets.put(shardId, USER_BUCKET_SIZE);
                flagRefpareAndSet(true, false);
            }
            userBuckets.put(shardId, USER_BUCKET_SIZE);
            shardId += 1;
            count -= USER_BUCKET_SIZE;
        }

        //3.批量发送消息
        //例:userBucketCount = 1000; messageBatchSize = 100
        List<String> messages = new ArrayList<>();
        PlatformPromotionConditionUserBucketMessage message = PlatformPromotionConditionUserBucketMessage.builder().personaFilterCondition(JSON.toJSONString(conditionDTO)).build();
        for (Map.Entry<Integer, Integer> userBucket : userBuckets.entrySet()) {
            message.setShardId(userBucket.getKey());
            message.setBucketSize(userBucket.getValue());
            String jsonMessage = JsonUtil.object2Json(message);
            messages.add(jsonMessage);
        }
        log.info("本次推送消息数量,{}",messages.size());

        ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
        while (splitter.hasNext()) {
            List<String> sendBatch = splitter.next();
            log.info("本次批次消息数量,{}",sendBatch.size());
            sharedSendMsgThreadPool.execute(() -> {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, sendBatch, "部分用户优惠活动用户桶消息");
            });
        }
    }
}

三.营销系统对指定用户群体分片消息的处理和推送代码

首先,营销系统会消费MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC

然后,营销系统会把领取优惠券的消息会发送到MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("platFormConditionCouponUserBucketConsumer")
    public DefaultMQPushConsumer platFormConditionCouponUserBucketConsumer(PlatFormConditionCouponUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, "*");
        consumer.registerMessageListener(platFormPromotionUserBucketListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormConditionCouponUserBucketListener implements MessageListenerConcurrently {
    //用户画像服务
    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    //发送消息共用的线程池
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    //RocketMQ生产者
    @Autowired
    private DefaultProducer defaultProducer;

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                //1.反序列化消息
                String messageString = new String(messageExt.getBody());
                log.debug("部分用户领取优惠券用户桶消息逻辑,消息内容:{}", messageString);
                PlatformPromotionConditionUserBucketMessage message = JSON.parseObject(messageString, PlatformPromotionConditionUserBucketMessage.class);

                //2.查询桶内的用户信息
                Integer shardId = message.getShardId();

                //根据分片id,和分片数量大小,计算出本次分片的起始userId
                Long startUserId = (shardId.longValue() - 1) * 1000;
                Integer bucketSize = message.getBucketSize();
                String personaFilterCondition = message.getPersonaFilterCondition();
                PersonaFilterConditionDTO personaFilterConditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);

                //封装查询用户id的条件
                PersonaConditionPage page = PersonaConditionPage.builder()
                    .memberPoint(personaFilterConditionDTO.getMemberPoint())
                    .memberLevel(personaFilterConditionDTO.getMemberLevel())
                    .offset(startUserId)
                    .limit(bucketSize)
                    .build();

                //从用户画像系统查询用户账号id
                JsonResult<List<Long>> accountIdsResult = personaApi.getAccountIdsByIdLimit(page);
                if (!accountIdsResult.getSuccess()) {
                    throw new BaseBizException(accountIdsResult.getErrorCode(), accountIdsResult.getErrorMessage());
                }

                List<Long> accountIds = accountIdsResult.getData();
                if (CollectionUtils.isEmpty(accountIds)) {
                    log.info("根据用户桶内的分片信息没有查询到用户, shardId={}", shardId);
                    continue;
                }

                //3.每个用户发送一条领取优惠券的消息通知
                PlatformMessagePushMessage pushMessage = PlatformMessagePushMessage.builder()
                    .message("恭喜您获得优惠券领取资格,点击www.wjunt进入活动页面")
                    .mainMessage("获得优惠券领取资格")
                    .informType(InformTypeEnum.APP.getCode())
                    .build();

                List<String> messages = new ArrayList<>();
                for (Long accountId : accountIds) {
                    pushMessage.setUserAccountId(accountId);
                    messages.add(JSON.toJSONString(pushMessage));
                }
                log.info("本次推送消息数量,{}",messages.size());

                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while (splitter.hasNext()) {
                    List<String> sendBatch = splitter.next();
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_TOPIC, sendBatch, "平台发送优惠券消息");
                    });
                }
            }
        } catch (Exception e){
            log.error("consume error,消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

四.推送系统会监听如下Topic消费消息推送领取优惠券的消息

代码语言:javascript代码运行次数:0运行复制
PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("platFormConditionCouponConsumer")
    public DefaultMQPushConsumer platFormConditionCouponConsumer(PlatFormConditionCouponListener platFormPromotionListener) throws MQClientException {
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_CONSUMER_GROUP);
      consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
      consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_TOPIC, "*");
      consumer.registerMessageListener(platFormPromotionListener);
      consumer.start();
      return consumer;
    }
}

@Component
public class PlatFormConditionCouponListener implements MessageListenerConcurrently {
    //消息推送工厂提供者
    @Autowired
    private FactoryProducer factoryProducer;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            //方式二:使用自定的业务线程池来处理任务
            List<CompletableFuture<AltResult>> futureList = msgList.stream()
                .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                .collect(Collectors.toList());

            List<Throwable> resultList = futureList.stream()
                .map(CompletableFuture::join)
                .filter(e -> e.ex != null)
                .map(e -> e.ex)
                .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("consume error,消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private AltResult handleMessageExt(MessageExt messageExt) {
        try {
            log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
            String msg = new String(messageExt.getBody());
            PlatformMessagePushMessage message = JSON.parseObject(msg , PlatformMessagePushMessage.class);

            //幂等控制
            if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId()))) {
                return new AltResult(null);
            }

            //获取消息服务工厂
            MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());

            //消息发送服务组件
            MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();

            //构造消息
            PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
                .informType(message.getInformType())
                .mainMessage(message.getMainMessage())
                .userAccountId(message.getUserAccountId())
                .message(message.getMessage())
                .build();

            MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
            messageSendService.send(messageSendDTO);

            //发送成功之后把已经发送成功记录到redis
            redisTemplate.opsForValue().set(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId(), UUID.randomUUID().toString());
            log.info("消息推送完成,messageSendDTO:{}", messageSendDTO);

            return new AltResult(null);
        } catch (Exception e) {
            return new AltResult(e);
        }
    }

    //completableFuture的返回结果,适用于无返回值的情况
    //ex字段为null表示任务执行成功
    //ex字段不为null表示任务执行失败,并把异常设置为ex字段
    private static class AltResult {
        final Throwable ex;
        public AltResult(Throwable ex) {
            this.ex = ex;
        }
    }
}

12.分片消息的batch合并算法重构实现

之前的batch合并是按照条数进行合并的,现在重构为按照合并后的大小不超过800KB和不超过100条进行合并。

代码语言:javascript代码运行次数:0运行复制
public class ListSplitter implements Iterator<List<String>> {
    //设置每一个batch最多不超过800k,因为RocketMQ官方推荐,一条消息不建议长度超过1MB
    //而封装一个RocketMQ的message,包括了MessageBody, Topic,Addr等数据,所以设置小一点
    private int sizeLimit = 800 * 1024;
    private final List<String> messages;
    private int currIndex;
    private int batchSize = 100;

    public ListSplitter(List<String> messages, Integer batchSize) {
        this.messages = messages;
        this.batchSize = batchSize;
    }

    public ListSplitter(List<String> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    //每次从list中取一部分
    @Override
    public List<String> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            String message = messages.get(nextIndex);
            //获取每条message的长度
            int tmpSize = message.length();
            if (tmpSize > sizeLimit) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit || (nextIndex - currIndex) == batchSize ) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<String> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

13.百万画像群体爆款商品推送代码实现

(1)营销系统的定时推送任务

(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ

(3)推送系统消费具体用户的推送消息进行真正推送

(1)营销系统的定时推送任务

代码语言:javascript代码运行次数:0运行复制
@Component
public class ScheduleSendMessageJobHandler {
    ...
    //执行定时任务,筛选热门商品和用户发送给MQ
    @XxlJob("hotGoodsPushHandler")
    public void hotGoodsPushHandler() {
        log.info("hotGoodsPushHandler 开始执行");

        //获取热门商品和用户画像,业务先简化为一对一关系
        List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date());
        log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));

        //找出每个热门商品对应画像所匹配的用户
        for (HotGoodsCrontabDO crontabDO : crontabDOs) {
            log.info("自动分片逻辑, 当前任务:crontabDO:{}", JsonUtil.object2Json(crontabDO));
            if (StringUtils.isEmpty(crontabDO.getPortrayal())) {
                continue;
            }

            //热门商品对应的画像实体
            MembershipPointDTO membershipPointDTO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDTO.class);
            if (Objects.isNull(membershipPointDTO)) {
                continue;
            }

            //获取匹配画像的用户实体
            MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDTO);
            PersonaFilterConditionDTO personaFilterConditionDTO = conditionConverter.convertFilterCondition(conditionDTO);
            log.info("用户查询条件:{}", personaFilterConditionDTO);

            //获取画像用户匹配的用户ID最大最小值
            JsonResult<Long> accountMaxIdResult = personaApi.queryMaxIdByCondition(personaFilterConditionDTO);
            log.info("获取最大ID,result:{}", JsonUtil.object2Json(accountMaxIdResult));
            if (!accountMaxIdResult.getSuccess()) {
                log.info("获取最大ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                throw new BaseBizException(accountMaxIdResult.getErrorCode(), accountMaxIdResult.getErrorMessage());
            }
            JsonResult<Long> accountMinIdResult = personaApi.queryMinIdByCondition(personaFilterConditionDTO);
            log.info("获取最小ID,result:{}", JsonUtil.object2Json(accountMinIdResult));
            if (!accountMinIdResult.getSuccess()) {
                log.info("获取最小ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                throw new BaseBizException(accountMinIdResult.getErrorCode(), accountMinIdResult.getErrorMessage());
            }

            //需要执行推送的用户起始ID
            //注意:这是一个预估值,因为最小ID到最大ID中间会有很多不符合条件的用户
            //针对这些用户,需要在下一层的业务逻辑中,用选人条件过滤掉
            Long minUserId = accountMinIdResult.getData();
            Long maxUserId = accountMaxIdResult.getData();

            //bucket就是一个用户分片,对应的是一个startUserId -> endUserId,用户ID范围
            //可以根据一定的算法,把千万级用户推送任务分片,比如一个分片后的推送任务包含1000个用户/2000个用户
            //userBuckets就有上万条key-value对,每个key-value对就是一个startUserId -> endUserId的推送任务分片
            final int userBucketSize = 1000;
            Map<Long, Long> userBuckets = new LinkedHashMap<>();
            AtomicBoolean doSharding = new AtomicBoolean(true);
            long startUserId = minUserId;
            log.info("开始对任务人群进行分片,startId:{}",minUserId);
            while (doSharding.get()) {
                if ((maxUserId -minUserId) < userBucketSize) {
                    userBuckets.put(startUserId, maxUserId);
                    doShardingpareAndSet(true, false);
                    break;
                }
                userBuckets.put(startUserId, startUserId + userBucketSize);
                startUserId += userBucketSize;
                maxUserId -= userBucketSize;
            }

            //把可能成千上万的分片推送任务进行RocketMQ消息的batch合并,以batch模式的发送任务到MQ,减少跟RocketMQ网络通信的耗时
            List<String> hotProductPushTasks = new ArrayList<>();
            HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO);
            PlatformHotProductUserBucketMessage bucketMessage = PlatformHotProductUserBucketMessage.builder()
                .hotGoodsVO(JSON.toJSONString(hotGoodsVO))
                .personaFilterConditionDTO(JSON.toJSONString(personaFilterConditionDTO))
                .build();
            for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
                bucketMessage.setEndUserId(userBucket.getValue());
                bucketMessage.setStartUserId(userBucket.getKey());

                String promotionPushTaskJSON = JsonUtil.object2Json(bucketMessage);
                log.info("用户桶构建侧选人条件:{}",bucketMessage.getPersonaFilterConditionDTO());
                hotProductPushTasks.add(promotionPushTaskJSON);
            }

            //MESSAGE_BATCH_SIZE指的是消息batch大小,RocketMQ的每个batch消息包含了100个推送任务
            //这样可以将1万个推送任务消息合并为100个batch消息,进行100次网络通信发给RocketMQ,可以大幅度降低发送消息的耗时
            ListSplitter splitter = new ListSplitter(hotProductPushTasks, MESSAGE_BATCH_SIZE);
            while (splitter.hasNext()) {
                List<String> sendBatch = splitter.next();
                log.info("本次批次消息数量,{}",sendBatch.size());
                sharedSendMsgThreadPool.execute(() -> {
                    defaultProducer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
                });
            }
        }
    }
}

(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ

监听MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("PlatFormHotProductUserBucketConsumer")
    public DefaultMQPushConsumer PlatFormHotProductUserBucketConsumer(PlatFormHotProductUserBucketListener platFormHotProductUserBucketListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, "*");
        consumer.registerMessageListener(platFormHotProductUserBucketListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormHotProductUserBucketListener implements MessageListenerConcurrently {
    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    @Autowired
    private DefaultProducer producer;

    //公用的消息推送线程池
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;
    
    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                String msg = new String(messageExt.getBody());
                PlatformHotProductUserBucketMessage hotProductMessagePushTask = JSON.parseObject(msg , PlatformHotProductUserBucketMessage.class);
                log.info("执行热门商品推送用户桶消息逻辑,消息内容:{}", hotProductMessagePushTask);

                //1.获取本次热门商品推送任务分片中的数据
                String hotGoodString = hotProductMessagePushTask.getHotGoodsVO();
                String personaFilterCondition = hotProductMessagePushTask.getPersonaFilterConditionDTO();
                HotGoodsVO hotGoodsVO = JSON.parseObject(hotGoodString, HotGoodsVO.class);
                log.info("选人条件,内容:{}", personaFilterCondition);
                if (Objects.isNull(personaFilterCondition) || Objects.isNull(hotGoodsVO)) {
                    continue;
                }

                PersonaFilterConditionDTO conditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);
                Long startUserId = hotProductMessagePushTask.getStartUserId();
                Long endUserId = hotProductMessagePushTask.getEndUserId();

                //分页查询条件
                PersonaConditionWithIdRange page = PersonaConditionWithIdRange.builder()
                    .memberLevel(conditionDTO.getMemberLevel())
                    .memberPoint(conditionDTO.getMemberPoint())
                    .startId(startUserId)
                    .endId(endUserId)
                    .build();

                //2.查询本次推送任务分片对应的用户群体
                //注意:查询的时候,传入查询条件过滤掉不符合条件的用户id

                JsonResult<List<Long>> queryAccountIdsResult = personaApi.getAccountIdsByIdRange(page);
                List<Long> accountIds = queryAccountIdsResult.getData();

                PlatformHotProductMessage hotMessage = PlatformHotProductMessage.builder()
                    .goodsName(hotGoodsVO.getGoodsName())
                    .goodsDesc(hotGoodsVO.getGoodsDesc())
                    .keyWords(hotGoodsVO.getKeyWords())
                    .build();
                int handledBucketCount = 0;
                List<String> messages = new ArrayList<>();
                for (Long accountId : accountIds) {
                    handledBucketCount++;
                    hotMessage.setAccountId(accountId);
                    log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage);
                    messages.add(JSON.toJSONString(hotMessage));
                }
                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while (splitter.hasNext()) {
                    List<String> sendBatch = splitter.next();
                    log.info("本次批次消息数量,{}", sendBatch.size());
                    sharedSendMsgThreadPool.execute(() -> {
                        producer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
                    });
                }
            }
        } catch (Exception e) {
            log.error("consume error,热门商品通知消费失败", e);
            //这边因为是推送任务,个别失败也可以直接丢弃
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

(3)推送系统消费具体用户的推送消息进行真正推送

监听MQ的如下Topic:

代码语言:javascript代码运行次数:0运行复制
PLATFORM_HOT_PRODUCT_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ConsumerBeanConfig {
    ...
    @Bean("platformHotProductSendTopicConsumer")
    public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*");
        consumer.registerMessageListener(platFormHotProductListener);
        consumer.start();
        return consumer;
    }
}

@Component
public class PlatFormHotProductListener implements MessageListenerConcurrently {
    //并发消费消息
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
                String msg = new String(messageExt.getBody());
                HashMap hotProductMessage = JSON.parseObject(msg , HashMap.class);

                //推送通知
                informByPush(hotProductMessage);
            }
        } catch (Exception e) {
            log.error("consume error,平台优惠券消费失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    //第三方平台推送消息到app
    private void informByPush(HashMap message){
        String messageBody = "速戳!精致小物件,"
            + message.get("keywords")+"!"
            + message.get("goodsName")
            + message.get("goodsDesc");
        log.info("消息推送中:消息内容:{}", messageBody);
    }
}

14.生产环境百万级用户PUSH全链路压测

百万级用户Push生产环境的部署:营销系统部署了5台2核4G机器、推送系统部署了5台2核4G机器、会员系统1台、画像系统1台。

假设会员系统⼀共有150w的⽤户数据,现在开启⼀次全员推送通知的优惠活动,总计MQ的数据量⼤概就是150w。开启⼀个全员活动的时间,⼤概⽤时27分钟左右,即完成了全部消息的推送,整体效率还是算⽐较⾼的。所以如果是千万级别的推送,⼤概也就是需要27 * 5⼤概是3个⼩时左右,和我们的预期是⽐较相似的。

150万的用户推送任务,按1000用户进行分片,那么总共会有1500个分片任务,每个分片任务需处理1000条消息。这1500个人分片任务通过batch合并发送到MQ,也非常快,假设每100个分片合并成一个batch,才15个batch消息。5台推送机器,每台机器开启30个线程,那么总共有150个线程。假设一个线程完成一条消息的推送需要200ms,那么每个线程每秒能推送5条消息,5台机器每秒能推送750条消息。150万 / 750 = 2000s = 30分钟,这就是通过计算预估的结果,和实际的27分钟相差不大。

由于有些场景下,这种全员性质的消息推送,是不需要接收推送结果的。如果直接放弃推送结果的获取操作,效率还能稍微有所提升。

本文标签: RocketMQ实战10营销系统代码优化二