Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
张云鹏
/
uc-consumer
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
Commit
b1f50657
...
b1f50657c931922e5f4c86a9df8f95acf592ef3c
authored
2022-06-10 01:04:42 +0800
by
xianghan
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
1.兼容1.0.0版本
1 parent
1a39d208
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
280 additions
and
4 deletions
src/main/java/com/topdraw/business/module/member/service/MemberService.java
src/main/java/com/topdraw/business/module/member/service/impl/MemberServiceImpl.java
src/main/java/com/topdraw/business/module/user/iptv/service/impl/UserTvServiceImpl.java
src/main/java/com/topdraw/config/RabbitMqSourceConfig.java
src/main/java/com/topdraw/mq/consumer/UcEngineManagement2IptvConsumer.java
src/main/java/com/topdraw/mq/consumer/UcGatewayIptv2IptvConsumer.java
src/main/resources/config/application-dev.yml
src/main/java/com/topdraw/business/module/member/service/MemberService.java
View file @
b1f5065
...
...
@@ -80,4 +80,7 @@ public interface MemberService {
MemberDTO
doUpdateMemberExpAndLevel
(
Member
resources
);
MemberDTO
unbindUserIpTv
(
Member
member
);
void
unbind
(
Member
resources
);
}
...
...
src/main/java/com/topdraw/business/module/member/service/impl/MemberServiceImpl.java
View file @
b1f5065
...
...
@@ -147,6 +147,27 @@ public class MemberServiceImpl implements MemberService {
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
void
unbind
(
Member
resources
)
{
try
{
String
code
=
resources
.
getCode
();
MemberDTO
memberDTO
=
this
.
findByCode
(
code
);
// Member member = memberRepository.findById(resources.getId()).orElseGet(Member::new);
ValidationUtil
.
isNull
(
memberDTO
.
getId
(),
"Member"
,
"id"
,
resources
.
getId
());
Member
member
=
new
Member
();
BeanUtils
.
copyProperties
(
memberDTO
,
member
);
member
.
setUserIptvId
(
null
);
member
.
setBindIptvTime
(
null
);
member
.
setBindIptvPlatformType
(
null
);
this
.
save
(
member
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
throw
e
;
}
}
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
MemberDTO
update
(
Member
resources
)
{
log
.
info
(
"MemberServiceImpl ==>> update ==>> resources ==>> [{}]"
,
resources
);
...
...
@@ -201,4 +222,6 @@ public class MemberServiceImpl implements MemberService {
}
}
...
...
src/main/java/com/topdraw/business/module/user/iptv/service/impl/UserTvServiceImpl.java
View file @
b1f5065
...
...
@@ -19,6 +19,7 @@ import org.springframework.transaction.annotation.Propagation;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.Assert
;
import
javax.validation.constraints.NotNull
;
import
java.util.Objects
;
import
java.util.Optional
;
...
...
@@ -120,6 +121,13 @@ public class UserTvServiceImpl implements UserTvService {
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
void
unbindPriorityMemberCode
(
UserTv
resources
)
{
String
platformAccount
=
resources
.
getPlatformAccount
();
if
(
StringUtils
.
isNotBlank
(
platformAccount
))
{
UserTvDTO
userTvDTO
=
this
.
findByPlatformAccount
(
platformAccount
);
Long
id
=
userTvDTO
.
getId
();
resources
.
setId
(
id
);
resources
.
setMemberId
(
userTvDTO
.
getMemberId
());
}
this
.
userTvRepository
.
save
(
resources
);
}
...
...
src/main/java/com/topdraw/config/RabbitMqSourceConfig.java
View file @
b1f5065
...
...
@@ -226,6 +226,18 @@ public class RabbitMqSourceConfig {
return
COLLECTION_QUEUE
;
}
public
String
getUcgCollectionQueueAdd
(){
return
"queue.collection.add"
;
}
public
String
getUcgCollectionQueueDelete
(){
return
"queue.collection.delete"
;
}
public
String
getUcgCollectionQueueDeleteAll
(){
return
"queue.collection.deleteall"
;
}
public
String
getUcgCollectionSource
(){
if
(
Objects
.
nonNull
(
ucgIptvCollectionInfo
))
{
if
(
MapUtils
.
isNotEmpty
(
ucgIptvCollectionInfo
))
{
...
...
@@ -346,6 +358,12 @@ public class RabbitMqSourceConfig {
return
"false"
;
}
public
String
getMemberInfoAsyncQueue
(){
return
"queue.MemberInfoSync"
;
}
/**************************************************eventBus*************************************************************/
public
static
final
String
UC_EVENTBUS_EXCHANGE
=
"uc.eventbus"
;
public
static
final
String
UC_EVENTBUS_KEY
=
"uc.eventbus.*.topic"
;
...
...
src/main/java/com/topdraw/mq/consumer/UcEngineManagement2IptvConsumer.java
View file @
b1f5065
...
...
@@ -31,6 +31,43 @@ public class UcEngineManagement2IptvConsumer {
@Value
(
"#{rabbitMqErrorLogConfig.getUceError()}"
)
private
Map
<
String
,
String
>
error
;
@RabbitHandler
@RabbitListener
(
queues
=
"#{rabbitMqSourceConfig.getMemberInfoAsyncQueue()}"
,
containerFactory
=
"#{rabbitMqSourceConfig.getUceSource()}"
,
autoStartup
=
"#{rabbitMqSourceConfig.getUceStartUp()}"
,
ackMode
=
"AUTO"
)
public
void
ucEventConsumer2
(
Channel
channel
,
Message
message
,
String
content
)
throws
IOException
{
log
.
info
(
" receive MemberInfoAsync msg , content is : {} "
,
content
);
try
{
TableOperationMsg
tableOperationMsg
=
this
.
parseContent
(
content
);
autoUser
.
route
(
tableOperationMsg
);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch
(
Exception
e
)
{
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
if
(
MapUtils
.
isNotEmpty
(
error
))
{
String
errorStart
=
this
.
error
.
get
(
"start"
);
if
(
errorStart
.
equalsIgnoreCase
(
"true"
))
{
String
fileName
=
this
.
error
.
get
(
"fileName"
)+
"_"
+
LocalDate
.
now
()
+
".log"
;
String
filePath
=
this
.
error
.
get
(
"filePath"
);
String
filePath1
=
filePath
+
fileName
;
FileUtil
.
writeStringToFile2
(
filePath1
,
content
,
e
.
getMessage
());
}
}
e
.
printStackTrace
();
}
log
.
info
(
"ucEventConsumer ====>>>> end"
);
}
/**
* 事件
* @param content
...
...
@@ -44,7 +81,7 @@ public class UcEngineManagement2IptvConsumer {
autoStartup
=
"#{rabbitMqSourceConfig.getUceStartUp()}"
,
ackMode
=
"MANUAL"
)
public
void
ucEventConsumer
(
Channel
channel
,
Message
message
,
String
content
)
throws
IOException
{
log
.
info
(
" receive
dataSync
msg , content is : {} "
,
content
);
log
.
info
(
" receive
ucEventConsumer
msg , content is : {} "
,
content
);
try
{
TableOperationMsg
tableOperationMsg
=
this
.
parseContent
(
content
);
...
...
src/main/java/com/topdraw/mq/consumer/UcGatewayIptv2IptvConsumer.java
View file @
b1f5065
...
...
@@ -239,4 +239,185 @@ public class UcGatewayIptv2IptvConsumer {
e
.
printStackTrace
();
}
}
/**
* @description 添加收藏记录
* @param content 消息内容
*/
@RabbitHandler
@RabbitListener
(
queues
=
"#{rabbitMqSourceConfig.getUcgCollectionQueueAdd()}"
,
containerFactory
=
"#{rabbitMqSourceConfig.getUcgCollectionSource()}"
,
autoStartup
=
"#{rabbitMqSourceConfig.getUcgCollectionStartUp()}"
,
ackMode
=
"MANUAL"
)
public
void
collectionConsumerAdd
(
Channel
channel
,
Message
message
,
String
content
)
throws
IOException
{
log
.
info
(
"receive collectionConsumerAdd add message, content {}"
,
content
);
try
{
JSONObject
jsonObject
=
JSON
.
parseObject
(
content
,
JSONObject
.
class
);
if
(
Objects
.
nonNull
(
content
))
{
String
evt
=
jsonObject
.
get
(
"evt"
).
toString
();
String
msgData
=
jsonObject
.
get
(
"msgData"
).
toString
();
switch
(
evt
.
toUpperCase
())
{
// 添加收藏
case
"ADDCOLLECTION"
:
this
.
restTemplateClient
.
addCollection
(
msgData
);
break
;
// 删除收藏
case
"DELETECOLLECTION"
:
this
.
restTemplateClient
.
deleteCollection
(
msgData
);
break
;
// 删除全部收藏
case
"DELETEALLCOLLECTION"
:
this
.
restTemplateClient
.
deleteAllCollection
(
msgData
);
break
;
default
:
break
;
}
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch
(
Exception
e
)
{
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
if
(
MapUtils
.
isNotEmpty
(
error
))
{
String
errorStart
=
this
.
error
.
get
(
"start"
);
if
(
errorStart
.
equalsIgnoreCase
(
"true"
))
{
String
fileName
=
this
.
error
.
get
(
"fileName"
)+
"_"
+
LocalDate
.
now
()
+
".log"
;
String
filePath
=
this
.
error
.
get
(
"filePath"
);
String
filePath1
=
filePath
+
fileName
;
FileUtil
.
writeStringToFile2
(
filePath1
,
content
,
e
.
getMessage
());
}
}
e
.
printStackTrace
();
}
}
/**
* @description 添加收藏记录
* @param content 消息内容
*/
@RabbitHandler
@RabbitListener
(
queues
=
"#{rabbitMqSourceConfig.getUcgCollectionQueueDelete()}"
,
containerFactory
=
"#{rabbitMqSourceConfig.getUcgCollectionSource()}"
,
autoStartup
=
"#{rabbitMqSourceConfig.getUcgCollectionStartUp()}"
,
ackMode
=
"MANUAL"
)
public
void
collectionConsumerDelete
(
Channel
channel
,
Message
message
,
String
content
)
throws
IOException
{
log
.
info
(
"receive collectionConsumerDelete add message, content {}"
,
content
);
try
{
JSONObject
jsonObject
=
JSON
.
parseObject
(
content
,
JSONObject
.
class
);
if
(
Objects
.
nonNull
(
content
))
{
String
evt
=
jsonObject
.
get
(
"evt"
).
toString
();
String
msgData
=
jsonObject
.
get
(
"msgData"
).
toString
();
switch
(
evt
.
toUpperCase
())
{
// 添加收藏
case
"ADDCOLLECTION"
:
this
.
restTemplateClient
.
addCollection
(
msgData
);
break
;
// 删除收藏
case
"DELETECOLLECTION"
:
this
.
restTemplateClient
.
deleteCollection
(
msgData
);
break
;
// 删除全部收藏
case
"DELETEALLCOLLECTION"
:
this
.
restTemplateClient
.
deleteAllCollection
(
msgData
);
break
;
default
:
break
;
}
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch
(
Exception
e
)
{
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
if
(
MapUtils
.
isNotEmpty
(
error
))
{
String
errorStart
=
this
.
error
.
get
(
"start"
);
if
(
errorStart
.
equalsIgnoreCase
(
"true"
))
{
String
fileName
=
this
.
error
.
get
(
"fileName"
)+
"_"
+
LocalDate
.
now
()
+
".log"
;
String
filePath
=
this
.
error
.
get
(
"filePath"
);
String
filePath1
=
filePath
+
fileName
;
FileUtil
.
writeStringToFile2
(
filePath1
,
content
,
e
.
getMessage
());
}
}
e
.
printStackTrace
();
}
}
/**
* @description 添加收藏记录
* @param content 消息内容
*/
@RabbitHandler
@RabbitListener
(
queues
=
"#{rabbitMqSourceConfig.getUcgCollectionQueueDeleteAll()}"
,
containerFactory
=
"#{rabbitMqSourceConfig.getUcgCollectionSource()}"
,
autoStartup
=
"#{rabbitMqSourceConfig.getUcgCollectionStartUp()}"
,
ackMode
=
"MANUAL"
)
public
void
collectionConsumerDeleteAll
(
Channel
channel
,
Message
message
,
String
content
)
throws
IOException
{
log
.
info
(
"receive collectionConsumerDeleteAll add message, content {}"
,
content
);
try
{
JSONObject
jsonObject
=
JSON
.
parseObject
(
content
,
JSONObject
.
class
);
if
(
Objects
.
nonNull
(
content
))
{
String
evt
=
jsonObject
.
get
(
"evt"
).
toString
();
String
msgData
=
jsonObject
.
get
(
"msgData"
).
toString
();
switch
(
evt
.
toUpperCase
())
{
// 添加收藏
case
"ADDCOLLECTION"
:
this
.
restTemplateClient
.
addCollection
(
msgData
);
break
;
// 删除收藏
case
"DELETECOLLECTION"
:
this
.
restTemplateClient
.
deleteCollection
(
msgData
);
break
;
// 删除全部收藏
case
"DELETEALLCOLLECTION"
:
this
.
restTemplateClient
.
deleteAllCollection
(
msgData
);
break
;
default
:
break
;
}
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch
(
Exception
e
)
{
channel
.
basicReject
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
if
(
MapUtils
.
isNotEmpty
(
error
))
{
String
errorStart
=
this
.
error
.
get
(
"start"
);
if
(
errorStart
.
equalsIgnoreCase
(
"true"
))
{
String
fileName
=
this
.
error
.
get
(
"fileName"
)+
"_"
+
LocalDate
.
now
()
+
".log"
;
String
filePath
=
this
.
error
.
get
(
"filePath"
);
String
filePath1
=
filePath
+
fileName
;
FileUtil
.
writeStringToFile2
(
filePath1
,
content
,
e
.
getMessage
());
}
}
e
.
printStackTrace
();
}
}
}
...
...
src/main/resources/config/application-dev.yml
View file @
b1f5065
...
...
@@ -142,11 +142,17 @@ service:
# routing-key: uc.eventbus.*.topic
# active: service
-
source
:
uce
exchange
:
uc
e.exchange
queue
:
uc
e.queue
exchange
:
uc
.direct
queue
:
uc
.route.key.direct.event.bbb
exchange-type
:
direct
routing-key
:
active
:
management
active
:
service
-
source
:
uce
exchange
:
exchange.MemberInfoSync
queue
:
queue.MemberInfoSync
exchange-type
:
direct
routing-key
:
active
:
service
# - source: wechat
# exchange: weixin.subOrUnSub.direct
# queue: weixin.subOrUnSub.queue
...
...
Please
register
or
sign in
to post a comment