Commit 1a39d208 1a39d2088c557b8a799be958bad87ae0262c8356 by xianghan

1.优化mq配置,添加默认数据源和默认队列

1 parent aae2399c
......@@ -18,6 +18,7 @@ import org.springframework.context.annotation.Primary;
import javax.naming.ConfigurationException;
import java.util.Map;
import java.util.Objects;
@Data
@Configuration
......@@ -166,32 +167,38 @@ public class RabbitMqSourceConfig {
private Map<String, String> ucgEventInfo;
public String getUcgEventQueue(){
if (MapUtils.isNotEmpty(ucgEventInfo)) {
String queue = ucgEventInfo.get("queue");
return queue;
if (Objects.nonNull(ucgEventInfo)) {
if (MapUtils.isNotEmpty(ucgEventInfo)) {
String queue = ucgEventInfo.get("queue");
return queue;
}
}
return EVENT_QUEUE;
}
public String getUcgEventSource(){
if (MapUtils.isNotEmpty(ucgEventInfo)) {
String source = ucgEventInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return "";
if (Objects.nonNull(ucgEventInfo)) {
if (MapUtils.isNotEmpty(ucgEventInfo)) {
String source = ucgEventInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return SERVICE_;
}
}
}
return null;
return SERVICE_;
}
public String getUcgEventStartUp(){
if (MapUtils.isNotEmpty(ucgEventInfo)) {
String source = ucgEventInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
if (Objects.nonNull(ucgEventInfo)) {
if (MapUtils.isNotEmpty(ucgEventInfo)) {
String source = ucgEventInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
}
}
}
......@@ -209,32 +216,38 @@ public class RabbitMqSourceConfig {
private Map<String, String> ucgIptvCollectionInfo;
public String getUcgCollectionQueue(){
if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
String queue = ucgIptvCollectionInfo.get("queue");
return queue;
if (Objects.nonNull(ucgIptvCollectionInfo)) {
if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
String queue = ucgIptvCollectionInfo.get("queue");
return queue;
}
}
return COLLECTION_QUEUE;
}
public String getUcgCollectionSource(){
if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
String source = ucgIptvCollectionInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return "";
if (Objects.nonNull(ucgIptvCollectionInfo)) {
if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
String source = ucgIptvCollectionInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return SERVICE_;
}
}
}
return null;
return SERVICE_;
}
public String getUcgCollectionStartUp(){
if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
String source = ucgIptvCollectionInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
if (Objects.nonNull(ucgIptvCollectionInfo)) {
if (MapUtils.isNotEmpty(ucgIptvCollectionInfo)) {
String source = ucgIptvCollectionInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
}
}
}
......@@ -248,32 +261,38 @@ public class RabbitMqSourceConfig {
private Map<String, String> viewRecordInfo;
public String getViewRecordQueue(){
if (MapUtils.isNotEmpty(viewRecordInfo)) {
String queue = viewRecordInfo.get("queue");
return queue;
if (Objects.nonNull(viewRecordInfo)) {
if (MapUtils.isNotEmpty(viewRecordInfo)) {
String queue = viewRecordInfo.get("queue");
return queue;
}
}
return VIEW_RECORD_QUEUE;
}
public String getViewRecordSource(){
if (MapUtils.isNotEmpty(viewRecordInfo)) {
String source = viewRecordInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return "";
if (Objects.nonNull(viewRecordInfo)) {
if (MapUtils.isNotEmpty(viewRecordInfo)) {
String source = viewRecordInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return SERVICE_;
}
}
}
return null;
return SERVICE_;
}
public String getViewRecordStartUp(){
if (MapUtils.isNotEmpty(viewRecordInfo)) {
String source = viewRecordInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
if (Objects.nonNull(viewRecordInfo)) {
if (MapUtils.isNotEmpty(viewRecordInfo)) {
String source = viewRecordInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
}
}
}
......@@ -289,32 +308,38 @@ public class RabbitMqSourceConfig {
private Map<String, String> uceInfo;
public String getUceQueue(){
if (MapUtils.isNotEmpty(uceInfo)) {
String queue = uceInfo.get("queue");
return queue;
if (Objects.nonNull(uceInfo)) {
if (MapUtils.isNotEmpty(uceInfo)) {
String queue = uceInfo.get("queue");
return queue;
}
}
return UCE_QUEUE;
}
public String getUceSource(){
if (MapUtils.isNotEmpty(uceInfo)) {
String source = uceInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return "";
if (Objects.nonNull(uceInfo)) {
if (MapUtils.isNotEmpty(uceInfo)) {
String source = uceInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return MANAGEMENT_;
}
}
}
return null;
return MANAGEMENT_;
}
public String getUceStartUp(){
if (MapUtils.isNotEmpty(uceInfo)) {
String source = uceInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
if (Objects.nonNull(uceInfo)) {
if (MapUtils.isNotEmpty(uceInfo)) {
String source = uceInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
}
}
}
......@@ -330,35 +355,39 @@ public class RabbitMqSourceConfig {
private Map<String, String> eventBusInfo;
public String getEventBusQueue(){
if (MapUtils.isNotEmpty(eventBusInfo)) {
String queue = eventBusInfo.get("queue");
return queue;
if (Objects.nonNull(eventBusInfo)) {
if (MapUtils.isNotEmpty(eventBusInfo)) {
String queue = eventBusInfo.get("queue");
return queue;
}
}
return UC_EVENTBUS_QUEUE;
}
public String getEventBusSource(){
if (MapUtils.isNotEmpty(eventBusInfo)) {
String source = eventBusInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return "";
if (Objects.nonNull(eventBusInfo)) {
if (MapUtils.isNotEmpty(eventBusInfo)) {
String source = eventBusInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return SERVICE_;
}
}
}
return null;
return SERVICE_;
}
public String getEventBusStartUp(){
if (MapUtils.isNotEmpty(eventBusInfo)) {
String source = eventBusInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
if (Objects.nonNull(eventBusInfo)) {
if (MapUtils.isNotEmpty(eventBusInfo)) {
String source = eventBusInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
}
}
}
return "false";
}
......@@ -371,32 +400,37 @@ public class RabbitMqSourceConfig {
private Map<String, String> wechatInfo;
public String getWechatQueue(){
if (MapUtils.isNotEmpty(wechatInfo)) {
String queue = wechatInfo.get("queue");
return queue;
if (Objects.nonNull(wechatInfo)) {
if (MapUtils.isNotEmpty(wechatInfo)) {
String queue = wechatInfo.get("queue");
return queue;
}
}
return WEIXIN_SUBORUNSUB_QUEUE;
}
public String getWechatSource(){
if (MapUtils.isNotEmpty(wechatInfo)) {
String source = wechatInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return "";
if (Objects.nonNull(wechatInfo)) {
if (MapUtils.isNotEmpty(wechatInfo)) {
String source = wechatInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return this.chargeSource(source);
} else {
return SERVICE_;
}
}
}
return null;
return SERVICE_;
}
public String getWechatStartUp(){
if (MapUtils.isNotEmpty(wechatInfo)) {
String source = wechatInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
if (Objects.nonNull(wechatInfo)) {
if (MapUtils.isNotEmpty(wechatInfo)) {
String source = wechatInfo.get("active");
if (StringUtils.isNotBlank(source)) {
return "true";
}
}
}
......
......@@ -79,7 +79,7 @@ mutil-mq:
# password: Topdraw1qaz
# 虚拟空间
# virtual-host: member_center_iptv_sichuan
virtual-host: member_center_iptv_chongshu
virtual-host: user_center
publisher-confirms: true #如果对异步消息需要回调必须设置为true
# 管理侧
......@@ -99,42 +99,60 @@ mutil-mq:
service:
mq:
list:
- source: event
exchange: event.exchange
queue: event.queue
# - source: event
# exchange: event.exchange
# queue: event.queue
# exchange-type: direct
# routing-key:
# active: service
- source: collection
exchange: userCenter_exchange
queue: queue.collection.add
exchange-type: direct
routing-key:
routing-key: route.UserCollection.add
active: service
- source: collection
exchange: collection.exchange
queue: collection.queue
exchange: userCenter_exchange
queue: queue.collection.delete
exchange-type: direct
routing-key: service
routing-key: route.UserCollection.delete
active: service
- source: viewRecord
exchange: viewRecord.exchange
queue: viewRecord.queue
- source: collection
exchange: userCenter_exchange
queue: queue.collection.deleteall
exchange-type: direct
routing-key:
routing-key: route.UserCollection.deleteall
active: service
- source: eventBus
exchange: uc.eventbus
queue: uc.eventbus
exchange-type: topic
routing-key: uc.eventbus.*.topic
- source: collection
exchange: exchange.collection
queue: collection.queue
exchange-type: direct
routing-key:
active: service
# - source: viewRecord
# exchange: viewRecord.exchange
# queue: viewRecord.queue
# exchange-type: direct
# routing-key:
# active: service
# - source: eventBus
# exchange: uc.eventbus
# queue: uc.eventbus
# exchange-type: topic
# routing-key: uc.eventbus.*.topic
# active: service
- source: uce
exchange: uce.exchange
queue: uce.queue
exchange-type: direct
routing-key:
active: management
- source: wechat
exchange: weixin.subOrUnSub.direct
queue: weixin.subOrUnSub.queue
exchange-type: direct
routing-key:
active:
# - source: wechat
# exchange: weixin.subOrUnSub.direct
# queue: weixin.subOrUnSub.queue
# exchange-type: direct
# routing-key:
# active: active
error:
logs:
list:
......