WeComMessageDataJob.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package com.tzld.piaoquan.wecom.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.aliyun.odps.data.Record;
  6. import com.google.common.collect.Lists;
  7. import com.tzld.piaoquan.wecom.dao.mapper.*;
  8. import com.tzld.piaoquan.wecom.model.bo.PushMessage;
  9. import com.tzld.piaoquan.wecom.model.po.*;
  10. import com.tzld.piaoquan.wecom.service.MessageAttachmentService;
  11. import com.tzld.piaoquan.wecom.service.MessageService;
  12. import com.tzld.piaoquan.wecom.utils.*;
  13. import com.tzld.piaoquan.wecom.utils.page.Page;
  14. import com.xxl.job.core.biz.model.ReturnT;
  15. import com.xxl.job.core.handler.annotation.XxlJob;
  16. import lombok.extern.log4j.Log4j2;
  17. import org.apache.commons.lang3.StringUtils;
  18. import org.springframework.beans.BeanUtils;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.data.redis.core.RedisTemplate;
  21. import org.springframework.stereotype.Component;
  22. import org.springframework.util.CollectionUtils;
  23. import java.io.File;
  24. import java.io.InputStream;
  25. import java.io.OutputStream;
  26. import java.net.HttpURLConnection;
  27. import java.net.URL;
  28. import java.nio.charset.StandardCharsets;
  29. import java.nio.file.Files;
  30. import java.nio.file.Paths;
  31. import java.util.*;
  32. import java.util.stream.Collectors;
  33. import static com.tzld.piaoquan.wecom.common.constant.RedisConstant.GUARANTEED_MINIPROGRAM_KEY;
  34. import static com.tzld.piaoquan.wecom.common.constant.TimeConstant.MILLISECOND_DAY;
  35. @Log4j2
  36. @Component
  37. public class WeComMessageDataJob {
  38. @Autowired
  39. private UserMapper userMapper;
  40. @Autowired
  41. private MessageAttachmentMapper messageAttachmentMapper;
  42. @Autowired
  43. private RedisTemplate<String, Object> redisTemplate;
  44. @Autowired
  45. private MessageService messageService;
  46. @Autowired
  47. private MessageAttachmentService messageAttachmentService;
  48. @Autowired
  49. private StaffWithUserMapper staffWithUserMapper;
  50. @Autowired
  51. private StaffMapper staffMapper;
  52. @Autowired
  53. SendMessageMapper sendMessageMapper;
  54. private static final int MAX_VIDEO_NUM = 3; // Videos per message (videoId1..videoId3); also the number of fallback videos init() requires.
  55. // Maximum size, in UTF-8 bytes, of a mini-program title; longer titles are truncated in pushMessage().
  56. private static final int MAX_BYTES = 64;
  57. // "Good history" videos and the users each one may be pushed to; rebuilt by init() from the ODPS query.
  58. List<PushMessage> goodHistoryPushList = new ArrayList<>();
  59. // Fallback ("guaranteed") video ids selected by init(); used to fill any message slots left empty.
  60. List<Long> guaranteedVideoIdList = new ArrayList<>();
  61. // Number of fallback entries init() scanned from the Redis list; saveGuaranteedVideoIdList() left-pops this many.
  62. int getGuaranteedVideoIdNum = 0;
  63. Map<String, String> pageMap = new HashMap<>(); // Cache of mini-program pages keyed by "<staffExtId>_<videoId>"; filled in pushMessage().
  64. //初始化操作
  65. void init() {
  66. //历史优质视频获取
  67. String sql = String.format("SELECT * FROM loghubods.history_good_video_can_push_user_list where dt = %s;", DateUtil.getBeforeDayDateString());
  68. List<Record> recordList = OdpsUtil.getOdpsData(sql);
  69. if (CollectionUtils.isEmpty(recordList)) {
  70. return;
  71. }
  72. List<PushMessage> list = new ArrayList<>();
  73. for (Record record : recordList) {
  74. PushMessage pushMessage = new PushMessage();
  75. Long videoId = Long.parseLong((String) record.get(0));
  76. Set<Long> userIds = new HashSet<>(JSONObject.parseArray((String) record.get(1), Long.class));
  77. pushMessage.setVideoId(videoId);
  78. pushMessage.setUserIds(userIds);
  79. list.add(pushMessage);
  80. }
  81. goodHistoryPushList = list;
  82. getGuaranteedVideoIdNum = 0;
  83. //保底视频获取
  84. List<Long> videoIdList = Objects.requireNonNull(redisTemplate.opsForList().range(GUARANTEED_MINIPROGRAM_KEY, 0, -1))
  85. .stream().map(o -> (Integer) o).map(String::valueOf).map(Long::parseLong).collect(Collectors.toList());
  86. if (CollectionUtils.isEmpty(videoIdList)) {
  87. log.error("推送消息初始化失败,保底数据为空");
  88. throw new RuntimeException("保底数据为空");
  89. }
  90. List<Long> saveVideoIds = new ArrayList<>();
  91. for (Long videoId : videoIdList) {
  92. getGuaranteedVideoIdNum++;
  93. MessageAttachmentExample example = new MessageAttachmentExample();
  94. example.createCriteria().andMiniprogramVideoIdEqualTo(videoId);
  95. List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
  96. if (CollectionUtils.isEmpty(messageAttachmentList)) {
  97. continue;
  98. }
  99. MessageAttachment messageAttachment = messageAttachmentList.get(0);
  100. if (messageAttachment.getSendTime() != null
  101. && DateUtil.dateDifference(new Date(), messageAttachment.getSendTime()) < 180 * MILLISECOND_DAY) {
  102. continue;
  103. }
  104. saveVideoIds.add(videoId);
  105. if (saveVideoIds.size() >= MAX_VIDEO_NUM) {
  106. break;
  107. }
  108. }
  109. if (saveVideoIds.size() < MAX_VIDEO_NUM) {
  110. log.error("推送消息初始化失败,保底数据不足");
  111. throw new RuntimeException("保底数据不足");
  112. }
  113. guaranteedVideoIdList = saveVideoIds;
  114. }
  115. @XxlJob("assembleSendMessageJob")
  116. public ReturnT<String> assembleSendMessage(String param) {
  117. init();
  118. Long staffId = null;
  119. if (StringUtils.isNotEmpty(param)) {
  120. staffId = Long.parseLong(param);
  121. }
  122. UserExample example = new UserExample();
  123. example.createCriteria().andExternalUserId3rdPartyIsNotNull();
  124. long count = userMapper.countByExample(example);
  125. int page = 1;
  126. int pageSize = 1000;
  127. long totalPageSize = count / pageSize + 1;
  128. for (; page <= totalPageSize; page++) {
  129. example.setPage(new Page<>(page, pageSize));
  130. List<User> userList = userMapper.selectByExample(example);
  131. if (CollectionUtils.isEmpty(userList)) {
  132. continue;
  133. }
  134. //落库逻辑
  135. List<SendMessage> allSeneMessageList = new ArrayList<>();
  136. for (User user : userList) {
  137. List<SendMessage> sendMessageList = getSendMessage(user, staffId);
  138. if (!CollectionUtils.isEmpty(sendMessageList)) {
  139. allSeneMessageList.addAll(sendMessageList);
  140. }
  141. }
  142. if (!CollectionUtils.isEmpty(allSeneMessageList)) {
  143. sendMessageMapper.insertList(allSeneMessageList);
  144. }
  145. }
  146. //组装好当天要发送的消息后 记录时间 删除保底数据
  147. saveGuaranteedVideoIdList(guaranteedVideoIdList);
  148. return ReturnT.SUCCESS;
  149. }
  150. public void saveGuaranteedVideoIdList(List<Long> videoIdList) {
  151. MessageAttachmentExample example = new MessageAttachmentExample();
  152. example.createCriteria().andMiniprogramVideoIdIn(videoIdList);
  153. List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
  154. for (MessageAttachment messageAttachment : messageAttachmentList) {
  155. MessageAttachment updateMessageAttachment = new MessageAttachment();
  156. updateMessageAttachment.setId(messageAttachment.getId());
  157. updateMessageAttachment.setSendTime(new Date());
  158. messageAttachmentMapper.updateByPrimaryKeySelective(updateMessageAttachment);
  159. }
  160. log.info("getGuaranteedVideoIdNum={}", getGuaranteedVideoIdNum);
  161. //移除从redis中获取的保底数据
  162. for (int i = 0; i < getGuaranteedVideoIdNum; i++) {
  163. redisTemplate.opsForList().leftPop(GUARANTEED_MINIPROGRAM_KEY);
  164. }
  165. }
  166. public List<SendMessage> getSendMessage(User user, Long staffId) {
  167. StaffWithUserExample example = new StaffWithUserExample();
  168. StaffWithUserExample.Criteria criteria = example.createCriteria();
  169. criteria.andUserIdEqualTo(user.getId());
  170. if (staffId != null) {
  171. criteria.andUserIdEqualTo(staffId);
  172. }
  173. List<StaffWithUser> staffWithUserList = staffWithUserMapper.selectByExample(example);
  174. if (CollectionUtils.isEmpty(staffWithUserList)) {
  175. return null;
  176. }
  177. int n = 0;
  178. List<SendMessage> sendMessageList = new ArrayList<>();
  179. SendMessage sendMessage = new SendMessage();
  180. for (PushMessage pushMessage : goodHistoryPushList) {
  181. if (pushMessage.getUserIds().contains(user.getId())) {
  182. if (n == 0) {
  183. sendMessage.setVideoId1(pushMessage.getVideoId());
  184. }
  185. if (n == 1) {
  186. sendMessage.setVideoId2(pushMessage.getVideoId());
  187. }
  188. if (n == 2) {
  189. sendMessage.setVideoId3(pushMessage.getVideoId());
  190. }
  191. n++;
  192. if (n >= MAX_VIDEO_NUM) {
  193. break;
  194. }
  195. }
  196. }
  197. //保底数据
  198. if (n < MAX_VIDEO_NUM) {
  199. for (Long videoId : guaranteedVideoIdList) {
  200. if (n == 0) {
  201. sendMessage.setVideoId1(videoId);
  202. }
  203. if (n == 1) {
  204. sendMessage.setVideoId2(videoId);
  205. }
  206. if (n == 2) {
  207. sendMessage.setVideoId3(videoId);
  208. }
  209. n++;
  210. if (n >= MAX_VIDEO_NUM) {
  211. break;
  212. }
  213. }
  214. }
  215. if (n < MAX_VIDEO_NUM) {
  216. log.error("组装数据失败 user={}", user);
  217. return null;
  218. }
  219. for (StaffWithUser staffWithUser : staffWithUserList) {
  220. SendMessage newSendMessage = new SendMessage();
  221. BeanUtils.copyProperties(sendMessage, newSendMessage);
  222. newSendMessage.setStaffId(staffWithUser.getStaffId());
  223. newSendMessage.setUserId(staffWithUser.getUserId());
  224. sendMessageList.add(newSendMessage);
  225. }
  226. return sendMessageList;
  227. }
  228. @XxlJob("pushSendMessageJob")
  229. public ReturnT<String> pushSendMessage(String param) {
  230. List<SendMessage> groupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0);
  231. if (StringUtils.isNotEmpty(param)) {
  232. groupList = groupList.stream().filter(e -> e.getStaffId() == Long.parseLong(param)).collect(Collectors.toList());
  233. }
  234. if (CollectionUtils.isEmpty(groupList)) {
  235. return ReturnT.SUCCESS;
  236. }
  237. for (SendMessage sendMessage : groupList) {
  238. sendMessage.setIsSend(0);
  239. sendMessage.setCreateTime(DateUtil.getThatDayDate());
  240. List<String> sendUserList = sendMessageMapper.selectExternalUserId3rdParty(sendMessage);
  241. boolean flag = pushMessage(sendUserList, sendMessage);
  242. if (flag) {
  243. SendMessage updateSendMessage = new SendMessage();
  244. updateSendMessage.setIsSend(1);
  245. SendMessageExample example = new SendMessageExample();
  246. example.createCriteria()
  247. .andVideoId1EqualTo(sendMessage.getVideoId1())
  248. .andVideoId2EqualTo(sendMessage.getVideoId2())
  249. .andVideoId3EqualTo(sendMessage.getVideoId3())
  250. .andStaffIdEqualTo(sendMessage.getStaffId())
  251. .andCreateTimeGreaterThan(DateUtil.getThatDayDate());
  252. sendMessageMapper.updateByExampleSelective(updateSendMessage, example);
  253. }
  254. }
  255. return ReturnT.SUCCESS;
  256. }
  257. public boolean pushMessage(List<String> sendUserList, SendMessage sendMessage) {
  258. List<JSONObject> pushList = new ArrayList<>();
  259. StaffExample staffExample = new StaffExample();
  260. staffExample.createCriteria().andIdEqualTo(sendMessage.getStaffId());
  261. List<Staff> staffList = staffMapper.selectByExample(staffExample);
  262. Staff staff = staffList.get(0);
  263. String text = messageService.getMessageText();
  264. String name = MessageUtil.getName(staff.getRemark());
  265. JSONObject jsonObject = new JSONObject();
  266. jsonObject.put("name", name);
  267. jsonObject.put("text", text);
  268. JSONArray attachments = new JSONArray();
  269. List<Long> videoIdList = new ArrayList<>();
  270. videoIdList.add(sendMessage.getVideoId1());
  271. videoIdList.add(sendMessage.getVideoId2());
  272. videoIdList.add(sendMessage.getVideoId3());
  273. for (Long videoId : videoIdList) {
  274. JSONObject attachment = new JSONObject();
  275. attachment.put("msgtype", "miniprogram");
  276. MessageAttachmentExample example = new MessageAttachmentExample();
  277. example.createCriteria().andMiniprogramVideoIdEqualTo(videoId);
  278. List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
  279. if (CollectionUtils.isEmpty(messageAttachmentList)) {
  280. throw new RuntimeException("附件信息查询异常");
  281. }
  282. MessageAttachment messageAttachment = messageAttachmentList.get(0);
  283. JSONObject miniprogram = new JSONObject();
  284. miniprogram.put("appid", messageAttachment.getAppid());
  285. String title = messageAttachment.getTitle();
  286. if (title.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES) {
  287. title = ToolUtils.truncateString(title, MAX_BYTES - 3) + "...";
  288. }
  289. miniprogram.put("title", title);
  290. miniprogram.put("cover", messageAttachment.getCover());
  291. String page = "";
  292. String key = staff.getStaffExtId() + "_" + videoId;
  293. if (pageMap.containsKey(key)) {
  294. page = pageMap.get(key);
  295. } else {
  296. page = messageAttachmentService.getPage(staff, videoId);
  297. pageMap.put(key, page);
  298. }
  299. if (StringUtils.isEmpty(page)) {
  300. throw new RuntimeException("获取page失败");
  301. }
  302. miniprogram.put("page", page);
  303. attachment.put("miniprogram", miniprogram);
  304. attachments.add(0, attachment);
  305. }
  306. jsonObject.put("attachments", attachments);
  307. List<List<String>> lists = Lists.partition(sendUserList, 10000);
  308. for (List<String> list : lists) {
  309. List<JSONObject> staffEuList = new ArrayList<>();
  310. JSONObject newJSONObject = new JSONObject();
  311. newJSONObject.putAll(jsonObject);
  312. JSONObject staff_eu = new JSONObject();
  313. staff_eu.put("staff_ext_id", staff.getStaffExtId());
  314. staff_eu.put("eu_ext_ids", list);
  315. staffEuList.add(staff_eu);
  316. newJSONObject.put("staff_eu_list", staffEuList);
  317. pushList.add(newJSONObject);
  318. }
  319. if (CollectionUtils.isEmpty(pushList)) {
  320. return false;
  321. }
  322. for (JSONObject pushJsonObject : pushList) {
  323. log.info("pushMessage pushJsonObject={}", pushJsonObject);
  324. boolean flag = messageService.pushMessage(pushJsonObject);
  325. if (!flag) {
  326. return flag;
  327. }
  328. }
  329. return true;
  330. }
  331. }