// UcEventBusIptv2ManagementUcEngine.java 5.22 KB
package com.topdraw.mq.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.topdraw.business.module.task.template.constant.TaskEventName;
import com.topdraw.business.module.task.template.constant.TaskEventType;
import com.topdraw.exception.BadRequestException;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.resttemplate.RestTemplateClient;
import com.topdraw.util.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.*;

@Component
@Slf4j
public class UcEventBusIptv2ManagementUcEngine {

    @Autowired
    private RestTemplateClient restTemplateClient;

    /*@Value("#{rabbitMqErrorLogConfig.getEventBusError()}")
    private Map<String, String> error;*/

    /**
     * 事件
     * @param content
     * @description 基础数据同步
     * @author Hongyan Wang
     * @date 2021/9/7 11:26 上午
     */
    /*@RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getEventBusQueue()}",
            containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
            ackMode = "AUTO")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getEventBusQueue()}",
            ackMode = "AUTO")
    public void eventBusConsumer(Channel channel, Message message, String content) throws Exception {
        log.info("消费eventBus数据,参数 eventBusConsumer# content ==>> {} ", content);
        try {

            if (StringUtils.isBlank(content)) {
                throw new BadRequestException("无参数");
            }

            DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content, DataSyncMsg.class);
            log.info("消费eventBus数据,解析参数,eventBusConsumer# ==>> {} ", dataSyncMsg);

            if (Objects.nonNull(dataSyncMsg)) {

                String evt = dataSyncMsg.getEvt();
                if (StringUtils.isBlank(evt)) {
                    log.error("消费eventBus数据异常,eventBusConsumer# message ==>> eventBus事件类型(evt)为空");
                    throw new BadRequestException("参数错误,事件类型 evt不存在");
                }

                LocalDateTime time = dataSyncMsg.getTime();
                if (Objects.isNull(time)) {
                    log.error("消费eventBus数据异常,eventBusConsumer# message ==>> 发送时间(time)不存在");
                    throw new BadRequestException("参数错误,发送时间(time)不得为空");
                } /*else {
                    if (time.isAfter(LocalDateTime.now()) || time.toLocalDate().compareTo(LocalDate.now()) != 0) {
                        log.error("参数错误,事件发送时间(time)非法 ==>> {}", time);
                        throw new BadRequestException("参数错误,事件发送时间非法 ");
                    }
                }*/

                String msgData = dataSyncMsg.getMsgData();
                if (StringUtils.isBlank(msgData)) {
                    log.error("消费eventBus数据异常,eventBusConsumer# message ==>> 消息体(msgData)为空");
                    throw new BadRequestException("参数错误,消息体(msgData)不得为空");
                }

                switch (dataSyncMsg.getEvt().toUpperCase()) {

                    // 播放记录
                    case TaskEventName.PLAY:
                        this.doPlayEvent(dataSyncMsg);
                        break;

                    default:
                        log.info("无可处理的任务");
                        break;
                }

            }

        } catch (Exception e) {

            log.error("eventBus消费异常,eventBusConsumer# message ==>> {}", e.getMessage());

            // TODO使用slf4j记录日志
            /*if (MapUtils.isNotEmpty(error)) {
                String errorStart = this.error.get("start");

                if (errorStart.equalsIgnoreCase("true")) {
                    String fileName = this.error.get("fileName")+"_"+LocalDate.now() +".log";
                    String filePath = this.error.get("filePath");
                    String filePath1 = filePath+fileName;
                    FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
                }

            }*/

        }

    }

    /**
     *
     * @param playContent
     */
    private void doPlayEvent(DataSyncMsg playContent) {
        playContent.setEvent(TaskEventType.PLAY);
        String msgData = playContent.getMsgData();
        JSONObject jsonObject = JSONObject.parseObject(msgData, JSONObject.class);
        Object platformAccount = jsonObject.get("platformAccount");
        if (Objects.nonNull(platformAccount)) {
            boolean response = this.restTemplateClient.dealTask(playContent);
            if (!response) {
                log.error("uc-engine响应超时,请检查uc-engine服务");
                throw new BadRequestException("uc-engine响应超时");
            }
        }
    }

}