123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 |
- package com.tzld.piaoquan.wecom.job;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.aliyun.odps.data.Record;
- import com.google.common.collect.Lists;
- import com.tzld.piaoquan.wecom.dao.mapper.*;
- import com.tzld.piaoquan.wecom.model.bo.PushMessage;
- import com.tzld.piaoquan.wecom.model.bo.VideoParam;
- import com.tzld.piaoquan.wecom.model.bo.XxlJobParam;
- import com.tzld.piaoquan.wecom.model.po.*;
- import com.tzld.piaoquan.wecom.model.vo.GuaranteedParam;
- import com.tzld.piaoquan.wecom.service.MessageAttachmentService;
- import com.tzld.piaoquan.wecom.service.MessageService;
- import com.tzld.piaoquan.wecom.utils.DateUtil;
- import com.tzld.piaoquan.wecom.utils.LarkRobotUtil;
- import com.tzld.piaoquan.wecom.utils.OdpsUtil;
- import com.tzld.piaoquan.wecom.utils.ToolUtils;
- import com.tzld.piaoquan.wecom.utils.page.Page;
- import com.xxl.job.core.biz.model.ReturnT;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import lombok.extern.log4j.Log4j2;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import java.nio.charset.StandardCharsets;
- import java.util.*;
- import java.util.stream.Collectors;
- import static com.tzld.piaoquan.wecom.common.constant.MessageConstant.MAX_VIDEO_NUM;
- import static com.tzld.piaoquan.wecom.common.constant.RedisConstant.GUARANTEED_MINI_PROGRAM_KEY;
- import static com.tzld.piaoquan.wecom.common.constant.TimeConstant.MILLISECOND_DAY;
- @Log4j2
- @Component
- public class WeComMessageDataJob {
- @Autowired
- private UserMapper userMapper;
- @Autowired
- private MessageAttachmentMapper messageAttachmentMapper;
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
- @Autowired
- private MessageService messageService;
- @Autowired
- private MessageAttachmentService messageAttachmentService;
- @Autowired
- private StaffWithUserMapper staffWithUserMapper;
- @Autowired
- private StaffMapper staffMapper;
- @Autowired
- private SendMessageMapper sendMessageMapper;
- @Autowired
- private CorpMapper corpMapper;
- //发送小程序标题限制字节数
- private static final int MAX_BYTES = 64;
- //历史优质视频可推送用户列表
- Map<Long, List<PushMessage>> historicalTopMap = new HashMap<>();
- //保底视频列表
- Map<Long, List<Long>> guaranteedVideoMap = new HashMap<>();
- Map<String, String> pageMap = new HashMap<>();
- //初始化操作
- private void init() {
- //历史优质视频获取
- String sql = String.format("SELECT * FROM loghubods.history_good_video_can_push_user_list where dt = %s;",
- DateUtil.getBeforeDayDateString());
- List<Record> recordList = OdpsUtil.getOdpsData(sql);
- if (CollectionUtils.isEmpty(recordList)) {
- return;
- }
- List<PushMessage> list = new ArrayList<>();
- for (Record record : recordList) {
- PushMessage pushMessage = new PushMessage();
- Long videoId = Long.parseLong((String) record.get(0));
- Set<Long> userIds = new HashSet<>(JSONObject.parseArray((String) record.get(1), Long.class));
- Long staffId = Long.parseLong((String) record.get(2));
- Double score = Double.parseDouble((String) record.get(3));
- pushMessage.setVideoId(videoId);
- pushMessage.setUserIds(userIds);
- pushMessage.setStaffId(staffId);
- pushMessage.setScore(score);
- list.add(pushMessage);
- }
- Map<Long, List<PushMessage>> groupedMap = list.stream()
- .collect(Collectors.groupingBy(PushMessage::getStaffId,
- Collectors.mapping(pushMessage -> pushMessage,
- Collectors.toList())))
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- entry -> entry.getValue().stream()
- .sorted(Comparator.comparing(PushMessage::getScore).reversed()) // 根据 score 降序排序
- .collect(Collectors.toList())
- ));
- historicalTopMap = groupedMap;
- //保底视频获取
- String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getThatDayDateString());
- GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key);
- if (guaranteedParam == null
- || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) {
- LarkRobotUtil.sendMessage("保底视频获取异常,请检查" + DateUtil.getThatDayDateString());
- throw new RuntimeException();
- }
- Map<Long, List<Long>> videoMap = new HashMap<>();
- for (VideoParam videoParam : guaranteedParam.getVideoParamList()) {
- if (videoParam.getStaffId() == null) {
- LarkRobotUtil.sendMessage("保底视频获取异常,StaffId为空" + DateUtil.getThatDayDateString());
- throw new RuntimeException();
- }
- if (CollectionUtils.isEmpty(videoParam.getVideoIds()) || videoParam.getVideoIds().size() < MAX_VIDEO_NUM) {
- LarkRobotUtil.sendMessage("保底视频数量异常,请查看" + guaranteedParam.getDate() + videoParam.getStaffId());
- throw new RuntimeException();
- }
- for (Long videoId : videoParam.getVideoIds()) {
- MessageAttachmentExample example = new MessageAttachmentExample();
- example.createCriteria().andMiniprogramVideoIdEqualTo(videoId).andStaffIdEqualTo(videoParam.getStaffId());
- List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
- if (CollectionUtils.isEmpty(messageAttachmentList)) {
- LarkRobotUtil.sendMessage("保底视频不存在,请查看videoId=" + videoId);
- throw new RuntimeException();
- }
- MessageAttachment messageAttachment = messageAttachmentList.get(0);
- if (messageAttachment.getSendTime() != null
- && DateUtil.dateDifference(new Date(), messageAttachment.getSendTime()) < 180 * MILLISECOND_DAY) {
- LarkRobotUtil.sendMessage("保底视频半年内已发送,请查看videoId=" + videoId);
- throw new RuntimeException();
- }
- }
- videoMap.put(videoParam.getStaffId(), videoParam.getVideoIds());
- }
- if (!videoMap.containsKey(0L)) {
- LarkRobotUtil.sendMessage("保底视频没有默认组,请查看" + guaranteedParam.getDate());
- throw new RuntimeException();
- }
- this.guaranteedVideoMap = videoMap;
- }
- @XxlJob("assembleSendMessageJob")
- public ReturnT<String> assembleSendMessage(String param) {
- XxlJobParam xxlJobParam = new XxlJobParam();
- if (StringUtils.isNotEmpty(param)) {
- xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class);
- }
- init();
- CorpExample corpExample = new CorpExample();
- if (xxlJobParam.getCorpId() != null) {
- corpExample.createCriteria().andIdEqualTo(xxlJobParam.getCorpId());
- }
- List<Corp> corps = corpMapper.selectByExample(corpExample);
- if (CollectionUtils.isEmpty(corps)) {
- return ReturnT.SUCCESS;
- }
- for (Corp corp : corps) {
- Long staffId = null;
- if (xxlJobParam.getStaffId() != null) {
- staffId = xxlJobParam.getStaffId();
- }
- UserExample userExample = new UserExample();
- userExample.createCriteria().andExternalUserIdIsNotNull().andCorpIdEqualTo(corp.getId());
- if (xxlJobParam.getUserId() != null) {
- userExample.createCriteria().andIdEqualTo(xxlJobParam.getUserId());
- }
- long count = userMapper.countByExample(userExample);
- int page = 1;
- int pageSize = 1000;
- long totalPageSize = count / pageSize + 1;
- for (; page <= totalPageSize; page++) {
- userExample.setPage(new Page<>(page, pageSize));
- List<User> userList = userMapper.selectByExample(userExample);
- if (CollectionUtils.isEmpty(userList)) {
- continue;
- }
- //落库逻辑
- List<SendMessage> allSeneMessageList = new ArrayList<>();
- for (User user : userList) {
- List<SendMessage> sendMessageList = getSendMessage(user, staffId, corp.getId());
- if (CollectionUtils.isEmpty(sendMessageList)) {
- continue;
- }
- allSeneMessageList.addAll(sendMessageList);
- }
- if (CollectionUtils.isEmpty(allSeneMessageList)) {
- continue;
- }
- sendMessageMapper.insertList(allSeneMessageList);
- }
- }
- //组装好当天要发送的消息后 记录时间 删除保底数据
- saveGuaranteedVideoIdList();
- return ReturnT.SUCCESS;
- }
- private void saveGuaranteedVideoIdList() {
- String key = String.format(GUARANTEED_MINI_PROGRAM_KEY, DateUtil.getThatDayDateString());
- GuaranteedParam guaranteedParam = (GuaranteedParam) redisTemplate.opsForValue().get(key);
- if (guaranteedParam == null || CollectionUtils.isEmpty(guaranteedParam.getVideoParamList())) {
- return;
- }
- List<Long> videoIdList = new ArrayList<>();
- for (VideoParam videoParam : guaranteedParam.getVideoParamList()) {
- if (CollectionUtils.isEmpty(videoParam.getVideoIds())) {
- continue;
- }
- videoIdList.addAll(videoParam.getVideoIds());
- }
- MessageAttachmentExample example = new MessageAttachmentExample();
- example.createCriteria().andMiniprogramVideoIdIn(videoIdList);
- List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
- for (MessageAttachment messageAttachment : messageAttachmentList) {
- MessageAttachment updateMessageAttachment = new MessageAttachment();
- updateMessageAttachment.setId(messageAttachment.getId());
- updateMessageAttachment.setSendTime(new Date());
- messageAttachmentMapper.updateByPrimaryKeySelective(updateMessageAttachment);
- }
- redisTemplate.delete(key);
- }
- private List<SendMessage> getSendMessage(User user, Long staffId, Long corpId) {
- StaffWithUserExample example = new StaffWithUserExample();
- StaffWithUserExample.Criteria criteria = example.createCriteria();
- criteria.andUserIdEqualTo(user.getId());
- criteria.andIsDeleteEqualTo(0);
- if (staffId != null) {
- criteria.andStaffIdEqualTo(staffId);
- }
- List<StaffWithUser> staffWithUserList = staffWithUserMapper.selectByExample(example);
- if (CollectionUtils.isEmpty(staffWithUserList)) {
- return Collections.emptyList();
- }
- List<SendMessage> sendMessageList = new ArrayList<>();
- for (StaffWithUser staffWithUser : staffWithUserList) {
- SendMessage sendMessage = new SendMessage();
- int n = fillHistoricalTopMessages(sendMessage, user.getId(), staffWithUser.getStaffId());
- if (n < MAX_VIDEO_NUM) {
- // 保底数据
- n = fillGuaranteedMessages(sendMessage, staffWithUser.getStaffId(), n);
- }
- if (n < MAX_VIDEO_NUM) {
- LarkRobotUtil.sendMessage("组装数据失败 user=" + user);
- throw new RuntimeException("组装数据失败");
- }
- sendMessage.setCorpId(corpId);
- sendMessage.setStaffId(staffWithUser.getStaffId());
- sendMessage.setUserId(staffWithUser.getUserId());
- sendMessageList.add(sendMessage);
- }
- return sendMessageList;
- }
- private int fillHistoricalTopMessages(SendMessage sendMessage, Long userId, Long staffId) {
- List<PushMessage> list = historicalTopMap.get(staffId);
- if (!CollectionUtils.isEmpty(list)) {
- int n = 0;
- for (PushMessage pushMessage : list) {
- if (pushMessage.getUserIds().contains(userId)) {
- setVideoId(sendMessage, n, pushMessage.getVideoId());
- n++;
- if (n >= MAX_VIDEO_NUM) {
- break;
- }
- }
- }
- return n;
- }
- return 0;
- }
- private int fillGuaranteedMessages(SendMessage sendMessage, Long staffId, int currentCount) {
- List<Long> guaranteedVideoIdList = guaranteedVideoMap.get(staffId);
- if (CollectionUtils.isEmpty(guaranteedVideoIdList)) {
- guaranteedVideoIdList = guaranteedVideoMap.get(0L);
- }
- if (CollectionUtils.isEmpty(guaranteedVideoIdList)) {
- LarkRobotUtil.sendMessage("组装数据时,保底数据获取异常");
- throw new RuntimeException("保底数据获取异常");
- }
- if (currentCount < MAX_VIDEO_NUM) {
- for (Long videoId : guaranteedVideoIdList) {
- setVideoId(sendMessage, currentCount, videoId);
- currentCount++;
- if (currentCount >= MAX_VIDEO_NUM) {
- break;
- }
- }
- }
- return currentCount;
- }
- private void setVideoId(SendMessage sendMessage, int index, Long videoId) {
- switch (index) {
- case 0:
- sendMessage.setVideoId1(videoId);
- break;
- case 1:
- sendMessage.setVideoId2(videoId);
- break;
- case 2:
- sendMessage.setVideoId3(videoId);
- break;
- default:
- break;
- }
- }
- @XxlJob("pushSendMessageJob")
- public ReturnT<String> pushSendMessage(String param) {
- XxlJobParam xxlJobParam = new XxlJobParam();
- if (StringUtils.isNotEmpty(param)) {
- xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class);
- }
- List<SendMessage> groupList = sendMessageMapper.getGroupList(DateUtil.getThatDayDate(), 0);
- if (xxlJobParam.getCorpId() != null) {
- Long corpId = xxlJobParam.getCorpId();
- groupList = groupList.stream().filter(e -> Objects.equals(e.getCorpId(), corpId)).collect(Collectors.toList());
- }
- if (CollectionUtils.isEmpty(groupList)) {
- return ReturnT.SUCCESS;
- }
- if (xxlJobParam.getStaffId() != null) {
- Long staffId = xxlJobParam.getStaffId();
- groupList = groupList.stream().filter(e -> Objects.equals(e.getStaffId(), staffId)).collect(Collectors.toList());
- }
- if (CollectionUtils.isEmpty(groupList)) {
- return ReturnT.SUCCESS;
- }
- for (SendMessage sendMessage : groupList) {
- sendMessage.setIsSend(0);
- sendMessage.setCreateTime(DateUtil.getThatDayDate());
- List<String> sendUserList = sendMessageMapper.selectExternalUserId(sendMessage);
- boolean flag = pushMessage(sendUserList, sendMessage);
- if (flag) {
- SendMessage updateSendMessage = new SendMessage();
- updateSendMessage.setIsSend(1);
- SendMessageExample example = new SendMessageExample();
- example.createCriteria()
- .andVideoId1EqualTo(sendMessage.getVideoId1())
- .andVideoId2EqualTo(sendMessage.getVideoId2())
- .andVideoId3EqualTo(sendMessage.getVideoId3())
- .andStaffIdEqualTo(sendMessage.getStaffId())
- .andCreateTimeGreaterThan(DateUtil.getThatDayDate());
- sendMessageMapper.updateByExampleSelective(updateSendMessage, example);
- }
- }
- return ReturnT.SUCCESS;
- }
- private boolean pushMessage(List<String> sendUserList, SendMessage sendMessage) {
- List<JSONObject> pushList = new ArrayList<>();
- StaffExample staffExample = new StaffExample();
- staffExample.createCriteria().andIdEqualTo(sendMessage.getStaffId());
- List<Staff> staffList = staffMapper.selectByExample(staffExample);
- Staff staff = staffList.get(0);
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("chat_type", "single");
- JSONObject text = new JSONObject();
- String content = messageService.getMessageText();
- text.put("content", content);
- jsonObject.put("text", text);
- jsonObject.put("sender", staff.getCarrierId());
- JSONArray attachments = new JSONArray();
- List<Long> videoIdList = new ArrayList<>();
- videoIdList.add(sendMessage.getVideoId1());
- videoIdList.add(sendMessage.getVideoId2());
- videoIdList.add(sendMessage.getVideoId3());
- for (Long videoId : videoIdList) {
- JSONObject attachment = new JSONObject();
- attachment.put("msgtype", "miniprogram");
- MessageAttachmentExample example = new MessageAttachmentExample();
- example.createCriteria().andMiniprogramVideoIdEqualTo(videoId);
- List<MessageAttachment> messageAttachmentList = messageAttachmentMapper.selectByExample(example);
- if (CollectionUtils.isEmpty(messageAttachmentList)) {
- throw new RuntimeException("附件信息查询异常");
- }
- MessageAttachment messageAttachment = messageAttachmentList.get(0);
- JSONObject miniprogram = new JSONObject();
- miniprogram.put("appid", messageAttachment.getAppid());
- String title = messageAttachment.getTitle();
- if (title.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES) {
- title = ToolUtils.truncateString(title, MAX_BYTES - 3) + "...";
- }
- miniprogram.put("title", title);
- String picMediaId = messageAttachmentService.getPicMediaId(messageAttachment.getCover(), sendMessage.getCorpId());
- if (StringUtils.isEmpty(picMediaId)) {
- log.error("pushMessage getPicMediaId error cover={}", messageAttachment.getCover());
- return false;
- }
- miniprogram.put("pic_media_id", picMediaId);
- String page = "";
- String key = staff.getCarrierId() + "_" + videoId;
- if (pageMap.containsKey(key)) {
- page = pageMap.get(key);
- } else {
- page = messageAttachmentService.getPage(staff, videoId);
- pageMap.put(key, page);
- }
- if (StringUtils.isEmpty(page)) {
- log.error("pushMessage get page error videoId={} staff={}", videoId, staff);
- return false;
- }
- miniprogram.put("page", page);
- attachment.put("miniprogram", miniprogram);
- attachments.add(0, attachment);
- }
- jsonObject.put("attachments", attachments);
- List<List<String>> lists = Lists.partition(sendUserList, 10000);
- for (List<String> list : lists) {
- JSONArray externalUserIds = JSONArray.parseArray(JSON.toJSONString(list));
- JSONObject newJSONObject = new JSONObject();
- newJSONObject.putAll(jsonObject);
- newJSONObject.put("external_userid", externalUserIds);
- pushList.add(newJSONObject);
- }
- if (CollectionUtils.isEmpty(pushList)) {
- return false;
- }
- for (JSONObject pushJsonObject : pushList) {
- log.info("pushMessage pushJsonObject={}", pushJsonObject);
- boolean flag = messageService.pushWeComMessage(pushJsonObject, sendMessage.getCorpId());
- if (!flag) {
- return flag;
- }
- }
- return true;
- }
- }
|