PushMessageDataJob.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package com.tzld.piaoquan.offline.job;
  2. import com.aliyun.odps.data.Record;
  3. import com.tzld.piaoquan.growth.common.component.ODPSManager;
  4. import com.tzld.piaoquan.growth.common.dao.mapper.PushMessageCallbackMapper;
  5. import com.tzld.piaoquan.growth.common.model.po.PushMessageCallbackExample;
  6. import com.tzld.piaoquan.growth.common.utils.DateUtil;
  7. import com.tzld.piaoquan.growth.common.utils.LarkRobotUtil;
  8. import com.xxl.job.core.biz.model.ReturnT;
  9. import com.xxl.job.core.handler.annotation.XxlJob;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import org.springframework.util.CollectionUtils;
  14. import org.springframework.util.StringUtils;
  15. import java.util.Calendar;
  16. import java.util.Date;
  17. import java.util.List;
  18. import static com.tzld.piaoquan.growth.common.common.constant.TimeConstant.MILLISECOND_DAY;
  19. import static com.tzld.piaoquan.growth.common.common.constant.TimeConstant.MILLISECOND_HOUR;
  20. @Slf4j
  21. @Component
  22. public class PushMessageDataJob {
  23. @Autowired
  24. private PushMessageCallbackMapper pushMessageCallbackMapper;
  25. @Autowired
  26. private ODPSManager odpsManager;
  27. @XxlJob("alertPushMessageJob")
  28. public ReturnT<String> alertPushMessageJob(String param) {
  29. long endTimestamp = System.currentTimeMillis();
  30. long startTimestamp = endTimestamp - MILLISECOND_HOUR;
  31. long ytdStartTimestamp = startTimestamp - MILLISECOND_DAY;
  32. long ytdEndTimestamp = endTimestamp - MILLISECOND_DAY;
  33. PushMessageCallbackExample example = new PushMessageCallbackExample();
  34. example.createCriteria().andCreateTimeBetween(new Date(startTimestamp), new Date(endTimestamp));
  35. long nowCount = pushMessageCallbackMapper.countByExample(example);
  36. if (nowCount == 0) {
  37. LarkRobotUtil.sendMessage("最近一小时回调消息为0");
  38. }
  39. PushMessageCallbackExample ytdExample = new PushMessageCallbackExample();
  40. ytdExample.createCriteria().andCreateTimeBetween(new Date(ytdStartTimestamp), new Date(ytdEndTimestamp));
  41. long ydtCount = pushMessageCallbackMapper.countByExample(example);
  42. if (nowCount + 1000 < ydtCount) {
  43. if (ydtCount == 0) {
  44. ydtCount = 1L;
  45. }
  46. double res = ((double) nowCount - ydtCount) * 100 / ydtCount;
  47. LarkRobotUtil.sendMessage("最近一小时回调消息量:" + nowCount + "\n"
  48. + "昨日消息回调量:" + ydtCount + "\n"
  49. + "百分比为:" + String.format("%.2f", res) + "%");
  50. }
  51. return ReturnT.SUCCESS;
  52. }
  53. @XxlJob("delPushMessageCallbackJob")
  54. public ReturnT<String> delPushMessageCallbackJob(String param) {
  55. Calendar calendar = Calendar.getInstance();
  56. Long beforeDayStart = DateUtil.getBeforeDayStart(7);
  57. Date startDay = new Date(beforeDayStart * 1000);
  58. if (StringUtils.hasText(param)) {
  59. startDay = DateUtil.getDate(param);
  60. }
  61. for (int i = 0; i < 30; i++) {
  62. // 计算当前遍历日期
  63. Date currentStartDate = (Date) startDay.clone();
  64. calendar.setTime(currentStartDate);
  65. calendar.add(Calendar.DAY_OF_MONTH, -i); // 向前推i天
  66. // 生成开始时间和结束时间
  67. Date startTime = calendar.getTime(); // 开始时间
  68. calendar.set(Calendar.HOUR_OF_DAY, 23);
  69. calendar.set(Calendar.MINUTE, 59);
  70. calendar.set(Calendar.SECOND, 59);
  71. Date endTime = calendar.getTime(); // 结束时间
  72. PushMessageCallbackExample example = new PushMessageCallbackExample();
  73. example.createCriteria().andCreateTimeBetween(startTime, endTime);
  74. long mysqlCount = pushMessageCallbackMapper.countByExample(example);
  75. if (mysqlCount == 0) {
  76. break;
  77. }
  78. String pt = DateUtil.getDateString(startTime.getTime(), "yyyyMMdd");
  79. String sql = String.format("SELECT count(*) FROM push_message_callback WHERE pt = %s;", pt);
  80. List<Record> recordList = odpsManager.query(sql);
  81. if (CollectionUtils.isEmpty(recordList)) {
  82. LarkRobotUtil.sendMessage("查询hive失败" + pt);
  83. return ReturnT.FAIL;
  84. }
  85. Long hiveCount = recordList.get(0).getBigint(0);
  86. if (Math.abs(mysqlCount - hiveCount) > 0) {
  87. LarkRobotUtil.sendMessage("数量异常" + pt + "\n mysql数量:" + mysqlCount + "\n hive数量:" + hiveCount);
  88. return ReturnT.FAIL;
  89. }
  90. pushMessageCallbackMapper.deleteByExample(example);
  91. }
  92. return ReturnT.SUCCESS;
  93. }
  94. }