|
@@ -0,0 +1,104 @@
|
|
|
+package com.tzld.piaoquan.offline.job;
|
|
|
+
|
|
|
+import com.aliyun.odps.data.Record;
|
|
|
+import com.tzld.piaoquan.growth.common.component.ODPSManager;
|
|
|
+import com.tzld.piaoquan.growth.common.dao.mapper.PushMessageCallbackMapper;
|
|
|
+import com.tzld.piaoquan.growth.common.model.po.PushMessageCallbackExample;
|
|
|
+import com.tzld.piaoquan.growth.common.utils.DateUtil;
|
|
|
+import com.tzld.piaoquan.growth.common.utils.LarkRobotUtil;
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.util.Calendar;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import static com.tzld.piaoquan.growth.common.common.constant.TimeConstant.MILLISECOND_DAY;
|
|
|
+import static com.tzld.piaoquan.growth.common.common.constant.TimeConstant.MILLISECOND_HOUR;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class PushMessageDataJob {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private PushMessageCallbackMapper pushMessageCallbackMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ODPSManager odpsManager;
|
|
|
+
|
|
|
+ @XxlJob("alertPushMessageJob")
|
|
|
+ public ReturnT<String> alertPushMessageJob(String param) {
|
|
|
+ long endTimestamp = System.currentTimeMillis();
|
|
|
+ long startTimestamp = endTimestamp - MILLISECOND_HOUR;
|
|
|
+ long ytdStartTimestamp = startTimestamp - MILLISECOND_DAY;
|
|
|
+ long ytdEndTimestamp = endTimestamp - MILLISECOND_DAY;
|
|
|
+ PushMessageCallbackExample example = new PushMessageCallbackExample();
|
|
|
+ example.createCriteria().andCreateTimeBetween(new Date(startTimestamp), new Date(endTimestamp));
|
|
|
+ long nowCount = pushMessageCallbackMapper.countByExample(example);
|
|
|
+ if (nowCount == 0) {
|
|
|
+ LarkRobotUtil.sendMessage("最近一小时回调消息为0");
|
|
|
+ }
|
|
|
+ PushMessageCallbackExample ytdExample = new PushMessageCallbackExample();
|
|
|
+ ytdExample.createCriteria().andCreateTimeBetween(new Date(ytdStartTimestamp), new Date(ytdEndTimestamp));
|
|
|
+ long ydtCount = pushMessageCallbackMapper.countByExample(example);
|
|
|
+ if (nowCount + 1000 < ydtCount) {
|
|
|
+ if (ydtCount == 0) {
|
|
|
+ ydtCount = 1L;
|
|
|
+ }
|
|
|
+ double res = ((double) nowCount - ydtCount) * 100 / ydtCount;
|
|
|
+ LarkRobotUtil.sendMessage("最近一小时回调消息量:" + nowCount + "\n"
|
|
|
+ + "昨日消息回调量:" + ydtCount + "\n"
|
|
|
+ + "百分比为:" + String.format("%.2f", res) + "%");
|
|
|
+ }
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ @XxlJob("delPushMessageCallbackJob")
|
|
|
+ public ReturnT<String> delPushMessageCallbackJob(String param) {
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ Long beforeDayStart = DateUtil.getBeforeDayStart(7);
|
|
|
+ Date startDay = new Date(beforeDayStart * 1000);
|
|
|
+ if (StringUtils.hasText(param)) {
|
|
|
+ startDay = DateUtil.getDate(param);
|
|
|
+ }
|
|
|
+ for (int i = 0; i < 30; i++) {
|
|
|
+ // 计算当前遍历日期
|
|
|
+ Date currentStartDate = (Date) startDay.clone();
|
|
|
+ calendar.setTime(currentStartDate);
|
|
|
+ calendar.add(Calendar.DAY_OF_MONTH, -i); // 向前推i天
|
|
|
+
|
|
|
+ // 生成开始时间和结束时间
|
|
|
+ Date startTime = calendar.getTime(); // 开始时间
|
|
|
+ calendar.set(Calendar.HOUR_OF_DAY, 23);
|
|
|
+ calendar.set(Calendar.MINUTE, 59);
|
|
|
+ calendar.set(Calendar.SECOND, 59);
|
|
|
+ Date endTime = calendar.getTime(); // 结束时间
|
|
|
+
|
|
|
+ PushMessageCallbackExample example = new PushMessageCallbackExample();
|
|
|
+ example.createCriteria().andCreateTimeBetween(startTime, endTime);
|
|
|
+ long mysqlCount = pushMessageCallbackMapper.countByExample(example);
|
|
|
+ if (mysqlCount == 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ String pt = DateUtil.getDateString(startTime.getTime());
|
|
|
+ String sql = String.format("SELECT count(*) FROM push_message_callback WHERE pt = %s;", pt);
|
|
|
+ List<Record> recordList = odpsManager.query(sql);
|
|
|
+ if (CollectionUtils.isEmpty(recordList)) {
|
|
|
+ LarkRobotUtil.sendMessage("查询hive失败" + pt);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+ Long hiveCount = recordList.get(0).getBigint(0);
|
|
|
+ if (Math.abs(mysqlCount - hiveCount) > 0) {
|
|
|
+ LarkRobotUtil.sendMessage("数量异常" + pt + "\n mysql数量:" + mysqlCount + "\n hive数量:" + hiveCount);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+ pushMessageCallbackMapper.deleteByExample(example);
|
|
|
+ }
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+}
|