admin管理员组文章数量:1030001
RocketMQ实战—10.营销系统代码优化二
大纲
1.营销系统引入MQ实现异步化来进行性能优化
2.基于MQ释放优惠券提升系统扩展性
3.基于Redis实现重复促销活动去重
4.基于促销活动创建事件实现异步化
5.推送任务分片和分片消息batch合并发送实现
6.推送系统与用户群体查询逻辑解耦
7.查询用户数据以及批量发送推送消息
8.线程池封装以及推送系统多线程推送
9.推送系统的千万级消息多线程推送
10.千万级用户惰性发券代码实现
11.指定用户群体发券的代码实现
12.分片消息的batch合并算法重构实现
13.百万画像群体爆款商品推送代码实现
14.生产环境百万级用户PUSH全链路压测
10.千万级用户惰性发券代码实现
(1)给全量用户发放优惠券的初版实现
(2)给全量用户惰性发放优惠券的优化实现
(1)给全量用户发放优惠券的初版实现
首先营销系统对全量用户发放优惠券的任务进行分片,然后将分片的消息发送到如下Topic。
代码语言:javascript代码运行次数:0运行复制PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
//优惠活动service
@Autowired
private CouponService couponService;
//新增一个优惠券活动
@PostMapping
public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request) {
try {
log.info("新增一条优惠券:{}", JSON.toJSONString(request));
SaveOrUpdateCouponDTO dto = couponService.saveOrUpdateCoupon(request);
return JsonResult.buildSuccess(dto);
} catch (BaseBizException e) {
log.error("biz error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
log.error("system error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getMessage());
}
}
...
}
//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
...
//保存/修改优惠券活动方法
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
couponDO.setCouponReceivedCount(0);
salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//为所有用户发放优惠券
sendPlatformCouponMessage(couponDO);
SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
dto.setCouponName(request.getCouponName());
dto.setRule(request.getCouponRule());
dto.setSuccess(true);
return dto;
}
...
//为所有用户发放优惠券
private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) {
//桶的大小
final int userBucketSize = 1000;
final int messageBatchSize = 100;
//1.查询出库里面最大的userId,作为用户的总数量
JsonResult<Long> maxUserIdJsonResult = accountApi.queryMaxUserId();
if (maxUserIdJsonResult.getSuccess()) {
throw new BaseBizException(maxUserIdJsonResult.getErrorCode(), maxUserIdJsonResult.getErrorMessage());
}
Long maxUserId = maxUserIdJsonResult.getData();
//2.分成m个桶,每个桶里面有n个用户,每个桶发送一条"批量发送优惠券用户桶消息"
Map<Long, Long> userBuckets = new LinkedHashMap<>();
AtomicBoolean flagRef = new AtomicBoolean(true);
long startUserId = 1L;
while (flagRef.get()) {
if (startUserId > maxUserId) {
flagRefpareAndSet(true, false);
}
userBuckets.put(startUserId, startUserId + userBucketSize);
startUserId += userBucketSize;
}
//3.批量发送消息
//例:userBucketCount = 1000; messageBatchSize = 100
//批量发送次数 = 10次,经过两次分桶,这里发送消息的次数从100w次降到10次
int handledBucketCount = 0;
List<String> jsonMessageBatch = new ArrayList<>(messageBatchSize);
for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
handledBucketCount++;
PlatformCouponUserBucketMessage message = PlatformCouponUserBucketMessage.builder()
.startUserId(userBucket.getKey())
.endUserId(userBucket.getValue())
.informType(promotionCouponDO.getInformType())
.couponId(promotionCouponDO.getId())
.activityStartTime(promotionCouponDO.getActivityStartTime())
.activityEndTime(promotionCouponDO.getActivityEndTime())
.couponType(promotionCouponDO.getCouponType())
.build();
String jsonMessage = JsonUtil.object2Json(message);
jsonMessageBatch.add(jsonMessage);
if (jsonMessageBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, jsonMessageBatch, "平台发放优惠券用户桶消息");
jsonMessageBatch.clear();
}
}
}
}
接着营销系统监听如下Topic消费分片后的发放优惠券任务。
代码语言:javascript代码运行次数:0运行复制PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
此时有两种处理方法:一.直接使用线程池进行发送发放优惠券消息到MQ。二.合并batch后再使用线程池发送MQ。
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
//平台发放优惠券用户桶消费者
@Bean("platformCouponUserBucketReceiveTopicConsumer")
public DefaultMQPushConsumer receiveCouponUserBucketConsumer(@Qualifier("platformCouponUserBucketReceiveTopicConsumer")PlatFormCouponUserBucketListener platFormCouponUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, "*");
consumer.registerMessageListener(platFormCouponUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormCouponUserBucketListener implements MessageListenerConcurrently {
//账户服务
@DubboReference(version = "1.0.0")
private AccountApi accountApi;
//发送消息共用的线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//RocketMQ生产者
@Autowired
private DefaultProducer defaultProducer;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
//1.反序列化消息
String messageString = new String(messageExt.getBody());
log.debug("执行平台发送优惠券用户桶消息逻辑,消息内容:{}", messageString);
PlatformCouponUserBucketMessage message = JSON.parseObject(messageString, PlatformCouponUserBucketMessage.class);
//2.查询桶内的用户信息
Long startUserId = message.getStartUserId();
Long endUserId = message.getEndUserId();
JsonResult<List<MembershipAccountDTO>> accountBucketResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
if (!accountBucketResult.getSuccess()) {
throw new BaseBizException(accountBucketResult.getErrorCode(), accountBucketResult.getErrorMessage());
}
List<MembershipAccountDTO> accountBucket = accountBucketResult.getData();
if (CollectionUtils.isEmpty(accountBucket)) {
log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
continue;
}
//3.每个用户发送一条"平台发送优惠券消息"
//方法一:直接使用线程池进行发送发放优惠券消息到MQ;
//这里是并行消费的,以上逻辑已经是并行执行的了,而且有查库的操作
//accountBucket 默认是 1000 个用户,要为每一个用户都发送一条"平台发送优惠券消息",也就是1000条消息
//下面我们使用线程池来并行发送这1000条消息(ps:另一种也可以像发送优惠券用户桶消息一样用批量发送)
PlatformCouponMessage couponMessage = PlatformCouponMessage.builder()
.couponId(message.getCouponId())
.activityStartTime(message.getActivityStartTime())
.activityEndTime(message.getActivityEndTime())
.couponType(message.getCouponType())
.build();
for (MembershipAccountDTO account : accountBucket) {
sharedSendMsgThreadPool.execute(() -> {
couponMessage.setUserAccountId(account.getId());
String jsonMessage = JSON.toJSONString(couponMessage);
defaultProducer.sendMessage(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, jsonMessage, "平台发送优惠券消息");
});
}
//方法二:合并batch后再使用线程池发送MQ
/*List<String> messages = new ArrayList<>(100);
for (MembershipAccountDTO account : accountBucket) {
couponMessage.setUserAccountId(account.getId());
messages.add(JSON.toJSONString(couponMessage));
if (messages.size() == 100) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, messages, "平台发送优惠券消息");
});
messages.clear();
}
}
//最后剩下的也批量发出
if (!CollectionUtils.isEmpty(messages)) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, messages, "平台发送促销活动消息");
});
messages.clear();
}*/
}
} catch (Exception e){
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
最后营销系统监听如下Topic,然后通过线程池对每个用户发放优惠券。
代码语言:javascript代码运行次数:0运行复制PLATFORM_PROMOTION_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
//平台发放优惠券领取消费者
@Bean("platformCouponReceiveTopicConsumer")
public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormCouponListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormCouponListener implements MessageListenerConcurrently {
//优惠券服务service
@Autowired
private CouponItemService couponItemService;
//测试completableFuture使用commonPool的时是不需要初始化业务ThreadPoolExecutor的
//这里用supplier懒加载让测试completableFuture使用commonPool时不要初始化线程池
//只有当使用completableFuture使用自定义的线程时才初始化线程池
private static final int PERMITS = 30;
private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
if (initializedRefpareAndSet(false, true)) {
THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), NamedDaemonThreadFactory.getInstance("consumeCouponMsg"), new ThreadPoolExecutor.CallerRunsPolicy());
}
return THREAD_POOL_EXECUTOR;
};
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
//方式一:使用默认的commonPool来处理任务
//supplyAsync(Supplier<U> supplier) API
//默认使用的是 ForkJoinPoolmonPool() 这个线程池
//该线程池在jvm内是唯一的,默认的线程数量是cpu的核数减1
//如果觉得线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPoolmon.parallelism 的值调整commonPool的并行度,或者采用方式二
List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
.map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
.collect(Collectors.toList());
//方式二:使用自定的业务线程池来处理任务
//List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
// .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
// .collect(Collectors.toList());
List<SalesPromotionCouponItemDTO> couponItemDTOList = futureList.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
//优惠券保存到数据库
couponItemService.saveCouponBatch(couponItemDTOList);
} catch (Exception e) {
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
public SalesPromotionCouponItemDTO handleMessageExt(MessageExt messageExt) {
log.debug("执行平台发放优惠券消费消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class);
log.info("开始发放平台优惠券,couponId:{}", platformCouponMessage.getCouponId());
//幂等逻辑防止重复消费
JsonResult<Long> result = couponItemService.selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(), platformCouponMessage.getCouponId());
//如果已经存在,直接跳过循环,不再执行优惠券保存操作
if (result.getSuccess()) {
return null;
}
SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO();
itemDTO.setCouponId(platformCouponMessage.getCouponId());
itemDTO.setCouponType(platformCouponMessage.getCouponType());
itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId());
itemDTO.setIsUsed(0);
itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime());
itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime());
return itemDTO;
}
}
(2)给全量用户惰性发放优惠券的优化实现
一.首先需要配置好要使用的Redis相关Bean
代码语言:javascript代码运行次数:0运行复制@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
@ConditionalOnClass(RedissonClient.class)
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password)
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(100)
.setIdleConnectionTimeout(600000)
.setSubscriptionConnectionMinimumIdleSize(10)
.setSubscriptionConnectionPoolSize(100)
.setTimeout(timeout);
config.setCodec(new StringCodec());
config.setThreads(5);
config.setNettyThreads(5);
RedissonClient client = Redisson.create(config);
return client;
}
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisCache redisCache(RedisTemplate redisTemplate) {
return new RedisCache(redisTemplate);
}
}
二.然后实现基于Redisson分布式锁维护优惠券缓存列表,以及惰性优惠券过期缓存清理
其实就是新增优惠券时,将优惠券信息写入Redis,并检查优惠券是否过期,如果过期就进行删除。
代码语言:javascript代码运行次数:0运行复制//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
//Redis客户端工具
@Autowired
private RedisCache redisCache;
@Autowired
private RedissonClient redissonClient;
...
//保存/修改优惠券活动方法
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
couponDO.setCouponReceivedCount(0);
salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//判断优惠券类型
if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())) {
//一.如果是系统发放类型,则针对所有用户,发送优惠券到MQ
writeCouponToRedis(couponDO);
} else {
//二.如果是自己领取类型
//TODO
}
SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
dto.setCouponName(request.getCouponName());
dto.setRule(request.getCouponRule());
dto.setSuccess(true);
return dto;
}
private void writeCouponToRedis(SalesPromotionCouponDO coupon) {
//首先需要用Redisson基于Redis做一个分布式锁的加锁PROMOTION_COUPON_ID_LIST_LOCK
//再去维护一个"一共发出去了多少张券"的数据结构PROMOTION_COUPON_ID_LIST
RLock lock = redissonClient.getLock(RedisKey.PROMOTION_COUPON_ID_LIST_LOCK);
try {
//进行加锁,超时时间为60s释放
lock.lock(60, TimeUnit.SECONDS);
List<Long> couponIds = null;
String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
if (couponIdsJSON == null || couponIdsJSON.equals("")) {
couponIds = new ArrayList<>();
} else {
couponIds = JSON.parseObject(couponIdsJSON, List.class);
}
//检查每个优惠券时间是否过期了,如果过期或者已经发完了券,则把它从List里删除,以及从Redis里删除
//如果是全量发券,则不会发完,因此可以给所有人发,如果超过了时间才不能发券
if (couponIds.size() > 0) {
Iterator<Long> couponIdIterator = couponIds.iterator();
while (couponIdIterator.hasNext()) {
Long tempCouponId = couponIdIterator.next();
String tempCouponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
SalesPromotionCouponDO tempCoupon = JSON.parseObject(tempCouponJSON, SalesPromotionCouponDO.class);
Date now = new Date();
if (now.after(tempCoupon.getActivityEndTime())) {
couponIdIterator.remove();
redisCache.delete(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
}
}
}
couponIds.add(coupon.getId());
couponIdsJSON = JsonUtil.object2Json(couponIds);
redisCache.set(RedisKey.PROMOTION_COUPON_ID_LIST, couponIdsJSON, -1);
String couponJSON = JsonUtil.object2Json(coupon);
redisCache.set(RedisKey.PROMOTION_COUPON_KEY + "::" + coupon.getId(), couponJSON, -1);
} finally {
lock.unlock();
}
}
}
三.接着实现会员系统发布用户登录事件 + 营销系统在用户登录后的惰性发券
会员系统在用户登录时会发送消息到MQ的USER_LOGINED_EVENT_TOPIC:
代码语言:javascript代码运行次数:0运行复制@RestController
@RequestMapping("/demo/membership")
public class MembershipController {
@Autowired
private DefaultProducer defaultProducer;
@Autowired
private MembershipAccountService accountService;
//触发用户登录
@PostMapping("/triggerUserLoginEvent")
public JsonResult<Boolean> triggerUserLoginEvent(Long accountId) {
try {
List<MembershipAccountDTO> accounts = accountService.queryAccountByIdRange(accountId, accountId);
if (accounts != null && accounts.size() > 0) {
MembershipAccountDTO account = accounts.get(0);
UserLoginedEvent userLoginedEvent = new UserLoginedEvent();
userLoginedEvent.setAccount(account);
String userLoginedEventJSON = JsonUtil.object2Json(userLoginedEvent);
defaultProducer.sendMessage(RocketMqConstant.USER_LOGINED_EVENT_TOPIC, userLoginedEventJSON, "用户登录事件发生了");
}
return JsonResult.buildSuccess(true);
} catch (BaseBizException e) {
log.error("biz error: request={}", accountId, e);
return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
log.error("system error: request={}", accountId, e);
return JsonResult.buildError(e.getMessage());
}
}
}
营销系统监听USER_LOGINED_EVENT_TOPIC对用户登录时发送的登录事件消息进行惰性发券:
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("userLoginedEventListener")
public DefaultMQPushConsumer userLoginedEventListener(UserLoginedEventListener userLoginedEventListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(USER_LOGINED_EVENT_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(USER_LOGINED_EVENT_TOPIC, "*");
consumer.registerMessageListener(userLoginedEventListener);
consumer.start();
return consumer;
}
}
//用户登录事件监听器
@Component
public class UserLoginedEventListener implements MessageListenerConcurrently {
@Autowired
private RedisCache redisCache;
@Autowired
private SalesPromotionCouponItemDAO couponItemDAO;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
//从Redis缓存里查询所有的优惠券
String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
List<Long> couponIds = JSON.parseObject(couponIdsJSON, List.class);
List<SalesPromotionCouponDO> coupons = new ArrayList<>();
for (Long couponId : couponIds) {
String couponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + couponId);
SalesPromotionCouponDO coupon = JSON.parseObject(couponJSON, SalesPromotionCouponDO.class);
Date now = new Date();
if (now.after(coupon.getActivityStartTime()) && now.before(coupon.getActivityEndTime())) {
coupons.add(coupon);
}
}
for (MessageExt messageExt : list) {
//这个代码就可以拿到一个刚刚登陆成功的用户
String message = new String(messageExt.getBody());
UserLoginedEvent userLoginedEvent = JSON.parseObject(message, UserLoginedEvent.class);
MembershipAccountDTO account = userLoginedEvent.getAccount();
//遍历每一个优惠券,检查这个优惠券是否有效,是否还可以继续进行发券,以及当前用户是否发过券,然后才给用户进行发券
for (SalesPromotionCouponDO coupon : coupons) {
String receiveCouponFlag = redisCache.get(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId());
if (receiveCouponFlag == null || receiveCouponFlag.equals("")) {
SalesPromotionCouponItemDO couponItem = new SalesPromotionCouponItemDO();
couponItem.setActivityEndTime(coupon.getActivityEndTime());
couponItem.setActivityStartTime(coupon.getActivityStartTime());
couponItem.setCouponId(coupon.getId());
couponItem.setCouponType(coupon.getCouponType());
couponItem.setCreateTime(new Date());
couponItem.setCreateUser(account.getId());
couponItem.setIsUsed(0);
couponItem.setUpdateTime(new Date());
couponItem.setUpdateUser(account.getId());
couponItem.setUserAccountId(account.getId());
couponItemDAO.receiveCoupon(couponItem);
redisCache.set(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId(), "true", -1);
}
}
}
} catch(Exception e) {
log.error("consume error, 用户登录事件处理异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
11.指定用户群体发券的代码实现
(1)千万级用户推送和发放优惠券的方案总结
(2)指定用户群体发券实现
(1)千万级用户推送和发放优惠券的方案总结
创建促销活动和发放优惠券时,都不是直接进行推送和发放的,而是利用RocketMQ进行多次中转、异步化处理。
营销系统首先会查出用户总数,然后进行任务分片,接着把分片任务消息通过batch合并发送到MQ(异步化提升性能)。
营销系统会消费这些分片任务消息,并查询出用户和封装好每个用户的消息,然后发到MQ(解耦会员系统和推送系统)。
推送系统最后会消费每个用户的推送消息,并基于线程池采用多线程并发进行推送。
(2)指定用户群体发券实现
典型的例子就是为激活百万不活跃用户发放优惠券。
一.营销系统创建指定用户群体发券的入口代码
代码语言:javascript代码运行次数:0运行复制@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
...
@RequestMapping("/send")
public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request) {
try {
log.info("发送优惠券给指定用户群体:{}", JSON.toJSONString(request));
SendCouponDTO dto = couponService.sendCouponByConditions(request);
return JsonResult.buildSuccess(dto);
} catch (BaseBizException e) {
log.error("biz error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
log.error("system error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getMessage());
}
}
}
@Service
public class CouponServiceImpl implements CouponService {
@DubboReference(version = "1.0.0")
private MessagePushApi messagePushApi;
...
@Transactional(rollbackFor = Exception.class)
@Override
public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {
//保存优惠券信息
SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest);
couponDO.setCouponReceivedCount(0);
couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode());
couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode());
salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//分片和批量发送发放优惠券消息
shardBatchSendCouponMessage(sendCouponRequest);
SendCouponDTO sendCouponDTO = new SendCouponDTO();
sendCouponDTO.setSuccess(Boolean.TRUE);
sendCouponDTO.setCouponName(sendCouponRequest.getCouponName());
sendCouponDTO.setRule(sendCouponRequest.getCouponRule());
//TODO 发放数量
sendCouponDTO.setSendCount(0);
return sendCouponDTO;
}
}
二.营销系统分片和批量发送发券消息的代码
其中要去画像系统获取用户信息,然后根据用户数量进行分片,接着进行批量发送,发送到MQ的如下Topic。
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制@Service
public class CouponServiceImpl implements CouponService {
...
//分片和批量发送发放优惠券消息
private void shardBatchSendCouponMessage(SendCouponRequest sendCouponRequest) {
//1.到画像系统获取当前条件下的count值
MembershipFilterDTO membershipFilterDTO = sendCouponRequest.getMembershipFilterDTO();
PersonaFilterConditionDTO conditionDTO = conditionConverter.convertFilterCondition(membershipFilterDTO);
JsonResult<Integer> countResult = personaApi.countByCondition(conditionDTO);
if (!countResult.getSuccess()) {
throw new BaseBizException(countResult.getErrorCode(), countResult.getErrorMessage());
}
//2.根据count值分片
//分成m个分片,每个分片中包含:(1)分片ID;(2)用户个数;
//例:maxUserId = 100w; userBucketSize=1000
//userBucket1 = [1, 1000)
//userBucket2 = [2, 1000)
//userBucket2 = [n, 756),最后一个分片可能数量不足1000
//userBucketCount = 1000
Integer count = countResult.getData();
Map<Integer, Integer> userBuckets = new LinkedHashMap<>();
AtomicBoolean flagRef = new AtomicBoolean(true);
Integer shardId = 1;
while (flagRef.get()) {
if (USER_BUCKET_SIZE > count) {
userBuckets.put(shardId, USER_BUCKET_SIZE);
flagRefpareAndSet(true, false);
}
userBuckets.put(shardId, USER_BUCKET_SIZE);
shardId += 1;
count -= USER_BUCKET_SIZE;
}
//3.批量发送消息
//例:userBucketCount = 1000; messageBatchSize = 100
List<String> messages = new ArrayList<>();
PlatformPromotionConditionUserBucketMessage message = PlatformPromotionConditionUserBucketMessage.builder().personaFilterCondition(JSON.toJSONString(conditionDTO)).build();
for (Map.Entry<Integer, Integer> userBucket : userBuckets.entrySet()) {
message.setShardId(userBucket.getKey());
message.setBucketSize(userBucket.getValue());
String jsonMessage = JsonUtil.object2Json(message);
messages.add(jsonMessage);
}
log.info("本次推送消息数量,{}",messages.size());
ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
log.info("本次批次消息数量,{}",sendBatch.size());
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, sendBatch, "部分用户优惠活动用户桶消息");
});
}
}
}
三.营销系统对指定用户群体分片消息的处理和推送代码
首先,营销系统会消费MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
然后,营销系统会把领取优惠券的消息会发送到MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("platFormConditionCouponUserBucketConsumer")
public DefaultMQPushConsumer platFormConditionCouponUserBucketConsumer(PlatFormConditionCouponUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormConditionCouponUserBucketListener implements MessageListenerConcurrently {
//用户画像服务
@DubboReference(version = "1.0.0")
private PersonaApi personaApi;
//发送消息共用的线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//RocketMQ生产者
@Autowired
private DefaultProducer defaultProducer;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
//1.反序列化消息
String messageString = new String(messageExt.getBody());
log.debug("部分用户领取优惠券用户桶消息逻辑,消息内容:{}", messageString);
PlatformPromotionConditionUserBucketMessage message = JSON.parseObject(messageString, PlatformPromotionConditionUserBucketMessage.class);
//2.查询桶内的用户信息
Integer shardId = message.getShardId();
//根据分片id,和分片数量大小,计算出本次分片的起始userId
Long startUserId = (shardId.longValue() - 1) * 1000;
Integer bucketSize = message.getBucketSize();
String personaFilterCondition = message.getPersonaFilterCondition();
PersonaFilterConditionDTO personaFilterConditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);
//封装查询用户id的条件
PersonaConditionPage page = PersonaConditionPage.builder()
.memberPoint(personaFilterConditionDTO.getMemberPoint())
.memberLevel(personaFilterConditionDTO.getMemberLevel())
.offset(startUserId)
.limit(bucketSize)
.build();
//从用户画像系统查询用户账号id
JsonResult<List<Long>> accountIdsResult = personaApi.getAccountIdsByIdLimit(page);
if (!accountIdsResult.getSuccess()) {
throw new BaseBizException(accountIdsResult.getErrorCode(), accountIdsResult.getErrorMessage());
}
List<Long> accountIds = accountIdsResult.getData();
if (CollectionUtils.isEmpty(accountIds)) {
log.info("根据用户桶内的分片信息没有查询到用户, shardId={}", shardId);
continue;
}
//3.每个用户发送一条领取优惠券的消息通知
PlatformMessagePushMessage pushMessage = PlatformMessagePushMessage.builder()
.message("恭喜您获得优惠券领取资格,点击www.wjunt进入活动页面")
.mainMessage("获得优惠券领取资格")
.informType(InformTypeEnum.APP.getCode())
.build();
List<String> messages = new ArrayList<>();
for (Long accountId : accountIds) {
pushMessage.setUserAccountId(accountId);
messages.add(JSON.toJSONString(pushMessage));
}
log.info("本次推送消息数量,{}",messages.size());
ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_TOPIC, sendBatch, "平台发送优惠券消息");
});
}
}
} catch (Exception e){
log.error("consume error,消费失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
四.推送系统会监听如下Topic消费消息推送领取优惠券的消息
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("platFormConditionCouponConsumer")
public DefaultMQPushConsumer platFormConditionCouponConsumer(PlatFormConditionCouponListener platFormPromotionListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormConditionCouponListener implements MessageListenerConcurrently {
//消息推送工厂提供者
@Autowired
private FactoryProducer factoryProducer;
@Autowired
private RedisTemplate<String, String> redisTemplate;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
//方式二:使用自定的业务线程池来处理任务
List<CompletableFuture<AltResult>> futureList = msgList.stream()
.map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
.collect(Collectors.toList());
List<Throwable> resultList = futureList.stream()
.map(CompletableFuture::join)
.filter(e -> e.ex != null)
.map(e -> e.ex)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("consume error,消费失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private AltResult handleMessageExt(MessageExt messageExt) {
try {
log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
PlatformMessagePushMessage message = JSON.parseObject(msg , PlatformMessagePushMessage.class);
//幂等控制
if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId()))) {
return new AltResult(null);
}
//获取消息服务工厂
MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());
//消息发送服务组件
MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();
//构造消息
PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
.informType(message.getInformType())
.mainMessage(message.getMainMessage())
.userAccountId(message.getUserAccountId())
.message(message.getMessage())
.build();
MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
messageSendService.send(messageSendDTO);
//发送成功之后把已经发送成功记录到redis
redisTemplate.opsForValue().set(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId(), UUID.randomUUID().toString());
log.info("消息推送完成,messageSendDTO:{}", messageSendDTO);
return new AltResult(null);
} catch (Exception e) {
return new AltResult(e);
}
}
//completableFuture的返回结果,适用于无返回值的情况
//ex字段为null表示任务执行成功
//ex字段不为null表示任务执行失败,并把异常设置为ex字段
private static class AltResult {
final Throwable ex;
public AltResult(Throwable ex) {
this.ex = ex;
}
}
}
12.分片消息的batch合并算法重构实现
之前的batch合并是按照条数进行合并的,现在重构为按照合并后的大小不超过800KB和不超过100条进行合并。
代码语言:javascript代码运行次数:0运行复制public class ListSplitter implements Iterator<List<String>> {
//设置每一个batch最多不超过800k,因为RocketMQ官方推荐,一条消息不建议长度超过1MB
//而封装一个RocketMQ的message,包括了MessageBody, Topic,Addr等数据,所以设置小一点
private int sizeLimit = 800 * 1024;
private final List<String> messages;
private int currIndex;
private int batchSize = 100;
public ListSplitter(List<String> messages, Integer batchSize) {
this.messages = messages;
this.batchSize = batchSize;
}
public ListSplitter(List<String> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
//每次从list中取一部分
@Override
public List<String> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
String message = messages.get(nextIndex);
//获取每条message的长度
int tmpSize = message.length();
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit || (nextIndex - currIndex) == batchSize ) {
break;
} else {
totalSize += tmpSize;
}
}
List<String> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
13.百万画像群体爆款商品推送代码实现
(1)营销系统的定时推送任务
(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ
(3)推送系统消费具体用户的推送消息进行真正推送
(1)营销系统的定时推送任务
代码语言:javascript代码运行次数:0运行复制@Component
public class ScheduleSendMessageJobHandler {
...
//执行定时任务,筛选热门商品和用户发送给MQ
@XxlJob("hotGoodsPushHandler")
public void hotGoodsPushHandler() {
log.info("hotGoodsPushHandler 开始执行");
//获取热门商品和用户画像,业务先简化为一对一关系
List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date());
log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));
//找出每个热门商品对应画像所匹配的用户
for (HotGoodsCrontabDO crontabDO : crontabDOs) {
log.info("自动分片逻辑, 当前任务:crontabDO:{}", JsonUtil.object2Json(crontabDO));
if (StringUtils.isEmpty(crontabDO.getPortrayal())) {
continue;
}
//热门商品对应的画像实体
MembershipPointDTO membershipPointDTO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDTO.class);
if (Objects.isNull(membershipPointDTO)) {
continue;
}
//获取匹配画像的用户实体
MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDTO);
PersonaFilterConditionDTO personaFilterConditionDTO = conditionConverter.convertFilterCondition(conditionDTO);
log.info("用户查询条件:{}", personaFilterConditionDTO);
//获取画像用户匹配的用户ID最大最小值
JsonResult<Long> accountMaxIdResult = personaApi.queryMaxIdByCondition(personaFilterConditionDTO);
log.info("获取最大ID,result:{}", JsonUtil.object2Json(accountMaxIdResult));
if (!accountMaxIdResult.getSuccess()) {
log.info("获取最大ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
throw new BaseBizException(accountMaxIdResult.getErrorCode(), accountMaxIdResult.getErrorMessage());
}
JsonResult<Long> accountMinIdResult = personaApi.queryMinIdByCondition(personaFilterConditionDTO);
log.info("获取最小ID,result:{}", JsonUtil.object2Json(accountMinIdResult));
if (!accountMinIdResult.getSuccess()) {
log.info("获取最小ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
throw new BaseBizException(accountMinIdResult.getErrorCode(), accountMinIdResult.getErrorMessage());
}
//需要执行推送的用户起始ID
//注意:这是一个预估值,因为最小ID到最大ID中间会有很多不符合条件的用户
//针对这些用户,需要在下一层的业务逻辑中,用选人条件过滤掉
Long minUserId = accountMinIdResult.getData();
Long maxUserId = accountMaxIdResult.getData();
//bucket就是一个用户分片,对应的是一个startUserId -> endUserId,用户ID范围
//可以根据一定的算法,把千万级用户推送任务分片,比如一个分片后的推送任务包含1000个用户/2000个用户
//userBuckets就有上万条key-value对,每个key-value对就是一个startUserId -> endUserId的推送任务分片
final int userBucketSize = 1000;
Map<Long, Long> userBuckets = new LinkedHashMap<>();
AtomicBoolean doSharding = new AtomicBoolean(true);
long startUserId = minUserId;
log.info("开始对任务人群进行分片,startId:{}",minUserId);
while (doSharding.get()) {
if ((maxUserId -minUserId) < userBucketSize) {
userBuckets.put(startUserId, maxUserId);
doShardingpareAndSet(true, false);
break;
}
userBuckets.put(startUserId, startUserId + userBucketSize);
startUserId += userBucketSize;
maxUserId -= userBucketSize;
}
//把可能成千上万的分片推送任务进行RocketMQ消息的batch合并,以batch模式的发送任务到MQ,减少跟RocketMQ网络通信的耗时
List<String> hotProductPushTasks = new ArrayList<>();
HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO);
PlatformHotProductUserBucketMessage bucketMessage = PlatformHotProductUserBucketMessage.builder()
.hotGoodsVO(JSON.toJSONString(hotGoodsVO))
.personaFilterConditionDTO(JSON.toJSONString(personaFilterConditionDTO))
.build();
for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
bucketMessage.setEndUserId(userBucket.getValue());
bucketMessage.setStartUserId(userBucket.getKey());
String promotionPushTaskJSON = JsonUtil.object2Json(bucketMessage);
log.info("用户桶构建侧选人条件:{}",bucketMessage.getPersonaFilterConditionDTO());
hotProductPushTasks.add(promotionPushTaskJSON);
}
//MESSAGE_BATCH_SIZE指的是消息batch大小,RocketMQ的每个batch消息包含了100个推送任务
//这样可以将1万个推送任务消息合并为100个batch消息,进行100次网络通信发给RocketMQ,可以大幅度降低发送消息的耗时
ListSplitter splitter = new ListSplitter(hotProductPushTasks, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
log.info("本次批次消息数量,{}",sendBatch.size());
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
});
}
}
}
}
(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ
监听MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("PlatFormHotProductUserBucketConsumer")
public DefaultMQPushConsumer PlatFormHotProductUserBucketConsumer(PlatFormHotProductUserBucketListener platFormHotProductUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormHotProductUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormHotProductUserBucketListener implements MessageListenerConcurrently {
@DubboReference(version = "1.0.0")
private PersonaApi personaApi;
@Autowired
private DefaultProducer producer;
//公用的消息推送线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
String msg = new String(messageExt.getBody());
PlatformHotProductUserBucketMessage hotProductMessagePushTask = JSON.parseObject(msg , PlatformHotProductUserBucketMessage.class);
log.info("执行热门商品推送用户桶消息逻辑,消息内容:{}", hotProductMessagePushTask);
//1.获取本次热门商品推送任务分片中的数据
String hotGoodString = hotProductMessagePushTask.getHotGoodsVO();
String personaFilterCondition = hotProductMessagePushTask.getPersonaFilterConditionDTO();
HotGoodsVO hotGoodsVO = JSON.parseObject(hotGoodString, HotGoodsVO.class);
log.info("选人条件,内容:{}", personaFilterCondition);
if (Objects.isNull(personaFilterCondition) || Objects.isNull(hotGoodsVO)) {
continue;
}
PersonaFilterConditionDTO conditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);
Long startUserId = hotProductMessagePushTask.getStartUserId();
Long endUserId = hotProductMessagePushTask.getEndUserId();
//分页查询条件
PersonaConditionWithIdRange page = PersonaConditionWithIdRange.builder()
.memberLevel(conditionDTO.getMemberLevel())
.memberPoint(conditionDTO.getMemberPoint())
.startId(startUserId)
.endId(endUserId)
.build();
//2.查询本次推送任务分片对应的用户群体
//注意:查询的时候,传入查询条件过滤掉不符合条件的用户id
JsonResult<List<Long>> queryAccountIdsResult = personaApi.getAccountIdsByIdRange(page);
List<Long> accountIds = queryAccountIdsResult.getData();
PlatformHotProductMessage hotMessage = PlatformHotProductMessage.builder()
.goodsName(hotGoodsVO.getGoodsName())
.goodsDesc(hotGoodsVO.getGoodsDesc())
.keyWords(hotGoodsVO.getKeyWords())
.build();
int handledBucketCount = 0;
List<String> messages = new ArrayList<>();
for (Long accountId : accountIds) {
handledBucketCount++;
hotMessage.setAccountId(accountId);
log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage);
messages.add(JSON.toJSONString(hotMessage));
}
ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
log.info("本次批次消息数量,{}", sendBatch.size());
sharedSendMsgThreadPool.execute(() -> {
producer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
});
}
}
} catch (Exception e) {
log.error("consume error,热门商品通知消费失败", e);
//这边因为是推送任务,个别失败也可以直接丢弃
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
(3)推送系统消费具体用户的推送消息进行真正推送
监听MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_HOT_PRODUCT_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("platformHotProductSendTopicConsumer")
public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormHotProductListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormHotProductListener implements MessageListenerConcurrently {
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
HashMap hotProductMessage = JSON.parseObject(msg , HashMap.class);
//推送通知
informByPush(hotProductMessage);
}
} catch (Exception e) {
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//第三方平台推送消息到app
private void informByPush(HashMap message){
String messageBody = "速戳!精致小物件,"
+ message.get("keywords")+"!"
+ message.get("goodsName")
+ message.get("goodsDesc");
log.info("消息推送中:消息内容:{}", messageBody);
}
}
14.生产环境百万级用户PUSH全链路压测
百万级用户Push生产环境的部署:营销系统部署了5台2核4G机器、推送系统部署了5台2核4G机器、会员系统1台、画像系统1台。
假设会员系统⼀共有150w的⽤户数据,现在开启⼀次全员推送通知的优惠活动,总计MQ的数据量⼤概就是150w。开启⼀个全员活动的时间,⼤概⽤时27分钟左右,即完成了全部消息的推送,整体效率还是算⽐较⾼的。所以如果是千万级别的推送,⼤概也就是需要27 * 5⼤概是3个⼩时左右,和我们的预期是⽐较相似的。
150万的用户推送任务,按1000用户进行分片,那么总共会有1500个分片任务,每个分片任务需处理1000条消息。这1500个人分片任务通过batch合并发送到MQ,也非常快,假设每100个分片合并成一个batch,才15个batch消息。5台推送机器,每台机器开启30个线程,那么总共有150个线程。假设一个线程完成一条消息的推送需要200ms,那么每个线程每秒能推送5条消息,5台机器每秒能推送750条消息。150万 / 750 = 2000s = 30分钟,这就是通过计算预估的结果,和实际的27分钟相差不大。
由于有些场景下,这种全员性质的消息推送,是不需要接收推送结果的。如果直接放弃推送结果的获取操作,效率还能稍微有所提升。
RocketMQ实战—10.营销系统代码优化二
大纲
1.营销系统引入MQ实现异步化来进行性能优化
2.基于MQ释放优惠券提升系统扩展性
3.基于Redis实现重复促销活动去重
4.基于促销活动创建事件实现异步化
5.推送任务分片和分片消息batch合并发送实现
6.推送系统与用户群体查询逻辑解耦
7.查询用户数据以及批量发送推送消息
8.线程池封装以及推送系统多线程推送
9.推送系统的千万级消息多线程推送
10.千万级用户惰性发券代码实现
11.指定用户群体发券的代码实现
12.分片消息的batch合并算法重构实现
13.百万画像群体爆款商品推送代码实现
14.生产环境百万级用户PUSH全链路压测
10.千万级用户惰性发券代码实现
(1)给全量用户发放优惠券的初版实现
(2)给全量用户惰性发放优惠券的优化实现
(1)给全量用户发放优惠券的初版实现
首先营销系统对全量用户发放优惠券的任务进行分片,然后将分片的消息发送到如下Topic。
代码语言:javascript代码运行次数:0运行复制PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
//优惠活动service
@Autowired
private CouponService couponService;
//新增一个优惠券活动
@PostMapping
public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request) {
try {
log.info("新增一条优惠券:{}", JSON.toJSONString(request));
SaveOrUpdateCouponDTO dto = couponService.saveOrUpdateCoupon(request);
return JsonResult.buildSuccess(dto);
} catch (BaseBizException e) {
log.error("biz error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
log.error("system error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getMessage());
}
}
...
}
//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
...
//保存/修改优惠券活动方法
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
couponDO.setCouponReceivedCount(0);
salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//为所有用户发放优惠券
sendPlatformCouponMessage(couponDO);
SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
dto.setCouponName(request.getCouponName());
dto.setRule(request.getCouponRule());
dto.setSuccess(true);
return dto;
}
...
//为所有用户发放优惠券
private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) {
//桶的大小
final int userBucketSize = 1000;
final int messageBatchSize = 100;
//1.查询出库里面最大的userId,作为用户的总数量
JsonResult<Long> maxUserIdJsonResult = accountApi.queryMaxUserId();
if (maxUserIdJsonResult.getSuccess()) {
throw new BaseBizException(maxUserIdJsonResult.getErrorCode(), maxUserIdJsonResult.getErrorMessage());
}
Long maxUserId = maxUserIdJsonResult.getData();
//2.分成m个桶,每个桶里面有n个用户,每个桶发送一条"批量发送优惠券用户桶消息"
Map<Long, Long> userBuckets = new LinkedHashMap<>();
AtomicBoolean flagRef = new AtomicBoolean(true);
long startUserId = 1L;
while (flagRef.get()) {
if (startUserId > maxUserId) {
flagRefpareAndSet(true, false);
}
userBuckets.put(startUserId, startUserId + userBucketSize);
startUserId += userBucketSize;
}
//3.批量发送消息
//例:userBucketCount = 1000; messageBatchSize = 100
//批量发送次数 = 10次,经过两次分桶,这里发送消息的次数从100w次降到10次
int handledBucketCount = 0;
List<String> jsonMessageBatch = new ArrayList<>(messageBatchSize);
for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
handledBucketCount++;
PlatformCouponUserBucketMessage message = PlatformCouponUserBucketMessage.builder()
.startUserId(userBucket.getKey())
.endUserId(userBucket.getValue())
.informType(promotionCouponDO.getInformType())
.couponId(promotionCouponDO.getId())
.activityStartTime(promotionCouponDO.getActivityStartTime())
.activityEndTime(promotionCouponDO.getActivityEndTime())
.couponType(promotionCouponDO.getCouponType())
.build();
String jsonMessage = JsonUtil.object2Json(message);
jsonMessageBatch.add(jsonMessage);
if (jsonMessageBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, jsonMessageBatch, "平台发放优惠券用户桶消息");
jsonMessageBatch.clear();
}
}
}
}
接着营销系统监听如下Topic消费分片后的发放优惠券任务。
代码语言:javascript代码运行次数:0运行复制PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC
此时有两种处理方法:一.直接使用线程池进行发送发放优惠券消息到MQ。二.合并batch后再使用线程池发送MQ。
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
//平台发放优惠券用户桶消费者
@Bean("platformCouponUserBucketReceiveTopicConsumer")
public DefaultMQPushConsumer receiveCouponUserBucketConsumer(@Qualifier("platformCouponUserBucketReceiveTopicConsumer")PlatFormCouponUserBucketListener platFormCouponUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, "*");
consumer.registerMessageListener(platFormCouponUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormCouponUserBucketListener implements MessageListenerConcurrently {
//账户服务
@DubboReference(version = "1.0.0")
private AccountApi accountApi;
//发送消息共用的线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//RocketMQ生产者
@Autowired
private DefaultProducer defaultProducer;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
//1.反序列化消息
String messageString = new String(messageExt.getBody());
log.debug("执行平台发送优惠券用户桶消息逻辑,消息内容:{}", messageString);
PlatformCouponUserBucketMessage message = JSON.parseObject(messageString, PlatformCouponUserBucketMessage.class);
//2.查询桶内的用户信息
Long startUserId = message.getStartUserId();
Long endUserId = message.getEndUserId();
JsonResult<List<MembershipAccountDTO>> accountBucketResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
if (!accountBucketResult.getSuccess()) {
throw new BaseBizException(accountBucketResult.getErrorCode(), accountBucketResult.getErrorMessage());
}
List<MembershipAccountDTO> accountBucket = accountBucketResult.getData();
if (CollectionUtils.isEmpty(accountBucket)) {
log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
continue;
}
//3.每个用户发送一条"平台发送优惠券消息"
//方法一:直接使用线程池进行发送发放优惠券消息到MQ;
//这里是并行消费的,以上逻辑已经是并行执行的了,而且有查库的操作
//accountBucket 默认是 1000 个用户,要为每一个用户都发送一条"平台发送优惠券消息",也就是1000条消息
//下面我们使用线程池来并行发送这1000条消息(ps:另一种也可以像发送优惠券用户桶消息一样用批量发送)
PlatformCouponMessage couponMessage = PlatformCouponMessage.builder()
.couponId(message.getCouponId())
.activityStartTime(message.getActivityStartTime())
.activityEndTime(message.getActivityEndTime())
.couponType(message.getCouponType())
.build();
for (MembershipAccountDTO account : accountBucket) {
sharedSendMsgThreadPool.execute(() -> {
couponMessage.setUserAccountId(account.getId());
String jsonMessage = JSON.toJSONString(couponMessage);
defaultProducer.sendMessage(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, jsonMessage, "平台发送优惠券消息");
});
}
//方法二:合并batch后再使用线程池发送MQ
/*List<String> messages = new ArrayList<>(100);
for (MembershipAccountDTO account : accountBucket) {
couponMessage.setUserAccountId(account.getId());
messages.add(JSON.toJSONString(couponMessage));
if (messages.size() == 100) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, messages, "平台发送优惠券消息");
});
messages.clear();
}
}
//最后剩下的也批量发出
if (!CollectionUtils.isEmpty(messages)) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, messages, "平台发送促销活动消息");
});
messages.clear();
}*/
}
} catch (Exception e){
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
最后营销系统监听如下Topic,然后通过线程池对每个用户发放优惠券。
代码语言:javascript代码运行次数:0运行复制PLATFORM_PROMOTION_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
//平台发放优惠券领取消费者
@Bean("platformCouponReceiveTopicConsumer")
public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormCouponListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormCouponListener implements MessageListenerConcurrently {
//优惠券服务service
@Autowired
private CouponItemService couponItemService;
//测试completableFuture使用commonPool的时是不需要初始化业务ThreadPoolExecutor的
//这里用supplier懒加载让测试completableFuture使用commonPool时不要初始化线程池
//只有当使用completableFuture使用自定义的线程时才初始化线程池
private static final int PERMITS = 30;
private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
if (initializedRefpareAndSet(false, true)) {
THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), NamedDaemonThreadFactory.getInstance("consumeCouponMsg"), new ThreadPoolExecutor.CallerRunsPolicy());
}
return THREAD_POOL_EXECUTOR;
};
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
//方式一:使用默认的commonPool来处理任务
//supplyAsync(Supplier<U> supplier) API
//默认使用的是 ForkJoinPoolmonPool() 这个线程池
//该线程池在jvm内是唯一的,默认的线程数量是cpu的核数减1
//如果觉得线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPoolmon.parallelism 的值调整commonPool的并行度,或者采用方式二
List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
.map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
.collect(Collectors.toList());
//方式二:使用自定的业务线程池来处理任务
//List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
// .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
// .collect(Collectors.toList());
List<SalesPromotionCouponItemDTO> couponItemDTOList = futureList.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
//优惠券保存到数据库
couponItemService.saveCouponBatch(couponItemDTOList);
} catch (Exception e) {
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
public SalesPromotionCouponItemDTO handleMessageExt(MessageExt messageExt) {
log.debug("执行平台发放优惠券消费消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class);
log.info("开始发放平台优惠券,couponId:{}", platformCouponMessage.getCouponId());
//幂等逻辑防止重复消费
JsonResult<Long> result = couponItemService.selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(), platformCouponMessage.getCouponId());
//如果已经存在,直接跳过循环,不再执行优惠券保存操作
if (result.getSuccess()) {
return null;
}
SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO();
itemDTO.setCouponId(platformCouponMessage.getCouponId());
itemDTO.setCouponType(platformCouponMessage.getCouponType());
itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId());
itemDTO.setIsUsed(0);
itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime());
itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime());
return itemDTO;
}
}
(2)给全量用户惰性发放优惠券的优化实现
一.首先需要配置好要使用的Redis相关Bean
代码语言:javascript代码运行次数:0运行复制@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
@ConditionalOnClass(RedissonClient.class)
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password)
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(100)
.setIdleConnectionTimeout(600000)
.setSubscriptionConnectionMinimumIdleSize(10)
.setSubscriptionConnectionPoolSize(100)
.setTimeout(timeout);
config.setCodec(new StringCodec());
config.setThreads(5);
config.setNettyThreads(5);
RedissonClient client = Redisson.create(config);
return client;
}
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisCache redisCache(RedisTemplate redisTemplate) {
return new RedisCache(redisTemplate);
}
}
二.然后实现基于Redisson分布式锁维护优惠券缓存列表,以及惰性优惠券过期缓存清理
其实就是新增优惠券时,将优惠券信息写入Redis,并检查优惠券是否过期,如果过期就进行删除。
代码语言:javascript代码运行次数:0运行复制//优惠券接口实现
@Service
public class CouponServiceImpl implements CouponService {
//Redis客户端工具
@Autowired
private RedisCache redisCache;
@Autowired
private RedissonClient redissonClient;
...
//保存/修改优惠券活动方法
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
couponDO.setCouponReceivedCount(0);
salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//判断优惠券类型
if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())) {
//一.如果是系统发放类型,则针对所有用户,发送优惠券到MQ
writeCouponToRedis(couponDO);
} else {
//二.如果是自己领取类型
//TODO
}
SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
dto.setCouponName(request.getCouponName());
dto.setRule(request.getCouponRule());
dto.setSuccess(true);
return dto;
}
private void writeCouponToRedis(SalesPromotionCouponDO coupon) {
//首先需要用Redisson基于Redis做一个分布式锁的加锁PROMOTION_COUPON_ID_LIST_LOCK
//再去维护一个"一共发出去了多少张券"的数据结构PROMOTION_COUPON_ID_LIST
RLock lock = redissonClient.getLock(RedisKey.PROMOTION_COUPON_ID_LIST_LOCK);
try {
//进行加锁,超时时间为60s释放
lock.lock(60, TimeUnit.SECONDS);
List<Long> couponIds = null;
String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
if (couponIdsJSON == null || couponIdsJSON.equals("")) {
couponIds = new ArrayList<>();
} else {
couponIds = JSON.parseObject(couponIdsJSON, List.class);
}
//检查每个优惠券时间是否过期了,如果过期或者已经发完了券,则把它从List里删除,以及从Redis里删除
//如果是全量发券,则不会发完,因此可以给所有人发,如果超过了时间才不能发券
if (couponIds.size() > 0) {
Iterator<Long> couponIdIterator = couponIds.iterator();
while (couponIdIterator.hasNext()) {
Long tempCouponId = couponIdIterator.next();
String tempCouponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
SalesPromotionCouponDO tempCoupon = JSON.parseObject(tempCouponJSON, SalesPromotionCouponDO.class);
Date now = new Date();
if (now.after(tempCoupon.getActivityEndTime())) {
couponIdIterator.remove();
redisCache.delete(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
}
}
}
couponIds.add(coupon.getId());
couponIdsJSON = JsonUtil.object2Json(couponIds);
redisCache.set(RedisKey.PROMOTION_COUPON_ID_LIST, couponIdsJSON, -1);
String couponJSON = JsonUtil.object2Json(coupon);
redisCache.set(RedisKey.PROMOTION_COUPON_KEY + "::" + coupon.getId(), couponJSON, -1);
} finally {
lock.unlock();
}
}
}
三.接着实现会员系统发布用户登录事件 + 营销系统在用户登录后的惰性发券
会员系统在用户登录时会发送消息到MQ的USER_LOGINED_EVENT_TOPIC:
代码语言:javascript代码运行次数:0运行复制@RestController
@RequestMapping("/demo/membership")
public class MembershipController {
@Autowired
private DefaultProducer defaultProducer;
@Autowired
private MembershipAccountService accountService;
//触发用户登录
@PostMapping("/triggerUserLoginEvent")
public JsonResult<Boolean> triggerUserLoginEvent(Long accountId) {
try {
List<MembershipAccountDTO> accounts = accountService.queryAccountByIdRange(accountId, accountId);
if (accounts != null && accounts.size() > 0) {
MembershipAccountDTO account = accounts.get(0);
UserLoginedEvent userLoginedEvent = new UserLoginedEvent();
userLoginedEvent.setAccount(account);
String userLoginedEventJSON = JsonUtil.object2Json(userLoginedEvent);
defaultProducer.sendMessage(RocketMqConstant.USER_LOGINED_EVENT_TOPIC, userLoginedEventJSON, "用户登录事件发生了");
}
return JsonResult.buildSuccess(true);
} catch (BaseBizException e) {
log.error("biz error: request={}", accountId, e);
return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
log.error("system error: request={}", accountId, e);
return JsonResult.buildError(e.getMessage());
}
}
}
营销系统监听USER_LOGINED_EVENT_TOPIC对用户登录时发送的登录事件消息进行惰性发券:
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("userLoginedEventListener")
public DefaultMQPushConsumer userLoginedEventListener(UserLoginedEventListener userLoginedEventListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(USER_LOGINED_EVENT_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(USER_LOGINED_EVENT_TOPIC, "*");
consumer.registerMessageListener(userLoginedEventListener);
consumer.start();
return consumer;
}
}
//用户登录事件监听器
@Component
public class UserLoginedEventListener implements MessageListenerConcurrently {
@Autowired
private RedisCache redisCache;
@Autowired
private SalesPromotionCouponItemDAO couponItemDAO;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
//从Redis缓存里查询所有的优惠券
String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
List<Long> couponIds = JSON.parseObject(couponIdsJSON, List.class);
List<SalesPromotionCouponDO> coupons = new ArrayList<>();
for (Long couponId : couponIds) {
String couponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + couponId);
SalesPromotionCouponDO coupon = JSON.parseObject(couponJSON, SalesPromotionCouponDO.class);
Date now = new Date();
if (now.after(coupon.getActivityStartTime()) && now.before(coupon.getActivityEndTime())) {
coupons.add(coupon);
}
}
for (MessageExt messageExt : list) {
//这个代码就可以拿到一个刚刚登陆成功的用户
String message = new String(messageExt.getBody());
UserLoginedEvent userLoginedEvent = JSON.parseObject(message, UserLoginedEvent.class);
MembershipAccountDTO account = userLoginedEvent.getAccount();
//遍历每一个优惠券,检查这个优惠券是否有效,是否还可以继续进行发券,以及当前用户是否发过券,然后才给用户进行发券
for (SalesPromotionCouponDO coupon : coupons) {
String receiveCouponFlag = redisCache.get(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId());
if (receiveCouponFlag == null || receiveCouponFlag.equals("")) {
SalesPromotionCouponItemDO couponItem = new SalesPromotionCouponItemDO();
couponItem.setActivityEndTime(coupon.getActivityEndTime());
couponItem.setActivityStartTime(coupon.getActivityStartTime());
couponItem.setCouponId(coupon.getId());
couponItem.setCouponType(coupon.getCouponType());
couponItem.setCreateTime(new Date());
couponItem.setCreateUser(account.getId());
couponItem.setIsUsed(0);
couponItem.setUpdateTime(new Date());
couponItem.setUpdateUser(account.getId());
couponItem.setUserAccountId(account.getId());
couponItemDAO.receiveCoupon(couponItem);
redisCache.set(RedisKey.PROMOTION_USER_RECEIVE_COUPON + "::" + account.getId() + "::" + coupon.getId(), "true", -1);
}
}
}
} catch(Exception e) {
log.error("consume error, 用户登录事件处理异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
11.指定用户群体发券的代码实现
(1)千万级用户推送和发放优惠券的方案总结
(2)指定用户群体发券实现
(1)千万级用户推送和发放优惠券的方案总结
创建促销活动和发放优惠券时,都不是直接进行推送和发放的,而是利用RocketMQ进行多次中转、异步化处理。
营销系统首先会查出用户总数,然后进行任务分片,接着把分片任务消息通过batch合并发送到MQ(异步化提升性能)。
营销系统会消费这些分片任务消息,并查询出用户和封装好每个用户的消息,然后发到MQ(解耦会员系统和推送系统)。
推送系统最后会消费每个用户的推送消息,并基于线程池采用多线程并发进行推送。
(2)指定用户群体发券实现
典型的例子就是为激活百万不活跃用户发放优惠券。
一.营销系统创建指定用户群体发券的入口代码
代码语言:javascript代码运行次数:0运行复制@RestController
@RequestMapping("/demo/promotion/coupon")
public class PromotionCouponController {
...
@RequestMapping("/send")
public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request) {
try {
log.info("发送优惠券给指定用户群体:{}", JSON.toJSONString(request));
SendCouponDTO dto = couponService.sendCouponByConditions(request);
return JsonResult.buildSuccess(dto);
} catch (BaseBizException e) {
log.error("biz error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
log.error("system error: request={}", JSON.toJSONString(request), e);
return JsonResult.buildError(e.getMessage());
}
}
}
@Service
public class CouponServiceImpl implements CouponService {
@DubboReference(version = "1.0.0")
private MessagePushApi messagePushApi;
...
@Transactional(rollbackFor = Exception.class)
@Override
public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {
//保存优惠券信息
SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest);
couponDO.setCouponReceivedCount(0);
couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode());
couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode());
salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//分片和批量发送发放优惠券消息
shardBatchSendCouponMessage(sendCouponRequest);
SendCouponDTO sendCouponDTO = new SendCouponDTO();
sendCouponDTO.setSuccess(Boolean.TRUE);
sendCouponDTO.setCouponName(sendCouponRequest.getCouponName());
sendCouponDTO.setRule(sendCouponRequest.getCouponRule());
//TODO 发放数量
sendCouponDTO.setSendCount(0);
return sendCouponDTO;
}
}
二.营销系统分片和批量发送发券消息的代码
其中要去画像系统获取用户信息,然后根据用户数量进行分片,接着进行批量发送,发送到MQ的如下Topic。
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
代码语言:javascript代码运行次数:0运行复制@Service
public class CouponServiceImpl implements CouponService {
...
//分片和批量发送发放优惠券消息
private void shardBatchSendCouponMessage(SendCouponRequest sendCouponRequest) {
//1.到画像系统获取当前条件下的count值
MembershipFilterDTO membershipFilterDTO = sendCouponRequest.getMembershipFilterDTO();
PersonaFilterConditionDTO conditionDTO = conditionConverter.convertFilterCondition(membershipFilterDTO);
JsonResult<Integer> countResult = personaApi.countByCondition(conditionDTO);
if (!countResult.getSuccess()) {
throw new BaseBizException(countResult.getErrorCode(), countResult.getErrorMessage());
}
//2.根据count值分片
//分成m个分片,每个分片中包含:(1)分片ID;(2)用户个数;
//例:maxUserId = 100w; userBucketSize=1000
//userBucket1 = [1, 1000)
//userBucket2 = [2, 1000)
//userBucket2 = [n, 756),最后一个分片可能数量不足1000
//userBucketCount = 1000
Integer count = countResult.getData();
Map<Integer, Integer> userBuckets = new LinkedHashMap<>();
AtomicBoolean flagRef = new AtomicBoolean(true);
Integer shardId = 1;
while (flagRef.get()) {
if (USER_BUCKET_SIZE > count) {
userBuckets.put(shardId, USER_BUCKET_SIZE);
flagRefpareAndSet(true, false);
}
userBuckets.put(shardId, USER_BUCKET_SIZE);
shardId += 1;
count -= USER_BUCKET_SIZE;
}
//3.批量发送消息
//例:userBucketCount = 1000; messageBatchSize = 100
List<String> messages = new ArrayList<>();
PlatformPromotionConditionUserBucketMessage message = PlatformPromotionConditionUserBucketMessage.builder().personaFilterCondition(JSON.toJSONString(conditionDTO)).build();
for (Map.Entry<Integer, Integer> userBucket : userBuckets.entrySet()) {
message.setShardId(userBucket.getKey());
message.setBucketSize(userBucket.getValue());
String jsonMessage = JsonUtil.object2Json(message);
messages.add(jsonMessage);
}
log.info("本次推送消息数量,{}",messages.size());
ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
log.info("本次批次消息数量,{}",sendBatch.size());
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, sendBatch, "部分用户优惠活动用户桶消息");
});
}
}
}
三.营销系统对指定用户群体分片消息的处理和推送代码
首先,营销系统会消费MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC
然后,营销系统会把领取优惠券的消息会发送到MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("platFormConditionCouponUserBucketConsumer")
public DefaultMQPushConsumer platFormConditionCouponUserBucketConsumer(PlatFormConditionCouponUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormConditionCouponUserBucketListener implements MessageListenerConcurrently {
//用户画像服务
@DubboReference(version = "1.0.0")
private PersonaApi personaApi;
//发送消息共用的线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//RocketMQ生产者
@Autowired
private DefaultProducer defaultProducer;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
//1.反序列化消息
String messageString = new String(messageExt.getBody());
log.debug("部分用户领取优惠券用户桶消息逻辑,消息内容:{}", messageString);
PlatformPromotionConditionUserBucketMessage message = JSON.parseObject(messageString, PlatformPromotionConditionUserBucketMessage.class);
//2.查询桶内的用户信息
Integer shardId = message.getShardId();
//根据分片id,和分片数量大小,计算出本次分片的起始userId
Long startUserId = (shardId.longValue() - 1) * 1000;
Integer bucketSize = message.getBucketSize();
String personaFilterCondition = message.getPersonaFilterCondition();
PersonaFilterConditionDTO personaFilterConditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);
//封装查询用户id的条件
PersonaConditionPage page = PersonaConditionPage.builder()
.memberPoint(personaFilterConditionDTO.getMemberPoint())
.memberLevel(personaFilterConditionDTO.getMemberLevel())
.offset(startUserId)
.limit(bucketSize)
.build();
//从用户画像系统查询用户账号id
JsonResult<List<Long>> accountIdsResult = personaApi.getAccountIdsByIdLimit(page);
if (!accountIdsResult.getSuccess()) {
throw new BaseBizException(accountIdsResult.getErrorCode(), accountIdsResult.getErrorMessage());
}
List<Long> accountIds = accountIdsResult.getData();
if (CollectionUtils.isEmpty(accountIds)) {
log.info("根据用户桶内的分片信息没有查询到用户, shardId={}", shardId);
continue;
}
//3.每个用户发送一条领取优惠券的消息通知
PlatformMessagePushMessage pushMessage = PlatformMessagePushMessage.builder()
.message("恭喜您获得优惠券领取资格,点击www.wjunt进入活动页面")
.mainMessage("获得优惠券领取资格")
.informType(InformTypeEnum.APP.getCode())
.build();
List<String> messages = new ArrayList<>();
for (Long accountId : accountIds) {
pushMessage.setUserAccountId(accountId);
messages.add(JSON.toJSONString(pushMessage));
}
log.info("本次推送消息数量,{}",messages.size());
ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_TOPIC, sendBatch, "平台发送优惠券消息");
});
}
}
} catch (Exception e){
log.error("consume error,消费失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
四.推送系统会监听如下Topic消费消息推送领取优惠券的消息
代码语言:javascript代码运行次数:0运行复制PLATFORM_CONDITION_COUPON_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("platFormConditionCouponConsumer")
public DefaultMQPushConsumer platFormConditionCouponConsumer(PlatFormConditionCouponListener platFormPromotionListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_CONDITION_COUPON_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_CONDITION_COUPON_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormConditionCouponListener implements MessageListenerConcurrently {
//消息推送工厂提供者
@Autowired
private FactoryProducer factoryProducer;
@Autowired
private RedisTemplate<String, String> redisTemplate;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
//方式二:使用自定的业务线程池来处理任务
List<CompletableFuture<AltResult>> futureList = msgList.stream()
.map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
.collect(Collectors.toList());
List<Throwable> resultList = futureList.stream()
.map(CompletableFuture::join)
.filter(e -> e.ex != null)
.map(e -> e.ex)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("consume error,消费失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private AltResult handleMessageExt(MessageExt messageExt) {
try {
log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
PlatformMessagePushMessage message = JSON.parseObject(msg , PlatformMessagePushMessage.class);
//幂等控制
if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId()))) {
return new AltResult(null);
}
//获取消息服务工厂
MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());
//消息发送服务组件
MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();
//构造消息
PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
.informType(message.getInformType())
.mainMessage(message.getMainMessage())
.userAccountId(message.getUserAccountId())
.message(message.getMessage())
.build();
MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
messageSendService.send(messageSendDTO);
//发送成功之后把已经发送成功记录到redis
redisTemplate.opsForValue().set(PROMOTION_CONDITION_COUPON_KEY + message.getUserAccountId(), UUID.randomUUID().toString());
log.info("消息推送完成,messageSendDTO:{}", messageSendDTO);
return new AltResult(null);
} catch (Exception e) {
return new AltResult(e);
}
}
//completableFuture的返回结果,适用于无返回值的情况
//ex字段为null表示任务执行成功
//ex字段不为null表示任务执行失败,并把异常设置为ex字段
private static class AltResult {
final Throwable ex;
public AltResult(Throwable ex) {
this.ex = ex;
}
}
}
12.分片消息的batch合并算法重构实现
之前的batch合并是按照条数进行合并的,现在重构为按照合并后的大小不超过800KB和不超过100条进行合并。
代码语言:javascript代码运行次数:0运行复制public class ListSplitter implements Iterator<List<String>> {
//设置每一个batch最多不超过800k,因为RocketMQ官方推荐,一条消息不建议长度超过1MB
//而封装一个RocketMQ的message,包括了MessageBody, Topic,Addr等数据,所以设置小一点
private int sizeLimit = 800 * 1024;
private final List<String> messages;
private int currIndex;
private int batchSize = 100;
public ListSplitter(List<String> messages, Integer batchSize) {
this.messages = messages;
this.batchSize = batchSize;
}
public ListSplitter(List<String> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
//每次从list中取一部分
@Override
public List<String> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
String message = messages.get(nextIndex);
//获取每条message的长度
int tmpSize = message.length();
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit || (nextIndex - currIndex) == batchSize ) {
break;
} else {
totalSize += tmpSize;
}
}
List<String> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
13.百万画像群体爆款商品推送代码实现
(1)营销系统的定时推送任务
(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ
(3)推送系统消费具体用户的推送消息进行真正推送
(1)营销系统的定时推送任务
代码语言:javascript代码运行次数:0运行复制@Component
public class ScheduleSendMessageJobHandler {
...
//执行定时任务,筛选热门商品和用户发送给MQ
@XxlJob("hotGoodsPushHandler")
public void hotGoodsPushHandler() {
log.info("hotGoodsPushHandler 开始执行");
//获取热门商品和用户画像,业务先简化为一对一关系
List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date());
log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));
//找出每个热门商品对应画像所匹配的用户
for (HotGoodsCrontabDO crontabDO : crontabDOs) {
log.info("自动分片逻辑, 当前任务:crontabDO:{}", JsonUtil.object2Json(crontabDO));
if (StringUtils.isEmpty(crontabDO.getPortrayal())) {
continue;
}
//热门商品对应的画像实体
MembershipPointDTO membershipPointDTO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDTO.class);
if (Objects.isNull(membershipPointDTO)) {
continue;
}
//获取匹配画像的用户实体
MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDTO);
PersonaFilterConditionDTO personaFilterConditionDTO = conditionConverter.convertFilterCondition(conditionDTO);
log.info("用户查询条件:{}", personaFilterConditionDTO);
//获取画像用户匹配的用户ID最大最小值
JsonResult<Long> accountMaxIdResult = personaApi.queryMaxIdByCondition(personaFilterConditionDTO);
log.info("获取最大ID,result:{}", JsonUtil.object2Json(accountMaxIdResult));
if (!accountMaxIdResult.getSuccess()) {
log.info("获取最大ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
throw new BaseBizException(accountMaxIdResult.getErrorCode(), accountMaxIdResult.getErrorMessage());
}
JsonResult<Long> accountMinIdResult = personaApi.queryMinIdByCondition(personaFilterConditionDTO);
log.info("获取最小ID,result:{}", JsonUtil.object2Json(accountMinIdResult));
if (!accountMinIdResult.getSuccess()) {
log.info("获取最小ID失败,condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
throw new BaseBizException(accountMinIdResult.getErrorCode(), accountMinIdResult.getErrorMessage());
}
//需要执行推送的用户起始ID
//注意:这是一个预估值,因为最小ID到最大ID中间会有很多不符合条件的用户
//针对这些用户,需要在下一层的业务逻辑中,用选人条件过滤掉
Long minUserId = accountMinIdResult.getData();
Long maxUserId = accountMaxIdResult.getData();
//bucket就是一个用户分片,对应的是一个startUserId -> endUserId,用户ID范围
//可以根据一定的算法,把千万级用户推送任务分片,比如一个分片后的推送任务包含1000个用户/2000个用户
//userBuckets就有上万条key-value对,每个key-value对就是一个startUserId -> endUserId的推送任务分片
final int userBucketSize = 1000;
Map<Long, Long> userBuckets = new LinkedHashMap<>();
AtomicBoolean doSharding = new AtomicBoolean(true);
long startUserId = minUserId;
log.info("开始对任务人群进行分片,startId:{}",minUserId);
while (doSharding.get()) {
if ((maxUserId -minUserId) < userBucketSize) {
userBuckets.put(startUserId, maxUserId);
doShardingpareAndSet(true, false);
break;
}
userBuckets.put(startUserId, startUserId + userBucketSize);
startUserId += userBucketSize;
maxUserId -= userBucketSize;
}
//把可能成千上万的分片推送任务进行RocketMQ消息的batch合并,以batch模式的发送任务到MQ,减少跟RocketMQ网络通信的耗时
List<String> hotProductPushTasks = new ArrayList<>();
HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO);
PlatformHotProductUserBucketMessage bucketMessage = PlatformHotProductUserBucketMessage.builder()
.hotGoodsVO(JSON.toJSONString(hotGoodsVO))
.personaFilterConditionDTO(JSON.toJSONString(personaFilterConditionDTO))
.build();
for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
bucketMessage.setEndUserId(userBucket.getValue());
bucketMessage.setStartUserId(userBucket.getKey());
String promotionPushTaskJSON = JsonUtil.object2Json(bucketMessage);
log.info("用户桶构建侧选人条件:{}",bucketMessage.getPersonaFilterConditionDTO());
hotProductPushTasks.add(promotionPushTaskJSON);
}
//MESSAGE_BATCH_SIZE指的是消息batch大小,RocketMQ的每个batch消息包含了100个推送任务
//这样可以将1万个推送任务消息合并为100个batch消息,进行100次网络通信发给RocketMQ,可以大幅度降低发送消息的耗时
ListSplitter splitter = new ListSplitter(hotProductPushTasks, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
log.info("本次批次消息数量,{}",sendBatch.size());
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
});
}
}
}
}
(2)推送系统消费分片推送任务消息,发送具体用户的推送消息到MQ
监听MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("PlatFormHotProductUserBucketConsumer")
public DefaultMQPushConsumer PlatFormHotProductUserBucketConsumer(PlatFormHotProductUserBucketListener platFormHotProductUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormHotProductUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormHotProductUserBucketListener implements MessageListenerConcurrently {
@DubboReference(version = "1.0.0")
private PersonaApi personaApi;
@Autowired
private DefaultProducer producer;
//公用的消息推送线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
String msg = new String(messageExt.getBody());
PlatformHotProductUserBucketMessage hotProductMessagePushTask = JSON.parseObject(msg , PlatformHotProductUserBucketMessage.class);
log.info("执行热门商品推送用户桶消息逻辑,消息内容:{}", hotProductMessagePushTask);
//1.获取本次热门商品推送任务分片中的数据
String hotGoodString = hotProductMessagePushTask.getHotGoodsVO();
String personaFilterCondition = hotProductMessagePushTask.getPersonaFilterConditionDTO();
HotGoodsVO hotGoodsVO = JSON.parseObject(hotGoodString, HotGoodsVO.class);
log.info("选人条件,内容:{}", personaFilterCondition);
if (Objects.isNull(personaFilterCondition) || Objects.isNull(hotGoodsVO)) {
continue;
}
PersonaFilterConditionDTO conditionDTO = JSON.parseObject(personaFilterCondition, PersonaFilterConditionDTO.class);
Long startUserId = hotProductMessagePushTask.getStartUserId();
Long endUserId = hotProductMessagePushTask.getEndUserId();
//分页查询条件
PersonaConditionWithIdRange page = PersonaConditionWithIdRange.builder()
.memberLevel(conditionDTO.getMemberLevel())
.memberPoint(conditionDTO.getMemberPoint())
.startId(startUserId)
.endId(endUserId)
.build();
//2.查询本次推送任务分片对应的用户群体
//注意:查询的时候,传入查询条件过滤掉不符合条件的用户id
JsonResult<List<Long>> queryAccountIdsResult = personaApi.getAccountIdsByIdRange(page);
List<Long> accountIds = queryAccountIdsResult.getData();
PlatformHotProductMessage hotMessage = PlatformHotProductMessage.builder()
.goodsName(hotGoodsVO.getGoodsName())
.goodsDesc(hotGoodsVO.getGoodsDesc())
.keyWords(hotGoodsVO.getKeyWords())
.build();
int handledBucketCount = 0;
List<String> messages = new ArrayList<>();
for (Long accountId : accountIds) {
handledBucketCount++;
hotMessage.setAccountId(accountId);
log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage);
messages.add(JSON.toJSONString(hotMessage));
}
ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
while (splitter.hasNext()) {
List<String> sendBatch = splitter.next();
log.info("本次批次消息数量,{}", sendBatch.size());
sharedSendMsgThreadPool.execute(() -> {
producer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, sendBatch, "平台热门商品定时任务用户桶消息");
});
}
}
} catch (Exception e) {
log.error("consume error,热门商品通知消费失败", e);
//这边因为是推送任务,个别失败也可以直接丢弃
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
(3)推送系统消费具体用户的推送消息进行真正推送
监听MQ的如下Topic:
代码语言:javascript代码运行次数:0运行复制PLATFORM_HOT_PRODUCT_SEND_TOPIC
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("platformHotProductSendTopicConsumer")
public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormHotProductListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormHotProductListener implements MessageListenerConcurrently {
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
HashMap hotProductMessage = JSON.parseObject(msg , HashMap.class);
//推送通知
informByPush(hotProductMessage);
}
} catch (Exception e) {
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//第三方平台推送消息到app
private void informByPush(HashMap message){
String messageBody = "速戳!精致小物件,"
+ message.get("keywords")+"!"
+ message.get("goodsName")
+ message.get("goodsDesc");
log.info("消息推送中:消息内容:{}", messageBody);
}
}
14.生产环境百万级用户PUSH全链路压测
百万级用户Push生产环境的部署:营销系统部署了5台2核4G机器、推送系统部署了5台2核4G机器、会员系统1台、画像系统1台。
假设会员系统⼀共有150w的⽤户数据,现在开启⼀次全员推送通知的优惠活动,总计MQ的数据量⼤概就是150w。开启⼀个全员活动的时间,⼤概⽤时27分钟左右,即完成了全部消息的推送,整体效率还是算⽐较⾼的。所以如果是千万级别的推送,⼤概也就是需要27 * 5⼤概是3个⼩时左右,和我们的预期是⽐较相似的。
150万的用户推送任务,按1000用户进行分片,那么总共会有1500个分片任务,每个分片任务需处理1000条消息。这1500个人分片任务通过batch合并发送到MQ,也非常快,假设每100个分片合并成一个batch,才15个batch消息。5台推送机器,每台机器开启30个线程,那么总共有150个线程。假设一个线程完成一条消息的推送需要200ms,那么每个线程每秒能推送5条消息,5台机器每秒能推送750条消息。150万 / 750 = 2000s = 30分钟,这就是通过计算预估的结果,和实际的27分钟相差不大。
由于有些场景下,这种全员性质的消息推送,是不需要接收推送结果的。如果直接放弃推送结果的获取操作,效率还能稍微有所提升。
本文标签: RocketMQ实战10营销系统代码优化二
版权声明:本文标题:RocketMQ实战—10.营销系统代码优化二 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/jiaocheng/1747632406a2196140.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论