admin管理员组文章数量:1030003
RocketMQ实战—10.营销系统代码优化一
大纲
1.营销系统引入MQ实现异步化来进行性能优化
2.基于MQ释放优惠券提升系统扩展性
3.基于Redis实现重复促销活动去重
4.基于促销活动创建事件实现异步化
5.推送任务分片和分片消息batch合并发送实现
6.推送系统与用户群体查询逻辑解耦
7.查询用户数据以及批量发送推送消息
8.线程池封装以及推送系统多线程推送
9.推送系统的千万级消息多线程推送
10.千万级用户惰性发券代码实现
11.指定用户群体发券的代码实现
12.分片消息的batch合并算法重构实现
13.百万画像群体爆款商品推送代码实现
14.生产环境百万级用户PUSH全链路压测
接下来优化营销系统的四大促销场景的代码:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送
1.营销系统引入MQ实现异步化来进行性能优化
查询全量用户、创建大量消息、发送大量消息到MQ,这三个操作都可能非常耗时。所以这三个操作都不应该在创建营销活动时同步处理,而最好利用MQ在消费时异步化处理。
促销活动创建接口举例:
初版:写库(10毫秒) + 全量用户拉取(几分钟) + 大量消息创建和发送到MQ(几分钟),此时接口性能低下以及耗时。
优化:使用MQ实现异步化处理后,写库(10毫秒) + 发送一条消息给MQ(10毫秒)。
2.基于MQ释放优惠券提升系统扩展性
MQ主要有三大作用:削峰填谷、异步化提升性能、解耦提升扩展性。在初版主要用了MQ实现削峰填谷,解决面临的瞬时高并发写库或者调用接口的问题。而前面继续用MQ来剥离创建活动的接口时遇到的耗时问题,即通过实现异步化来提升性能。
下面介绍使用MQ提升系统扩展性:
例如创建订单时,订单里使用了一个优惠券。此时订单系统需要通知库存系统,对商品库存进行锁定。同时还需要通知营销系统,对该优惠券进行锁定,把其is_used字段进行设置。但后来用户发起取消订单操作,修改了订单状态,释放了库存和优惠券。库存恢复到原来的数量,优惠券也可以继续使用。那么在取消订单时,是否应该直接调用库存系统和营销系统的接口去释放库存和优惠券呢?
一般在取消订单时,会通过引入MQ,把一个订单取消事件消息OrderCanceledEvent发送到MQ。然后让库存系统、营销系统、积分系统等,关注这个订单取消事件消息,各自进行订单取消后的处理。以此实现订单系统对库存系统、营销系统、积分系统等系统的解耦,提升订单系统的扩展性。
3.基于Redis实现重复促销活动去重
(1)配置Redis
(2)创建促销活动时使用Redis进行去重
(1)配置Redis
代码语言:javascript代码运行次数:0运行复制@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
@ConditionalOnClass(RedissonClient.class)
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password)
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(100)
.setIdleConnectionTimeout(600000)
.setSubscriptionConnectionMinimumIdleSize(10)
.setSubscriptionConnectionPoolSize(100)
.setTimeout(timeout);
config.setCodec(new StringCodec());
config.setThreads(5);
config.setNettyThreads(5);
RedissonClient client = Redisson.create(config);
return client;
}
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisCache redisCache(RedisTemplate redisTemplate) {
return new RedisCache(redisTemplate);
}
}
public class RedisCache {
private RedisTemplate redisTemplate;
public RedisCache(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
//缓存存储
public void set(String key, String value, int seconds) {
ValueOperations<String, String> vo = redisTemplate.opsForValue();
if (seconds > 0) {
vo.set(key, value, seconds, TimeUnit.SECONDS);
} else {
vo.set(key, value);
}
}
//缓存获取
public String get(String key) {
ValueOperations<String, String> vo = redisTemplate.opsForValue();
return vo.get(key);
}
//缓存手动失效
public boolean delete(String key) {
return redisTemplate.delete(key);
}
//判断hash key是否存在
public boolean hExists(String key) {
return hGetAll(key).isEmpty();
}
//获取hash变量中的键值对,对应redis hgetall 命令
public Map<String, String> hGetAll(String key) {
return redisTemplate.opsForHash().entries(key);
}
//以map集合的形式添加hash键值对
public void hPutAll(String key, Map<String, String> map) {
redisTemplate.opsForHash().putAll(key, map);
}
//执行lua脚本
public <T> T execute(RedisScript<T> script, List<String> keys, String... args) {
return (T) redisTemplate.execute(script, keys, args);
}
public RedisTemplate getRedisTemplate() {
return redisTemplate;
}
}
(2)创建促销活动时使用Redis进行去重
代码语言:javascript代码运行次数:0运行复制@Service
public class PromotionServiceImpl implements PromotionService {
//开启促销活动DAO
@Autowired
private SalesPromotionDAO salesPromotionDAO;
//redis缓存工具
@Resource
private RedisCache redisCache;
@Resource
private PromotionConverter promotionConverter;
//新增或修改一个促销活动
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
//判断是否活动是否重复
String result = redisCache.get(PROMOTION_CONCURRENCY_KEY +
request.getName() +
request.getCreateUser() +
request.getStartTime().getTime() +
request.getEndTime().getTime());
if (StringUtils.isNotBlank(result)) {
return null;
}
log.info("活动内容:{}", request);
//活动规则
String rule = JsonUtil.object2Json(request.getRule());
//构造促销活动实体
SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request);
salesPromotionDO.setRule(rule);
//促销活动落库
salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO);
//写Redis缓存用于下次创建去重
redisCache.set(PROMOTION_CONCURRENCY_KEY + request.getName() + request.getCreateUser() + request.getStartTime().getTime() + request.getEndTime().getTime(), UUID.randomUUID().toString(),30 * 60);
//为所有用户推送促销活动,发MQ
sendPlatformPromotionMessage(salesPromotionDO);
//构造响应数据
SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
dto.setName(request.getName());
dto.setType(request.getType());
dto.setRule(rule);
dto.setCreateUser(request.getCreateUser());
dto.setSuccess(true);
return dto;
}
...
}
4.基于促销活动创建事件实现异步化
(1)创建促销活动时发布创建活动事件消息到MQ
(2)营销系统需要消费创建活动事件消息
(1)创建促销活动时发布创建活动事件消息到MQ
代码语言:javascript代码运行次数:0运行复制//促销活动创建事件
@Data
public class SalesPromotionCreatedEvent {
private SalesPromotionDO salesPromotion;
}
@Service
public class PromotionServiceImpl implements PromotionService {
...
//新增或修改一个运营活动
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
//判断是否活动是否重复
String result = redisCache.get(PROMOTION_CONCURRENCY_KEY +
request.getName() +
request.getCreateUser() +
request.getStartTime().getTime() +
request.getEndTime().getTime());
if (StringUtils.isNotBlank(result)) {
return null;
}
log.info("活动内容:{}", request);
//活动规则
String rule = JsonUtil.object2Json(request.getRule());
//构造促销活动实体
SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request);
salesPromotionDO.setRule(rule);
//促销活动落库
salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO);
redisCache.set(PROMOTION_CONCURRENCY_KEY +
request.getName() +
request.getCreateUser() +
request.getStartTime().getTime() +
request.getEndTime().getTime(), UUID.randomUUID().toString(),30 * 60);
//通过MQ为所有用户推送促销活动
//sendPlatformPromotionMessage(salesPromotionDO);
//发布促销活动创建事件到MQ
publishSalesPromotionCreatedEvent(salesPromotionDO);
//构造响应数据
SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
dto.setName(request.getName());
dto.setType(request.getType());
dto.setRule(rule);
dto.setCreateUser(request.getCreateUser());
dto.setSuccess(true);
return dto;
}
//发布促销活动创建事件
private void publishSalesPromotionCreatedEvent(SalesPromotionDO salesPromotion) {
SalesPromotionCreatedEvent salesPromotionCreatedEvent = new SalesPromotionCreatedEvent();
salesPromotionCreatedEvent.setSalesPromotion(salesPromotion);
String salesPromotionCreatedEventJSON = JsonUtil.object2Json(salesPromotionCreatedEvent);
defaultProducer.sendMessage(RocketMqConstant.SALES_PROMOTION_CREATED_EVENT_TOPIC, salesPromotionCreatedEventJSON, "发布促销活动创建事件");
}
...
}
(2)营销系统需要消费创建活动事件消息
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("salesPromotionCreatedEventListener")
public DefaultMQPushConsumer salesPromotionCreatedEventListener(SalesPromotionCreatedEventListener salesPromotionCreatedEventListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(SALES_PROMOTION_CREATED_EVENT_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(SALES_PROMOTION_CREATED_EVENT_TOPIC, "*");
consumer.registerMessageListener(salesPromotionCreatedEventListener);
consumer.start();
return consumer;
}
...
}
//促销活动创建事件监听器
@Component
public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {
...
}
5.推送任务分片和分片消息batch合并发送实现
营销系统在消费创建活动事件消息时,会进行千万级用户的推送任务分片和分片消息batch合并发送到MQ。
代码语言:javascript代码运行次数:0运行复制//发送到MQ的Topic是"PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC"。
//促销活动创建事件监听器
@Component
public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {
@DubboReference(version = "1.0.0")
private AccountApi accountApi;
@Resource
private DefaultProducer defaultProducer;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
//下面三行代码可以获取到一个刚刚创建成功的促销活动
String message = new String(messageExt.getBody());
SalesPromotionCreatedEvent salesPromotionCreatedEvent = JSON.parseObject(message, SalesPromotionCreatedEvent.class);
SalesPromotionDO salesPromotion = salesPromotionCreatedEvent.getSalesPromotion();
//这个促销活动会针对全体用户发起Push
//userBucketSize就是一个用户分片的大小,与一个startUserId ~ endUserId用户ID范围相对应
final int userBucketSize = 1000;
//messageBatchSize就是合并多个任务消息成一个batch消息的大小,RocketMQ的每个batch消息包含了100个推送任务消息
//所以1w个推送任务消息会合并为100个batch消息
//发送1万个推送任务消息到MQ,只需要进行100次网络通信给RocketMQ即可,这样可以大幅降低发送消息的耗时
final int messageBatchSize = 100;
//1.获取全体用户数量有两种做法:
//第一种是进行count(效率不高),第二种是获取max(userId),通常会使用第二种做法
//select * from account order by id desc limit 1,类似于这样的sql语句去获取用户表里主键值最大的一个
JsonResult<Long> queryMaxUserIdResult = accountApi.queryMaxUserId();
if (!queryMaxUserIdResult.getSuccess()) {
throw new BaseBizException(queryMaxUserIdResult.getErrorCode(), queryMaxUserIdResult.getErrorMessage());
}
Long maxUserId = queryMaxUserIdResult.getData();
//2.获取到全体用户数量后,就可以根据一定算法结合自增ID,对千万级用户的推送任务进行分片,比如一个推送任务包含1000个用户或2000个用户
//userBuckets就是用户分片集合,其中有上万条key-value对,每个key-value对就是一个startUserId -> endUserId,代表一个推送任务分片
Map<Long, Long> userBuckets = new LinkedHashMap<>();
AtomicBoolean doSharding = new AtomicBoolean(true);// 当前是否需要执行分片
long startUserId = 1L;//起始用户ID,数据库自增主键是从1开始的
while (doSharding.get()) {
if (startUserId > maxUserId) {
doShardingpareAndSet(true, false);
break;
}
userBuckets.put(startUserId, startUserId + userBucketSize);
startUserId += userBucketSize;
}
//3.完成分片后,就把可能成千上万的推送任务进行RocketMQ消息的batch合并
//通过batch模式一批一批的发送任务到MQ里去,从而减少和RocketMQ网络通信的耗时
int handledBucketCount = 0;
List<String> promotionPushTaskBatch = new ArrayList<>(messageBatchSize);
for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
handledBucketCount++;
PlatformPromotionUserBucketMessage promotionPushTask = PlatformPromotionUserBucketMessage.builder()
.startUserId(userBucket.getKey())
.endUserId(userBucket.getValue())
.promotionId(salesPromotion.getId())
.promotionType(salesPromotion.getType())
.mainMessage(salesPromotion.getName())
.message("您已获得活动资格,可以打开APP进入活动页面")
.informType(salesPromotion.getInformType())
.build();
String promotionPushTaskJSON = JsonUtil.object2Json(promotionPushTask);
promotionPushTaskBatch.add(promotionPushTaskJSON);
//batch合并发送后,对promotionPushTaskBatch进行清空
if (promotionPushTaskBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, promotionPushTaskBatch, "平台发放促销活动用户桶消息");
promotionPushTaskBatch.clear();
}
}
}
} catch(Exception e) {
log.error("consume error, 促销活动创建事件处理异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
6.推送系统与用户群体查询逻辑解耦
营销系统将推送分片消息batch合并发送到MQ后,这些消息会由营销系统进行消费,而不是由推送系统进行消费。如果推送系统直接消费这些分片任务消息,那么推送系统需要自己去根据用户ID范围去查询用户群体信息。这样推送系统就会耦合会员系统(根据用户ID查)或大数据系统(根据用户画像查),耦合具体的用户群体的查询逻辑。
所以,还是由营销系统来决定具体的用户群体查询逻辑,从而实现推送系统与会员系统或大数据系统的解耦。此时,营销系统会封装好每个用户的具体推送消息,再通过合并batch消息发送到MQ中,最后由推送系统消费。
因此,营销系统负责消费推送任务分片的消息。
7.查询用户数据以及批量发送推送消息
营销系统获取一个推送任务分片后,自己决定如何查询用户群体。营销系统会为查出来的每个用户进行推送消息封装,然后再将这些推送消息以batch模式发送到MQ由推送系统消费处理。
步骤1:获取到一个推送任务分片
步骤2:查询本次推送任务分片对应的用户群体
步骤3:为每个用户创建一条符合推送系统规定格式的用户推送消息,然后把每个用户的推送消息发送到MQ里
步骤3的第一种实现(不推荐):
用线程池并发地把一个任务分片里的1000条消息并发发送到MQ里,这种实现的问题是如果一个分片任务有1000个用户,那么此时虽然是多线程并发,但还是要发送1000次请求到MQ。
步骤3的第二种实现(推荐):
用线程池并发地批量发送消息到MQ里。RocketMQ官网建议批量发送消息的一个batch不能超过1MB,在RocketMQ源码中实际上批量消息不能超过4MB。所以批量发送时,需要考虑发送消息的大小,然后根据网络压力和IO压力选择每批次发送多少条消息。
此处按照100条一批发送,1000条用户推送消息,会合并为10个batch进行发送。因此只要发起10次网络请求即可,每个推送任务分片的处理到写MQ的整个过程,速度是非常快的。一台营销系统单线程处理1万个推送分片任务,每个任务要写10次MQ,每次10ms,总共需要1000s=20分钟。多台营销系统,对每个分片任务的10个batch发送都可以看成是线程池并发处理的。假设2台机器,每台机器开50个线程,那么总共需要1000s / (2*50) = 10s,就可以把1万个分片任务处理完毕。
代码语言:javascript代码运行次数:0运行复制//下面是营销系统监听"PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC"消费推送任务分片消息的实现。
//消费完成后会将消息发送到MQ的"PLATFORM_PROMOTION_SEND_TOPIC":
@Configuration
public class ConsumerBeanConfig {
...
//平台发放促销活动用户桶消费者
@Bean("platformPromotionUserBucketReceiveTopicConsumer")
public DefaultMQPushConsumer receiveCouponUserBucketConsumer(PlatFormPromotionUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_USER_BUCKET_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormPromotionUserBucketListener implements MessageListenerConcurrently {
//会员服务
@DubboReference(version = "1.0.0")
private AccountApi accountApi;
//发送消息共用的线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//RocketMQ生产者
@Autowired
private DefaultProducer defaultProducer;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
//1.获取到一个推送任务分片
String message = new String(messageExt.getBody());
log.debug("执行平台发送促销活动用户桶消息逻辑,消息内容:{}", message);
PlatformPromotionUserBucketMessage promotionPushTask = JSON.parseObject(message, PlatformPromotionUserBucketMessage.class);
//2.查询本次推送任务分片对应的用户群体
Long startUserId = promotionPushTask.getStartUserId();
Long endUserId = promotionPushTask.getEndUserId();
JsonResult<List<MembershipAccountDTO>> queryResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
if (!queryResult.getSuccess()) {
throw new BaseBizException(queryResult.getErrorCode(), queryResult.getErrorMessage());
}
List<MembershipAccountDTO> membershipAccounts = queryResult.getData();
if (CollectionUtils.isEmpty(membershipAccounts)) {
log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
continue;
}
//3.为每个用户创建一条符合推送系统规定格式的用户推送消息,然后把每个用户的推送消息发送到MQ里;
//第一种实现(不推荐):
//用线程池并发地把一个任务分片里的1000条消息并发发送到MQ里去;
//这种实现的问题是如果一个分片任务有1000个用户,那么此时虽然是多线程并发,但还是要发送请求1000次到MQ;
//第二种实现(推荐):
//用线程池并发地批量发送消息到MQ里;
//RocketMQ官网对批量发送消息的说明是,一个batch不能超过1MB,在RocketMQ源码中实际上批量消息不能超过4MB;
//所以批量发送的时候,需要综合考虑发送消息的大小,然后根据网络压力和IO压力综合对比评估后选择每批次发送多少条;
//此处按照100条一批发送,1000条用户推送消息,会合并为10个batch进行发送;
//因此只要发起10次网络请求即可,每个任务分片的处理到写MQ的整个过程,速度是非常快的;
//一台营销系统单线程不停处理1万个分片任务,每个任务要写10次MQ,10万次,每次10ms,总共需要1000000ms=1000s=20分钟左右
//多台营销系统,对每个分片任务的10个batch都是线程池并发写的
//假设2台机器,每台机器开50个线程,那么总共需要:1000s / 50 = 20s,就可以把1万个分片任务在这里处理完毕;
PlatformPromotionMessage promotionMessage = PlatformPromotionMessage.builder()
.promotionId(promotionPushTask.getPromotionId())
.promotionType(promotionPushTask.getPromotionType())
.mainMessage(promotionPushTask.getMainMessage())
.message("您已获得活动资格,打开APP进入活动页面")
.informType(promotionPushTask.getInformType())
.build();
List<String> batch = new ArrayList<>(100);
for (MembershipAccountDTO account : membershipAccounts) {
promotionMessage.setUserAccountId(account.getId());
batch.add(JSON.toJSONString(promotionMessage));
if (batch.size() == 100) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, batch, "平台发送促销活动消息");
});
batch.clear();
}
}
//最后剩下的也批量发出
if (!CollectionUtils.isEmpty(batch)) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, batch, "平台发送促销活动消息");
});
batch.clear();
}
}
} catch (Exception e) {
log.error("consume error,促销活动消息消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
8.线程池封装以及推送系统多线程推送
(1)通过@Configuration实例化一个线程池
(2)具体的线程池实例化
(1)通过@Configuration实例化一个线程池
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ThreadPoolConfig {
//发送消息共用的线程池
//线程池名字、线程名字:sharedThreadPool
//最多允许多少线程同时执行任务:100
@Bean("sharedSendMsgThreadPool")
public SafeThreadPool sharedSendMsgThreadPool() {
return new SafeThreadPool("sharedSendMsgThreadPool", 100);
}
}
(2)具体的线程池实例化
发送消息的线程池的corePoolSize设置为0,可以在空闲时把线程都回收掉。
代码语言:javascript代码运行次数:0运行复制public class SafeThreadPool {
private final Semaphore semaphore;
private final ThreadPoolExecutor threadPoolExecutor;
public SafeThreadPool(String name, int permits) {
//如果超过了100个任务同时要运行,会通过semaphore信号量进行阻塞
semaphore = new Semaphore(permits);
//为什么要设置corePoolSize是0?
//因为消息推送并不是一直要推送的,只有促销活动比如发优惠券时才需要进行消息推送,正常情况下是不会进行消息推送的
//所以发送消息的线程池的corePoolSize设置为0,可以在空闲时把线程都回收掉
threadPoolExecutor = new ThreadPoolExecutor(
0,
permits * 2,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
NamedDaemonThreadFactory.getInstance(name)
);
}
public void execute(Runnable task) {
//超过了100个batch要并发推送,就会在这里阻塞住
//比如100个线程都在繁忙时,就不可能有超过100个batch要同时提交过来
//极端情况下,最多也就是100个batch可以拿到信号量,100 * 2的max容量
semaphore.acquireUninterruptibly();
threadPoolExecutor.submit(() -> {
try {
task.run();
} finally {
semaphore.release();
}
});
}
}
9.推送系统的千万级消息多线程推送
根据前面可知:对一个千万级消息的推送任务,营销系统首先会对这个千万级消息的推送任务进行分片,分片成1万个推送任务然后batch发送到MQ。
营销系统会获取这个千万级消息的推送任务的推送任务分片,然后自己决定如何查询用户群体。
营销系统会为查出来的每个用户进行推送消息封装,然后再将这些推送消息以batch模式发送到MQ由推送系统消费处理。
所以营销系统会封装好千万级的推送消息,然后合并成10万个batch推送消息去发送到MQ。如果每个batch推送消息发送到MQ需要50ms,那么总共需要500万ms,即5000s。使用2台营销系统共200个线程并发去将这10万个batch推送消息发送到MQ,那么总共需要5000s / 200 = 25s,就可以把千万级推送消息发到MQ。
假设有5台4核8G的机器部署了推送系统,那么每个推送系统便会消费到200万条推送消息,接着使用多线程并发推送。由于部署5台机器,每台机器会拿到200w条消息,消费时会一批一批拿,放入msgList。如果每条消息调用第三方平台SDK发起推送耗时100ms~200ms,那么总共需要200w*200ms=40万s=几十个小时。这时必须用线程池采用多线程的方式并发去推送。
每台机器最多开启60个线程,那么5台机器总共300个线程。由于一次推送200ms,每个线程每秒钟可以推成5次,300个线程每秒1500次。那么经过6000s,5台机器300个线程就可以推1000万次,6000s / 60 = 100分钟,1个多小时。所以千万级用户全量推送,快则几十分钟,慢则两三个小时。
监听PLATFORM_PROMOTION_SEND_TOPIC的推送系统,对这200万条推送消息进行消费然后发起推送的代码如下:
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
//平台活动推送消息消费者 completableFuture逻辑
@Bean("platformPromotionSendTopicConsumer")
public DefaultMQPushConsumer platformPromotionSendConsumer(PlatFormPromotionListener platFormPromotionListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_PROMOTION_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormPromotionListener implements MessageListenerConcurrently {
//消息推送工厂提供者
@Autowired
private FactoryProducer factoryProducer;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final int PERMITS = 30;
private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
if (initializedRefpareAndSet(false, true)) {
//corePoolSize是30个,maxPoolSize是60个
THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), NamedDaemonThreadFactory.getInstance("consumePromotionMsg"), new ThreadPoolExecutor.CallerRunsPolicy());
}
return THREAD_POOL_EXECUTOR;
};
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
//使用自定义的业务线程池
List<CompletableFuture<AltResult>> futureList = msgList.stream()
.map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
.collect(Collectors.toList());
List<Throwable> resultList = futureList.stream()
.map(CompletableFuture::join)
.filter(e -> e.ex != null)
.map(e -> e.ex).collect(Collectors.toList());
if (!resultList.isEmpty()) {
throw resultList.get(0);
}
} catch (Throwable e) {
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private AltResult handleMessageExt(MessageExt messageExt) {
try {
log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
PlatformPromotionMessage message = JSON.parseObject(msg , PlatformPromotionMessage.class);
//幂等控制
if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(message.cacheKey()))) {
return new AltResult(null);
}
//获取消息服务工厂
MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());
//消息发送服务组件
MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();
//构造消息
PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
.informType(message.getInformType())
.mainMessage(message.getMainMessage())
.userAccountId(message.getUserAccountId())
.message(message.getMessage())
.build();
MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
messageSendService.send(messageSendDTO);
//发送成功之后把已经发送成功记录到redis
redisTemplate.opsForValue().set(message.cacheKey(), UUID.randomUUID().toString());
log.info("消息推送完成,messageSendDTO:{}", messageSendDTO);
Thread.sleep(20);
return new AltResult(null);
} catch (Exception e) {
return new AltResult(e);
}
}
//completableFuture的返回结果,适用于无返回值的情况
//ex字段为null表示任务执行成功
//ex字段不为null表示任务执行失败,并把异常设置为ex字段
private static class AltResult {
final Throwable ex;
public AltResult(Throwable ex) {
this.ex = ex;
}
}
}
RocketMQ实战—10.营销系统代码优化一
大纲
1.营销系统引入MQ实现异步化来进行性能优化
2.基于MQ释放优惠券提升系统扩展性
3.基于Redis实现重复促销活动去重
4.基于促销活动创建事件实现异步化
5.推送任务分片和分片消息batch合并发送实现
6.推送系统与用户群体查询逻辑解耦
7.查询用户数据以及批量发送推送消息
8.线程池封装以及推送系统多线程推送
9.推送系统的千万级消息多线程推送
10.千万级用户惰性发券代码实现
11.指定用户群体发券的代码实现
12.分片消息的batch合并算法重构实现
13.百万画像群体爆款商品推送代码实现
14.生产环境百万级用户PUSH全链路压测
接下来优化营销系统的四大促销场景的代码:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送
1.营销系统引入MQ实现异步化来进行性能优化
查询全量用户、创建大量消息、发送大量消息到MQ,这三个操作都可能非常耗时。所以这三个操作都不应该在创建营销活动时同步处理,而最好利用MQ在消费时异步化处理。
促销活动创建接口举例:
初版:写库(10毫秒) + 全量用户拉取(几分钟) + 大量消息创建和发送到MQ(几分钟),此时接口性能低下以及耗时。
优化:使用MQ实现异步化处理后,写库(10毫秒) + 发送一条消息给MQ(10毫秒)。
2.基于MQ释放优惠券提升系统扩展性
MQ主要有三大作用:削峰填谷、异步化提升性能、解耦提升扩展性。在初版主要用了MQ实现削峰填谷,解决面临的瞬时高并发写库或者调用接口的问题。而前面继续用MQ来剥离创建活动的接口时遇到的耗时问题,即通过实现异步化来提升性能。
下面介绍使用MQ提升系统扩展性:
例如创建订单时,订单里使用了一个优惠券。此时订单系统需要通知库存系统,对商品库存进行锁定。同时还需要通知营销系统,对该优惠券进行锁定,把其is_used字段进行设置。但后来用户发起取消订单操作,修改了订单状态,释放了库存和优惠券。库存恢复到原来的数量,优惠券也可以继续使用。那么在取消订单时,是否应该直接调用库存系统和营销系统的接口去释放库存和优惠券呢?
一般在取消订单时,会通过引入MQ,把一个订单取消事件消息OrderCanceledEvent发送到MQ。然后让库存系统、营销系统、积分系统等,关注这个订单取消事件消息,各自进行订单取消后的处理。以此实现订单系统对库存系统、营销系统、积分系统等系统的解耦,提升订单系统的扩展性。
3.基于Redis实现重复促销活动去重
(1)配置Redis
(2)创建促销活动时使用Redis进行去重
(1)配置Redis
代码语言:javascript代码运行次数:0运行复制@Data
@Configuration
@ConditionalOnClass(RedisConnectionFactory.class)
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
@ConditionalOnClass(RedissonClient.class)
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password)
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(100)
.setIdleConnectionTimeout(600000)
.setSubscriptionConnectionMinimumIdleSize(10)
.setSubscriptionConnectionPoolSize(100)
.setTimeout(timeout);
config.setCodec(new StringCodec());
config.setThreads(5);
config.setNettyThreads(5);
RedissonClient client = Redisson.create(config);
return client;
}
@Bean
@ConditionalOnClass(RedisConnectionFactory.class)
public RedisCache redisCache(RedisTemplate redisTemplate) {
return new RedisCache(redisTemplate);
}
}
public class RedisCache {
private RedisTemplate redisTemplate;
public RedisCache(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
//缓存存储
public void set(String key, String value, int seconds) {
ValueOperations<String, String> vo = redisTemplate.opsForValue();
if (seconds > 0) {
vo.set(key, value, seconds, TimeUnit.SECONDS);
} else {
vo.set(key, value);
}
}
//缓存获取
public String get(String key) {
ValueOperations<String, String> vo = redisTemplate.opsForValue();
return vo.get(key);
}
//缓存手动失效
public boolean delete(String key) {
return redisTemplate.delete(key);
}
//判断hash key是否存在
public boolean hExists(String key) {
return hGetAll(key).isEmpty();
}
//获取hash变量中的键值对,对应redis hgetall 命令
public Map<String, String> hGetAll(String key) {
return redisTemplate.opsForHash().entries(key);
}
//以map集合的形式添加hash键值对
public void hPutAll(String key, Map<String, String> map) {
redisTemplate.opsForHash().putAll(key, map);
}
//执行lua脚本
public <T> T execute(RedisScript<T> script, List<String> keys, String... args) {
return (T) redisTemplate.execute(script, keys, args);
}
public RedisTemplate getRedisTemplate() {
return redisTemplate;
}
}
(2)创建促销活动时使用Redis进行去重
代码语言:javascript代码运行次数:0运行复制@Service
public class PromotionServiceImpl implements PromotionService {
//开启促销活动DAO
@Autowired
private SalesPromotionDAO salesPromotionDAO;
//redis缓存工具
@Resource
private RedisCache redisCache;
@Resource
private PromotionConverter promotionConverter;
//新增或修改一个促销活动
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
//判断是否活动是否重复
String result = redisCache.get(PROMOTION_CONCURRENCY_KEY +
request.getName() +
request.getCreateUser() +
request.getStartTime().getTime() +
request.getEndTime().getTime());
if (StringUtils.isNotBlank(result)) {
return null;
}
log.info("活动内容:{}", request);
//活动规则
String rule = JsonUtil.object2Json(request.getRule());
//构造促销活动实体
SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request);
salesPromotionDO.setRule(rule);
//促销活动落库
salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO);
//写Redis缓存用于下次创建去重
redisCache.set(PROMOTION_CONCURRENCY_KEY + request.getName() + request.getCreateUser() + request.getStartTime().getTime() + request.getEndTime().getTime(), UUID.randomUUID().toString(),30 * 60);
//为所有用户推送促销活动,发MQ
sendPlatformPromotionMessage(salesPromotionDO);
//构造响应数据
SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
dto.setName(request.getName());
dto.setType(request.getType());
dto.setRule(rule);
dto.setCreateUser(request.getCreateUser());
dto.setSuccess(true);
return dto;
}
...
}
4.基于促销活动创建事件实现异步化
(1)创建促销活动时发布创建活动事件消息到MQ
(2)营销系统需要消费创建活动事件消息
(1)创建促销活动时发布创建活动事件消息到MQ
代码语言:javascript代码运行次数:0运行复制//促销活动创建事件
@Data
public class SalesPromotionCreatedEvent {
private SalesPromotionDO salesPromotion;
}
@Service
public class PromotionServiceImpl implements PromotionService {
...
//新增或修改一个运营活动
@Transactional(rollbackFor = Exception.class)
@Override
public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
//判断是否活动是否重复
String result = redisCache.get(PROMOTION_CONCURRENCY_KEY +
request.getName() +
request.getCreateUser() +
request.getStartTime().getTime() +
request.getEndTime().getTime());
if (StringUtils.isNotBlank(result)) {
return null;
}
log.info("活动内容:{}", request);
//活动规则
String rule = JsonUtil.object2Json(request.getRule());
//构造促销活动实体
SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request);
salesPromotionDO.setRule(rule);
//促销活动落库
salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO);
redisCache.set(PROMOTION_CONCURRENCY_KEY +
request.getName() +
request.getCreateUser() +
request.getStartTime().getTime() +
request.getEndTime().getTime(), UUID.randomUUID().toString(),30 * 60);
//通过MQ为所有用户推送促销活动
//sendPlatformPromotionMessage(salesPromotionDO);
//发布促销活动创建事件到MQ
publishSalesPromotionCreatedEvent(salesPromotionDO);
//构造响应数据
SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
dto.setName(request.getName());
dto.setType(request.getType());
dto.setRule(rule);
dto.setCreateUser(request.getCreateUser());
dto.setSuccess(true);
return dto;
}
//发布促销活动创建事件
private void publishSalesPromotionCreatedEvent(SalesPromotionDO salesPromotion) {
SalesPromotionCreatedEvent salesPromotionCreatedEvent = new SalesPromotionCreatedEvent();
salesPromotionCreatedEvent.setSalesPromotion(salesPromotion);
String salesPromotionCreatedEventJSON = JsonUtil.object2Json(salesPromotionCreatedEvent);
defaultProducer.sendMessage(RocketMqConstant.SALES_PROMOTION_CREATED_EVENT_TOPIC, salesPromotionCreatedEventJSON, "发布促销活动创建事件");
}
...
}
(2)营销系统需要消费创建活动事件消息
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
@Bean("salesPromotionCreatedEventListener")
public DefaultMQPushConsumer salesPromotionCreatedEventListener(SalesPromotionCreatedEventListener salesPromotionCreatedEventListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(SALES_PROMOTION_CREATED_EVENT_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(SALES_PROMOTION_CREATED_EVENT_TOPIC, "*");
consumer.registerMessageListener(salesPromotionCreatedEventListener);
consumer.start();
return consumer;
}
...
}
//促销活动创建事件监听器
@Component
public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {
...
}
5.推送任务分片和分片消息batch合并发送实现
营销系统在消费创建活动事件消息时,会进行千万级用户的推送任务分片和分片消息batch合并发送到MQ。
代码语言:javascript代码运行次数:0运行复制//发送到MQ的Topic是"PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC"。
//促销活动创建事件监听器
@Component
public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {
@DubboReference(version = "1.0.0")
private AccountApi accountApi;
@Resource
private DefaultProducer defaultProducer;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
//下面三行代码可以获取到一个刚刚创建成功的促销活动
String message = new String(messageExt.getBody());
SalesPromotionCreatedEvent salesPromotionCreatedEvent = JSON.parseObject(message, SalesPromotionCreatedEvent.class);
SalesPromotionDO salesPromotion = salesPromotionCreatedEvent.getSalesPromotion();
//这个促销活动会针对全体用户发起Push
//userBucketSize就是一个用户分片的大小,与一个startUserId ~ endUserId用户ID范围相对应
final int userBucketSize = 1000;
//messageBatchSize就是合并多个任务消息成一个batch消息的大小,RocketMQ的每个batch消息包含了100个推送任务消息
//所以1w个推送任务消息会合并为100个batch消息
//发送1万个推送任务消息到MQ,只需要进行100次网络通信给RocketMQ即可,这样可以大幅降低发送消息的耗时
final int messageBatchSize = 100;
//1.获取全体用户数量有两种做法:
//第一种是进行count(效率不高),第二种是获取max(userId),通常会使用第二种做法
//select * from account order by id desc limit 1,类似于这样的sql语句去获取用户表里主键值最大的一个
JsonResult<Long> queryMaxUserIdResult = accountApi.queryMaxUserId();
if (!queryMaxUserIdResult.getSuccess()) {
throw new BaseBizException(queryMaxUserIdResult.getErrorCode(), queryMaxUserIdResult.getErrorMessage());
}
Long maxUserId = queryMaxUserIdResult.getData();
//2.获取到全体用户数量后,就可以根据一定算法结合自增ID,对千万级用户的推送任务进行分片,比如一个推送任务包含1000个用户或2000个用户
//userBuckets就是用户分片集合,其中有上万条key-value对,每个key-value对就是一个startUserId -> endUserId,代表一个推送任务分片
Map<Long, Long> userBuckets = new LinkedHashMap<>();
AtomicBoolean doSharding = new AtomicBoolean(true);// 当前是否需要执行分片
long startUserId = 1L;//起始用户ID,数据库自增主键是从1开始的
while (doSharding.get()) {
if (startUserId > maxUserId) {
doShardingpareAndSet(true, false);
break;
}
userBuckets.put(startUserId, startUserId + userBucketSize);
startUserId += userBucketSize;
}
//3.完成分片后,就把可能成千上万的推送任务进行RocketMQ消息的batch合并
//通过batch模式一批一批的发送任务到MQ里去,从而减少和RocketMQ网络通信的耗时
int handledBucketCount = 0;
List<String> promotionPushTaskBatch = new ArrayList<>(messageBatchSize);
for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
handledBucketCount++;
PlatformPromotionUserBucketMessage promotionPushTask = PlatformPromotionUserBucketMessage.builder()
.startUserId(userBucket.getKey())
.endUserId(userBucket.getValue())
.promotionId(salesPromotion.getId())
.promotionType(salesPromotion.getType())
.mainMessage(salesPromotion.getName())
.message("您已获得活动资格,可以打开APP进入活动页面")
.informType(salesPromotion.getInformType())
.build();
String promotionPushTaskJSON = JsonUtil.object2Json(promotionPushTask);
promotionPushTaskBatch.add(promotionPushTaskJSON);
//batch合并发送后,对promotionPushTaskBatch进行清空
if (promotionPushTaskBatch.size() == messageBatchSize || handledBucketCount == userBuckets.size()) {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, promotionPushTaskBatch, "平台发放促销活动用户桶消息");
promotionPushTaskBatch.clear();
}
}
}
} catch(Exception e) {
log.error("consume error, 促销活动创建事件处理异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
6.推送系统与用户群体查询逻辑解耦
营销系统将推送分片消息batch合并发送到MQ后,这些消息会由营销系统进行消费,而不是由推送系统进行消费。如果推送系统直接消费这些分片任务消息,那么推送系统需要自己去根据用户ID范围去查询用户群体信息。这样推送系统就会耦合会员系统(根据用户ID查)或大数据系统(根据用户画像查),耦合具体的用户群体的查询逻辑。
所以,还是由营销系统来决定具体的用户群体查询逻辑,从而实现推送系统与会员系统或大数据系统的解耦。此时,营销系统会封装好每个用户的具体推送消息,再通过合并batch消息发送到MQ中,最后由推送系统消费。
因此,营销系统负责消费推送任务分片的消息。
7.查询用户数据以及批量发送推送消息
营销系统获取一个推送任务分片后,自己决定如何查询用户群体。营销系统会为查出来的每个用户进行推送消息封装,然后再将这些推送消息以batch模式发送到MQ由推送系统消费处理。
步骤1:获取到一个推送任务分片
步骤2:查询本次推送任务分片对应的用户群体
步骤3:为每个用户创建一条符合推送系统规定格式的用户推送消息,然后把每个用户的推送消息发送到MQ里
步骤3的第一种实现(不推荐):
用线程池并发地把一个任务分片里的1000条消息并发发送到MQ里,这种实现的问题是如果一个分片任务有1000个用户,那么此时虽然是多线程并发,但还是要发送1000次请求到MQ。
步骤3的第二种实现(推荐):
用线程池并发地批量发送消息到MQ里。RocketMQ官网建议批量发送消息的一个batch不能超过1MB,在RocketMQ源码中实际上批量消息不能超过4MB。所以批量发送时,需要考虑发送消息的大小,然后根据网络压力和IO压力选择每批次发送多少条消息。
此处按照100条一批发送,1000条用户推送消息,会合并为10个batch进行发送。因此只要发起10次网络请求即可,每个推送任务分片的处理到写MQ的整个过程,速度是非常快的。一台营销系统单线程处理1万个推送分片任务,每个任务要写10次MQ,每次10ms,总共需要1000s=20分钟。多台营销系统,对每个分片任务的10个batch发送都可以看成是线程池并发处理的。假设2台机器,每台机器开50个线程,那么总共需要1000s / (2*50) = 10s,就可以把1万个分片任务处理完毕。
代码语言:javascript代码运行次数:0运行复制//下面是营销系统监听"PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC"消费推送任务分片消息的实现。
//消费完成后会将消息发送到MQ的"PLATFORM_PROMOTION_SEND_TOPIC":
@Configuration
public class ConsumerBeanConfig {
...
//平台发放促销活动用户桶消费者
@Bean("platformPromotionUserBucketReceiveTopicConsumer")
public DefaultMQPushConsumer receiveCouponUserBucketConsumer(PlatFormPromotionUserBucketListener platFormPromotionUserBucketListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_USER_BUCKET_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionUserBucketListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormPromotionUserBucketListener implements MessageListenerConcurrently {
//会员服务
@DubboReference(version = "1.0.0")
private AccountApi accountApi;
//发送消息共用的线程池
@Autowired
@Qualifier("sharedSendMsgThreadPool")
private SafeThreadPool sharedSendMsgThreadPool;
//RocketMQ生产者
@Autowired
private DefaultProducer defaultProducer;
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgList) {
//1.获取到一个推送任务分片
String message = new String(messageExt.getBody());
log.debug("执行平台发送促销活动用户桶消息逻辑,消息内容:{}", message);
PlatformPromotionUserBucketMessage promotionPushTask = JSON.parseObject(message, PlatformPromotionUserBucketMessage.class);
//2.查询本次推送任务分片对应的用户群体
Long startUserId = promotionPushTask.getStartUserId();
Long endUserId = promotionPushTask.getEndUserId();
JsonResult<List<MembershipAccountDTO>> queryResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
if (!queryResult.getSuccess()) {
throw new BaseBizException(queryResult.getErrorCode(), queryResult.getErrorMessage());
}
List<MembershipAccountDTO> membershipAccounts = queryResult.getData();
if (CollectionUtils.isEmpty(membershipAccounts)) {
log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
continue;
}
//3.为每个用户创建一条符合推送系统规定格式的用户推送消息,然后把每个用户的推送消息发送到MQ里;
//第一种实现(不推荐):
//用线程池并发地把一个任务分片里的1000条消息并发发送到MQ里去;
//这种实现的问题是如果一个分片任务有1000个用户,那么此时虽然是多线程并发,但还是要发送请求1000次到MQ;
//第二种实现(推荐):
//用线程池并发地批量发送消息到MQ里;
//RocketMQ官网对批量发送消息的说明是,一个batch不能超过1MB,在RocketMQ源码中实际上批量消息不能超过4MB;
//所以批量发送的时候,需要综合考虑发送消息的大小,然后根据网络压力和IO压力综合对比评估后选择每批次发送多少条;
//此处按照100条一批发送,1000条用户推送消息,会合并为10个batch进行发送;
//因此只要发起10次网络请求即可,每个任务分片的处理到写MQ的整个过程,速度是非常快的;
//一台营销系统单线程不停处理1万个分片任务,每个任务要写10次MQ,10万次,每次10ms,总共需要1000000ms=1000s=20分钟左右
//多台营销系统,对每个分片任务的10个batch都是线程池并发写的
//假设2台机器,每台机器开50个线程,那么总共需要:1000s / 50 = 20s,就可以把1万个分片任务在这里处理完毕;
PlatformPromotionMessage promotionMessage = PlatformPromotionMessage.builder()
.promotionId(promotionPushTask.getPromotionId())
.promotionType(promotionPushTask.getPromotionType())
.mainMessage(promotionPushTask.getMainMessage())
.message("您已获得活动资格,打开APP进入活动页面")
.informType(promotionPushTask.getInformType())
.build();
List<String> batch = new ArrayList<>(100);
for (MembershipAccountDTO account : membershipAccounts) {
promotionMessage.setUserAccountId(account.getId());
batch.add(JSON.toJSONString(promotionMessage));
if (batch.size() == 100) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, batch, "平台发送促销活动消息");
});
batch.clear();
}
}
//最后剩下的也批量发出
if (!CollectionUtils.isEmpty(batch)) {
sharedSendMsgThreadPool.execute(() -> {
defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, batch, "平台发送促销活动消息");
});
batch.clear();
}
}
} catch (Exception e) {
log.error("consume error,促销活动消息消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
8.线程池封装以及推送系统多线程推送
(1)通过@Configuration实例化一个线程池
(2)具体的线程池实例化
(1)通过@Configuration实例化一个线程池
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ThreadPoolConfig {
//发送消息共用的线程池
//线程池名字、线程名字:sharedThreadPool
//最多允许多少线程同时执行任务:100
@Bean("sharedSendMsgThreadPool")
public SafeThreadPool sharedSendMsgThreadPool() {
return new SafeThreadPool("sharedSendMsgThreadPool", 100);
}
}
(2)具体的线程池实例化
发送消息的线程池的corePoolSize设置为0,可以在空闲时把线程都回收掉。
代码语言:javascript代码运行次数:0运行复制public class SafeThreadPool {
private final Semaphore semaphore;
private final ThreadPoolExecutor threadPoolExecutor;
public SafeThreadPool(String name, int permits) {
//如果超过了100个任务同时要运行,会通过semaphore信号量进行阻塞
semaphore = new Semaphore(permits);
//为什么要设置corePoolSize是0?
//因为消息推送并不是一直要推送的,只有促销活动比如发优惠券时才需要进行消息推送,正常情况下是不会进行消息推送的
//所以发送消息的线程池的corePoolSize设置为0,可以在空闲时把线程都回收掉
threadPoolExecutor = new ThreadPoolExecutor(
0,
permits * 2,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
NamedDaemonThreadFactory.getInstance(name)
);
}
public void execute(Runnable task) {
//超过了100个batch要并发推送,就会在这里阻塞住
//比如100个线程都在繁忙时,就不可能有超过100个batch要同时提交过来
//极端情况下,最多也就是100个batch可以拿到信号量,100 * 2的max容量
semaphore.acquireUninterruptibly();
threadPoolExecutor.submit(() -> {
try {
task.run();
} finally {
semaphore.release();
}
});
}
}
9.推送系统的千万级消息多线程推送
根据前面可知:对一个千万级消息的推送任务,营销系统首先会对这个千万级消息的推送任务进行分片,分片成1万个推送任务然后batch发送到MQ。
营销系统会获取这个千万级消息的推送任务的推送任务分片,然后自己决定如何查询用户群体。
营销系统会为查出来的每个用户进行推送消息封装,然后再将这些推送消息以batch模式发送到MQ由推送系统消费处理。
所以营销系统会封装好千万级的推送消息,然后合并成10万个batch推送消息去发送到MQ。如果每个batch推送消息发送到MQ需要50ms,那么总共需要500万ms,即5000s。使用2台营销系统共200个线程并发去将这10万个batch推送消息发送到MQ,那么总共需要5000s / 200 = 25s,就可以把千万级推送消息发到MQ。
假设有5台4核8G的机器部署了推送系统,那么每个推送系统便会消费到200万条推送消息,接着使用多线程并发推送。由于部署5台机器,每台机器会拿到200w条消息,消费时会一批一批拿,放入msgList。如果每条消息调用第三方平台SDK发起推送耗时100ms~200ms,那么总共需要200w*200ms=40万s=几十个小时。这时必须用线程池采用多线程的方式并发去推送。
每台机器最多开启60个线程,那么5台机器总共300个线程。由于一次推送200ms,每个线程每秒钟可以推成5次,300个线程每秒1500次。那么经过6000s,5台机器300个线程就可以推1000万次,6000s / 60 = 100分钟,1个多小时。所以千万级用户全量推送,快则几十分钟,慢则两三个小时。
监听PLATFORM_PROMOTION_SEND_TOPIC的推送系统,对这200万条推送消息进行消费然后发起推送的代码如下:
代码语言:javascript代码运行次数:0运行复制@Configuration
public class ConsumerBeanConfig {
...
//平台活动推送消息消费者 completableFuture逻辑
@Bean("platformPromotionSendTopicConsumer")
public DefaultMQPushConsumer platformPromotionSendConsumer(PlatFormPromotionListener platFormPromotionListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_CONSUMER_GROUP);
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.subscribe(PLATFORM_PROMOTION_SEND_TOPIC, "*");
consumer.registerMessageListener(platFormPromotionListener);
consumer.start();
return consumer;
}
}
@Component
public class PlatFormPromotionListener implements MessageListenerConcurrently {
//消息推送工厂提供者
@Autowired
private FactoryProducer factoryProducer;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final int PERMITS = 30;
private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
if (initializedRefpareAndSet(false, true)) {
//corePoolSize是30个,maxPoolSize是60个
THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(PERMITS, PERMITS * 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), NamedDaemonThreadFactory.getInstance("consumePromotionMsg"), new ThreadPoolExecutor.CallerRunsPolicy());
}
return THREAD_POOL_EXECUTOR;
};
//并发消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
try {
//使用自定义的业务线程池
List<CompletableFuture<AltResult>> futureList = msgList.stream()
.map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
.collect(Collectors.toList());
List<Throwable> resultList = futureList.stream()
.map(CompletableFuture::join)
.filter(e -> e.ex != null)
.map(e -> e.ex).collect(Collectors.toList());
if (!resultList.isEmpty()) {
throw resultList.get(0);
}
} catch (Throwable e) {
log.error("consume error,平台优惠券消费失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private AltResult handleMessageExt(MessageExt messageExt) {
try {
log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody());
String msg = new String(messageExt.getBody());
PlatformPromotionMessage message = JSON.parseObject(msg , PlatformPromotionMessage.class);
//幂等控制
if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(message.cacheKey()))) {
return new AltResult(null);
}
//获取消息服务工厂
MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());
//消息发送服务组件
MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();
//构造消息
PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
.informType(message.getInformType())
.mainMessage(message.getMainMessage())
.userAccountId(message.getUserAccountId())
.message(message.getMessage())
.build();
MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
messageSendService.send(messageSendDTO);
//发送成功之后把已经发送成功记录到redis
redisTemplate.opsForValue().set(message.cacheKey(), UUID.randomUUID().toString());
log.info("消息推送完成,messageSendDTO:{}", messageSendDTO);
Thread.sleep(20);
return new AltResult(null);
} catch (Exception e) {
return new AltResult(e);
}
}
//completableFuture的返回结果,适用于无返回值的情况
//ex字段为null表示任务执行成功
//ex字段不为null表示任务执行失败,并把异常设置为ex字段
private static class AltResult {
final Throwable ex;
public AltResult(Throwable ex) {
this.ex = ex;
}
}
}
本文标签: RocketMQ实战10营销系统代码优化一
版权声明:本文标题:RocketMQ实战—10.营销系统代码优化一 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/jiaocheng/1747632679a2196180.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论