Prechádzať zdrojové kódy

model load from oss

丁云鹏 1 rok pred
rodič
commit
87ff361111

+ 44 - 19
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/model/ModelManager.java

@@ -6,14 +6,14 @@ import com.aliyun.oss.OSSClientBuilder;
 import com.aliyun.oss.common.auth.CredentialsProvider;
 import com.aliyun.oss.common.auth.DefaultCredentialProvider;
 import com.aliyun.oss.model.OSSObject;
+import com.ctrip.framework.apollo.Config;
+import com.ctrip.framework.apollo.ConfigService;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -23,21 +23,44 @@ import java.util.concurrent.TimeUnit;
 public class ModelManager {
     private static final int SCHEDULE_PERIOD = 10;
     private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-    private static final String SUCCESS = "_SUCCESS";
     private static ModelManager instance;
-    Map<String, ModelLoadTask> loadTasks = new HashMap<String, ModelLoadTask>();
-    Map<String, String> modelPathMap = new HashMap<String, String>();
+    Map<String, ModelLoadTask> loadTasks = new HashMap<>();
+    Map<String, String> modelPathMap = new HashMap<>();
     private OSS client;
     private String bucketName;
 
+    private final String modelOssEndpoint = "model.oss.endpoint";
+    private final String modelOssAccessKeyId = "model.oss.accessKeyId";
+    private final String modelOssAccessKeySecret = "model.oss.accessKetSecret";
+    private final String modelOssBucketName = "model.oss.bucketName";
+
     private ModelManager() {
-        // TODO config load
-        OssConfig config = null;
+        // config load
+        Config config = ConfigService.getAppConfig();
+        String endpoint = config.getProperty(modelOssEndpoint, "");
+        String accessKeyId = config.getProperty(modelOssAccessKeyId, "");
+        String accessKetSecret = config.getProperty(modelOssAccessKeySecret, "");
         // oss client
-        CredentialsProvider credentialsProvider = new DefaultCredentialProvider(config.getAccessKeyId(), config.getAccessKeySecret());
-        client = new OSSClientBuilder().build(config.getEndpoint(), credentialsProvider);
+        CredentialsProvider credentialsProvider = new DefaultCredentialProvider(accessKeyId, accessKetSecret);
+        this.client = new OSSClientBuilder().build(endpoint, credentialsProvider);
+        this.bucketName = config.getProperty(modelOssBucketName, "");
+
+        config.addChangeListener(changeEvent -> {
+            if (changeEvent.isChanged(modelOssEndpoint)
+                    || changeEvent.isChanged(modelOssAccessKeyId)
+                    || changeEvent.isChanged(modelOssAccessKeySecret)) {
+                String endpointNew = config.getProperty(modelOssEndpoint, "");
+                String accessKeyIdNew = config.getProperty(modelOssAccessKeyId, "");
+                String accessKetSecretNew = config.getProperty(modelOssAccessKeySecret, "");
+                CredentialsProvider credentialsProviderNew = new DefaultCredentialProvider(accessKeyIdNew,
+                        accessKetSecretNew);
+                this.client = new OSSClientBuilder().build(endpointNew, credentialsProviderNew);
+            }
+            if (changeEvent.isChanged(modelOssBucketName)) {
+                this.bucketName = config.getProperty(modelOssBucketName, "");
+            }
+        });
 
-        this.bucketName = config.getBucketName();
 
         start(SCHEDULE_PERIOD);
     }
@@ -62,15 +85,17 @@ public class ModelManager {
      */
     public void registerModel(String modelName, String path, Class<? extends Model> modelClass) throws ModelRegisterException, IOException {
         if (modelPathMap.containsKey(modelName)) {
-            String oldPath = modelPathMap.get(modelName);
-            if (path.equals(oldPath)) {
-                //如果模型的path没有发生改变, 不做任何操作
-                log.info("Model [{}] and Path [{}] has exist", modelName, path);
-                return;
-            } else {
-                //如果模型的path发生改变, 需要注销掉原有的任务
-                unRegisterModel(modelName);
-            }
+            // fail fast
+            throw new RuntimeException(modelName + " already exists");
+//            String oldPath = modelPathMap.get(modelName);
+//            if (path.equals(oldPath)) {
+//                //如果模型的path没有发生改变, 不做任何操作
+//                log.info("Model [{}] and Path [{}] has exist", modelName, path);
+//                return;
+//            } else {
+//                //如果模型的path发生改变, 需要注销掉原有的任务
+//                unRegisterModel(modelName);
+//            }
         }
 
         modelPathMap.put(modelName, path);