package com.tzld.piaoquan.wecom.job; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.aliyun.odps.data.Record; import com.google.common.collect.Lists; import com.tzld.piaoquan.wecom.dao.mapper.*; import com.tzld.piaoquan.wecom.model.bo.PushMessage; import com.tzld.piaoquan.wecom.model.po.*; import com.tzld.piaoquan.wecom.service.MessageAttachmentService; import com.tzld.piaoquan.wecom.service.MessageService; import com.tzld.piaoquan.wecom.utils.*; import com.tzld.piaoquan.wecom.utils.page.Page; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.io.File; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.*; import java.util.stream.Collectors; import static com.tzld.piaoquan.wecom.common.constant.RedisConstant.GUARANTEED_MINIPROGRAM_KEY; import static com.tzld.piaoquan.wecom.common.constant.TimeConstant.MILLISECOND_DAY; @Log4j2 @Component public class WeComMessageDataJob { @Autowired private UserMapper userMapper; @Autowired private MessageAttachmentMapper messageAttachmentMapper; @Autowired private RedisTemplate redisTemplate; @Autowired private MessageService messageService; @Autowired private MessageAttachmentService messageAttachmentService; @Autowired private StaffWithUserMapper staffWithUserMapper; @Autowired private StaffMapper staffMapper; @Autowired SendMessageMapper sendMessageMapper; private static final int MAX_VIDEO_NUM = 3; //发送小程序标题限制字节数 private static final int MAX_BYTES = 64; //历史优质视频可推送用户列表 List goodHistoryPushList = new ArrayList<>(); //保底视频列表 List guaranteedVideoIdList = new ArrayList<>(); //从缓存中获取的保底视频数量 int getGuaranteedVideoIdNum = 0; Map pageMap = new HashMap<>(); //初始化操作 void init() { //历史优质视频获取 String sql = String.format("SELECT * FROM loghubods.history_good_video_can_push_user_list where dt = %s;", DateUtil.getBeforeDayDateString()); List recordList = OdpsUtil.getOdpsData(sql); if (CollectionUtils.isEmpty(recordList)) { return; } List list = new ArrayList<>(); for (Record record : recordList) { PushMessage pushMessage = new PushMessage(); Long videoId = Long.parseLong((String) record.get(0)); Set userIds = new HashSet<>(JSONObject.parseArray((String) record.get(1), Long.class)); pushMessage.setVideoId(videoId); pushMessage.setUserIds(userIds); list.add(pushMessage); } goodHistoryPushList = list; getGuaranteedVideoIdNum = 0; //保底视频获取 List videoIdList = Objects.requireNonNull(redisTemplate.opsForList().range(GUARANTEED_MINIPROGRAM_KEY, 0, -1)) .stream().map(o -> (Integer) o).map(String::valueOf).map(Long::parseLong).collect(Collectors.toList()); if (CollectionUtils.isEmpty(videoIdList)) { log.error("推送消息初始化失败,保底数据为空"); throw new RuntimeException("保底数据为空"); } List saveVideoIds = new ArrayList<>(); for (Long videoId : videoIdList) { getGuaranteedVideoIdNum++; MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdEqualTo(videoId); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); if (CollectionUtils.isEmpty(messageAttachmentList)) { continue; } MessageAttachment messageAttachment = messageAttachmentList.get(0); if (messageAttachment.getSendTime() != null && DateUtil.dateDifference(new Date(), messageAttachment.getSendTime()) < 180 * MILLISECOND_DAY) { continue; } saveVideoIds.add(videoId); if (saveVideoIds.size() >= MAX_VIDEO_NUM) { break; } } if (saveVideoIds.size() < MAX_VIDEO_NUM) { log.error("推送消息初始化失败,保底数据不足"); throw new RuntimeException("保底数据不足"); } guaranteedVideoIdList = saveVideoIds; } @XxlJob("assembleSendMessageJob") public ReturnT assembleSendMessage(String param) { init(); Long staffId = null; if (StringUtils.isNotEmpty(param)) { staffId = Long.parseLong(param); } UserExample example = new UserExample(); example.createCriteria().andExternalUserId3rdPartyIsNotNull(); long count = userMapper.countByExample(example); int page = 1; int pageSize = 1000; long totalPageSize = count / pageSize + 1; for (; page <= totalPageSize; page++) { example.setPage(new Page<>(page, pageSize)); List userList = userMapper.selectByExample(example); if (CollectionUtils.isEmpty(userList)) { continue; } //落库逻辑 List allSeneMessageList = new ArrayList<>(); for (User user : userList) { List sendMessageList = getSendMessage(user, staffId); if (!CollectionUtils.isEmpty(sendMessageList)) { allSeneMessageList.addAll(sendMessageList); } } if (!CollectionUtils.isEmpty(allSeneMessageList)) { sendMessageMapper.insertList(allSeneMessageList); } } //组装好当天要发送的消息后 记录时间 删除保底数据 saveGuaranteedVideoIdList(guaranteedVideoIdList); return ReturnT.SUCCESS; } public void saveGuaranteedVideoIdList(List videoIdList) { MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdIn(videoIdList); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); for (MessageAttachment messageAttachment : messageAttachmentList) { MessageAttachment updateMessageAttachment = new MessageAttachment(); updateMessageAttachment.setId(messageAttachment.getId()); updateMessageAttachment.setSendTime(new Date()); messageAttachmentMapper.updateByPrimaryKeySelective(updateMessageAttachment); } log.info("getGuaranteedVideoIdNum={}", getGuaranteedVideoIdNum); //移除从redis中获取的保底数据 for (int i = 0; i < getGuaranteedVideoIdNum; i++) { redisTemplate.opsForList().leftPop(GUARANTEED_MINIPROGRAM_KEY); } } public List getSendMessage(User user, Long staffId) { StaffWithUserExample example = new StaffWithUserExample(); StaffWithUserExample.Criteria criteria = example.createCriteria(); criteria.andUserIdEqualTo(user.getId()); if (staffId != null) { criteria.andUserIdEqualTo(staffId); } List staffWithUserList = staffWithUserMapper.selectByExample(example); if (CollectionUtils.isEmpty(staffWithUserList)) { return null; } int n = 0; List sendMessageList = new ArrayList<>(); SendMessage sendMessage = new SendMessage(); for (PushMessage pushMessage : goodHistoryPushList) { if (pushMessage.getUserIds().contains(user.getId())) { if (n == 0) { sendMessage.setVideoId1(pushMessage.getVideoId()); } if (n == 1) { sendMessage.setVideoId2(pushMessage.getVideoId()); } if (n == 2) { sendMessage.setVideoId3(pushMessage.getVideoId()); } n++; if (n >= MAX_VIDEO_NUM) { break; } } } //保底数据 if (n < MAX_VIDEO_NUM) { for (Long videoId : guaranteedVideoIdList) { if (n == 0) { sendMessage.setVideoId1(videoId); } if (n == 1) { sendMessage.setVideoId2(videoId); } if (n == 2) { sendMessage.setVideoId3(videoId); } n++; if (n >= MAX_VIDEO_NUM) { break; } } } if (n < MAX_VIDEO_NUM) { log.error("组装数据失败 user={}", user); return null; } for (StaffWithUser staffWithUser : staffWithUserList) { SendMessage newSendMessage = new SendMessage(); BeanUtils.copyProperties(sendMessage, newSendMessage); newSendMessage.setStaffId(staffWithUser.getStaffId()); newSendMessage.setUserId(staffWithUser.getUserId()); sendMessageList.add(newSendMessage); } return sendMessageList; } @XxlJob("pushSendMessageJob") public ReturnT pushSendMessage(String param) { List groupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0); if (StringUtils.isNotEmpty(param)) { groupList = groupList.stream().filter(e -> e.getStaffId() == Long.parseLong(param)).collect(Collectors.toList()); } if (CollectionUtils.isEmpty(groupList)) { return ReturnT.SUCCESS; } for (SendMessage sendMessage : groupList) { sendMessage.setIsSend(0); sendMessage.setCreateTime(DateUtil.getThatDayDate()); List sendUserList = sendMessageMapper.selectExternalUserId3rdParty(sendMessage); boolean flag = pushMessage(sendUserList, sendMessage); if (flag) { SendMessage updateSendMessage = new SendMessage(); updateSendMessage.setIsSend(1); SendMessageExample example = new SendMessageExample(); example.createCriteria() .andVideoId1EqualTo(sendMessage.getVideoId1()) .andVideoId2EqualTo(sendMessage.getVideoId2()) .andVideoId3EqualTo(sendMessage.getVideoId3()) .andStaffIdEqualTo(sendMessage.getStaffId()) .andCreateTimeGreaterThan(DateUtil.getThatDayDate()); sendMessageMapper.updateByExampleSelective(updateSendMessage, example); } } return ReturnT.SUCCESS; } public boolean pushMessage(List sendUserList, SendMessage sendMessage) { List pushList = new ArrayList<>(); StaffExample staffExample = new StaffExample(); staffExample.createCriteria().andIdEqualTo(sendMessage.getStaffId()); List staffList = staffMapper.selectByExample(staffExample); Staff staff = staffList.get(0); String text = messageService.getMessageText(); String name = MessageUtil.getName(staff.getRemark()); JSONObject jsonObject = new JSONObject(); jsonObject.put("name", name); jsonObject.put("text", text); JSONArray attachments = new JSONArray(); List videoIdList = new ArrayList<>(); videoIdList.add(sendMessage.getVideoId1()); videoIdList.add(sendMessage.getVideoId2()); videoIdList.add(sendMessage.getVideoId3()); for (Long videoId : videoIdList) { JSONObject attachment = new JSONObject(); attachment.put("msgtype", "miniprogram"); MessageAttachmentExample example = new MessageAttachmentExample(); example.createCriteria().andMiniprogramVideoIdEqualTo(videoId); List messageAttachmentList = messageAttachmentMapper.selectByExample(example); if (CollectionUtils.isEmpty(messageAttachmentList)) { throw new RuntimeException("附件信息查询异常"); } MessageAttachment messageAttachment = messageAttachmentList.get(0); JSONObject miniprogram = new JSONObject(); miniprogram.put("appid", messageAttachment.getAppid()); String title = messageAttachment.getTitle(); if (title.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES) { title = ToolUtils.truncateString(title, MAX_BYTES - 3) + "..."; } miniprogram.put("title", title); miniprogram.put("cover", messageAttachment.getCover()); String page = ""; String key = staff.getStaffExtId() + "_" + videoId; if (pageMap.containsKey(key)) { page = pageMap.get(key); } else { page = messageAttachmentService.getPage(staff, videoId); pageMap.put(key, page); } if (StringUtils.isEmpty(page)) { throw new RuntimeException("获取page失败"); } miniprogram.put("page", page); attachment.put("miniprogram", miniprogram); attachments.add(0, attachment); } jsonObject.put("attachments", attachments); List> lists = Lists.partition(sendUserList, 10000); for (List list : lists) { List staffEuList = new ArrayList<>(); JSONObject newJSONObject = new JSONObject(); newJSONObject.putAll(jsonObject); JSONObject staff_eu = new JSONObject(); staff_eu.put("staff_ext_id", staff.getStaffExtId()); staff_eu.put("eu_ext_ids", list); staffEuList.add(staff_eu); newJSONObject.put("staff_eu_list", staffEuList); pushList.add(newJSONObject); } if (CollectionUtils.isEmpty(pushList)) { return false; } for (JSONObject pushJsonObject : pushList) { log.info("pushMessage pushJsonObject={}", pushJsonObject); boolean flag = messageService.pushMessage(pushJsonObject); if (!flag) { return flag; } } return true; } }