Browse Source

oss delete

丁云鹏 10 tháng trước cách đây
mục cha
commit
3dfaefc043

+ 5 - 0
recommend-feature-produce/pom.xml

@@ -102,6 +102,11 @@
             <artifactId>slf4j-simple</artifactId>
             <version>1.7.28</version>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>3.17.4</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

+ 0 - 144
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/Test.java

@@ -1,144 +0,0 @@
-package com.tzld.piaoquan.recommend.feature.produce;
-
-import com.aliyun.odps.Instance;
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.data.SimpleJsonValue;
-import com.aliyun.odps.task.SQLTask;
-import com.google.common.reflect.TypeToken;
-import com.tzld.piaoquan.recommend.feature.produce.util.CommonCollectionUtils;
-import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Ad-hoc consistency check: runs a fixed SQL query against ODPS, and for every returned row
- * compares the per-table feature JSON stored in dedicated columns ("offline") with the entry
- * for the same table inside the row's {@code metafeaturemap} column ("online"). The number of
- * rows with at least one mismatch is logged at the end.
- *
- * @author dyp
- */
-@Slf4j
-public class Test {
-    // Key: feature table name, used to look up the entry inside "metafeaturemap".
-    // Value: name of the sample-table column that holds the same table's feature JSON.
-    private static Map<String, String> tableToCol;
-
-    static {
-        tableToCol = new HashMap<>();
-        tableToCol.put("alg_vid_feature_all_exp", "b1_feature");
-        tableToCol.put("alg_vid_feature_all_share", "b2_feature");
-        tableToCol.put("alg_vid_feature_all_return", "b3_feature");
-//        tableToCol.put("alg_vid_feature_head_play", "b4_feature");
-//        tableToCol.put("alg_vid_feature_feed_play", "b5_feature");
-        tableToCol.put("alg_vid_feature_exp2share", "b6_feature");
-        tableToCol.put("alg_vid_feature_share2return", "b7_feature");
-        tableToCol.put("alg_vid_feature_feed_noflow_exp", "b8_feature");
-        tableToCol.put("alg_vid_feature_feed_noflow_root_share", "b9_feature");
-        tableToCol.put("alg_vid_feature_feed_noflow_root_return", "b10_feature");
-        tableToCol.put("alg_vid_feature_feed_flow_exp", "b11_feature");
-        tableToCol.put("alg_vid_feature_feed_flow_root_share", "b12_feature");
-        tableToCol.put("alg_vid_feature_feed_flow_root_return", "b13_feature");
-//        tableToCol.put("alg_vid_feature_feed_apptype_exp", "b14_feature");
-//        tableToCol.put("alg_vid_feature_feed_apptype_root_share", "b15_feature");
-//        tableToCol.put("alg_vid_feature_feed_apptype_root_return", "b16_feature");
-        tableToCol.put("alg_vid_feature_feed_province_exp", "b17_feature");
-        tableToCol.put("alg_vid_feature_feed_province_root_share", "b18_feature");
-        tableToCol.put("alg_vid_feature_feed_province_root_return", "b19_feature");
-        tableToCol.put("alg_mid_feature_play", "c1_feature");
-        tableToCol.put("alg_mid_feature_share_and_return", "c2_feature");
-        tableToCol.put("alg_mid_feature_play_tags", "c3_feature");
-        tableToCol.put("alg_mid_feature_return_tags", "c4_feature");
-        tableToCol.put("alg_mid_feature_share_tags", "c5_feature");
-        tableToCol.put("alg_mid_feature_feed_exp_share_tags", "c6_feature");
-        tableToCol.put("alg_mid_feature_feed_exp_return_tags", "c7_feature");
-        tableToCol.put("alg_mid_feature_sharecf", "c8_feature");
-        tableToCol.put("alg_mid_feature_returncf", "c9_feature");
-
-
-    }
-
-    /**
-     * Queries up to 100 sample rows, flattens each record to a column-name to string map,
-     * and counts the rows whose offline and online feature maps differ.
-     *
-     * @param args ignored by the active code (only the commented-out block below used them)
-     */
-    public static void main(String[] args) {
-//        args = new String[10];
-//        args[0] = "-project";
-//        args[1] = "loghubods";
-//        args[2] = "-table";
-//        args[3] = "alg_mid_feature_play";
-//        args[4] = "-dt";
-//        args[5] = "20240612";
-//        args[6] = "-hh";
-//        args[7] = "15";
-//        args[8] = "-env";
-//        args[9] = "test";
-//        ODPSToRedis.main(args);
-
-        // NOTE(review): account credentials are hard-coded as string literals here.
-        final String accessId = "LTAIWYUujJAm7CbH";
-        final String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
-        final String odpsUrl = "http://service.odps.aliyun.com/api";
-        // NOTE(review): tunnelUrl is declared but never referenced in this method.
-        final String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun.com";
-        Account account = new AliyunAccount(accessId, accessKey);
-        Odps odps = new Odps(account);
-        odps.setEndpoint(odpsUrl);
-        odps.setDefaultProject("loghubods");
-
-        String sql = "select * from loghubods.alg_recsys_sample_all_new where dt=20240616 and hh=18 and flowpool = ''" +
-                " AND abcode='ab0' limit 100;";
-
-        // Stays null when the query fails; the exception is only logged.
-        List<Record> records = null;
-        try {
-            Instance i = SQLTask.run(odps, sql);
-            i.waitForSuccess();
-            records = SQLTask.getResult(i);
-        } catch (OdpsException e) {
-            log.error("request odps error", e);
-        }
-
-        // NOTE(review): records may be null at this point; whether CommonCollectionUtils.toList
-        // tolerates a null input is not visible here - confirm.
-        List<Map<String, String>> fieldValues = CommonCollectionUtils.toList(records, r -> {
-            Map<String, String> map = new HashMap<>();
-            for (int i = 0; i < r.getColumnCount(); i++) {
-                Object obj = r.get(i);
-                // JSON-typed values are rendered through toString(); all other columns through getString(i).
-                if (obj instanceof SimpleJsonValue) {
-                    map.put(r.getColumns()[i].getName(), ((SimpleJsonValue) obj).toString());
-                } else {
-                    map.put(r.getColumns()[i].getName(), r.getString(i));
-                }
-            }
-            return map;
-        });
-
-
-        // Number of rows with at least one offline/online mismatch.
-        int diff = 0;
-        // Label for the row loop: "continue A" skips to the next row after the first mismatch,
-        // so each row is counted at most once.
-        A:
-        for (Map<String, String> map : fieldValues) {
-            // Collections.emptyMap() is passed as the third argument - presumably the fallback
-            // returned when parsing fails; verify against JSONUtils.fromJson.
-            Map<String, Map<String, String>> metaFeatureMap = JSONUtils.fromJson(map.get("metafeaturemap"),
-                    new TypeToken<Map<String, Map<String, String>>>() {
-                    }, Collections.emptyMap());
-            for (Map.Entry<String, String> e : tableToCol.entrySet()) {
-                Map<String, String> offline = JSONUtils.fromJson(map.get(e.getValue()), new TypeToken<Map<String, String>>() {
-                }, Collections.emptyMap());
-
-                Map<String, String> online = metaFeatureMap.getOrDefault(e.getKey(), new HashMap<>());
-
-
-                // Different entry counts already mean the two maps cannot be equal.
-                if (offline.size() != online.size()) {
-                    diff++;
-                    continue A;
-                }
-
-                // Sizes match, so checking every offline key against online is sufficient.
-                for (Map.Entry<String, String> offlineE : offline.entrySet()) {
-                    if (!StringUtils.equals(online.get(offlineE.getKey()), offlineE.getValue())) {
-                        diff++;
-                        continue A;
-                    }
-                }
-            }
-        }
-
-        log.info("diff {}", diff);
-
-    }
-
-
-}

+ 51 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/VideoCleanODPSToHDFS.java

@@ -0,0 +1,51 @@
+package com.tzld.piaoquan.recommend.feature.produce;
+
+import com.tzld.piaoquan.recommend.feature.produce.service.HDFSService;
+import com.tzld.piaoquan.recommend.feature.produce.service.ODPSService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.Map;
+
+/**
+ * Spark job that reads the ODPS table {@code loghubods.not_active_videos} and writes one
+ * tab-separated text line per row to an HDFS directory.
+ *
+ * @author dyp
+ */
+@Slf4j
+public class VideoCleanODPSToHDFS {
+    /**
+     * Reads the source table, formats each row as
+     * {@code videoid, video_path, transed_video_path, cover_img_path, self_cover_img_path,
+     * share_moment_img_path} separated by tabs, and saves the result as text files.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) {
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("VideoCleanODPSToHDFS");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+        try {
+            ODPSService odpsService = new ODPSService();
+            // NOTE(review): the meaning of the last two arguments ("" and 90) is defined by
+            // ODPSService.read, which is not visible here.
+            JavaRDD<Map<String, String>> record = odpsService.read(jsc, "loghubods", "not_active_videos", "", 90);
+
+            // A missing column value is rendered as the text "null", exactly as
+            // StringBuilder.append(String) would render it.
+            JavaRDD<String> data = record.map(r -> String.join("\t",
+                    r.get("videoid"),
+                    r.get("video_path"),
+                    r.get("transed_video_path"),
+                    r.get("cover_img_path"),
+                    r.get("self_cover_img_path"),
+                    r.get("share_moment_img_path")));
+
+            String path = "/dyp/oss/video_clean/";
+            try {
+                HDFSService hdfsService = new HDFSService();
+                hdfsService.delete(path);
+                // coalesce(1000) caps the number of partitions, and therefore output files, at 1000.
+                data.coalesce(1000).saveAsTextFile(path);
+            } catch (Exception e) {
+                // Failures are reported at error level; the job itself still terminates normally.
+                log.error("hdfs error ", e);
+            }
+        } finally {
+            // Always release the Spark context, even when reading or writing fails.
+            jsc.stop();
+        }
+    }
+
+}

+ 34 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/HDFSService.java

@@ -0,0 +1,34 @@
+package com.tzld.piaoquan.recommend.feature.produce.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Small wrapper around a Hadoop {@link FileSystem} obtained from a default {@link Configuration}.
+ *
+ * @author dyp
+ */
+@Slf4j
+public class HDFSService implements Serializable {
+    // Hadoop's FileSystem is not Serializable, so the handle is transient and is
+    // re-acquired on first use after deserialization (see fileSystem()).
+    private transient FileSystem fSystem;
+
+    /**
+     * Creates the service and obtains the file system for a default {@link Configuration}.
+     *
+     * @throws IOException if the file system cannot be obtained
+     */
+    public HDFSService() throws IOException {
+        fSystem = FileSystem.get(new Configuration());
+    }
+
+    /**
+     * Ensures that {@code dir} exists, creating it when it is absent.
+     *
+     * @param dir path of the directory
+     * @return {@code true} if the path already existed or was created successfully
+     * @throws IOException on file system errors
+     */
+    public boolean createDir(String dir) throws IOException {
+        FileSystem fs = fileSystem();
+        Path dirPath = new Path(dir);
+        // Report the real outcome of mkdirs instead of unconditionally returning true.
+        return fs.exists(dirPath) || fs.mkdirs(dirPath);
+    }
+
+    /**
+     * Deletes {@code path} immediately, including everything below it when it is a directory.
+     *
+     * @param path path to remove
+     * @return {@code true} if the path was deleted, {@code false} otherwise (for example when it
+     *         does not exist)
+     * @throws IOException on file system errors
+     */
+    public boolean delete(String path) throws IOException {
+        // deleteOnExit() would only schedule the removal for when the file system is closed,
+        // leaving the path in place for the rest of the run; delete() removes it right away.
+        return fileSystem().delete(new Path(path), true);
+    }
+
+    /** Returns the file system handle, re-acquiring it if this instance was deserialized. */
+    private FileSystem fileSystem() throws IOException {
+        if (fSystem == null) {
+            fSystem = FileSystem.get(new Configuration());
+        }
+        return fSystem;
+    }
+
+}

+ 40 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/OSSService.java

@@ -0,0 +1,40 @@
+package com.tzld.piaoquan.recommend.feature.produce.service;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.model.CopyObjectRequest;
+import com.aliyun.oss.model.CopyObjectResult;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.StorageClass;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Helper that changes the storage class of existing Aliyun OSS objects by copying each
+ * object onto itself with a new {@code x-oss-storage-class} header.
+ *
+ * @author dyp
+ */
+@Slf4j
+public class OSSService implements Serializable {
+    // NOTE(review): credentials are hard-coded in source; they should be supplied through
+    // external configuration and rotated. Kept unchanged here to preserve behavior.
+    private String accessId = "LTAI5tHMkNaRhpiDB1yWMZPn";
+    private String accessKey = "XLi5YUJusVwbbQOaGeGsaRJ1Qyzbui";
+    private String endpoint = "https://oss-cn-hangzhou.aliyuncs.com";
+
+    /**
+     * Converts every listed object in {@code bucketName} to the DeepColdArchive storage class.
+     * A failure on one object is logged and does not stop the remaining objects.
+     *
+     * @param bucketName  bucket that contains the objects
+     * @param objectNames keys of the objects to convert; {@code null} or empty is a no-op
+     */
+    public void transToDeepColdArchive(String bucketName, List<String> objectNames) {
+        if (objectNames == null || objectNames.isEmpty()) {
+            return;
+        }
+        OSS ossClient = new OSSClientBuilder().build(endpoint, accessId, accessKey);
+        try {
+            for (String objectName : objectNames) {
+                try {
+                    // Source and destination are the same object: only its metadata changes.
+                    CopyObjectRequest request = new CopyObjectRequest(bucketName, objectName, bucketName, objectName);
+                    ObjectMetadata objectMetadata = new ObjectMetadata();
+                    objectMetadata.setHeader("x-oss-storage-class", StorageClass.DeepColdArchive);
+                    request.setNewObjectMetadata(objectMetadata);
+                    ossClient.copyObject(request);
+                } catch (Exception e) {
+                    log.error("copy error {}", objectName, e);
+                }
+            }
+        } finally {
+            // Release the client's connections no matter how the loop ends.
+            ossClient.shutdown();
+        }
+    }
+}