// UcEventBusIptv2ManagementUcEngine.java
package com.topdraw.mq.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.topdraw.business.module.member.service.MemberService;
import com.topdraw.business.module.member.service.dto.MemberDTO;
import com.topdraw.business.module.task.attribute.service.TaskAttrService;
import com.topdraw.business.module.task.attribute.service.dto.TaskAttrDTO;
import com.topdraw.business.module.task.domain.Task;
import com.topdraw.business.module.task.service.TaskService;
import com.topdraw.business.module.task.template.service.TaskTemplateService;
import com.topdraw.business.module.task.template.service.dto.TaskTemplateDTO;
import com.topdraw.business.module.user.iptv.service.UserTvService;
import com.topdraw.business.module.user.iptv.service.dto.UserTvDTO;
import com.topdraw.exception.EntityNotFoundException;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.resttemplate.RestTemplateClient;
import com.topdraw.util.DateUtil;
import com.topdraw.util.FileUtil;
import com.topdraw.util.JSONUtil;
import com.topdraw.utils.RedisUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.ParseException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;

@Component
@Slf4j
public class UcEventBusIptv2ManagementUcEngine {

    // Looks up the tasks that belong to a task template (findByTemplateId).
    @Autowired
    private TaskService taskService;
    // Resolves the TV user for a platform account (findByPlatformAccount).
    @Autowired
    private UserTvService userTvService;
    // Resolves a member by id when the TV user has no priority member code (findById).
    @Autowired
    private MemberService memberService;
    // Loads the attribute record of a task (findByTaskId).
    @Autowired
    private TaskAttrService taskAttrService;
    // Loads a task template by type (findByType).
    @Autowired
    private TaskTemplateService taskTemplateService;
    // Submits prepared task messages (dealTask).
    @Autowired
    private RestTemplateClient restTemplateClient;
    // Redis hash access used for the per-account, per-day play-duration hash (hmget / hmset).
    @Autowired
    private RedisUtils redisUtils;


    // Error-file settings resolved from rabbitMqErrorLogConfig.getEventBusError().
    // eventBusConsumer reads the entries "start", "fileName" and "filePath" from this map.
    @Value("#{rabbitMqErrorLogConfig.getEventBusError()}")
    private Map<String, String> error;

    /**
     * Event listener: consumes one message from the event-bus queue, passes its payload to
     * {@code parseContent} and acknowledges the delivery manually.
     *
     * <p>If processing (or the ack itself) throws, the failure is logged, the delivery is rejected
     * without requeue, and the payload is handed to {@code writeErrorFile}.
     *
     * @param channel channel the delivery arrived on; used for the manual ack / reject
     * @param message raw AMQP message; only its delivery tag is read
     * @param content message payload as text
     * @throws ParseException kept in the signature for callers; processing failures are handled here
     * @throws IOException if rejecting the delivery fails
     * @description basic data synchronization
     * @author Hongyan Wang
     * @date 2021/9/7 11:26 AM
     */
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getEventBusQueue()}",
            containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
            ackMode = "MANUAL")
    public void eventBusConsumer(Channel channel, Message message, String content) throws ParseException, IOException {
        log.info(" receive dataSync msg , content is : {} ", content);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {

            this.parseContent(content);

            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {

            // Log through SLF4J first (with stack trace) so the cause survives even if the
            // reject below fails.
            log.error("eventBusConsumer failed, content is : {}", content, e);

            try {
                // requeue=false: the broker must not redeliver this message to the queue.
                channel.basicReject(deliveryTag, false);
            } finally {
                // Runs even when the reject throws, so the failed payload is still recorded.
                this.writeErrorFile(content, e);
            }
        }
        log.info("ucEventConsumer ====>>>> end");
    }

    /**
     * Best-effort dump of a failed payload to the configured error file.
     *
     * <p>Does nothing unless the {@code error} map is non-empty and its {@code start} entry equals
     * "true" (case-insensitive). The target path is
     * {@code filePath + fileName + "_" + <today> + ".log"}. A failure while writing is logged
     * and swallowed so it cannot mask the original error or skip the reject.
     *
     * @param content payload that could not be processed
     * @param cause   exception raised while processing; its message is passed to the file writer
     */
    private void writeErrorFile(String content, Exception cause) {
        // Constant-first comparison: a missing "start" entry means "disabled" instead of an NPE.
        if (MapUtils.isEmpty(this.error) || !"true".equalsIgnoreCase(this.error.get("start"))) {
            return;
        }
        try {
            String fileName = this.error.get("fileName") + "_" + LocalDate.now() + ".log";
            String filePath = this.error.get("filePath") + fileName;
            FileUtil.writeStringToFile2(filePath, content, cause.getMessage());
        } catch (Exception writeFailure) {
            log.error("failed to write event bus error file", writeFailure);
        }
    }


    /**
     * Parses an event payload and, for {@code PLAY} events, accumulates the account's play
     * duration for the day in Redis and runs the play-duration task check.
     *
     * <p>Processing stops silently when any of these holds: the payload cannot be parsed, the
     * event is not {@code PLAY}, there is no {@code msg} part, {@code playDuration} is null or 0,
     * the platform account is blank, or no TV user exists for the account.
     *
     * <p>Redis layout: hash key {@code platformAccount + "|" + formattedDate}; field
     * {@code "total"} holds the accumulated duration, numbered fields hold the individual
     * durations. The first write of a key passes 129600 as its expiry argument.
     *
     * @param content message payload as text
     * @throws ParseException propagated from {@code DateUtil.formatDate} when the event time
     *                        cannot be parsed
     */
    private void parseContent(String content) throws ParseException {

        // Parse once; the same object carries both the event name and the play data.
        PlayContent playContent = JSONUtil.parseMsg2Object(content, PlayContent.class);
        if (Objects.isNull(playContent)) {
            log.warn("event bus message could not be parsed, content is : {}", content);
            return;
        }

        // Constant-first comparison is null-safe: a message without "evt" is simply not a PLAY.
        String evt = playContent.getEvt();
        if (!"PLAY".equalsIgnoreCase(evt)) {
            return;
        }

        PlayContent.MsgData msgData = playContent.getMsg();
        if (Objects.isNull(msgData)) {
            return;
        }

        String time = playContent.getTime();
        String formatDate = DateUtil.formatDate(time);

        Integer playDuration = msgData.getPlayDuration();
        if (Objects.isNull(playDuration) || playDuration == 0) {
            return;
        }
        log.info("playDuration ==>> {}", playDuration);

        String platformAccount = msgData.getPlatformAccount();
        if (StringUtils.isBlank(platformAccount)) {
            return;
        }

        UserTvDTO userTvDTO = this.userTvService.findByPlatformAccount(platformAccount);
        if (Objects.isNull(userTvDTO)) {
            return;
        }

        // Platform account + date is the hash key,
        // e.g. <total,sum>, <1,playDuration>, <3,playDuration>, ...
        String key = platformAccount + "|" + formatDate;
        Map<Object, Object> stored = this.redisUtils.hmget(key);

        int playDurationTotal;
        int entryIndex;
        if (MapUtils.isEmpty(stored)) {

            // First play for this key: initialise the total and entry "1".
            // The index is fixed here instead of being derived from the (possibly null) map.
            playDurationTotal = playDuration;
            entryIndex = 1;
            Map<String, Object> initial = new HashMap<>();
            initial.put("total", playDurationTotal);
            initial.put("1", playDuration);
            // 129600 is the expiry argument — presumably seconds (36 h); confirm in RedisUtils.hmset.
            this.redisUtils.hmset(key, initial, 129600);

        } else {

            // total = stored total + current play duration
            playDurationTotal = this.getRedisTotal(stored) + playDuration;
            // NOTE(review): the field count includes "total", so after entry "1" the next
            // index is 3 and field "2" is never used. Kept as-is so indices already stored
            // in Redis are not overwritten.
            entryIndex = this.getRedisTotalKey(stored) + 1;

        }

        DataSyncMsg dataSyncMsg = new DataSyncMsg();
        dataSyncMsg.setEvt(evt);
        DataSyncMsg.MsgData msg = new DataSyncMsg.MsgData();

        this.checkTask(playDurationTotal, time, playContent.getDeviceType(),
                msgData.getMediaCode(), msgData.getMediaId(), msgData.getMediaName(),
                dataSyncMsg, msg, userTvDTO);

        // Stored only after the task check returned normally.
        Map<String, Object> update = new HashMap<>();
        update.put(String.valueOf(entryIndex), playDuration);
        update.put("total", playDurationTotal);
        this.redisUtils.hmset(key, update);
    }

    /**
     * Compares the accumulated play duration with the thresholds configured on the tasks of
     * template type 8 and submits one task message per threshold that has been reached.
     *
     * <p>Thresholds are visited from the last configured task to the first. Every submitted
     * message is the same {@code dataSyncMsg} instance, re-populated for each threshold with
     * evt {@code "PLAY"}, event {@code 8}, the current time and device type {@code 1}.
     * The {@code deviceType} argument is only forwarded to {@code getDataSyncMsg}.
     *
     * @param playDurationValueTotal accumulated play duration including the current play
     * @param time                   event time as received; stored in the task description
     * @param deviceType             device type from the event
     * @param mediaCode              media code from the event
     * @param mediaId                media id from the event
     * @param mediaName              media name from the event
     * @param dataSyncMsg            message object to populate and submit
     * @param msgData                message body object to populate
     * @param userTvDTO              TV user the event belongs to
     * @return the last message submitted, or {@code null} when no threshold was reached
     */
    private DataSyncMsg checkTask(Integer playDurationValueTotal, String time, Integer deviceType, String mediaCode,
                                  Long mediaId, String mediaName, DataSyncMsg dataSyncMsg,
                                  DataSyncMsg.MsgData msgData, UserTvDTO userTvDTO) {

        List<Integer> thresholds = this.loadPlayDurationThresholds();

        DataSyncMsg lastSubmitted = null;

        for (int i = thresholds.size() - 1; i >= 0; i--) {

            Integer threshold = thresholds.get(i);

            if (playDurationValueTotal >= threshold) {
                lastSubmitted = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, threshold,
                        dataSyncMsg, msgData, userTvDTO);
                lastSubmitted.setEvt("PLAY");
                lastSubmitted.setEvent(8);
                lastSubmitted.setTime(LocalDateTime.now());
                lastSubmitted.setDeviceType(1);
                this.taskDeal(lastSubmitted);
            }

        }

        return lastSubmitted;
    }

    /**
     * Collects the play-duration threshold of every task under the type-8 task template.
     *
     * <p>A task's threshold is the first element of the {@code "value"} array in its attribute
     * JSON. Tasks with no attribute record, a blank attribute string, or a missing, empty or
     * non-numeric {@code "value"} are skipped with a warning instead of failing the whole event.
     *
     * @return thresholds in task order; empty when no template, task or usable attribute exists
     */
    private List<Integer> loadPlayDurationThresholds() {
        List<Integer> thresholds = new ArrayList<>();

        TaskTemplateDTO taskTemplateDTO = this.taskTemplateService.findByType(8);
        if (Objects.isNull(taskTemplateDTO) || Objects.isNull(taskTemplateDTO.getId())) {
            return thresholds;
        }

        List<Task> taskList = this.taskService.findByTemplateId(taskTemplateDTO.getId());
        if (CollectionUtils.isEmpty(taskList)) {
            return thresholds;
        }

        for (Task task : taskList) {
            TaskAttrDTO taskAttrDTO = this.taskAttrService.findByTaskId(task.getId());
            if (Objects.isNull(taskAttrDTO) || StringUtils.isBlank(taskAttrDTO.getAttrStr())) {
                continue;
            }

            JSONObject attr = JSONObject.parseObject(taskAttrDTO.getAttrStr(), JSONObject.class);
            Object value = Objects.isNull(attr) ? null : attr.get("value");

            // Inspect the runtime type instead of an unchecked cast to List<Integer>.
            Object first = null;
            if (value instanceof List && !((List<?>) value).isEmpty()) {
                first = ((List<?>) value).get(0);
            }

            if (first instanceof Number) {
                thresholds.add(((Number) first).intValue());
            } else {
                log.warn("task {} has no usable play duration threshold, attrStr is : {}",
                        task.getId(), taskAttrDTO.getAttrStr());
            }
        }

        return thresholds;
    }

    /**
     * Counts the fields of a Redis hash snapshot.
     *
     * @param hmget hash contents as returned by Redis; must not be {@code null}
     * @return number of keys in {@code hmget}, including the {@code "total"} field when present
     */
    private Integer getRedisTotalKey(Map<Object, Object> hmget) {
        return hmget.size();
    }

    /**
     * Reads the accumulated play duration from a Redis hash snapshot.
     *
     * <p>The first field whose key text equals {@code "total"} (ignoring case) is parsed as an
     * integer from its string form.
     *
     * @param hmget hash contents as returned by Redis; must not be {@code null}
     * @return the stored total, or {@code 0} when the hash has no {@code "total"} field
     */
    private Integer getRedisTotal(Map<Object, Object> hmget) {
        for (Map.Entry<Object, Object> field : hmget.entrySet()) {
            String fieldName = field.getKey().toString();
            if (fieldName.equalsIgnoreCase("total")) {
                return Integer.valueOf(field.getValue().toString());
            }
        }
        return 0;
    }

    /**
     * Fills the task message for one play-duration threshold.
     *
     * <p>The member code is the TV user's priority member code when present, otherwise the code
     * of the member referenced by the TV user. The message body receives the member code, the
     * media id, a JSON {@code param} ({@code playDuration}) and a JSON {@code description}
     * (media id / name / code, play duration and event time).
     *
     * @param time         event time as received; stored in the description
     * @param deviceType   device type from the event; not used by this method
     * @param mediaCode    media code stored in the description
     * @param mediaId      media id stored on the body and in the description
     * @param mediaName    media name stored in the description
     * @param playDuration value written as {@code playDuration}; {@code checkTask} passes the
     *                     threshold that was reached
     * @param dataSyncMsg  message to populate; also the return value
     * @param msgData1     body object to populate and attach to {@code dataSyncMsg}
     * @param userTvDTO    TV user the event belongs to
     * @return {@code dataSyncMsg} with {@code msgData1} attached
     * @throws EntityNotFoundException when no member code can be resolved
     */
    private DataSyncMsg getDataSyncMsg(String time, Integer deviceType, String mediaCode, Long mediaId, String mediaName,
                                       Integer playDuration, DataSyncMsg dataSyncMsg, DataSyncMsg.MsgData msgData1, UserTvDTO userTvDTO) {
        String priorityMemberCode = userTvDTO.getPriorityMemberCode();
        log.info("priorityMemberCode ==>> {}", priorityMemberCode);

        String memberCode;
        if (StringUtils.isNotBlank(priorityMemberCode)) {
            memberCode = priorityMemberCode;
        } else {
            // A missing member falls through to the EntityNotFoundException below rather than an NPE.
            memberCode = Optional.ofNullable(this.memberService.findById(userTvDTO.getMemberId()))
                    .map(member -> member.getCode())
                    .orElse(null);
        }

        // Log the resolved code (the previous version logged priorityMemberCode here).
        log.info("memberCode ==>> {}", memberCode);
        if (StringUtils.isBlank(memberCode)) {
            throw new EntityNotFoundException(MemberDTO.class, "memberCode", "memberCode is null");
        }

        msgData1.setMemberCode(memberCode);
        msgData1.setMediaId(mediaId);

        JSONObject param = new JSONObject();
        // increment
        param.put("playDuration", playDuration);
        msgData1.setParam(JSON.toJSONString(param));

        JSONObject description = new JSONObject();
        description.put("mediaId", mediaId);
        description.put("mediaName", mediaName);
        description.put("playDuration", playDuration);
        description.put("mediaCode", mediaCode);
        description.put("time", time);
        msgData1.setDescription(JSON.toJSONString(description));

        dataSyncMsg.setMsg(msgData1);
        log.info("dataSyncMsg ==>> {}", dataSyncMsg);
        return dataSyncMsg;
    }

    /**
     * Task handling: hands the prepared message to {@code RestTemplateClient.dealTask}.
     *
     * @param dataSyncMsg message to submit
     */
    private void taskDeal(DataSyncMsg dataSyncMsg) {
        restTemplateClient.dealTask(dataSyncMsg);
    }

    /**
     * Shape of an incoming event payload as parsed by {@code parseContent}.
     * Accessors are generated by Lombok's {@code @Data}.
     */
    @Data
    static class PlayContent {
        // Event name; parseContent only handles "PLAY".
        private String evt;
        // Not read anywhere in this class.
        private Integer event;
        // Forwarded to checkTask.
        private Integer deviceType;
        // Event time as text; formatted via DateUtil.formatDate for the Redis key and stored in the task description.
        private String time;
        // Play data; events without it are ignored.
        private MsgData msg;

        /** Play data carried in the {@code msg} part of the payload. */
        @Data
        static class MsgData {
            // Account used to look up the TV user and to build the Redis key; blank values are ignored.
            private String platformAccount;
            // Duration of this play; null or 0 makes parseContent ignore the event. Unit not shown here — TODO confirm.
            private Integer playDuration;
            // Media identifiers copied into the task message.
            private Long mediaId;
            private String mediaCode;
            private String mediaName;
        }
    }
}