// AsyncMqProducer.java 3.14 KB
package com.topdraw.aspect;


import com.alibaba.fastjson.JSON;
import com.topdraw.mq.domain.TableOperationMsg;
import com.topdraw.mq.producer.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.lang.reflect.Method;

/**
 * Aspect that publishes a {@link TableOperationMsg} through {@link MessageProducer} after a
 * method annotated with {@link AsyncMqSend} has been invoked.
 *
 * <p>The message carries the intercepted method name, the target class name, the class name of
 * the first method argument and that argument serialized as JSON. The annotation's
 * {@code method()} and {@code entityName()} values, when non-empty, override the defaults taken
 * from the join point.
 *
 * <p>Publishing is best-effort: failures are logged and never propagated to the intercepted
 * method's caller.
 */
@Component
@Slf4j
@Aspect
public class AsyncMqProducer {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncMqProducer.class);

    @Autowired
    MessageProducer messageProducer;

    @Resource(name = "executorTask")
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Pointcut matching every method annotated with {@link AsyncMqSend} and binding the
     * annotation instance so advice can read its attributes.
     *
     * @param asyncMqSend the annotation found on the intercepted method
     */
    @Pointcut(value = "@annotation(asyncMqSend)")
    public void sendMqMsg(AsyncMqSend asyncMqSend) {
        // Pointcut signature only: the body of a @Pointcut method is never executed by the
        // AOP framework, so it is intentionally empty.
    }

    /**
     * Advice executed after the intercepted method; publishes the operation message when the
     * annotation's {@code open()} switch is enabled.
     *
     * <p>NOTE(review): {@code @After} is "after finally" advice, so this also runs when the
     * intercepted method throws. If messages must only be sent for successful operations,
     * {@code @AfterReturning} is the appropriate advice type - confirm the intended behavior.
     *
     * @param joinPoint   the intercepted invocation
     * @param asyncMqSend the annotation found on the intercepted method
     */
    @After("sendMqMsg(asyncMqSend)")
    public void doAfter(JoinPoint joinPoint, AsyncMqSend asyncMqSend) {
        LOG.info("AsyncMqProducer ===>>> doAfter ====>> start");
        if (asyncMqSend.open()) {
            try {
                this.doTask(joinPoint, asyncMqSend);
            } catch (Exception e) {
                // Best-effort: a messaging failure must not break the business call, but it
                // has to be visible in the application log rather than on stderr.
                LOG.error("AsyncMqProducer ===>>> doAfter ====>> failed to publish message for {}",
                        joinPoint.getSignature(), e);
            }
        }
        LOG.info("AsyncMqProducer ===>>> doAfter ====>> end ===>> ");
    }

    /**
     * Builds the {@link TableOperationMsg} for the intercepted invocation and hands it over for
     * delivery. Nothing is sent when the intercepted method has no non-null first argument,
     * because the entity name and body are both derived from that argument.
     *
     * @param joinPoint   the intercepted invocation
     * @param asyncMqSend the annotation found on the intercepted method
     */
    private void doTask(JoinPoint joinPoint, AsyncMqSend asyncMqSend) {
        LOG.info("AsyncMqProducer ===>>> doTask ====>> start ===>> ");

        Object[] args = joinPoint.getArgs();
        if (args == null || args.length == 0 || args[0] == null) {
            LOG.warn("AsyncMqProducer ===>>> doTask ====>> skipped, {} has no non-null first argument",
                    joinPoint.getSignature());
            return;
        }
        Object entity = args[0];

        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        // The entity is serialized here, on the calling thread, so later mutations of the
        // argument cannot change what is published when delivery happens asynchronously.
        TableOperationMsg tableOperationMsg = new TableOperationMsg();
        tableOperationMsg.setMethodName(orDefault(asyncMqSend.method(), method.getName()));
        tableOperationMsg.setEntityBody(JSON.toJSONString(entity));
        tableOperationMsg.setInterfaceName(joinPoint.getTarget().getClass().getName());
        tableOperationMsg.setEntityName(orDefault(asyncMqSend.entityName(), entity.getClass().getName()));

        this.dispatch(tableOperationMsg, asyncMqSend.async());

        LOG.info("AsyncMqProducer ===>>> doTask ====>> end ===>> ");
    }

    /**
     * Delivers the message either on the injected executor (asynchronous) or on the calling
     * thread (synchronous).
     *
     * @param tableOperationMsg the message to publish
     * @param async             {@code true} to submit the send to {@code threadPoolTaskExecutor}
     */
    private void dispatch(final TableOperationMsg tableOperationMsg, boolean async) {
        if (!async) {
            // Synchronous: send on the calling thread.
            this.sendMqMessage(tableOperationMsg);
            return;
        }
        // Asynchronous: hand the send over to the executor so the caller is not blocked.
        this.threadPoolTaskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    sendMqMessage(tableOperationMsg);
                } catch (Exception e) {
                    // Runs outside the advice's try/catch, so log here or the failure is lost.
                    LOG.error("AsyncMqProducer ===>>> dispatch ====>> async send failed", e);
                }
            }
        });
    }

    /**
     * Returns {@code value} unless it is {@code null} or empty, in which case {@code fallback}
     * is returned.
     */
    private static String orDefault(String value, String fallback) {
        return StringUtils.hasLength(value) ? value : fallback;
    }

    /**
     * Serializes the message to JSON and publishes it via
     * {@link MessageProducer#sendFanoutMessage}.
     *
     * @param tableOperationMsg the message to publish
     */
    private void sendMqMessage(TableOperationMsg tableOperationMsg) {
        this.messageProducer.sendFanoutMessage(JSON.toJSONString(tableOperationMsg));
    }

}