WeComMessageDataJob.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package com.tzld.piaoquan.wecom.job;
  2. import com.alibaba.fastjson.JSONArray;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.aliyun.odps.data.Record;
  5. import com.google.common.collect.Lists;
  6. import com.tzld.piaoquan.wecom.dao.mapper.*;
  7. import com.tzld.piaoquan.wecom.model.bo.PushMessage;
  8. import com.tzld.piaoquan.wecom.model.po.*;
  9. import com.tzld.piaoquan.wecom.service.MessageAttachmentService;
  10. import com.tzld.piaoquan.wecom.service.MessageService;
  11. import com.tzld.piaoquan.wecom.utils.DateUtil;
  12. import com.tzld.piaoquan.wecom.utils.MessageUtil;
  13. import com.tzld.piaoquan.wecom.utils.OdpsUtil;
  14. import com.tzld.piaoquan.wecom.utils.page.Page;
  15. import com.xxl.job.core.biz.model.ReturnT;
  16. import com.xxl.job.core.handler.annotation.XxlJob;
  17. import org.apache.commons.lang3.StringUtils;
  18. import org.springframework.beans.BeanUtils;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.data.redis.core.RedisTemplate;
  21. import org.springframework.stereotype.Component;
  22. import org.springframework.util.CollectionUtils;
  23. import java.util.*;
  24. import java.util.stream.Collectors;
  25. import static com.tzld.piaoquan.wecom.common.constant.RedisConstant.GUARANTEED_MINIPROGRAM_KEY;
  26. import static com.tzld.piaoquan.wecom.common.constant.TimeConstant.MILLISECOND_DAY;
  27. @Component
  28. public class WeComMessageDataJob {
  29. @Autowired
  30. private UserMapper userMapper;
  31. @Autowired
  32. private MessageAttachmentMapper messageAttachmentMapper;
  33. @Autowired
  34. private RedisTemplate<String, Object> redisTemplate;
  35. @Autowired
  36. private MessageService messageService;
  37. @Autowired
  38. private MessageAttachmentService messageAttachmentService;
  39. @Autowired
  40. private StaffWithUserMapper staffWithUserMapper;
  41. @Autowired
  42. private StaffMapper staffMapper;
  43. @Autowired
  44. SendMessageMapper sendMessageMapper;
  45. private static final int MAX_VIDEO_NUM = 3;
  46. //历史优质视频可推送用户列表
  47. List<PushMessage> goodHistoryPushList = new ArrayList<>();
  48. //保底视频列表
  49. List<Long> guaranteedVideoIdList = new ArrayList<>();
  50. //从缓存中获取的保底视频数量
  51. int getGuaranteedVideoIdNum = 0;
  52. Map<String, String> pageMap = new HashMap<>();
  53. //初始化操作
  54. void init() {
  55. //历史优质视频获取
  56. String sql = String.format("SELECT * FROM loghubods.history_good_video_can_push_user_list where dt = %s;", DateUtil.getBeforeDayDateString());
  57. List<Record> recordList = OdpsUtil.getOdpsData(sql);
  58. if (CollectionUtils.isEmpty(recordList)) {
  59. return;
  60. }
  61. List<PushMessage> list = new ArrayList<>();
  62. for (Record record : recordList) {
  63. PushMessage pushMessage = new PushMessage();
  64. Long videoId = Long.parseLong((String) record.get(0));
  65. Set<Long> userIds = new HashSet<>(JSONObject.parseArray((String) record.get(1), Long.class));
  66. pushMessage.setVideoId(videoId);
  67. pushMessage.setUserIds(userIds);
  68. list.add(pushMessage);
  69. }
  70. goodHistoryPushList = list;
  71. getGuaranteedVideoIdNum = 0;
  72. //保底视频获取
  73. List<Long> videoIdList = Objects.requireNonNull(redisTemplate.opsForList().range(GUARANTEED_MINIPROGRAM_KEY, 0, -1))
  74. .stream().map(o -> (Integer) o).map(String::valueOf).map(Long::parseLong).collect(Collectors.toList());
  75. if (CollectionUtils.isEmpty(videoIdList)) {
  76. throw new RuntimeException("保底数据为空");
  77. }
  78. List<Long> saveVideoIds = new ArrayList<>();
  79. for (Long videoId : videoIdList) {
  80. getGuaranteedVideoIdNum++;
  81. MessageAttachmentExample example = new MessageAttachmentExample();
  82. example.createCriteria().andMiniprogramVideoIdEqualTo(videoId);
  83. List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
  84. if (CollectionUtils.isEmpty(messageAttachmentList)) {
  85. continue;
  86. }
  87. MessageAttachment messageAttachment = messageAttachmentList.get(0);
  88. if (messageAttachment.getSendTime() != null
  89. && DateUtil.dateDifference(new Date(), messageAttachment.getSendTime()) < 180 * MILLISECOND_DAY) {
  90. continue;
  91. }
  92. saveVideoIds.add(videoId);
  93. if (saveVideoIds.size() >= MAX_VIDEO_NUM) {
  94. break;
  95. }
  96. }
  97. if (saveVideoIds.size() < MAX_VIDEO_NUM) {
  98. throw new RuntimeException("保底数据不足");
  99. }
  100. guaranteedVideoIdList = saveVideoIds;
  101. }
  102. @XxlJob("assembleSendMessageJob")
  103. public ReturnT<String> assembleSendMessage(String param) {
  104. init();
  105. Map<String, List<String>> res = new HashMap<>();
  106. UserExample example = new UserExample();
  107. long count = userMapper.countByExample(example);
  108. int page = 1;
  109. int pageSize = 1000;
  110. long totalPageSize = count / pageSize + 1;
  111. for (; page <= totalPageSize; page++) {
  112. example.setPage(new Page<>(page, pageSize));
  113. List<User> userList = userMapper.selectByExample(example);
  114. if (CollectionUtils.isEmpty(userList)) {
  115. continue;
  116. }
  117. //落库逻辑
  118. List<SendMessage> allSeneMessageList = new ArrayList<>();
  119. for (User user : userList) {
  120. List<SendMessage> sendMessageList = getSendMessage(user);
  121. if (!CollectionUtils.isEmpty(sendMessageList)) {
  122. allSeneMessageList.addAll(sendMessageList);
  123. }
  124. }
  125. if (!CollectionUtils.isEmpty(allSeneMessageList)) {
  126. sendMessageMapper.insertList(allSeneMessageList);
  127. }
  128. }
  129. //组装好当天要发送的消息后 记录时间 删除保底数据
  130. saveGuaranteedVideoIdList(guaranteedVideoIdList);
  131. return ReturnT.SUCCESS;
  132. }
  133. public void saveGuaranteedVideoIdList(List<Long> videoIdList) {
  134. MessageAttachmentExample example = new MessageAttachmentExample();
  135. example.createCriteria().andMiniprogramVideoIdIn(videoIdList);
  136. List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
  137. for (MessageAttachment messageAttachment : messageAttachmentList) {
  138. MessageAttachment updateMessageAttachment = new MessageAttachment();
  139. updateMessageAttachment.setId(messageAttachment.getId());
  140. updateMessageAttachment.setSendTime(new Date());
  141. messageAttachmentMapper.updateByPrimaryKeySelective(updateMessageAttachment);
  142. }
  143. //移除从redis中获取的保底数据
  144. for (int i = 0; i < getGuaranteedVideoIdNum; i++) {
  145. redisTemplate.opsForList().leftPop(GUARANTEED_MINIPROGRAM_KEY);
  146. }
  147. }
  148. public List<SendMessage> getSendMessage(User user) {
  149. int n = 0;
  150. List<SendMessage> sendMessageList = new ArrayList<>();
  151. SendMessage sendMessage = new SendMessage();
  152. for (PushMessage pushMessage : goodHistoryPushList) {
  153. if (pushMessage.getUserIds().contains(user.getId())) {
  154. if (n == 0) {
  155. sendMessage.setVideoId1(pushMessage.getVideoId());
  156. }
  157. if (n == 1) {
  158. sendMessage.setVideoId2(pushMessage.getVideoId());
  159. }
  160. if (n == 2) {
  161. sendMessage.setVideoId3(pushMessage.getVideoId());
  162. }
  163. n++;
  164. if (n >= MAX_VIDEO_NUM) {
  165. break;
  166. }
  167. }
  168. }
  169. //保底数据
  170. if (n < MAX_VIDEO_NUM) {
  171. for (Long videoId : guaranteedVideoIdList) {
  172. if (n == 0) {
  173. sendMessage.setVideoId1(videoId);
  174. }
  175. if (n == 1) {
  176. sendMessage.setVideoId2(videoId);
  177. }
  178. if (n == 2) {
  179. sendMessage.setVideoId3(videoId);
  180. }
  181. n++;
  182. if (n >= MAX_VIDEO_NUM) {
  183. break;
  184. }
  185. }
  186. }
  187. if (n < MAX_VIDEO_NUM) {
  188. throw new RuntimeException("保底数据异常");
  189. }
  190. StaffWithUserExample example = new StaffWithUserExample();
  191. example.createCriteria().andUserIdEqualTo(user.getId());
  192. List<StaffWithUser> staffWithUserList = staffWithUserMapper.selectByExample(example);
  193. if (CollectionUtils.isEmpty(staffWithUserList)) {
  194. return null;
  195. }
  196. for (StaffWithUser staffWithUser : staffWithUserList) {
  197. SendMessage newSendMessage = new SendMessage();
  198. BeanUtils.copyProperties(sendMessage, newSendMessage);
  199. newSendMessage.setStaffId(staffWithUser.getStaffId());
  200. newSendMessage.setUserId(staffWithUser.getUserId());
  201. sendMessageList.add(newSendMessage);
  202. }
  203. return sendMessageList;
  204. }
  205. @XxlJob("pushSendMessageJob")
  206. public ReturnT<String> pushSendMessage(String param) {
  207. List<SendMessage> groupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0);
  208. for (SendMessage sendMessage : groupList) {
  209. sendMessage.setIsSend(0);
  210. sendMessage.setCreateTime(DateUtil.getThatDayDate());
  211. List<String> sendUserList = sendMessageMapper.selectExternalUserId3rdParty(sendMessage);
  212. boolean flag = pushMessage(sendUserList, sendMessage);
  213. if (flag) {
  214. SendMessage updateSendMessage = new SendMessage();
  215. updateSendMessage.setIsSend(1);
  216. SendMessageExample example = new SendMessageExample();
  217. example.createCriteria()
  218. .andVideoId1EqualTo(sendMessage.getVideoId1())
  219. .andVideoId2EqualTo(sendMessage.getVideoId2())
  220. .andVideoId3EqualTo(sendMessage.getVideoId3())
  221. .andStaffIdEqualTo(sendMessage.getStaffId())
  222. .andCreateTimeGreaterThan(DateUtil.getThatDayDate());
  223. sendMessageMapper.updateByExampleSelective(updateSendMessage, example);
  224. }
  225. }
  226. return ReturnT.SUCCESS;
  227. }
  228. public boolean pushMessage(List<String> sendUserList, SendMessage sendMessage) {
  229. List<JSONObject> pushList = new ArrayList<>();
  230. StaffExample staffExample = new StaffExample();
  231. staffExample.createCriteria().andIdEqualTo(sendMessage.getStaffId());
  232. List<Staff> staffList = staffMapper.selectByExample(staffExample);
  233. Staff staff = staffList.get(0);
  234. String text = messageService.getMessageText();
  235. String name = MessageUtil.getName();
  236. JSONObject jsonObject = new JSONObject();
  237. jsonObject.put("name", name);
  238. jsonObject.put("text", text);
  239. JSONArray attachments = new JSONArray();
  240. List<Long> videoIdList = new ArrayList<>();
  241. videoIdList.add(sendMessage.getVideoId1());
  242. videoIdList.add(sendMessage.getVideoId2());
  243. videoIdList.add(sendMessage.getVideoId3());
  244. for (Long videoId : videoIdList) {
  245. JSONObject attachment = new JSONObject();
  246. attachment.put("msgtype", "miniprogram");
  247. MessageAttachmentExample example = new MessageAttachmentExample();
  248. example.createCriteria().andMiniprogramVideoIdEqualTo(videoId);
  249. List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
  250. if (CollectionUtils.isEmpty(messageAttachmentList)) {
  251. throw new RuntimeException("附件信息查询异常");
  252. }
  253. MessageAttachment messageAttachment = messageAttachmentList.get(0);
  254. JSONObject miniprogram = new JSONObject();
  255. miniprogram.put("appid", messageAttachment.getAppid());
  256. miniprogram.put("title", messageAttachment.getTitle());
  257. miniprogram.put("cover", messageAttachment.getCover());
  258. String page = "";
  259. String key = staff.getStaffExtId() + "_" + videoId;
  260. if (pageMap.containsKey(key)) {
  261. page = pageMap.get(key);
  262. } else {
  263. page = messageAttachmentService.getPage(staff, videoId);
  264. pageMap.put(key, page);
  265. }
  266. if (StringUtils.isEmpty(page)) {
  267. throw new RuntimeException("获取page失败");
  268. }
  269. miniprogram.put("page", page);
  270. attachment.put("miniprogram", miniprogram);
  271. attachments.add(attachment);
  272. }
  273. jsonObject.put("attachments", attachments);
  274. List<List<String>> lists = Lists.partition(sendUserList, 10000);
  275. for (List<String> list : lists) {
  276. List<JSONObject> staffEuList = new ArrayList<>();
  277. JSONObject newJSONObject = new JSONObject();
  278. newJSONObject.putAll(jsonObject);
  279. JSONObject staff_eu = new JSONObject();
  280. staff_eu.put("staff_ext_id", staff.getStaffExtId());
  281. staff_eu.put("eu_ext_ids", list);
  282. staffEuList.add(staff_eu);
  283. newJSONObject.put("staff_eu_list", staffEuList);
  284. pushList.add(newJSONObject);
  285. }
  286. if (CollectionUtils.isEmpty(pushList)) {
  287. throw new RuntimeException("推送视频生成失败");
  288. }
  289. for (JSONObject pushJsonObject : pushList) {
  290. boolean flag = messageService.pushMessage(pushJsonObject);
  291. if (!flag) {
  292. return flag;
  293. }
  294. }
  295. return true;
  296. }
  297. }