1.update
Showing
3 changed files
with
2 additions
and
87 deletions
1 | package com.topdraw.mq.consumer; | ||
2 | |||
3 | import com.rabbitmq.client.Channel; | ||
4 | import com.topdraw.mq.domain.TableOperationMsg; | ||
5 | import com.topdraw.resttemplate.RestTemplateClient; | ||
6 | import com.topdraw.util.FileUtil; | ||
7 | import com.topdraw.util.JSONUtil; | ||
8 | import lombok.extern.slf4j.Slf4j; | ||
9 | import org.apache.commons.collections4.MapUtils; | ||
10 | import org.springframework.amqp.core.Message; | ||
11 | import org.springframework.amqp.rabbit.annotation.RabbitHandler; | ||
12 | import org.springframework.amqp.rabbit.annotation.RabbitListener; | ||
13 | import org.springframework.beans.factory.annotation.Autowired; | ||
14 | import org.springframework.beans.factory.annotation.Value; | ||
15 | import org.springframework.stereotype.Component; | ||
16 | import org.springframework.util.Assert; | ||
17 | |||
18 | import java.io.IOException; | ||
19 | import java.time.LocalDate; | ||
20 | import java.util.Map; | ||
21 | |||
22 | @Component | ||
23 | @Slf4j | ||
24 | public class UcEngineIptv2ManagementConsumer { | ||
25 | |||
26 | @Autowired | ||
27 | AutoRoute autoUser; | ||
28 | |||
29 | @Autowired | ||
30 | RestTemplateClient restTemplateClient; | ||
31 | |||
32 | @Value("#{rabbitMqErrorLogConfig.getUceError()}") | ||
33 | private Map<String, String> error; | ||
34 | |||
35 | /** | ||
36 | * 事件 | ||
37 | * @param content | ||
38 | * @description 基础数据同步 | ||
39 | * @author Hongyan Wang | ||
40 | * @date 2021/9/7 11:26 上午 | ||
41 | */ | ||
42 | /*@RabbitHandler | ||
43 | @RabbitListener(queues = "#{rabbitMqConfig.getUceQueue()}", | ||
44 | containerFactory = "serviceRabbitListenerContainerFactory", ackMode = "MANUAL")*/ | ||
45 | public void ucEventConsumer(Channel channel, Message message, String content) throws IOException { | ||
46 | log.info(" receive dataSync msg , content is : {} ", content); | ||
47 | try { | ||
48 | |||
49 | TableOperationMsg tableOperationMsg = this.parseContent(content); | ||
50 | |||
51 | autoUser.route(tableOperationMsg); | ||
52 | |||
53 | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); | ||
54 | |||
55 | } catch (Exception e) { | ||
56 | |||
57 | channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); | ||
58 | |||
59 | if (MapUtils.isNotEmpty(error)) { | ||
60 | String errorStart = this.error.get("start"); | ||
61 | |||
62 | if (errorStart.equalsIgnoreCase("true")) { | ||
63 | String fileName = this.error.get("fileName")+"_"+ LocalDate.now() +".log"; | ||
64 | String filePath = this.error.get("filePath"); | ||
65 | String filePath1 = filePath+fileName; | ||
66 | FileUtil.writeStringToFile2(filePath1, content, e.getMessage()); | ||
67 | } | ||
68 | |||
69 | } | ||
70 | |||
71 | e.printStackTrace(); | ||
72 | } | ||
73 | log.info("ucEventConsumer ====>>>> end"); | ||
74 | } | ||
75 | |||
76 | /** | ||
77 | * 数据解析 | ||
78 | * @param content | ||
79 | * @return | ||
80 | */ | ||
81 | private TableOperationMsg parseContent(String content) { | ||
82 | TableOperationMsg tableOperationMsg = JSONUtil.parseMsg2Object(content,TableOperationMsg.class); | ||
83 | Assert.notNull(tableOperationMsg,"ERROR -->> operationConsumer -->> parseContent -->> 【dataSyncMsg】 not be null !!"); | ||
84 | return tableOperationMsg; | ||
85 | } | ||
86 | |||
87 | } |
... | @@ -71,6 +71,7 @@ public class UcEngineManagement2IptvConsumer { | ... | @@ -71,6 +71,7 @@ public class UcEngineManagement2IptvConsumer { |
71 | 71 | ||
72 | e.printStackTrace(); | 72 | e.printStackTrace(); |
73 | } | 73 | } |
74 | |||
74 | log.info("ucEventConsumer ====>>>> end"); | 75 | log.info("ucEventConsumer ====>>>> end"); |
75 | } | 76 | } |
76 | 77 | ... | ... |
-
Please register or sign in to post a comment