feat: add XGB training

zhaohaipeng 9 months ago
parent
commit
338da788fd
9 changed files with 365 additions and 8 deletions
  1. + 3 - 1      .gitignore
  2. + 0 - 0      XGB/__init__.py
  3. + 186 - 0    XGB/xgboost_train.py
  4. + 20 - 0     client/K8SClient.py
  5. + 30 - 6     client/ODPSClient.py
  6. + 68 - 0     client/XxlJobClient.py
  7. + 4 - 1      config/config.ini
  8. + 25 - 0     k8s.py
  9. + 29 - 0     xxl_job.py

+ 3 - 1
.gitignore

@@ -58,4 +58,6 @@ docs/_build/
 # PyBuilder
 target/
 
-.idea
+.idea
+
+XGB/new*

+ 0 - 0
XGB/__init__.py


+ 186 - 0
XGB/xgboost_train.py

@@ -0,0 +1,186 @@
+import concurrent.futures
+import json
+import logging
+from datetime import datetime, timedelta
+
+import pandas as pd
+import xgboost as xgb
+
+from client import ODPSClient
+
+odps_client = ODPSClient.ODPSClient()
+
+features_name = ['1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01', '3_vov01', '4_vov01', '5_vov01',
+                 '3_vov012', '4_vov012', '5_vov012', '12_change', '23_change', '34_change']
+
+column_names = ['曝光占比', 'vov0', '分子', '分母', '1_vov0', '2_vov0', '3_vov0', '4_vov0', '5_vov0', '2_vov01',
+                '3_vov01', '4_vov01', '5_vov01', '3_vov012', '4_vov012', '5_vov012', '1_vov0_分子', '1_vov0_分母',
+                '2_vov0_分子', '2_vov0_分母', '3_vov0_分子', '3_vov0_分母', '4_vov0_分子', '4_vov0_分母',
+                '5_vov0_分子', '5_vov0_分母', '2_vov01_分子', '2_vov01_分母', '3_vov01_分子', '3_vov01_分母',
+                '4_vov01_分子', '4_vov01_分母', '5_vov01_分子', '5_vov01_分母', '3_vov012_分子', '3_vov012_分母',
+                '4_vov012_分子', '4_vov012_分母', '5_vov012_分子', '5_vov012_分母']
+
+# Configure log format and level
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+# Create a module-level logger
+logger = logging.getLogger(__name__)
+
+
+def get_partition_df(table, dt):
+    logger.info(f"开始下载: {table} -- {dt} 的数据")
+
+    download_session = odps_client.get_download_session(table, dt)
+    logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
+    with download_session.open_arrow_reader(0, download_session.count) as reader:
+        # 将所有数据加载到 DataFrame 中
+        df = pd.concat([batch.to_pandas() for batch in reader])
+
+    logger.info(f"下载结束: {table} -- {dt} 的数据")
+    return df
+
+
+def fetch_label_data(label_dt):
+    """
+    Fetch the label data for the given partition date.
+    :return: DataFrame with vid, 分母, 分子 and vov0 columns
+    """
+    logger.info(f"fetch_label_data.dt: {label_dt}")
+
+    def extract_label(row):
+        feature = json.loads(row['feature'])
+        return pd.Series({
+            'vid': row['vid'],
+            '分母': int(feature['1_vov0_分母']),
+            "分子": feature['1_vov0_分子'],
+            'vov0': feature['1_vov0']
+        })
+
+    train_df = get_partition_df("alg_vid_vov_new", label_dt)
+    applied_df = train_df.apply(extract_label, axis=1)
+
+    # Compute each vid's exposure share (曝光占比)
+    view_sum = applied_df['分母'].sum()
+    applied_df['曝光占比'] = round(applied_df['分母'] / view_sum, 6)
+    return applied_df
+
+
+def fetch_feature_data(feature_dt):
+    """
+    Fetch the feature data for the given partition date.
+    :return: DataFrame with vid plus all JSON feature fields as columns
+    """
+
+    logger.info(f"fetch_feature_data.dt: {feature_dt}")
+
+    def extract_feature(row):
+        feature = json.loads(row['feature'])
+        return pd.Series({
+            'vid': row['vid'],
+            **feature
+        })
+
+    feature_df = get_partition_df("alg_vid_vov_new", feature_dt)
+    return feature_df.apply(extract_feature, axis=1)
+
+
+def fetch_data(label_datetime: datetime):
+    label_dt = label_datetime.strftime("%Y%m%d")
+    feature_dt = (label_datetime - timedelta(days=1)).strftime("%Y%m%d")
+
+    with concurrent.futures.ThreadPoolExecutor(2) as executor:
+        label_future = executor.submit(fetch_label_data, label_dt)
+        feature_future = executor.submit(fetch_feature_data, feature_dt)
+        label_apply_df = label_future.result()
+        feature_apply_df = feature_future.result()
+
+    df = pd.merge(label_apply_df, feature_apply_df, on="vid", how='left')
+    df.fillna(0, inplace=True)
+    df.sort_values(by=['曝光占比'], ascending=False, inplace=True)
+
+    for col in column_names:
+        df[col] = pd.to_numeric(df[col], errors='coerce')
+
+    df["12_change"] = df["1_vov0"] - df["2_vov0"]
+    df["23_change"] = df["2_vov0"] - df["3_vov0"]
+    df["34_change"] = df["3_vov0"] - df["4_vov0"]
+
+    feature_array = df[features_name].values
+    df["label"] = df["vov0"].apply(lambda x: 1 if x > 0.25 else 0)
+    label_array = df["label"].values
+
+    return df, feature_array, label_array
+
+
+def _main():
+    logger.info(f"XGB模型训练")
+
+    df, trains_array, trains_label_array = fetch_data((datetime.now() - timedelta(days=2)))
+    logger.info("特征获取完成,开始训练")
+    model = xgb.XGBClassifier(
+        n_estimators=100,
+        learning_rate=0.01,
+        max_depth=5,
+        min_child_weight=1,
+        gamma=0,
+        subsample=0.8,
+        colsample_bytree=0.8,
+        objective='binary:logistic',
+        nthread=8,
+        scale_pos_weight=1,
+        random_state=2024,
+        seed=2024,
+    )
+    model.fit(trains_array, trains_label_array, verbose=True)
+
+    logger.info("获取评测数据")
+    df_test, tests_array, _ = fetch_data(datetime.now() - timedelta(days=1))
+    y_pred = model.predict_proba(tests_array)[:, 1]
+    df_test["y_pred"] = y_pred
+
+    condition_choose = ((df_test['y_pred'] <= 0.2)
+                        # & ((df_test['1_vov0_分母'] > 50) | (df_test['2_vov0_分母'] > 50) | (df_test['3_vov0_分母'] > 50))
+                        & (df_test.index <= 10000)
+                        )
+    profit_threshold = 0.3
+    condition_choose_real = condition_choose & (df_test['vov0'] <= profit_threshold)
+    df_test["condition_choose"] = condition_choose
+    df_test[["vid", "曝光占比", "vov0", "condition_choose"]].to_csv(
+        "new_" + (datetime.now() - timedelta(days=1)).strftime("%Y%m%d"), sep="\t", index=False)
+
+    choose_bad = condition_choose.sum()
+    choose_bad_real_bad = condition_choose_real.sum()
+    acc = choose_bad_real_bad / choose_bad
+    logger.info(
+        f"acc: {acc} "
+        f"numerator={choose_bad_real_bad} "
+        f"denominator={choose_bad} "
+        f"total videos={len(df_test)} "
+        f"profit labeled when vov0 exceeds: {profit_threshold}"
+    )
+
+    surface = df_test.loc[condition_choose, '曝光占比'].sum()
+    surface_income = df_test.loc[condition_choose_real, '曝光占比'].sum()
+    logger.info(
+        f"total impact: {round(surface, 6)} "
+        f"profitable impact: {round(surface_income, 6)} "
+        f"loss impact: {round(surface - surface_income, 6)}"
+    )
+
+    df_test["profit_loss_value"] = df_test['分母'] * (df_test['vov0'] - profit_threshold)
+    profit_loss_value = df_test.loc[condition_choose, 'profit_loss_value'].sum()
+    profit_value = df_test.loc[condition_choose_real, 'profit_loss_value'].sum()
+    logger.info(
+        f"total P&L: {round(profit_loss_value, 1)} "
+        f"pure profit: {round(profit_value, 1)} "
+        f"pure loss: {round(profit_loss_value - profit_value, 1)} "
+        f"profit efficiency: {round(profit_loss_value / profit_value, 6)}"
+    )
+
+
+if __name__ == '__main__':
+    try:
+        _main()
+    except Exception as e:
+        logger.error("VOV filtering XGB model training failed: %s", e)
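
The extract_label / extract_feature helpers above assume each row's feature column holds a JSON object keyed by the statistics listed in column_names. A minimal parsing sketch, with the keys taken from the code above and the numbers invented for illustration:

import json

import pandas as pd

# Hypothetical `feature` payload for a single row; the keys mirror
# column_names above, the values are made up for illustration.
row = {
    "vid": "12345",
    "feature": json.dumps({
        "1_vov0": 0.31, "1_vov0_分子": 62, "1_vov0_分母": 200,
        "2_vov0": 0.28, "2_vov0_分子": 56, "2_vov0_分母": 200,
    })
}

feature = json.loads(row["feature"])
label_row = pd.Series({
    "vid": row["vid"],
    "分母": int(feature["1_vov0_分母"]),
    "分子": feature["1_vov0_分子"],
    "vov0": feature["1_vov0"],
})
print(label_row)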

+ 20 - 0
client/K8SClient.py

@@ -0,0 +1,20 @@
+from kubernetes import client, config
+
+
+class K8SClient(object):
+
+    def __init__(self, config_file):
+        self.config_file = config_file
+        config.load_kube_config(self.config_file)
+
+        self.core_api = client.CoreV1Api()
+        self.apps_api = client.AppsV1Api()
+
+    def get_pod_info_by_deployment(self, deployment_name, namespace='default'):
+        deployment = self.apps_api.read_namespaced_deployment(name=deployment_name, namespace=namespace)
+        replicaset_selector = deployment.spec.selector.match_labels
+        # Find all Pods matching the Deployment's label selector
+        label_selector = ",".join([f"{key}={value}" for key, value in replicaset_selector.items()])
+        pods = self.core_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
+
+        return pods.items
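
get_pod_info_by_deployment joins the Deployment's match_labels into Kubernetes' comma-separated label-selector syntax. A standalone sketch with a hypothetical label set:

# Hypothetical match_labels, as returned by deployment.spec.selector.match_labels
replicaset_selector = {"app": "ad-engine", "tier": "backend"}

label_selector = ",".join(f"{key}={value}" for key, value in replicaset_selector.items())
print(label_selector)  # app=ad-engine,tier=backend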

+ 30 - 6
client/ODPSClient.py

@@ -7,8 +7,8 @@ from odps.tunnel import TableTunnel
 from util import convert_util, date_util
 
 
-class ODPSClient:
-    def __init__(self):
+class ODPSClient(object):
+    def __init__(self, project="loghubods"):
         self.accessId = "LTAIWYUujJAm7CbH"
         self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
         self.endpoint = "http://service.odps.aliyun.com/api"
@@ -16,16 +16,40 @@ class ODPSClient:
         self.odps = ODPS(
             self.accessId,
             self.accessSecret,
-            "",
+            project,
             self.endpoint
         )
 
-    def get_all_record(self, project: str, table: str, dt: str) -> list:
+    def get_all_record(self, table: str, dt: str) -> list:
         tunnel = TableTunnel(self.odps)
-        download_session = tunnel.create_download_session(f"{project}.{table}", partition_spec=f"dt={dt}")
+
+        download_session = tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
+        count = download_session.count
+
+        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
+        result = []
         with download_session.open_record_reader(0, download_session.count) as reader:
             for record in reader:
-                print(record)
+                result.append(record)
+        return result
+
+    def get_download_session(self, table: str, dt: str):
+        tunnel = TableTunnel(self.odps)
+
+        return tunnel.create_download_session(table, partition_spec=f"dt={dt}")
+
+    def get_all_record_batch(self, table: str, dt: str) -> list:
+        tunnel = TableTunnel(self.odps)
+
+        download_session = tunnel.create_download_session(table, partition_spec=f"dt={dt}")
+        count = download_session.count
+
+        print(f"Table {table}, partition {dt}: {count} records in total")
+        result = []
+        with download_session.open_arrow_reader(0, count) as reader:
+            for batch in reader:
+                result.append(batch)
+        return result
 
     @classmethod
     def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, project="loghubods") -> list[dict]:
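
A minimal usage sketch of the new download-session path, assuming the alg_vid_vov_new table and dt partition layout used in XGB/xgboost_train.py (the date below is hypothetical):

import pandas as pd

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()

# Stream the partition as Arrow batches and assemble one DataFrame,
# mirroring get_partition_df in XGB/xgboost_train.py.
session = odps_client.get_download_session("alg_vid_vov_new", "20240927")
with session.open_arrow_reader(0, session.count) as reader:
    df = pd.concat([batch.to_pandas() for batch in reader])
print(df.shape)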

+ 68 - 0
client/XxlJobClient.py

@@ -0,0 +1,68 @@
+import json
+
+import requests
+
+header = {
+    "Cookie": "Hm_lvt_7f5c31e86e4cc8a706f7b3957d81f353=1714270679; "
+              "XXL_JOB_LOGIN_IDENTITY=7b226964223a312c22757365726e616d65223a2261646d696e222c2270617373776f7264"
+              "223a226531306164633339343962613539616262653536653035376632306638383365222c22726f6c65223a312c2270"
+              "65726d697373696f6e223a6e756c6c7d",
+    "content-type": "application/x-www-form-urlencoded; charset=UTF-8"
+}
+
+
+class XxlJobClient(object):
+
+    def __init__(self, base_url):
+        self.base_url = base_url
+
+    def get_all_job_log(self, job_group=0, job_id=0, log_status=-1) -> list[dict]:
+        """
+        Fetch job execution logs.
+        :param job_group: executor ID, 0 = all
+        :param job_id: job ID, 0 = all
+        :param log_status: log status, -1 = all, 1 = success, 2 = failure, 3 = running
+        :return:
+        """
+        url = f"{self.base_url}/joblog/pageList"
+        param = {
+            "start": 0,
+            "length": 1000,
+            "jobGroup": job_group,
+            "jobId": job_id,
+            "logStatus": log_status,
+            "filterTime": "2024-09-27 00:00:00 - 2024-09-27 23:59:59"
+        }
+        response = requests.post(url, data=param, headers=header)
+        return json.loads(response.content)['data']
+
+    def get_all_job_group(self) -> list[dict]:
+        """
+        Fetch all executors (job groups).
+        :return:
+        """
+
+        url = f"{self.base_url}/jobgroup/pageList"
+        param = {
+            "start": 0,
+            "length": 1000,
+        }
+        response = requests.post(url, data=param, headers=header)
+        return json.loads(response.content)['data']
+
+    def get_all_job_info(self, job_group=0, trigger_status=-1) -> list[dict]:
+        """
+        Fetch all job definitions.
+        :param job_group: executor ID
+        :param trigger_status: trigger status, 0 = off, 1 = on, -1 = all
+        :return:
+        """
+        url = f"{self.base_url}/jobinfo/pageList"
+        param = {
+            "jobGroup": job_group,
+            "start": 0,
+            "length": 1000,
+            "triggerStatus": trigger_status
+        }
+        response = requests.post(url, data=param, headers=header)
+        return json.loads(response.content)['data']
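
Each method unwraps the admin console's pageList response via json.loads(response.content)['data']. A sketch of the assumed response shape; field names other than data are illustrative, not confirmed against the XXL-JOB API:

import json

# Hypothetical pageList response body; only the 'data' envelope is
# relied on by the client methods above.
response_content = json.dumps({
    "recordsTotal": 1,
    "recordsFiltered": 1,
    "data": [
        {"id": 42, "jobDesc": "daily-report", "author": "zhao", "jobGroup": 3}
    ]
})

jobs = json.loads(response_content)["data"]
print(jobs[0]["jobDesc"])  # daily-report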

+ 4 - 1
config/config.ini

@@ -1,2 +1,5 @@
 [feishu]
-model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c
+model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c
+
+[k8s]
+config.file=/Users/zhao/.kube/config_prod

+ 25 - 0
k8s.py

@@ -0,0 +1,25 @@
+import argparse
+import configparser
+
+from client import K8SClient
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="k8s")
+    parser.add_argument("-c", "--config", required=False, help="config file path",
+                        default="/Users/zhao/Desktop/Code/Python/model_monitor/config/config.ini")
+    args = parser.parse_args()
+
+    # Read the config file
+    config = configparser.ConfigParser()
+    config.read(args.config)
+    k8s_config_file = config.get("k8s", "config.file")
+
+    k8s_client = K8SClient.K8SClient(k8s_config_file)
+
+    pods = k8s_client.get_pod_info_by_deployment("ad-engine")
+
+    for pod in pods:
+        print(pod.status)
+        for item in pod.status.conditions:
+            print(item)

+ 29 - 0
xxl_job.py

@@ -0,0 +1,29 @@
+from client import XxlJobClient
+
+XXL_JOB_BASE_URL = "http://xxl-job-internal.piaoquantv.com/xxl-job-admin"
+
+xxl_job_client = XxlJobClient.XxlJobClient(XXL_JOB_BASE_URL)
+
+
+def _main():
+    all_job_group = {}
+    for item in xxl_job_client.get_all_job_group():
+        all_job_group[item['id']] = item
+
+    for item in xxl_job_client.get_all_job_info():
+        job_name = item['jobDesc']
+        job_id = item['id']
+        job_author = item['author']
+        job_group = item['jobGroup']
+        for log_item in xxl_job_client.get_all_job_log(job_id=job_id, log_status=2):
+            job_group_info = all_job_group[job_group]
+            print(
+                f"[Job execution failed] executor: {job_group_info['title']}, "
+                f"job ID: {job_id}, "
+                f"job name: {job_name}, "
+                f"trigger time: {log_item['triggerTime']}"
+            )
+
+
+if __name__ == '__main__':
+    _main()