// UcGatewayIptv2IptvConsumer.java
package com.topdraw.mq.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.topdraw.exception.BadRequestException;
import com.topdraw.mq.domain.DataSyncMsg;
import com.topdraw.resttemplate.RestTemplateClient;
import com.topdraw.util.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.Locale;

@Component
@Slf4j
public class UcGatewayIptv2IptvConsumer {

    @Autowired
    RestTemplateClient restTemplateClient;

    /*@Value("#{rabbitMqErrorLogConfig.getUcgError()}")
    private Map<String, String> error;*/

    /**
     * 事件
     * @param content
     * @description 普通权益事件
     * @author Hongyan Wang
     * @date 2021/9/7 11:26 上午
     */
    /*@RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgEventQueue()}",
            containerFactory = "#{rabbitMqSourceConfig.getUcgEventSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getUcgEventStartUp()}",
            ackMode = "AUTO")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getUcgEventQueue()}",
            ackMode = "AUTO")
    public void eventConsumer(Channel channel, Message message, String content) throws IOException {
        log.info("消费uc-gateway任务消息,参数eventConsumer# content ==>> {} ", content);
        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content, DataSyncMsg.class);
            log.info("消费uc-gateway任务消息,解析参数结果,eventBusConsumer# dataSyncMsg ==>> {} ", dataSyncMsg);

            if (!this.restTemplateClient.dealTask(dataSyncMsg)) {
                throw new BadRequestException("uce处理任务响应超时");
            }

        } catch (Exception e) {
            log.error("普通权益事件处理异常,eventConsumer# message ==>> {}", e.getMessage());
        }
    }



    /**
     * @description 删除全部收藏记录
     * @param content 消息内容
     */
    /*@RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getGrowthReportQueue()}",
            containerFactory = "#{rabbitMqSourceConfig.getGrowthReportSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getGrowthReportStartUp()}",
            ackMode = "AUTO")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getGrowthReportQueue()}",
            ackMode = "AUTO")
    public void dealGrowthReport(Channel channel, Message message, String content) throws IOException {
        log.info("消费uc-gateway成长报告消息,参数 dealGrowthReport# content ==>> {}", content);

        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
            log.info("消费uc-gateway任务消息,解析参数结果,dealGrowthReport# jsonObject ==>> {} ", jsonObject);

            if (!this.restTemplateClient.saveGrowthReport(JSON.toJSONString(jsonObject.get("msgData")))) {
                log.error("同步大屏成长报告失败,uce接口响应超时");
            }

        } catch (Exception e) {
            log.error("消费uc-gateway成长报告消息异常,dealGrowthReport# message ==>> {}", e.getMessage());
        }
    }

    /**
     * @description 收藏记录
     * @param content 消息内容
     */
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getUcgCollectionQueue()}", ackMode = "AUTO")
    public void collectionConsumer(Channel channel, Message message, String content) throws IOException {
        // TODO 收藏统一的操作入口,uc-gateway需要修改发送的队列
        log.info("消费uc-gateway收藏操作消息,参数 collectionConsumer# content ==>> {}", content);

        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
            log.info("消费uc-gateway收藏操作消息,解析参数结果,collectionConsumer# jsonObject ==>> {} ", jsonObject);

            String evt = jsonObject.get("evt").toString();
            String msgData = jsonObject.get("msgData").toString();
            switch (evt.toUpperCase()) {
                // 添加收藏
                case "ADDCOLLECTION":
                    this.restTemplateClient.addCollection(msgData);
                    break;
                // 删除收藏
                case "DELETECOLLECTION":
                    this.restTemplateClient.deleteCollection(msgData);
                    break;
                // 删除全部收藏
                case "DELETEALLCOLLECTION":
                    this.restTemplateClient.deleteAllCollection(msgData);
                    break;
                default:
                    break;

            }

        } catch (Exception e) {
            log.error("消费uc-gateway收藏操作消息异常,collectionConsumer# massage ==>> {}", e.getMessage());
        }
    }

    /**
     * @description 处理观影记录
     * @param content 消息内容
     */
    /*@RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getViewRecordQueue()}",
            containerFactory = "#{rabbitMqSourceConfig.getViewRecordSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getViewRecordStartUp()}",
            ackMode = "AUTO")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getViewRecordQueue()}", ackMode = "AUTO")
    public void viewRecordConsumer(Channel channel, Message message, String content) throws IOException {
        log.info("消费uc-gateway观影记录消息,参数 viewRecordConsumer# content ==>> {}", content);

        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
            log.info("消费uc-gateway观影记录消息,解析参数结果,viewRecordConsumer# jsonObject ==>> {} ", jsonObject);

            String evt = jsonObject.get("evt").toString();
            String msgData = jsonObject.get("msgData").toString();
            // 观影
            if ("VIEWING".equals(evt.toUpperCase())) {
                this.restTemplateClient.dealViewRecord(msgData);
            }

        } catch (Exception e) {
            log.error("消费uc-gateway观影记录消息,viewRecordConsumer# message ==>> {}", e.getMessage());
        }
    }


    /**
     * @description 添加收藏记录
     * @param content 消息内容
     */
    /*@RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueueAdd()}",
            containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
            ackMode = "AUTO")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getUcgCollectionQueueAdd()}",
            ackMode = "AUTO")
    public void collectionConsumerAdd(Channel channel, Message message, String content) throws IOException {
        log.info("消费uc-gateway添加收藏记录事件,参数 collectionConsumerAdd# content ==>> {} ", content);

        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
            log.info("消费uc-gateway添加收藏记录事件,解析参数结果,collectionConsumerAdd# jsonObject ==>> {} ", jsonObject);

            String evt = jsonObject.get("evt").toString();
            String msgData = jsonObject.get("msgData").toString();
            // 添加收藏
            if ("ADDCOLLECTION".equals(evt.toUpperCase())) {
                this.restTemplateClient.addCollection(msgData);
            }


        } catch (Exception e) {
            log.error("添加收藏记录事件处理异常,cause ==>> {}", e.getMessage());
        }
    }

    /**
     * @description 删除收藏记录
     * @param content 消息内容
     */
   /* @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueueDelete()}",
            containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
            ackMode = "AUTO")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getUcgCollectionQueueDelete()}", ackMode = "AUTO")
    public void collectionConsumerDelete(Channel channel, Message message, String content) throws IOException {
        log.info("消费uc-gateway删除收藏记录,参数 collectionConsumerDelete# content ==>> {}", content);

        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
            log.info("消费uc-gateway删除收藏记录,解析参数结果,collectionConsumerDelete# jsonObject ==>> {} ", jsonObject);

            String evt = jsonObject.get("evt").toString();
            String msgData = jsonObject.get("msgData").toString();
            // 删除收藏
            if ("DELETECOLLECTION".equals(evt.toUpperCase())) {
                this.restTemplateClient.deleteCollection(msgData);
            }

        } catch (Exception e) {

            log.error("消费uc-gateway删除收藏记录异常,collectionConsumerDelete# message ==>> {}", e.getMessage());

        }
    }

    /**
     * @description 删除全部收藏记录
     * @param content 消息内容
     */
    /*@RabbitHandler
    @RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueueDeleteAll()}",
            containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
            autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
            ackMode = "MANUAL")*/
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitMqConfig.getUcgCollectionQueueDeleteAll()}",
            ackMode = "AUTO")
    public void collectionConsumerDeleteAll(Channel channel, Message message, String content) throws IOException {
        log.info("消费uc-gateway删除全部收藏记录,参数 collectionConsumerDeleteAll# content ==>> {}", content);

        try {

            if (StringUtils.isEmpty(content)) {
                throw new BadRequestException("无参数");
            }

            JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
            log.info("消费uc-gateway删除全部收藏记录,解析参数结果,collectionConsumerDeleteAll# jsonObject ==>> {} ", jsonObject);

            String evt = jsonObject.get("evt").toString();
            String msgData = jsonObject.get("msgData").toString();
            // 删除全部收藏
            if ("DELETEALLCOLLECTION".equals(evt.toUpperCase())) {
                this.restTemplateClient.deleteAllCollection(msgData);
            }

        } catch (Exception e) {
            log.error("消费uc-gateway删除全部收藏记录,collectionConsumerDeleteAll# message ==>> {}", e.getMessage());
        }
    }

}