Browse Source

Merge branch '20251219_feature_fjy_ad_chubuchu' of algorithm/ad-engine into master

fanjinyang 2 days ago
parent
commit
d4020c1baf

+ 1 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/enums/RedisPrefixEnum.java

@@ -4,6 +4,7 @@ public enum RedisPrefixEnum {
 
     ADVANCE_SHOW_AD_FLAG("ad:advance:show:ad:flag:%s", "是否提前出广告标识,0-否;1-是"),
     ADVER_IS_API_EQ_0_IDS("ad:adver:isapi:0", "未回传广告主ID集合"),
+    AD_USER_ROR_BEHAVIOR("ad:user:ror:behavior:%s", "用户ror行为特征")
     ;
     private String prefix;
     private String desc;

+ 129 - 0
ad-engine-server/src/main/java/com/tzld/piaoquan/ad/engine/server/config/EurekaOverrideStatusConfig.java

@@ -0,0 +1,129 @@
+package com.tzld.piaoquan.ad.engine.server.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * Eureka覆盖状态配置
+ */
+@Component
+@ConfigurationProperties(prefix = "eureka.override.status")
+public class EurekaOverrideStatusConfig {
+
+    /**
+     * 是否启用覆盖状态移除功能
+     */
+    private boolean removalEnabled = true;
+
+    /**
+     * 移除覆盖状态后等待时间(毫秒)
+     */
+    private long waitTimeAfterRemoval = 1000L;
+
+    /**
+     * 是否在启动时打印详细日志
+     */
+    private boolean verboseLogging = true;
+
+    /**
+     * 初始延迟时间(毫秒)- 确保Eureka客户端完全初始化
+     */
+    private long initialDelay = 5000L;
+
+    /**
+     * 最大重试次数
+     */
+    private int maxRetries = 3;
+
+    /**
+     * 重试间隔时间(毫秒)
+     */
+    private long retryInterval = 2000L;
+
+    /**
+     * Eureka服务器URL(可选,如果不配置会尝试自动获取)
+     */
+    private String eurekaServerUrl;
+
+    /**
+     * 飞书报警URL(配置为ad.platform.feishu.alert.server-up对应的webhook地址)
+     */
+    private String feishuAlertUrl;
+
+    /**
+     * 飞书消息格式类型(text: 纯文本, card: 富文本卡片)
+     */
+    private String feishuMessageType = "card";
+
+    public boolean isRemovalEnabled() {
+        return removalEnabled;
+    }
+
+    public void setRemovalEnabled(boolean removalEnabled) {
+        this.removalEnabled = removalEnabled;
+    }
+
+    public long getWaitTimeAfterRemoval() {
+        return waitTimeAfterRemoval;
+    }
+
+    public void setWaitTimeAfterRemoval(long waitTimeAfterRemoval) {
+        this.waitTimeAfterRemoval = waitTimeAfterRemoval;
+    }
+
+    public boolean isVerboseLogging() {
+        return verboseLogging;
+    }
+
+    public void setVerboseLogging(boolean verboseLogging) {
+        this.verboseLogging = verboseLogging;
+    }
+
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    public void setInitialDelay(long initialDelay) {
+        this.initialDelay = initialDelay;
+    }
+
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public void setMaxRetries(int maxRetries) {
+        this.maxRetries = maxRetries;
+    }
+
+    public long getRetryInterval() {
+        return retryInterval;
+    }
+
+    public void setRetryInterval(long retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+
+    public String getEurekaServerUrl() {
+        return eurekaServerUrl;
+    }
+
+    public void setEurekaServerUrl(String eurekaServerUrl) {
+        this.eurekaServerUrl = eurekaServerUrl;
+    }
+
+    public String getFeishuAlertUrl() {
+        return feishuAlertUrl;
+    }
+
+    public void setFeishuAlertUrl(String feishuAlertUrl) {
+        this.feishuAlertUrl = feishuAlertUrl;
+    }
+
+    public String getFeishuMessageType() {
+        return feishuMessageType;
+    }
+
+    public void setFeishuMessageType(String feishuMessageType) {
+        this.feishuMessageType = feishuMessageType;
+    }
+}

+ 60 - 0
ad-engine-server/src/main/java/com/tzld/piaoquan/ad/engine/server/config/RestClientConfig.java

@@ -0,0 +1,60 @@
+package com.tzld.piaoquan.ad.engine.server.config;
+
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.web.client.RestTemplate;
+
+@Configuration
+public class RestClientConfig {
+
+    @Value("${rest.eureka.connect-timeout-ms:2000}")
+    private int connectTimeoutMs;
+
+    @Value("${rest.eureka.read-timeout-ms:5000}")
+    private int readTimeoutMs;
+
+    @Value("${rest.eureka.connection-request-timeout-ms:2000}")
+    private int connectionRequestTimeoutMs;
+
+    @Value("${rest.eureka.pool.max-total:100}")
+    private int maxTotal;
+
+    @Value("${rest.eureka.pool.max-per-route:20}")
+    private int maxPerRoute;
+
+    @Bean(name = "eurekaRestTemplate")
+    public RestTemplate eurekaRestTemplate() {
+        // Connection pool
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+        cm.setMaxTotal(maxTotal);
+        cm.setDefaultMaxPerRoute(maxPerRoute);
+
+        // Timeouts
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(connectTimeoutMs)
+                .setSocketTimeout(readTimeoutMs)
+                .setConnectionRequestTimeout(connectionRequestTimeoutMs)
+                .build();
+
+        CloseableHttpClient httpClient = HttpClientBuilder.create()
+                .setConnectionManager(cm)
+                .setDefaultRequestConfig(requestConfig)
+                .evictIdleConnections(30, java.util.concurrent.TimeUnit.SECONDS)
+                .build();
+
+        HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(httpClient);
+        // Also set timeouts at factory level (some Spring versions prefer this)
+        factory.setConnectTimeout(connectTimeoutMs);
+        factory.setReadTimeout(readTimeoutMs);
+        factory.setConnectionRequestTimeout(connectionRequestTimeoutMs);
+
+        return new RestTemplate(factory);
+    }
+}
+

+ 444 - 0
ad-engine-server/src/main/java/com/tzld/piaoquan/ad/engine/server/listener/EurekaOverrideStatusRemovalListener.java

@@ -0,0 +1,444 @@
+package com.tzld.piaoquan.ad.engine.server.listener;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.netflix.appinfo.ApplicationInfoManager;
+import com.netflix.appinfo.InstanceInfo;
+import com.tzld.piaoquan.ad.engine.server.config.EurekaOverrideStatusConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.core.env.Environment;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Eureka覆盖状态移除监听器
+ * 应用启动完成后,移除该节点在Eureka上的覆盖状态
+ *
+ * 修复内容:
+ * 1. 异步执行避免阻塞应用启动
+ * 2. 添加重试机制
+ * 3. 正确移除覆盖状态而不是设置新的覆盖状态
+ * 4. 完善状态验证
+ */
+@Component
+public class EurekaOverrideStatusRemovalListener implements ApplicationListener<ApplicationReadyEvent> {
+
+    private static final Logger log = LoggerFactory.getLogger(EurekaOverrideStatusRemovalListener.class);
+
+    @Autowired(required = false)
+    private ApplicationInfoManager applicationInfoManager;
+
+    @Autowired
+    private EurekaOverrideStatusConfig config;
+
+    @Value("${eureka.client.serviceUrl.defaultZone:}")
+    private String eurekaDefaultZone;
+
+    @Autowired
+    private Environment environment;
+
+    @Autowired
+    @Qualifier("eurekaRestTemplate")
+    private RestTemplate restTemplate;
+
+    @Override
+    public void onApplicationEvent(ApplicationReadyEvent event) {
+        if (!config.isRemovalEnabled()) {
+            log.info("Eureka override status removal is disabled by configuration");
+            return;
+        }
+
+        log.info("Application ready event received, scheduling Eureka override status removal...");
+
+        // 异步执行,避免阻塞应用启动
+        CompletableFuture.runAsync(() -> {
+            boolean success = false;
+            String errorMessage = null;
+
+            try {
+                // 添加初始延迟,确保Eureka客户端完全初始化
+                Thread.sleep(config.getInitialDelay());
+                success = removeOverrideStatusWithRetry();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Thread interrupted during initial delay", e);
+                errorMessage = "Thread interrupted: " + e.getMessage();
+            } catch (Exception e) {
+                log.error("Failed to remove Eureka override status", e);
+                errorMessage = e.getMessage();
+            } finally {
+                // 打印日志
+                log.info("Eureka override status removal result: {} - {}",success, errorMessage);
+            }
+        });
+    }
+
+    /**
+     * 带重试机制的覆盖状态移除
+     * @return true 如果成功移除,false 如果失败
+     */
+    private boolean removeOverrideStatusWithRetry() {
+        int maxRetries = config.getMaxRetries();
+        for (int attempt = 1; attempt <= maxRetries; attempt++) {
+            try {
+                log.info("Attempting to remove Eureka override status (attempt {}/{})", attempt, maxRetries);
+
+                if (attemptRemoveOverrideStatus()) {
+                    log.info("Successfully removed Eureka override status on attempt {}", attempt);
+                    return true;
+                }
+            } catch (Exception e) {
+                log.warn("Attempt {} to remove override status failed: {}", attempt, e.getMessage());
+                if (config.isVerboseLogging()) {
+                    log.debug("Detailed error for attempt " + attempt, e);
+                }
+            }
+
+            // 如果不是最后一次尝试,等待重试间隔
+            if (attempt < maxRetries) {
+                try {
+                    log.info("Waiting {}ms before retry...", config.getRetryInterval());
+                    Thread.sleep(config.getRetryInterval());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    log.error("Thread interrupted during retry wait", e);
+                    return false;
+                }
+            }
+        }
+        log.error("Failed to remove override status after {} attempts", maxRetries);
+        return false;
+    }
+    /**
+     * 尝试移除覆盖状态
+     * @return true 如果成功移除或无需移除,false 如果移除失败
+     */
+    private boolean attemptRemoveOverrideStatus() throws InterruptedException {
+        if (applicationInfoManager == null) {
+            log.warn("ApplicationInfoManager not found, Eureka may not be enabled");
+            return false;
+        }
+
+        InstanceInfo instanceInfo = applicationInfoManager.getInfo();
+        if (instanceInfo == null) {
+            log.warn("InstanceInfo not found");
+            return false;
+        }
+
+        // 获取实例信息
+        String instanceId = instanceInfo.getInstanceId();
+        String appName = instanceInfo.getAppName();
+        InstanceInfo.InstanceStatus currentStatus = instanceInfo.getStatus();
+        InstanceInfo.InstanceStatus localOverriddenStatus = instanceInfo.getOverriddenStatus();
+
+        if (config.isVerboseLogging()) {
+            log.info("Eureka instance info - App: {}, InstanceId: {}, Current status: {}, Local Overridden status: {}",
+                    appName, instanceId, currentStatus, localOverriddenStatus);
+        }
+
+        // 总是查询Eureka Server获取最新的覆盖状态
+        log.info("Querying Eureka Server for latest override status of instance [{}]", instanceId);
+        InstanceInfo.InstanceStatus serverOverriddenStatus = queryServerOverrideStatus(appName, instanceId);
+
+        if (serverOverriddenStatus == null) {
+            log.info("No override status found on Eureka Server for instance [{}]", instanceId);
+            return true; // 服务器上没有覆盖状态,无需处理
+        }
+
+        log.info("Eureka Server reports override status [{}] for instance [{}]", serverOverriddenStatus, instanceId);
+
+        // 判断是否需要清理 - 只处理OUT_OF_SERVICE
+        if (serverOverriddenStatus == InstanceInfo.InstanceStatus.OUT_OF_SERVICE) {
+            log.info("Found OUT_OF_SERVICE override status on server for instance [{}], removing it...", instanceId);
+
+            // 通过Eureka REST API移除覆盖状态
+            boolean removed = removeOverrideStatusViaRestApi(instanceInfo);
+
+            if (removed) {
+                // 等待状态更新生效
+                Thread.sleep(config.getWaitTimeAfterRemoval());
+
+                // 验证是否成功移除覆盖状态
+                return verifyStatusRemoval(instanceId);
+            } else {
+                log.warn("Failed to remove override status via REST API for instance [{}]", instanceId);
+                return false;
+            }
+        } else {
+            log.info("Instance [{}] has override status [{}] on server, but it's not OUT_OF_SERVICE, skipping removal",
+                    instanceId, serverOverriddenStatus);
+            return true; // 不是OUT_OF_SERVICE状态,不需要处理,视为成功
+        }
+    }
+
+    /**
+     * 查询Eureka Server上的覆盖状态
+     * @param appName 应用名称
+     * @param instanceId 实例ID
+     * @return 服务器上的覆盖状态,如果没有则返回null
+     */
+    private InstanceInfo.InstanceStatus queryServerOverrideStatus(String appName, String instanceId) {
+        try {
+            // 构建查询URL: GET /eureka/apps/{APP}/{INSTANCE_ID}
+            String eurekaUrl = getEurekaServerUrl();
+            if (eurekaUrl == null) {
+                log.warn("Cannot determine Eureka server URL");
+                return null;
+            }
+
+            String queryUrl = eurekaUrl + "/apps/" + appName + "/" + instanceId;
+            log.debug("Querying Eureka Server: {}", queryUrl);
+
+            // 设置Accept头为application/json
+            HttpHeaders headers = new HttpHeaders();
+            headers.set("Accept", "application/json");
+            HttpEntity<String> entity = new HttpEntity<>(headers);
+
+            ResponseEntity<String> response = restTemplate.exchange(queryUrl, HttpMethod.GET, entity, String.class);
+
+            if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) {
+                return parseOverrideStatusFromJson(response.getBody());
+            } else {
+                log.warn("Failed to query Eureka Server, status: {}", response.getStatusCode());
+                return null;
+            }
+
+        } catch (Exception e) {
+            log.warn("Error querying Eureka Server for override status: {}", e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * 从JSON响应中解析覆盖状态
+     * @param jsonResponse JSON响应内容
+     * @return 覆盖状态,如果没有则返回null
+     */
+    private InstanceInfo.InstanceStatus parseOverrideStatusFromJson(String jsonResponse) {
+        try {
+            // 使用 JSON 解析(FastJSON),而不是正则
+            JSONObject root = JSONObject.parseObject(jsonResponse);
+            String statusStr = findOverrideStatusRecursively(root);
+
+            if (statusStr == null || statusStr.trim().isEmpty()) {
+                log.debug("No overriddenStatus found in JSON response");
+                return null;
+            }
+
+            log.debug("Found overriddenStatus in JSON: {}", statusStr);
+            try {
+                return InstanceInfo.InstanceStatus.valueOf(statusStr.trim().toUpperCase());
+            } catch (IllegalArgumentException e) {
+                log.warn("Unknown override status: {}", statusStr);
+                return null;
+            }
+        } catch (Exception e) {
+            log.warn("Error parsing override status from JSON: {}", e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * 在JSON结构中递归查找 overriddenStatus/overriddenstatus 字段
+     */
+    private String findOverrideStatusRecursively(Object node) {
+        if (node == null) return null;
+
+        if (node instanceof JSONObject) {
+            JSONObject obj = (JSONObject) node;
+            // 先尝试直接命中字段
+            for (String key : obj.keySet()) {
+                if ("overriddenStatus".equalsIgnoreCase(key) || "overriddenstatus".equalsIgnoreCase(key)) {
+                    Object val = obj.get(key);
+                    return val == null ? null : String.valueOf(val);
+                }
+            }
+            // 递归遍历子节点
+            for (String key : obj.keySet()) {
+                String found = findOverrideStatusRecursively(obj.get(key));
+                if (found != null) return found;
+            }
+            return null;
+        } else if (node instanceof JSONArray) {
+            JSONArray arr = (JSONArray) node;
+            for (Object el : arr) {
+                String found = findOverrideStatusRecursively(el);
+                if (found != null) return found;
+            }
+            return null;
+        } else {
+            return null;
+        }
+    }
+
+
+    /**
+     * 通过Eureka REST API移除覆盖状态
+     * @param instanceInfo 实例信息
+     * @return true 如果成功移除,false 如果失败
+     */
+    private boolean removeOverrideStatusViaRestApi(InstanceInfo instanceInfo) {
+        try {
+            // 构造Eureka Server的REST API URL
+            // DELETE /eureka/apps/{appName}/{instanceId}/status
+            String eurekaServerUrl = getEurekaServerUrl();
+            if (eurekaServerUrl == null) {
+                log.warn("Cannot determine Eureka server URL");
+                return false;
+            }
+
+            String appName = instanceInfo.getAppName();
+            String instanceId = instanceInfo.getInstanceId();
+
+            // 构造删除覆盖状态的URL(去掉lastDirtyTimestamp参数)
+            String deleteOverrideUrl = String.format("%s/apps/%s/%s/status",
+                    eurekaServerUrl, appName, instanceId);
+
+            if (config.isVerboseLogging()) {
+                log.info("Attempting to remove override status via REST API: {}", deleteOverrideUrl);
+            }
+
+            // 发送DELETE请求移除覆盖状态
+            ResponseEntity<String> response = restTemplate.exchange(
+                    deleteOverrideUrl,
+                    HttpMethod.DELETE,
+                    null,
+                    String.class
+            );
+
+            // 接受所有2xx状态码作为成功
+            if (response.getStatusCode().is2xxSuccessful()) {
+                log.info("Successfully removed override status via REST API for instance [{}], status code: {}",
+                        instanceId, response.getStatusCode());
+                return true;
+            } else {
+                log.warn("Failed to remove override status via REST API. Status code: {}, Response: {}",
+                        response.getStatusCode(), response.getBody());
+                return false;
+            }
+
+        } catch (Exception e) {
+            log.error("Error removing override status via REST API", e);
+            return false;
+        }
+    }
+
+    /**
+     * 获取Eureka Server URL
+     * @return Eureka Server URL,如果无法确定则返回null
+     */
+    private String getEurekaServerUrl() {
+        try {
+            // 1. 优先使用配置文件中的URL
+            if (config.getEurekaServerUrl() != null && !config.getEurekaServerUrl().trim().isEmpty()) {
+                String configUrl = config.getEurekaServerUrl().trim();
+                if (configUrl.endsWith("/eureka/")) {
+                    configUrl = configUrl.substring(0, configUrl.length() - 1);
+                } else if (!configUrl.endsWith("/eureka")) {
+                    configUrl += "/eureka";
+                }
+                return configUrl;
+            }
+
+            // 2. 通过 @Value 注入的配置
+            String eurekaUrl = (eurekaDefaultZone != null && !eurekaDefaultZone.trim().isEmpty())
+                    ? eurekaDefaultZone.trim() : null;
+
+            // 3. 从 Spring Environment 读取(兼容两种键写法)
+            if (eurekaUrl == null || eurekaUrl.isEmpty()) {
+                eurekaUrl = environment.getProperty("eureka.client.service-url.defaultZone");
+            }
+            if (eurekaUrl == null || eurekaUrl.isEmpty()) {
+                eurekaUrl = environment.getProperty("eureka.client.serviceUrl.defaultZone");
+            }
+
+            // 4. 系统属性/环境变量兜底
+            if (eurekaUrl == null || eurekaUrl.isEmpty()) {
+                eurekaUrl = System.getProperty("eureka.client.serviceUrl.defaultZone");
+            }
+            if (eurekaUrl == null || eurekaUrl.isEmpty()) {
+                eurekaUrl = System.getenv("EUREKA_DEFAULT_ZONE");
+            }
+
+            if (eurekaUrl != null && !eurekaUrl.trim().isEmpty()) {
+                eurekaUrl = eurekaUrl.trim();
+                // 处理URL格式
+                if (eurekaUrl.endsWith("/eureka/")) {
+                    eurekaUrl = eurekaUrl.substring(0, eurekaUrl.length() - 1);
+                } else if (!eurekaUrl.endsWith("/eureka")) {
+                    eurekaUrl += "/eureka";
+                }
+                return eurekaUrl;
+            }
+
+            // 5. 使用默认值(根据实际环境调整)
+            log.warn("Cannot determine Eureka server URL from configuration, return null");
+            return null;
+
+        } catch (Exception e) {
+            log.error("Error getting Eureka server URL", e);
+            return null;
+        }
+    }
+
+    /**
+     * 验证覆盖状态是否已被移除
+     * @param instanceId 实例ID
+     * @return true 如果覆盖状态已移除或状态正常,false 如果仍存在问题状态
+     */
+    private boolean verifyStatusRemoval(String instanceId) {
+        try {
+            // 获取本地信息用于日志记录
+            InstanceInfo localInfo = applicationInfoManager.getInfo();
+            if (localInfo == null) {
+                log.warn("Cannot verify status removal - InstanceInfo is null");
+                return false;
+            }
+
+            String appName = localInfo.getAppName();
+            InstanceInfo.InstanceStatus localCurrentStatus = localInfo.getStatus();
+            InstanceInfo.InstanceStatus localOverriddenStatus = localInfo.getOverriddenStatus();
+
+            if (config.isVerboseLogging()) {
+                log.info("After removal attempt - Instance [{}] local current status: {}, local override status: {}",
+                        instanceId, localCurrentStatus, localOverriddenStatus);
+            }
+
+            // 查询服务器状态进行验证
+            log.info("Verifying override status removal by querying Eureka Server for instance [{}]", instanceId);
+            InstanceInfo.InstanceStatus serverOverriddenStatus = queryServerOverrideStatus(appName, instanceId);
+
+            if (serverOverriddenStatus == null) {
+                log.info("Successfully verified override status removal for instance [{}]. No override status found on server",
+                        instanceId);
+                return true;
+            } else if (serverOverriddenStatus == InstanceInfo.InstanceStatus.OUT_OF_SERVICE) {
+                log.warn("Override status removal failed. Instance [{}] still has OUT_OF_SERVICE override status on server",
+                        instanceId);
+                return false;
+            } else {
+                // 有其他类型的覆盖状态,但不是OUT_OF_SERVICE,这是可以接受的
+                log.info("Instance [{}] has override status [{}] on server (not OUT_OF_SERVICE). This is acceptable.",
+                        instanceId, serverOverriddenStatus);
+                return true;
+            }
+
+        } catch (Exception e) {
+            log.error("Error verifying status removal for instance [{}]", instanceId, e);
+            return false;
+        }
+    }
+
+}

+ 12 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/feature/FeatureService.java

@@ -102,6 +102,18 @@ public class FeatureService {
         return this.invokeFeatureService(protos);
     }
 
+    /**
+     * 获取用户行为特征(用户分层、启动数、ror)
+     * @param mid
+     * @return
+     */
+    public Feature getMidBehaviorFeature(String tableName, String mid) {
+        List<FeatureKeyProto> protos = new ArrayList<>();
+        protos.add(genWithMid(tableName, mid));
+        return this.invokeFeatureService(protos);
+    }
+
+
     public Feature invokeFeatureService(List<FeatureKeyProto> protos) {
 
         Map<String, String> featureMap = remoteService.getFeature(protos);

+ 11 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/predict/impl/PredictModelServiceImpl.java

@@ -77,6 +77,8 @@ public class PredictModelServiceImpl implements PredictModelService {
     private PredictStrategyBy819 predictStrategyBy819;
     @Autowired
     private PredictStrategyBy820 predictStrategyBy820;
+    @Autowired
+    private PredictStrategyByRor predictStrategyByRor;
 
     @Autowired
     private UserService userService;
@@ -237,6 +239,15 @@ public class PredictModelServiceImpl implements PredictModelService {
                     return predictResult;
                 }
             }
+            // ror行为策略
+            Map<String, Object> userRorPredict = predictStrategyByRor.predict(predictContext);
+            if (MapUtils.isNotEmpty(userRorPredict)) {
+                // 填充 819 参数
+                if (MapUtils.isNotEmpty(predictExtInfo)) {
+                    userRorPredict.putAll(predictExtInfo);
+                }
+                return userRorPredict;
+            }
 
             Map<String, Object> predictResult;
             if (expCodes.contains("599")){

+ 6 - 2
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/predict/v2/BasicPredict.java

@@ -24,8 +24,12 @@ public abstract class BasicPredict {
     protected Map<String, Object> rtnAdPredict(PredictContext ctx) {
         Map<String, Object> rtnMap = new HashMap<>();
         rtnMap.put("ad_predict", 2);
-        rtnMap.putAll(ctx.getLogParam().getScoreMap());
-        rtnMap.put("pqtId", ctx.getPqtId());
+        if (ctx != null) {
+            if( ctx.getLogParam() != null){
+                rtnMap.putAll(ctx.getLogParam().getScoreMap());
+            }
+            rtnMap.put("pqtId", ctx.getPqtId());
+        }
         return rtnMap;
     }
 

+ 215 - 0
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/predict/v2/PredictStrategyByRor.java

@@ -0,0 +1,215 @@
+package com.tzld.piaoquan.ad.engine.service.predict.v2;
+
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.tzld.piaoquan.ad.engine.commons.enums.RedisPrefixEnum;
+import com.tzld.piaoquan.ad.engine.commons.redis.AdRedisHelper;
+import com.tzld.piaoquan.ad.engine.service.feature.Feature;
+import com.tzld.piaoquan.ad.engine.service.feature.FeatureService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 基于 ROR的广告预测策略
+ * <p>
+ * 核心逻辑:
+ * 1. 根据用户的历史行为特征(启动次数launchs、留存率ror、人群分层ad_level)计算展示广告的概率阈值
+ * 2. 通过 mid 的 hash 值生成伪随机分数
+ * 3. 如果分数 <= 阈值,则展示广告;否则不展示
+ * <p>
+ * 用于控制不同用户群体的广告曝光频率,实现精细化运营
+ */
+@Slf4j
+@Service
+public class PredictStrategyByRor extends BasicPredict {
+
+    /** 特征服务,用于获取用户行为特征 */
+    @Autowired
+    private FeatureService featureService;
+
+    /** Redis 客户端,用于获取基于用户特征的概率配置 */
+    @Autowired
+    private AdRedisHelper adRedisHelper;
+
+    /**
+     * Apollo 动态配置:根据 rootSessionId 尾号和 appType 进行流量分桶
+     * <p>
+     * 配置格式示例:
+     * <pre>
+     * [
+     *   {
+     *     "appType": ["0", "3"],
+     *     "tail": ["0", "1", "2"],
+     *     "config": {"default_probability": 0.5}
+     *   }
+     * ]
+     * </pre>
+     */
+    @ApolloJsonValue("${experiment.ror.root.session.id.tail.config:[]}")
+    private List<RootSessionIdTailConfigItem> configItems;
+
+    private static final String TABLE_NAME = "alg_mid_history_behavior_1month";
+
+    /**
+     * 策略名称标识
+     */
+    @Override
+    public String name() {
+        return "launch_layer_ror";
+    }
+
+    /**
+     * 核心预测方法:决定是否向用户展示广告
+     *
+     * @param ctx 预测上下文,包含 mid、appType、rootSessionId 等信息
+     * @return 预测结果 Map,包含:
+     *         - ad_predict: 1=不展示广告,2=展示广告
+     *         - score: 用户的伪随机分数
+     *         - threshold: 广告展示概率阈值
+     *         - launchs/ror/ad_level: 用户行为特征
+     *         - 返回 null 表示跳过该策略
+     */
+    @Override
+    public Map<String, Object> predict(PredictContext ctx) {
+
+        try {
+            String rootSessionId = ctx.getRootSessionId();
+
+            // 前置校验:配置为空或 rootSessionId 为空时,返回 null(跳过该策略)
+            if (CollectionUtils.isEmpty(configItems) || StringUtils.isBlank(rootSessionId)) {
+                return null;
+            }
+
+            String appType = ctx.getAppType();
+
+            // 获取默认概率阈值(基于 rootSessionId 尾号和 appType 匹配配置)
+            Double defaultProbability = getDefaultProbability(rootSessionId, appType);
+
+            // 用户不在实验分桶内,跳过该策略
+            if (defaultProbability == null) {
+                return null;
+            }
+
+            Map<String, Object> rtnMap = new HashMap<>();
+
+            // 用户行为特征变量(来自离线特征表 alg_mid_history_behavior_1month)
+            String launchs = null;   // 启动次数分桶(如 "0-5", "5-10" 等)
+            String ror = null;       // 留存率分桶
+            String adLevel = null;   // 广告等级(用户对广告的敏感度分层)
+
+            // 根据 mid 获取用户近一个月的历史行为特征
+            Feature feature = featureService.getMidBehaviorFeature(TABLE_NAME, ctx.getMid());
+
+            // 安全地提取特征值(多层 null 检查)
+            if (feature != null && feature.getUserFeature() != null && feature.getUserFeature().get("") != null) {
+                Map<String, String> algMidHistoryBehavior1month = feature.getUserFeature().get(TABLE_NAME);
+                launchs = algMidHistoryBehavior1month.get("launchs");
+                ror = algMidHistoryBehavior1month.get("ror");
+                adLevel = algMidHistoryBehavior1month.get("ad_level");
+            }
+
+            // 计算最终的广告展示概率阈值
+            // 优先使用 Redis 中基于用户特征的精细化阈值,否则使用默认阈值
+            double showAdProbability = getShowAdProbability(launchs, ror, adLevel, defaultProbability);
+
+            // 基于 mid 的 hash 值生成 [0, 1) 范围内的伪随机分数
+            // 同一个 mid 在同一小时内(RandW 每小时更新)会得到相同的分数
+            double score = this.calcScoreByMid(ctx.getMid());
+
+            // 核心决策逻辑:分数 <= 阈值 → 展示广告
+            if (score <= showAdProbability) {
+                // 展示广告,ad_predict = 2
+                rtnMap.putAll(rtnAdPredict(ctx));
+                rtnMap.put("model", this.name());
+            } else {
+                // 不展示广告,ad_predict = 1
+                rtnMap.putAll(rtnNoAdPredict(ctx));
+                rtnMap.put("no_ad_strategy", this.name());
+            }
+
+            // 记录决策相关的特征和参数,用于日志分析和效果追踪
+            rtnMap.put("score", score);
+            rtnMap.put("threshold", showAdProbability);
+            rtnMap.put("launchs", launchs);
+            rtnMap.put("ror", ror);
+            rtnMap.put("ad_level", adLevel);
+            return rtnMap;
+        } catch (Exception e) {
+            log.error("[PredictStrategyByRor] predict error, ctx: {}", ctx, e);
+            return null;
+        }
+    }
+
+    /**
+     * 获取广告展示概率阈值
+     * <p>
+     * 策略:根据用户的 (ad_level, launchs, ror) 组合从 Redis 查询对应的概率值
+     * 如果查询失败或无数据,则使用默认概率
+     *
+     * @param launchs            启动次数分桶
+     * @param ror                留存率分桶
+     * @param ad_level           人群分层
+     * @param defaultProbability 默认概率(兜底值)
+     * @return 广告展示概率阈值 [0, 1]
+     */
+    private double getShowAdProbability(String launchs, String ror, String ad_level, Double defaultProbability) {
+        // 任一特征为空,使用默认概率
+        if (StringUtils.isAnyBlank(launchs, ror, ad_level)) {
+            return defaultProbability;
+        }
+        try {
+            // 构建 Redis key:格式为 "ad_level:launchs:ror",例如 "有转化:10:000"
+            String keyId = ad_level + ":" + launchs + ":" + ror;
+            String key = String.format(RedisPrefixEnum.AD_USER_ROR_BEHAVIOR.getPrefix(), keyId);
+
+            // 从 Redis 获取概率值
+            String probability = adRedisHelper.get(key);
+
+            // 解析概率值,如果为 null 则使用默认值
+            return probability == null ? defaultProbability : Double.parseDouble(probability);
+        } catch (Exception e) {
+            // 解析失败(如非数字字符串)或 Redis 异常,记录错误并使用默认值
+            log.error("getShowAdProbability error, launchs: {}, ror: {}, ad_level: {}, e = ", launchs, ror, ad_level, e);
+            return defaultProbability;
+        }
+    }
+
+    /**
+     * 获取默认概率阈值
+     * <p>
+     * 根据 rootSessionId 的最后一位字符(尾号)和 appType 匹配配置,用于流量分桶实验
+     *
+     * @param rootSessionId 根会话 ID
+     * @param appType       应用类型
+     * @return 默认概率,如果不匹配任何配置则返回 null
+     */
+    private Double getDefaultProbability(String rootSessionId, String appType) {
+        // 前置校验
+        if (CollectionUtils.isEmpty(configItems) || StringUtils.isAnyBlank(rootSessionId) || appType == null) {
+            return null;
+        }
+
+        // 提取 rootSessionId 的最后一个字符作为尾号,用于流量分桶
+        String tail = rootSessionId.substring(rootSessionId.length() - 1);
+
+        // 遍历配置项,查找同时匹配 appType 和尾号的配置
+        for (RootSessionIdTailConfigItem item : configItems) {
+            if (item.getAppType() != null && item.getTail() != null) {
+                if (item.getAppType().contains(appType) && item.getTail().contains(tail)) {
+                    return item.getConfig().get("default_probability");
+                }
+            }
+
+        }
+
+        // 未匹配到任何配置
+        return null;
+    }
+
+}