Commit 77995e43 77995e434c362a47088990f177bb211e3cea2eca by xianghan

1.优化任务处理过程

1 parent 977b98cf
...@@ -5,6 +5,7 @@ import org.springframework.data.jpa.repository.JpaRepository; ...@@ -5,6 +5,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
5 import org.springframework.data.jpa.repository.JpaSpecificationExecutor; 5 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
6 import org.springframework.data.jpa.repository.Modifying; 6 import org.springframework.data.jpa.repository.Modifying;
7 import org.springframework.data.jpa.repository.Query; 7 import org.springframework.data.jpa.repository.Query;
8 import org.springframework.data.repository.query.Param;
8 9
9 import java.time.LocalDateTime; 10 import java.time.LocalDateTime;
10 import java.util.List; 11 import java.util.List;
...@@ -26,4 +27,20 @@ public interface MemberRepository extends JpaRepository<Member, Long>, JpaSpecif ...@@ -26,4 +27,20 @@ public interface MemberRepository extends JpaRepository<Member, Long>, JpaSpecif
26 @Query(value = "UPDATE `uc_member` SET `user_iptv_id` = ?2, `update_time` = ?3 , `bind_iptv_platform_type`= 0, " + 27 @Query(value = "UPDATE `uc_member` SET `user_iptv_id` = ?2, `update_time` = ?3 , `bind_iptv_platform_type`= 0, " +
27 "`bind_iptv_time`=?3 WHERE `id` = ?1", nativeQuery = true) 28 "`bind_iptv_time`=?3 WHERE `id` = ?1", nativeQuery = true)
28 void updateUserIptvIdById(Long id, Long userIptvId, LocalDateTime now); 29 void updateUserIptvIdById(Long id, Long userIptvId, LocalDateTime now);
30
31 @Modifying
32 @Query(value = "UPDATE `uc_member` SET `exp` = :#{#resources.exp}, `level` = :#{#resources.level} , `update_time`= now() " +
33 " WHERE `id` = :#{#resources.id}", nativeQuery = true)
34 void updateExpAndLevel(@Param("resources") Member member);
35
36 @Modifying
37 @Query(value = "UPDATE `uc_member` SET `points` = :#{#resources.points}, `due_points` = :#{#resources.duePoints} , `update_time`= now() " +
38 " WHERE `id` = :#{#resources.id}", nativeQuery = true)
39 void updatePointAndDuePoint(@Param("resources") Member resources);
40
41 @Modifying
42 @Query(value = "UPDATE `uc_member` SET `coupon_amount` = :#{#resources.couponAmount}, `due_coupon_amount` = :#{#resources.dueCouponAmount} , `update_time`= now() " +
43 " WHERE `id` = :#{#resources.id}", nativeQuery = true)
44 void doUpdateMemberCoupon(@Param("resources") Member member);
45
29 } 46 }
......
...@@ -86,4 +86,6 @@ public interface MemberService { ...@@ -86,4 +86,6 @@ public interface MemberService {
86 void unbind(Member resources); 86 void unbind(Member resources);
87 87
88 void updateUserIptvIdById(Long id, Long userIptvId, LocalDateTime now); 88 void updateUserIptvIdById(Long id, Long userIptvId, LocalDateTime now);
89
90 void doUpdateMemberCoupon(Member member);
89 } 91 }
......
...@@ -111,12 +111,22 @@ public class MemberServiceImpl implements MemberService { ...@@ -111,12 +111,22 @@ public class MemberServiceImpl implements MemberService {
111 } 111 }
112 112
113 @Override 113 @Override
114 @Transactional(rollbackFor = Exception.class)
114 public MemberDTO doUpdateMemberExpAndLevel(Member resources) { 115 public MemberDTO doUpdateMemberExpAndLevel(Member resources) {
115 MemberDTO memberDTO = this.update(resources); 116 MemberDTO memberDTO = this.findByCode(resources.getCode());
117
118 Member member = new Member();
119 member.setId(memberDTO.getId());
120 member.setExp(resources.getExp());
121 member.setLevel(resources.getLevel());
122 member.setUpdateTime(LocalDateTime.now());
123 this.memberRepository.updateExpAndLevel(member);
124 // MemberDTO memberDTO = this.update(resources);
116 return memberDTO; 125 return memberDTO;
117 } 126 }
118 127
119 @Override 128 @Override
129 @Transactional(rollbackFor = Exception.class)
120 public MemberDTO unbindUserIpTv(Member member) { 130 public MemberDTO unbindUserIpTv(Member member) {
121 Member _member = this.save(member); 131 Member _member = this.save(member);
122 return this.memberMapper.toDto(_member); 132 return this.memberMapper.toDto(_member);
...@@ -178,6 +188,12 @@ public class MemberServiceImpl implements MemberService { ...@@ -178,6 +188,12 @@ public class MemberServiceImpl implements MemberService {
178 188
179 @Override 189 @Override
180 @Transactional(rollbackFor = Exception.class) 190 @Transactional(rollbackFor = Exception.class)
191 public void doUpdateMemberCoupon(Member member) {
192 this.memberRepository.doUpdateMemberCoupon(member);
193 }
194
195 @Override
196 @Transactional(rollbackFor = Exception.class)
181 public MemberDTO update(Member resources) { 197 public MemberDTO update(Member resources) {
182 198
183 log.info("MemberServiceImpl ==>> update ==>> resources ==>> [{}]" , resources); 199 log.info("MemberServiceImpl ==>> update ==>> resources ==>> [{}]" , resources);
...@@ -219,11 +235,11 @@ public class MemberServiceImpl implements MemberService { ...@@ -219,11 +235,11 @@ public class MemberServiceImpl implements MemberService {
219 235
220 Member member = this.memberRepository.findById(resources.getId()).orElseGet(Member::new); 236 Member member = this.memberRepository.findById(resources.getId()).orElseGet(Member::new);
221 ValidationUtil.isNull(member.getId(), "Member", "id", resources.getId()); 237 ValidationUtil.isNull(member.getId(), "Member", "id", resources.getId());
222 member.copy(resources); 238 this.memberRepository.updatePointAndDuePoint(resources);
239 // member.copy(resources);
223 240
224 Member _member = this.save(member); 241 // Member _member = this.save(member);
225 242 return this.memberMapper.toDto(member);
226 return this.memberMapper.toDto(_member);
227 243
228 } catch (Exception e) { 244 } catch (Exception e) {
229 e.printStackTrace(); 245 e.printStackTrace();
......
...@@ -25,7 +25,7 @@ public class CouponOperationServiceImpl implements CouponOperationService { ...@@ -25,7 +25,7 @@ public class CouponOperationServiceImpl implements CouponOperationService {
25 String memberCode = member.getCode(); 25 String memberCode = member.getCode();
26 MemberDTO memberDTO = this.memberService.findByCode(memberCode); 26 MemberDTO memberDTO = this.memberService.findByCode(memberCode);
27 member.setId(memberDTO.getId()); 27 member.setId(memberDTO.getId());
28 this.memberOperationService.doUpdateMember(member); 28 this.memberService.doUpdateMemberCoupon(member);
29 } 29 }
30 30
31 public void asyncCouponHistory(CouponHistory couponHistory) { 31 public void asyncCouponHistory(CouponHistory couponHistory) {
......
...@@ -94,7 +94,15 @@ public class MemberOperationServiceImpl implements MemberOperationService { ...@@ -94,7 +94,15 @@ public class MemberOperationServiceImpl implements MemberOperationService {
94 // @CachePut(key = "#resources.id") 94 // @CachePut(key = "#resources.id")
95 @Override 95 @Override
96 public MemberDTO doUpdateMemberPoints(Member resources) { 96 public MemberDTO doUpdateMemberPoints(Member resources) {
97 return this.memberService.doUpdateMemberPoints(resources); 97 // return this.memberService.doUpdateMemberPoints(resources);
98 MemberDTO memberDTO = this.findByCode(resources.getCode());
99 Member member = new Member();
100 member.setId(memberDTO.getId());
101 member.setPoints(resources.getPoints());
102 member.setDuePoints(resources.getDuePoints());
103 member.setUpdateTime(LocalDateTime.now());
104 MemberDTO memberDTO1 = this.memberService.doUpdateMemberPoints(member);
105 return memberDTO1;
98 } 106 }
99 107
100 @Override 108 @Override
......
...@@ -72,14 +72,14 @@ public class UcEventBusIptv2ManagementUcEngine { ...@@ -72,14 +72,14 @@ public class UcEventBusIptv2ManagementUcEngine {
72 @RabbitListener(queues = "#{rabbitMqSourceConfig.getEventBusQueue()}", 72 @RabbitListener(queues = "#{rabbitMqSourceConfig.getEventBusQueue()}",
73 containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}", 73 containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
74 autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}", 74 autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
75 ackMode = "MANUAL") 75 ackMode = "AUTO")
76 public void eventBusConsumer(Channel channel, Message message, String content) throws ParseException, IOException { 76 public void eventBusConsumer(Channel channel, Message message, String content) throws ParseException, IOException {
77 log.info(" receive dataSync msg , content is : {} ", content); 77 log.info(" receive dataSync msg , content is : {} ", content);
78 try { 78 try {
79 79
80 this.parseContent(content); 80 this.parseContent(content);
81 81
82 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 82 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
83 83
84 } catch (Exception e) { 84 } catch (Exception e) {
85 85
...@@ -322,7 +322,7 @@ public class UcEventBusIptv2ManagementUcEngine { ...@@ -322,7 +322,7 @@ public class UcEventBusIptv2ManagementUcEngine {
322 description.put("mediaCode", mediaCode); 322 description.put("mediaCode", mediaCode);
323 description.put("time", time); 323 description.put("time", time);
324 msgData1.setDescription(JSON.toJSONString(description)); 324 msgData1.setDescription(JSON.toJSONString(description));
325 dataSyncMsg.setMsg(msgData1); 325 dataSyncMsg.setMsgData(JSONObject.toJSONString(msgData1));
326 return dataSyncMsg; 326 return dataSyncMsg;
327 } 327 }
328 328
......
...@@ -61,8 +61,9 @@ public class UcGatewayIptv2IptvConsumer { ...@@ -61,8 +61,9 @@ public class UcGatewayIptv2IptvConsumer {
61 log.info(" receive dataSync msg , content is : {} ", content); 61 log.info(" receive dataSync msg , content is : {} ", content);
62 try { 62 try {
63 DataSyncMsg dataSyncMsg = this.parseContent(content); 63 DataSyncMsg dataSyncMsg = this.parseContent(content);
64 64 if (Objects.nonNull(dataSyncMsg)) {
65 this.taskDeal(dataSyncMsg); 65 this.taskDeal(dataSyncMsg);
66 }
66 67
67 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 68 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
68 69
...@@ -95,7 +96,13 @@ public class UcGatewayIptv2IptvConsumer { ...@@ -95,7 +96,13 @@ public class UcGatewayIptv2IptvConsumer {
95 private DataSyncMsg parseContent(String content) { 96 private DataSyncMsg parseContent(String content) {
96 DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content,DataSyncMsg.class); 97 DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content,DataSyncMsg.class);
97 Assert.notNull(dataSyncMsg,"ERROR -->> operationConsumer -->> parseContent -->> 【dataSyncMsg】 not be null !!"); 98 Assert.notNull(dataSyncMsg,"ERROR -->> operationConsumer -->> parseContent -->> 【dataSyncMsg】 not be null !!");
98 DataSyncMsg.MsgData msgData = dataSyncMsg.getMsg(); 99 String msgDataStr = dataSyncMsg.getMsgData();
100 DataSyncMsg.MsgData msgData = null;
101 if (StringUtils.isNotBlank(msgDataStr)) {
102 msgData = JSONObject.parseObject(msgDataStr, DataSyncMsg.MsgData.class);
103 }else {
104 return null;
105 }
99 Long memberId = msgData.getMemberId(); 106 Long memberId = msgData.getMemberId();
100 String memberCode = msgData.getMemberCode(); 107 String memberCode = msgData.getMemberCode();
101 if (Objects.nonNull(memberId) && StringUtils.isBlank(memberCode)) { 108 if (Objects.nonNull(memberId) && StringUtils.isBlank(memberCode)) {
...@@ -117,7 +124,13 @@ public class UcGatewayIptv2IptvConsumer { ...@@ -117,7 +124,13 @@ public class UcGatewayIptv2IptvConsumer {
117 } 124 }
118 } 125 }
119 } 126 }
120 Assert.notNull(msgData,"ERROR -->> operationConsumer -->> parseContent -->> 【msgData】 not be null !!"); 127
128 if(Objects.isNull(msgData.getMemberCode()) && Objects.isNull(msgData.getMemberId())) {
129 log.error("会员信息不存在,msgData =>> {}", msgData);
130 return null;
131 }
132
133 dataSyncMsg.setMsgData(JSONObject.toJSONString(msgData));
121 return dataSyncMsg; 134 return dataSyncMsg;
122 } 135 }
123 136
......
...@@ -26,7 +26,7 @@ public class DataSyncMsg implements Serializable { ...@@ -26,7 +26,7 @@ public class DataSyncMsg implements Serializable {
26 // 发送时间 26 // 发送时间
27 private LocalDateTime time; 27 private LocalDateTime time;
28 // 消息体 28 // 消息体
29 private MsgData msg; 29 private String msgData;
30 30
31 /** 31 /**
32 * 消息体 32 * 消息体
......
...@@ -78,8 +78,8 @@ mutil-mq: ...@@ -78,8 +78,8 @@ mutil-mq:
78 password: guest 78 password: guest
79 # password: Topdraw1qaz 79 # password: Topdraw1qaz
80 # 虚拟空间 80 # 虚拟空间
81 # virtual-host: member_center_iptv_sichuan 81 virtual-host: member_center_iptv_sichuan
82 virtual-host: member_center_small_sichuan 82 # virtual-host: member_center_small_sichuan
83 publisher-confirms: true #如果对异步消息需要回调必须设置为true 83 publisher-confirms: true #如果对异步消息需要回调必须设置为true
84 84
85 # 管理侧 85 # 管理侧
...@@ -99,12 +99,12 @@ mutil-mq: ...@@ -99,12 +99,12 @@ mutil-mq:
99 service: 99 service:
100 mq: 100 mq:
101 list: 101 list:
102 # - source: event 102 - source: event
103 # exchange: event.exchange 103 exchange: event.exchange
104 # queue: event.queue 104 queue: event.queue
105 # exchange-type: direct 105 exchange-type: direct
106 # routing-key: 106 routing-key:
107 # active: service 107 active: service
108 - source: collection 108 - source: collection
109 exchange: exchange.collection 109 exchange: exchange.collection
110 queue: collection.queue 110 queue: collection.queue
...@@ -136,24 +136,24 @@ service: ...@@ -136,24 +136,24 @@ service:
136 # exchange-type: direct 136 # exchange-type: direct
137 # routing-key: 137 # routing-key:
138 # active: service 138 # active: service
139 # - source: eventBus 139 - source: eventBus
140 # exchange: uc.eventbus 140 exchange: uc.eventbus
141 # queue: uc.eventbus 141 queue: uc.eventbus
142 # exchange-type: topic 142 exchange-type: topic
143 # routing-key: uc.eventbus.*.topic 143 routing-key: uc.eventbus.*.topic
144 # active: service 144 active: service
145 - source: uce 145 - source: uce
146 exchange: uc.direct 146 exchange: uc.direct
147 queue: uc.route.key.direct.event.bbb 147 queue: uc.route.key.direct.event.bbb
148 exchange-type: direct 148 exchange-type: direct
149 routing-key: 149 routing-key:
150 active: service 150 active: management
151 - source: uce 151 - source: uce
152 exchange: exchange.MemberInfoSync 152 exchange: exchange.MemberInfoSync
153 queue: queue.MemberInfoSync 153 queue: queue.MemberInfoSync
154 exchange-type: direct 154 exchange-type: direct
155 routing-key: 155 routing-key:
156 active: service 156 active: management
157 # - source: wechat 157 # - source: wechat
158 # exchange: weixin.subOrUnSub.direct 158 # exchange: weixin.subOrUnSub.direct
159 # queue: weixin.subOrUnSub.queue 159 # queue: weixin.subOrUnSub.queue
......