Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
向汉
/
uc-consumer
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
Commit
7bee7486
...
7bee74860b6093482769176699e11a84a47ee1fd
authored
2022-04-07 13:16:03 +0800
by
xianghan
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
1.优化
1 parent
32ee9bbe
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
28 additions
and
135 deletions
src/main/java/com/topdraw/config/RabbitMqConfig.java
src/main/java/com/topdraw/mq/consumer/UcEngineManagementEventConsumer.java
src/main/java/com/topdraw/mq/consumer/UcEventBusConsumer.java
src/main/java/com/topdraw/mq/consumer/UcGatewayEventConsumer.java
src/main/java/com/topdraw/mq/consumer/WeiXinEventConsumer.java
src/main/resources/config/application-dev.yml
src/main/java/com/topdraw/config/RabbitMqConfig.java
View file @
7bee748
...
...
@@ -18,24 +18,6 @@ import org.springframework.context.annotation.Primary;
@Configuration
public
class
RabbitMqConfig
{
//////////////////////////////////////////////////////// OMO 小屏///////////////////////////////////////////////////////////////
/** 获取带参二维码(大屏->小屏) */
public
static
final
String
GET_QR_CODE_QUEUE
=
"queue.qrCode.get"
;
/** 删除全部收藏队列(大屏->小屏) */
public
static
final
String
COLLECTION_DELETE_ALL_QUEUE
=
"queue.collection.deleteall"
;
/** 添加收藏队列(大屏->小屏) */
public
static
final
String
COLLECTION_ADD_QUEUE
=
"queue.collection.add"
;
/** 删除收藏队列(大屏->小屏) */
public
static
final
String
COLLECTION_DELETE_QUEUE
=
"queue.collection.delete"
;
/** 微信侧 公众号关注与取消关注 */
public
static
final
String
WEIXIN_SUBORUNSUB_QUEUE
=
"weixin.subOrUnSub.queue"
;
@Value
(
"${mutil-mq.service.host}"
)
private
String
serviceHost
;
@Value
(
"${mutil-mq.service.port}"
)
...
...
@@ -205,4 +187,18 @@ public class RabbitMqConfig {
.
to
(
eventBusExchange
)
.
with
(
UC_EVENTBUS_KEY
);
}
/**************************************************跨屏数据*************************************************************/
/** 删除全部收藏队列(大屏->小屏) */
public
static
final
String
COLLECTION_DELETE_ALL_QUEUE
=
"queue.collection.deleteall"
;
/** 添加收藏队列(大屏->小屏) */
public
static
final
String
COLLECTION_ADD_QUEUE
=
"queue.collection.add"
;
/** 删除收藏队列(大屏->小屏) */
public
static
final
String
COLLECTION_DELETE_QUEUE
=
"queue.collection.delete"
;
/** 微信侧 公众号关注与取消关注 */
public
static
final
String
WEIXIN_SUBORUNSUB_QUEUE
=
"weixin.subOrUnSub.queue"
;
}
...
...
src/main/java/com/topdraw/mq/consumer/UcEngineManagementEventConsumer.java
View file @
7bee748
...
...
@@ -28,11 +28,11 @@ public class UcEngineManagementEventConsumer {
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
@RabbitHandler
/*
@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = RabbitMqConfig.ENGINE_TO_SERVICE_DIRECT),
exchange = @Exchange(value = ExchangeTypes.DIRECT))
},
containerFactory
=
"serviceRabbitListenerContainerFactory"
)
}, containerFactory = "serviceRabbitListenerContainerFactory")
*/
public
void
ucEventConsumer
(
String
content
)
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
TableOperationMsg
tableOperationMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/UcEventBusConsumer.java
View file @
7bee748
...
...
@@ -38,9 +38,9 @@ public class UcEventBusConsumer {
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
@RabbitHandler
/*
@RabbitHandler
@RabbitListener(queues = RabbitMqConfig.UC_EVENTBUS_TOPIC,
containerFactory
=
"managementRabbitListenerContainerFactory"
)
containerFactory = "managementRabbitListenerContainerFactory")
*/
public
void
ucEventConsumer
(
String
content
)
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
DataSyncMsg
dataSyncMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/UcGatewayEventConsumer.java
View file @
7bee748
...
...
@@ -28,11 +28,11 @@ public class UcGatewayEventConsumer {
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
@RabbitHandler
/*
@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = RabbitMqConfig.GATEWAY_TO_SERVICE_DIRECT),
exchange = @Exchange(value = ExchangeTypes.DIRECT))
},
containerFactory
=
"managementRabbitListenerContainerFactory"
)
}, containerFactory = "managementRabbitListenerContainerFactory")
*/
public
void
ucEventConsumer
(
String
content
)
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
DataSyncMsg
dataSyncMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/WeiXinEventConsumer.java
View file @
7bee748
...
...
@@ -32,11 +32,11 @@ public class WeiXinEventConsumer {
* @description 删除用户收藏记录
* @param content 消息内容
*/
/*
@RabbitHandler
@RabbitHandler
@RabbitListener
(
bindings
=
{
@QueueBinding
(
value
=
@Queue
(
value
=
RabbitMqConfig
.
COLLECTION_DELETE_QUEUE
),
exchange
=
@Exchange
(
value
=
ExchangeTypes
.
DIRECT
))},
containerFactory = "managementRabbitListenerContainerFactory")
*/
containerFactory
=
"managementRabbitListenerContainerFactory"
)
public
void
deleteCollection
(
String
content
)
{
try
{
log
.
info
(
"receive UserCollection delete message, content {}"
,
content
);
...
...
@@ -50,11 +50,11 @@ public class WeiXinEventConsumer {
* @description 删除全部收藏记录
* @param content 消息内容
*/
/*
@RabbitHandler
@RabbitHandler
@RabbitListener
(
bindings
=
{
@QueueBinding
(
value
=
@Queue
(
value
=
RabbitMqConfig
.
COLLECTION_DELETE_ALL_QUEUE
),
exchange
=
@Exchange
(
value
=
ExchangeTypes
.
DIRECT
))},
containerFactory = "managementRabbitListenerContainerFactory")
*/
containerFactory
=
"managementRabbitListenerContainerFactory"
)
@Transactional
public
void
deleteAllCollection
(
String
content
)
{
try
{
...
...
@@ -66,62 +66,6 @@ public class WeiXinEventConsumer {
}
/**
* 处理带参的二维码事件
* @param content 消息内容
* @description 获取公众号带参二维码
*/
/*@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = RabbitMqConfig.GET_QR_CODE_QUEUE),
exchange = @Exchange(value = ExchangeTypes.DIRECT))},
containerFactory = "managementRabbitListenerContainerFactory")*/
public
void
getQrCode
(
String
content
)
{
try
{
log
.
info
(
"receive get qrCode message, content {}"
,
content
);
JSONObject
jsonObject
=
JSONObject
.
parseObject
(
content
);
/*String appid = jsonObject.getString("appid");
String IPTVappid = jsonObject.getString("IPTVappid");
String platformAccount = jsonObject.getString("platformAccount");
String sessionId = jsonObject.getString("sessionId");
String key = QR_CODE_URL + appid + "_" + platformAccount + "_" + sessionId;
String url = (String) redisUtils.get(key);
if (StringUtils.isBlank(url)) {
Map<String, String> wxInfo = WeixinUtil.getWeixinInfoByAppid(appid);
var appType = wxInfo.get("appType");
// 订阅号不支持带参二维码,直接返回
if (StrUtil.isNotEmpty(appType) && ObjectUtil.equals(appType, WeChatConstants.WX_SUBSCRIPTION)) {
log.error("订阅号不支持带参二维码 || {} || {}", appid, content);
return;
}
QrCode qrCode = new QrCode();
qrCode.setActionName(WeChatConstants.QR_STR_SCENE);
if (StringUtils.isNotBlank(wxInfo.get("qrCodeExpireSeconds"))) {
qrCode.setExpireSeconds(Integer.valueOf(wxInfo.get("qrCodeExpireSeconds")));
}
ActionInfo actionInfo = new ActionInfo();
Scene scene = new Scene();
scene.setSceneStr(content);
actionInfo.setScene(scene);
qrCode.setActionInfo(actionInfo);
JSONObject jsonQrCode = weixinRequestUtil.getQrCode(wxInfo, qrCode);
url = jsonQrCode.getString("url");
Integer expireSeconds = jsonQrCode.getInteger("expire_seconds");
redisUtils.set(key, url, expireSeconds, TimeUnit.SECONDS);
}
HashMap<String, Object> map = new HashMap<>();
map.put("sessionId", sessionId);
map.put("url", url);
map.put("appid", appid);
map.put("IPTVappid", IPTVappid);
map.put("platformAccount", platformAccount);
map.put("extraInfo", content);*/
restTemplateClient
.
sendQrCodeMessage
(
content
);
}
catch
(
Exception
e
)
{
log
.
error
(
"GetQrCodeConsumer || get qrCode error || {}"
,
e
.
toString
(),
e
);
}
}
/**
* 关注和取关事件
* eg:
* {
...
...
@@ -130,11 +74,11 @@ public class WeiXinEventConsumer {
* }
* @param content
*/
/*
@RabbitHandler
@RabbitHandler
@RabbitListener
(
bindings
=
{
@QueueBinding
(
value
=
@Queue
(
value
=
RabbitMqConfig
.
WEIXIN_SUBORUNSUB_QUEUE
),
exchange
=
@Exchange
(
value
=
ExchangeTypes
.
DIRECT
))},
containerFactory = "managementRabbitListenerContainerFactory")
*/
containerFactory
=
"managementRabbitListenerContainerFactory"
)
@Transactional
public
void
subOrUnSubEvent
(
String
content
)
{
try
{
...
...
@@ -145,7 +89,6 @@ public class WeiXinEventConsumer {
JSONObject
wechatMsg
=
jsonObject
.
getJSONObject
(
"allFieldsMap"
);
String
appid
=
map
.
getString
(
"mpId"
);
String
unionid
=
map
.
getString
(
"unionid"
);
// Map<String, String> wxInfoMap = WeixinUtil.getWeixinInfoByAppid(appid);
String
openid
=
wechatMsg
.
getString
(
"FromUserName"
);
String
msgType
=
wechatMsg
.
getString
(
"MsgType"
);
...
...
@@ -186,52 +129,6 @@ public class WeiXinEventConsumer {
public
void
addCollection
(
String
content
)
{
try
{
log
.
info
(
"receive UserCollection add message, content {}"
,
content
);
JSONObject
jsonObject
=
JSONObject
.
parseObject
(
content
);
String
platformAccount
=
jsonObject
.
getString
(
"platformAccount"
);
String
data
=
jsonObject
.
getString
(
"data"
);
if
(
StringUtils
.
isBlank
(
data
)
||
!
data
.
startsWith
(
"["
))
{
// return;
}
/*Optional<TvUser> userOptional = tvUserRepository.findByPlatformAccount(platformAccount);
if (!userOptional.isPresent()) {
return;
}
Long tvUserId = userOptional.get().getId();
List<UserCollectionMq> userCollectionMqList = JSONObject.parseArray(data, UserCollectionMq.class);
if (userCollectionMqList == null || userCollectionMqList.isEmpty()) {
return;
}
Map<Long, List<UserCollectionMq>> collect = userCollectionMqList.stream().collect(Collectors.groupingBy(UserCollectionMq::getUserCollectionId));
for (Map.Entry<Long, List<UserCollectionMq>> entry : collect.entrySet()) {
List<UserCollectionMq> value = entry.getValue();
UserCollectionMq userCollectionMq = value.get(0);
if (StringUtils.isBlank(userCollectionMq.getName())) {
userCollectionMq.setName("DEFAULT");
}
UserCollection userCollection = userCollectionRepository
.findFirstByUserIdAndTypeAndName(tvUserId, userCollectionMq.getType(), userCollectionMq.getName()).orElseGet(UserCollection::new);
userCollection.setAppId(userCollectionMq.getAppId())
.setUserId(tvUserId)
.setName(userCollectionMq.getName())
.setType(userCollectionMq.getType())
.setCount(userCollection.getCount() == null ? value.size() : userCollection.getCount() + value.size());
UserCollection userCollectionSave = userCollectionRepository.save(userCollection);
for (UserCollectionMq collectionMq : value) {
UserCollectionDetail userCollectionDetail = collectionMq2DetailMapper.toEntity(collectionMq);
Optional<UserCollectionDetail> userCollectionDetailOptional = userCollectionDetailRepository
.findByDetailIdAndDetailTypeAndUserCollectionId(userCollectionDetail.getDetailId(), userCollectionDetail.getDetailType(), userCollectionSave.getId());
//观影记录同一天只存一条记录
if (userCollectionDetailOptional.isPresent() &&
DateUtil.isSameDay(new Date(userCollectionDetailOptional.get().getCreateTime().getTime()), new Date())) {
userCollectionDetail.setId(userCollectionDetailOptional.get().getId());
} else {
userCollectionDetail.setId(null)
.setUserCollectionId(userCollectionSave.getId());
}
userCollectionDetailRepository.save(userCollectionDetail);
}
}*/
this
.
restTemplateClient
.
addCollection
(
content
);
}
catch
(
Exception
e
)
{
log
.
error
(
"CollectionAddConsumer || UserCollection add error || {}"
,
e
.
toString
(),
e
);
...
...
src/main/resources/config/application-dev.yml
View file @
7bee748
...
...
@@ -119,9 +119,9 @@ mutil-mq:
# 服务属性
service
:
#平台类型 service: 服务侧 management: 管理侧
platform
:
service
platform
:
management
# 服务域 mobile:小屏侧 vis:大屏侧
type
:
vis
type
:
# uc-engine服务地址
api
:
...
...
Please
register
or
sign in
to post a comment