Commit ca54709d ca54709d0e4f3ef32c344570694d2c7ff0b4d002 by xianghan

1.同步master,优化任务处理过程

1 parent f4ad8dc9
package com.topdraw.business.module.task.template.constant;
/**
* @author :
* @description:
* @function :
* @date :Created in 2022/6/18 14:30
* @version: :
* @modified By:
* @since : modified in 2022/6/18 14:30
*/
public interface TaskEventName {
//类型 1:登录;2:观影;3:参加活动;4:订购;5:优享会员;6:签到;7:完成设置;
// 8:播放记录;10:跨屏绑定;11:积分转移;30:积分兑换商品;98:系统操作;99:其他
String LOGIN = "LOGIN";
String VIEW = "VIEW";
String ACTIVITY = "ACTIVITY";
String ORDER = "ORDER";
String MEMBER_PRIORITY = "MEMBER_PRIORITY";
String SIGN = "SIGN";
String COMPLETE_INFO = "COMPLETE_INFO";
String PLAY = "PLAY";
String BINDING = "BINDING";
String POINTS_TRANS = "POINTS_TRANS";
String POINTS_EXCHANGE_GOODS = "POINTS_EXCHANGE_GOODS";
String SYSTEM_OPERATE = "SYSTEM_OPERATE";
String OTHER = "OHHER";
}
package com.topdraw.business.module.task.template.constant;
/**
* @author :
* @description:
* @function :
* @date :Created in 2022/6/18 14:30
* @version: :
* @modified By:
* @since : modified in 2022/6/18 14:30
*/
public interface TaskEventType {
//类型 1:登录;2:观影;3:参加活动;4:订购;5:优享会员;6:签到;7:完成设置;
// 8:播放记录;10:跨屏绑定;11:积分转移;30:积分兑换商品;98:系统操作;99:其他
int LOGIN = 1;
int VIEW = 2;
int ACTIVITY = 3;
int ORDER = 4;
int MEMBER_PRIORITY = 5;
int SIGN = 6;
int COMPLETE_INFO = 7;
int PLAY = 8;
int BINDING = 10;
int POINTS_TRANS = 11;
int POINTS_EXCHANGE_GOODS = 30;
int SYSTEM_OPERATE = 98;
int OHHER = 99;
}
......@@ -56,4 +56,11 @@ public interface UserTvRepository extends JpaRepository<UserTv, Long>, JpaSpecif
" `vis_user_id` = :#{#resources.visUserId}, " +
" `update_time` = now() WHERE `platform_account` = :#{#resources.platformAccount}", nativeQuery = true)
void updateUserTvByPlatformAccount(@Param("resources") UserTv userTv);
Long countByPlatformAccount(String platformAccount);
@Modifying
@Query(value = "UPDATE `uc_user_tv` SET `priority_member_code` = :#{#resources.priorityMemberCode}, " +
"`update_time` = now() WHERE `platform_account` = :#{#resources.platformAccount}", nativeQuery = true)
void updatePriorityMemberCode(@Param("resources") UserTv userTv);
}
......
......@@ -108,4 +108,17 @@ public interface UserTvService {
* @param userTv
*/
void updateUserTvByPlatformAccount(UserTv userTv);
/**
*
* @param platformAccount
* @return
*/
Long countByPlatformAccount(String platformAccount);
/**
*
* @param userTv
*/
void doUpdatePriorityMemberCode(UserTv userTv);
}
......
......@@ -62,6 +62,16 @@ public class UserTvServiceImpl implements UserTvService {
this.userTvRepository.updateUserTvByPlatformAccount(userTv);
}
@Override
public Long countByPlatformAccount(String platformAccount) {
return this.userTvRepository.countByPlatformAccount(platformAccount);
}
@Override
public void doUpdatePriorityMemberCode(UserTv userTv) {
this.userTvRepository.updatePriorityMemberCode(userTv);
}
/**
* 获取大屏账户对应的会员
......
......@@ -21,13 +21,7 @@ public class ExpOperationServiceImpl implements ExpOperationService {
@Autowired
private ExpDetailService expDetailService;
@Autowired
private MemberOperationService memberOperationService;
@Autowired
private MemberLevelService memberLevelService;
@Autowired
private MemberService memberService;
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
public void asyncMemberExpAndLevel(Member member) {
String code = member.getCode();
......
......@@ -349,6 +349,31 @@ public class UserOperationServiceImpl implements UserOperationService {
}
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)
public void asyncUserTvChangeMainAccount(UserTvDTO userTvDTO) {
log.info("asyncUserTv ==>> userTvDTO ==>> {}", userTvDTO);
String priorityMemberCode = userTvDTO.getPriorityMemberCode();
if (StringUtils.isBlank(priorityMemberCode)) {
log.error("大屏账号设置主会员异常,主会员code不存在");
return;
}
String platformAccount = userTvDTO.getPlatformAccount();
UserTvDTO _userTvDTO = this.userTvService.findByPlatformAccount(platformAccount);
log.info("从数据库中获取大屏信息, _userTvDTO ==>> {}", _userTvDTO);
if (Objects.isNull(_userTvDTO.getId())) {
log.error("大屏账号设置主会员异常,大屏账号不存在");
return;
}
UserTv userTv = new UserTv();
userTv.setId(_userTvDTO.getId());
userTv.setPriorityMemberCode(priorityMemberCode);
log.info("开始修改大屏数据,userTv ==>>{}", userTv);
this.userTvService.doUpdatePriorityMemberCode(userTv);
}
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)
public void asyncUserTv(UserTvDTO userTvDTO) {
log.info("asyncUserTv ==>> userTvDTO ==>> {}", userTvDTO);
String platformAccount = userTvDTO.getPlatformAccount();
......
package com.topdraw.config.redis;
/**
* @author :
* @description:
* @function :
* @date :Created in 2022/6/18 17:22
* @version: :
* @modified By:
* @since : modified in 2022/6/18 17:22
*/
public class RedisKeyConstant {
public static final String CACHE_PLATFROMACCOUNT_PLAYDURATION = "ucc::play::playduration::";
}
......@@ -9,17 +9,20 @@ import com.topdraw.business.module.task.attribute.service.TaskAttrService;
import com.topdraw.business.module.task.attribute.service.dto.TaskAttrDTO;
import com.topdraw.business.module.task.domain.Task;
import com.topdraw.business.module.task.service.TaskService;
import com.topdraw.business.module.task.template.constant.TaskEventName;
import com.topdraw.business.module.task.template.constant.TaskEventType;
import com.topdraw.business.module.task.template.service.TaskTemplateService;
import com.topdraw.business.module.task.template.service.dto.TaskTemplateDTO;
import com.topdraw.business.module.user.iptv.service.UserTvService;
import com.topdraw.business.module.user.iptv.service.dto.UserTvDTO;
import com.topdraw.config.redis.RedisKeyConstant;
import com.topdraw.exception.BadRequestException;
import com.topdraw.exception.EntityNotFoundException;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.resttemplate.RestTemplateClient;
import com.topdraw.util.DateUtil;
import com.topdraw.util.FileUtil;
import com.topdraw.util.JSONUtil;
import com.topdraw.util.TimestampUtil;
import com.topdraw.utils.RedisUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
......@@ -31,6 +34,7 @@ import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;
import java.io.IOException;
import java.text.ParseException;
......@@ -57,7 +61,6 @@ public class UcEventBusIptv2ManagementUcEngine {
@Autowired
private RedisUtils redisUtils;
@Value("#{rabbitMqErrorLogConfig.getEventBusError()}")
private Map<String, String> error;
......@@ -73,16 +76,22 @@ public class UcEventBusIptv2ManagementUcEngine {
containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
ackMode = "AUTO")
public void eventBusConsumer(Channel channel, Message message, String content) throws ParseException, IOException {
public void eventBusConsumer(Channel channel, Message message, String content) throws IOException {
log.info(" receive dataSync msg , content is : {} ", content);
try {
this.parseContent(content);
PlayContent playContent = JSONUtil.parseMsg2Object(content, PlayContent.class);
log.info("解析后的参数 , playContent ==>> {} ", playContent);
if (Objects.nonNull(playContent)) {
this.parseContent(playContent);
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("eventBus 消费异常 ==>> {}",e.getMessage());
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
if (MapUtils.isNotEmpty(error)) {
......@@ -97,33 +106,35 @@ public class UcEventBusIptv2ManagementUcEngine {
}
e.printStackTrace();
}
log.info("ucEventConsumer ====>>>> end");
log.info("eventBusConsumer ====>>>> end");
}
/**
* 数据解析
* @param content
* @param playContent
* @return
*/
private void parseContent(String content) throws ParseException {
PlayContent commonMsg = JSONUtil.parseMsg2Object(content, PlayContent.class);
private void parseContent(PlayContent playContent) throws ParseException {
PlayContent.MsgData msgData = playContent.getMsgData();
if (Objects.isNull(msgData)) {
log.error("eventBus事件消息体为空,msgData ==>> {}", msgData);
return;
}
String evt = commonMsg.getEvt();
String evt = playContent.getEvt();
switch (evt.toUpperCase()) {
case "PLAY":
PlayContent playContent = JSONUtil.parseMsg2Object(content, PlayContent.class);
PlayContent.MsgData msgData = playContent.getMsgData();
if (Objects.nonNull(msgData)) {
case TaskEventName.PLAY:
String time = playContent.getTime();
String formatDate = DateUtil.formatDate(time);
Integer deviceType = playContent.getDeviceType();
String platformAccount = msgData.getPlatformAccount();
if (StringUtils.isBlank(platformAccount)){
log.error("参数错误,platformAccount不得为空, msgData ==>> {}", msgData);
return;
}
String mediaCode = msgData.getMediaCode();
Long mediaId = msgData.getMediaId();
......@@ -136,16 +147,23 @@ public class UcEventBusIptv2ManagementUcEngine {
DataSyncMsg dataSyncMsg = new DataSyncMsg();
dataSyncMsg.setEvt(evt);
dataSyncMsg.setEvent(TaskEventType.PLAY);
dataSyncMsg.setTime(LocalDateTime.now());
dataSyncMsg.setDeviceType(deviceType);
DataSyncMsg.MsgData msg = new DataSyncMsg.MsgData();
msg.setPlatformAccount(platformAccount);
msg.setMediaId(mediaId);
Integer playDurationValueTotal = 0;
if (StringUtils.isNotBlank(platformAccount)) {
UserTvDTO userTvDTO = this.userTvService.findByPlatformAccount(platformAccount);
Long count = this.userTvService.countByPlatformAccount(platformAccount);
log.info("通过大屏账号查询对应记录的条数,==>> {}", count);
if(count > 0L) {
String key = RedisKeyConstant.CACHE_PLATFROMACCOUNT_PLAYDURATION+platformAccount+"|"+formatDate;
if(Objects.nonNull(userTvDTO)) {
// 用大屏账号+日期做为key,并判断这个key是否存在 ,数据类型为hash eg:<total,1>,<1,playDuration>,<2,playDuration>
String key = platformAccount+"|"+formatDate;
Map<Object, Object> hmget =
this.redisUtils.hmget(key);
......@@ -155,28 +173,35 @@ public class UcEventBusIptv2ManagementUcEngine {
playDurationValueTotal = playDuration;
Map<String, Object> map = new HashMap<>();
map.put("total", playDurationValueTotal);
map.put("1", playDuration);
// 存储时间36小时
this.redisUtils.hmset(key, map, 129600);
} else {
// 计算播放总时长 total = 播放总时长+当前播放时长
Integer total = this.getRedisTotal(hmget);
Integer total = this.getTotalPlayDurationFromRedis(key);
playDurationValueTotal = total + playDuration;
}
Integer totalKey = this.getRedisTotalKey(hmget);
Integer maxSize = totalKey + 1;
Map<String, Object> map = new HashMap<>();
map.put(String.valueOf(maxSize), playDuration);
map.put("total", playDurationValueTotal);
this.redisUtils.hmset(key, map);
this.checkTask(playDurationValueTotal, time, deviceType,
mediaCode, mediaId, mediaName, dataSyncMsg, msg, userTvDTO);
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("PLAYDURATION",playDurationValueTotal);
msg.setParam(JSON.toJSONString(jsonObject1));
dataSyncMsg.setMsgData(JSON.toJSONString(msg));
log.info("调用uc-engine的接口 ==>> /uce/taskOperation/dealTask, 参数 ==>> {}",dataSyncMsg);
JSONObject response = this.restTemplateClient.dealTask(dataSyncMsg);
if (Objects.nonNull(response)) {
log.info("uc-engine任务处理接口的响应结果,/uce/taskOperation/dealTask ==>> {}",response);
} else {
log.error("uc-engine响应超时,请检查uc-engine服务");
throw new BadRequestException("uc-engine响应超时");
}
}
......@@ -184,18 +209,35 @@ public class UcEventBusIptv2ManagementUcEngine {
}
break;
}
//return null;
}
private DataSyncMsg checkTask(Integer playDurationValueTotal, String time, Integer deviceType, String mediaCode,
Long mediaId, String mediaName, DataSyncMsg dataSyncMsg,
DataSyncMsg.MsgData msgData, UserTvDTO userTvDTO) {
// 检查播放记录任务
List<TaskAttrDTO> taskAttrDTOList = new ArrayList<>();
// TODO 枚举
TaskTemplateDTO taskTemplateDTO = this.taskTemplateService.findByType(8);
if (Objects.nonNull(taskTemplateDTO.getId())) {
List<Task> taskList = this.taskService.findByTemplateId(taskTemplateDTO.getId());
......@@ -250,7 +292,7 @@ public class UcEventBusIptv2ManagementUcEngine {
Integer integer = attrList.get(i).get(0);
if (playDurationValueTotal >= integer) {
dataSyncMsg1 = getDataSyncMsg(time, mediaCode, mediaId, mediaName, integer, dataSyncMsg,
dataSyncMsg1 = getDataSyncMsg(time, mediaCode, mediaId, mediaName, playDurationValueTotal, dataSyncMsg,
msgData, userTvDTO);
dataSyncMsg1.setEvt("PLAY");
dataSyncMsg1.setEvent(8);
......@@ -266,6 +308,18 @@ public class UcEventBusIptv2ManagementUcEngine {
return dataSyncMsg1;
}
private Integer getTotalPlayDurationFromRedis(String key) {
Map<Object, Object> total = this.redisUtils.hmget("key");
if (Objects.nonNull(total)){
Object total1 = total.get("total");
if (Objects.nonNull(total1)) {
return Integer.valueOf(total1.toString());
}
}
return 0;
}
private Integer getRedisTotalKey(Map<Object, Object> hmget) {
Set<Object> objects = hmget.keySet();
return objects.size();
......
package com.topdraw.resttemplate;
/**
* @author :
* @description:
* @function :
* @date :Created in 2022/6/18 17:45
* @version: :
* @modified By:
* @since : modified in 2022/6/18 17:45
*/
public interface ResponseStatusConstant {
String OK = "00000";
}
......@@ -42,24 +42,23 @@ public class RestTemplateClient {
}
public JSONObject dealTask(DataSyncMsg dataSyncMsg) {
JSONObject resultSet = null;
String url = BASE_URL + "/uce/taskOperation/dealTask";
log.info("request uc : url is " + url + ", dataSyncMsg is " + dataSyncMsg);
String content = JSON.toJSONString(dataSyncMsg);
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("content", content);
log.info("===>>>" + content);
restTemplate.postForEntity(url, objectObjectHashMap, String.class);
/* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, objectObjectHashMap, String.class);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, objectObjectHashMap, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
String entityBody = responseEntity.getBody();444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444被44444444444444 444444 44444 44444 44444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444
String entityBody = responseEntity.getBody();
JSONObject jsonObject = JSONObject.parseObject(entityBody);
if (jsonObject.getInteger("businessCode").equals(200)) {
if (jsonObject.getInteger("businessCode").equals(ResponseStatusConstant.OK)) {
resultSet = jsonObject.getJSONArray("resultSet").getJSONObject(0);
}
}
log.info("uc response: " + resultSet.toJSONString());
return resultSet;*/
return null;
return resultSet;
}
public JSONObject getMemberInfo(Long memberId) {
......
......@@ -99,12 +99,12 @@ mutil-mq:
service:
mq:
list:
# - source: event
# exchange: event.exchange
# queue: event.queue
# exchange-type: direct
# routing-key:
# active: service
- source: event
exchange: event.exchange
queue: event.queue
exchange-type: direct
routing-key:
active: service
- source: collection
exchange: exchange.collection
queue: collection.queue
......@@ -136,12 +136,12 @@ service:
# exchange-type: direct
# routing-key:
# active: service
# - source: eventBus
# exchange: uc.eventbus
# queue: uc.eventbus
# exchange-type: topic
# routing-key: uc.eventbus.*.topic
# active: service
- source: eventBus
exchange: uc.eventbus
queue: uc.eventbus
exchange-type: topic
routing-key: uc.eventbus.*.topic
active: service
- source: uce
exchange: uce.exchange
queue: uce.queue
......
......@@ -64,12 +64,12 @@
<!--监控sql日志输出 -->
<logger name="jdbc.sqlonly" level="INFO" additivity="false">
<logger name="jdbc.sqlonly" level="OFF" additivity="false">
<appender-ref ref="console" />
<appender-ref ref="info" />
</logger>
<logger name="jdbc.resultset" level="ERROR" additivity="false">
<logger name="jdbc.resultset" level="OFF" additivity="false">
<appender-ref ref="console" />
<appender-ref ref="info" />
</logger>
......