Bladeren bron

oss delete

丁云鹏 10 maanden geleden
bovenliggende
commit
42bede7ee8

+ 10 - 2
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/VideoCleanExecutor.java

@@ -2,6 +2,7 @@ package com.tzld.piaoquan.recommend.feature.produce;
 
 import com.tzld.piaoquan.recommend.feature.produce.service.OSSService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -17,7 +18,10 @@ import java.util.List;
 public class VideoCleanExecutor {
     public static void main(String[] args) {
         String file = args[0];
-        int repartition = Integer.valueOf(args[1]);
+        int repartition = NumberUtils.toInt(args[1], 160);
+        int sync = NumberUtils.toInt(args[2], 0);
+
+
         log.info("hdfs file {}", file);
         SparkConf sparkConf = new SparkConf()
                 //.setMaster("local")
@@ -34,7 +38,11 @@ public class VideoCleanExecutor {
                 String[] data = StringUtils.split(s.next(), "\t");
                 objectNames.add(data[2]);
             }
-            ossService.transToDeepColdArchive("art-pubbucket", objectNames);
+            if (sync == 1) {
+                ossService.transToDeepColdArchive("art-pubbucket", objectNames);
+            } else {
+                ossService.transToDeepColdArchive2("art-pubbucket", objectNames);
+            }
         });
     }
 

+ 34 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/OSSService.java

@@ -9,6 +9,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author dyp
@@ -39,4 +43,34 @@ public class OSSService implements Serializable {
             ossClient.shutdown();
         }
     }
+
+    public void transToDeepColdArchive2(String bucketName, List<String> objectNames) {
+        OSS ossClient = new OSSClientBuilder().build(endpoint, accessId, accessKey);
+        CountDownLatch cdl = new CountDownLatch(objectNames.size());
+        ExecutorService es = Executors.newFixedThreadPool(4);
+        for (String objectName : objectNames) {
+            es.submit(() -> {
+                try {
+                    if (!objectName.startsWith("http")) {
+                        CopyObjectRequest request = new CopyObjectRequest(bucketName, objectName, bucketName, objectName);
+                        ObjectMetadata objectMetadata = new ObjectMetadata();
+                        objectMetadata.setHeader("x-oss-storage-class", "DeepColdArchive");
+                        request.setNewObjectMetadata(objectMetadata);
+                        ossClient.copyObject(request);
+                    }
+                } catch (Exception e) {
+                    log.error("transToDeepColdArchive error {} {}", objectName, e.getMessage(), e);
+                }
+                cdl.countDown();
+            });
+        }
+        try {
+            cdl.await(1, TimeUnit.HOURS);
+        } catch (InterruptedException e) {
+            log.error("transToDeepColdArchive error", e);
+        }
+        if (ossClient != null) {
+            ossClient.shutdown();
+        }
+    }
 }