Commit 1e30cfe4 1e30cfe40d8837ef08cbaae5506343088d996c3b by xianghan

1.去除消费任务时的部分查询,直接将消息传递至uce

1 parent 495bb3c6
......@@ -12,6 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
*
*/
......@@ -34,8 +36,10 @@ public class ExpOperationServiceImpl implements ExpOperationService {
public void asyncExpDetail(ExpDetail expDetail) {
String code = expDetail.getMemberCode();
MemberDTO memberDTO = this.memberService.findByCode(code);
if (Objects.nonNull(memberDTO.getId())) {
expDetail.setMemberId(memberDTO.getId());
this.expDetailService.create(expDetail);
}
}
}
......
......@@ -7,29 +7,18 @@ import com.topdraw.business.module.member.service.dto.MemberDTO;
import com.topdraw.business.module.points.available.domain.PointsAvailable;
import com.topdraw.business.module.points.available.service.PointsAvailableService;
import com.topdraw.business.module.points.available.service.dto.PointsAvailableDTO;
import com.topdraw.business.module.points.detail.detailhistory.service.PointsDetailHistoryService;
import com.topdraw.business.module.points.detail.domain.PointsDetail;
import com.topdraw.business.module.points.detail.service.PointsDetailService;
import com.topdraw.business.module.points.service.PointsService;
import com.topdraw.business.process.domian.TempPoints;
import com.topdraw.business.process.service.PointsOperationService;
import com.topdraw.business.process.service.member.MemberOperationService;
import com.topdraw.util.IdWorker;
import com.topdraw.util.TimestampUtil;
import com.topdraw.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
*
......@@ -56,23 +45,29 @@ public class PointsOperationServiceImpl implements PointsOperationService {
return;
}
MemberDTO memberDTO = this.memberService.findByCode(code);
if (Objects.nonNull(memberDTO.getId())) {
member.setId(memberDTO.getId());
this.memberService.doUpdateMemberPoints(member);
}
}
public void asyncPointsAvailable(PointsAvailable pointsAvailable) {
String memberCode = pointsAvailable.getMemberCode();
MemberDTO memberDTO = this.memberService.findByCode(memberCode);
if (Objects.nonNull(memberDTO.getId())) {
pointsAvailable.setMemberId(memberDTO.getId());
this.pointsAvailableService.create4Custom(pointsAvailable);
}
}
public void asyncPointsDetail(PointsDetail pointsDetail) {
String memberCode = pointsDetail.getMemberCode();
MemberDTO memberDTO = this.memberService.findByCode(memberCode);
if (Objects.nonNull(memberDTO.getId())) {
pointsDetail.setMemberId(memberDTO.getId());
this.pointsDetailService.create4Custom(pointsDetail);
}
}
public void asyncDeletePointsAvailable(PointsAvailable pointsAvailable) {
String code = pointsAvailable.getCode();
......
package com.topdraw.config.redis;
package com.topdraw.config;
/**
* @author :
* @description:
* @function :
* @date :Created in 2022/6/18 17:22
* @date :Created in 2022/6/13 11:22
* @version: :
* @modified By:
* @since : modified in 2022/6/18 17:22
* @since : modified in 2022/6/13 11:22
*/
public class RedisKeyConstant {
public interface ResponseStatus {
Integer OK = 00000;
public static final String CACHE_PLATFROMACCOUNT_PLAYDURATION = "ucc::play::playduration::";
}
......
package com.topdraw.mq.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.topdraw.business.module.member.service.MemberService;
import com.topdraw.business.module.member.service.dto.MemberDTO;
import com.topdraw.business.module.task.attribute.service.TaskAttrService;
import com.topdraw.business.module.task.attribute.service.dto.TaskAttrDTO;
import com.topdraw.business.module.task.domain.Task;
import com.topdraw.business.module.task.service.TaskService;
import com.topdraw.business.module.task.template.constant.TaskEventName;
import com.topdraw.business.module.task.template.constant.TaskEventType;
import com.topdraw.business.module.task.template.service.TaskTemplateService;
import com.topdraw.business.module.task.template.service.dto.TaskTemplateDTO;
import com.topdraw.business.module.user.iptv.service.UserTvService;
import com.topdraw.business.module.user.iptv.service.dto.UserTvDTO;
import com.topdraw.config.redis.RedisKeyConstant;
import com.topdraw.exception.BadRequestException;
import com.topdraw.exception.EntityNotFoundException;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.resttemplate.RestTemplateClient;
import com.topdraw.util.DateUtil;
import com.topdraw.util.FileUtil;
import com.topdraw.util.JSONUtil;
import com.topdraw.utils.RedisUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;
import java.io.IOException;
import java.text.ParseException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
......@@ -48,19 +29,7 @@ import java.util.*;
public class UcEventBusIptv2ManagementUcEngine {
@Autowired
private TaskService taskService;
@Autowired
private UserTvService userTvService;
@Autowired
private MemberService memberService;
@Autowired
private TaskAttrService taskAttrService;
@Autowired
private TaskTemplateService taskTemplateService;
@Autowired
private RestTemplateClient restTemplateClient;
@Autowired
private RedisUtils redisUtils;
@Value("#{rabbitMqErrorLogConfig.getEventBusError()}")
private Map<String, String> error;
......@@ -77,24 +46,57 @@ public class UcEventBusIptv2ManagementUcEngine {
containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
ackMode = "AUTO")
public void eventBusConsumer(Channel channel, Message message, String content) throws IOException {
log.info(" receive dataSync msg , content is : {} ", content);
public void eventBusConsumer(Channel channel, Message message, String content) throws Exception {
log.info(" receive dataSync msg , content is ==>> {} ", content);
try {
PlayContent playContent = JSONUtil.parseMsg2Object(content, PlayContent.class);
log.info("解析后的参数 , playContent ==>> {} ", playContent);
if (Objects.nonNull(playContent)) {
this.parseContent(playContent);
DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content, DataSyncMsg.class);
log.info("解析后的参数 , playContent ==>> {} ", dataSyncMsg);
if (Objects.nonNull(dataSyncMsg)) {
String evt = dataSyncMsg.getEvt();
if (StringUtils.isBlank(evt)) {
log.error("eventBus事件类型(evt)为空");
throw new BadRequestException("参数错误,事件类型 evt不存在");
}
LocalDateTime time = dataSyncMsg.getTime();
if (Objects.isNull(time)) {
log.error("参数错误,事件发送时间(time)不存在");
throw new BadRequestException("参数错误,事件发送时间(time)不存在");
} /*else {
if (time.isAfter(LocalDateTime.now()) || time.toLocalDate().compareTo(LocalDate.now()) != 0) {
log.error("参数错误,事件发送时间(time)非法 ==>> {}", time);
throw new BadRequestException("参数错误,事件发送时间非法 ");
}
}*/
String msgData = dataSyncMsg.getMsgData();
if (StringUtils.isBlank(msgData)) {
log.error("eventBus事件消息体(msgData)为空");
throw new BadRequestException("参数错误,事件类型 evt不存在");
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
switch (dataSyncMsg.getEvt().toUpperCase()) {
// 播放记录
case TaskEventName.PLAY:
this.doPlayEvent(dataSyncMsg);
break;
default:
log.info("无可处理的任务");
break;
}
}
} catch (Exception e) {
log.error("eventBus 消费异常 ==>> {}",e.getMessage());
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// TODO使用slf4j记录日志
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -111,299 +113,22 @@ public class UcEventBusIptv2ManagementUcEngine {
log.info("eventBusConsumer ====>>>> end");
}
/**
* 数据解析
*
* @param playContent
* @return
*/
private void parseContent(PlayContent playContent) throws ParseException {
PlayContent.MsgData msgData = playContent.getMsgData();
if (Objects.isNull(msgData)) {
log.error("eventBus事件消息体为空,msgData ==>> {}", msgData);
return;
}
String evt = playContent.getEvt();
switch (evt.toUpperCase()) {
case TaskEventName.PLAY:
String time = playContent.getTime();
String formatDate = DateUtil.formatDate(time);
Integer deviceType = playContent.getDeviceType();
String platformAccount = msgData.getPlatformAccount();
if (StringUtils.isBlank(platformAccount)){
log.error("参数错误,platformAccount不得为空, msgData ==>> {}", msgData);
return;
}
String mediaCode = msgData.getMediaCode();
Long mediaId = msgData.getMediaId();
String mediaName = msgData.getMediaName();
Integer playDuration = msgData.getPlayDuration();
if (Objects.isNull(playDuration) || playDuration == 0) {
return;
}
log.info("playDuration ==>> {}", playDuration);
DataSyncMsg dataSyncMsg = new DataSyncMsg();
dataSyncMsg.setEvt(evt);
dataSyncMsg.setEvent(TaskEventType.PLAY);
dataSyncMsg.setTime(LocalDateTime.now());
dataSyncMsg.setDeviceType(deviceType);
DataSyncMsg.MsgData msg = new DataSyncMsg.MsgData();
msg.setPlatformAccount(platformAccount);
msg.setMediaId(mediaId);
Integer playDurationValueTotal = 0;
if (StringUtils.isNotBlank(platformAccount)) {
Long count = this.userTvService.countByPlatformAccount(platformAccount);
log.info("通过大屏账号查询对应记录的条数,==>> {}", count);
if(count > 0L) {
String key = RedisKeyConstant.CACHE_PLATFROMACCOUNT_PLAYDURATION+platformAccount+"|"+formatDate;
Map<Object, Object> hmget =
this.redisUtils.hmget(key);
if (MapUtils.isEmpty(hmget)) {
// 初始化播放总时长<total>和第一个播放时间
playDurationValueTotal = playDuration;
Map<String, Object> map = new HashMap<>();
map.put("total", playDurationValueTotal);
// 存储时间36小时
this.redisUtils.hmset(key, map, 129600);
} else {
// 计算播放总时长 total = 播放总时长+当前播放时长
Integer total = this.getTotalPlayDurationFromRedis(key);
playDurationValueTotal = total + playDuration;
}
Map<String, Object> map = new HashMap<>();
map.put("total", playDurationValueTotal);
this.redisUtils.hmset(key, map);
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("PLAYDURATION",playDurationValueTotal);
msg.setParam(JSON.toJSONString(jsonObject1));
dataSyncMsg.setMsgData(JSON.toJSONString(msg));
log.info("调用uc-engine的接口 ==>> /uce/taskOperation/dealTask, 参数 ==>> {}",dataSyncMsg);
JSONObject response = this.restTemplateClient.dealTask(dataSyncMsg);
if (Objects.nonNull(response)) {
log.info("uc-engine任务处理接口的响应结果,/uce/taskOperation/dealTask ==>> {}",response);
} else {
private void doPlayEvent(DataSyncMsg playContent) {
playContent.setEvent(TaskEventType.PLAY);
String msgData = playContent.getMsgData();
JSONObject jsonObject = JSONObject.parseObject(msgData, JSONObject.class);
Object platformAccount = jsonObject.get("platformAccount");
if (Objects.nonNull(platformAccount)) {
JSONObject response = this.restTemplateClient.dealTask(playContent);
if (Objects.isNull(response)) {
log.error("uc-engine响应超时,请检查uc-engine服务");
throw new BadRequestException("uc-engine响应超时");
}
}
}
break;
}
//return null;
}
private DataSyncMsg checkTask(Integer playDurationValueTotal, String time, Integer deviceType, String mediaCode,
Long mediaId, String mediaName, DataSyncMsg dataSyncMsg,
DataSyncMsg.MsgData msgData, UserTvDTO userTvDTO) {
// 检查播放记录任务
List<TaskAttrDTO> taskAttrDTOList = new ArrayList<>();
// TODO 枚举
TaskTemplateDTO taskTemplateDTO = this.taskTemplateService.findByType(8);
if (Objects.nonNull(taskTemplateDTO.getId())) {
List<Task> taskList = this.taskService.findByTemplateId(taskTemplateDTO.getId());
if (CollectionUtils.isNotEmpty(taskList)) {
for (Task task : taskList) {
TaskAttrDTO taskAttrDTO = this.taskAttrService.findByTaskId(task.getId());
taskAttrDTOList.add(taskAttrDTO);
}
} else {
return null;
}
} else {
return null;
}
List<List<Integer>> attrList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(taskAttrDTOList)) {
for (TaskAttrDTO taskAttrDTO : taskAttrDTOList) {
String attrStr = taskAttrDTO.getAttrStr();
if (StringUtils.isNotBlank(attrStr)) {
JSONObject parse = JSONObject.parseObject(attrStr, JSONObject.class);
List<Integer> value = (List<Integer>) parse.get("value");
attrList.add(value);
}
}
} else {
return null;
}
int size = attrList.size();
DataSyncMsg dataSyncMsg1 = null;
if (size > 0) {
for (int i = size-1; i >= 0; i--) {
Integer integer = attrList.get(i).get(0);
if (playDurationValueTotal >= integer) {
dataSyncMsg1 = getDataSyncMsg(time, mediaCode, mediaId, mediaName, playDurationValueTotal, dataSyncMsg,
msgData, userTvDTO);
dataSyncMsg1.setEvt("PLAY");
dataSyncMsg1.setEvent(8);
dataSyncMsg1.setTime(LocalDateTime.now());
dataSyncMsg1.setDeviceType(deviceType);
this.taskDeal(dataSyncMsg1);
}
}
}
return dataSyncMsg1;
}
private Integer getTotalPlayDurationFromRedis(String key) {
Map<Object, Object> total = this.redisUtils.hmget("key");
if (Objects.nonNull(total)){
Object total1 = total.get("total");
if (Objects.nonNull(total1)) {
return Integer.valueOf(total1.toString());
}
}
return 0;
}
private Integer getRedisTotalKey(Map<Object, Object> hmget) {
Set<Object> objects = hmget.keySet();
return objects.size();
}
private Integer getRedisTotal(Map<Object, Object> hmget) {
Set<Object> objects = hmget.keySet();
Integer playDurationValueTotal_ = 0;
for (Object key_ : objects) {
if (key_.toString().equalsIgnoreCase("total")) {
playDurationValueTotal_ = Integer.valueOf(hmget.get(key_).toString());
return playDurationValueTotal_;
} else {
continue;
}
}
return playDurationValueTotal_;
}
private DataSyncMsg getDataSyncMsg(String time, String mediaCode, Long mediaId, String mediaName,
Integer playDuration, DataSyncMsg dataSyncMsg, DataSyncMsg.MsgData msgData1, UserTvDTO userTvDTO) {
String priorityMemberCode = userTvDTO.getPriorityMemberCode();
String memberCode = "";
if (StringUtils.isNotBlank(priorityMemberCode)) {
memberCode = priorityMemberCode;
} else {
memberCode = this.memberService.findById(userTvDTO.getMemberId()).getCode();
}
if (StringUtils.isBlank(memberCode))
throw new EntityNotFoundException(MemberDTO.class, "memberCode", "memberCode is null");
msgData1.setMemberCode(memberCode);
msgData1.setMediaId(mediaId);
JSONObject param = new JSONObject();
// 增量
param.put("playDuration", playDuration);
msgData1.setParam(JSON.toJSONString(param));
JSONObject description = new JSONObject();
description.put("mediaId", mediaId);
description.put("mediaName", mediaName);
description.put("playDuration", playDuration);
description.put("mediaCode", mediaCode);
description.put("time", time);
msgData1.setDescription(JSON.toJSONString(description));
dataSyncMsg.setMsgData(JSONObject.toJSONString(msgData1));
return dataSyncMsg;
}
/**
* 任务处理
* @param dataSyncMsg
*/
private void taskDeal(DataSyncMsg dataSyncMsg) {
this.restTemplateClient.dealTask(dataSyncMsg);
}
@Data
static class PlayContent {
private String evt;
private Integer event;
private Integer deviceType;
private String time;
private MsgData msgData;
@Data
static class MsgData {
private String platformAccount;
private Integer playDuration;
private Long mediaId;
private String mediaCode;
private String mediaName;
}
}
}
......
......@@ -3,10 +3,7 @@ package com.topdraw.mq.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.topdraw.business.module.member.service.MemberService;
import com.topdraw.business.module.member.service.dto.MemberDTO;
import com.topdraw.business.module.user.iptv.service.UserTvService;
import com.topdraw.business.module.user.iptv.service.dto.UserTvDTO;
import com.topdraw.exception.BadRequestException;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.resttemplate.RestTemplateClient;
import com.topdraw.util.FileUtil;
......@@ -19,9 +16,7 @@ import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.time.LocalDate;
import java.util.Map;
......@@ -34,21 +29,13 @@ public class UcGatewayIptv2IptvConsumer {
@Autowired
RestTemplateClient restTemplateClient;
@Autowired
AutoRoute autoUser;
@Autowired
private MemberService memberService;
@Autowired
private UserTvService userTvService;
@Value("#{rabbitMqErrorLogConfig.getUcgError()}")
private Map<String, String> error;
/**
* 事件
* @param content
* @description 基础数据同步
* @description 普通权益事件
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
......@@ -56,20 +43,22 @@ public class UcGatewayIptv2IptvConsumer {
@RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgEventQueue()}",
containerFactory = "#{rabbitMqSourceConfig.getUcgEventSource()}",
autoStartup = "#{rabbitMqSourceConfig.getUcgEventStartUp()}",
ackMode = "MANUAL")
ackMode = "AUTO")
public void eventConsumer(Channel channel, Message message, String content) throws IOException {
log.info(" receive dataSync msg , content is : {} ", content);
log.info(" eventConsumer receive dataSync msg , content is : {} ", content);
try {
DataSyncMsg dataSyncMsg = this.parseContent(content);
DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content, DataSyncMsg.class);
if (Objects.nonNull(dataSyncMsg)) {
this.taskDeal(dataSyncMsg);
JSONObject jsonObject = this.restTemplateClient.dealTask(dataSyncMsg);
if (Objects.isNull(jsonObject)) {
throw new BadRequestException("uce处理任务响应超时");
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("普通权益事件处理异常, ==>> {}", e.getMessage());
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -83,63 +72,9 @@ public class UcGatewayIptv2IptvConsumer {
}
e.printStackTrace();
}
log.info("ucEventConsumer ====>>>> end");
}
/**
* 数据解析
* @param content
* @return
*/
private DataSyncMsg parseContent(String content) {
DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content,DataSyncMsg.class);
Assert.notNull(dataSyncMsg,"ERROR -->> operationConsumer -->> parseContent -->> 【dataSyncMsg】 not be null !!");
String msgDataStr = dataSyncMsg.getMsgData();
DataSyncMsg.MsgData msgData = null;
if (StringUtils.isNotBlank(msgDataStr)) {
msgData = JSONObject.parseObject(msgDataStr, DataSyncMsg.MsgData.class);
}else {
return null;
}
Long memberId = msgData.getMemberId();
String memberCode = msgData.getMemberCode();
if (Objects.nonNull(memberId) && StringUtils.isBlank(memberCode)) {
MemberDTO memberDTO = this.memberService.findById(memberId);
String code = memberDTO.getCode();
msgData.setMemberCode(code);
}
String platformAccount = msgData.getPlatformAccount();
if (StringUtils.isNotBlank(platformAccount)) {
UserTvDTO userTvDTO = userTvService.findByPlatformAccount(platformAccount);
if (Objects.nonNull(userTvDTO)) {
String priorityMemberCode = userTvDTO.getPriorityMemberCode();
if (StringUtils.isNotBlank(priorityMemberCode)) {
msgData.setMemberCode(priorityMemberCode);
} else {
MemberDTO memberDTO = this.memberService.findById(userTvDTO.getMemberId());
msgData.setMemberCode(memberDTO.getCode());
}
}
}
if(Objects.isNull(msgData.getMemberCode()) && Objects.isNull(msgData.getMemberId())) {
log.error("会员信息不存在,msgData =>> {}", msgData);
return null;
}
dataSyncMsg.setMsgData(JSONObject.toJSONString(msgData));
return dataSyncMsg;
}
/**
* 任务处理
* @param dataSyncMsg
*/
private void taskDeal(DataSyncMsg dataSyncMsg) {
this.restTemplateClient.dealTask(dataSyncMsg);
log.info("ucEventConsumer ====>>>> end");
}
......@@ -151,7 +86,7 @@ public class UcGatewayIptv2IptvConsumer {
@RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueue()}",
containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
ackMode = "MANUAL")
ackMode = "AUTO")
public void collectionConsumer(Channel channel, Message message, String content) throws IOException {
log.info("receive UserCollection add message, content {}", content);
......@@ -180,11 +115,9 @@ public class UcGatewayIptv2IptvConsumer {
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("收藏事件处理异常,cause ==>> {}", e.getMessage());
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -198,7 +131,6 @@ public class UcGatewayIptv2IptvConsumer {
}
e.printStackTrace();
}
}
......@@ -210,9 +142,9 @@ public class UcGatewayIptv2IptvConsumer {
@RabbitListener(queues = "#{rabbitMqSourceConfig.getViewRecordQueue()}",
containerFactory = "#{rabbitMqSourceConfig.getViewRecordSource()}",
autoStartup = "#{rabbitMqSourceConfig.getViewRecordStartUp()}",
ackMode = "MANUAL")
ackMode = "AUTO")
public void viewRecordConsumer(Channel channel, Message message, String content) throws IOException {
log.info("receive ViewRecord add message, content {}", content);
log.info("viewRecordConsumer receive ViewRecord add message, content {}", content);
try {
......@@ -227,15 +159,11 @@ public class UcGatewayIptv2IptvConsumer {
break;
default:
break;
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("观影事件处理异常,cause ==>> {}", e.getMessage());
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -249,14 +177,11 @@ public class UcGatewayIptv2IptvConsumer {
}
e.printStackTrace();
}
}
/**
* @description 添加收藏记录
* @param content 消息内容
......@@ -265,7 +190,7 @@ public class UcGatewayIptv2IptvConsumer {
@RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueueAdd()}",
containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
ackMode = "MANUAL")
ackMode = "AUTO")
public void collectionConsumerAdd(Channel channel, Message message, String content) throws IOException {
log.info("receive collectionConsumerAdd add message, content {}", content);
......@@ -294,11 +219,10 @@ public class UcGatewayIptv2IptvConsumer {
}
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("添加收藏记录事件处理异常,cause ==>> {}", e.getMessage());
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -312,24 +236,22 @@ public class UcGatewayIptv2IptvConsumer {
}
e.printStackTrace();
}
}
/**
* @description 添加收藏记录
* @description 删除收藏记录
* @param content 消息内容
*/
@RabbitHandler
@RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueueDelete()}",
containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
ackMode = "MANUAL")
ackMode = "AUTO")
public void collectionConsumerDelete(Channel channel, Message message, String content) throws IOException {
log.info("receive collectionConsumerDelete add message, content {}", content);
try {
JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
if (Objects.nonNull(content)) {
String evt = jsonObject.get("evt").toString();
......@@ -353,11 +275,9 @@ public class UcGatewayIptv2IptvConsumer {
}
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("删除收藏记录事件处理异常,cause ==>> {}", e.getMessage());
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -371,12 +291,12 @@ public class UcGatewayIptv2IptvConsumer {
}
e.printStackTrace();
}
}
/**
* @description 添加收藏记录
* @description 删除全部收藏记录
* @param content 消息内容
*/
@RabbitHandler
......@@ -412,11 +332,8 @@ public class UcGatewayIptv2IptvConsumer {
}
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("删除全部收藏记录事件处理异常,cause ==>> {}", e.getMessage());
if (MapUtils.isNotEmpty(error)) {
String errorStart = this.error.get("start");
......@@ -429,8 +346,6 @@ public class UcGatewayIptv2IptvConsumer {
}
}
e.printStackTrace();
}
}
}
......
......@@ -28,23 +28,4 @@ public class DataSyncMsg implements Serializable {
// 消息体
private String msgData;
/**
* 消息体
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class MsgData {
private Long memberId; // 会员id
private String memberCode;
private Long orderId;
private Long activityId;
private Long mediaId;
private Long itemId;
private String description;
private String param;
private String platformAccount;
}
}
......
......@@ -2,11 +2,11 @@ package com.topdraw.resttemplate;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.topdraw.business.module.member.address.domain.MemberAddress;
import com.topdraw.config.ResponseStatus;
import com.topdraw.mq.consumer.UcEventBusIptv2ManagementUcEngine;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.mq.domain.SubscribeBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.ResponseEntity;
......@@ -42,156 +42,150 @@ public class RestTemplateClient {
}
public JSONObject dealTask(DataSyncMsg dataSyncMsg) {
JSONObject resultSet = null;
try {
String url = BASE_URL + "/uce/taskOperation/dealTask";
log.info("request uc : url is " + url + ", dataSyncMsg is " + dataSyncMsg);
String content = JSON.toJSONString(dataSyncMsg);
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("content", content);
restTemplate.postForEntity(url, objectObjectHashMap, String.class);
objectObjectHashMap.put("content", JSON.toJSONString(dataSyncMsg));
log.info("request url is ==>> {} || param is ==>> {} ", url, objectObjectHashMap);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, objectObjectHashMap, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
String entityBody = responseEntity.getBody();
JSONObject jsonObject = JSONObject.parseObject(entityBody);
if (jsonObject.getInteger("businessCode").equals(ResponseStatusConstant.OK)) {
resultSet = jsonObject.getJSONArray("resultSet").getJSONObject(0);
}
}
log.info("uc response: " + resultSet.toJSONString());
return resultSet;
}
log.info("response ==>> {}", responseEntity);
return getParseResponseResult(responseEntity);
public JSONObject getMemberInfo(Long memberId) {
JSONObject resultSet = null;
String url = BASE_URL + "/uce/member/findById/" + memberId;
log.info("request uc : url is " + url + ", memberId is " + memberId);
ResponseEntity<String> responseEntity = restTemplate.getForEntity(url, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
String entityBody = responseEntity.getBody();
JSONObject jsonObject = JSONObject.parseObject(entityBody);
if (jsonObject.getInteger("businessCode").equals(200)) {
resultSet = jsonObject.getJSONArray("resultSet").getJSONObject(0);
}
}
log.info("uc response: " + resultSet.toJSONString());
return resultSet;
} catch (Exception e) {
log.error("处理普通权益任务(ApiUti.dealTask)信息时出现异常,cause ==>> {}", e.getMessage());
}
public String createMemberAddress(MemberAddress member) {
String url = BASE_URL + "/uce/memberAddress/create";
log.info("request uc : url is " + url + ", memberId is " + JSONObject.toJSONString(member));
restTemplate.postForEntity(url, member, String.class);
/* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, member, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
}
log.info("uc response: " + entityBody);*/
return null;
}
public String unsubscribe(SubscribeBean subscribeBean) {
public JSONObject unsubscribe(SubscribeBean subscribeBean) {
try {
String url = BASE_URL + "/uce/userOperation/unsubscribe";
String content = JSON.toJSONString(subscribeBean);
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("content", content);
log.info("reobjectObjectHashMap ===>> [{}]",objectObjectHashMap);
restTemplate.postForEntity(url, objectObjectHashMap, String.class);
log.info("unsubscribe ===>> success");
/*ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, subscribeBean, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
log.info("request url is ==>> {} || param is ==>> {} ", url, objectObjectHashMap);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, objectObjectHashMap, String.class);
log.info("response ==>> {}", responseEntity);
return getParseResponseResult(responseEntity);
} catch (Exception e) {
log.error("处理微信取关(ApiUti.unsubscribe)信息时出现异常,cause ==>> {}", e.getMessage());
}
log.info("uc response: " + entityBody);*/
return null;
}
public String subscribe(SubscribeBean subscribeBean) {
public JSONObject subscribe(SubscribeBean subscribeBean) {
try {
String url = BASE_URL + "/uce/userOperation/subscribe";
String content = JSON.toJSONString(subscribeBean);
HashMap<String, String> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("content", content);
log.info("reobjectObjectHashMap ===>> [{}]",objectObjectHashMap);
restTemplate.postForEntity(url, objectObjectHashMap, String.class);
log.info("send subscribe request ===>> success");
/*ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, subscribeBean, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
}
log.info("uc response: " + entityBody);*/
return null;
}
public String sendQrCodeMessage(String content) {
String url = BASE_URL + "/uce/userOperation/sendQrCodeMessage";
restTemplate.postForEntity(url, content, String.class);
/* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
log.info("request url is ==>> {} || param is ==>> {} ", url, objectObjectHashMap);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, subscribeBean, String.class);
log.info("response ==>> {}", responseEntity);
return getParseResponseResult(responseEntity);
} catch (Exception e) {
log.error("处理微信关注(ApiUti.subscribe)信息时出现异常,cause ==>> {}", e.getMessage());
}
log.info("uc response: " + entityBody);*/
return null;
}
public String addCollection(String content) {
public JSONObject addCollection(String content) {
try {
String url = BASE_URL + "/uce/userOperation/addCollection";
//处理接口调用 中文不显示问题
content = new String(Base64.getEncoder().encode(content.getBytes(StandardCharsets.UTF_8)));
restTemplate.postForEntity(url, content, String.class);
/* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
log.info("response ==>> {}", responseEntity);
return getParseResponseResult(responseEntity);
} catch (Exception e) {
log.error("添加观影记录(ApiUti.addCollection)信息时出现异常,cause ==>> {}", e.getMessage());
}
log.info("uc response: " + entityBody);*/
return null;
}
public String deleteCollection(String content) {
public JSONObject deleteCollection(String content) {
try {
String url = BASE_URL + "/uce/userOperation/deleteCollection";
//处理接口调用 中文不显示问题
content = new String(Base64.getEncoder().encode(content.getBytes(StandardCharsets.UTF_8)));
restTemplate.postForEntity(url, content, String.class);
/* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
log.info("response ==>> {}", responseEntity);
return getParseResponseResult(responseEntity);
} catch (Exception e) {
log.error("删除一条观影记录(ApiUti.deleteCollection)信息时出现异常,cause ==>> {}", e.getMessage());
}
log.info("uc response: " + entityBody);*/
return null;
}
public String deleteAllCollection(String content) {
public JSONObject deleteAllCollection(String content) {
try {
String url = BASE_URL + "/uce/userOperation/deleteAllCollection";
restTemplate.postForEntity(url, content, String.class);
/*ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
log.info("request url is ==>> {} || param is ==>> {} ", url, content);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
log.info("response ==>> {}", responseEntity);
return getParseResponseResult(responseEntity);
} catch (Exception e) {
log.error("删除所有观影记录(ApiUti.deleteAllCollection)信息时出现异常,cause ==>> {}", e.getMessage());
}
log.info("uc response: " + entityBody);*/
return null;
}
public String dealViewRecord(String content) {
public JSONObject dealViewRecord(String content) {
try {
String url = BASE_URL + "/uce/userOperation/addCollection";
//处理接口调用 中文不显示问题
content = new String(Base64.getEncoder().encode(content.getBytes(StandardCharsets.UTF_8)));
ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
log.info("response ==>> {}", responseEntity);
restTemplate.postForEntity(url, content, String.class);
/* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
entityBody = responseEntity.getBody();
return getParseResponseResult(responseEntity);
} catch (Exception e) {
log.error("处理观影记录(ApiUti.dealViewRecord)信息时出现异常,cause ==>> {}", e.getMessage());
}
log.info("uc response: " + entityBody);*/
return null;
}
/**
*
* @param responseEntity
* @return
*/
private static JSONObject getParseResponseResult(ResponseEntity<String> responseEntity) {
JSONObject resultSet = null;
if (responseEntity.getStatusCode().is2xxSuccessful()) {
String entityBody = responseEntity.getBody();
JSONObject jsonObject = JSONObject.parseObject(entityBody);
if (jsonObject.getInteger("businessCode").equals(ResponseStatus.OK)) {
resultSet = jsonObject.getJSONArray("resultSet").getJSONObject(0);
}
}
log.info("result ==>> {}", resultSet);
return resultSet;
}
}
......
......@@ -18,11 +18,6 @@ public class RestTemplateTest extends BaseTest {
@Autowired
RestTemplateClient apiUtil;
@Test
public void t(){
JSONObject memberInfo = this.apiUtil.getMemberInfo(5L);
System.out.println(memberInfo);
}
@Test
public void error(){
......