Commit f4ad8dc9 f4ad8dc9623f655c9707d6d52e27081434f08de1 by xianghan

1.同步master,优化任务处理过程

1 parent 60b8b824
......@@ -5,6 +5,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.time.LocalDateTime;
import java.util.List;
......@@ -23,6 +24,23 @@ public interface MemberRepository extends JpaRepository<Member, Long>, JpaSpecif
Optional<Member> findByIdOrCode(Long id, String code);
@Modifying
@Query(value = "UPDATE `uc_user_tv` SET `user_iptv_id` = ?2, `update_time` = ?3 WHERE `id` = ?1", nativeQuery = true)
@Query(value = "UPDATE `uc_member` SET `user_iptv_id` = ?2, `update_time` = ?3 , `bind_iptv_platform_type`= 0, " +
"`bind_iptv_time`=?3 WHERE `id` = ?1", nativeQuery = true)
void updateUserIptvIdById(Long id, Long userIptvId, LocalDateTime now);
@Modifying
@Query(value = "UPDATE `uc_member` SET `exp` = :#{#resources.exp}, `level` = :#{#resources.level} , `update_time`= now() " +
" WHERE `id` = :#{#resources.id}", nativeQuery = true)
void updateExpAndLevel(@Param("resources") Member member);
@Modifying
@Query(value = "UPDATE `uc_member` SET `points` = :#{#resources.points}, `due_points` = :#{#resources.duePoints} , `update_time`= now() " +
" WHERE `id` = :#{#resources.id}", nativeQuery = true)
void updatePointAndDuePoint(@Param("resources") Member resources);
@Modifying
@Query(value = "UPDATE `uc_member` SET `coupon_amount` = :#{#resources.couponAmount}, `due_coupon_amount` = :#{#resources.dueCouponAmount} , `update_time`= now() " +
" WHERE `id` = :#{#resources.id}", nativeQuery = true)
void doUpdateMemberCoupon(@Param("resources") Member member);
}
......
......@@ -86,4 +86,6 @@ public interface MemberService {
void unbind(Member resources);
void updateUserIptvIdById(Long id, Long userIptvId, LocalDateTime now);
void doUpdateMemberCoupon(Member member);
}
......
......@@ -111,12 +111,22 @@ public class MemberServiceImpl implements MemberService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public MemberDTO doUpdateMemberExpAndLevel(Member resources) {
MemberDTO memberDTO = this.update(resources);
MemberDTO memberDTO = this.findByCode(resources.getCode());
Member member = new Member();
member.setId(memberDTO.getId());
member.setExp(resources.getExp());
member.setLevel(resources.getLevel());
member.setUpdateTime(LocalDateTime.now());
this.memberRepository.updateExpAndLevel(member);
// MemberDTO memberDTO = this.update(resources);
return memberDTO;
}
@Override
@Transactional(rollbackFor = Exception.class)
public MemberDTO unbindUserIpTv(Member member) {
Member _member = this.save(member);
return this.memberMapper.toDto(_member);
......@@ -178,6 +188,12 @@ public class MemberServiceImpl implements MemberService {
@Override
@Transactional(rollbackFor = Exception.class)
public void doUpdateMemberCoupon(Member member) {
this.memberRepository.doUpdateMemberCoupon(member);
}
@Override
@Transactional(rollbackFor = Exception.class)
public MemberDTO update(Member resources) {
log.info("MemberServiceImpl ==>> update ==>> resources ==>> [{}]" , resources);
......@@ -219,11 +235,11 @@ public class MemberServiceImpl implements MemberService {
Member member = this.memberRepository.findById(resources.getId()).orElseGet(Member::new);
ValidationUtil.isNull(member.getId(), "Member", "id", resources.getId());
member.copy(resources);
Member _member = this.save(member);
this.memberRepository.updatePointAndDuePoint(resources);
// member.copy(resources);
return this.memberMapper.toDto(_member);
// Member _member = this.save(member);
return this.memberMapper.toDto(member);
} catch (Exception e) {
e.printStackTrace();
......
......@@ -25,7 +25,7 @@ public class CouponOperationServiceImpl implements CouponOperationService {
String memberCode = member.getCode();
MemberDTO memberDTO = this.memberService.findByCode(memberCode);
member.setId(memberDTO.getId());
this.memberOperationService.doUpdateMember(member);
this.memberService.doUpdateMemberCoupon(member);
}
public void asyncCouponHistory(CouponHistory couponHistory) {
......
......@@ -94,7 +94,15 @@ public class MemberOperationServiceImpl implements MemberOperationService {
// @CachePut(key = "#resources.id")
@Override
public MemberDTO doUpdateMemberPoints(Member resources) {
return this.memberService.doUpdateMemberPoints(resources);
// return this.memberService.doUpdateMemberPoints(resources);
MemberDTO memberDTO = this.findByCode(resources.getCode());
Member member = new Member();
member.setId(memberDTO.getId());
member.setPoints(resources.getPoints());
member.setDuePoints(resources.getDuePoints());
member.setUpdateTime(LocalDateTime.now());
MemberDTO memberDTO1 = this.memberService.doUpdateMemberPoints(member);
return memberDTO1;
}
@Override
......
......@@ -72,14 +72,14 @@ public class UcEventBusIptv2ManagementUcEngine {
@RabbitListener(queues = "#{rabbitMqSourceConfig.getEventBusQueue()}",
containerFactory = "#{rabbitMqSourceConfig.getEventBusSource()}",
autoStartup = "#{rabbitMqSourceConfig.getEventBusStartUp()}",
ackMode = "MANUAL")
ackMode = "AUTO")
public void eventBusConsumer(Channel channel, Message message, String content) throws ParseException, IOException {
log.info(" receive dataSync msg , content is : {} ", content);
try {
this.parseContent(content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
......@@ -322,7 +322,7 @@ public class UcEventBusIptv2ManagementUcEngine {
description.put("mediaCode", mediaCode);
description.put("time", time);
msgData1.setDescription(JSON.toJSONString(description));
dataSyncMsg.setMsg(msgData1);
dataSyncMsg.setMsgData(JSONObject.toJSONString(msgData1));
return dataSyncMsg;
}
......
......@@ -61,8 +61,9 @@ public class UcGatewayIptv2IptvConsumer {
log.info(" receive dataSync msg , content is : {} ", content);
try {
DataSyncMsg dataSyncMsg = this.parseContent(content);
if (Objects.nonNull(dataSyncMsg)) {
this.taskDeal(dataSyncMsg);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
......@@ -95,7 +96,13 @@ public class UcGatewayIptv2IptvConsumer {
private DataSyncMsg parseContent(String content) {
DataSyncMsg dataSyncMsg = JSONUtil.parseMsg2Object(content,DataSyncMsg.class);
Assert.notNull(dataSyncMsg,"ERROR -->> operationConsumer -->> parseContent -->> 【dataSyncMsg】 not be null !!");
DataSyncMsg.MsgData msgData = dataSyncMsg.getMsg();
String msgDataStr = dataSyncMsg.getMsgData();
DataSyncMsg.MsgData msgData = null;
if (StringUtils.isNotBlank(msgDataStr)) {
msgData = JSONObject.parseObject(msgDataStr, DataSyncMsg.MsgData.class);
}else {
return null;
}
Long memberId = msgData.getMemberId();
String memberCode = msgData.getMemberCode();
if (Objects.nonNull(memberId) && StringUtils.isBlank(memberCode)) {
......@@ -117,7 +124,13 @@ public class UcGatewayIptv2IptvConsumer {
}
}
}
Assert.notNull(msgData,"ERROR -->> operationConsumer -->> parseContent -->> 【msgData】 not be null !!");
if(Objects.isNull(msgData.getMemberCode()) && Objects.isNull(msgData.getMemberId())) {
log.error("会员信息不存在,msgData =>> {}", msgData);
return null;
}
dataSyncMsg.setMsgData(JSONObject.toJSONString(msgData));
return dataSyncMsg;
}
......
......@@ -47,7 +47,7 @@ public class WeiXinEventConsumer {
@RabbitHandler
@RabbitListener(queues = "#{rabbitMqSourceConfig.getWechatQueue()}",
containerFactory = "#{rabbitMqSourceConfig.getWechatSource()}",
autoStartup = "#{rabbitMqSourceConfig.getWechatStartUp()}", ackMode = "MANUAL")
autoStartup = "#{rabbitMqSourceConfig.getWechatStartUp()}", ackMode = "AUTO")
@Transactional
public void subOrUnSubEvent(Channel channel, Message message, String content) throws IOException {
try {
......@@ -61,23 +61,32 @@ public class WeiXinEventConsumer {
String openid = wechatMsg.getString("FromUserName");
String msgType = wechatMsg.getString("MsgType");
if ("event".equals(msgType)) {
String event = wechatMsg.getString("Event");
String eventKey = wechatMsg.getString("EventKey");
log.info("event ==>> {}", event);
SubscribeBean subscribeBean = new SubscribeBean();
subscribeBean.setAppid(appid);
subscribeBean.setOpenid(openid);
subscribeBean.setUnionid(unionid);
subscribeBean.setEventKey(eventKey);
if (event.equals("subscribe"))
if (event.equals("subscribe")) {
log.info("send subscribe request start");
this.restTemplateClient.subscribe(subscribeBean);
log.info("send subscribe request end ");
}
if (event.equals("unsubscribe"))
if (event.equals("unsubscribe")) {
log.info("send unsubscribe request start");
this.restTemplateClient.unsubscribe(subscribeBean);
log.info("send unsubscribe request end");
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
......
......@@ -26,7 +26,7 @@ public class DataSyncMsg implements Serializable {
// 发送时间
private LocalDateTime time;
// 消息体
private MsgData msg;
private String msgData;
/**
* 消息体
......
......@@ -97,7 +97,9 @@ public class RestTemplateClient {
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("content", content);
log.info("reobjectObjectHashMap ===>> [{}]",objectObjectHashMap);
restTemplate.postForEntity(url, objectObjectHashMap, String.class);
log.info("unsubscribe ===>> success");
/*ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, subscribeBean, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
......@@ -113,8 +115,9 @@ public class RestTemplateClient {
HashMap<String, String> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("content", content);
log.info("objectObjectHashMap ===>> [{}]",objectObjectHashMap);
log.info("reobjectObjectHashMap ===>> [{}]",objectObjectHashMap);
restTemplate.postForEntity(url, objectObjectHashMap, String.class);
log.info("send subscribe request ===>> success");
/*ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, subscribeBean, String.class);
String entityBody = "";
if (responseEntity.getStatusCode().is2xxSuccessful()) {
......