package com.tzld.piaoquan.offline.job; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.tzld.piaoquan.growth.common.common.enums.TimeEnum; import com.tzld.piaoquan.growth.common.component.HttpPoolClient; import com.tzld.piaoquan.growth.common.component.ProxyHttpPoolClient; import com.tzld.piaoquan.growth.common.dao.mapper.CorpMapper; import com.tzld.piaoquan.growth.common.dao.mapper.CorpStatisticsTotalMapper; import com.tzld.piaoquan.growth.common.dao.mapper.StaffMapper; import com.tzld.piaoquan.growth.common.dao.mapper.StaffStatisticsTotalMapper; import com.tzld.piaoquan.growth.common.model.bo.XxlJobParam; import com.tzld.piaoquan.growth.common.model.po.*; import com.tzld.piaoquan.growth.common.service.WeComAccessTokenService; import com.tzld.piaoquan.growth.common.service.WeComSendService; import com.tzld.piaoquan.growth.common.utils.DateUtil; import com.tzld.piaoquan.growth.common.utils.DateUtils; import com.tzld.piaoquan.growth.common.utils.LarkRobotUtil; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import static com.tzld.piaoquan.growth.common.common.constant.WeComConstant.*; @Slf4j @Component public class WeComStaffDataJob { @Autowired private HttpPoolClient httpPoolClient; @Autowired private ProxyHttpPoolClient proxyHttpPoolClient; @Autowired private WeComAccessTokenService weComAccessTokenService; @Autowired private StaffMapper staffMapper; @Autowired private CorpMapper corpMapper; @Autowired private StaffStatisticsTotalMapper staffStatisticsTotalMapper; @Autowired private CorpStatisticsTotalMapper corpStatisticsTotalMapper; @XxlJob("insertStaffJob") public ReturnT insertStaff(String param) { try { XxlJobParam xxlJobParam = new XxlJobParam(); if (StringUtils.isNotEmpty(param)) { xxlJobParam = JSONObject.parseObject(param, XxlJobParam.class); } CorpExample corpExample = new CorpExample(); if (xxlJobParam.getCorpId() != null) { corpExample.createCriteria().andIdEqualTo(xxlJobParam.getCorpId()); } List corps = corpMapper.selectByExample(corpExample); if (CollectionUtils.isEmpty(corps)) { return ReturnT.SUCCESS; } for (Corp corp : corps) { List carrierIdList = getCarrierIdList(corp.getId()); if (CollectionUtils.isEmpty(carrierIdList)) { continue; } for (String carrierId : carrierIdList) { StaffExample example = new StaffExample(); example.createCriteria().andCarrierIdEqualTo(carrierId).andCorpIdEqualTo(corp.getId()); List staffList = staffMapper.selectByExample(example); if (CollectionUtils.isEmpty(staffList)) { Staff staff = new Staff(); staff.setCorpId(corp.getId()); staff.setCarrierId(carrierId); staff.setRemark(""); staffMapper.insert(staff); } } } } catch (Exception e) { LarkRobotUtil.sendMessage("更新员工失败"); log.error("insertStaff error", e); } return ReturnT.SUCCESS; } private List getCarrierIdList(Long corpId) throws IOException { String weComAccessToken = weComAccessTokenService.getWeComAccessToken(corpId); String url = String.format(GET_WE_COM_FOLLOW_USER_LIST + "?access_token=%s", weComAccessToken); String res; if (corpId == 1L) { res = httpPoolClient.get(url); } else { res = proxyHttpPoolClient.get(url); } log.info("getCarrierIdList corp = {}, res={}", corpId, res); JSONObject jsonObject = JSONObject.parseObject(res); Integer errcode = jsonObject.getInteger("errcode"); if (errcode == 0) { return jsonObject.getJSONArray("follow_user").stream().map(String::valueOf).collect(Collectors.toList()); } return null; } @XxlJob("statisticsTotalJob") public ReturnT statisticsTotal(String param) throws IOException { String date; if (StringUtils.isNotEmpty(param)) { date = param; } else { date = DateUtil.getThatDayDateString(); } long startTime = DateUtil.dateStrToTimestamp(date, "yyyy-MM-dd"); long endTime = startTime + TimeEnum.DAY.getTime(); StaffExample staffExample = new StaffExample(); staffExample.createCriteria().andIsDeleteEqualTo(0); List staffs = staffMapper.selectByExample(staffExample); for (Staff staff : staffs) { statisticsStaffTotal(staff, startTime, endTime, date); } StaffStatisticsTotalExample staffStatisticsTotalExample = new StaffStatisticsTotalExample(); staffStatisticsTotalExample.createCriteria().andDateEqualTo(date); List staffStatisticsTotals = staffStatisticsTotalMapper.selectByExample(staffStatisticsTotalExample); if (!CollectionUtils.isEmpty(staffStatisticsTotals)) { // 2. 分组并多字段求和(不使用构造方法) List results = new ArrayList<>(staffStatisticsTotals.stream() // 按产品类型分组 .collect(Collectors.groupingBy( StaffStatisticsTotal::getCorpId, // 对每组数据累加:使用默认构造的结果对象,通过setter更新值 Collectors.reducing( new CorpStatisticsTotal(), // 初始值:默认构造的空对象 // 转换函数:将Order转换为临时结果对象(用setter设置初始值) s -> { CorpStatisticsTotal temp = new CorpStatisticsTotal(); temp.setCorpId(s.getCorpId()); temp.setChatCnt(s.getChatCnt()); temp.setMessageCnt(s.getMessageCnt()); temp.setNegativeFeedbackCnt(s.getNegativeFeedbackCnt()); temp.setNewApplyCnt(s.getNewApplyCnt()); temp.setNewContactCnt(s.getNewContactCnt()); return temp; }, // 累加函数:合并两个结果对象(用setter更新总和) (sum1, sum2) -> { // 确保分组类型一致(同组数据type相同,取一个即可) sum1.setCorpId(sum2.getCorpId()); // 累加数量 sum1.setChatCnt(sum1.getChatCnt() + sum2.getChatCnt()); sum1.setMessageCnt(sum1.getMessageCnt() + sum2.getMessageCnt()); sum1.setNegativeFeedbackCnt(sum1.getNegativeFeedbackCnt() + sum2.getNegativeFeedbackCnt()); sum1.setNewApplyCnt(sum1.getNewApplyCnt() + sum2.getNewApplyCnt()); sum1.setNewContactCnt(sum1.getNewContactCnt() + sum2.getNewContactCnt()); return sum1; } ) )) // 将Map的值(结果对象)转换为List .values()); for (CorpStatisticsTotal corpStatisticsTotal : results) { corpStatisticsTotal.setDate(date); corpStatisticsTotalMapper.insertSelective(corpStatisticsTotal); } } return ReturnT.SUCCESS; } private void statisticsStaffTotal(Staff staff, long startTime, long endTime, String date) throws IOException { Long corpId = staff.getCorpId(); String accessToken = weComAccessTokenService.getWeComAccessToken(corpId); String url = POST_WE_COM_USER_BEHAVIOR_DATA + "?access_token=" + accessToken; JSONObject params = new JSONObject(); params.put("start_time", startTime); params.put("end_time", endTime); JSONArray userIds = new JSONArray(); userIds.add(staff.getCarrierId()); params.put("userid", userIds); String res; if (staff.getCorpId() == 1L) { res = httpPoolClient.post(url, params.toJSONString()); } else { res = proxyHttpPoolClient.post(url); } if (StringUtils.isNotEmpty(res)) { JSONObject jsonObject = JSONObject.parseObject(res); Integer errcode = jsonObject.getInteger("errcode"); if (errcode == 0) { JSONArray jsonArray = jsonObject.getJSONArray("behavior_data"); if (!jsonArray.isEmpty()) { JSONObject data = jsonArray.getJSONObject(0); StaffStatisticsTotal statisticsTotal = new StaffStatisticsTotal(); statisticsTotal.setStatTime(data.getLong("stat_time")); statisticsTotal.setChatCnt(data.getInteger("chat_cnt")); statisticsTotal.setMessageCnt(data.getInteger("message_cnt")); statisticsTotal.setNegativeFeedbackCnt(data.getInteger("negative_feedback_cnt")); statisticsTotal.setNewApplyCnt(data.getInteger("new_apply_cnt")); statisticsTotal.setNewContactCnt(data.getInteger("new_contact_cnt")); statisticsTotal.setDate(date); statisticsTotal.setCorpId(corpId); statisticsTotal.setStaffId(staff.getId()); staffStatisticsTotalMapper.insertSelective(statisticsTotal); } } } } }