package com.tzld.piaoquan.wecom.job; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.aliyun.odps.data.Record; import com.google.common.collect.Lists; import com.tzld.piaoquan.wecom.dao.mapper.*; import com.tzld.piaoquan.wecom.model.bo.PushMessage; import com.tzld.piaoquan.wecom.model.bo.VideoParam; import com.tzld.piaoquan.wecom.model.bo.XxlJobParam; import com.tzld.piaoquan.wecom.model.po.*; import com.tzld.piaoquan.wecom.model.vo.GuaranteedParam; import com.tzld.piaoquan.wecom.service.MessageAttachmentService; import com.tzld.piaoquan.wecom.service.MessageService; import com.tzld.piaoquan.wecom.utils.DateUtil; import com.tzld.piaoquan.wecom.utils.LarkRobotUtil; import com.tzld.piaoquan.wecom.utils.OdpsUtil; import com.tzld.piaoquan.wecom.utils.ToolUtils; import com.tzld.piaoquan.wecom.utils.page.Page; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; import static com.tzld.piaoquan.wecom.common.constant.MessageConstant.MAX_VIDEO_NUM; import static com.tzld.piaoquan.wecom.common.constant.RedisConstant.GUARANTEED_MINI_PROGRAM_KEY; import static com.tzld.piaoquan.wecom.common.constant.TimeConstant.MILLISECOND_DAY; @Log4j2 @Component public class WeComMessageDataJob { @Autowired private UserMapper userMapper; @Autowired private MessageAttachmentMapper messageAttachmentMapper; @Autowired private RedisTemplate redisTemplate; @Autowired private MessageService messageService; @Autowired private MessageAttachmentService messageAttachmentService; @Autowired private StaffWithUserMapper staffWithUserMapper; @Autowired private StaffMapper staffMapper; @Autowired private SendMessageMapper sendMessageMapper; @Autowired private CorpMapper corpMapper; //发送小程序标题限制字节数 private static final int MAX_BYTES = 64; //历史优质视频可推送用户列表 Map> historicalTopMap = new HashMap<>(); //保底视频列表 Map> guaranteedVideoMap = new HashMap<>(); Map pageMap = new HashMap<>(); //初始化操作 private void init() { //历史优质视频获取 String sql = String.format("SELECT * FROM loghubods.history_good_video_can_push_user_list where dt = %s;", DateUtil.getBeforeDayDateString()); List recordList = OdpsUtil.getOdpsData(sql); if (CollectionUtils.isEmpty(recordList)) { return; } List list = new ArrayList<>(); for (Record record : recordList) { PushMessage pushMessage = new PushMessage(); Long videoId = Long.parseLong((String) record.get(0)); Set userIds = new HashSet<>(JSONObject.parseArray((String) record.get(1), Long.class)); Long staffId = Long.parseLong((String) record.get(2)); Double score = Double.parseDouble((String) record.get(3)); pushMessage.setVideoId(videoId); pushMessage.setUserIds(userIds); pushMessage.setStaffId(staffId); pushMessage.setScore(score); list.add(pushMessage); } Map> groupedMap = list.stream() .collect(Collectors.groupingBy(PushMessage::getStaffId, Collectors.mapping(pushMessage -> pushMessage, Collectors.toList()))) .entrySet() .stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> entry.getValue().stream() .sorted(Comparator.comparing(PushMessage::getScore).reversed()) // 根据 score 降序排序 .collect(Collectors.toList()) )); historicalTopMap = groupedMap; //保底视频获取 String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getThatDayDateString()); GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key); if (guaranteedParam == null || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) { LarkRobotUtil.sendMessage("保底视频获取异常,请检查" + DateUtil.getThatDayDateString()); throw new RuntimeException(); } Map> videoMap = new HashMap<>(); for (VideoParam videoParam : guaranteedParam.getVideoParamList()) { if (videoParam.getStaffId() == null) { LarkRobotUtil.sendMessage("保底视频获取异常,StaffId为空" + DateUtil.getThatDayDateString()); throw new RuntimeException(); } if (CollectionUtils.isEmpty(videoParam.getVideoIds()) || videoParam.getVideoIds().size() < MAX_VIDEO_NUM) { LarkRobotUtil.sendMessage("保底视频数量异常,请查看" + guaranteedParam.getDate() + videoParam.getStaffId()); throw new RuntimeException(); } for (Long videoId : videoParam.getVideoIds()) { MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdEqualTo(videoId).andStaffIdEqualTo(videoParam.getStaffId()); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); if (CollectionUtils.isEmpty(messageAttachmentList)) { LarkRobotUtil.sendMessage("保底视频不存在,请查看videoId=" + videoId); throw new RuntimeException(); } MessageAttachment messageAttachment = messageAttachmentList.get(0); if (messageAttachment.getSendTime() != null && DateUtil.dateDifference(new Date(), messageAttachment.getSendTime()) < 180 * MILLISECOND_DAY) { LarkRobotUtil.sendMessage("保底视频半年内已发送,请查看videoId=" + videoId); throw new RuntimeException(); } } videoMap.put(videoParam.getStaffId(), videoParam.getVideoIds()); } if (!videoMap.containsKey(0L)) { LarkRobotUtil.sendMessage("保底视频没有默认组,请查看" + guaranteedParam.getDate()); throw new RuntimeException(); } this.guaranteedVideoMap = videoMap; } @XxlJob("assembleSendMessageJob") public ReturnT assembleSendMessage(String param) { XxlJobParam xxlJobParam = new XxlJobParam(); if (StringUtils.isNotEmpty(param)) { xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class); } init(); CorpExample corpExample = new CorpExample(); if (xxlJobParam.getCorpId() != null) { corpExample.createCriteria().andIdEqualTo(xxlJobParam.getCorpId()); } List corps = corpMapper.selectByExample(corpExample); if (CollectionUtils.isEmpty(corps)) { return ReturnT.SUCCESS; } for (Corp corp : corps) { Long staffId = null; if (xxlJobParam.getStaffId() != null) { staffId = xxlJobParam.getStaffId(); } UserExample userExample = new UserExample(); userExample.createCriteria().andExternalUserIdIsNotNull().andCorpIdEqualTo(corp.getId()); if (xxlJobParam.getUserId() != null) { userExample.createCriteria().andIdEqualTo(xxlJobParam.getUserId()); } long count = userMapper.countByExample(userExample); int page = 1; int pageSize = 1000; long totalPageSize = count / pageSize + 1; for (; page <= totalPageSize; page++) { userExample.setPage(new Page<>(page, pageSize)); List userList = userMapper.selectByExample(userExample); if (CollectionUtils.isEmpty(userList)) { continue; } //落库逻辑 List allSeneMessageList = new ArrayList<>(); for (User user : userList) { List sendMessageList = getSendMessage(user, staffId, corp.getId()); if (CollectionUtils.isEmpty(sendMessageList)) { continue; } allSeneMessageList.addAll(sendMessageList); } if (CollectionUtils.isEmpty(allSeneMessageList)) { continue; } sendMessageMapper.insertList(allSeneMessageList); } } //组装好当天要发送的消息后 记录时间 删除保底数据 saveGuaranteedVideoIdList(); return ReturnT.SUCCESS; } private void saveGuaranteedVideoIdList() { String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getThatDayDateString()); GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key); if (guaranteedParam == null || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) { return; } List videoIdList = new ArrayList<>(); for (VideoParam videoParam : guaranteedParam.getVideoParamList()) { if (CollectionUtils.isEmpty(videoParam.getVideoIds())) { continue; } videoIdList.addAll(videoParam.getVideoIds()); } MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdIn(videoIdList); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); for (MessageAttachment messageAttachment : messageAttachmentList) { MessageAttachment updateMessageAttachment = new MessageAttachment(); updateMessageAttachment.setId(messageAttachment.getId()); updateMessageAttachment.setSendTime(new Date()); messageAttachmentMapper.updateByPrimaryKeySelective(updateMessageAttachment); } redisTemplate.delete(key); } private List getSendMessage(User user, Long staffId, Long corpId) { StaffWithUserExample example = new StaffWithUserExample(); StaffWithUserExample.Criteria criteria = example.createCriteria(); criteria.andUserIdEqualTo(user.getId()); criteria.andIsDeleteEqualTo(0); if (staffId != null) { criteria.andStaffIdEqualTo(staffId); } List staffWithUserList = staffWithUserMapper.selectByExample(example); if (CollectionUtils.isEmpty(staffWithUserList)) { return Collections.emptyList(); } List sendMessageList = new ArrayList<>(); for (StaffWithUser staffWithUser : staffWithUserList) { SendMessage sendMessage = new SendMessage(); int n = fillHistoricalTopMessages(sendMessage, user.getId(), staffWithUser.getStaffId()); if (n < MAX_VIDEO_NUM) { // 保底数据 n = fillGuaranteedMessages(sendMessage, staffWithUser.getStaffId(), n); } if (n < MAX_VIDEO_NUM) { LarkRobotUtil.sendMessage("组装数据失败 user=" + user); throw new RuntimeException("组装数据失败"); } sendMessage.setCorpId(corpId); sendMessage.setStaffId(staffWithUser.getStaffId()); sendMessage.setUserId(staffWithUser.getUserId()); sendMessageList.add(sendMessage); } return sendMessageList; } private int fillHistoricalTopMessages(SendMessage sendMessage, Long userId, Long staffId) { List list = historicalTopMap.get(staffId); if (!CollectionUtils.isEmpty(list)) { int n = 0; for (PushMessage pushMessage : list) { if (pushMessage.getUserIds().contains(userId)) { setVideoId(sendMessage, n, pushMessage.getVideoId()); n++; if (n >= MAX_VIDEO_NUM) { break; } } } return n; } return 0; } private int fillGuaranteedMessages(SendMessage sendMessage, Long staffId, int currentCount) { List guaranteedVideoIdList = guaranteedVideoMap.get(staffId); if (CollectionUtils.isEmpty(guaranteedVideoIdList)) { guaranteedVideoIdList = guaranteedVideoMap.get(0L); } if (CollectionUtils.isEmpty(guaranteedVideoIdList)) { LarkRobotUtil.sendMessage("组装数据时,保底数据获取异常"); throw new RuntimeException("保底数据获取异常"); } if (currentCount < MAX_VIDEO_NUM) { for (Long videoId : guaranteedVideoIdList) { setVideoId(sendMessage, currentCount, videoId); currentCount++; if (currentCount >= MAX_VIDEO_NUM) { break; } } } return currentCount; } private void setVideoId(SendMessage sendMessage, int index, Long videoId) { switch (index) { case 0: sendMessage.setVideoId1(videoId); break; case 1: sendMessage.setVideoId2(videoId); break; case 2: sendMessage.setVideoId3(videoId); break; default: break; } } @XxlJob("pushSendMessageJob") public ReturnT pushSendMessage(String param) { XxlJobParam xxlJobParam = new XxlJobParam(); if (StringUtils.isNotEmpty(param)) { xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class); } List groupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0); if (xxlJobParam.getCorpId() != null) { Long corpId = xxlJobParam.getCorpId(); groupList = groupList.stream().filter(e -> Objects.equals(e.getCorpId(), corpId)).collect(Collectors.toList()); } if (CollectionUtils.isEmpty(groupList)) { return ReturnT.SUCCESS; } if (xxlJobParam.getStaffId() != null) { Long staffId = xxlJobParam.getStaffId(); groupList = groupList.stream().filter(e -> Objects.equals(e.getStaffId(), staffId)).collect(Collectors.toList()); } if (CollectionUtils.isEmpty(groupList)) { return ReturnT.SUCCESS; } for (SendMessage sendMessage : groupList) { sendMessage.setIsSend(0); sendMessage.setCreateTime(DateUtil.getThatDayDate()); List sendUserList = sendMessageMapper.selectExternalUserId(sendMessage); boolean flag = pushMessage(sendUserList, sendMessage); if (flag) { SendMessage updateSendMessage = new SendMessage(); updateSendMessage.setIsSend(1); SendMessageExample example = new SendMessageExample(); example.createCriteria() .andVideoId1EqualTo(sendMessage.getVideoId1()) .andVideoId2EqualTo(sendMessage.getVideoId2()) .andVideoId3EqualTo(sendMessage.getVideoId3()) .andStaffIdEqualTo(sendMessage.getStaffId()) .andCreateTimeGreaterThan(DateUtil.getThatDayDate()); sendMessageMapper.updateByExampleSelective(updateSendMessage, example); } } return ReturnT.SUCCESS; } private boolean pushMessage(List sendUserList, SendMessage sendMessage) { List pushList = new ArrayList<>(); StaffExample staffExample = new StaffExample(); staffExample.createCriteria().andIdEqualTo(sendMessage.getStaffId()); List staffList = staffMapper.selectByExample(staffExample); Staff staff = staffList.get(0); JSONObject jsonObject = new JSONObject(); jsonObject.put("chat_type", "single"); JSONObject text = new JSONObject(); String content = messageService.getMessageText(); text.put("content", content); jsonObject.put("text", text); jsonObject.put("sender", staff.getCarrierId()); JSONArray attachments = new JSONArray(); List videoIdList = new ArrayList<>(); videoIdList.add(sendMessage.getVideoId1()); videoIdList.add(sendMessage.getVideoId2()); videoIdList.add(sendMessage.getVideoId3()); for (Long videoId : videoIdList) { JSONObject attachment = new JSONObject(); attachment.put("msgtype", "miniprogram"); MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdEqualTo(videoId); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); if (CollectionUtils.isEmpty(messageAttachmentList)) { throw new RuntimeException("附件信息查询异常"); } MessageAttachment messageAttachment = messageAttachmentList.get(0); JSONObject miniprogram = new JSONObject(); miniprogram.put("appid", messageAttachment.getAppid()); String title = messageAttachment.getTitle(); if (title.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES) { title = ToolUtils.truncateString(title, MAX_BYTES - 3) + "..."; } miniprogram.put("title", title); String picMediaId = messageAttachmentService.getPicMediaId(messageAttachment.getCover(), sendMessage.getCorpId()); if (StringUtils.isEmpty(picMediaId)) { log.error("pushMessage getPicMediaId error cover={}", messageAttachment.getCover()); return false; } miniprogram.put("pic_media_id", picMediaId); String page = ""; String key = staff.getCarrierId() + "_" + videoId; if (pageMap.containsKey(key)) { page = pageMap.get(key); } else { page = messageAttachmentService.getPage(staff, videoId); pageMap.put(key, page); } if (StringUtils.isEmpty(page)) { log.error("pushMessage get page error videoId={} staff={}", videoId, staff); return false; } miniprogram.put("page", page); attachment.put("miniprogram", miniprogram); attachments.add(0, attachment); } jsonObject.put("attachments", attachments); List> lists = Lists.partition(sendUserList, 10000); for (List list : lists) { JSONArray externalUserIds = JSONArray.parseArray(JSON.toJSONString(list)); JSONObject newJSONObject = new JSONObject(); newJSONObject.putAll(jsonObject); newJSONObject.put("external_userid", externalUserIds); pushList.add(newJSONObject); } if (CollectionUtils.isEmpty(pushList)) { return false; } for (JSONObject pushJsonObject : pushList) { log.info("pushMessage pushJsonObject={}", pushJsonObject); boolean flag = messageService.pushWeComMessage(pushJsonObject, sendMessage.getCorpId()); if (!flag) { return flag; } } return true; } }