丁云鹏 4 mesiacov pred
rodič
commit
e8047dd5fb

+ 6 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/ODPSToRedis.java

@@ -67,7 +67,12 @@ public class ODPSToRedis {
             if (count <= 0) {
                 if (retry <= 0) {
                     // TODO 报警
-                    return;
+                    Map<String, String> partitionMap = odpsService.getLastestPartition(config, argMap);
+                    argMap.putAll(partitionMap);
+                    count = odpsService.count(config, argMap);
+                    if (count <= 0) {
+                        return;
+                    }
                 } else {
                     try {
                         log.info("sleep {}", retry);

+ 52 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/ODPSService.java

@@ -40,6 +40,7 @@ public class ODPSService {
     private final String tunnelUrl = "http://dt.cn-hangzhou-vpc.maxcompute.aliyun-inc.com";
     private final String sqlFormat = "select %s from %s where 1=1 %s ;";
     private final String countSqlFormat = "select count(1) as count from %s where 1=1 %s ;";
+    private final String latestPartitionSqlFormat = "select %s from %s where 0=1 OR %s ;";
 
 
     public JavaRDD<Map<String, String>> read(JavaSparkContext jsc, DTSConfig config, Map<String, String> argMap) {
@@ -141,6 +142,57 @@ public class ODPSService {
         return Integer.valueOf(records.get(0).getString(0));
     }
 
+    public Map<String, String> getLastestPartition(DTSConfig config, Map<String, String> argMap) {
+        String project = argMap.get("project");
+        String table = argMap.get("table");
+        // dt = MAX_PT('alg_sence_type_feature')
+        StringBuilder sb = new StringBuilder();
+        sb.append("1");
+        for (String partition : config.getOdps().getPartition()) {
+            sb.append(",max(");
+            sb.append(partition);
+            sb.append(") as ");
+            sb.append(partition);
+        }
+        String cols = sb.toString();
+
+        sb = new StringBuilder();
+        for (String partition : config.getOdps().getPartition()) {
+            sb.append(" OR ");
+            sb.append(partition);
+            sb.append(" = ");
+            sb.append("MAX_PT('");
+            sb.append(table);
+            sb.append("')");
+        }
+        String condition = sb.toString();
+
+        Account account = new AliyunAccount(accessId, accessKey);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(odpsUrl);
+        odps.setDefaultProject(project);
+
+        String sql = String.format(latestPartitionSqlFormat, cols, table, condition);
+        List<Record> records;
+        try {
+            Instance i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            records = SQLTask.getResult(i);
+        } catch (OdpsException e) {
+            log.error("request odps error", e);
+            return Collections.emptyMap();
+        }
+
+        if (records.size() > 0) {
+            Map<String, String> map = new HashMap<>();
+            Record record = records.get(0);
+            for (int i = 0; i < record.getColumnCount(); i++) {
+                map.put(record.getColumns()[i].getName(), record.getString(i));
+            }
+        }
+        return Collections.emptyMap();
+
+    }
 
     /**
      * @return k: 列名 v:值