丁云鹏 1 месяц назад
Родитель
Коммит
c98cc14537

+ 1 - 0
.gitignore

@@ -1,5 +1,6 @@
 # ---> Java
 *.class
+*.DS_Store
 
 # Mobile Tools for Java (J2ME)
 .mtj.tmp/

BIN
recommend-feature-produce/.DS_Store


+ 1 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/RedisService.java

@@ -46,7 +46,7 @@ public class RedisService implements Serializable {
             String redisKey = redisKey(record, config);
             String value = JSONUtils.toJson(record);
             batch.put(redisKey, value);
-            if (batch.size() >= 5000) {
+            if (batch.size() >= 3000) {
                 mSet(jedis, batch, expire, TimeUnit.SECONDS);
                 batch.clear();
             }

+ 171 - 0
recommend-feature-produce/src/main/python/monitor.py

@@ -0,0 +1,171 @@
+from typing import Any, Dict, List, Tuple
+
+import json5
+import redis
+import requests
+
+# Redis connection settings (adjust for the target environment).
+# NOTE(review): the password is committed in plain text; consider loading it
+# from the environment or a secret store and rotating this credential.
+REDIS_CONFIG = {
+    'host': 'r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com',
+    'port': 6379,
+    'db': 0,
+    'password': 'Wqsd@2019'
+}
+
+# Feishu bot webhook URL (adjust for the target environment).
+# NOTE(review): the hook token embedded in this URL is committed as well.
+FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c"
+APOLLO_SERVER = "http://apolloconfig-internal.piaoquantv.com"
+APP_ID = "recommend-feature"
+NAMESPACE = "application"  # Apollo namespace that main() passes to get_apollo_config
+
+
+def get_apollo_config(
+    config_server_url: str,
+    app_id: str,
+    namespace_name: str = "application",
+    cluster_name: str = "default",
+    timeout: int = 5
+) -> Dict[str, Any]:
+    """
+    Read configuration from Apollo through its cached HTTP interface.
+
+    :param config_server_url: base URL of the Apollo config service
+    :param app_id: appId of the application
+    :param namespace_name: namespace name, defaults to "application"
+    :param cluster_name: cluster name, defaults to "default"
+    :param timeout: request timeout in seconds, defaults to 5
+    :return: the parsed configuration dictionary
+    :raises ValueError: when the response body or the namespace content
+        cannot be parsed
+    :raises requests.RequestException: when the HTTP request fails
+    """
+    url = f"{config_server_url}/configfiles/json/{app_id}/{cluster_name}/{namespace_name}"
+
+    # Each failure mode gets its own try block so that one handler can never
+    # swallow or re-label an error raised by a different step.
+    try:
+        response = requests.get(url, timeout=timeout)
+        response.raise_for_status()
+    except requests.RequestException as e:
+        raise requests.RequestException(f"Failed to get Apollo config: {e}") from e
+
+    # Decoding failures are reported as ValueError (or a subclass); catching
+    # ValueError avoids depending on a parser-specific exception class name.
+    try:
+        config_data = response.json()
+    except ValueError as e:
+        raise ValueError(f"Invalid JSON response: {e}") from e
+
+    # Non-properties namespaces return their payload as text in "content".
+    if isinstance(config_data, dict) and isinstance(config_data.get("content"), str):
+        try:
+            return json5.loads(config_data["content"])
+        except ValueError as e:
+            raise ValueError(f"Failed to parse namespace content: {e}") from e
+
+    return config_data
+
+def parse_apollo_config(config_str: str) -> List[Dict[str, Any]]:
+    """
+    Parse an Apollo configuration string written in JSON5.
+
+    :param config_str: raw configuration text
+    :return: the parsed configuration
+    :raises ValueError: when the text is not valid JSON5
+    """
+    try:
+        return json5.loads(config_str)
+    except ValueError as e:
+        # Catch ValueError rather than a parser-specific class so the handler
+        # itself cannot fail on a missing attribute; keep the cause chained.
+        raise ValueError(f"Invalid JSON config: {e}") from e
+
+def get_table_list(config: List[Dict[str, Any]]) -> List[str]:
+    """Collect the ODPS table name of every config entry, in input order."""
+    tables = []
+    for entry in config:
+        odps_section = entry['odps']
+        tables.append(odps_section['table'])
+    return tables
+
+def check_redis_ttl(redis_conn: redis.Redis, table_name: str, threshold: int = 3600) -> Tuple[int, bool]:
+    """
+    Report whether the TTL of a table's marker key is below a threshold.
+
+    The key inspected is ``ttl:<table_name>``.
+
+    :param redis_conn: Redis connection
+    :param table_name: table whose marker key is checked
+    :param threshold: threshold in seconds
+    :return: ``(ttl, below_threshold)`` where ``ttl`` is the raw value
+        returned by Redis and ``below_threshold`` is True when an alert
+        should be raised for this key
+    """
+    key = f"ttl:{table_name}"
+    print(key)
+
+    ttl = redis_conn.ttl(key)
+    print(ttl)
+    # Redis TTL replies -1 for a key without expiry and -2 for a missing key.
+    # A key that never expires is not about to expire, so it must not alert;
+    # a missing key still counts as below the threshold.
+    return ttl, ttl != -1 and ttl < threshold
+
+def send_feishu_alert(message: str, webhook_url: str = FEISHU_WEBHOOK, timeout: int = 5) -> bool:
+    """
+    Send a plain-text alert message through a Feishu bot webhook.
+
+    :param message: alert text
+    :param webhook_url: Feishu bot webhook URL
+    :param timeout: request timeout in seconds, defaults to 5
+    :return: True when the message was accepted, False otherwise
+    """
+    headers = {
+        "Content-Type": "application/json"
+    }
+
+    payload = {
+        "msg_type": "text",
+        "content": {
+            "text": message
+        }
+    }
+
+    try:
+        # Without a timeout a stalled connection would block the monitor forever.
+        response = requests.post(webhook_url, headers=headers, json=payload, timeout=timeout)
+        response.raise_for_status()
+    except requests.RequestException as e:
+        print(f"Failed to send Feishu alert: {e}")
+        return False
+
+    # NOTE(review): the webhook is expected to answer with a JSON body whose
+    # "code" is 0 on success even when the HTTP status is 2xx - confirm against
+    # the Feishu bot documentation. A body that is not JSON, or has no "code",
+    # is treated as success so the previous behaviour is kept.
+    try:
+        body = response.json()
+    except ValueError:
+        return True
+    code = body.get("code", 0) if isinstance(body, dict) else 0
+    if code != 0:
+        print(f"Failed to send Feishu alert: {body}")
+        return False
+    return True
+
+def main():
+    """
+    Check the Redis TTL marker of every configured table and alert via Feishu.
+
+    Loads the ``dts.config`` entry from Apollo, connects to Redis, collects
+    every table that check_redis_ttl flags, and sends one combined Feishu
+    message when there is anything to report. Failures are printed and end
+    the run early instead of raising.
+    """
+    # Alert threshold in seconds; shared by the check and the alert text so
+    # the two cannot drift apart.
+    threshold = 3600
+
+    # 1. Load and parse the Apollo configuration.
+    try:
+        all_config = get_apollo_config(
+            config_server_url=APOLLO_SERVER,
+            app_id=APP_ID,
+            namespace_name=NAMESPACE
+        )
+
+        config = json5.loads(all_config["dts.config"])
+        print(config)
+
+    except ValueError as e:
+        print(f"Error parsing config: {e}")
+        return
+    except (requests.RequestException, KeyError) as e:
+        # Network failures and a missing "dts.config" entry used to escape
+        # as an unhandled traceback.
+        print(f"Error loading config: {e!r}")
+        return
+
+    # 2. Collect the table list.
+    table_list = get_table_list(config)
+    print(f"Table list: {table_list}")
+
+    # 3. Open the Redis connection.
+    try:
+        redis_conn = redis.Redis(**REDIS_CONFIG)
+        redis_conn.ping()  # verify the connection works
+    except redis.RedisError as e:
+        print(f"Failed to connect to Redis: {e}")
+        return
+
+    # 4. Check the TTL of every configured table.
+    alert_msg = []
+    for item in config:
+        redis_config = item['redis']
+        prefix = redis_config['prefix']
+        table_name = item['odps']['table']
+
+        try:
+            ttl, check = check_redis_ttl(redis_conn, table_name, threshold)
+            if check:
+                alert_msg.append(f"Table: {table_name} Prefix: {prefix} TTL: {ttl}")
+        except redis.RedisError as e:
+            print(f"Error checking Redis TTL for table {table_name}: {e}")
+
+    # 5. Send a single Feishu alert, only when something was flagged.
+    if alert_msg:
+        alert_msg_str = "\n".join(alert_msg)
+        if not send_feishu_alert(f"Redis TTL Alert: TTL is less than {threshold}s!\n{alert_msg_str}"):
+            print("Failed to send Feishu alert")
+    else:
+        print("No Redis TTL alerts found.")
+
+if __name__ == "__main__":
+    main()

+ 1 - 0
recommend-feature-produce/src/main/python/vm.sh

@@ -0,0 +1 @@
+# Activate the environment under ./myenv (presumably the Python virtualenv
+# for monitor.py - confirm). The path is relative to the current directory.
+source myenv/bin/activate