package com.tzld.piaoquan.wecom.job; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.aliyun.odps.data.Record; import com.google.common.collect.Lists; import com.tzld.piaoquan.wecom.dao.mapper.*; import com.tzld.piaoquan.wecom.model.bo.PushMessage; import com.tzld.piaoquan.wecom.model.bo.VideoCombination; import com.tzld.piaoquan.wecom.model.bo.VideoParam; import com.tzld.piaoquan.wecom.model.bo.XxlJobParam; import com.tzld.piaoquan.wecom.model.po.*; import com.tzld.piaoquan.wecom.model.vo.GuaranteedParam; import com.tzld.piaoquan.wecom.service.MessageAttachmentService; import com.tzld.piaoquan.wecom.service.MessageService; import com.tzld.piaoquan.wecom.utils.DateUtil; import com.tzld.piaoquan.wecom.utils.LarkRobotUtil; import com.tzld.piaoquan.wecom.utils.OdpsUtil; import com.tzld.piaoquan.wecom.utils.ToolUtils; import com.tzld.piaoquan.wecom.utils.page.Page; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; import static com.tzld.piaoquan.wecom.common.constant.MessageConstant.MAX_VIDEO_NUM; import static com.tzld.piaoquan.wecom.common.constant.RedisConstant.GUARANTEED_MINI_PROGRAM_KEY; import static com.tzld.piaoquan.wecom.common.constant.TimeConstant.MILLISECOND_DAY; @Log4j2 @Component public class WeComMessageDataJob { @Autowired private UserMapper userMapper; @Autowired private MessageAttachmentMapper messageAttachmentMapper; @Autowired private RedisTemplate redisTemplate; @Autowired private MessageService messageService; @Autowired private MessageAttachmentService messageAttachmentService; @Autowired private StaffWithUserMapper staffWithUserMapper; @Autowired private StaffMapper staffMapper; @Autowired private SendMessageMapper sendMessageMapper; @Autowired private CorpMapper corpMapper; //发送小程序标题限制字节数 private static final int MAX_BYTES = 64; //历史优质视频可推送用户列表 Map> historicalTopMap = new HashMap<>(); //保底视频列表 Map guaranteedVideoMap = new HashMap<>(); Map pageMap = new HashMap<>(); //初始化操作 private void init(List staffIds) { //历史优质视频获取 String sql = String.format("SELECT * FROM loghubods.history_good_video_can_push_user_list where dt = %s;", DateUtil.getBeforeDayDateString()); List recordList = OdpsUtil.getOdpsData(sql); if (CollectionUtils.isEmpty(recordList)) { LarkRobotUtil.sendMessage("历史优质视频为空"); } else { List list = new ArrayList<>(); for (Record record : recordList) { PushMessage pushMessage = new PushMessage(); Long videoId = Long.parseLong((String) record.get(0)); Set userIds = new HashSet<>(JSONObject.parseArray((String) record.get(1), Long.class)); Long staffId = Long.parseLong((String) record.get(2)); Double score = Double.parseDouble((String) record.get(3)); pushMessage.setVideoId(videoId); pushMessage.setUserIds(userIds); pushMessage.setStaffId(staffId); pushMessage.setScore(score); list.add(pushMessage); } historicalTopMap = list.stream() .collect(Collectors.groupingBy(PushMessage::getStaffId, Collectors.mapping(pushMessage -> pushMessage, Collectors.toList()))) .entrySet() .stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> entry.getValue().stream() .sorted(Comparator.comparing(PushMessage::getScore).reversed()) // 根据 score 降序排序 .collect(Collectors.toList()) )); } //保底视频获取 String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getThatDayDateString()); GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key); if (guaranteedParam == null || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) { LarkRobotUtil.sendMessage("保底视频获取异常,请检查" + DateUtil.getThatDayDateString()); throw new RuntimeException(); } Map videoMap = new HashMap<>(); for (VideoParam videoParam : guaranteedParam.getVideoParamList()) { if (videoParam.getStaffId() == null) { LarkRobotUtil.sendMessage("保底视频获取异常,StaffId为空" + DateUtil.getThatDayDateString()); continue; } if (videoParam.getStaffId() != 0) { if (!CollectionUtils.isEmpty(staffIds) && !staffIds.contains(videoParam.getStaffId())) { continue; } } //默认组视频不做查询 if (videoParam.getStaffId() == 0L) { if (videoParam.getVideoIds().size() < MAX_VIDEO_NUM) { LarkRobotUtil.sendMessage("默认组视频数量不足" + DateUtil.getThatDayDateString()); } videoMap.put(videoParam.getStaffId(), combination(videoParam.getVideoIds())); continue; } if (CollectionUtils.isEmpty(videoParam.getVideoIds()) || videoParam.getVideoIds().size() < MAX_VIDEO_NUM) { LarkRobotUtil.sendMessage("保底视频数量异常,请查看" + guaranteedParam.getDate() + videoParam.getStaffId()); throw new RuntimeException(); } for (Long videoId : videoParam.getVideoIds()) { MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdEqualTo(videoId).andStaffIdEqualTo(videoParam.getStaffId()); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); if (CollectionUtils.isEmpty(messageAttachmentList)) { LarkRobotUtil.sendMessage("保底视频不存在,请查看videoId=" + videoId); continue; } MessageAttachment messageAttachment = messageAttachmentList.get(0); if (messageAttachment.getSendTime() != null && DateUtil.dateDifference(new Date(), messageAttachment.getSendTime()) < 90 * MILLISECOND_DAY) { LarkRobotUtil.sendMessage("保底视频半年内已发送,请查看videoId=" + videoId); } } //重新组合视频id videoMap.put(videoParam.getStaffId(), combination(videoParam.getVideoIds())); } if (!videoMap.containsKey(0L)) { LarkRobotUtil.sendMessage("保底视频没有默认组,请查看" + guaranteedParam.getDate()); throw new RuntimeException(); } log.info("保底数据获取,videoMap={}", videoMap); this.guaranteedVideoMap = videoMap; } private VideoCombination combination(List videoIds) { VideoCombination videoCombination = new VideoCombination(); videoCombination.setThreeVideoList(combinedVideoList(videoIds, 3)); videoCombination.setThreeVideoIndex(0); videoCombination.setTwoVideoList(combinedVideoList(videoIds, 2)); videoCombination.setTwoVideoIndex(0); videoCombination.setOneVideoList(combinedVideoList(videoIds, 1)); videoCombination.setOneVideoIndex(0); return videoCombination; } private List> combinedVideoList(List videoIds, int count) { List> res = new ArrayList<>(); // 此处假设视频的打开率与上下文无关,只保证每个视频在每个位置都有发送 for (int i = 0; i < videoIds.size(); i++) { List list = new ArrayList<>(); for (int j = 0; j < count; j++) { int index = (i + j) % videoIds.size(); list.add(videoIds.get(index)); } res.add(list); } return res; } @XxlJob("assembleSendMessageJob") public ReturnT assembleSendMessage(String param) { XxlJobParam xxlJobParam = new XxlJobParam(); if (StringUtils.isNotEmpty(param)) { xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class); } CorpExample corpExample = new CorpExample(); if (xxlJobParam.getCorpId() != null) { corpExample.createCriteria().andIdEqualTo(xxlJobParam.getCorpId()); } List corps = corpMapper.selectByExample(corpExample); if (CollectionUtils.isEmpty(corps)) { return ReturnT.SUCCESS; } List staffIds = new ArrayList<>(); if (xxlJobParam.getStaffId() != null) { staffIds.add(xxlJobParam.getStaffId()); } if (!CollectionUtils.isEmpty(xxlJobParam.getStaffIds())) { staffIds.addAll(xxlJobParam.getStaffIds()); } init(staffIds); for (Corp corp : corps) { UserExample userExample = new UserExample(); userExample.createCriteria().andExternalUserIdIsNotNull().andCorpIdEqualTo(corp.getId()); if (xxlJobParam.getUserId() != null) { userExample.createCriteria().andIdEqualTo(xxlJobParam.getUserId()); } long count = userMapper.countByExample(userExample); int page = 1; int pageSize = 1000; long totalPageSize = count / pageSize + 1; for (; page <= totalPageSize; page++) { userExample.setPage(new Page<>(page, pageSize)); List userList = userMapper.selectByExample(userExample); if (CollectionUtils.isEmpty(userList)) { continue; } //落库逻辑 List allSeneMessageList = new ArrayList<>(); for (User user : userList) { List sendMessageList = getSendMessage(user, staffIds, corp.getId()); if (CollectionUtils.isEmpty(sendMessageList)) { continue; } allSeneMessageList.addAll(sendMessageList); } if (CollectionUtils.isEmpty(allSeneMessageList)) { continue; } sendMessageMapper.insertList(allSeneMessageList); } } //组装好当天要发送的消息后 记录时间 saveGuaranteedVideoIdList(staffIds); return ReturnT.SUCCESS; } private void saveGuaranteedVideoIdList(List staffIds) { String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getThatDayDateString()); GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key); if (guaranteedParam == null || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) { return; } if (CollectionUtils.isEmpty(staffIds)) { Set videoIdSet = new HashSet<>(); for (VideoParam videoParam : guaranteedParam.getVideoParamList()) { if (CollectionUtils.isEmpty(videoParam.getVideoIds())) { continue; } videoIdSet.addAll(videoParam.getVideoIds()); } List videoIdList = new ArrayList<>(videoIdSet); MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdIn(videoIdList).andSendTimeIsNull(); MessageAttachment updateMessageAttachment = new MessageAttachment(); updateMessageAttachment.setSendTime(new Date()); messageAttachmentMapper.updateByExampleSelective(updateMessageAttachment, example); } else { for (VideoParam videoParam : guaranteedParam.getVideoParamList()) { if (!CollectionUtils.isEmpty(staffIds) && staffIds.contains(videoParam.getStaffId())) { if (CollectionUtils.isEmpty(videoParam.getVideoIds())) { continue; } MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdIn(videoParam.getVideoIds()).andStaffIdEqualTo(videoParam.getStaffId()); MessageAttachment updateMessageAttachment = new MessageAttachment(); updateMessageAttachment.setSendTime(new Date()); messageAttachmentMapper.updateByExampleSelective(updateMessageAttachment, example); } } } } private List getSendMessage(User user, List staffIds, Long corpId) { StaffWithUserExample example = new StaffWithUserExample(); StaffWithUserExample.Criteria criteria = example.createCriteria(); criteria.andUserIdEqualTo(user.getId()); criteria.andIsDeleteEqualTo(0); if (!CollectionUtils.isEmpty(staffIds)) { criteria.andStaffIdIn(staffIds); } List staffWithUserList = staffWithUserMapper.selectByExample(example); if (CollectionUtils.isEmpty(staffWithUserList)) { return Collections.emptyList(); } List sendMessageList = new ArrayList<>(); for (StaffWithUser staffWithUser : staffWithUserList) { SendMessage sendMessage = new SendMessage(); int n = fillHistoricalTopMessages(sendMessage, user.getId(), staffWithUser.getStaffId()); if (n < MAX_VIDEO_NUM) { // 保底数据 n = fillGuaranteedMessages(sendMessage, staffWithUser.getStaffId(), n); } if (n < MAX_VIDEO_NUM) { LarkRobotUtil.sendMessage("组装数据失败 user=" + user); throw new RuntimeException("组装数据失败"); } sendMessage.setCorpId(corpId); sendMessage.setStaffId(staffWithUser.getStaffId()); sendMessage.setUserId(staffWithUser.getUserId()); sendMessageList.add(sendMessage); } return sendMessageList; } private int fillHistoricalTopMessages(SendMessage sendMessage, Long userId, Long staffId) { List pushMessages = historicalTopMap.get(staffId); if (CollectionUtils.isEmpty(pushMessages)) { return 0; } int n = 0; for (PushMessage pushMessage : pushMessages) { if (pushMessage.getUserIds().contains(userId)) { setVideoId(sendMessage, n, pushMessage.getVideoId()); n++; if (n >= MAX_VIDEO_NUM) { break; } } } return n; } private int fillGuaranteedMessages(SendMessage sendMessage, Long staffId, int currentCount) { VideoCombination videoCombination = guaranteedVideoMap.get(staffId); if (videoCombination == null) { videoCombination = guaranteedVideoMap.get(0L); } if (videoCombination == null) { log.error("保底数据获取失败,staffId={}, guaranteedVideoMap={}", staffId, guaranteedVideoMap); LarkRobotUtil.sendMessage("组装数据时,保底数据获取异常"); throw new RuntimeException("保底数据获取异常"); } if (currentCount < MAX_VIDEO_NUM) { switch (currentCount) { case 0: currentCount = setVideoId(sendMessage, videoCombination.getThreeVideoList().get(videoCombination.getThreeVideoIndex()), currentCount); videoCombination.setThreeVideoIndex( (videoCombination.getThreeVideoIndex() + 1) % videoCombination.getThreeVideoList().size()); break; case 1: currentCount = setVideoId(sendMessage, videoCombination.getTwoVideoList().get(videoCombination.getTwoVideoIndex()), currentCount); videoCombination.setTwoVideoIndex( (videoCombination.getTwoVideoIndex() + 1) % videoCombination.getTwoVideoList().size()); break; case 2: currentCount = setVideoId(sendMessage, videoCombination.getOneVideoList().get(videoCombination.getOneVideoIndex()), currentCount); videoCombination.setOneVideoIndex( (videoCombination.getOneVideoIndex() + 1) % videoCombination.getOneVideoList().size()); break; default: break; } } return currentCount; } private int setVideoId(SendMessage sendMessage, List videoIds, Integer currentCount) { for (Long videoId : videoIds) { setVideoId(sendMessage, currentCount, videoId); currentCount++; if (currentCount >= MAX_VIDEO_NUM) { break; } } return currentCount; } private void setVideoId(SendMessage sendMessage, int index, Long videoId) { switch (index) { case 0: sendMessage.setVideoId1(videoId); break; case 1: sendMessage.setVideoId2(videoId); break; case 2: sendMessage.setVideoId3(videoId); break; default: break; } } @XxlJob("pushSendMessageJob") public ReturnT pushSendMessage(String param) { XxlJobParam xxlJobParam = new XxlJobParam(); if (StringUtils.isNotEmpty(param)) { xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class); } List groupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0); if (xxlJobParam.getCorpId() != null) { Long corpId = xxlJobParam.getCorpId(); groupList = groupList.stream().filter(e -> Objects.equals(e.getCorpId(), corpId)).collect(Collectors.toList()); } if (CollectionUtils.isEmpty(groupList)) { return ReturnT.SUCCESS; } if (xxlJobParam.getStaffId() != null) { Long staffId = xxlJobParam.getStaffId(); groupList = groupList.stream().filter(e -> Objects.equals(e.getStaffId(), staffId)).collect(Collectors.toList()); } if (CollectionUtils.isEmpty(groupList)) { return ReturnT.SUCCESS; } for (SendMessage sendMessage : groupList) { pushAndUpdateMessage(sendMessage); } Map> groupedByStaffId = groupList.stream().collect(Collectors.groupingBy(SendMessage::getStaffId)); for (Map.Entry> entry : groupedByStaffId.entrySet()) { SendMessageExample example = new SendMessageExample(); example.createCriteria().andStaffIdEqualTo(entry.getKey()).andIsSendEqualTo(0) .andCreateTimeGreaterThan(DateUtil.getThatDayDate()); long l = sendMessageMapper.countByExample(example); //增加重试 if (l > 0) { List retryGroupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0); retryGroupList = retryGroupList.stream().filter(e -> Objects.equals(e.getStaffId(), entry.getKey())) .collect(Collectors.toList()); for (SendMessage sendMessage : retryGroupList) { pushAndUpdateMessage(sendMessage); } } } SendMessageExample example = new SendMessageExample(); example.createCriteria().andIsSendEqualTo(0).andCreateTimeGreaterThan(DateUtil.getThatDayDate()); long l = sendMessageMapper.countByExample(example); if (l > 0) { LarkRobotUtil.sendMessage("存在发送失败消息,请检查@薛一鸣"); } return ReturnT.SUCCESS; } private void pushAndUpdateMessage(SendMessage sendMessage) { sendMessage.setIsSend(0); sendMessage.setCreateTime(DateUtil.getThatDayDate()); List sendUserList = sendMessageMapper.selectExternalUserId(sendMessage); boolean flag = pushMessage(sendUserList, sendMessage); if (flag) { SendMessage updateSendMessage = new SendMessage(); updateSendMessage.setIsSend(1); SendMessageExample example = new SendMessageExample(); example.createCriteria() .andVideoId1EqualTo(sendMessage.getVideoId1()) .andVideoId2EqualTo(sendMessage.getVideoId2()) .andVideoId3EqualTo(sendMessage.getVideoId3()) .andStaffIdEqualTo(sendMessage.getStaffId()) .andCreateTimeGreaterThan(DateUtil.getThatDayDate()); sendMessageMapper.updateByExampleSelective(updateSendMessage, example); } } private boolean pushMessage(List sendUserList, SendMessage sendMessage) { List pushList = new ArrayList<>(); StaffExample staffExample = new StaffExample(); staffExample.createCriteria().andIdEqualTo(sendMessage.getStaffId()); List staffList = staffMapper.selectByExample(staffExample); Staff staff = staffList.get(0); JSONObject jsonObject = new JSONObject(); jsonObject.put("chat_type", "single"); JSONObject text = new JSONObject(); String content = messageService.getMessageText(); text.put("content", content); jsonObject.put("text", text); jsonObject.put("sender", staff.getCarrierId()); JSONArray attachments = new JSONArray(); List videoIdList = new ArrayList<>(); videoIdList.add(sendMessage.getVideoId1()); videoIdList.add(sendMessage.getVideoId2()); videoIdList.add(sendMessage.getVideoId3()); for (Long videoId : videoIdList) { JSONObject attachment = new JSONObject(); attachment.put("msgtype", "miniprogram"); MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdEqualTo(videoId); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); if (CollectionUtils.isEmpty(messageAttachmentList)) { log.error("附件信息查询异常"); return false; } MessageAttachment messageAttachment = messageAttachmentList.get(0); JSONObject miniprogram = new JSONObject(); miniprogram.put("appid", messageAttachment.getAppid()); String title = messageAttachment.getTitle(); if (title.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES) { title = ToolUtils.truncateString(title, MAX_BYTES - 3) + "..."; } miniprogram.put("title", title); String picMediaId = messageAttachmentService.getPicMediaId(messageAttachment.getCover(), sendMessage.getCorpId()); if (StringUtils.isEmpty(picMediaId)) { log.error("pushMessage getPicMediaId error cover={}", messageAttachment.getCover()); return false; } miniprogram.put("pic_media_id", picMediaId); String page = ""; String key = staff.getCarrierId() + "_" + videoId; if (pageMap.containsKey(key)) { page = pageMap.get(key); } else { page = messageAttachmentService.getPage(staff, videoId); pageMap.put(key, page); } if (StringUtils.isEmpty(page)) { log.error("pushMessage get page error videoId={} staff={}", videoId, staff); return false; } miniprogram.put("page", page); attachment.put("miniprogram", miniprogram); attachments.add(0, attachment); } jsonObject.put("attachments", attachments); List> lists = Lists.partition(sendUserList, 10000); for (List list : lists) { JSONArray externalUserIds = JSONArray.parseArray(JSON.toJSONString(list)); JSONObject newJSONObject = new JSONObject(); newJSONObject.putAll(jsonObject); newJSONObject.put("external_userid", externalUserIds); pushList.add(newJSONObject); } if (CollectionUtils.isEmpty(pushList)) { return false; } for (JSONObject pushJsonObject : pushList) { log.info("pushMessage pushJsonObject={}", pushJsonObject); boolean flag = messageService.pushWeComMessage(pushJsonObject, sendMessage.getCorpId()); if (!flag) { return flag; } } return true; } @XxlJob("existGuaranteesJob") public ReturnT existGuarantees(String param) { //保底视频获取 String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getNextDayDateString()); GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key); if (guaranteedParam == null || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) { LarkRobotUtil.sendMessage("保底视频异常,请检查" + DateUtil.getNextDayDateString()); } return ReturnT.SUCCESS; } }