丁云鹏 5 miesięcy temu
rodzic
commit
d8f8b76995
17 zmienionych plików z 1641 dodań i 54 usunięć
  1. 72 1
      recommend-server-task/pom.xml
  2. 9 1
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/Application.java
  3. 358 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/common/RecommendLoghubAppender.java
  4. 37 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/common/RecommendLoghubAppenderCallback.java
  5. 116 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/consumer/VideoVectorProduceConsumer.java
  6. 878 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/remote/CustomMilvusServiceClient.java
  7. 83 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/remote/MilvusRemoteService.java
  8. 27 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/remote/ModelRemoteService.java
  9. 43 0
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/service/ODPSService.java
  10. 0 13
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/xxl/FeatureJob.java
  11. 0 17
      recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/xxl/ModelJob.java
  12. 6 1
      recommend-server-task/src/main/resources/application-dev.yml
  13. 1 1
      recommend-server-task/src/main/resources/application-pre.yml
  14. 1 1
      recommend-server-task/src/main/resources/application-prod.yml
  15. 1 1
      recommend-server-task/src/main/resources/application-test.yml
  16. 1 6
      recommend-server-task/src/main/resources/application.yml
  17. 8 12
      recommend-server-task/src/main/resources/logback-spring.xml

+ 72 - 1
recommend-server-task/pom.xml

@@ -14,19 +14,90 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
+        <gson.version>2.10.1</gson.version>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
-            <artifactId>recommend-server-service</artifactId>
+            <artifactId>recommend-model-client</artifactId>
             <version>1.0.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>0.45.6-public</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ctrip.framework.apollo</groupId>
+            <artifactId>apollo-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.csp</groupId>
+            <artifactId>sentinel-datasource-apollo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>aliyun-log-logback-appender</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>aliyun-log-producer</artifactId>
+            <version>0.3.10</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>aliyun-log</artifactId>
+            <version>0.6.35</version>
+        </dependency>
+        <dependency>
+            <groupId>net.devh</groupId>
+            <artifactId>grpc-server-spring-boot-starter</artifactId>
+            <version>2.9.0.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.xuxueli</groupId>
             <artifactId>xxl-job-core</artifactId>
             <version>2.2.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.5</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>grpc-netty-shaded</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-protobuf</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-stub</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client-java</artifactId>
+            <version>5.0.7</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>recommend-server-task</finalName>

+ 9 - 1
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/Application.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.task;
 
+import com.tzld.piaoquan.recommend.model.grpc.client.PredictClient;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.ComponentScan;
@@ -9,11 +10,18 @@ import org.springframework.context.annotation.ComponentScan;
  */
 @SpringBootApplication
 @ComponentScan({
-        "com.tzld.piaoquan.recommend.server.task.xxl"
+        "com.tzld.piaoquan.recommend.server.task.xxl",
+        "com.tzld.piaoquan.recommend.server.task.remote",
+        "com.tzld.piaoquan.recommend.server.task.service"
 })
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class, args);
     }
+
+    @Bean
+    public PredictClient predictClient() {
+        return new PredictClient();
+    }
 }
 

+ 358 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/common/RecommendLoghubAppender.java

@@ -0,0 +1,358 @@
+package com.tzld.piaoquan.recommend.server.task.common;
+
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.LoggingEvent;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.classic.spi.ThrowableProxyUtil;
+import ch.qos.logback.core.CoreConstants;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
+import ch.qos.logback.core.encoder.Encoder;
+import com.aliyun.openservices.aliyun.log.producer.LogProducer;
+import com.aliyun.openservices.aliyun.log.producer.Producer;
+import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
+import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
+import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
+import com.aliyun.openservices.log.common.LogItem;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author dyp
+ */
+public class RecommendLoghubAppender<E> extends UnsynchronizedAppenderBase<E> {
+    private String project;
+    private String endpoint;
+    private String accessKeyId;
+    private String accessKeySecret;
+    private String userAgent = "logback";
+    protected Encoder<E> encoder;
+    protected ProducerConfig producerConfig = new ProducerConfig();
+    protected ProjectConfig projectConfig;
+    protected Producer producer;
+    protected String logStore;
+    protected String topic = "";
+    protected String source = "";
+    protected String timeZone = "UTC";
+    protected String timeFormat = "yyyy-MM-dd'T'HH:mmZ";
+    protected DateTimeFormatter formatter;
+    protected java.time.format.DateTimeFormatter formatter1;
+    private String mdcFields;
+
+    public RecommendLoghubAppender() {
+    }
+
+    public void start() {
+        try {
+            this.doStart();
+        } catch (Exception var2) {
+            this.addError("Failed to start LoghubAppender.", var2);
+        }
+
+    }
+
+    private void doStart() {
+        try {
+            this.formatter = DateTimeFormat.forPattern(this.timeFormat).withZone(DateTimeZone.forID(this.timeZone));
+        } catch (Exception var2) {
+            this.formatter1 = java.time.format.DateTimeFormatter.ofPattern(this.timeFormat).withZone(ZoneId.of(this.timeZone));
+        }
+
+        this.producer = this.createProducer();
+        super.start();
+    }
+
+    public Producer createProducer() {
+        this.projectConfig = this.buildProjectConfig();
+        Producer producer = new LogProducer(this.producerConfig);
+        producer.putProjectConfig(this.projectConfig);
+        return producer;
+    }
+
+    private ProjectConfig buildProjectConfig() {
+        return new ProjectConfig(this.project, this.endpoint, this.accessKeyId, this.accessKeySecret, (String) null, this.userAgent);
+    }
+
+    public void stop() {
+        try {
+            this.doStop();
+        } catch (Exception var2) {
+            this.addError("Failed to stop LoghubAppender.", var2);
+        }
+
+    }
+
+    private void doStop() throws InterruptedException, ProducerException {
+        if (this.isStarted()) {
+            super.stop();
+            this.producer.close();
+        }
+    }
+
+    public void append(E eventObject) {
+        try {
+            this.appendEvent(eventObject);
+        } catch (Exception var3) {
+            this.addError("Failed to append event.", var3);
+        }
+
+    }
+
+    private void appendEvent(E eventObject) {
+        if (eventObject instanceof LoggingEvent) {
+            LoggingEvent event = (LoggingEvent) eventObject;
+            List<LogItem> logItems = new ArrayList();
+            LogItem item = new LogItem();
+            logItems.add(item);
+            item.SetTime((int) (event.getTimeStamp() / 1000L));
+            if (this.formatter != null) {
+                DateTime dateTime = new DateTime(event.getTimeStamp());
+                item.PushBack("time", dateTime.toString(this.formatter));
+            } else {
+                Instant instant = Instant.ofEpochMilli(event.getTimeStamp());
+                item.PushBack("time", this.formatter1.format(instant));
+            }
+
+            item.PushBack("level", event.getLevel().toString());
+            item.PushBack("thread", event.getThreadName());
+            StackTraceElement[] caller = event.getCallerData();
+            if (caller != null && caller.length > 0) {
+                item.PushBack("location", caller[0].toString());
+            }
+
+            String message = event.getFormattedMessage();
+            item.PushBack("message", message);
+            IThrowableProxy iThrowableProxy = event.getThrowableProxy();
+            if (iThrowableProxy != null) {
+                String throwable = this.getExceptionInfo(iThrowableProxy);
+                throwable = throwable + this.fullDump(event.getThrowableProxy().getStackTraceElementProxyArray());
+                item.PushBack("throwable", throwable);
+            }
+
+            if (this.encoder != null) {
+                item.PushBack("log", new String(this.encoder.encode(eventObject)));
+            }
+
+            Optional.ofNullable(this.mdcFields).ifPresent((f) -> {
+                event.getMDCPropertyMap().entrySet().stream().filter((v) -> {
+                    return Arrays.stream(f.split(",")).anyMatch((i) -> {
+                        return i.equals(v.getKey());
+                    });
+                }).forEach((map) -> {
+                    item.PushBack((String) map.getKey(), (String) map.getValue());
+                });
+            });
+
+            try {
+                this.producer.send(this.projectConfig.getProject(), this.logStore, this.topic, this.source, logItems,
+                        new RecommendLoghubAppenderCallback(this, this.projectConfig.getProject(), this.logStore,
+                                this.topic, this.source, logItems));
+            } catch (Exception var9) {
+                this.addError("Failed to send log, project=" + this.project + ", logStore=" + this.logStore + ", topic=" + this.topic + ", source=" + this.source + ", logItem=" + logItems, var9);
+            }
+
+        }
+    }
+
+    public String getTimeFormat() {
+        return this.timeFormat;
+    }
+
+    public void setTimeFormat(String timeFormat) {
+        this.timeFormat = timeFormat;
+    }
+
+    private String getExceptionInfo(IThrowableProxy iThrowableProxy) {
+        String s = iThrowableProxy.getClassName();
+        String message = iThrowableProxy.getMessage();
+        return message != null ? s + ": " + message : s;
+    }
+
+    private String fullDump(StackTraceElementProxy[] stackTraceElementProxyArray) {
+        StringBuilder builder = new StringBuilder();
+        StackTraceElementProxy[] var3 = stackTraceElementProxyArray;
+        int var4 = stackTraceElementProxyArray.length;
+
+        for (int var5 = 0; var5 < var4; ++var5) {
+            StackTraceElementProxy step = var3[var5];
+            builder.append(CoreConstants.LINE_SEPARATOR);
+            String string = step.toString();
+            builder.append('\t').append(string);
+            ThrowableProxyUtil.subjoinPackagingData(builder, step);
+        }
+
+        return builder.toString();
+    }
+
+    public String getLogStore() {
+        return this.logStore;
+    }
+
+    public void setLogStore(String logStore) {
+        this.logStore = logStore;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getSource() {
+        return this.source;
+    }
+
+    public void setSource(String source) {
+        this.source = source;
+    }
+
+    public String getTimeZone() {
+        return this.timeZone;
+    }
+
+    public void setTimeZone(String timeZone) {
+        this.timeZone = timeZone;
+    }
+
+    public String getProject() {
+        return this.project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getEndpoint() {
+        return this.endpoint;
+    }
+
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public String getAccessKeyId() {
+        return this.accessKeyId;
+    }
+
+    public void setAccessKeyId(String accessKeyId) {
+        this.accessKeyId = accessKeyId;
+    }
+
+    public String getAccessKeySecret() {
+        return this.accessKeySecret;
+    }
+
+    public void setAccessKeySecret(String accessKeySecret) {
+        this.accessKeySecret = accessKeySecret;
+    }
+
+    public String getUserAgent() {
+        return this.userAgent;
+    }
+
+    public void setUserAgent(String userAgent) {
+        this.userAgent = userAgent;
+    }
+
+    public int getTotalSizeInBytes() {
+        return this.producerConfig.getTotalSizeInBytes();
+    }
+
+    public void setTotalSizeInBytes(int totalSizeInBytes) {
+        this.producerConfig.setTotalSizeInBytes(totalSizeInBytes);
+    }
+
+    public long getMaxBlockMs() {
+        return this.producerConfig.getMaxBlockMs();
+    }
+
+    public void setMaxBlockMs(long maxBlockMs) {
+        this.producerConfig.setMaxBlockMs(maxBlockMs);
+    }
+
+    public int getIoThreadCount() {
+        return this.producerConfig.getIoThreadCount();
+    }
+
+    public void setIoThreadCount(int ioThreadCount) {
+        this.producerConfig.setIoThreadCount(ioThreadCount);
+    }
+
+    public int getBatchSizeThresholdInBytes() {
+        return this.producerConfig.getBatchSizeThresholdInBytes();
+    }
+
+    public void setBatchSizeThresholdInBytes(int batchSizeThresholdInBytes) {
+        this.producerConfig.setBatchSizeThresholdInBytes(batchSizeThresholdInBytes);
+    }
+
+    public int getBatchCountThreshold() {
+        return this.producerConfig.getBatchCountThreshold();
+    }
+
+    public void setBatchCountThreshold(int batchCountThreshold) {
+        this.producerConfig.setBatchCountThreshold(batchCountThreshold);
+    }
+
+    public int getLingerMs() {
+        return this.producerConfig.getLingerMs();
+    }
+
+    public void setLingerMs(int lingerMs) {
+        this.producerConfig.setLingerMs(lingerMs);
+    }
+
+    public int getRetries() {
+        return this.producerConfig.getRetries();
+    }
+
+    public void setRetries(int retries) {
+        this.producerConfig.setRetries(retries);
+    }
+
+    public int getMaxReservedAttempts() {
+        return this.producerConfig.getMaxReservedAttempts();
+    }
+
+    public void setMaxReservedAttempts(int maxReservedAttempts) {
+        this.producerConfig.setMaxReservedAttempts(maxReservedAttempts);
+    }
+
+    public long getBaseRetryBackoffMs() {
+        return this.producerConfig.getBaseRetryBackoffMs();
+    }
+
+    public void setBaseRetryBackoffMs(long baseRetryBackoffMs) {
+        this.producerConfig.setBaseRetryBackoffMs(baseRetryBackoffMs);
+    }
+
+    public long getMaxRetryBackoffMs() {
+        return this.producerConfig.getMaxRetryBackoffMs();
+    }
+
+    public void setMaxRetryBackoffMs(long maxRetryBackoffMs) {
+        this.producerConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
+    }
+
+    public Encoder<E> getEncoder() {
+        return this.encoder;
+    }
+
+    public void setEncoder(Encoder<E> encoder) {
+        this.encoder = encoder;
+    }
+
+    public void setMdcFields(String mdcFields) {
+        this.mdcFields = mdcFields;
+    }
+}

+ 37 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/common/RecommendLoghubAppenderCallback.java

@@ -0,0 +1,37 @@
+package com.tzld.piaoquan.recommend.server.task.common;
+
+
+import com.aliyun.openservices.aliyun.log.producer.Callback;
+import com.aliyun.openservices.aliyun.log.producer.Result;
+import com.aliyun.openservices.log.common.LogItem;
+
+import java.util.List;
+
+/**
+ * @author dyp
+ */
+public class RecommendLoghubAppenderCallback<E> implements Callback {
+    protected RecommendLoghubAppender<E> loghubAppender;
+    protected String project;
+    protected String logstore;
+    protected String topic;
+    protected String source;
+    protected List<LogItem> logItems;
+
+    public RecommendLoghubAppenderCallback(RecommendLoghubAppender<E> loghubAppender, String project, String logstore,
+                                           String topic, String source, List<LogItem> logItems) {
+        this.loghubAppender = loghubAppender;
+        this.project = project;
+        this.logstore = logstore;
+        this.topic = topic;
+        this.source = source;
+        this.logItems = logItems;
+    }
+
+    public void onCompletion(Result result) {
+        if (!result.isSuccessful()) {
+            this.loghubAppender.addError("Failed to send log, project=" + this.project + ", logStore=" + this.logstore + ", topic=" + this.topic + ", source=" + this.source + ", logItem=" + this.logItems + ", errorCode=" + result.getErrorCode() + ", errorMessage=" + result.getErrorMessage());
+        }
+
+    }
+}

+ 116 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/consumer/VideoVectorProduceConsumer.java

@@ -0,0 +1,116 @@
+package com.tzld.piaoquan.recommend.server.task.consumer;
+
+import com.google.gson.JsonObject;
+import com.tzld.piaoquan.recommend.server.task.remote.MilvusRemoteService;
+import com.tzld.piaoquan.recommend.server.task.remote.ModelRemoteService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.*;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * @author dyp
+ */
+@Component
+@Slf4j
+public class VideoVectorProduceConsumer {
+
+
+    @Autowired
+    private ModelRemoteService modelRemoteService;
+
+    @Autowired
+    private MilvusRemoteService milvusRemoteService;
+
+    @PostConstruct
+    public void init() {
+        try {
+            start();
+        } catch (Exception e) {
+            log.error("", e);
+        }
+    }
+
+    private void start() throws ClientException {
+        /**
+         * 实例接入点,从控制台实例详情页的接入点页签中获取。
+         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
+         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
+         */
+        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
+        //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
+        String topic = "Your Topic";
+        //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
+        String consumerGroup = "Your ConsumerGroup";
+        ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
+        /**
+         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。
+         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
+         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
+         */
+        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
+        ClientConfiguration clientConfiguration = builder.build();
+        //订阅消息的过滤规则,表示订阅所有Tag的消息。
+        String tag = "*";
+        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+        //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
+        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+                .setClientConfiguration(clientConfiguration)
+                //设置消费者分组。
+                .setConsumerGroup(consumerGroup)
+                //设置预绑定的订阅关系。
+                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+                //设置消费监听器。
+                .setMessageListener(new Listener())
+                .build();
+    }
+
+    public class Listener implements MessageListener {
+
+        @Override
+        public ConsumeResult consume(MessageView messageView) {
+            try {
+                ByteBuffer messageBody = messageView.getBody();
+                String messageContent = new String(messageView.getBody().array());
+                // TODO 1. obtain video
+                
+                // TODO 2. obtain feature
+
+                // TODO 3. trans vector
+                transVector();
+                // TODO 4. write milvus
+                saveVector();
+            } catch (Exception e) {
+
+            }
+            return ConsumeResult.SUCCESS;
+        }
+
+        private float[] transVector() {
+            return modelRemoteService.dssmPredict("");
+        }
+
+        private void saveVector() {
+            List<JsonObject> datas = new ArrayList<>();
+            JsonObject obj = new JsonObject();
+            obj.addProperty("id", 0L);
+            obj.addProperty("vector", "[0.3580376395471989f, -0.6023495712049978f, 0.18414012509913835f, -0" +
+                    ".26286205330961354f, 0.9029438446296592f]");
+            obj.addProperty("color", "pink");
+            datas.add(obj);
+            milvusRemoteService.insert("demo", datas);
+        }
+    }
+
+
+}

+ 878 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/remote/CustomMilvusServiceClient.java

@@ -0,0 +1,878 @@
+package com.tzld.piaoquan.recommend.server.task.remote;
+
+import io.grpc.Status;
+import io.grpc.*;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.stub.MetadataUtils;
+import io.milvus.client.AbstractMilvusGrpcClient;
+import io.milvus.client.MilvusClient;
+import io.milvus.exception.MilvusException;
+import io.milvus.exception.ServerException;
+import io.milvus.grpc.*;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
+import io.milvus.param.*;
+import io.milvus.param.alias.AlterAliasParam;
+import io.milvus.param.alias.CreateAliasParam;
+import io.milvus.param.alias.DropAliasParam;
+import io.milvus.param.alias.ListAliasesParam;
+import io.milvus.param.bulkinsert.BulkInsertParam;
+import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
+import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
+import io.milvus.param.collection.*;
+import io.milvus.param.control.*;
+import io.milvus.param.credential.CreateCredentialParam;
+import io.milvus.param.credential.DeleteCredentialParam;
+import io.milvus.param.credential.ListCredUsersParam;
+import io.milvus.param.credential.UpdateCredentialParam;
+import io.milvus.param.dml.*;
+import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
+import io.milvus.param.highlevel.collection.ListCollectionsParam;
+import io.milvus.param.highlevel.collection.response.ListCollectionsResponse;
+import io.milvus.param.highlevel.dml.*;
+import io.milvus.param.highlevel.dml.response.*;
+import io.milvus.param.index.*;
+import io.milvus.param.partition.*;
+import io.milvus.param.resourcegroup.*;
+import io.milvus.param.role.*;
+import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author dyp
+ */
+public class CustomMilvusServiceClient extends AbstractMilvusGrpcClient {
+    private ManagedChannel channel;
+    private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
+    private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
+    private final long rpcDeadlineMs;
+    private long timeoutMs = 0L;
+    private RetryParam retryParam = RetryParam.newBuilder().build();
+    private DnsNameResolverProvider dnsNameResolverProvider = new DnsNameResolverProvider();
+
+    public CustomMilvusServiceClient(@NonNull final ConnectParam connectParam) {
+        if (connectParam == null) {
+            throw new NullPointerException("connectParam is marked non-null but is null");
+        } else {
+            this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
+            Metadata metadata = new Metadata();
+            metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
+            if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) {
+                metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName());
+            }
+
+            List<ClientInterceptor> clientInterceptors = new ArrayList();
+            clientInterceptors.add(MetadataUtils.newAttachHeadersInterceptor(metadata));
+            clientInterceptors.add(new ClientInterceptor() {
+                public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+                    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+                        public void start(Listener<RespT> responseListener, Metadata headers) {
+                            if (connectParam.getClientRequestId() != null && !StringUtils.isEmpty((CharSequence) connectParam.getClientRequestId().get())) {
+                                headers.put(Metadata.Key.of("client_request_id", Metadata.ASCII_STRING_MARSHALLER), connectParam.getClientRequestId().get());
+                            }
+
+                            super.start(responseListener, headers);
+                        }
+                    };
+                }
+            });
+
+            String msg;
+            try {
+                SslContext sslContext;
+                NettyChannelBuilder builder;
+                if (StringUtils.isNotEmpty(connectParam.getServerPemPath())) {
+                    sslContext = GrpcSslContexts.forClient().trustManager(new File(connectParam.getServerPemPath())).build();
+                    builder = (NettyChannelBuilder) ((NettyChannelBuilder) ((NettyChannelBuilder) NettyChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort()).overrideAuthority(connectParam.getServerName())).sslContext(sslContext).maxInboundMessageSize(2147483647).keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS).keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls()).idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS)).intercept(clientInterceptors);
+                    if (connectParam.isSecure()) {
+                        builder.useTransportSecurity();
+                    }
+                    builder.nameResolverFactory(dnsNameResolverProvider);
+                    this.channel = builder.build();
+                } else if (StringUtils.isNotEmpty(connectParam.getClientPemPath()) && StringUtils.isNotEmpty(connectParam.getClientKeyPath()) && StringUtils.isNotEmpty(connectParam.getCaPemPath())) {
+                    sslContext = GrpcSslContexts.forClient().trustManager(new File(connectParam.getCaPemPath())).keyManager(new File(connectParam.getClientPemPath()), new File(connectParam.getClientKeyPath())).build();
+                    builder = (NettyChannelBuilder) ((NettyChannelBuilder) NettyChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort()).sslContext(sslContext).maxInboundMessageSize(2147483647).keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS).keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls()).idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS)).intercept(clientInterceptors);
+                    if (connectParam.isSecure()) {
+                        builder.useTransportSecurity();
+                    }
+
+                    if (StringUtils.isNotEmpty(connectParam.getServerName())) {
+                        builder.overrideAuthority(connectParam.getServerName());
+                    }
+                    builder.nameResolverFactory(dnsNameResolverProvider);
+                    this.channel = builder.build();
+                } else {
+                    ManagedChannelBuilder<?> builder2 = ManagedChannelBuilder.forAddress(connectParam.getHost(),
+                            connectParam.getPort()).usePlaintext().maxInboundMessageSize(2147483647).keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS).keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls()).idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS).intercept(clientInterceptors);
+                    if (connectParam.isSecure()) {
+                        builder2.useTransportSecurity();
+                    }
+
+                    builder2.nameResolverFactory(dnsNameResolverProvider);
+
+                    this.channel = builder2.build();
+                }
+            } catch (IOException var6) {
+                msg = "Failed to open credentials file. Error: " + var6.getMessage();
+                this.logError(msg, new Object[0]);
+                throw new RuntimeException(msg);
+            }
+
+            assert this.channel != null;
+
+            this.blockingStub = MilvusServiceGrpc.newBlockingStub(this.channel);
+            this.futureStub = MilvusServiceGrpc.newFutureStub(this.channel);
+            this.timeoutMs = connectParam.getConnectTimeoutMs();
+            R<ConnectResponse> resp = this.retry(() -> {
+                return this.connect(connectParam);
+            });
+            if (resp.getStatus() != R.Status.Success.getCode()) {
+                msg = "Failed to initialize connection. Error: " + resp.getMessage();
+                this.logError(msg, new Object[0]);
+                throw new RuntimeException(msg);
+            } else {
+                this.timeoutMs = 0L;
+            }
+        }
+    }
+
+    protected CustomMilvusServiceClient(CustomMilvusServiceClient src) {
+        this.channel = src.channel;
+        this.blockingStub = src.blockingStub;
+        this.futureStub = src.futureStub;
+        this.rpcDeadlineMs = src.rpcDeadlineMs;
+        this.timeoutMs = src.timeoutMs;
+        this.logLevel = src.logLevel;
+        this.retryParam = src.retryParam;
+    }
+
+    protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+        return this.rpcDeadlineMs > 0L ? (MilvusServiceGrpc.MilvusServiceBlockingStub) ((MilvusServiceGrpc.MilvusServiceBlockingStub) this.blockingStub.withWaitForReady()).withDeadlineAfter(this.rpcDeadlineMs, TimeUnit.MILLISECONDS) : this.blockingStub;
+    }
+
+    protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+        return this.futureStub;
+    }
+
+    public boolean clientIsReady() {
+        return this.channel != null && !this.channel.isShutdown() && !this.channel.isTerminated();
+    }
+
+    public void close(long maxWaitSeconds) throws InterruptedException {
+        this.channel.shutdownNow();
+        this.channel.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS);
+    }
+
+    public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+        long timeoutMillis = timeoutUnit.toMillis(timeout);
+        TimeoutInterceptor timeoutInterceptor = new TimeoutInterceptor(timeoutMillis);
+        final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStubTimeout = (MilvusServiceGrpc.MilvusServiceBlockingStub) this.blockingStub.withInterceptors(new ClientInterceptor[]{timeoutInterceptor});
+        final MilvusServiceGrpc.MilvusServiceFutureStub futureStubTimeout = (MilvusServiceGrpc.MilvusServiceFutureStub) this.futureStub.withInterceptors(new ClientInterceptor[]{timeoutInterceptor});
+        CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this) {
+            protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+                return blockingStubTimeout;
+            }
+
+            protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+                return futureStubTimeout;
+            }
+        };
+        newClient.timeoutMs = timeoutMillis;
+        return newClient;
+    }
+
+    public MilvusClient withRetry(RetryParam retryParam) {
+        CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this);
+        newClient.retryParam = retryParam;
+        return newClient;
+    }
+
+    public MilvusClient withRetry(int retryTimes) {
+        if (retryTimes <= 0) {
+            return this;
+        } else {
+            CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this);
+            newClient.retryParam.setMaxRetryTimes(retryTimes);
+            return newClient;
+        }
+    }
+
+    public MilvusClient withRetryInterval(long interval, TimeUnit timeUnit) {
+        if (interval <= 0L) {
+            return this;
+        } else {
+            CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this);
+            newClient.retryParam.setInitialBackOffMs(timeUnit.toMillis(interval));
+            newClient.retryParam.setMaxBackOffMs(timeUnit.toMillis(interval));
+            return newClient;
+        }
+    }
+
+    private <T> R<T> retry(Callable<R<T>> callable) {
+        int maxRetryTimes = this.retryParam.getMaxRetryTimes();
+        if (maxRetryTimes <= 1) {
+            try {
+                return (R) callable.call();
+            } catch (Exception var14) {
+                return R.failed(var14);
+            }
+        } else {
+            long begin = System.currentTimeMillis();
+            Callable<Boolean> timeoutChecker = () -> {
+                long current = System.currentTimeMillis();
+                long cost = current - begin;
+                return this.timeoutMs > 0L && cost >= this.timeoutMs ? Boolean.TRUE : Boolean.FALSE;
+            };
+            long retryIntervalMs = this.retryParam.getInitialBackOffMs();
+
+            for (int k = 1; k <= maxRetryTimes; ++k) {
+                try {
+                    R<T> resp = (R) callable.call();
+                    if (resp.getStatus() == R.Status.Success.getCode()) {
+                        return resp;
+                    }
+
+                    Exception e = resp.getException();
+                    if (e instanceof StatusRuntimeException) {
+                        StatusRuntimeException rpcException = (StatusRuntimeException) e;
+                        Status.Code code = rpcException.getStatus().getCode();
+                        if (code == Status.DEADLINE_EXCEEDED.getCode() || code == Status.PERMISSION_DENIED.getCode() || code == Status.UNAUTHENTICATED.getCode() || code == Status.INVALID_ARGUMENT.getCode() || code == Status.ALREADY_EXISTS.getCode() || code == Status.RESOURCE_EXHAUSTED.getCode() || code == Status.UNIMPLEMENTED.getCode()) {
+                            return resp;
+                        }
+
+                        if (timeoutChecker.call() == Boolean.TRUE) {
+                            String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s", this.timeoutMs, maxRetryTimes, k, e);
+                            throw new MilvusException(msg, code.value());
+                        }
+                    } else {
+                        if (!(e instanceof ServerException)) {
+                            return resp;
+                        }
+
+                        ServerException serverException = (ServerException) e;
+                        if (timeoutChecker.call() == Boolean.TRUE) {
+                            String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s", this.timeoutMs, maxRetryTimes, k, e);
+                            throw new MilvusException(msg, serverException.getStatus());
+                        }
+
+                        if (!this.retryParam.isRetryOnRateLimit() || serverException.getCompatibleCode() != ErrorCode.RateLimit && serverException.getStatus() != 8) {
+                            return resp;
+                        }
+                    }
+
+                    if (k >= maxRetryTimes) {
+                        String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
+                        this.logError(msg, new Object[0]);
+                        return resp;
+                    }
+
+                    if (k > 3) {
+                        this.logWarning(String.format("Retry(%d) with interval %dms. Reason: %s", k, retryIntervalMs, e), new Object[0]);
+                    }
+
+                    TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
+                    retryIntervalMs *= (long) this.retryParam.getBackOffMultiplier();
+                    if (retryIntervalMs > this.retryParam.getMaxBackOffMs()) {
+                        retryIntervalMs = this.retryParam.getMaxBackOffMs();
+                    }
+                } catch (Exception var15) {
+                    this.logError(var15.getMessage(), new Object[0]);
+                    return R.failed(var15);
+                }
+            }
+
+            String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
+            this.logError(msg, new Object[0]);
+            return R.failed(new RuntimeException(msg));
+        }
+    }
+
+    private R<ConnectResponse> connect(@NonNull ConnectParam connectParam) {
+        if (connectParam == null) {
+            throw new NullPointerException("connectParam is marked non-null but is null");
+        } else {
+            ClientInfo info = ClientInfo.newBuilder().setSdkType("Java").setSdkVersion(this.getSDKVersion()).setUser(connectParam.getUserName()).setHost(this.getHostName()).setLocalTime(this.getLocalTimeStr()).build();
+            ConnectRequest req = ConnectRequest.newBuilder().setClientInfo(info).build();
+            ConnectResponse resp = ((MilvusServiceGrpc.MilvusServiceBlockingStub) ((MilvusServiceGrpc.MilvusServiceBlockingStub) this.blockingStub.withWaitForReady()).withDeadlineAfter(connectParam.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)).connect(req);
+            if (resp.getStatus().getCode() == 0 && resp.getStatus().getErrorCode().equals(ErrorCode.Success)) {
+                return R.success(resp);
+            } else {
+                throw new RuntimeException("Failed to initialize connection. Error: " + resp.getStatus().getReason());
+            }
+        }
+    }
+
+    private String getHostName() {
+        try {
+            InetAddress address = InetAddress.getLocalHost();
+            return address.getHostName();
+        } catch (UnknownHostException var2) {
+            this.logWarning("Failed to get host name! Exception:{}", new Object[]{var2});
+            return "Unknown";
+        }
+    }
+
+    private String getLocalTimeStr() {
+        LocalDateTime now = LocalDateTime.now();
+        return now.toString();
+    }
+
+    private String getSDKVersion() {
+        Package pkg = CustomMilvusServiceClient.class.getPackage();
+        String ver = pkg.getImplementationVersion();
+        return ver == null ? "" : ver;
+    }
+
+    public void setLogLevel(LogLevel level) {
+        this.logLevel = level;
+    }
+
+    public R<Boolean> hasCollection(HasCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.hasCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createDatabase(CreateDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.createDatabase(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropDatabase(DropDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.dropDatabase(requestParam);
+        });
+    }
+
+    public R<ListDatabasesResponse> listDatabases() {
+        return this.retry(() -> {
+            return super.listDatabases();
+        });
+    }
+
+    public R<RpcStatus> alterDatabase(AlterDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.alterDatabase(requestParam);
+        });
+    }
+
+    public R<DescribeDatabaseResponse> describeDatabase(DescribeDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.describeDatabase(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createCollection(CreateCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.createCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropCollection(DropCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.dropCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> loadCollection(LoadCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.loadCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> releaseCollection(ReleaseCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.releaseCollection(requestParam);
+        });
+    }
+
+    public R<DescribeCollectionResponse> describeCollection(DescribeCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.describeCollection(requestParam);
+        });
+    }
+
+    public R<GetCollectionStatisticsResponse> getCollectionStatistics(GetCollectionStatisticsParam requestParam) {
+        return this.retry(() -> {
+            return super.getCollectionStatistics(requestParam);
+        });
+    }
+
+    public R<RpcStatus> renameCollection(RenameCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.renameCollection(requestParam);
+        });
+    }
+
+    public R<ShowCollectionsResponse> showCollections(ShowCollectionsParam requestParam) {
+        return this.retry(() -> {
+            return super.showCollections(requestParam);
+        });
+    }
+
+    public R<RpcStatus> alterCollection(AlterCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.alterCollection(requestParam);
+        });
+    }
+
+    public R<FlushResponse> flush(FlushParam requestParam) {
+        return this.retry(() -> {
+            return super.flush(requestParam);
+        });
+    }
+
+    public R<FlushAllResponse> flushAll(boolean syncFlushAll, long syncFlushAllWaitingInterval, long syncFlushAllTimeout) {
+        return this.retry(() -> {
+            return super.flushAll(syncFlushAll, syncFlushAllWaitingInterval, syncFlushAllTimeout);
+        });
+    }
+
+    public R<RpcStatus> createPartition(CreatePartitionParam requestParam) {
+        return this.retry(() -> {
+            return super.createPartition(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropPartition(DropPartitionParam requestParam) {
+        return this.retry(() -> {
+            return super.dropPartition(requestParam);
+        });
+    }
+
+    public R<Boolean> hasPartition(HasPartitionParam requestParam) {
+        return this.retry(() -> {
+            return super.hasPartition(requestParam);
+        });
+    }
+
+    public R<RpcStatus> loadPartitions(LoadPartitionsParam requestParam) {
+        return this.retry(() -> {
+            return super.loadPartitions(requestParam);
+        });
+    }
+
+    public R<RpcStatus> releasePartitions(ReleasePartitionsParam requestParam) {
+        return this.retry(() -> {
+            return super.releasePartitions(requestParam);
+        });
+    }
+
+    public R<GetPartitionStatisticsResponse> getPartitionStatistics(GetPartitionStatisticsParam requestParam) {
+        return this.retry(() -> {
+            return super.getPartitionStatistics(requestParam);
+        });
+    }
+
+    public R<ShowPartitionsResponse> showPartitions(ShowPartitionsParam requestParam) {
+        return this.retry(() -> {
+            return super.showPartitions(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createAlias(CreateAliasParam requestParam) {
+        return this.retry(() -> {
+            return super.createAlias(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropAlias(DropAliasParam requestParam) {
+        return this.retry(() -> {
+            return super.dropAlias(requestParam);
+        });
+    }
+
+    public R<RpcStatus> alterAlias(AlterAliasParam requestParam) {
+        return this.retry(() -> {
+            return super.alterAlias(requestParam);
+        });
+    }
+
+    public R<ListAliasesResponse> listAliases(ListAliasesParam requestParam) {
+        return this.retry(() -> {
+            return super.listAliases(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createIndex(CreateIndexParam requestParam) {
+        return this.retry(() -> {
+            return super.createIndex(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropIndex(DropIndexParam requestParam) {
+        return this.retry(() -> {
+            return super.dropIndex(requestParam);
+        });
+    }
+
+    public R<DescribeIndexResponse> describeIndex(DescribeIndexParam requestParam) {
+        return this.retry(() -> {
+            return super.describeIndex(requestParam);
+        });
+    }
+
+    public R<GetIndexStateResponse> getIndexState(@NonNull GetIndexStateParam requestParam) {
+        if (requestParam == null) {
+            throw new NullPointerException("requestParam is marked non-null but is null");
+        } else {
+            return this.retry(() -> {
+                return super.getIndexState(requestParam);
+            });
+        }
+    }
+
+    public R<GetIndexBuildProgressResponse> getIndexBuildProgress(@NonNull GetIndexBuildProgressParam requestParam) {
+        if (requestParam == null) {
+            throw new NullPointerException("requestParam is marked non-null but is null");
+        } else {
+            return this.retry(() -> {
+                return super.getIndexBuildProgress(requestParam);
+            });
+        }
+    }
+
+    public R<MutationResult> insert(InsertParam requestParam) {
+        return this.retry(() -> {
+            return super.insert(requestParam);
+        });
+    }
+
+    public R<MutationResult> upsert(UpsertParam requestParam) {
+        return this.retry(() -> {
+            return super.upsert(requestParam);
+        });
+    }
+
+    public R<MutationResult> delete(DeleteParam requestParam) {
+        return this.retry(() -> {
+            return super.delete(requestParam);
+        });
+    }
+
+    public R<SearchResults> search(SearchParam requestParam) {
+        return this.retry(() -> {
+            return super.search(requestParam);
+        });
+    }
+
+    public R<SearchResults> hybridSearch(HybridSearchParam requestParam) {
+        return this.retry(() -> {
+            return super.hybridSearch(requestParam);
+        });
+    }
+
+    public R<QueryResults> query(QueryParam requestParam) {
+        return this.retry(() -> {
+            return super.query(requestParam);
+        });
+    }
+
+    public R<GetMetricsResponse> getMetrics(GetMetricsParam requestParam) {
+        return this.retry(() -> {
+            return super.getMetrics(requestParam);
+        });
+    }
+
+    public R<GetFlushStateResponse> getFlushState(GetFlushStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getFlushState(requestParam);
+        });
+    }
+
+    public R<GetFlushAllStateResponse> getFlushAllState(GetFlushAllStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getFlushAllState(requestParam);
+        });
+    }
+
+    public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(GetPersistentSegmentInfoParam requestParam) {
+        return this.retry(() -> {
+            return super.getPersistentSegmentInfo(requestParam);
+        });
+    }
+
+    public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam) {
+        return this.retry(() -> {
+            return super.getQuerySegmentInfo(requestParam);
+        });
+    }
+
+    public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
+        return this.retry(() -> {
+            return super.getReplicas(requestParam);
+        });
+    }
+
+    public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
+        return this.retry(() -> {
+            return super.loadBalance(requestParam);
+        });
+    }
+
+    public R<GetCompactionStateResponse> getCompactionState(GetCompactionStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getCompactionState(requestParam);
+        });
+    }
+
+    public R<ManualCompactionResponse> manualCompact(ManualCompactParam requestParam) {
+        return this.retry(() -> {
+            return super.manualCompact(requestParam);
+        });
+    }
+
+    public R<GetCompactionPlansResponse> getCompactionStateWithPlans(GetCompactionPlansParam requestParam) {
+        return this.retry(() -> {
+            return super.getCompactionStateWithPlans(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createCredential(CreateCredentialParam requestParam) {
+        return this.retry(() -> {
+            return super.createCredential(requestParam);
+        });
+    }
+
+    public R<RpcStatus> updateCredential(UpdateCredentialParam requestParam) {
+        return this.retry(() -> {
+            return super.updateCredential(requestParam);
+        });
+    }
+
+    public R<RpcStatus> deleteCredential(DeleteCredentialParam requestParam) {
+        return this.retry(() -> {
+            return super.deleteCredential(requestParam);
+        });
+    }
+
+    public R<ListCredUsersResponse> listCredUsers(ListCredUsersParam requestParam) {
+        return this.retry(() -> {
+            return super.listCredUsers(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createRole(CreateRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.createRole(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropRole(DropRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.dropRole(requestParam);
+        });
+    }
+
+    public R<RpcStatus> addUserToRole(AddUserToRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.addUserToRole(requestParam);
+        });
+    }
+
+    public R<RpcStatus> removeUserFromRole(RemoveUserFromRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.removeUserFromRole(requestParam);
+        });
+    }
+
+    public R<SelectRoleResponse> selectRole(SelectRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.selectRole(requestParam);
+        });
+    }
+
+    public R<SelectUserResponse> selectUser(SelectUserParam requestParam) {
+        return this.retry(() -> {
+            return super.selectUser(requestParam);
+        });
+    }
+
+    public R<RpcStatus> grantRolePrivilege(GrantRolePrivilegeParam requestParam) {
+        return this.retry(() -> {
+            return super.grantRolePrivilege(requestParam);
+        });
+    }
+
+    public R<RpcStatus> revokeRolePrivilege(RevokeRolePrivilegeParam requestParam) {
+        return this.retry(() -> {
+            return super.revokeRolePrivilege(requestParam);
+        });
+    }
+
+    public R<SelectGrantResponse> selectGrantForRole(SelectGrantForRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.selectGrantForRole(requestParam);
+        });
+    }
+
+    public R<SelectGrantResponse> selectGrantForRoleAndObject(SelectGrantForRoleAndObjectParam requestParam) {
+        return this.retry(() -> {
+            return super.selectGrantForRoleAndObject(requestParam);
+        });
+    }
+
+    public R<ImportResponse> bulkInsert(BulkInsertParam requestParam) {
+        return this.retry(() -> {
+            return super.bulkInsert(requestParam);
+        });
+    }
+
+    public R<GetImportStateResponse> getBulkInsertState(GetBulkInsertStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getBulkInsertState(requestParam);
+        });
+    }
+
+    public R<ListImportTasksResponse> listBulkInsertTasks(ListBulkInsertTasksParam requestParam) {
+        return this.retry(() -> {
+            return super.listBulkInsertTasks(requestParam);
+        });
+    }
+
+    public R<CheckHealthResponse> checkHealth() {
+        return this.retry(() -> {
+            return super.checkHealth();
+        });
+    }
+
+    public R<GetVersionResponse> getVersion() {
+        return this.retry(() -> {
+            return super.getVersion();
+        });
+    }
+
+    public R<GetLoadingProgressResponse> getLoadingProgress(GetLoadingProgressParam requestParam) {
+        return this.retry(() -> {
+            return super.getLoadingProgress(requestParam);
+        });
+    }
+
+    public R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getLoadState(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createResourceGroup(CreateResourceGroupParam requestParam) {
+        return this.retry(() -> {
+            return super.createResourceGroup(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropResourceGroup(DropResourceGroupParam requestParam) {
+        return this.retry(() -> {
+            return super.dropResourceGroup(requestParam);
+        });
+    }
+
+    public R<ListResourceGroupsResponse> listResourceGroups(ListResourceGroupsParam requestParam) {
+        return this.retry(() -> {
+            return super.listResourceGroups(requestParam);
+        });
+    }
+
+    public R<DescribeResourceGroupResponse> describeResourceGroup(DescribeResourceGroupParam requestParam) {
+        return this.retry(() -> {
+            return super.describeResourceGroup(requestParam);
+        });
+    }
+
+    public R<RpcStatus> transferNode(TransferNodeParam requestParam) {
+        return this.retry(() -> {
+            return super.transferNode(requestParam);
+        });
+    }
+
+    public R<RpcStatus> transferReplica(TransferReplicaParam requestParam) {
+        return this.retry(() -> {
+            return super.transferReplica(requestParam);
+        });
+    }
+
+    public R<RpcStatus> updateResourceGroups(UpdateResourceGroupsParam requestParam) {
+        return this.retry(() -> {
+            return super.updateResourceGroups(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.createCollection(requestParam);
+        });
+    }
+
+    public R<ListCollectionsResponse> listCollections(ListCollectionsParam requestParam) {
+        return this.retry(() -> {
+            return super.listCollections(requestParam);
+        });
+    }
+
+    public R<InsertResponse> insert(InsertRowsParam requestParam) {
+        return this.retry(() -> {
+            return super.insert(requestParam);
+        });
+    }
+
+    public R<DeleteResponse> delete(DeleteIdsParam requestParam) {
+        return this.retry(() -> {
+            return super.delete(requestParam);
+        });
+    }
+
+    public R<GetResponse> get(GetIdsParam requestParam) {
+        return this.retry(() -> {
+            return super.get(requestParam);
+        });
+    }
+
+    public R<QueryResponse> query(QuerySimpleParam requestParam) {
+        return this.retry(() -> {
+            return super.query(requestParam);
+        });
+    }
+
+    public R<SearchResponse> search(SearchSimpleParam requestParam) {
+        return this.retry(() -> {
+            return super.search(requestParam);
+        });
+    }
+
+    public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
+        return this.retry(() -> {
+            return super.queryIterator(requestParam);
+        });
+    }
+
+    public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
+        return this.retry(() -> {
+            return super.searchIterator(requestParam);
+        });
+    }
+
+    private static class TimeoutInterceptor implements ClientInterceptor {
+        private final long timeoutMillis;
+
+        TimeoutInterceptor(long timeoutMillis) {
+            this.timeoutMillis = timeoutMillis;
+        }
+
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+            return next.newCall(method, callOptions.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS));
+        }
+    }
+}

+ 83 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/remote/MilvusRemoteService.java

@@ -0,0 +1,83 @@
+package com.tzld.piaoquan.recommend.server.task.remote;
+
+import io.milvus.client.MilvusClient;
+import io.milvus.grpc.MutationResult;
+import io.milvus.grpc.SearchResultData;
+import io.milvus.grpc.SearchResults;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.SearchParam;
+import io.milvus.pool.ClientPool;
+import io.milvus.pool.PoolClientFactory;
+import io.milvus.pool.PoolConfig;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.response.InsertResp;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.util.List;
+
+import com.google.gson.JsonObject;
+
+/**
+ * https://github.com/milvus-io/milvus-sdk-java/blob/2.4/examples/main/java/io/milvus/v2/ClientPoolExample.java
+ *
+ * @author dyp
+ */
+@Component
+@Slf4j
+public class MilvusRemoteService {
+
+    private CustomMilvusClientPool pool;
+    private String poolKey = "milvus";
+    @Value("${zillizUri:}")
+    private String zillizUri;
+    @Value("${zillizToken:}")
+    private String zillizToken;
+
+
+    @PostConstruct
+    public void init() {
+        try {
+            // milvus 使用 io.grpc.internal.DnsNameResolverProvider
+            ConnectParam connectConfig = ConnectParam.newBuilder()
+                    .withUri(zillizUri)
+                    .withToken(zillizToken)
+                    .build();
+            PoolConfig poolConfig = PoolConfig.builder()
+                    .maxIdlePerKey(1000) // max idle clients per key
+                    .maxTotalPerKey(1000) // max total(idle + active) clients per key
+                    .maxTotal(1000) // max total clients for all keys
+                    .maxBlockWaitDuration(Duration.ofMillis(200)) // getClient() will wait 5 seconds if no idle
+                    // client available
+                    .minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
+                    .build();
+            pool = new CustomMilvusClientPool(poolConfig, connectConfig);
+        } catch (Exception e) {
+            log.error("MilvusClientV2Pool init error ", e);
+            throw new RuntimeException();
+        }
+    }
+
+    public void insert(String collection, List<JsonObject> data) {
+        MilvusClient client = pool.getClient(poolKey);
+        InsertParam insertReq = InsertParam.newBuilder()
+                .withCollectionName(collection)
+                .withRows(data)
+                .build();
+
+        client.insert(insertReq);
+
+    }
+
+
+    private class CustomMilvusClientPool extends ClientPool<ConnectParam, MilvusClient> {
+        public CustomMilvusClientPool(PoolConfig poolConfig, ConnectParam connectParam) throws ClassNotFoundException, NoSuchMethodException {
+            super(poolConfig, new PoolClientFactory(connectParam, CustomMilvusServiceClient.class.getName()));
+        }
+    }
+}

+ 27 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/remote/ModelRemoteService.java

@@ -0,0 +1,27 @@
+package com.tzld.piaoquan.recommend.server.task.remote;
+
+import com.tzld.piaoquan.recommend.model.grpc.client.PredictClient;
+import com.tzld.piaoquan.recommend.model.grpc.model.PredictRequest;
+import com.tzld.piaoquan.recommend.model.grpc.model.PredictResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author dyp
+ */
+@Component
+@Slf4j
+public class ModelRemoteService {
+
+    @Autowired
+    private PredictClient client;
+
+    public float[] dssmPredict(String param) {
+        String result = client.predict("dssm", param);
+        // TODO
+        return new float[0];
+
+    }
+
+}

+ 43 - 0
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/service/ODPSService.java

@@ -0,0 +1,43 @@
+package com.tzld.piaoquan.recommend.server.task.service;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+@Slf4j
+@Component
+public class ODPSService {
+    private final static String ACCESSID = "LTAIWYUujJAm7CbH";
+    private final static String ACCESSKEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+    private final static String ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api";
+
+    public List<Record> query(String sql) {
+        Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(ENDPOINT);
+        odps.setDefaultProject("loghubods");
+        Instance i;
+        try {
+            i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResultByInstanceTunnel(i);
+            if (Objects.nonNull(records) && records.size() != 0) {
+                return records;
+            }
+        } catch (Exception e) {
+            log.error("odps query error", e);
+        }
+        return Collections.emptyList();
+    }
+}

+ 0 - 13
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/xxl/FeatureJob.java

@@ -1,13 +0,0 @@
-package com.tzld.piaoquan.recommend.server.task.xxl;
-
-import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.handler.annotation.XxlJob;
-import org.springframework.stereotype.Component;
-
-/**
- * @author dyp
- */
-@Component
-public class FeatureJob {
-
-}

+ 0 - 17
recommend-server-task/src/main/java/com/tzld/piaoquan/recommend/server/task/xxl/ModelJob.java

@@ -1,17 +0,0 @@
-package com.tzld.piaoquan.recommend.server.task.xxl;
-
-import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.handler.annotation.XxlJob;
-import org.springframework.stereotype.Component;
-
-/**
- * @author dyp
- */
-@Component
-public class ModelJob {
-    @XxlJob("modelTrain")
-    public ReturnT<String> modelTrain() {
-        // TODO invoke
-        return ReturnT.SUCCESS;
-    }
-}

+ 6 - 1
recommend-server-task/src/main/resources/application-dev.yml

@@ -57,4 +57,9 @@ aliyun:
     endpoint: cn-hangzhou.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server-test
+    project: recommend-task-test
+
+odps:
+  accessKeyId: LTAIWYUujJAm7CbH
+  accessKeySecret: RfSjdiWwED1sGFlsjXv0DlfTnZTG1P
+  endpoint: http://service.cn.maxcompute.aliyun.com/api

+ 1 - 1
recommend-server-task/src/main/resources/application-pre.yml

@@ -61,4 +61,4 @@ aliyun:
     endpoint: cn-hangzhou-intranet.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server
+    project: recommend-task-test

+ 1 - 1
recommend-server-task/src/main/resources/application-prod.yml

@@ -61,4 +61,4 @@ aliyun:
     endpoint: cn-hangzhou-intranet.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server
+    project: recommend-task

+ 1 - 1
recommend-server-task/src/main/resources/application-test.yml

@@ -61,4 +61,4 @@ aliyun:
     endpoint: cn-hangzhou-intranet.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server-test
+    project: recommend-task-test

+ 1 - 6
recommend-server-task/src/main/resources/application.yml

@@ -9,9 +9,4 @@ logging:
     path: /datalog/weblog/${spring.application.name}/
 
 app:
-  id: recommend-server-task
-apollo:
-  bootstrap:
-    enabled: true
-    namespaces: application
-  cacheDir: /datalog/apollo-cache-dir
+  id: recommend-server-task

+ 8 - 12
recommend-server-task/src/main/resources/logback-spring.xml

@@ -156,7 +156,7 @@
         </filter>
     </appender>
 
-    <appender name="loghubAppenderInfo" class="com.tzld.piaoquan.recommend.server.common.RecommendLoghubAppender">
+    <appender name="loghubAppenderInfo" class="com.tzld.piaoquan.recommend.server.task.common.RecommendLoghubAppender">
         <!--必选项-->
         <!-- 账号及网络配置 -->
         <endpoint>${aliyun_log_endpoint}</endpoint>
@@ -195,7 +195,7 @@
         </mdcFields>
     </appender>
 
-    <appender name="loghubAppenderWarn" class="com.tzld.piaoquan.recommend.server.common.RecommendLoghubAppender">
+    <appender name="loghubAppenderWarn" class="com.tzld.piaoquan.recommend.server.task.common.RecommendLoghubAppender">
         <!--必选项-->
         <!-- 账号及网络配置 -->
         <endpoint>${aliyun_log_endpoint}</endpoint>
@@ -234,7 +234,7 @@
         </mdcFields>
     </appender>
 
-    <appender name="loghubAppenderError" class="com.tzld.piaoquan.recommend.server.common.RecommendLoghubAppender">
+    <appender name="loghubAppenderError" class="com.tzld.piaoquan.recommend.server.task.common.RecommendLoghubAppender">
         <!--必选项-->
         <!-- 账号及网络配置 -->
         <endpoint>${aliyun_log_endpoint}</endpoint>
@@ -299,27 +299,23 @@
     -->
 
     <springProfile name="dev">
-        <logger name="com.tzld.piaoquan.recommend.server" level="info"/>
+        <logger name="com.tzld.piaoquan.recommend.server.task" level="info"/>
     </springProfile>
     <springProfile name="test">
-        <logger name="com.tzld.piaoquan.recommend.server" level="info"/>
+        <logger name="com.tzld.piaoquan.recommend.server.task" level="info"/>
     </springProfile>
     <springProfile name="pre">
-        <logger name="com.tzld.piaoquan.recommend.server" level="info"/>
+        <logger name="com.tzld.piaoquan.recommend.server.task" level="info"/>
     </springProfile>
     <springProfile name="stress">
-        <logger name="com.tzld.piaoquan.recommend.server" level="info"/>
+        <logger name="com.tzld.piaoquan.recommend.server.task" level="info"/>
     </springProfile>
     <springProfile name="prod">
-        <logger name="com.tzld.piaoquan.recommend.server" level="info"/>
+        <logger name="com.tzld.piaoquan.recommend.server.task" level="info"/>
     </springProfile>
 
     <root level="info">
         <appender-ref ref="CONSOLE"/>
-        <appender-ref ref="DEBUG_FILE"/>
-        <appender-ref ref="INFO_FILE"/>
-        <appender-ref ref="WARN_FILE"/>
-        <appender-ref ref="ERROR_FILE"/>
         <appender-ref ref="loghubAppenderInfo"/>
         <appender-ref ref="loghubAppenderWarn"/>
         <appender-ref ref="loghubAppenderError"/>