Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
向汉
/
uc-consumer
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
Commit
b5fbd670
...
b5fbd6701730057cd8401280e7927030aaa63f3b
authored
2022-04-13 20:27:29 +0800
by
xianghan
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
1.update
1 parent
64fe16cc
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
148 additions
and
40 deletions
src/main/java/com/topdraw/mq/consumer/UcEngineIptv2ManagementConsumer.java
src/main/java/com/topdraw/mq/consumer/UcEngineManagement2IptvConsumer.java
src/main/java/com/topdraw/mq/consumer/UcEventBusIptv2ManagementUcEngine.java
src/main/java/com/topdraw/mq/consumer/UcGatewayIptv2IptvConsumer.java
src/main/java/com/topdraw/mq/consumer/WeiXinEventConsumer.java
src/main/java/com/topdraw/util/DateUtil.java
src/main/java/com/topdraw/mq/consumer/UcEngineIptv2ManagementConsumer.java
View file @
b5fbd67
...
...
@@ -35,7 +35,7 @@ public class UcEngineIptv2ManagementConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = RabbitMqConfig.ENGINE_TO_MANAGEMENT_DIRECT),
exchange = @Exchange(value = ExchangeTypes.DIRECT))
}, containerFactory = "
service
RabbitListenerContainerFactory")*/
}, containerFactory = "
management
RabbitListenerContainerFactory")*/
public
void
ucEventConsumer
(
String
content
)
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
TableOperationMsg
tableOperationMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/UcEngineManagement2IptvConsumer.java
View file @
b5fbd67
...
...
@@ -28,11 +28,11 @@ public class UcEngineManagement2IptvConsumer {
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
/*
@RabbitHandler
@RabbitHandler
@RabbitListener
(
bindings
=
{
@QueueBinding
(
value
=
@Queue
(
value
=
RabbitMqConfig
.
ENGINE_TO_SERVICE_DIRECT
),
exchange
=
@Exchange
(
value
=
ExchangeTypes
.
DIRECT
))
}, containerFactory = "
serviceRabbitListenerContainerFactory")*/
},
containerFactory
=
"
managementRabbitListenerContainerFactory"
)
public
void
ucEventConsumer
(
String
content
)
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
TableOperationMsg
tableOperationMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/UcEventBusIptv2ManagementUcEngine.java
View file @
b5fbd67
...
...
@@ -11,15 +11,25 @@ import com.topdraw.exception.BadRequestException;
import
com.topdraw.exception.EntityNotFoundException
;
import
com.topdraw.mq.domain.DataSyncMsg
;
import
com.topdraw.resttemplate.RestTemplateClient
;
import
com.topdraw.util.DateUtil
;
import
com.topdraw.util.JSONUtil
;
import
com.topdraw.util.TimestampUtil
;
import
com.topdraw.utils.RedisUtils
;
import
lombok.Data
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.collections4.MapUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.time.DateFormatUtils
;
import
org.apache.commons.lang3.time.DateUtils
;
import
org.springframework.amqp.rabbit.annotation.*
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.Objects
;
import
java.text.DateFormat
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.time.LocalDateTime
;
import
java.util.*
;
@Component
@Slf4j
...
...
@@ -32,6 +42,12 @@ public class UcEventBusIptv2ManagementUcEngine {
@Autowired
private
MemberService
memberService
;
@Autowired
private
RedisUtils
redisUtils
;
private
static
final
Integer
PLAY_30
=
30
;
/**
* 事件
* @param content
...
...
@@ -39,22 +55,25 @@ public class UcEventBusIptv2ManagementUcEngine {
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
/*
@RabbitHandler
@RabbitHandler
@RabbitListener
(
queues
=
RabbitMqConfig
.
UC_EVENTBUS_QUEUE
,
containerFactory = "
managementRabbitListenerContainerFactory")*/
public
void
ucEventConsumer
(
String
content
)
{
containerFactory
=
"
serviceRabbitListenerContainerFactory"
)
public
void
ucEventConsumer
(
String
content
)
throws
ParseException
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
DataSyncMsg
dataSyncMsg
=
this
.
parseContent
(
content
);
this
.
taskDeal
(
dataSyncMsg
);
if
(
Objects
.
nonNull
(
dataSyncMsg
))
{
this
.
taskDeal
(
dataSyncMsg
);
}
log
.
info
(
"ucEventConsumer ====>>>> end"
);
}
/**
* 数据解析
* @param content
* @return
*/
private
DataSyncMsg
parseContent
(
String
content
)
{
private
DataSyncMsg
parseContent
(
String
content
)
throws
ParseException
{
CommonMsg
commonMsg
=
JSONUtil
.
parseMsg2Object
(
content
,
CommonMsg
.
class
);
...
...
@@ -67,6 +86,7 @@ public class UcEventBusIptv2ManagementUcEngine {
PlayContent
.
MsgData
msgData
=
playContent
.
getMsgData
();
if
(
Objects
.
nonNull
(
msgData
))
{
String
time
=
playContent
.
getTime
();
String
formatDate
=
DateUtil
.
formatDate
(
time
);
Integer
deviceType
=
playContent
.
getDeviceType
();
String
platformAccount
=
msgData
.
getPlatformAccount
();
...
...
@@ -74,44 +94,82 @@ public class UcEventBusIptv2ManagementUcEngine {
Long
mediaId
=
msgData
.
getMediaId
();
String
mediaName
=
msgData
.
getMediaName
();
Integer
playDuration
=
msgData
.
getPlayDuration
();
log
.
info
(
"playDuration ==>> {}"
,
playDuration
);
DataSyncMsg
dataSyncMsg
=
new
DataSyncMsg
();
dataSyncMsg
.
setEventType
(
evt
);
DataSyncMsg
.
MsgData
msgData1
=
new
DataSyncMsg
.
MsgData
();
if
(
StringUtils
.
isNotBlank
(
platformAccount
))
{
UserTvDTO
userTvDTO
=
this
.
userTvService
.
findByPlatformAccount
(
platformAccount
);
String
priorityMemberCode
=
userTvDTO
.
getPriorityMemberCode
();
String
memberCode
=
""
;
if
(
StringUtils
.
isNotBlank
(
priorityMemberCode
))
{
memberCode
=
priorityMemberCode
;
}
else
{
memberCode
=
this
.
memberService
.
findById
(
userTvDTO
.
getMemberId
()).
getCode
();
}
if
(
StringUtils
.
isBlank
(
memberCode
))
throw
new
EntityNotFoundException
(
MemberDTO
.
class
,
"memberCode"
,
"memberCode is null"
);
if
(
Objects
.
nonNull
(
userTvDTO
))
{
String
key
=
platformAccount
+
"|"
+
formatDate
;
Map
<
Object
,
Object
>
hmget
=
this
.
redisUtils
.
hmget
(
key
);
Integer
playDurationValueTotal
=
0
;
int
maxSize
=
1
;
if
(
MapUtils
.
isNotEmpty
(
hmget
))
{
Set
<
Object
>
objects
=
hmget
.
keySet
();
for
(
Object
key_
:
objects
)
{
if
(
Objects
.
nonNull
(
key_
))
{
if
(
key_
.
toString
().
equalsIgnoreCase
(
"total"
))
{
Integer
playDurationValueTotal_
=
Integer
.
valueOf
(
hmget
.
get
(
key_
).
toString
());
if
(
playDurationValueTotal_
>=
30
)
{
maxSize
=
objects
.
size
();
Integer
maxTotal
=
maxSize
+
1
;
Integer
playDurationValue
=
Integer
.
valueOf
(
hmget
.
get
(
key_
).
toString
());
playDurationValueTotal
=
playDurationValue
+
playDuration
;
Map
<
String
,
Object
>
map
=
new
HashMap
<>();
map
.
put
(
String
.
valueOf
(
maxTotal
),
playDuration
);
map
.
put
(
"total"
,
playDurationValueTotal
);
this
.
redisUtils
.
hmset
(
key
,
map
,
172800
);
return
null
;
}
}
maxSize
=
objects
.
size
();
Integer
playDurationValue
=
Integer
.
valueOf
(
hmget
.
get
(
"total"
).
toString
());
playDurationValueTotal
=
playDurationValue
+
playDuration
;
Map
<
String
,
Object
>
map
=
new
HashMap
<>();
map
.
put
(
String
.
valueOf
(
maxSize
+
1
),
playDuration
);
map
.
put
(
"total"
,
playDurationValueTotal
);
this
.
redisUtils
.
hmset
(
key
,
map
,
172800
);
}
}
}
else
{
playDurationValueTotal
=
playDuration
;
Map
<
String
,
Object
>
map
=
new
HashMap
<>();
map
.
put
(
"total"
,
playDurationValueTotal
);
map
.
put
(
"1"
,
playDuration
);
this
.
redisUtils
.
hmset
(
key
,
map
,
172800
);
}
DataSyncMsg
dataSyncMsg1
=
null
;
if
(
playDurationValueTotal
>=
PLAY_30
)
{
log
.
info
(
"playDurationValueTotal ===>>> {}"
,
playDurationValueTotal
);
log
.
info
(
"===>> start dealTask"
);
dataSyncMsg1
=
getDataSyncMsg
(
time
,
deviceType
,
mediaCode
,
mediaId
,
mediaName
,
playDuration
,
dataSyncMsg
,
msgData1
,
userTvDTO
);
}
return
dataSyncMsg1
;
}
msgData1
.
setMemberCode
(
memberCode
);
}
msgData1
.
setEvent
(
8
);
msgData1
.
setDeviceType
(
deviceType
);
msgData1
.
setMediaId
(
mediaId
);
JSONObject
param
=
new
JSONObject
();
param
.
put
(
"playDuration"
,
playDuration
);
msgData1
.
setParam
(
JSON
.
toJSONString
(
param
));
JSONObject
description
=
new
JSONObject
();
description
.
put
(
"mediaId"
,
mediaId
);
description
.
put
(
"mediaName"
,
mediaName
);
description
.
put
(
"playDuration"
,
playDuration
);
description
.
put
(
"mediaCode"
,
mediaCode
);
description
.
put
(
"time"
,
time
);
msgData1
.
setDescription
(
JSON
.
toJSONString
(
description
));
dataSyncMsg
.
setMsg
(
msgData1
);
return
dataSyncMsg
;
}
System
.
out
.
println
(
playContent
);
...
...
@@ -122,6 +180,42 @@ public class UcEventBusIptv2ManagementUcEngine {
return
null
;
}
private
DataSyncMsg
getDataSyncMsg
(
String
time
,
Integer
deviceType
,
String
mediaCode
,
Long
mediaId
,
String
mediaName
,
Integer
playDuration
,
DataSyncMsg
dataSyncMsg
,
DataSyncMsg
.
MsgData
msgData1
,
UserTvDTO
userTvDTO
)
{
String
priorityMemberCode
=
userTvDTO
.
getPriorityMemberCode
();
log
.
info
(
"priorityMemberCode ==>> {}"
,
priorityMemberCode
);
String
memberCode
=
""
;
if
(
StringUtils
.
isNotBlank
(
priorityMemberCode
))
{
memberCode
=
priorityMemberCode
;
}
else
{
memberCode
=
this
.
memberService
.
findById
(
userTvDTO
.
getMemberId
()).
getCode
();
}
log
.
info
(
"memberCode ==>> {}"
,
priorityMemberCode
);
if
(
StringUtils
.
isBlank
(
memberCode
))
throw
new
EntityNotFoundException
(
MemberDTO
.
class
,
"memberCode"
,
"memberCode is null"
);
msgData1
.
setMemberCode
(
memberCode
);
msgData1
.
setEvent
(
8
);
msgData1
.
setDeviceType
(
deviceType
);
msgData1
.
setMediaId
(
mediaId
);
JSONObject
param
=
new
JSONObject
();
// 增量
param
.
put
(
"playDuration"
,
playDuration
);
msgData1
.
setParam
(
JSON
.
toJSONString
(
param
));
JSONObject
description
=
new
JSONObject
();
description
.
put
(
"mediaId"
,
mediaId
);
description
.
put
(
"mediaName"
,
mediaName
);
description
.
put
(
"playDuration"
,
playDuration
);
description
.
put
(
"mediaCode"
,
mediaCode
);
description
.
put
(
"time"
,
time
);
msgData1
.
setDescription
(
JSON
.
toJSONString
(
description
));
dataSyncMsg
.
setMsg
(
msgData1
);
log
.
info
(
"dataSyncMsg ==>> {}"
,
dataSyncMsg
);
return
dataSyncMsg
;
}
/**
* 任务处理
* @param dataSyncMsg
...
...
src/main/java/com/topdraw/mq/consumer/UcGatewayIptv2IptvConsumer.java
View file @
b5fbd67
...
...
@@ -28,11 +28,11 @@ public class UcGatewayIptv2IptvConsumer {
* @author Hongyan Wang
* @date 2021/9/7 11:26 上午
*/
/*
@RabbitHandler
@RabbitHandler
@RabbitListener
(
bindings
=
{
@QueueBinding
(
value
=
@Queue
(
value
=
RabbitMqConfig
.
GATEWAY_TO_SERVICE_DIRECT
),
exchange
=
@Exchange
(
value
=
ExchangeTypes
.
DIRECT
))
}, containerFactory = "
managementRabbitListenerContainerFactory")*/
},
containerFactory
=
"
serviceRabbitListenerContainerFactory"
)
public
void
ucEventConsumer
(
String
content
)
{
log
.
info
(
" receive dataSync msg , content is : {} "
,
content
);
DataSyncMsg
dataSyncMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/WeiXinEventConsumer.java
View file @
b5fbd67
...
...
@@ -74,11 +74,11 @@ public class WeiXinEventConsumer {
* }
* @param content
*/
@RabbitHandler
/*
@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = RabbitMqConfig.WEIXIN_SUBORUNSUB_QUEUE),
exchange = @Exchange(value = ExchangeTypes.DIRECT))},
containerFactory
=
"managementRabbitListenerContainerFactory"
)
containerFactory = "managementRabbitListenerContainerFactory")
*/
@Transactional
public
void
subOrUnSubEvent
(
String
content
)
{
try
{
...
...
src/main/java/com/topdraw/util/DateUtil.java
View file @
b5fbd67
package
com
.
topdraw
.
util
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
public
class
DateUtil
{
...
...
@@ -12,5 +14,17 @@ public class DateUtil {
return
System
.
currentTimeMillis
();
}
public
static
String
formatDate
(
String
time
){
SimpleDateFormat
sf
=
new
SimpleDateFormat
(
"yyyy-MM-dd"
);
try
{
//使用SimpleDateFormat的parse()方法生成Date
Date
date
=
sf
.
parse
(
time
);
String
format
=
sf
.
format
(
date
);
return
format
;
}
catch
(
ParseException
e
)
{
e
.
printStackTrace
();
}
return
null
;
}
}
...
...
Please
register
or
sign in
to post a comment