Commit e20d47d7 e20d47d7f019921dbd8202b13e21ebe2ea93ba7b by xianghan

1.优化

1 parent 8c7571ef
...@@ -23,12 +23,12 @@ ...@@ -23,12 +23,12 @@
23 23
24 <dependencies> 24 <dependencies>
25 25
26 <!--redisson--> 26 <!--wechat-util-->
27 <dependency> 27 <!--<dependency>
28 <groupId>org.redisson</groupId> 28 <groupId>com.topdraw</groupId>
29 <artifactId>redisson</artifactId> 29 <artifactId>wechat-util</artifactId>
30 <version>3.16.3</version> 30 <version>0.0.1-SNAPSHOT</version>
31 </dependency> 31 </dependency>-->
32 32
33 <!--代码生成器--> 33 <!--代码生成器-->
34 <dependency> 34 <dependency>
......
1 package com.topdraw.config;
2
3 import org.apache.commons.collections4.CollectionUtils;
4 import org.apache.commons.lang3.StringUtils;
5 import org.springframework.amqp.core.*;
6 import org.springframework.amqp.rabbit.core.RabbitAdmin;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.stereotype.Component;
9
10 import javax.annotation.PostConstruct;
11 import javax.annotation.Resource;
12 import javax.naming.ConfigurationException;
13 import java.util.List;
14 import java.util.Map;
15
16 @Component
17 public class RabbitMqBindingConfig {
18
19 /**************************************************数据源选择*************************************************************/
20
21 @Autowired
22 private RabbitMqSourceConfig rabbitMqSourceConfig;
23 @Autowired
24 private RabbitMqCustomConfig rabbitMqCustomConfig;
25
26 @Resource(name = "managementRabbitAdmin")
27 private RabbitAdmin managementRabbitAdmin;
28
29 @Resource(name = "serviceRabbitAdmin")
30 private RabbitAdmin serviceRabbitAdmin;
31
32
33 @PostConstruct
34 public void initBinding() throws ConfigurationException {
35
36 // String source = rabbitMqSourceConfig.getActiveSource();
37
38 List<Map<String, String>> list = rabbitMqCustomConfig.getList();
39 if (CollectionUtils.isNotEmpty(list)) {
40
41 for (Map<String, String> map : list) {
42
43 String exchange = map.get("exchange");
44 String exchangeType = map.get("exchange-type");
45 Exchange exchange_ = null;
46 switch (exchangeType) {
47 case ExchangeTypes.TOPIC:
48 exchange_ = ExchangeBuilder.topicExchange(exchange)
49 .durable(true).build();
50 break;
51 case ExchangeTypes.FANOUT:
52 exchange_ = ExchangeBuilder.fanoutExchange(exchange)
53 .durable(true).build();
54 break;
55 case ExchangeTypes.HEADERS:
56 exchange_ = ExchangeBuilder.headersExchange(exchange)
57 .durable(true).build();
58 break;
59
60 default:
61 exchange_ = ExchangeBuilder.directExchange(exchange)
62 .durable(true).build();
63 break;
64 }
65
66 String queue = map.get("queue");
67 Queue queue_ = new Queue(queue);
68
69 String routingKey = map.get("routing-key");
70 if (StringUtils.isBlank(routingKey)) {
71 routingKey = queue;
72 }
73
74 Binding binding_ = BindingBuilder.bind(queue_).to(exchange_).with(routingKey).and(null);
75
76
77 String active = map.get("active");
78 switch (active) {
79 case "management":
80 this.managementRabbitAdmin.declareExchange(exchange_);
81 this.managementRabbitAdmin.declareQueue(queue_);
82 this.managementRabbitAdmin.declareBinding(binding_);
83 break;
84 case "service":
85 this.serviceRabbitAdmin.declareExchange(exchange_);
86 this.serviceRabbitAdmin.declareQueue(queue_);
87 this.serviceRabbitAdmin.declareBinding(binding_);
88 break;
89 /* case "service,management":
90 this.serviceRabbitAdmin.declareExchange(exchange_);
91 this.serviceRabbitAdmin.declareQueue(queue_);
92 this.serviceRabbitAdmin.declareBinding(binding_);
93
94 this.managementRabbitAdmin.declareExchange(exchange_);
95 this.managementRabbitAdmin.declareQueue(queue_);
96 this.managementRabbitAdmin.declareBinding(binding_);
97 break;*/
98 default:
99
100 break;
101 }
102
103 }
104
105 }
106
107 }
108
109 }
1 package com.topdraw.config;
2
3 import org.springframework.amqp.core.*;
4 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
5 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
6 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
7 import org.springframework.amqp.rabbit.core.RabbitTemplate;
8 import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
9 import org.springframework.beans.factory.annotation.Qualifier;
10 import org.springframework.beans.factory.annotation.Value;
11 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.context.annotation.Configuration;
14 import org.springframework.context.annotation.Primary;
15
16 @Configuration
17 public class RabbitMqConfig {
18
19 @Value("${mutil-mq.service.host}")
20 private String serviceHost;
21 @Value("${mutil-mq.service.port}")
22 private Integer servicePort;
23 @Value("${mutil-mq.service.username}")
24 private String serviceUserName;
25 @Value("${mutil-mq.service.password}")
26 private String servicePassword;
27 @Value("${mutil-mq.service.virtual-host}")
28 private String serviceVirtualHost;
29
30 @Bean(name = "serviceConnectionFactory")
31 @Primary
32 public ConnectionFactory serviceConnectionFactory(){
33 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
34 connectionFactory.setHost(serviceHost);
35 connectionFactory.setPort(servicePort);
36 connectionFactory.setUsername(serviceUserName);
37 connectionFactory.setPassword(servicePassword);
38 connectionFactory.setVirtualHost(serviceVirtualHost);
39 return connectionFactory;
40 }
41
42 @Bean(name = "serviceRabbitListenerContainerFactory")
43 @Primary
44 public RabbitListenerContainerFactory userCenterRabbitListenerContainerFactory(
45 SimpleRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer,
46 @Qualifier("serviceConnectionFactory") ConnectionFactory connectionFactory) {
47 System.out.println("userCenterRabbitListenerContainerFactory ====>> start ");
48 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
49 containerFactoryConfigurer.configure(factory,connectionFactory);
50 return factory;
51 }
52
53 @Bean(name = "serviceRabbitTemplate")
54 public RabbitTemplate userCenterRabbitTemplate(ConnectionFactory ucServiceConnectionFactory){
55 RabbitTemplate u = new RabbitTemplate(ucServiceConnectionFactory);
56 return u;
57 }
58
59
60 @Value("${mutil-mq.management.host}")
61 private String managementHost;
62 @Value("${mutil-mq.management.port}")
63 private Integer managementPort;
64 @Value("${mutil-mq.management.username}")
65 private String managementUserName;
66 @Value("${mutil-mq.management.password}")
67 private String managementPassword;
68 @Value("${mutil-mq.management.virtual-host}")
69 private String managementVirtualHost;
70
71 @Bean(name = "managementConnectionFactory")
72 public ConnectionFactory managementConnectionFactory(){
73 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
74 connectionFactory.setHost(managementHost);
75 connectionFactory.setPort(managementPort);
76 connectionFactory.setUsername(managementUserName);
77 connectionFactory.setPassword(managementPassword);
78 connectionFactory.setVirtualHost(managementVirtualHost);
79 return connectionFactory;
80 }
81
82 @Bean(name = "managementRabbitListenerContainerFactory")
83 public RabbitListenerContainerFactory memberServiceRabbitListenerContainerFactory(
84 SimpleRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer,
85 @Qualifier("managementConnectionFactory") ConnectionFactory connectionFactory) {
86 System.out.println("memberServiceRabbitListenerContainerFactory ====>> start ");
87 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
88 containerFactoryConfigurer.configure(factory,connectionFactory);
89 return factory;
90 }
91
92 @Bean(name = "managementRabbitTemplate")
93 public RabbitTemplate memberServiceRabbitTemplate(ConnectionFactory ucGatewayConnectionFactory){
94 RabbitTemplate u = new RabbitTemplate(ucGatewayConnectionFactory);
95 return u;
96 }
97
98 /**************************************************uc-getaway-samll 2 uc-consumer-samll*************************************************************/
99
100 /*public static final String GATEWAY_TO_SMALL_DIRECT = "uc.gateway.small.direct";
101
102 @Bean
103 public Queue eventSmallDirect() {
104 return new Queue(GATEWAY_TO_SMALL_DIRECT);
105 }
106
107 @Bean
108 Binding eventSmallBinding(DirectExchange managementExchange , Queue eventSmallDirect) {
109 BindingBuilder.DirectExchangeRoutingKeyConfigurer directExchangeRoutingKeyConfigurer =
110 BindingBuilder.bind(eventSmallDirect).to(managementExchange);
111 return directExchangeRoutingKeyConfigurer.with(GATEWAY_TO_SMALL_DIRECT);
112 }*/
113
114
115 /**************************************************uc-engine-management 2 uc-consumer-iptv*************************************************************/
116
117
118 /**************************************************uc-getaway-iptv 2 uc-consumer-iptv*************************************************************/
119
120 public static final String GATEWAY_TO_SERVICE_DIRECT = "uc.gateway.service.direct";
121
122 @Bean
123 public Queue eventServiceDirect() {
124 return new Queue(GATEWAY_TO_SERVICE_DIRECT);
125 }
126
127 @Bean
128 Binding eventBinding(DirectExchange serviceExchange, Queue eventServiceDirect) {
129 BindingBuilder.DirectExchangeRoutingKeyConfigurer directExchangeRoutingKeyConfigurer =
130 BindingBuilder.bind(eventServiceDirect).to(serviceExchange);
131 return directExchangeRoutingKeyConfigurer.with(ENGINE_TO_SERVICE_DIRECT);
132 }
133
134
135 /**************************************************uc-engine-management 2 uc-consumer-iptv*************************************************************/
136
137 public static final String UC_DIRECT_MANAGEMENT = "uc.direct.management";
138 // uc-service-management 2 uc-consumer-iptv
139 // public static final String ENGINE_TO_SERVICE_DIRECT = "uc.engine.service.direct";
140 public static final String ENGINE_TO_SERVICE_DIRECT = "uc.engine.service.direct";
141
142 @Bean
143 public Queue managementDirect() {
144 return new Queue(ENGINE_TO_SERVICE_DIRECT);
145 }
146
147 @Bean
148 DirectExchange managementExchange(){
149 return ExchangeBuilder.directExchange(UC_DIRECT_MANAGEMENT).build();
150 }
151
152 @Bean
153 Binding managementBinding(DirectExchange managementExchange , Queue managementDirect) {
154 BindingBuilder.DirectExchangeRoutingKeyConfigurer directExchangeRoutingKeyConfigurer =
155 BindingBuilder.bind(managementDirect).to(managementExchange);
156 return directExchangeRoutingKeyConfigurer.with(ENGINE_TO_SERVICE_DIRECT);
157 }
158
159
160 /**************************************************uc-engine-iptv 2 uc-consumer-management*************************************************************/
161
162 public static final String UC_DIRECT_SERVICE = "uc.direct";
163 // uc-service-iptv 2 uc-consumer-management
164 public static final String ENGINE_TO_MANAGEMENT_DIRECT = "uc.engine.management.direct";
165
166 @Bean
167 public Queue serviceDirect() {
168 return new Queue(ENGINE_TO_MANAGEMENT_DIRECT);
169 }
170
171 @Bean
172 DirectExchange serviceExchange(){
173 return ExchangeBuilder.directExchange(UC_DIRECT_SERVICE).build();
174 }
175
176 @Bean
177 Binding serviceBinding(DirectExchange serviceExchange , Queue serviceDirect) {
178 BindingBuilder.DirectExchangeRoutingKeyConfigurer directExchangeRoutingKeyConfigurer =
179 BindingBuilder.bind(serviceDirect).to(serviceExchange);
180 return directExchangeRoutingKeyConfigurer.with(ENGINE_TO_MANAGEMENT_DIRECT);
181 }
182
183
184
185 /**************************************************数据基座*************************************************************/
186 public static final String UC_EVENTBUS_EXCHANGE = "uc.eventbus";
187 public static final String UC_EVENTBUS_KEY = "uc.eventbus.*.topic";
188 public static final String UC_EVENTBUS_QUEUE = "uc.eventbus";
189
190 @Bean
191 public Queue eventBusQueue() {
192 return new Queue(UC_EVENTBUS_QUEUE);
193 }
194
195 @Bean
196 TopicExchange eventBusExchange() {
197 return ExchangeBuilder.topicExchange(UC_EVENTBUS_EXCHANGE)
198 .durable(true).build();
199 }
200
201 @Bean
202 Binding eventBusBinding(TopicExchange eventBusExchange, Queue eventBusQueue) {
203 return BindingBuilder
204 .bind(eventBusQueue)
205 .to(eventBusExchange)
206 .with(UC_EVENTBUS_KEY);
207 }
208
209
210 /**************************************************跨屏数据*************************************************************/
211 /** 删除全部收藏队列(大屏->小屏) */
212 public static final String COLLECTION_DELETE_ALL_QUEUE = "queue.collection.deleteall";
213
214 /** 添加收藏队列(大屏->小屏) */
215 public static final String COLLECTION_ADD_QUEUE = "queue.collection.add";
216
217 /** 删除收藏队列(大屏->小屏) */
218 public static final String COLLECTION_DELETE_QUEUE = "queue.collection.delete";
219
220 /** 微信侧 公众号关注与取消关注 */
221 public static final String WEIXIN_SUBORUNSUB_QUEUE = "weixin.subOrUnSub.queue";
222 }
1 package com.topdraw.config;
2
3 import lombok.Data;
4 import org.apache.commons.collections4.CollectionUtils;
5 import org.springframework.boot.context.properties.ConfigurationProperties;
6 import org.springframework.context.annotation.Configuration;
7
8 import java.util.List;
9 import java.util.Map;
10
11 @Data
12 @Configuration
13 @ConfigurationProperties(prefix = "service.mq")
14 public class RabbitMqCustomConfig {
15
16 private List<Map<String, String>> list;
17
18 /**
19 * viewRecord
20 * @return
21 */
22 public Map<String, String> getViewRecordInfo() {
23
24 if (CollectionUtils.isNotEmpty(list)) {
25
26 for (Map<String, String> map : list) {
27 String type = map.get("source");
28 if (type.equalsIgnoreCase("viewRecord")) {
29 return map;
30 }
31 }
32
33 }
34
35 return null;
36 }
37
38 /**
39 * ucg_event
40 * @return
41 */
42 public Map<String, String> getUcgEventInfo() {
43
44 if (CollectionUtils.isNotEmpty(list)) {
45
46 for (Map<String, String> map : list) {
47 String type = map.get("source");
48 if (type.equalsIgnoreCase("event")) {
49 return map;
50 }
51 }
52
53 }
54
55 return null;
56 }
57
58 /**
59 * ucg_collection
60 * @return
61 */
62 public Map<String, String> getUcgCollectionInfo() {
63
64 if (CollectionUtils.isNotEmpty(list)) {
65
66 for (Map<String, String> map : list) {
67 String type = map.get("source");
68 if (type.equalsIgnoreCase("collection")) {
69 return map;
70 }
71 }
72
73 }
74
75 return null;
76 }
77
78 /**
79 * uce
80 * @return
81 */
82 public Map<String, String> getUceInfo() {
83
84 if (CollectionUtils.isNotEmpty(list)) {
85
86 for (Map<String, String> map : list) {
87 String type = map.get("source");
88 if (type.equalsIgnoreCase("uce")) {
89 return map;
90 }
91 }
92
93 }
94
95 return null;
96 }
97
98 /**
99 * eventBus
100 * @return
101 */
102 public Map<String, String> getEventBusInfo() {
103
104 if (CollectionUtils.isNotEmpty(list)) {
105
106 for (Map<String, String> map : list) {
107 String type = map.get("source");
108 if (type.equalsIgnoreCase("eventBus")) {
109 return map;
110 }
111 }
112
113 }
114
115 return null;
116 }
117
118 /**
119 * wechat
120 * @return
121 */
122 public Map<String, String> getWechatInfo() {
123
124 if (CollectionUtils.isNotEmpty(list)) {
125
126 for (Map<String, String> map : list) {
127 String type = map.get("source");
128 if (type.equalsIgnoreCase("wechat")) {
129 return map;
130 }
131 }
132
133 }
134
135 return null;
136 }
137 }
1 package com.topdraw.config;
2
3 import lombok.Data;
4 import org.apache.commons.collections4.CollectionUtils;
5 import org.springframework.boot.context.properties.ConfigurationProperties;
6 import org.springframework.context.annotation.Configuration;
7
8 import java.util.List;
9 import java.util.Map;
10
11 /**
12 * @author :
13 * @description:
14 * @function :
15 * @date :Created in 2022/5/12 12:37
16 * @version: :
17 * @modified By:
18 * @since : modified in 2022/5/12 12:37
19 */
20 @Data
21 @Configuration
22 @ConfigurationProperties(prefix = "service.mq.error.logs")
23 public class RabbitMqErrorLogConfig {
24
25 private List<Map<String, String>> list;
26
27 /**
28 * uce
29 * @return
30 */
31 public Map<String, String> getWechatError() {
32
33 if (CollectionUtils.isNotEmpty(list)) {
34
35 for (Map<String, String> map : list) {
36 String type = map.get("type");
37 if (type.equalsIgnoreCase("wechat")) {
38 return map;
39 }
40 }
41
42 }
43
44 return null;
45 }
46
47 /**
48 * uce
49 * @return
50 */
51 public Map<String, String> getUceError() {
52
53 if (CollectionUtils.isNotEmpty(list)) {
54
55 for (Map<String, String> map : list) {
56 String type = map.get("type");
57 if (type.equalsIgnoreCase("uce")) {
58 return map;
59 }
60 }
61
62 }
63
64 return null;
65 }
66
67 /**
68 * ucg
69 * @return
70 */
71 public Map<String, String> getUcgError() {
72
73 if (CollectionUtils.isNotEmpty(list)) {
74
75 for (Map<String, String> map : list) {
76 String type = map.get("type");
77 if (type.equalsIgnoreCase("ucg")) {
78 return map;
79 }
80 }
81
82 }
83
84 return null;
85 }
86
87 /**
88 * 数据总线
89 * @return
90 */
91 public Map<String, String> getEventBusError() {
92
93 if (CollectionUtils.isNotEmpty(list)) {
94
95 for (Map<String, String> map : list) {
96 String type = map.get("type");
97 if (type.equalsIgnoreCase("eventBus")) {
98 return map;
99 }
100 }
101
102 }
103
104 return null;
105 }
106
107 /**
108 * 个人信息完善
109 * @return
110 */
111 public Map<String, String> getCompleteInfoError() {
112
113 if (CollectionUtils.isNotEmpty(list)) {
114
115 for (Map<String, String> map : list) {
116 String type = map.get("type");
117 if (type.equalsIgnoreCase("completeInfo")) {
118 return map;
119 }
120 }
121
122 }
123
124 return null;
125 }
126
127 /**
128 * 订购
129 * @return
130 */
131 public Map<String, String> getOrderError() {
132
133 if (CollectionUtils.isNotEmpty(list)) {
134
135 for (Map<String, String> map : list) {
136 String type = map.get("type");
137 if (type.equalsIgnoreCase("order")) {
138 return map;
139 }
140 }
141
142 }
143
144 return null;
145 }
146
147 /**
148 * 参加活动
149 * @return
150 */
151 public Map<String, String> getActivityError() {
152
153 if (CollectionUtils.isNotEmpty(list)) {
154
155 for (Map<String, String> map : list) {
156 String type = map.get("type");
157 if (type.equalsIgnoreCase("activity")) {
158 return map;
159 }
160 }
161
162 }
163
164 return null;
165 }
166
167 /**
168 * 签到
169 * @return
170 */
171 public Map<String, String> getSignError() {
172
173 if (CollectionUtils.isNotEmpty(list)) {
174
175 for (Map<String, String> map : list) {
176 String type = map.get("type");
177 if (type.equalsIgnoreCase("sign")) {
178 return map;
179 }
180 }
181
182 }
183
184 return null;
185 }
186
187 /**
188 * 观影、播放记录
189 * @return
190 */
191 public Map<String, String> getPlayError() {
192
193 if (CollectionUtils.isNotEmpty(list)) {
194
195 for (Map<String, String> map : list) {
196 String type = map.get("type");
197 if (type.equalsIgnoreCase("play")) {
198 return map;
199 }
200 }
201
202 }
203
204 return null;
205 }
206
207
208 }
1 package com.topdraw.config;
2
3 import lombok.Data;
4 import org.apache.commons.collections4.MapUtils;
5 import org.apache.commons.lang3.StringUtils;
6 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
7 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
9 import org.springframework.amqp.rabbit.core.RabbitAdmin;
10 import org.springframework.amqp.rabbit.core.RabbitTemplate;
11 import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
12 import org.springframework.beans.factory.annotation.Qualifier;
13 import org.springframework.beans.factory.annotation.Value;
14 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Configuration;
17 import org.springframework.context.annotation.Primary;
18
19 import javax.naming.ConfigurationException;
20 import java.util.Map;
21
22 @Data
23 @Configuration
24 public class RabbitMqSourceConfig {
25
26 @Value("${mutil-mq.service.host}")
27 private String serviceHost;
28 @Value("${mutil-mq.service.port}")
29 private Integer servicePort;
30 @Value("${mutil-mq.service.username}")
31 private String serviceUserName;
32 @Value("${mutil-mq.service.password}")
33 private String servicePassword;
34 @Value("${mutil-mq.service.virtual-host}")
35 private String serviceVirtualHost;
36
37 public static final String SERVICE_ = "serviceRabbitListenerContainerFactory";
38
39 @Bean(name = "serviceConnectionFactory")
40 @Primary
41 public ConnectionFactory serviceConnectionFactory(){
42 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
43 connectionFactory.setHost(serviceHost);
44 connectionFactory.setPort(servicePort);
45 connectionFactory.setUsername(serviceUserName);
46 connectionFactory.setPassword(servicePassword);
47 connectionFactory.setVirtualHost(serviceVirtualHost);
48 return connectionFactory;
49 }
50
51 @Bean(name = SERVICE_)
52 @Primary
53 public RabbitListenerContainerFactory serviceRabbitListenerContainerFactory(
54 SimpleRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer,
55 @Qualifier("serviceConnectionFactory") ConnectionFactory connectionFactory) {
56 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
57 containerFactoryConfigurer.configure(factory, connectionFactory);
58 return factory;
59 }
60
61 @Bean(name = "serviceRabbitTemplate")
62 public RabbitTemplate serviceRabbitTemplate(ConnectionFactory serviceConnectionFactory){
63 RabbitTemplate rabbitTemplate = new RabbitTemplate(serviceConnectionFactory);
64 return rabbitTemplate;
65 }
66
67 @Bean(name = "serviceRabbitAdmin")
68 public RabbitAdmin serviceRabbitAdmin(
69 @Qualifier("serviceConnectionFactory") ConnectionFactory connectionFactory) {
70 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
71 rabbitAdmin.setAutoStartup(true);
72 return rabbitAdmin;
73 }
74
75
76 public static final String MANAGEMENT_ = "managementRabbitListenerContainerFactory";
77
78 @Value("${mutil-mq.management.host}")
79 private String managementHost;
80 @Value("${mutil-mq.management.port}")
81 private Integer managementPort;
82 @Value("${mutil-mq.management.username}")
83 private String managementUserName;
84 @Value("${mutil-mq.management.password}")
85 private String managementPassword;
86 @Value("${mutil-mq.management.virtual-host}")
87 private String managementVirtualHost;
88
89 @Bean(name = "managementConnectionFactory")
90 public ConnectionFactory managementConnectionFactory(){
91 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
92 connectionFactory.setHost(managementHost);
93 connectionFactory.setPort(managementPort);
94 connectionFactory.setUsername(managementUserName);
95 connectionFactory.setPassword(managementPassword);
96 connectionFactory.setVirtualHost(managementVirtualHost);
97 return connectionFactory;
98 }
99
100 @Bean(name = MANAGEMENT_)
101 public RabbitListenerContainerFactory managementRabbitListenerContainerFactory(
102 SimpleRabbitListenerContainerFactoryConfigurer containerFactoryConfigurer,
103 @Qualifier("managementConnectionFactory") ConnectionFactory connectionFactory) {
104 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
105 containerFactoryConfigurer.configure(factory, connectionFactory);
106 return factory;
107 }
108
109 @Bean(name = "managementRabbitTemplate")
110 public RabbitTemplate managementRabbitTemplate(ConnectionFactory managementConnectionFactory){
111 RabbitTemplate rabbitTemplate = new RabbitTemplate(managementConnectionFactory);
112 return rabbitTemplate;
113 }
114
115 @Bean(name = "managementRabbitAdmin")
116 public RabbitAdmin managementRabbitAdmin(
117 @Qualifier("managementConnectionFactory") ConnectionFactory connectionFactory) {
118 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
119 rabbitAdmin.setAutoStartup(true);
120 return rabbitAdmin;
121 }
122
123
124 /**************************************************数据源*************************************************************/
125
126
127 /*@Value("${service.platform}")
128 private String platform;
129 @Value("${service.active}")
130 private String active;
131
132 public String getActiveSource() throws ConfigurationException {
133
134 if (StringUtils.isBlank(platform))
135 throw new ConfigurationException("【error】=> platform is none !! ");
136
137 if (StringUtils.isBlank(active))
138 throw new ConfigurationException("【error】=> active is none !! ");
139
140 if (active.equalsIgnoreCase("service,management") || active.equalsIgnoreCase("management,service")) {
141 return "service,management";
142 }
143
144 return active;
145 }*/
146
147 private String chargeSource(String active) {
148
149 if (active.equalsIgnoreCase("management")) {
150 return MANAGEMENT_;
151 }
152
153 if (active.equalsIgnoreCase("service")) {
154 return SERVICE_;
155 }
156
157 return null;
158 }
159
160 /**************************************************uc-getaway 2 uc-consumer*************************************************************/
161
162 public static final String EVENT_EXCHANGE = "event.exchange";
163 public static final String EVENT_QUEUE = "event.queue";
164
165 @Value("#{rabbitMqCustomConfig.getUcgEventInfo()}")
166 private Map<String, String> ucgEventInfo;
167
168 public String getUcgEventQueue(){
169 if (MapUtils.isNotEmpty(ucgEventInfo)) {
170 String queue = ucgEventInfo.get("queue");
171 return queue;
172 }
173
174 return EVENT_QUEUE;
175 }
176
177 public String getUcgEventSource(){
178 if (MapUtils.isNotEmpty(ucgEventInfo)) {
179 String source = ucgEventInfo.get("active");
180 if (StringUtils.isNotBlank(source)) {
181 return this.chargeSource(source);
182 } else {
183 return "";
184 }
185 }
186
187 return null;
188 }
189
190 public String getUcgEventStartUp(){
191 if (MapUtils.isNotEmpty(ucgEventInfo)) {
192 String source = ucgEventInfo.get("active");
193 if (StringUtils.isNotBlank(source)) {
194 return "true";
195 }
196 }
197
198 return "false";
199 }
200
201 /* public static final String COLLECTION_DELETE_ALL_QUEUE = "queue.collection.deleteall";
202 public static final String COLLECTION_ADD_QUEUE = "queue.collection.add";
203 public static final String COLLECTION_DELETE_QUEUE = "queue.collection.delete";*/
204
205 public static final String COLLECTION_EXCHANGE = "collection.exchange";
206 public static final String COLLECTION_QUEUE = "collection.queue";
207
208 @Value("#{rabbitMqCustomConfig.getUcgCollectionInfo()}")
209 private Map<String, String> ucgIptvCollectionInfo;
210
211 public String getUcgCollectionQueue(){
212 if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
213 String queue = ucgIptvCollectionInfo.get("queue");
214 return queue;
215 }
216
217 return COLLECTION_QUEUE;
218 }
219
220 public String getUcgCollectionSource(){
221 if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
222 String source = ucgIptvCollectionInfo.get("active");
223 if (StringUtils.isNotBlank(source)) {
224 return this.chargeSource(source);
225 } else {
226 return "";
227 }
228 }
229
230 return null;
231 }
232
233 public String getUcgCollectionStartUp(){
234 if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
235 String source = ucgIptvCollectionInfo.get("active");
236 if (StringUtils.isNotBlank(source)) {
237 return "true";
238 }
239 }
240
241 return "false";
242 }
243
244 public static final String VIEW_RECORD_EXCHANGE = "viewRecord.exchange";
245 public static final String VIEW_RECORD_QUEUE = "viewRecord.queue";
246
247 @Value("#{rabbitMqCustomConfig.getViewRecordInfo()}")
248 private Map<String, String> viewRecordInfo;
249
250 public String getViewRecordQueue(){
251 if (MapUtils.isNotEmpty(viewRecordInfo)) {
252 String queue = viewRecordInfo.get("queue");
253 return queue;
254 }
255
256 return VIEW_RECORD_QUEUE;
257 }
258
259 public String getViewRecordSource(){
260 if (MapUtils.isNotEmpty(viewRecordInfo)) {
261 String source = viewRecordInfo.get("active");
262 if (StringUtils.isNotBlank(source)) {
263 return this.chargeSource(source);
264 } else {
265 return "";
266 }
267 }
268
269 return null;
270 }
271
272 public String getViewRecordStartUp(){
273 if (MapUtils.isNotEmpty(viewRecordInfo)) {
274 String source = viewRecordInfo.get("active");
275 if (StringUtils.isNotBlank(source)) {
276 return "true";
277 }
278 }
279
280 return "false";
281 }
282
283 /**************************************************uc-engine 2 uc-consumer*************************************************************/
284
285 public static final String UCE_EXCHANGE = "uce.exchange";
286 public static final String UCE_QUEUE = "uce.queue";
287
288 @Value("#{rabbitMqCustomConfig.getUceInfo()}")
289 private Map<String, String> uceInfo;
290
291 public String getUceQueue(){
292 if (MapUtils.isNotEmpty(uceInfo)) {
293 String queue = uceInfo.get("queue");
294 return queue;
295 }
296
297 return UCE_QUEUE;
298 }
299
300 public String getUceSource(){
301 if (MapUtils.isNotEmpty(uceInfo)) {
302 String source = uceInfo.get("active");
303 if (StringUtils.isNotBlank(source)) {
304 return this.chargeSource(source);
305 } else {
306 return "";
307 }
308 }
309
310 return null;
311 }
312
313 public String getUceStartUp(){
314 if (MapUtils.isNotEmpty(uceInfo)) {
315 String source = uceInfo.get("active");
316 if (StringUtils.isNotBlank(source)) {
317 return "true";
318 }
319 }
320
321 return "false";
322 }
323
324 /**************************************************eventBus*************************************************************/
325 public static final String UC_EVENTBUS_EXCHANGE = "uc.eventbus";
326 public static final String UC_EVENTBUS_KEY = "uc.eventbus.*.topic";
327 public static final String UC_EVENTBUS_QUEUE = "uc.eventbus";
328
329 @Value("#{rabbitMqCustomConfig.getEventBusInfo()}")
330 private Map<String, String> eventBusInfo;
331
332 public String getEventBusQueue(){
333 if (MapUtils.isNotEmpty(eventBusInfo)) {
334 String queue = eventBusInfo.get("queue");
335 return queue;
336 }
337
338 return UC_EVENTBUS_QUEUE;
339 }
340
341 public String getEventBusSource(){
342 if (MapUtils.isNotEmpty(eventBusInfo)) {
343 String source = eventBusInfo.get("active");
344 if (StringUtils.isNotBlank(source)) {
345 return this.chargeSource(source);
346 } else {
347 return "";
348 }
349 }
350
351 return null;
352 }
353
354 public String getEventBusStartUp(){
355 if (MapUtils.isNotEmpty(eventBusInfo)) {
356 String source = eventBusInfo.get("active");
357 if (StringUtils.isNotBlank(source)) {
358 return "true";
359 }
360 }
361
362 return "false";
363 }
364
365 /**************************************************wechat*************************************************************/
366
367 public static final String WEIXIN_EXCHANGE = "weixin.subOrUnSub.direct";
368 public static final String WEIXIN_SUBORUNSUB_QUEUE = "weixin.subOrUnSub.queue";
369
370 @Value("#{rabbitMqCustomConfig.getWechatInfo()}")
371 private Map<String, String> wechatInfo;
372
373 public String getWechatQueue(){
374 if (MapUtils.isNotEmpty(wechatInfo)) {
375 String queue = wechatInfo.get("queue");
376 return queue;
377 }
378
379 return WEIXIN_SUBORUNSUB_QUEUE;
380 }
381
382 public String getWechatSource(){
383 if (MapUtils.isNotEmpty(wechatInfo)) {
384 String source = wechatInfo.get("active");
385 if (StringUtils.isNotBlank(source)) {
386 return this.chargeSource(source);
387 } else {
388 return "";
389 }
390 }
391
392 return null;
393 }
394
395 public String getWechatStartUp(){
396 if (MapUtils.isNotEmpty(wechatInfo)) {
397 String source = wechatInfo.get("active");
398 if (StringUtils.isNotBlank(source)) {
399 return "true";
400 }
401 }
402
403 return "false";
404 }
405 }
1 package com.topdraw.exception;
2
3 /**
4 * @author :
5 * @description:
6 * @function :
7 * @date :Created in 2022/5/13 15:50
8 * @version: :
9 * @modified By:
10 * @since : modified in 2022/5/13 15:50
11 */
12 public class RabbitMqExceptionHandler {
13
14
15
16 }
1 package com.topdraw.mq.consumer; 1 package com.topdraw.mq.consumer;
2 2
3 import com.topdraw.config.RabbitMqConfig; 3 import com.rabbitmq.client.Channel;
4 import com.topdraw.mq.domain.DataSyncMsg;
5 import com.topdraw.mq.domain.TableOperationMsg; 4 import com.topdraw.mq.domain.TableOperationMsg;
6 import com.topdraw.resttemplate.RestTemplateClient; 5 import com.topdraw.resttemplate.RestTemplateClient;
6 import com.topdraw.util.FileUtil;
7 import com.topdraw.util.JSONUtil; 7 import com.topdraw.util.JSONUtil;
8 import lombok.extern.slf4j.Slf4j; 8 import lombok.extern.slf4j.Slf4j;
9 import org.slf4j.Logger; 9 import org.apache.commons.collections4.MapUtils;
10 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.core.Message;
11 import org.springframework.amqp.core.ExchangeTypes; 11 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
12 import org.springframework.amqp.rabbit.annotation.*; 12 import org.springframework.amqp.rabbit.annotation.RabbitListener;
13 import org.springframework.beans.factory.annotation.Autowired; 13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.beans.factory.annotation.Value;
14 import org.springframework.stereotype.Component; 15 import org.springframework.stereotype.Component;
15 import org.springframework.util.Assert; 16 import org.springframework.util.Assert;
16 17
18 import java.io.IOException;
19 import java.time.LocalDate;
20 import java.util.Map;
21
17 @Component 22 @Component
18 @Slf4j 23 @Slf4j
19 public class UcEngineIptv2ManagementConsumer { 24 public class UcEngineIptv2ManagementConsumer {
...@@ -24,6 +29,9 @@ public class UcEngineIptv2ManagementConsumer { ...@@ -24,6 +29,9 @@ public class UcEngineIptv2ManagementConsumer {
24 @Autowired 29 @Autowired
25 RestTemplateClient restTemplateClient; 30 RestTemplateClient restTemplateClient;
26 31
32 @Value("#{rabbitMqErrorLogConfig.getUceError()}")
33 private Map<String, String> error;
34
27 /** 35 /**
28 * 事件 36 * 事件
29 * @param content 37 * @param content
...@@ -32,14 +40,36 @@ public class UcEngineIptv2ManagementConsumer { ...@@ -32,14 +40,36 @@ public class UcEngineIptv2ManagementConsumer {
32 * @date 2021/9/7 11:26 上午 40 * @date 2021/9/7 11:26 上午
33 */ 41 */
34 /*@RabbitHandler 42 /*@RabbitHandler
35 @RabbitListener(bindings = { 43 @RabbitListener(queues = "#{rabbitMqConfig.getUceQueue()}",
36 @QueueBinding(value = @Queue(value = RabbitMqConfig.ENGINE_TO_MANAGEMENT_DIRECT), 44 containerFactory = "serviceRabbitListenerContainerFactory", ackMode = "MANUAL")*/
37 exchange = @Exchange(value = ExchangeTypes.DIRECT)) 45 public void ucEventConsumer(Channel channel, Message message, String content) throws IOException {
38 }, containerFactory = "serviceRabbitListenerContainerFactory")*/
39 public void ucEventConsumer(String content) {
40 log.info(" receive dataSync msg , content is : {} ", content); 46 log.info(" receive dataSync msg , content is : {} ", content);
41 TableOperationMsg tableOperationMsg = this.parseContent(content); 47 try {
42 autoUser.route(tableOperationMsg); 48
49 TableOperationMsg tableOperationMsg = this.parseContent(content);
50
51 autoUser.route(tableOperationMsg);
52
53 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
54
55 } catch (Exception e) {
56
57 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
58
59 if (MapUtils.isNotEmpty(error)) {
60 String errorStart = this.error.get("start");
61
62 if (errorStart.equalsIgnoreCase("true")) {
63 String fileName = this.error.get("fileName")+"_"+ LocalDate.now() +".log";
64 String filePath = this.error.get("filePath");
65 String filePath1 = filePath+fileName;
66 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
67 }
68
69 }
70
71 e.printStackTrace();
72 }
43 log.info("ucEventConsumer ====>>>> end"); 73 log.info("ucEventConsumer ====>>>> end");
44 } 74 }
45 75
......
1 package com.topdraw.mq.consumer; 1 package com.topdraw.mq.consumer;
2 2
3 import com.topdraw.config.RabbitMqConfig; 3 import com.rabbitmq.client.Channel;
4 import com.topdraw.mq.domain.TableOperationMsg; 4 import com.topdraw.mq.domain.TableOperationMsg;
5 import com.topdraw.resttemplate.RestTemplateClient; 5 import com.topdraw.resttemplate.RestTemplateClient;
6 import com.topdraw.util.FileUtil;
6 import com.topdraw.util.JSONUtil; 7 import com.topdraw.util.JSONUtil;
7 import lombok.extern.slf4j.Slf4j; 8 import lombok.extern.slf4j.Slf4j;
8 import org.springframework.amqp.core.ExchangeTypes; 9 import org.apache.commons.collections4.MapUtils;
10 import org.springframework.amqp.core.Message;
9 import org.springframework.amqp.rabbit.annotation.*; 11 import org.springframework.amqp.rabbit.annotation.*;
10 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.beans.factory.annotation.Value;
11 import org.springframework.stereotype.Component; 14 import org.springframework.stereotype.Component;
12 import org.springframework.util.Assert; 15 import org.springframework.util.Assert;
13 16
17 import java.io.IOException;
18 import java.time.LocalDate;
19 import java.util.Map;
20
14 @Component 21 @Component
15 @Slf4j 22 @Slf4j
16 public class UcEngineManagement2IptvConsumer { 23 public class UcEngineManagement2IptvConsumer {
...@@ -21,6 +28,9 @@ public class UcEngineManagement2IptvConsumer { ...@@ -21,6 +28,9 @@ public class UcEngineManagement2IptvConsumer {
21 @Autowired 28 @Autowired
22 RestTemplateClient restTemplateClient; 29 RestTemplateClient restTemplateClient;
23 30
31 @Value("#{rabbitMqErrorLogConfig.getUceError()}")
32 private Map<String, String> error;
33
24 /** 34 /**
25 * 事件 35 * 事件
26 * @param content 36 * @param content
...@@ -29,14 +39,38 @@ public class UcEngineManagement2IptvConsumer { ...@@ -29,14 +39,38 @@ public class UcEngineManagement2IptvConsumer {
29 * @date 2021/9/7 11:26 上午 39 * @date 2021/9/7 11:26 上午
30 */ 40 */
31 @RabbitHandler 41 @RabbitHandler
32 @RabbitListener(bindings = { 42 @RabbitListener(queues = "#{rabbitMqSourceConfig.getUceQueue()}",
33 @QueueBinding(value = @Queue(value = RabbitMqConfig.ENGINE_TO_SERVICE_DIRECT), 43 containerFactory = "#{rabbitMqSourceConfig.getUceSource()}",
34 exchange = @Exchange(value = ExchangeTypes.DIRECT)) 44 autoStartup = "#{rabbitMqSourceConfig.getUceStartUp()}",
35 }, containerFactory = "managementRabbitListenerContainerFactory") 45 ackMode = "MANUAL")
36 public void ucEventConsumer(String content) { 46 public void ucEventConsumer(Channel channel, Message message, String content) throws IOException {
37 log.info(" receive dataSync msg , content is : {} ", content); 47 log.info(" receive dataSync msg , content is : {} ", content);
38 TableOperationMsg tableOperationMsg = this.parseContent(content); 48
39 autoUser.route(tableOperationMsg); 49 try {
50 TableOperationMsg tableOperationMsg = this.parseContent(content);
51
52 autoUser.route(tableOperationMsg);
53
54 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
55
56 } catch (Exception e) {
57
58 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
59
60 if (MapUtils.isNotEmpty(error)) {
61 String errorStart = this.error.get("start");
62
63 if (errorStart.equalsIgnoreCase("true")) {
64 String fileName = this.error.get("fileName")+"_"+ LocalDate.now() +".log";
65 String filePath = this.error.get("filePath");
66 String filePath1 = filePath+fileName;
67 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
68 }
69
70 }
71
72 e.printStackTrace();
73 }
40 log.info("ucEventConsumer ====>>>> end"); 74 log.info("ucEventConsumer ====>>>> end");
41 } 75 }
42 76
......
...@@ -2,33 +2,38 @@ package com.topdraw.mq.consumer; ...@@ -2,33 +2,38 @@ package com.topdraw.mq.consumer;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
5 import com.rabbitmq.client.Channel;
5 import com.topdraw.business.module.member.service.MemberService; 6 import com.topdraw.business.module.member.service.MemberService;
6 import com.topdraw.business.module.member.service.dto.MemberDTO; 7 import com.topdraw.business.module.member.service.dto.MemberDTO;
8 import com.topdraw.business.module.task.attribute.service.TaskAttrService;
9 import com.topdraw.business.module.task.attribute.service.dto.TaskAttrDTO;
10 import com.topdraw.business.module.task.domain.Task;
11 import com.topdraw.business.module.task.service.TaskService;
12 import com.topdraw.business.module.task.template.service.TaskTemplateService;
13 import com.topdraw.business.module.task.template.service.dto.TaskTemplateDTO;
7 import com.topdraw.business.module.user.iptv.service.UserTvService; 14 import com.topdraw.business.module.user.iptv.service.UserTvService;
8 import com.topdraw.business.module.user.iptv.service.dto.UserTvDTO; 15 import com.topdraw.business.module.user.iptv.service.dto.UserTvDTO;
9 import com.topdraw.config.RabbitMqConfig;
10 import com.topdraw.exception.BadRequestException;
11 import com.topdraw.exception.EntityNotFoundException; 16 import com.topdraw.exception.EntityNotFoundException;
12 import com.topdraw.mq.domain.DataSyncMsg; 17 import com.topdraw.mq.domain.DataSyncMsg;
13 import com.topdraw.resttemplate.RestTemplateClient; 18 import com.topdraw.resttemplate.RestTemplateClient;
14 import com.topdraw.util.DateUtil; 19 import com.topdraw.util.DateUtil;
20 import com.topdraw.util.FileUtil;
15 import com.topdraw.util.JSONUtil; 21 import com.topdraw.util.JSONUtil;
16 import com.topdraw.util.TimestampUtil;
17 import com.topdraw.utils.RedisUtils; 22 import com.topdraw.utils.RedisUtils;
18 import lombok.Data; 23 import lombok.Data;
19 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
25 import org.apache.commons.collections4.CollectionUtils;
20 import org.apache.commons.collections4.MapUtils; 26 import org.apache.commons.collections4.MapUtils;
21 import org.apache.commons.lang3.StringUtils; 27 import org.apache.commons.lang3.StringUtils;
22 import org.apache.commons.lang3.time.DateFormatUtils; 28 import org.springframework.amqp.core.Message;
23 import org.apache.commons.lang3.time.DateUtils;
24 import org.springframework.amqp.rabbit.annotation.*; 29 import org.springframework.amqp.rabbit.annotation.*;
25 import org.springframework.beans.factory.annotation.Autowired; 30 import org.springframework.beans.factory.annotation.Autowired;
31 import org.springframework.beans.factory.annotation.Value;
26 import org.springframework.stereotype.Component; 32 import org.springframework.stereotype.Component;
27 33
28 import java.text.DateFormat; 34 import java.io.IOException;
29 import java.text.ParseException; 35 import java.text.ParseException;
30 import java.text.SimpleDateFormat; 36 import java.time.LocalDate;
31 import java.time.LocalDateTime;
32 import java.util.*; 37 import java.util.*;
33 38
34 @Component 39 @Component
...@@ -36,17 +41,23 @@ import java.util.*; ...@@ -36,17 +41,23 @@ import java.util.*;
36 public class UcEventBusIptv2ManagementUcEngine { 41 public class UcEventBusIptv2ManagementUcEngine {
37 42
38 @Autowired 43 @Autowired
39 private RestTemplateClient restTemplateClient; 44 private TaskService taskService;
40 @Autowired 45 @Autowired
41 private UserTvService userTvService; 46 private UserTvService userTvService;
42 @Autowired 47 @Autowired
43 private MemberService memberService; 48 private MemberService memberService;
44 49 @Autowired
50 private TaskAttrService taskAttrService;
51 @Autowired
52 private TaskTemplateService taskTemplateService;
53 @Autowired
54 private RestTemplateClient restTemplateClient;
45 @Autowired 55 @Autowired
46 private RedisUtils redisUtils; 56 private RedisUtils redisUtils;
47 57
48 58
49 private static final Integer PLAY_30 = 30; 59 @Value("#{rabbitMqErrorLogConfig.getEventBusError()}")
60 private Map<String, String> error;
50 61
51 /** 62 /**
52 * 事件 63 * 事件
...@@ -56,13 +67,35 @@ public class UcEventBusIptv2ManagementUcEngine { ...@@ -56,13 +67,35 @@ public class UcEventBusIptv2ManagementUcEngine {
56 * @date 2021/9/7 11:26 上午 67 * @date 2021/9/7 11:26 上午
57 */ 68 */
58 @RabbitHandler 69 @RabbitHandler
59 @RabbitListener(queues = RabbitMqConfig.UC_EVENTBUS_QUEUE, 70 @RabbitListener(queues = "#{rabbitMqSourceConfig.getEventBusQueue()}",
60 containerFactory = "serviceRabbitListenerContainerFactory") 71 containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
61 public void ucEventConsumer(String content) throws ParseException { 72 autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
73 ackMode = "MANUAL")
74 public void eventBusConsumer(Channel channel, Message message, String content) throws ParseException, IOException {
62 log.info(" receive dataSync msg , content is : {} ", content); 75 log.info(" receive dataSync msg , content is : {} ", content);
63 DataSyncMsg dataSyncMsg = this.parseContent(content); 76 try {
64 if (Objects.nonNull(dataSyncMsg)) { 77
65 this.taskDeal(dataSyncMsg); 78 this.parseContent(content);
79
80 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
81
82 } catch (Exception e) {
83
84 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
85
86 if (MapUtils.isNotEmpty(error)) {
87 String errorStart = this.error.get("start");
88
89 if (errorStart.equalsIgnoreCase("true")) {
90 String fileName = this.error.get("fileName")+"_"+LocalDate.now() +".log";
91 String filePath = this.error.get("filePath");
92 String filePath1 = filePath+fileName;
93 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
94 }
95
96 }
97
98 e.printStackTrace();
66 } 99 }
67 log.info("ucEventConsumer ====>>>> end"); 100 log.info("ucEventConsumer ====>>>> end");
68 } 101 }
...@@ -73,7 +106,7 @@ public class UcEventBusIptv2ManagementUcEngine { ...@@ -73,7 +106,7 @@ public class UcEventBusIptv2ManagementUcEngine {
73 * @param content 106 * @param content
74 * @return 107 * @return
75 */ 108 */
76 private DataSyncMsg parseContent(String content) throws ParseException { 109 private void parseContent(String content) throws ParseException {
77 110
78 CommonMsg commonMsg = JSONUtil.parseMsg2Object(content, CommonMsg.class); 111 CommonMsg commonMsg = JSONUtil.parseMsg2Object(content, CommonMsg.class);
79 112
...@@ -98,120 +131,147 @@ public class UcEventBusIptv2ManagementUcEngine { ...@@ -98,120 +131,147 @@ public class UcEventBusIptv2ManagementUcEngine {
98 131
99 DataSyncMsg dataSyncMsg = new DataSyncMsg(); 132 DataSyncMsg dataSyncMsg = new DataSyncMsg();
100 dataSyncMsg.setEventType(evt); 133 dataSyncMsg.setEventType(evt);
101 DataSyncMsg.MsgData msgData1 = new DataSyncMsg.MsgData(); 134 DataSyncMsg.MsgData msg = new DataSyncMsg.MsgData();
102
103 135
104 Integer playDurationValueTotal = 0; 136 Integer playDurationValueTotal = 0;
105
106 if (StringUtils.isNotBlank(platformAccount)) { 137 if (StringUtils.isNotBlank(platformAccount)) {
107 138
108 UserTvDTO userTvDTO = this.userTvService.findByPlatformAccount(platformAccount); 139 UserTvDTO userTvDTO = this.userTvService.findByPlatformAccount(platformAccount);
109 140
110 if(Objects.nonNull(userTvDTO)) { 141 if(Objects.nonNull(userTvDTO)) {
111 142 // 用大屏账号+日期做为key,并判断这个key是否存在 ,数据类型为hash eg:<total,1>,<1,playDuration>,<2,playDuration>
112 String key = platformAccount+"|"+formatDate; 143 String key = platformAccount+"|"+formatDate;
113 Map<Object, Object> hmget = 144 Map<Object, Object> hmget =
114 this.redisUtils.hmget(key); 145 this.redisUtils.hmget(key);
115 146
116 int maxSize = 1; 147 if (MapUtils.isEmpty(hmget)) {
117 if (MapUtils.isNotEmpty(hmget)) { 148
149 // 初始化播放总时长<total>和第一个播放时间
150 playDurationValueTotal = playDuration;
151 Map<String, Object> map = new HashMap<>();
152 map.put("total", playDurationValueTotal);
153 map.put("1", playDuration);
154 this.redisUtils.hmset(key, map, 129600);
118 155
119 Set<Object> objects = hmget.keySet(); 156 } else {
120 157
121 Integer playDurationValueTotal_ = 0; 158 // 计算播放总时长 total = 播放总时长+当前播放时长
122 for (Object key_ : objects) { 159 Integer total = this.getRedisTotal(hmget);
160 playDurationValueTotal = total + playDuration;
123 161
124 if (key_.toString().equalsIgnoreCase("total")) { 162 }
125 playDurationValueTotal_ = Integer.valueOf(hmget.get(key_).toString());
126 if (playDurationValueTotal_ > 1440) {
127 maxSize = objects.size();
128 Integer maxTotal = maxSize+1;
129 Integer playDurationValue = Integer.valueOf(hmget.get(key_).toString());
130 playDurationValueTotal = playDurationValue+playDuration;
131 Map<String, Object> map = new HashMap<>();
132 map.put(String.valueOf(maxTotal), playDuration);
133 map.put("total", playDurationValueTotal);
134 this.redisUtils.hmset(key, map, 172800);
135 return null;
136 }
137 }
138 163
139 } 164 Integer totalKey = this.getRedisTotalKey(hmget);
165 Integer maxSize = totalKey + 1;
140 166
141 maxSize = objects.size(); 167 // DataSyncMsg dataSyncMsg1 =
168 this.checkTask(playDurationValueTotal, time, deviceType,
169 mediaCode, mediaId, mediaName, dataSyncMsg, msg, userTvDTO);
142 170
143 playDurationValueTotal = playDurationValueTotal_ + playDuration; 171 // if (Objects.nonNull(dataSyncMsg1)) {
144 172
145 Map<String, Object> map = new HashMap<>(); 173 Map<String, Object> map = new HashMap<>();
146 map.put(String.valueOf(maxSize+1), playDuration); 174 map.put(String.valueOf(maxSize), playDuration);
147 map.put("total", playDurationValueTotal); 175 map.put("total", playDurationValueTotal);
148 this.redisUtils.hmset(key, map, 172800); 176 this.redisUtils.hmset(key, map);
149 177
178 // }
179 }
150 180
151 } else { 181 }
152 182
153 playDurationValueTotal = playDuration; 183 }
154 Map<String, Object> map = new HashMap<>();
155 map.put("total", playDurationValueTotal);
156 map.put("1", playDuration);
157 this.redisUtils.hmset(key, map, 172800);
158 184
159 } 185 break;
160 186
161 DataSyncMsg dataSyncMsg1 = null; 187 }
162 if (playDurationValueTotal > 0 && playDurationValueTotal <= PLAY_30) {
163 log.info("playDurationValueTotal ===>>> {}",playDurationValueTotal);
164 log.info("===>> start dealTask");
165 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, playDurationValueTotal,
166 dataSyncMsg, msgData1, userTvDTO);
167 }
168 188
169 if (playDurationValueTotal >= 31 && playDurationValueTotal <= 60) { 189 //return null;
170 log.info("playDurationValueTotal ===>>> {}",playDurationValueTotal); 190 }
171 log.info("===>> start dealTask");
172 191
173 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, 30, 192 private DataSyncMsg checkTask(Integer playDurationValueTotal, String time, Integer deviceType, String mediaCode,
174 dataSyncMsg, msgData1, userTvDTO); 193 Long mediaId, String mediaName, DataSyncMsg dataSyncMsg,
175 this.taskDeal(dataSyncMsg1); 194 DataSyncMsg.MsgData msgData, UserTvDTO userTvDTO) {
195
196 List<TaskAttrDTO> taskAttrDTOList = new ArrayList<>();
197 TaskTemplateDTO taskTemplateDTO = this.taskTemplateService.findByType(8);
198 if (Objects.nonNull(taskTemplateDTO.getId())) {
199 List<Task> taskList = this.taskService.findByTemplateId(taskTemplateDTO.getId());
200 if (CollectionUtils.isNotEmpty(taskList)) {
201 for (Task task : taskList) {
202 TaskAttrDTO taskAttrDTO = this.taskAttrService.findByTaskId(task.getId());
203 taskAttrDTOList.add(taskAttrDTO);
204 }
205 }
206 }
176 207
177 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, playDurationValueTotal, 208 List<List<Integer>> attrList = new ArrayList<>();
178 dataSyncMsg, msgData1, userTvDTO); 209 if (CollectionUtils.isNotEmpty(taskAttrDTOList)) {
179 210
180 } 211 for (TaskAttrDTO taskAttrDTO : taskAttrDTOList) {
181 if (playDurationValueTotal >= 61 && playDurationValueTotal <= 1440) {
182 log.info("playDurationValueTotal ===>>> {}",playDurationValueTotal);
183 log.info("===>> start dealTask");
184 212
185 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, 30, 213 String attrStr = taskAttrDTO.getAttrStr();
186 dataSyncMsg, msgData1, userTvDTO); 214 if (StringUtils.isNotBlank(attrStr)) {
187 this.taskDeal(dataSyncMsg1);
188 215
189 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, 60, 216 JSONObject parse = JSONObject.parseObject(attrStr, JSONObject.class);
190 dataSyncMsg, msgData1, userTvDTO); 217 List<Integer> value = (List<Integer>) parse.get("value");
191 this.taskDeal(dataSyncMsg1); 218 attrList.add(value);
219 }
192 220
193 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, playDurationValueTotal, 221 }
194 dataSyncMsg, msgData1, userTvDTO);
195 }
196 222
197 log.info("playDurationValueTotal ===>>> {}",playDurationValueTotal); 223 }
198 log.info("===>> start dealTask");
199 /*dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, playDurationValueTotal,
200 dataSyncMsg, msgData1, userTvDTO);*/
201 return dataSyncMsg1;
202 224
203 } 225 int size = attrList.size();
204 226
205 } 227 DataSyncMsg dataSyncMsg1 = null;
228
229 if (size > 0) {
230
231 for (int i = size-1; i >= 0; i--) {
232
233 Integer integer = attrList.get(i).get(0);
206 234
235 if (playDurationValueTotal >= integer) {
236 dataSyncMsg1 = getDataSyncMsg(time, deviceType, mediaCode, mediaId, mediaName, integer, dataSyncMsg,
237 msgData, userTvDTO);
238
239 this.taskDeal(dataSyncMsg1);
207 } 240 }
208 241
209 System.out.println(playContent); 242 }
210 break; 243
244 }
245
246 return dataSyncMsg1;
247 }
248
249 private Integer getRedisTotalKey(Map<Object, Object> hmget) {
250 Set<Object> objects = hmget.keySet();
251 return objects.size();
252 }
253
254 private Integer getRedisTotal(Map<Object, Object> hmget) {
255 Set<Object> objects = hmget.keySet();
256
257 Integer playDurationValueTotal_ = 0;
258
259 for (Object key_ : objects) {
260
261 if (key_.toString().equalsIgnoreCase("total")) {
262 playDurationValueTotal_ = Integer.valueOf(hmget.get(key_).toString());
263 return playDurationValueTotal_;
264
265 } else {
266
267 continue;
268
269 }
211 270
212 } 271 }
213 272
214 return null; 273 return playDurationValueTotal_;
274
215 } 275 }
216 276
217 private DataSyncMsg getDataSyncMsg(String time, Integer deviceType, String mediaCode, Long mediaId, String mediaName, 277 private DataSyncMsg getDataSyncMsg(String time, Integer deviceType, String mediaCode, Long mediaId, String mediaName,
......
1 package com.topdraw.mq.consumer; 1 package com.topdraw.mq.consumer;
2 2
3 import com.topdraw.config.RabbitMqConfig; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject;
5 import com.rabbitmq.client.Channel;
4 import com.topdraw.mq.domain.DataSyncMsg; 6 import com.topdraw.mq.domain.DataSyncMsg;
5 import com.topdraw.resttemplate.RestTemplateClient; 7 import com.topdraw.resttemplate.RestTemplateClient;
8 import com.topdraw.util.FileUtil;
6 import com.topdraw.util.JSONUtil; 9 import com.topdraw.util.JSONUtil;
7 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
8 import org.springframework.amqp.core.ExchangeTypes; 11 import org.apache.commons.collections4.MapUtils;
12 import org.apache.commons.lang3.StringUtils;
13 import org.springframework.amqp.core.Message;
9 import org.springframework.amqp.rabbit.annotation.*; 14 import org.springframework.amqp.rabbit.annotation.*;
10 import org.springframework.beans.factory.annotation.Autowired; 15 import org.springframework.beans.factory.annotation.Autowired;
16 import org.springframework.beans.factory.annotation.Value;
11 import org.springframework.stereotype.Component; 17 import org.springframework.stereotype.Component;
12 import org.springframework.util.Assert; 18 import org.springframework.util.Assert;
13 19
20 import java.io.IOException;
21 import java.time.LocalDate;
22 import java.util.Map;
23 import java.util.Objects;
24
14 @Component 25 @Component
15 @Slf4j 26 @Slf4j
16 public class UcGatewayIptv2IptvConsumer { 27 public class UcGatewayIptv2IptvConsumer {
...@@ -21,6 +32,9 @@ public class UcGatewayIptv2IptvConsumer { ...@@ -21,6 +32,9 @@ public class UcGatewayIptv2IptvConsumer {
21 @Autowired 32 @Autowired
22 AutoRoute autoUser; 33 AutoRoute autoUser;
23 34
35 @Value("#{rabbitMqErrorLogConfig.getUcgError()}")
36 private Map<String, String> error;
37
24 /** 38 /**
25 * 事件 39 * 事件
26 * @param content 40 * @param content
...@@ -29,14 +43,37 @@ public class UcGatewayIptv2IptvConsumer { ...@@ -29,14 +43,37 @@ public class UcGatewayIptv2IptvConsumer {
29 * @date 2021/9/7 11:26 上午 43 * @date 2021/9/7 11:26 上午
30 */ 44 */
31 @RabbitHandler 45 @RabbitHandler
32 @RabbitListener(bindings = { 46 @RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgEventQueue()}",
33 @QueueBinding(value = @Queue(value = RabbitMqConfig.GATEWAY_TO_SERVICE_DIRECT), 47 containerFactory = "#{rabbitMqSourceConfig.getUcgEventSource()}",
34 exchange = @Exchange(value = ExchangeTypes.DIRECT)) 48 autoStartup = "#{rabbitMqSourceConfig.getUcgEventStartUp()}",
35 }, containerFactory = "serviceRabbitListenerContainerFactory") 49 ackMode = "MANUAL")
36 public void ucEventConsumer(String content) { 50 public void eventConsumer(Channel channel, Message message, String content) throws IOException {
37 log.info(" receive dataSync msg , content is : {} ", content); 51 log.info(" receive dataSync msg , content is : {} ", content);
38 DataSyncMsg dataSyncMsg = this.parseContent(content); 52 try {
39 this.taskDeal(dataSyncMsg); 53 DataSyncMsg dataSyncMsg = this.parseContent(content);
54
55 this.taskDeal(dataSyncMsg);
56
57 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
58
59 } catch (Exception e) {
60
61 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
62
63 if (MapUtils.isNotEmpty(error)) {
64 String errorStart = this.error.get("start");
65
66 if (StringUtils.isEmpty(errorStart) || errorStart.equalsIgnoreCase("true")) {
67 String fileName = this.error.get("fileName")+"_"+ LocalDate.now() +".log";
68 String filePath = this.error.get("filePath");
69 String filePath1 = filePath+fileName;
70 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
71 }
72
73 }
74
75 e.printStackTrace();
76 }
40 log.info("ucEventConsumer ====>>>> end"); 77 log.info("ucEventConsumer ====>>>> end");
41 } 78 }
42 79
...@@ -60,4 +97,115 @@ public class UcGatewayIptv2IptvConsumer { ...@@ -60,4 +97,115 @@ public class UcGatewayIptv2IptvConsumer {
60 private void taskDeal(DataSyncMsg dataSyncMsg) { 97 private void taskDeal(DataSyncMsg dataSyncMsg) {
61 this.restTemplateClient.dealTask(dataSyncMsg); 98 this.restTemplateClient.dealTask(dataSyncMsg);
62 } 99 }
100
101
102 /**
103 * @description 添加收藏记录
104 * @param content 消息内容
105 */
106 @RabbitHandler
107 @RabbitListener(queues = "#{rabbitMqSourceConfig.getUcgCollectionQueue()}",
108 containerFactory = "#{rabbitMqSourceConfig.getUcgCollectionSource()}",
109 autoStartup = "#{rabbitMqSourceConfig.getUcgCollectionStartUp()}",
110 ackMode = "MANUAL")
111 public void collectionConsumer(Channel channel, Message message, String content) throws IOException {
112 log.info("receive UserCollection add message, content {}", content);
113
114 try {
115
116 JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
117 if (Objects.nonNull(content)) {
118 String evt = jsonObject.get("evt").toString();
119 String msgData = jsonObject.get("msgData").toString();
120 switch (evt.toUpperCase()) {
121 // 添加收藏
122 case "ADDCOLLECTION":
123 this.restTemplateClient.addCollection(msgData);
124 break;
125 // 删除收藏
126 case "DELETECOLLECTION":
127 this.restTemplateClient.deleteCollection(msgData);
128 break;
129 // 删除全部收藏
130 case "DELETEALLCOLLECTION":
131 this.restTemplateClient.deleteAllCollection(msgData);
132 break;
133 default:
134 break;
135
136 }
137 }
138
139 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
140
141 } catch (Exception e) {
142
143 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
144
145 if (MapUtils.isNotEmpty(error)) {
146 String errorStart = this.error.get("start");
147
148 if (errorStart.equalsIgnoreCase("true")) {
149 String fileName = this.error.get("fileName")+"_"+ LocalDate.now() +".log";
150 String filePath = this.error.get("filePath");
151 String filePath1 = filePath+fileName;
152 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
153 }
154
155 }
156
157 e.printStackTrace();
158 }
159 }
160
161 /**
162 * @description 处理观影记录
163 * @param content 消息内容
164 */
165 @RabbitHandler
166 @RabbitListener(queues = "#{rabbitMqSourceConfig.getViewRecordQueue()}",
167 containerFactory = "#{rabbitMqSourceConfig.getViewRecordSource()}",
168 autoStartup = "#{rabbitMqSourceConfig.getViewRecordStartUp()}",
169 ackMode = "MANUAL")
170 public void viewRecordConsumer(Channel channel, Message message, String content) throws IOException {
171 log.info("receive ViewRecord add message, content {}", content);
172
173 try {
174
175 JSONObject jsonObject = JSON.parseObject(content, JSONObject.class);
176 if (Objects.nonNull(content)) {
177 String evt = jsonObject.get("evt").toString();
178 String msgData = jsonObject.get("msgData").toString();
179 switch (evt.toUpperCase()) {
180 // 添加收藏
181 case "VIEWRECORD":
182 this.restTemplateClient.dealViewRecord(msgData);
183 break;
184 default:
185 break;
186
187 }
188 }
189
190 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
191
192 } catch (Exception e) {
193
194 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
195
196 if (MapUtils.isNotEmpty(error)) {
197 String errorStart = this.error.get("start");
198
199 if (errorStart.equalsIgnoreCase("true")) {
200 String fileName = this.error.get("fileName")+"_"+ LocalDate.now() +".log";
201 String filePath = this.error.get("filePath");
202 String filePath1 = filePath+fileName;
203 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
204 }
205
206 }
207
208 e.printStackTrace();
209 }
210 }
63 } 211 }
......
...@@ -2,20 +2,23 @@ package com.topdraw.mq.consumer; ...@@ -2,20 +2,23 @@ package com.topdraw.mq.consumer;
2 2
3 3
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
5 import com.topdraw.config.RabbitMqConfig; 5 import com.rabbitmq.client.Channel;
6 import com.topdraw.mq.domain.SubscribeBean; 6 import com.topdraw.mq.domain.SubscribeBean;
7 import com.topdraw.resttemplate.RestTemplateClient; 7 import com.topdraw.resttemplate.RestTemplateClient;
8 import com.topdraw.utils.RedisUtils; 8 import com.topdraw.util.FileUtil;
9 import com.topdraw.utils.StringUtils;
10 import lombok.extern.slf4j.Slf4j; 9 import lombok.extern.slf4j.Slf4j;
11 import org.springframework.amqp.core.ExchangeTypes; 10 import org.apache.commons.collections4.MapUtils;
11 import org.springframework.amqp.core.Message;
12 import org.springframework.amqp.rabbit.annotation.*; 12 import org.springframework.amqp.rabbit.annotation.*;
13 import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
14 import org.springframework.beans.factory.annotation.Autowired; 13 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.beans.factory.annotation.Value; 14 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.stereotype.Component; 15 import org.springframework.stereotype.Component;
17 import org.springframework.transaction.annotation.Transactional; 16 import org.springframework.transaction.annotation.Transactional;
18 17
18 import java.io.IOException;
19 import java.time.LocalDate;
20 import java.util.Map;
21
19 /** 22 /**
20 * 微信事件 23 * 微信事件
21 */ 24 */
...@@ -28,42 +31,9 @@ public class WeiXinEventConsumer { ...@@ -28,42 +31,9 @@ public class WeiXinEventConsumer {
28 31
29 private static final String QR_CODE_URL = "QR_CODE_URL_"; 32 private static final String QR_CODE_URL = "QR_CODE_URL_";
30 33
31 /** 34 @Value("#{rabbitMqErrorLogConfig.getWechatError()}")
32 * @description 删除用户收藏记录 35 private Map<String, String> error;
33 * @param content 消息内容
34 */
35 /*@RabbitHandler
36 @RabbitListener(bindings = {
37 @QueueBinding(value = @Queue(value = RabbitMqConfig.COLLECTION_DELETE_QUEUE),
38 exchange = @Exchange(value = ExchangeTypes.DIRECT))},
39 containerFactory = "managementRabbitListenerContainerFactory")*/
40 public void deleteCollection(String content) {
41 try {
42 log.info("receive UserCollection delete message, content {}", content);
43 this.restTemplateClient.deleteCollection(content);
44 } catch (Exception e) {
45 log.error("CollectionDeleteConsumer || UserCollection delete error || {}", e.toString(), e);
46 }
47 }
48 36
49 /**
50 * @description 删除全部收藏记录
51 * @param content 消息内容
52 */
53 /*@RabbitHandler
54 @RabbitListener(bindings = {
55 @QueueBinding(value = @Queue(value = RabbitMqConfig.COLLECTION_DELETE_ALL_QUEUE),
56 exchange = @Exchange(value = ExchangeTypes.DIRECT))},
57 containerFactory = "managementRabbitListenerContainerFactory")*/
58 @Transactional
59 public void deleteAllCollection(String content) {
60 try {
61 log.info("receive UserCollection delete all message, content {}", content);
62 this.restTemplateClient.deleteAllCollection(content);
63 } catch (Exception e) {
64 log.error("CollectionDeleteConsumer || UserCollection delete all error || {}", e.toString(), e);
65 }
66 }
67 37
68 /** 38 /**
69 * 关注和取关事件 39 * 关注和取关事件
...@@ -74,13 +44,12 @@ public class WeiXinEventConsumer { ...@@ -74,13 +44,12 @@ public class WeiXinEventConsumer {
74 * } 44 * }
75 * @param content 45 * @param content
76 */ 46 */
77 /* @RabbitHandler 47 @RabbitHandler
78 @RabbitListener(bindings = { 48 @RabbitListener(queues = "#{rabbitMqSourceConfig.getWechatQueue()}",
79 @QueueBinding(value = @Queue(value = RabbitMqConfig.WEIXIN_SUBORUNSUB_QUEUE), 49 containerFactory = "#{rabbitMqSourceConfig.getWechatSource()}",
80 exchange = @Exchange(value = ExchangeTypes.DIRECT))}, 50 autoStartup = "#{rabbitMqSourceConfig.getWechatStartUp()}", ackMode = "MANUAL")
81 containerFactory = "managementRabbitListenerContainerFactory")*/
82 @Transactional 51 @Transactional
83 public void subOrUnSubEvent(String content) { 52 public void subOrUnSubEvent(Channel channel, Message message, String content) throws IOException {
84 try { 53 try {
85 log.info("receive wxu subOrUnSub message, content {}", content); 54 log.info("receive wxu subOrUnSub message, content {}", content);
86 JSONObject jsonObject = JSONObject.parseObject(content); 55 JSONObject jsonObject = JSONObject.parseObject(content);
...@@ -109,30 +78,31 @@ public class WeiXinEventConsumer { ...@@ -109,30 +78,31 @@ public class WeiXinEventConsumer {
109 if (event.equals("unsubscribe")) 78 if (event.equals("unsubscribe"))
110 this.restTemplateClient.unsubscribe(subscribeBean); 79 this.restTemplateClient.unsubscribe(subscribeBean);
111 80
81 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
112 } 82 }
113 83
114 } catch (Exception e) { 84 } catch (Exception e) {
115 log.error("WXSubscribeConsumer || subOrUnSub msg error || {} || {}", content, e.getMessage()); 85 log.error("WXSubscribeConsumer || subOrUnSub msg error || {} || {}", content, e.getMessage());
116 }
117 }
118 86
119 /** 87 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
120 * @description 添加收藏记录 88
121 * @param content 消息内容 89 if (MapUtils.isNotEmpty(error)) {
122 */ 90 String errorStart = this.error.get("start");
123 /*@RabbitHandler 91
124 @RabbitListener(bindings = { 92 if (errorStart.equalsIgnoreCase("true")) {
125 @QueueBinding(value = @Queue(value = RabbitMqConfig.COLLECTION_ADD_QUEUE), 93 String fileName = this.error.get("fileName")+"_"+LocalDate.now() +".log";
126 exchange = @Exchange(value = ExchangeTypes.DIRECT))}, 94 String filePath = this.error.get("filePath");
127 containerFactory = "managementRabbitListenerContainerFactory")*/ 95 String filePath1 = filePath+fileName;
128 @Transactional 96 FileUtil.writeStringToFile2(filePath1, content, e.getMessage());
129 public void addCollection(String content) { 97 }
130 try { 98
131 log.info("receive UserCollection add message, content {}", content); 99 }
132 this.restTemplateClient.addCollection(content); 100
133 } catch (Exception e) { 101 e.printStackTrace();
134 log.error("CollectionAddConsumer || UserCollection add error || {}", e.toString(), e); 102 log.info("ucEventConsumer ====>>>> end");
135 } 103 }
136 } 104 }
137 105
106
107
138 } 108 }
......
1 package com.topdraw.mq.domain;
2
3 import lombok.Getter;
4
5 // 实体类型
6 @Getter
7 public enum EntityType {
8
9 // 用户
10 PERSON,
11 // 会员
12 MEMBER,
13 // 小屏-微信
14 WEIXIN_USER,
15 // 大屏
16 VIS_USER,
17 // 地址
18 PERSON_ADDRESS
19
20 }
1 package com.topdraw.mq.domain;
2
3 // 关注的事件
4 public enum EventType {
5
6 // 关注
7 SUBSCRIBE,
8 // 取消关注
9 UN_SUBSCRIBE,
10 // 绑定
11 BIND,
12 // 取消绑定
13 UN_BIND,
14 // 变更主账号
15 C_M_ACCOUNT,
16 // 获取积分
17 GAIN_POINT,
18 // 消耗积分
19 CONSUME_POINT,
20 // 观影
21 VIEWING
22
23 }
...@@ -173,4 +173,19 @@ public class RestTemplateClient { ...@@ -173,4 +173,19 @@ public class RestTemplateClient {
173 log.info("uc response: " + entityBody);*/ 173 log.info("uc response: " + entityBody);*/
174 return null; 174 return null;
175 } 175 }
176
177 public String dealViewRecord(String content) {
178 String url = BASE_URL + "/uce/userOperation/addCollection";
179 //处理接口调用 中文不显示问题
180 content = new String(Base64.getEncoder().encode(content.getBytes(StandardCharsets.UTF_8)));
181
182 restTemplate.postForEntity(url, content, String.class);
183 /* ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, content, String.class);
184 String entityBody = "";
185 if (responseEntity.getStatusCode().is2xxSuccessful()) {
186 entityBody = responseEntity.getBody();
187 }
188 log.info("uc response: " + entityBody);*/
189 return null;
190 }
176 } 191 }
......
1 package com.topdraw.util;
2
3 import java.io.BufferedWriter;
4 import java.io.File;
5 import java.io.FileWriter;
6 import java.io.IOException;
7 import java.time.LocalDateTime;
8
9 /**
10 * @author :
11 * @description:
12 * @function :
13 * @date :Created in 2022/5/12 11:43
14 * @version: :
15 * @modified By:
16 * @since : modified in 2022/5/12 11:43
17 */
18 public class FileUtil {
19
20 private static void createFile(String filePath){
21 File testFile = new File(filePath);
22 File fileParent = testFile.getParentFile();//返回的是File类型,可以调用exsit()等方法
23 //String fileParentPath = testFile.getParent();//返回的是String类型
24
25 if (!fileParent.exists()) {
26 fileParent.mkdirs();// 能创建多级目录
27 }
28
29 if (!testFile.exists()) {
30 try {
31 testFile.createNewFile();//有路径才能创建文件
32 } catch (IOException e) {
33 e.printStackTrace();
34 }
35 }
36 }
37
38 public static void writeStringToFile2(String filePath, String content, String error) {
39 try {
40 String property = System.getProperty("user.dir");
41 filePath = property + filePath;
42 createFile(filePath);
43
44 FileWriter fw = new FileWriter(filePath, true);
45 BufferedWriter bw = new BufferedWriter(fw);
46 bw.append(LocalDateTime.now()+"\n");
47 bw.write("【content】==>> \n"+content+"\n");
48 bw.write("【error】==>> \n"+error+"\n");
49 bw.write("----------------------------------------------------------------\n");
50 bw.close();
51 fw.close();
52 } catch (Exception e) {
53 e.printStackTrace();
54 }
55 }
56
57 }
1 package com.topdraw.util;
2
3 import java.util.ArrayList;
4 import java.util.List;
5
6 /**
7 * @author :
8 * @description:
9 * @function :
10 * @date :Created in 2022/5/5 12:57
11 * @version: :
12 * @modified By:
13 * @since : modified in 2022/5/5 12:57
14 */
15 public class ListUtil {
16
17 public static List<List<Integer>> sortList(List<List<Integer>> list, String desc) {
18
19
20 return null;
21 }
22
23 public static boolean containsValue(List<Integer> list, Integer value) {
24
25
26 return false;
27 }
28
29 public static void main(String[] args) {
30 List<List<Integer>> listList = new ArrayList<>();
31
32 List<Integer> integerList = new ArrayList<>();
33 integerList.add(1);
34 integerList.add(30);
35
36 List<Integer> integerList1 = new ArrayList<>();
37 integerList1.add(31);
38 integerList1.add(60);
39
40 listList.add(integerList);
41 listList.add(integerList1);
42
43 // Integer firstMaxInteger = listList.get(0).get;
44 /*Integer firstMaxInteger = 0;
45 for (List<Integer> integers : listList) {
46
47 firstMaxInteger = integers.get(integerList.size() - 1);
48
49
50 }*/
51 }
52
53 }
1 package com.topdraw.util;
2
3
4 import org.redisson.api.RLock;
5
6 public class RedissonUtil {
7
8 public static void lock(RLock rLock){
9 rLock.lock();
10 }
11
12
13 public static void unlock(RLock rLock){
14 if (rLock.isLocked() && rLock.isHeldByCurrentThread())
15 rLock.unlock();
16 }
17
18
19 }
1 spring: 1 spring:
2 # 数据源 2 # 数据源
3 datasource: 3 datasource:
4 # 数据源地址
5 # url: jdbc:log4jdbc:mysql://139.196.192.242:3306/tj_user_0819?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
6 # # 用户名
7 # username: root
8 # # 密码
9 # password: Tjlh@2017
10 url: jdbc:log4jdbc:mysql://122.112.214.149:3306/tj_user_iptv?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false 4 url: jdbc:log4jdbc:mysql://122.112.214.149:3306/tj_user_iptv?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
11 username: root 5 username: root
12 password: root 6 password: root
...@@ -70,47 +64,92 @@ mutil-mq: ...@@ -70,47 +64,92 @@ mutil-mq:
70 service: 64 service:
71 # ip 65 # ip
72 host: 122.112.214.149 66 host: 122.112.214.149
67 # host: 139.196.145.150
73 # 端口 68 # 端口
74 port: 5672 69 port: 5672
75 # 用户名 70 # 用户名
76 username: guest 71 username: guest
72 # username: admin
77 # 密码 73 # 密码
78 password: guest 74 password: guest
75 # password: Topdraw1qaz
79 # 虚拟空间 76 # 虚拟空间
80 virtual-host: member_center 77 virtual-host: member_center_chongshu
81 # host: 139.196.145.150 # rabbitmq的连接地址
82 # port: 5672 # rabbitmq的连接端口号
83 # virtual-host: member_center # rabbitmq的虚拟hosthhh
84 # username: admin # rabbitmq的用户名
85 # password: Topdraw1qaz # rabbitmq的密码
86 publisher-confirms: true #如果对异步消息需要回调必须设置为true 78 publisher-confirms: true #如果对异步消息需要回调必须设置为true
87 79
88 # 管理侧 80 # 管理侧
89 management: 81 management:
90 # ip 82 # host: 122.112.214.149 # rabbitmq的连接地址
91 # host: 139.196.145.150
92 # # 端口
93 # port: 5672
94 # # 用户名
95 # username: admin
96 # # 密码
97 # password: Topdraw1qaz
98 # # 虚拟空间
99 # virtual-host: member_center
100 host: 122.112.214.149 # rabbitmq的连接地址 83 host: 122.112.214.149 # rabbitmq的连接地址
101 port: 5672 # rabbitmq的连接端口号 84 port: 5672 # rabbitmq的连接端口号
102 virtual-host: member_center # rabbitmq的虚拟hosthhh 85 virtual-host: none # rabbitmq的虚拟host
103 username: guest # rabbitmq的用户名 86 username: guest # rabbitmq的用户名
104 password: guest # rabbitmq的密码 87 password: guest # rabbitmq的密码
88 # username: admin # rabbitmq的用户名
89 # password: Topdraw1qaz # rabbitmq的密码
105 publisher-confirms: true #如果对异步消息需要回调必须设置为true 90 publisher-confirms: true #如果对异步消息需要回调必须设置为true
106 91
107 # 服务属性 92 # 服务属性
108 service: 93 service:
109 #平台类型 service: 服务侧 management: 管理侧 94 mq:
110 platform: service 95 list:
111 # 服务域 mobile:小屏侧 vis:大屏侧 96 - source: event
112 type: vis 97 exchange: event.exchange
98 queue: event.queue
99 exchange-type: direct
100 routing-key:
101 active:
102 - source: collection
103 exchange: collection.exchange
104 queue: collection.queue
105 exchange-type: direct
106 routing-key:
107 active:
108 - source: viewRecord
109 exchange: viewRecord.exchange
110 queue: viewRecord.queue
111 exchange-type: direct
112 routing-key:
113 active:
114 - source: eventBus
115 exchange: uc.eventbus
116 queue: uc.eventbus
117 exchange-type: topic
118 routing-key: uc.eventbus.*.topic
119 active:
120 - source: uce
121 exchange: uce.exchange
122 queue: uce.queue
123 exchange-type: direct
124 routing-key:
125 active: service
126 - source: wechat
127 exchange: wechat.exchange
128 queue: weixin.subOrUnSub.queue
129 exchange-type: direct
130 routing-key:
131 active:
132 error:
133 logs:
134 list:
135 - type: eventBus
136 filePath: /logs/mq/eventBus/
137 fileName: error
138 start: on
139 - type: ucg
140 filePath: /logs/mq/ucg/
141 fileName: error
142 start: on
143 - type: uce
144 filePath: /logs/mq/uce/
145 fileName: error
146 start: on
147 - type: wechat
148 filePath: /logs/mq/wechat/
149 fileName: error
150 start: on
113 151
114 # uc-engine服务地址 152 # uc-engine服务地址
115 api: 153 api:
116 baseUrl: http://127.0.0.1:8447
...\ No newline at end of file ...\ No newline at end of file
154 baseUrl: http://127.0.0.1:8447
155
......
...@@ -2,8 +2,16 @@ package com.topdraw.resttemplate; ...@@ -2,8 +2,16 @@ package com.topdraw.resttemplate;
2 2
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
4 import com.topdraw.BaseTest; 4 import com.topdraw.BaseTest;
5 import org.apache.poi.hssf.usermodel.HSSFWorkbook;
6 import org.apache.poi.util.IOUtils;
5 import org.junit.Test; 7 import org.junit.Test;
6 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.context.ApplicationContext;
10
11 import java.io.*;
12 import java.lang.reflect.Field;
13 import java.time.LocalDate;
14 import java.time.LocalDateTime;
7 15
8 public class RestTemplateTest extends BaseTest { 16 public class RestTemplateTest extends BaseTest {
9 17
...@@ -16,4 +24,92 @@ public class RestTemplateTest extends BaseTest { ...@@ -16,4 +24,92 @@ public class RestTemplateTest extends BaseTest {
16 System.out.println(memberInfo); 24 System.out.println(memberInfo);
17 } 25 }
18 26
27 @Test
28 public void error(){
29 String msg3 = "{\n" +
30 " \"evt\": \"play\", \n" +
31 " \"deviceType\": 1, \n" +
32 // " \"time\": \"2022-04-14 00:10:09\",\n" +
33 " \"time\": \"2022-05-03 23:10:09\",\n" +
34 " \"msgData\": {\n" +
35 " \"platformAccount\": \"topdraw\", \n" +
36 " \"playDuration\": "+23+", \n" +
37 " \"mediaCode\": \"\", \n" +
38 " \"mediaName\": \"\"\n" +
39 " }\n" +
40 " }";
41
42 String classpathUrlPrefix = ApplicationContext.CLASSPATH_URL_PREFIX;
43 String panfu = "";
44 // String filePath = classpathUrlPrefix+"/mq/play/error.log";
45 String property = System.getProperty("user.dir");
46 String separator = File.separator;
47 System.out.println(property);
48 String fileName = "error_"+ LocalDate.now() +".log";
49 String filePath = property+separator+"logs"+separator+"mq"+separator+fileName;
50 createFile(filePath);
51 this.writeStringToFile2(filePath, msg3);
52
53 String a = "{\n" +
54 "\"data\": [\n" +
55 "\t{\n" +
56 "\t\"app_id\": 57,\n" +
57 "\t\"user_id\": 1,\n" +
58 "\t\"type\": 1,\n" +
59 "\t\"name\": \"PersonalCollectionRecords\",\n" +
60 "\t\"count\": 22,\n" +
61 "\t\"images\": \"{\\\"map\\\":{\\\"poster\\\":[0]},\\\"list\\\":[{\\\"id\\\":47422,\\\"type\\\":2,\\\"width\\\":222,\\\"height\\\":294,\\\"fileUrl\\\":\\\"upload/image/media/2020-07-30/9a8a02db-9444-4bff-ba54-ea784ae4f88c.jpg\\\",\\\"size\\\":104643}]}\",\n" +
62 "\t\"id\": 756756,\n" +
63 "\t\"user_collection_id\": 1,\n" +
64 "\t\"detail_folder_code\": \"Default\",\n" +
65 "\t\"detail_type\": \"MEDIA\",\n" +
66 "\t\"detail_id\": 46532,\n" +
67 "\t\"detail_code\": \"media_558bc45a-5480-46ec-be9a-c749ffdbdf49\",\n" +
68 "\t\"detail_name\": \"熊出没之探险日记2\",\n" +
69 "\t\"detail_total_index\": 40,\n" +
70 "\t\"detail_sequence\": 1,\n" +
71 "\t\"create_time\": 1644503167000,\n" +
72 "\t\"update_time\": 1644503167000\n" +
73 "\t}\n" +
74 "\t],\n" +
75 "\"platformAccount\": \"topdraw\"\n" +
76 "}";
77 }
78
79 private void createFile(String filePath){
80 File testFile = new File(filePath);
81 File fileParent = testFile.getParentFile();//返回的是File类型,可以调用exsit()等方法
82 String fileParentPath = testFile.getParent();//返回的是String类型
83 System.out.println("fileParent:" + fileParent);
84 System.out.println("fileParentPath:" + fileParentPath);
85 if (!fileParent.exists()) {
86 fileParent.mkdirs();// 能创建多级目录
87 }
88
89 if (!testFile.exists()) {
90 try {
91 testFile.createNewFile();//有路径才能创建文件
92 } catch (IOException e) {
93 e.printStackTrace();
94 }
95 }
96
97 System.out.println(testFile);
98 }
99
100 public void writeStringToFile2(String filePath, String error) {
101 try {
102 FileWriter fw = new FileWriter(filePath, true);
103 BufferedWriter bw = new BufferedWriter(fw);
104 bw.append(LocalDateTime.now()+"=>"+error+"\n");
105 //bw.write("我是");// 往已有的文件上添加字符串
106 //bw.write("程序猿\n ");
107 bw.close();
108 fw.close();
109 } catch (Exception e) {
110 // TODO Auto-generated catch block
111 e.printStackTrace();
112 }
113 }
114
19 } 115 }
......