瀏覽代碼

增加删除回调消息定时任务

xueyiming 8 月之前
父節點
當前提交
faacebc75c

+ 49 - 4
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/XxlJobService.java

@@ -3,6 +3,7 @@ package com.tzld.longarticle.recommend.server.service;
 import cn.hutool.core.collection.CollectionUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.data.Record;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.longarticle.recommend.server.common.enums.recommend.AccountBusinessTypeEnum;
 import com.tzld.longarticle.recommend.server.common.enums.recommend.FeishuRobotIdEnum;
@@ -19,6 +20,7 @@ import com.tzld.longarticle.recommend.server.model.entity.crawler.LongArticlesVi
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.GetOffVideoArticle;
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.LongArticlesMatchVideo;
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.LongArticlesReadRate;
+import com.tzld.longarticle.recommend.server.remote.ODPSManager;
 import com.tzld.longarticle.recommend.server.repository.crawler.GetOffVideoCrawlerRepository;
 import com.tzld.longarticle.recommend.server.repository.crawler.LongArticlesRootSourceIdRepository;
 import com.tzld.longarticle.recommend.server.repository.crawler.LongArticlesVideoRepository;
@@ -38,10 +40,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
 import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.tzld.longarticle.recommend.server.common.constant.TimeConstant.MILLISECOND_DAY;
@@ -69,6 +68,8 @@ public class XxlJobService {
     private LongArticlesRootSourceIdRepository longArticlesRootSourceIdRepository;
     @Autowired
     private PushMessageCallbackMapper pushMessageCallbackMapper;
+    @Autowired
+    private ODPSManager odpsManager;
 
     @ApolloJsonValue("${touliu.account.ghIds:[\"gh_93e00e187787\", \"gh_ac43e43b253b\", \"gh_68e7fdc09fe4\",\"gh_77f36c109fb1\", \"gh_b181786a6c8c\", \"gh_1ee2e1b39ccf\"]}")
     private List<String> touliuAccountGhIds;
@@ -327,4 +328,48 @@ public class XxlJobService {
         }
         return ReturnT.SUCCESS;
     }
+    @XxlJob("delPushMessageCallbackJob")
+    public ReturnT<String> delPushMessageCallbackJob(String param) {
+        Calendar calendar = Calendar.getInstance();
+        Long beforeDayStart = DateUtils.getBeforeDayStart(7);
+        Date sevenDaysAgoStart = new Date(beforeDayStart * 1000);
+        if (StringUtils.hasText(param)) {
+            sevenDaysAgoStart = DateUtils.getDate(param);
+        }
+        for (int i = 0; i < 30; i++) {
+            // 计算当前遍历日期
+            Date currentStartDate = (Date) sevenDaysAgoStart.clone();
+            calendar.setTime(currentStartDate);
+            calendar.add(Calendar.DAY_OF_MONTH, -i); // 向前推i天
+
+            // 生成开始时间和结束时间
+            Date startTime = calendar.getTime(); // 开始时间
+            calendar.set(Calendar.HOUR_OF_DAY, 23);
+            calendar.set(Calendar.MINUTE, 59);
+            calendar.set(Calendar.SECOND, 59);
+            Date endTime = calendar.getTime(); // 结束时间
+
+            PushMessageCallbackExample example = new PushMessageCallbackExample();
+            example.createCriteria().andCreateTimeBetween(startTime, endTime);
+            long mySqlCount = pushMessageCallbackMapper.countByExample(example);
+            if (mySqlCount == 0) {
+                break;
+            }
+            String pt = DateUtils.getDateString(startTime.getTime());
+            String sql = String.format("SELECT count(*) FROM push_message_callback WHERE pt = %s", pt);
+            List<Record> recordList = odpsManager.query(sql);
+            if (CollectionUtil.isEmpty(recordList)) {
+                LarkRobotUtil.sendMessage("查询hive是失败" + pt);
+                return ReturnT.FAIL;
+            }
+            Long hiveCount = recordList.get(0).getBigint(0);
+            if (Math.abs(mySqlCount - hiveCount) > 500) {
+                LarkRobotUtil.sendMessage("数量异常" + pt);
+                return ReturnT.FAIL;
+            }
+            pushMessageCallbackMapper.deleteByExample(example);
+        }
+        return ReturnT.SUCCESS;
+    }
+
 }

+ 15 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/util/DateUtils.java

@@ -268,4 +268,19 @@ public final class DateUtils {
         date = date.minusDays(days);
         return date.format(formatter);
     }
+
+    public static String getDateString(Long timestamp) {
+        // 创建日期时间格式化器
+        DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyyMMdd");
+        // 将时间戳转换为 LocalDateTime
+        LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
+        // 格式化日期时间并返回
+        return dateTime.format(dateFormat);
+    }
+
+    public static Date getDate(String dateString) {
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        LocalDate localDate = LocalDate.parse(dateString, formatter);
+        return Date.from(localDate.atStartOfDay(ZoneId.systemDefault()).toInstant());
+    }
 }