Ver Fonte

feat:VOV模型

zhaohaipeng há 9 meses atrás
pai
commit
cea9decefe

+ 65 - 45
XGB/check_data.py

@@ -1,45 +1,65 @@
-import pandas as pd
-
-old_date_train = f"/Users/zhao/Downloads/20241010.csv"
-new_date_train = f"/Users/zhao/Desktop/Code/Python/model_monitor/XGB/data/all/20241010_train.csv"
-
-# 读取两个 CSV 文件
-old_df = pd.read_csv(old_date_train)
-new_df = pd.read_csv(new_date_train)
-
-if old_df.shape[0] != new_df.shape[0]:
-    print(f"新老训练数据集长度不一样 新数据集: {new_df.shape[0]}, 老数据集: {old_df.shape[0]}")
-
-old_df_col = old_df.columns
-new_df_col = new_df.columns
-if len(old_df_col) != len(new_df_col):
-    print(f"两个文件列数不一样 新文件: {new_df_col}, 老文件: {old_df_col}")
-
-for col in old_df_col:
-    if col not in new_df_col:
-        print(f"列 {col} 在老文件存在,新文件不存在")
-
-for col in new_df_col:
-    if col not in old_df_col:
-        print(f"列 {col} 在新文件存在,老文件不存在")
-
-old_df.set_index("vid", inplace=True)
-new_df.set_index("vid", inplace=True)
-
-old_dict = old_df.to_dict(orient="index")
-new_dict = new_df.to_dict(orient="index")
-
-for e in new_dict:
-    if e not in old_dict:
-        print(f"vid {e} 在新文件中存在,在老文件中不存在")
-    new_row = new_dict[e]
-    old_row = old_dict[e]
-    for col in new_df_col:
-        if col not in old_row:
-            print(f"vid {e} 的列 {col} 在老文件中不存在")
-            continue
-        if col in new_row:
-            print(f"vid {e} 的列 {col} 在新文件中不存在")
-            continue
-        if old_row[col] != new_row[col]:
-            print(f"vid {e} 列 {col} 的值在新老文件不一样, 新文件的值: {new_row[col]}, 老文件的值: {old_row[col]}")
+# import pandas as pd
+#
+# old_date_train = f"/Users/zhao/Desktop/Code/Python/model_monitor/XGB/data/all/20241012_predict.csv"
+# new_date_train = f"/Users/zhao/Desktop/Code/Python/model_monitor/XGB/20241012_predict_1.csv"
+#
+# # 读取两个 CSV 文件
+# old_df = pd.read_csv(old_date_train)
+# new_df = pd.read_csv(new_date_train)
+#
+# if old_df.shape[0] != new_df.shape[0]:
+#     print(f"新老训练数据集长度不一样 新数据集: {new_df.shape[0]}, 老数据集: {old_df.shape[0]}")
+#
+# old_df_col = old_df.columns
+# new_df_col = new_df.columns
+# if len(old_df_col) != len(new_df_col):
+#     print(f"两个文件列数不一样 新文件: {new_df_col}, 老文件: {old_df_col}")
+#
+# for col in old_df_col:
+#     if col not in new_df_col:
+#         print(f"列 {col} 在老文件存在,新文件不存在")
+#
+# for col in new_df_col:
+#     if col not in old_df_col:
+#         print(f"列 {col} 在新文件存在,老文件不存在")
+#
+# old_df.set_index("vid", inplace=True)
+# new_df.set_index("vid", inplace=True)
+#
+# old_dict = old_df.to_dict(orient="index")
+# new_dict = new_df.to_dict(orient="index")
+#
+# for e in new_dict:
+#     if e not in old_dict:
+#         print(f"vid {e} 在新文件中存在,在老文件中不存在")
+#     new_row = new_dict[e]
+#     old_row = old_dict[e]
+#     for col in new_df_col:
+#         if col in ['vid', '曝光占比', '分子', '分母', 'label']:
+#             continue
+#         if col not in old_row:
+#             print(f"vid {e} 的列 {col} 在老文件中不存在")
+#             continue
+#         # if col in new_row:
+#         #     print(f"vid {e} 的列 {col} 在新文件中不存在")
+#         #     continue
+#         if old_row[col] != new_row[col]:
+#             print(f"vid {e} 列 {col} 的值在新老文件不一样, 新文件的值: {new_row[col]}, 老文件的值: {old_row[col]}")
+#
+# # z_vid = set()
+# # with open("/Users/zhao/Desktop/Code/Python/rov-offline/write_redis/filtered_vid", "r") as f:
+# #     for line in f:
+# #         z_vid.add(line.replace("\n", ""))
+# #
+# # p_vid = set()
+# # with open("./filtered_vid.txt", "r") as f:
+# #     for line in f:
+# #         p_vid.add(line.replace("\n", ""))
+# #
+# # for e in z_vid:
+# #     if e not in p_vid:
+# #         print(f"VID: {e} 离线预测有,在线预测没有")
+# #
+# # for e in p_vid:
+# #     if e not in z_vid:
+# #         print(f"VID: {e} 在线预测有,离线预测没有")

+ 67 - 11
XGB/xgboost_train.py → XGB/vov_xgboost_train.py

@@ -7,8 +7,12 @@ import pandas as pd
 import xgboost as xgb
 
 from client import ODPSClient
+from config import ConfigManager
+from helper import RedisHelper
+from util import feishu_inform_util
 
 odps_client = ODPSClient.ODPSClient()
+config_manager = ConfigManager.ConfigManager()
 
 features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
                  '3_vov012', '4_vov012', '5_vov012', "12_change", "23_change", "34_change", '2_vov01', '3_vov01',
@@ -22,7 +26,7 @@ column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0',
                 '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
 
 # 创建一个logger
-logger = logging.getLogger("xgboost_train.py")
+logger = logging.getLogger("vov_xgboost_train.py")
 logger.setLevel(logging.INFO)  # 设置日志级别
 
 # 创建Handler用于输出到文件
@@ -251,6 +255,7 @@ def xgb_multi_dt_data(t_1_label_dt: datetime):
 
         t_3_label_dt = t_1_label_dt - timedelta(2)
         logger.info(f"VOV模型特征数据处理:t_3_label_future.label_datetime: {t_3_label_dt.strftime('%Y%m%d')}")
+
         t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_label_dt - timedelta(1), t_3_label_dt)
         t_1_label_df = t_1_label_future.result()
         t_2_label_df = t_2_label_future.result()
@@ -261,7 +266,7 @@ def xgb_multi_dt_data(t_1_label_dt: datetime):
 
 def _main():
     logger.info(f"XGB模型训练")
-    train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=2)))
+    train_df = xgb_multi_dt_data((datetime.now() - timedelta(days=3)))
     trains_array = train_df[features_name].values
     trains_label_array = train_df['label'].values
 
@@ -283,8 +288,8 @@ def _main():
     model.fit(trains_array, trains_label_array)
 
     logger.info("获取评测数据")
-    start_label_datetime = datetime.now() - timedelta(days=1)
-    feature_start_datetime = start_label_datetime - timedelta(1)
+    start_label_datetime = datetime.now() - timedelta(days=2)
+    feature_start_datetime = start_label_datetime
 
     predict_df = fetch_data(start_label_datetime, feature_start_datetime, start_label_datetime)
     tests_array = predict_df[features_name].values
@@ -293,19 +298,19 @@ def _main():
     condition_choose = (
             (predict_df['y_pred'] <= 0.1) &
             (
-                    (predict_df['4_vov0_分母'] > 50) |
                     (predict_df['2_vov0_分母'] > 50) |
-                    (predict_df['3_vov0_分母'] > 50)
+                    (predict_df['3_vov0_分母'] > 50) |
+                    (predict_df['4_vov0_分母'] > 50)
             ) &
             (
-                (predict_df['1_vov0'] - predict_df['2_vov0'] <= 0.1)
+                (predict_df['1_vov0'] - predict_df['2_vov0'] < 0.1)
             )
     )
     profit_threshold = 0.3
     condition_choose_real = condition_choose & (predict_df['vov0'] <= profit_threshold)
     predict_df["condition_choose"] = condition_choose
     predict_df[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
-        "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
+        "./file/new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"),
         sep="\t",
         index=False
     )
@@ -339,12 +344,63 @@ def _main():
         f"盈利效率:{round(profit_loss_value / profit_value, 6)}"
     )
 
-    filtered_vid = predict_df.loc[condition_choose_real, 'vid'].unique()
-    print(f"要过滤掉的视频ID为: {filtered_vid}")
+    filtered_vid = predict_df.loc[condition_choose, 'vid'].unique()
+
+    # 写入Redis
+    redis_key = f"redis:lower_vov_vid:{datetime.now().strftime('%Y%m%d')}"
+
+    logger.info(f"当前环境为: {config_manager.get_env()}, 要写入的Redis Key为: {redis_key}")
+    host, port, password = config_manager.get_algorithm_redis_info()
+    alg_redis = RedisHelper.RedisHelper(host=host, port=port, password=password)
+    for vid in filtered_vid.tolist():
+        alg_redis.add_number_to_set(redis_key, vid)
+
+    alg_redis.set_expire(redis_key, 86400)
 
 
 if __name__ == '__main__':
+    card_json = {
+        "config": {},
+        "i18n_elements": {
+            "zh_cn": [
+                {
+                    "tag": "markdown",
+                    "content": "",
+                    "text_align": "left",
+                    "text_size": "normal"
+                }
+            ]
+        },
+        "i18n_header": {
+            "zh_cn": {
+                "title": {
+                    "tag": "plain_text",
+                    "content": "XGB模型训练预测完成"
+                },
+                "template": "info"
+            }
+        }
+    }
+
     try:
-        _main()
+        # _main()
+
+        msg_text = f"\n- 所属项目: model_monitor" \
+                   f"\n- 所属环境: {config_manager.get_env()}" \
+                   f"\n- 告警描述: VOV预测模型训练和预测完成, 用于低VOV视频过滤"
+        card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
+
     except Exception as e:
         logger.error("VOV过滤XGB模型训练异常: ", e)
+        msg_text = f"\n- 所属项目: rov-offline" \
+                   f"\n- 告警名称: XGB模型训练失败" \
+                   f"\n- 所属环境: {config_manager.get_env()}" \
+                   f"\n- 告警描述: VOV预测模型训练和预测失败, 用于低VOV视频过滤"
+        card_json['i18n_header']['zh_cn']['template'] = "error"
+        card_json['i18n_header']['zh_cn']["title"]['content'] = "XGB模型训练预测失败"
+        card_json['i18n_elements']['zh_cn'][0]['content'] = msg_text
+        # 发送通知
+    feishu_inform_util.send_card_msg_to_feishu(
+        webhook=config_manager.get_vov_model_inform_feishu_webhook(),
+        card_json=card_json
+    )

+ 37 - 0
config/ConfigManager.py

@@ -0,0 +1,37 @@
+import configparser
+import os
+from pathlib import Path
+
+
+class ConfigManager(object):
+    def __init__(self):
+        # 获取环境变量 ROV_OFFLINE_ENV
+        env = os.environ.get('ROV_OFFLINE_ENV')
+        print(f"当前环境: {env}")
+        project_home = Path(__file__).parent.parent.absolute()
+        print(f"当前项目目录为: {project_home}")
+        self.env = env
+        self.project_home = project_home
+
+        if env == "pro":
+            self.config_file = f"{project_home}/config/config.ini"
+        else:
+            self.config_file = f"{project_home}/config/config_test.ini"
+        self.config = configparser.ConfigParser()
+        self.config.read(self.config_file)
+
+    def get_env(self):
+        return self.env if self.env is not None else "test"
+
+    def get_algorithm_redis_info(self):
+        return (
+            self.config.get("algorithm.redis", "host"),
+            self.config.get("algorithm.redis", "port"),
+            self.config.get("algorithm.redis", "password")
+        )
+
+    def get_model_inform_feishu_webhook(self):
+        return self.config.get("feishu", "model.webhook")
+
+    def get_vov_model_inform_feishu_webhook(self):
+        return self.config.get("feishu", "vov.model.webhook")

+ 7 - 1
config/config.ini

@@ -1,5 +1,11 @@
 [feishu]
 model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c
+vov.model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/cbf0bc0f-994a-489b-9f77-de0c5be107cd
 
 [k8s]
-config.file=/Users/zhao/.kube/config_prod
+config.file = /Users/zhao/.kube/config_prod
+
+[algorithm.redis]
+host = r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com
+port = 6379
+password = Wqsd@2019

+ 11 - 0
config/config_test.ini

@@ -0,0 +1,11 @@
+[feishu]
+model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c
+vov.model.webhook=https://open.feishu.cn/open-apis/bot/v2/hook/cbf0bc0f-994a-489b-9f77-de0c5be107cd
+
+[k8s]
+config.file = /Users/zhao/.kube/config
+
+[algorithm.redis]
+host = r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com
+port = 6379
+password = Wqsd@2019

+ 16 - 0
helper/RedisHelper.py

@@ -0,0 +1,16 @@
+import redis
+
+
+class RedisHelper(object):
+    def __init__(self, host: str, port: int, password: str):
+        self.host = host
+        self.port = port
+        self.password = password
+        self.redis_conn = redis.Redis(host=self.host, port=self.port, password=password)
+
+    def add_number_to_set(self, key: str, number):
+        print(f"Redis Set写入: {key} ---> {number}")
+        self.redis_conn.sadd(key, number)
+
+    def set_expire(self, key: str, expire: int):
+        self.redis_conn.expire(key, expire)

+ 0 - 0
helper/__init__.py


+ 0 - 0
log/__init__.py


+ 1 - 0
log/logging.py

@@ -0,0 +1 @@
+import logging

+ 13 - 0
sh/vov_xgboost_train.sh

@@ -0,0 +1,13 @@
+#!/bin/sh
+
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+
+echo "开始执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  /root/anaconda3/bin/python ../XGB/vov_xgboost_train.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  /root/anaconda3/bin/python ../XGB/vov_xgboost_train.py
+fi
+echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+echo "all done"