package com.tzld.piaoquan.offline.job; import com.aliyun.odps.data.Record; import com.tzld.piaoquan.growth.common.component.ODPSManager; import com.tzld.piaoquan.growth.common.dao.mapper.PushMessageCallbackMapper; import com.tzld.piaoquan.growth.common.model.po.PushMessageCallbackExample; import com.tzld.piaoquan.growth.common.utils.DateUtil; import com.tzld.piaoquan.growth.common.utils.LarkRobotUtil; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.Calendar; import java.util.Date; import java.util.List; import static com.tzld.piaoquan.growth.common.common.constant.TimeConstant.MILLISECOND_DAY; import static com.tzld.piaoquan.growth.common.common.constant.TimeConstant.MILLISECOND_HOUR; @Slf4j @Component public class PushMessageDataJob { @Autowired private PushMessageCallbackMapper pushMessageCallbackMapper; @Autowired private ODPSManager odpsManager; @XxlJob("alertPushMessageJob") public ReturnT alertPushMessageJob(String param) { long endTimestamp = System.currentTimeMillis(); long startTimestamp = endTimestamp - MILLISECOND_HOUR; long ytdStartTimestamp = startTimestamp - MILLISECOND_DAY; long ytdEndTimestamp = endTimestamp - MILLISECOND_DAY; PushMessageCallbackExample example = new PushMessageCallbackExample(); example.createCriteria().andCreateTimeBetween(new Date(startTimestamp), new Date(endTimestamp)); long nowCount = pushMessageCallbackMapper.countByExample(example); if (nowCount == 0) { LarkRobotUtil.sendMessage("最近一小时回调消息为0"); } PushMessageCallbackExample ytdExample = new PushMessageCallbackExample(); ytdExample.createCriteria().andCreateTimeBetween(new Date(ytdStartTimestamp), new Date(ytdEndTimestamp)); long ydtCount = pushMessageCallbackMapper.countByExample(example); if (nowCount + 1000 < ydtCount) { if (ydtCount == 0) { ydtCount = 1L; } double res = ((double) nowCount - ydtCount) * 100 / ydtCount; LarkRobotUtil.sendMessage("最近一小时回调消息量:" + nowCount + "\n" + "昨日消息回调量:" + ydtCount + "\n" + "百分比为:" + String.format("%.2f", res) + "%"); } return ReturnT.SUCCESS; } @XxlJob("delPushMessageCallbackJob") public ReturnT delPushMessageCallbackJob(String param) { Calendar calendar = Calendar.getInstance(); Long beforeDayStart = DateUtil.getBeforeDayStart(7); Date startDay = new Date(beforeDayStart * 1000); if (StringUtils.hasText(param)) { startDay = DateUtil.getDate(param); } for (int i = 0; i < 30; i++) { // 计算当前遍历日期 Date currentStartDate = (Date) startDay.clone(); calendar.setTime(currentStartDate); calendar.add(Calendar.DAY_OF_MONTH, -i); // 向前推i天 // 生成开始时间和结束时间 Date startTime = calendar.getTime(); // 开始时间 calendar.set(Calendar.HOUR_OF_DAY, 23); calendar.set(Calendar.MINUTE, 59); calendar.set(Calendar.SECOND, 59); Date endTime = calendar.getTime(); // 结束时间 PushMessageCallbackExample example = new PushMessageCallbackExample(); example.createCriteria().andCreateTimeBetween(startTime, endTime); long mysqlCount = pushMessageCallbackMapper.countByExample(example); if (mysqlCount == 0) { break; } String pt = DateUtil.getDateString(startTime.getTime(), "yyyyMMdd"); String sql = String.format("SELECT count(*) FROM push_message_callback WHERE pt = %s;", pt); List recordList = odpsManager.query(sql); if (CollectionUtils.isEmpty(recordList)) { LarkRobotUtil.sendMessage("查询hive失败" + pt); return ReturnT.FAIL; } Long hiveCount = recordList.get(0).getBigint(0); if (Math.abs(mysqlCount - hiveCount) > 0) { LarkRobotUtil.sendMessage("数量异常" + pt + "\n mysql数量:" + mysqlCount + "\n hive数量:" + hiveCount); return ReturnT.FAIL; } pushMessageCallbackMapper.deleteByExample(example); } return ReturnT.SUCCESS; } }