丁云鹏 4 mesiacov pred
rodič
commit
31bd11dc55

+ 81 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/xxl/RickUserCacheJob.java

@@ -0,0 +1,81 @@
+package com.tzld.piaoquan.recommend.server.xxl;
+
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
+import com.tzld.piaoquan.recommend.server.service.odps.ODPSManager;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author sunxy
+ */
+@Component
+@Slf4j
+public class RickUserCacheJob {
+
+    @Autowired
+    private ODPSManager odpsManager;
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    @XxlJob("upldateRiskUserCache")
+    public ReturnT<String> upldateRiskUserCache(String params) {
+        // 执行 ODPS Sql,获取数据
+        Pair<List<String>, List<String>> pairs = queryRiskUser();
+        if (pairs == null) {
+            XxlJobLogger.log("updateNotUserUploadUserCache, uids is empty");
+            return ReturnT.SUCCESS;
+        }
+        List<String> uids = pairs.getLeft();
+        if (CollectionUtils.isNotEmpty(uids)) {
+            redisTemplate.delete(RedisKeyConstants.Recommend.riskUserUid);
+            redisTemplate.opsForSet().add(RedisKeyConstants.Recommend.riskUserUid,
+                    uids.toArray(new String[uids.size()]));
+        }
+
+        List<String> mids = pairs.getLeft();
+        if (CollectionUtils.isNotEmpty(mids)) {
+            redisTemplate.delete(RedisKeyConstants.Recommend.riskUserMid);
+            redisTemplate.opsForSet().add(RedisKeyConstants.Recommend.riskUserMid,
+                    mids.toArray(new String[uids.size()]));
+        }
+
+        XxlJobLogger.log("upldateRiskUserCache finish");
+        return ReturnT.SUCCESS;
+    }
+
+    private Pair<List<String>, List<String>> queryRiskUser() {
+        String sql = "SELECT uid,mid FROM loghubods.wx_review_staff_list WHERE dt = MAX_PT('loghubods.wx_review_staff_list')";
+        List<Record> records = odpsManager.query(sql);
+        if (records == null || records.size() == 0) {
+            return null;
+        }
+
+        List<String> uids = new ArrayList<>();
+        List<String> mids = new ArrayList<>();
+        for (Record record : records) {
+            if (StringUtils.isNotBlank(record.getString(0))) {
+                uids.add(record.getString(0));
+            }
+
+            if (StringUtils.isNotBlank(record.getString(1))) {
+                mids.add(record.getString(1));
+            }
+        }
+        return Pair.of(uids, mids);
+    }
+
+
+}