丁云鹏 1 рік тому
батько
коміт
669c1e6528

+ 99 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/FeatureDiff.java

@@ -0,0 +1,99 @@
+package com.tzld.piaoquan.recommend.feature.produce;
+
+import com.google.common.reflect.TypeToken;
+import com.tzld.piaoquan.recommend.feature.produce.service.ODPSService;
+import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author dyp
+ */
+@Slf4j
+public class FeatureDiff {
+
+    private static ODPSService odpsService = new ODPSService();
+
+    private static Map<String, String> tableToCol;
+
+    static {
+        tableToCol = new HashMap<>();
+        tableToCol.put("alg_vid_feature_all_exp", "b1_feature");
+        tableToCol.put("alg_vid_feature_all_share", "b2_feature");
+        tableToCol.put("alg_vid_feature_all_return", "b3_feature");
+        tableToCol.put("alg_vid_feature_head_play", "b4_feature");
+        tableToCol.put("alg_vid_feature_feed_play", "b5_feature");
+        tableToCol.put("alg_vid_feature_exp2share", "b6_feature");
+        tableToCol.put("alg_vid_feature_share2return", "b7_feature");
+        tableToCol.put("alg_vid_feature_feed_noflow_exp", "b8_feature");
+        tableToCol.put("alg_vid_feature_feed_noflow_root_share", "b9_feature");
+        tableToCol.put("alg_vid_feature_feed_noflow_root_return", "b10_feature");
+        tableToCol.put("alg_vid_feature_feed_flow_exp", "b11_feature");
+        tableToCol.put("alg_vid_feature_feed_flow_root_share", "b12_feature");
+        tableToCol.put("alg_vid_feature_feed_flow_root_return", "b13_feature");
+        tableToCol.put("alg_vid_feature_feed_apptype_exp", "b14_feature");
+        tableToCol.put("alg_vid_feature_feed_apptype_root_share", "b15_feature");
+        tableToCol.put("alg_vid_feature_feed_apptype_root_return", "b16_feature");
+        tableToCol.put("alg_vid_feature_feed_province_exp", "b17_feature");
+        tableToCol.put("alg_vid_feature_feed_province_root_share", "b18_feature");
+        tableToCol.put("alg_vid_feature_feed_province_root_return", "b19_feature");
+//        tableToCol.put("", "");
+//        tableToCol.put("", "");
+//        tableToCol.put("", "");
+//        tableToCol.put("", "");
+//        tableToCol.put("", "");
+//        tableToCol.put("", "");
+    }
+
+    public static void main(String[] args) {
+
+        SparkConf sparkConf = new SparkConf()
+                // .setMaster("local")
+                .setAppName("odps sync to redis");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+        // ODPS
+        log.info("read odps");
+        String project = "loghubods";
+        String table = "alg_recsys_sample_all_new";
+        String partition = "dt=20240616,hh=10";
+
+        int partitionNum = 1;
+        JavaRDD<Map<String, String>> fieldValues = odpsService.read(jsc, project, table, partition, partitionNum);
+        if (fieldValues == null) {
+            log.info("odps empty");
+            return;
+        }
+
+        long diffCount = fieldValues.repartition(partitionNum).filter(new Function<Map<String, String>, Boolean>() {
+            @Override
+            public Boolean call(Map<String, String> map) throws Exception {
+
+                if (StringUtils.isNotBlank(map.get("flowpool"))) {
+                    return false;
+                }
+
+                Map<String, String> metaFeatureMap = JSONUtils.fromJson(map.get("metafeaturemap"), new TypeToken<Map<String, String>>() {
+                }, Collections.emptyMap());
+                for (Map.Entry<String, String> e : tableToCol.entrySet()) {
+                    if (StringUtils.equals(map.get(e.getValue()), metaFeatureMap.get(e.getKey()))) {
+                        continue;
+                    }
+                    return true;
+                }
+                return false;
+            }
+        }).count();
+
+        log.info("diff count {}", diffCount);
+
+    }
+}

+ 13 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/ODPSService.java

@@ -69,6 +69,15 @@ public class ODPSService {
         return readData;
     }
 
+    public JavaRDD<Map<String, String>> read(JavaSparkContext jsc, String project, String table, String partition,
+                                             int partitionNum) {
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+
+        JavaRDD<Map<String, String>> readData = odpsOps.readTableWithJava(project, table, partition,
+                new RecordToMap(), partitionNum);
+        return readData;
+    }
+
     static class RecordToMap implements Function2<Record, TableSchema, Map<String, String>> {
         private List<String> cols;
 
@@ -76,11 +85,14 @@ public class ODPSService {
             this.cols = cols;
         }
 
+        public RecordToMap() {
+        }
+
         @Override
         public Map<String, String> call(Record r, TableSchema schema) {
             Map<String, String> map = new HashMap<>();
             for (int i = 0; i < schema.getColumns().size(); i++) {
-                if (cols.contains(r.getColumns()[i].getName())) {
+                if (cols == null || cols.contains(r.getColumns()[i].getName())) {
                     Object obj = r.get(i);
                     if (obj instanceof SimpleJsonValue) {
                         map.put(r.getColumns()[i].getName(), ((SimpleJsonValue) obj).toString());