瀏覽代碼

model service

丁云鹏 5 月之前
父節點
當前提交
bd61fb0133

+ 14 - 1
recommend-server-service/pom.xml

@@ -27,6 +27,7 @@
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
+            <version>2.12.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -57,6 +58,12 @@
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
             <version>3.3.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-pool2</artifactId>
+                    <groupId>org.apache.commons</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <!--        <dependency>-->
         <!--            <groupId>org.apache.hadoop</groupId>-->
@@ -191,7 +198,7 @@
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
-            <version>3.12.0</version>
+            <version>3.24.0</version>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
@@ -291,6 +298,12 @@
             <groupId>io.milvus</groupId>
             <artifactId>milvus-sdk-java</artifactId>
             <version>2.4.5</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-pool2</artifactId>
+                    <groupId>org.apache.commons</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 

+ 0 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -1,7 +1,5 @@
 package com.tzld.piaoquan.recommend.server;
 
-// import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
-
 import com.tzld.piaoquan.abtest.client.ABTestClient;
 import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
 import com.tzld.piaoquan.recommend.feature.client.FeatureV2Client;

+ 878 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/remote/CustomMilvusServiceClient.java

@@ -0,0 +1,878 @@
+package com.tzld.piaoquan.recommend.server.remote;
+
+import io.grpc.*;
+import io.grpc.Status;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.stub.MetadataUtils;
+import io.milvus.client.AbstractMilvusGrpcClient;
+import io.milvus.client.MilvusClient;
+import io.milvus.exception.MilvusException;
+import io.milvus.exception.ServerException;
+import io.milvus.grpc.*;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.orm.iterator.SearchIterator;
+import io.milvus.param.*;
+import io.milvus.param.alias.AlterAliasParam;
+import io.milvus.param.alias.CreateAliasParam;
+import io.milvus.param.alias.DropAliasParam;
+import io.milvus.param.alias.ListAliasesParam;
+import io.milvus.param.bulkinsert.BulkInsertParam;
+import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
+import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
+import io.milvus.param.collection.*;
+import io.milvus.param.control.*;
+import io.milvus.param.credential.CreateCredentialParam;
+import io.milvus.param.credential.DeleteCredentialParam;
+import io.milvus.param.credential.ListCredUsersParam;
+import io.milvus.param.credential.UpdateCredentialParam;
+import io.milvus.param.dml.*;
+import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
+import io.milvus.param.highlevel.collection.ListCollectionsParam;
+import io.milvus.param.highlevel.collection.response.ListCollectionsResponse;
+import io.milvus.param.highlevel.dml.*;
+import io.milvus.param.highlevel.dml.response.*;
+import io.milvus.param.index.*;
+import io.milvus.param.partition.*;
+import io.milvus.param.resourcegroup.*;
+import io.milvus.param.role.*;
+import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author dyp
+ */
+public class CustomMilvusServiceClient extends AbstractMilvusGrpcClient {
+    private ManagedChannel channel;
+    private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
+    private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
+    private final long rpcDeadlineMs;
+    private long timeoutMs = 0L;
+    private RetryParam retryParam = RetryParam.newBuilder().build();
+    private DnsNameResolverProvider dnsNameResolverProvider = new DnsNameResolverProvider();
+
+    public CustomMilvusServiceClient(@NonNull final ConnectParam connectParam) {
+        if (connectParam == null) {
+            throw new NullPointerException("connectParam is marked non-null but is null");
+        } else {
+            this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
+            Metadata metadata = new Metadata();
+            metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
+            if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) {
+                metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName());
+            }
+
+            List<ClientInterceptor> clientInterceptors = new ArrayList();
+            clientInterceptors.add(MetadataUtils.newAttachHeadersInterceptor(metadata));
+            clientInterceptors.add(new ClientInterceptor() {
+                public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+                    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+                        public void start(Listener<RespT> responseListener, Metadata headers) {
+                            if (connectParam.getClientRequestId() != null && !StringUtils.isEmpty((CharSequence) connectParam.getClientRequestId().get())) {
+                                headers.put(Metadata.Key.of("client_request_id", Metadata.ASCII_STRING_MARSHALLER), connectParam.getClientRequestId().get());
+                            }
+
+                            super.start(responseListener, headers);
+                        }
+                    };
+                }
+            });
+
+            String msg;
+            try {
+                SslContext sslContext;
+                NettyChannelBuilder builder;
+                if (StringUtils.isNotEmpty(connectParam.getServerPemPath())) {
+                    sslContext = GrpcSslContexts.forClient().trustManager(new File(connectParam.getServerPemPath())).build();
+                    builder = (NettyChannelBuilder) ((NettyChannelBuilder) ((NettyChannelBuilder) NettyChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort()).overrideAuthority(connectParam.getServerName())).sslContext(sslContext).maxInboundMessageSize(2147483647).keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS).keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls()).idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS)).intercept(clientInterceptors);
+                    if (connectParam.isSecure()) {
+                        builder.useTransportSecurity();
+                    }
+                    builder.nameResolverFactory(dnsNameResolverProvider);
+                    this.channel = builder.build();
+                } else if (StringUtils.isNotEmpty(connectParam.getClientPemPath()) && StringUtils.isNotEmpty(connectParam.getClientKeyPath()) && StringUtils.isNotEmpty(connectParam.getCaPemPath())) {
+                    sslContext = GrpcSslContexts.forClient().trustManager(new File(connectParam.getCaPemPath())).keyManager(new File(connectParam.getClientPemPath()), new File(connectParam.getClientKeyPath())).build();
+                    builder = (NettyChannelBuilder) ((NettyChannelBuilder) NettyChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort()).sslContext(sslContext).maxInboundMessageSize(2147483647).keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS).keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls()).idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS)).intercept(clientInterceptors);
+                    if (connectParam.isSecure()) {
+                        builder.useTransportSecurity();
+                    }
+
+                    if (StringUtils.isNotEmpty(connectParam.getServerName())) {
+                        builder.overrideAuthority(connectParam.getServerName());
+                    }
+                    builder.nameResolverFactory(dnsNameResolverProvider);
+                    this.channel = builder.build();
+                } else {
+                    ManagedChannelBuilder<?> builder2 = ManagedChannelBuilder.forAddress(connectParam.getHost(),
+                            connectParam.getPort()).usePlaintext().maxInboundMessageSize(2147483647).keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS).keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS).keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls()).idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS).intercept(clientInterceptors);
+                    if (connectParam.isSecure()) {
+                        builder2.useTransportSecurity();
+                    }
+
+                    builder2.nameResolverFactory(dnsNameResolverProvider);
+
+                    this.channel = builder2.build();
+                }
+            } catch (IOException var6) {
+                msg = "Failed to open credentials file. Error: " + var6.getMessage();
+                this.logError(msg, new Object[0]);
+                throw new RuntimeException(msg);
+            }
+
+            assert this.channel != null;
+
+            this.blockingStub = MilvusServiceGrpc.newBlockingStub(this.channel);
+            this.futureStub = MilvusServiceGrpc.newFutureStub(this.channel);
+            this.timeoutMs = connectParam.getConnectTimeoutMs();
+            R<ConnectResponse> resp = this.retry(() -> {
+                return this.connect(connectParam);
+            });
+            if (resp.getStatus() != R.Status.Success.getCode()) {
+                msg = "Failed to initialize connection. Error: " + resp.getMessage();
+                this.logError(msg, new Object[0]);
+                throw new RuntimeException(msg);
+            } else {
+                this.timeoutMs = 0L;
+            }
+        }
+    }
+
+    protected CustomMilvusServiceClient(CustomMilvusServiceClient src) {
+        this.channel = src.channel;
+        this.blockingStub = src.blockingStub;
+        this.futureStub = src.futureStub;
+        this.rpcDeadlineMs = src.rpcDeadlineMs;
+        this.timeoutMs = src.timeoutMs;
+        this.logLevel = src.logLevel;
+        this.retryParam = src.retryParam;
+    }
+
+    protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+        return this.rpcDeadlineMs > 0L ? (MilvusServiceGrpc.MilvusServiceBlockingStub) ((MilvusServiceGrpc.MilvusServiceBlockingStub) this.blockingStub.withWaitForReady()).withDeadlineAfter(this.rpcDeadlineMs, TimeUnit.MILLISECONDS) : this.blockingStub;
+    }
+
+    protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+        return this.futureStub;
+    }
+
+    public boolean clientIsReady() {
+        return this.channel != null && !this.channel.isShutdown() && !this.channel.isTerminated();
+    }
+
+    public void close(long maxWaitSeconds) throws InterruptedException {
+        this.channel.shutdownNow();
+        this.channel.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS);
+    }
+
+    public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+        long timeoutMillis = timeoutUnit.toMillis(timeout);
+        TimeoutInterceptor timeoutInterceptor = new TimeoutInterceptor(timeoutMillis);
+        final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStubTimeout = (MilvusServiceGrpc.MilvusServiceBlockingStub) this.blockingStub.withInterceptors(new ClientInterceptor[]{timeoutInterceptor});
+        final MilvusServiceGrpc.MilvusServiceFutureStub futureStubTimeout = (MilvusServiceGrpc.MilvusServiceFutureStub) this.futureStub.withInterceptors(new ClientInterceptor[]{timeoutInterceptor});
+        CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this) {
+            protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+                return blockingStubTimeout;
+            }
+
+            protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+                return futureStubTimeout;
+            }
+        };
+        newClient.timeoutMs = timeoutMillis;
+        return newClient;
+    }
+
+    public MilvusClient withRetry(RetryParam retryParam) {
+        CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this);
+        newClient.retryParam = retryParam;
+        return newClient;
+    }
+
+    public MilvusClient withRetry(int retryTimes) {
+        if (retryTimes <= 0) {
+            return this;
+        } else {
+            CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this);
+            newClient.retryParam.setMaxRetryTimes(retryTimes);
+            return newClient;
+        }
+    }
+
+    public MilvusClient withRetryInterval(long interval, TimeUnit timeUnit) {
+        if (interval <= 0L) {
+            return this;
+        } else {
+            CustomMilvusServiceClient newClient = new CustomMilvusServiceClient(this);
+            newClient.retryParam.setInitialBackOffMs(timeUnit.toMillis(interval));
+            newClient.retryParam.setMaxBackOffMs(timeUnit.toMillis(interval));
+            return newClient;
+        }
+    }
+
+    private <T> R<T> retry(Callable<R<T>> callable) {
+        int maxRetryTimes = this.retryParam.getMaxRetryTimes();
+        if (maxRetryTimes <= 1) {
+            try {
+                return (R) callable.call();
+            } catch (Exception var14) {
+                return R.failed(var14);
+            }
+        } else {
+            long begin = System.currentTimeMillis();
+            Callable<Boolean> timeoutChecker = () -> {
+                long current = System.currentTimeMillis();
+                long cost = current - begin;
+                return this.timeoutMs > 0L && cost >= this.timeoutMs ? Boolean.TRUE : Boolean.FALSE;
+            };
+            long retryIntervalMs = this.retryParam.getInitialBackOffMs();
+
+            for (int k = 1; k <= maxRetryTimes; ++k) {
+                try {
+                    R<T> resp = (R) callable.call();
+                    if (resp.getStatus() == R.Status.Success.getCode()) {
+                        return resp;
+                    }
+
+                    Exception e = resp.getException();
+                    if (e instanceof StatusRuntimeException) {
+                        StatusRuntimeException rpcException = (StatusRuntimeException) e;
+                        Status.Code code = rpcException.getStatus().getCode();
+                        if (code == io.grpc.Status.DEADLINE_EXCEEDED.getCode() || code == io.grpc.Status.PERMISSION_DENIED.getCode() || code == io.grpc.Status.UNAUTHENTICATED.getCode() || code == io.grpc.Status.INVALID_ARGUMENT.getCode() || code == io.grpc.Status.ALREADY_EXISTS.getCode() || code == io.grpc.Status.RESOURCE_EXHAUSTED.getCode() || code == io.grpc.Status.UNIMPLEMENTED.getCode()) {
+                            return resp;
+                        }
+
+                        if (timeoutChecker.call() == Boolean.TRUE) {
+                            String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s", this.timeoutMs, maxRetryTimes, k, e);
+                            throw new MilvusException(msg, code.value());
+                        }
+                    } else {
+                        if (!(e instanceof ServerException)) {
+                            return resp;
+                        }
+
+                        ServerException serverException = (ServerException) e;
+                        if (timeoutChecker.call() == Boolean.TRUE) {
+                            String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s", this.timeoutMs, maxRetryTimes, k, e);
+                            throw new MilvusException(msg, serverException.getStatus());
+                        }
+
+                        if (!this.retryParam.isRetryOnRateLimit() || serverException.getCompatibleCode() != ErrorCode.RateLimit && serverException.getStatus() != 8) {
+                            return resp;
+                        }
+                    }
+
+                    if (k >= maxRetryTimes) {
+                        String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
+                        this.logError(msg, new Object[0]);
+                        return resp;
+                    }
+
+                    if (k > 3) {
+                        this.logWarning(String.format("Retry(%d) with interval %dms. Reason: %s", k, retryIntervalMs, e), new Object[0]);
+                    }
+
+                    TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
+                    retryIntervalMs *= (long) this.retryParam.getBackOffMultiplier();
+                    if (retryIntervalMs > this.retryParam.getMaxBackOffMs()) {
+                        retryIntervalMs = this.retryParam.getMaxBackOffMs();
+                    }
+                } catch (Exception var15) {
+                    this.logError(var15.getMessage(), new Object[0]);
+                    return R.failed(var15);
+                }
+            }
+
+            String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
+            this.logError(msg, new Object[0]);
+            return R.failed(new RuntimeException(msg));
+        }
+    }
+
+    private R<ConnectResponse> connect(@NonNull ConnectParam connectParam) {
+        if (connectParam == null) {
+            throw new NullPointerException("connectParam is marked non-null but is null");
+        } else {
+            ClientInfo info = ClientInfo.newBuilder().setSdkType("Java").setSdkVersion(this.getSDKVersion()).setUser(connectParam.getUserName()).setHost(this.getHostName()).setLocalTime(this.getLocalTimeStr()).build();
+            ConnectRequest req = ConnectRequest.newBuilder().setClientInfo(info).build();
+            ConnectResponse resp = ((MilvusServiceGrpc.MilvusServiceBlockingStub) ((MilvusServiceGrpc.MilvusServiceBlockingStub) this.blockingStub.withWaitForReady()).withDeadlineAfter(connectParam.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)).connect(req);
+            if (resp.getStatus().getCode() == 0 && resp.getStatus().getErrorCode().equals(ErrorCode.Success)) {
+                return R.success(resp);
+            } else {
+                throw new RuntimeException("Failed to initialize connection. Error: " + resp.getStatus().getReason());
+            }
+        }
+    }
+
+    private String getHostName() {
+        try {
+            InetAddress address = InetAddress.getLocalHost();
+            return address.getHostName();
+        } catch (UnknownHostException var2) {
+            this.logWarning("Failed to get host name! Exception:{}", new Object[]{var2});
+            return "Unknown";
+        }
+    }
+
+    private String getLocalTimeStr() {
+        LocalDateTime now = LocalDateTime.now();
+        return now.toString();
+    }
+
+    private String getSDKVersion() {
+        Package pkg = CustomMilvusServiceClient.class.getPackage();
+        String ver = pkg.getImplementationVersion();
+        return ver == null ? "" : ver;
+    }
+
+    public void setLogLevel(LogLevel level) {
+        this.logLevel = level;
+    }
+
+    public R<Boolean> hasCollection(HasCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.hasCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createDatabase(CreateDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.createDatabase(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropDatabase(DropDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.dropDatabase(requestParam);
+        });
+    }
+
+    public R<ListDatabasesResponse> listDatabases() {
+        return this.retry(() -> {
+            return super.listDatabases();
+        });
+    }
+
+    public R<RpcStatus> alterDatabase(AlterDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.alterDatabase(requestParam);
+        });
+    }
+
+    public R<DescribeDatabaseResponse> describeDatabase(DescribeDatabaseParam requestParam) {
+        return this.retry(() -> {
+            return super.describeDatabase(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createCollection(CreateCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.createCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropCollection(DropCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.dropCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> loadCollection(LoadCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.loadCollection(requestParam);
+        });
+    }
+
+    public R<RpcStatus> releaseCollection(ReleaseCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.releaseCollection(requestParam);
+        });
+    }
+
+    public R<DescribeCollectionResponse> describeCollection(DescribeCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.describeCollection(requestParam);
+        });
+    }
+
+    public R<GetCollectionStatisticsResponse> getCollectionStatistics(GetCollectionStatisticsParam requestParam) {
+        return this.retry(() -> {
+            return super.getCollectionStatistics(requestParam);
+        });
+    }
+
+    public R<RpcStatus> renameCollection(RenameCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.renameCollection(requestParam);
+        });
+    }
+
+    public R<ShowCollectionsResponse> showCollections(ShowCollectionsParam requestParam) {
+        return this.retry(() -> {
+            return super.showCollections(requestParam);
+        });
+    }
+
+    public R<RpcStatus> alterCollection(AlterCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.alterCollection(requestParam);
+        });
+    }
+
+    public R<FlushResponse> flush(FlushParam requestParam) {
+        return this.retry(() -> {
+            return super.flush(requestParam);
+        });
+    }
+
+    public R<FlushAllResponse> flushAll(boolean syncFlushAll, long syncFlushAllWaitingInterval, long syncFlushAllTimeout) {
+        return this.retry(() -> {
+            return super.flushAll(syncFlushAll, syncFlushAllWaitingInterval, syncFlushAllTimeout);
+        });
+    }
+
+    public R<RpcStatus> createPartition(CreatePartitionParam requestParam) {
+        return this.retry(() -> {
+            return super.createPartition(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropPartition(DropPartitionParam requestParam) {
+        return this.retry(() -> {
+            return super.dropPartition(requestParam);
+        });
+    }
+
+    public R<Boolean> hasPartition(HasPartitionParam requestParam) {
+        return this.retry(() -> {
+            return super.hasPartition(requestParam);
+        });
+    }
+
+    public R<RpcStatus> loadPartitions(LoadPartitionsParam requestParam) {
+        return this.retry(() -> {
+            return super.loadPartitions(requestParam);
+        });
+    }
+
+    public R<RpcStatus> releasePartitions(ReleasePartitionsParam requestParam) {
+        return this.retry(() -> {
+            return super.releasePartitions(requestParam);
+        });
+    }
+
+    public R<GetPartitionStatisticsResponse> getPartitionStatistics(GetPartitionStatisticsParam requestParam) {
+        return this.retry(() -> {
+            return super.getPartitionStatistics(requestParam);
+        });
+    }
+
+    public R<ShowPartitionsResponse> showPartitions(ShowPartitionsParam requestParam) {
+        return this.retry(() -> {
+            return super.showPartitions(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createAlias(CreateAliasParam requestParam) {
+        return this.retry(() -> {
+            return super.createAlias(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropAlias(DropAliasParam requestParam) {
+        return this.retry(() -> {
+            return super.dropAlias(requestParam);
+        });
+    }
+
+    public R<RpcStatus> alterAlias(AlterAliasParam requestParam) {
+        return this.retry(() -> {
+            return super.alterAlias(requestParam);
+        });
+    }
+
+    public R<ListAliasesResponse> listAliases(ListAliasesParam requestParam) {
+        return this.retry(() -> {
+            return super.listAliases(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createIndex(CreateIndexParam requestParam) {
+        return this.retry(() -> {
+            return super.createIndex(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropIndex(DropIndexParam requestParam) {
+        return this.retry(() -> {
+            return super.dropIndex(requestParam);
+        });
+    }
+
+    public R<DescribeIndexResponse> describeIndex(DescribeIndexParam requestParam) {
+        return this.retry(() -> {
+            return super.describeIndex(requestParam);
+        });
+    }
+
+    public R<GetIndexStateResponse> getIndexState(@NonNull GetIndexStateParam requestParam) {
+        if (requestParam == null) {
+            throw new NullPointerException("requestParam is marked non-null but is null");
+        } else {
+            return this.retry(() -> {
+                return super.getIndexState(requestParam);
+            });
+        }
+    }
+
+    public R<GetIndexBuildProgressResponse> getIndexBuildProgress(@NonNull GetIndexBuildProgressParam requestParam) {
+        if (requestParam == null) {
+            throw new NullPointerException("requestParam is marked non-null but is null");
+        } else {
+            return this.retry(() -> {
+                return super.getIndexBuildProgress(requestParam);
+            });
+        }
+    }
+
+    public R<MutationResult> insert(InsertParam requestParam) {
+        return this.retry(() -> {
+            return super.insert(requestParam);
+        });
+    }
+
+    public R<MutationResult> upsert(UpsertParam requestParam) {
+        return this.retry(() -> {
+            return super.upsert(requestParam);
+        });
+    }
+
+    public R<MutationResult> delete(DeleteParam requestParam) {
+        return this.retry(() -> {
+            return super.delete(requestParam);
+        });
+    }
+
+    public R<SearchResults> search(SearchParam requestParam) {
+        return this.retry(() -> {
+            return super.search(requestParam);
+        });
+    }
+
+    public R<SearchResults> hybridSearch(HybridSearchParam requestParam) {
+        return this.retry(() -> {
+            return super.hybridSearch(requestParam);
+        });
+    }
+
+    public R<QueryResults> query(QueryParam requestParam) {
+        return this.retry(() -> {
+            return super.query(requestParam);
+        });
+    }
+
+    public R<GetMetricsResponse> getMetrics(GetMetricsParam requestParam) {
+        return this.retry(() -> {
+            return super.getMetrics(requestParam);
+        });
+    }
+
+    public R<GetFlushStateResponse> getFlushState(GetFlushStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getFlushState(requestParam);
+        });
+    }
+
+    public R<GetFlushAllStateResponse> getFlushAllState(GetFlushAllStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getFlushAllState(requestParam);
+        });
+    }
+
+    public R<GetPersistentSegmentInfoResponse> getPersistentSegmentInfo(GetPersistentSegmentInfoParam requestParam) {
+        return this.retry(() -> {
+            return super.getPersistentSegmentInfo(requestParam);
+        });
+    }
+
+    public R<GetQuerySegmentInfoResponse> getQuerySegmentInfo(GetQuerySegmentInfoParam requestParam) {
+        return this.retry(() -> {
+            return super.getQuerySegmentInfo(requestParam);
+        });
+    }
+
+    public R<GetReplicasResponse> getReplicas(GetReplicasParam requestParam) {
+        return this.retry(() -> {
+            return super.getReplicas(requestParam);
+        });
+    }
+
+    public R<RpcStatus> loadBalance(LoadBalanceParam requestParam) {
+        return this.retry(() -> {
+            return super.loadBalance(requestParam);
+        });
+    }
+
+    public R<GetCompactionStateResponse> getCompactionState(GetCompactionStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getCompactionState(requestParam);
+        });
+    }
+
+    public R<ManualCompactionResponse> manualCompact(ManualCompactParam requestParam) {
+        return this.retry(() -> {
+            return super.manualCompact(requestParam);
+        });
+    }
+
+    public R<GetCompactionPlansResponse> getCompactionStateWithPlans(GetCompactionPlansParam requestParam) {
+        return this.retry(() -> {
+            return super.getCompactionStateWithPlans(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createCredential(CreateCredentialParam requestParam) {
+        return this.retry(() -> {
+            return super.createCredential(requestParam);
+        });
+    }
+
+    public R<RpcStatus> updateCredential(UpdateCredentialParam requestParam) {
+        return this.retry(() -> {
+            return super.updateCredential(requestParam);
+        });
+    }
+
+    public R<RpcStatus> deleteCredential(DeleteCredentialParam requestParam) {
+        return this.retry(() -> {
+            return super.deleteCredential(requestParam);
+        });
+    }
+
+    public R<ListCredUsersResponse> listCredUsers(ListCredUsersParam requestParam) {
+        return this.retry(() -> {
+            return super.listCredUsers(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createRole(CreateRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.createRole(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropRole(DropRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.dropRole(requestParam);
+        });
+    }
+
+    public R<RpcStatus> addUserToRole(AddUserToRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.addUserToRole(requestParam);
+        });
+    }
+
+    public R<RpcStatus> removeUserFromRole(RemoveUserFromRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.removeUserFromRole(requestParam);
+        });
+    }
+
+    public R<SelectRoleResponse> selectRole(SelectRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.selectRole(requestParam);
+        });
+    }
+
+    public R<SelectUserResponse> selectUser(SelectUserParam requestParam) {
+        return this.retry(() -> {
+            return super.selectUser(requestParam);
+        });
+    }
+
+    public R<RpcStatus> grantRolePrivilege(GrantRolePrivilegeParam requestParam) {
+        return this.retry(() -> {
+            return super.grantRolePrivilege(requestParam);
+        });
+    }
+
+    public R<RpcStatus> revokeRolePrivilege(RevokeRolePrivilegeParam requestParam) {
+        return this.retry(() -> {
+            return super.revokeRolePrivilege(requestParam);
+        });
+    }
+
+    public R<SelectGrantResponse> selectGrantForRole(SelectGrantForRoleParam requestParam) {
+        return this.retry(() -> {
+            return super.selectGrantForRole(requestParam);
+        });
+    }
+
+    public R<SelectGrantResponse> selectGrantForRoleAndObject(SelectGrantForRoleAndObjectParam requestParam) {
+        return this.retry(() -> {
+            return super.selectGrantForRoleAndObject(requestParam);
+        });
+    }
+
+    public R<ImportResponse> bulkInsert(BulkInsertParam requestParam) {
+        return this.retry(() -> {
+            return super.bulkInsert(requestParam);
+        });
+    }
+
+    public R<GetImportStateResponse> getBulkInsertState(GetBulkInsertStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getBulkInsertState(requestParam);
+        });
+    }
+
+    public R<ListImportTasksResponse> listBulkInsertTasks(ListBulkInsertTasksParam requestParam) {
+        return this.retry(() -> {
+            return super.listBulkInsertTasks(requestParam);
+        });
+    }
+
+    public R<CheckHealthResponse> checkHealth() {
+        return this.retry(() -> {
+            return super.checkHealth();
+        });
+    }
+
+    public R<GetVersionResponse> getVersion() {
+        return this.retry(() -> {
+            return super.getVersion();
+        });
+    }
+
+    public R<GetLoadingProgressResponse> getLoadingProgress(GetLoadingProgressParam requestParam) {
+        return this.retry(() -> {
+            return super.getLoadingProgress(requestParam);
+        });
+    }
+
+    public R<GetLoadStateResponse> getLoadState(GetLoadStateParam requestParam) {
+        return this.retry(() -> {
+            return super.getLoadState(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createResourceGroup(CreateResourceGroupParam requestParam) {
+        return this.retry(() -> {
+            return super.createResourceGroup(requestParam);
+        });
+    }
+
+    public R<RpcStatus> dropResourceGroup(DropResourceGroupParam requestParam) {
+        return this.retry(() -> {
+            return super.dropResourceGroup(requestParam);
+        });
+    }
+
+    public R<ListResourceGroupsResponse> listResourceGroups(ListResourceGroupsParam requestParam) {
+        return this.retry(() -> {
+            return super.listResourceGroups(requestParam);
+        });
+    }
+
+    public R<DescribeResourceGroupResponse> describeResourceGroup(DescribeResourceGroupParam requestParam) {
+        return this.retry(() -> {
+            return super.describeResourceGroup(requestParam);
+        });
+    }
+
+    public R<RpcStatus> transferNode(TransferNodeParam requestParam) {
+        return this.retry(() -> {
+            return super.transferNode(requestParam);
+        });
+    }
+
+    public R<RpcStatus> transferReplica(TransferReplicaParam requestParam) {
+        return this.retry(() -> {
+            return super.transferReplica(requestParam);
+        });
+    }
+
+    public R<RpcStatus> updateResourceGroups(UpdateResourceGroupsParam requestParam) {
+        return this.retry(() -> {
+            return super.updateResourceGroups(requestParam);
+        });
+    }
+
+    public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
+        return this.retry(() -> {
+            return super.createCollection(requestParam);
+        });
+    }
+
+    public R<ListCollectionsResponse> listCollections(ListCollectionsParam requestParam) {
+        return this.retry(() -> {
+            return super.listCollections(requestParam);
+        });
+    }
+
+    public R<InsertResponse> insert(InsertRowsParam requestParam) {
+        return this.retry(() -> {
+            return super.insert(requestParam);
+        });
+    }
+
+    public R<DeleteResponse> delete(DeleteIdsParam requestParam) {
+        return this.retry(() -> {
+            return super.delete(requestParam);
+        });
+    }
+
+    public R<GetResponse> get(GetIdsParam requestParam) {
+        return this.retry(() -> {
+            return super.get(requestParam);
+        });
+    }
+
+    public R<QueryResponse> query(QuerySimpleParam requestParam) {
+        return this.retry(() -> {
+            return super.query(requestParam);
+        });
+    }
+
+    public R<SearchResponse> search(SearchSimpleParam requestParam) {
+        return this.retry(() -> {
+            return super.search(requestParam);
+        });
+    }
+
+    public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
+        return this.retry(() -> {
+            return super.queryIterator(requestParam);
+        });
+    }
+
+    public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
+        return this.retry(() -> {
+            return super.searchIterator(requestParam);
+        });
+    }
+
+    private static class TimeoutInterceptor implements ClientInterceptor {
+        private final long timeoutMillis;
+
+        TimeoutInterceptor(long timeoutMillis) {
+            this.timeoutMillis = timeoutMillis;
+        }
+
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+            return next.newCall(method, callOptions.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS));
+        }
+    }
+}

+ 0 - 14
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/remote/ModelRemoteService.java

@@ -1,24 +1,10 @@
 package com.tzld.piaoquan.recommend.server.remote;
 
 import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
-import com.tzld.piaoquan.recommend.server.util.JSONUtils;
-import io.milvus.client.MilvusClient;
-import io.milvus.grpc.SearchResultData;
-import io.milvus.grpc.SearchResults;
-import io.milvus.param.ConnectParam;
-import io.milvus.param.R;
-import io.milvus.param.dml.SearchParam;
-import io.milvus.pool.MilvusClientV1Pool;
-import io.milvus.pool.PoolConfig;
-import io.paddle.serving.client.Client;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
-import java.time.Duration;
-
 /**
  * @author dyp
  */

+ 24 - 15
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/remote/MilvusRemoteService.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/remote/ZillizRemoteService.java

@@ -1,13 +1,13 @@
 package com.tzld.piaoquan.recommend.server.remote;
 
-import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import io.milvus.client.MilvusClient;
 import io.milvus.grpc.SearchResultData;
 import io.milvus.grpc.SearchResults;
 import io.milvus.param.ConnectParam;
 import io.milvus.param.R;
 import io.milvus.param.dml.SearchParam;
-import io.milvus.pool.MilvusClientV1Pool;
+import io.milvus.pool.ClientPool;
+import io.milvus.pool.PoolClientFactory;
 import io.milvus.pool.PoolConfig;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
@@ -23,22 +23,26 @@ import java.time.Duration;
  */
 @Component
 @Slf4j
-public class MilvusRemoteService {
+public class ZillizRemoteService {
 
-    private MilvusClientV1Pool pool;
+    private CustomMilvusClientPool pool;
     private String poolKey = "milvus";
-    @Value("${milvusHost:}")
-    private String milvusHost;
-    @Value("${milvusPort:}")
-    private int milvusPort;
+    @Value("${zillizUri:}")
+    private String zillizUri;
+    @Value("${zillizToken:}")
+    private String zillizToken;
 
 
     @PostConstruct
     public void init() {
         try {
+            // milvus 使用 io.grpc.internal.DnsNameResolverProvider
             ConnectParam connectConfig = ConnectParam.newBuilder()
-                    .withHost(milvusHost)
-                    .withPort(milvusPort)
+                    .withHost("in01-6540fb97902829b.ali-cn-hangzhou.vectordb.zilliz.com.cn")
+                    .withPort(19530)
+                    .withSecure(true)
+                    //.withUri(zillizUri)
+                    .withToken(zillizToken)
                     .build();
             PoolConfig poolConfig = PoolConfig.builder()
                     .maxIdlePerKey(1000) // max idle clients per key
@@ -48,7 +52,7 @@ public class MilvusRemoteService {
                     // client available
                     .minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
                     .build();
-            pool = new MilvusClientV1Pool(poolConfig, connectConfig);
+            pool = new CustomMilvusClientPool(poolConfig, connectConfig);
         } catch (Exception e) {
             log.error("MilvusClientV2Pool init error ", e);
             throw new RuntimeException();
@@ -57,15 +61,20 @@ public class MilvusRemoteService {
 
     public SearchResultData search(SearchParam param) {
         MilvusClient client = pool.getClient(poolKey);
-
-        // Construct a vector to search top5 similar records, return the book title for us.
-        // This vector is equal to the No.3 record, we suppose the No.3 record is the most similar.
         R<SearchResults> searchRet = client.search(param);
+        pool.returnClient(poolKey, client);
+
         if (searchRet.getStatus() != R.Status.Success.getCode()) {
             throw new RuntimeException("Failed to search! Error: " + searchRet.getMessage());
         }
-        System.out.println(JSONUtils.toJson(searchRet));
+
         return searchRet.getData().getResults();
     }
 
+
+    private class CustomMilvusClientPool extends ClientPool<ConnectParam, MilvusClient> {
+        public CustomMilvusClientPool(PoolConfig poolConfig, ConnectParam connectParam) throws ClassNotFoundException, NoSuchMethodException {
+            super(poolConfig, new PoolClientFactory(connectParam, CustomMilvusServiceClient.class.getName()));
+        }
+    }
 }

+ 3 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/VecSearchService.java

@@ -1,6 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service;
 
-import com.tzld.piaoquan.recommend.server.remote.MilvusRemoteService;
+import com.tzld.piaoquan.recommend.server.remote.ZillizRemoteService;
 import io.milvus.grpc.SearchResultData;
 import io.milvus.param.MetricType;
 import io.milvus.param.dml.SearchParam;
@@ -16,7 +16,7 @@ import java.util.List;
 @Service
 public class VecSearchService {
     @Autowired
-    private MilvusRemoteService milvusRemoteService;
+    private ZillizRemoteService zillizRemoteService;
 
     private String videoCollectionName = "video";
 
@@ -31,7 +31,7 @@ public class VecSearchService {
                 .addOutField("video_vec")
                 .addOutField("vid")
                 .build();
-        SearchResultData data = milvusRemoteService.search(param);
+        SearchResultData data = zillizRemoteService.search(param);
         return null;
     }
 }

+ 2 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/WarmUpService.java

@@ -35,8 +35,8 @@ public class WarmUpService {
         featureRedisTemplate.opsForValue().get("");
         longVideoRedisTemplate.opsForValue().get("");
         com.tzld.piaoquan.recommend.server.service.score.ScorerUtils.warmUp();
-        com.tzld.piaoquan.recommend.server.framework.score.ScorerUtils.warmUp();
+        //com.tzld.piaoquan.recommend.server.framework.score.ScorerUtils.warmUp();
         wxVideoStatusRepository.count();
-        SimilarityUtils.init();
+        //SimilarityUtils.init();
     }
 }

文件差異過大導致無法顯示
+ 31 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/web/DemoController.java


+ 1 - 1
recommend-server-service/src/main/resources/application-dev.yml

@@ -100,7 +100,7 @@ spring:
       idle-timeout: 30000
   jpa:
     hibernate:
-      ddl-auto: validate
+      ddl-auto: none
     database: mysql
 xxl:
   job:

部分文件因文件數量過多而無法顯示