ソースを参照

feat:修改下载脚本

zhaohaipeng 7 ヶ月 前
コミット
b0f1020c05
2 ファイル変更 174 行追加 87 行削除
  1. 153 73
      t.py
  2. 21 14
      vov/data_download.py

+ 153 - 73
t.py

@@ -1,77 +1,157 @@
-import numpy as np
-import pandas as pd
-
-from client import ODPSClient
-
-recall_result = "/Users/zhao/Desktop/20241124_recall.csv"
-day_oss = "/Users/zhao/Desktop/20241124_day_oss.csv"
-
-odps = ODPSClient.ODPSClient()
-
-
-def read_day_recall_v2() -> pd.DataFrame:
-    df = pd.read_csv(day_oss)
-    df['cpm_rank'] = df.groupby("type")['score'].rank(method='first', ascending=False).astype(int)
-    df['cpm_rank_2'] = df.groupby("type")['cpm'].rank(method='first', ascending=False).astype(int)
-
-    df['view_rank'] = df.groupby("type")['view_rate'].rank(method='first', ascending=False).astype(int)
-
-    df['day_recall_v2'] = np.where(
-        ((df['type'] == '14d') & ((df['cpm_rank'] <= 30) | (df['cpm_rank_2'] <= 20) | (df['view_rank'] <= 30))) |
-        ((df['type'] == '3d') & ((df['cpm_rank'] <= 50) | (df['cpm_rank_2'] <= 30) | (df['view_rank'] <= 50))) |
-        ((df['type'] == '1d') & ((df['cpm_rank'] <= 80) | (df['cpm_rank_2'] <= 50) | (df['view_rank'] <= 100))),
-        True,
-        False
+# -*- coding: utf-8 -*-
+import argparse
+import json
+from datetime import datetime
+
+import pytz
+import requests
+
+# Feishu (Lark) bot webhook endpoint(s), keyed by purpose.
+server_robot = {
+    'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/926982f5-e7af-40f5-81fd-27d8f42718e4',
+}
+
+# Card header color template per message level (Feishu card "template" field).
+level_header_template_map = {
+    "info": "turquoise",
+    "error": "red",
+    "warn": "yellow"
+}
+
+# Card header title per message level (Chinese UI strings; runtime values — do not translate).
+level_header_title_content_map = {
+    "info": "广告模型自动更新通知",
+    "error": "广告模型自动更新告警",
+    "warn": "广告模型自动更新告警"
+}
+
+# Task-status label per message level.
+# NOTE(review): this map is not referenced anywhere in the visible code — confirm
+# it is used elsewhere or remove it.
+level_task_status_map = {
+    "info": "任务执行成功",
+    "error": "任务执行失败",
+    "warn": "任务执行失败",
+}
+
+
+def send_card_msg_to_feishu(webhook, card_json):
+    """Send an interactive card message to a Feishu (Lark) bot webhook.
+
+    webhook   -- full webhook URL of the bot.
+    card_json -- card payload dict placed under the "card" key of the message.
+    """
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "interactive",
+        "card": card_json
+    }
+    # NOTE(review): json.dumps(payload_message) is computed twice (log + request
+    # body); also requests is called without a timeout — a hung webhook blocks
+    # the caller indefinitely. Consider timeout= and a single serialized payload.
+    print(f"推送飞书消息内容: {json.dumps(payload_message)}")
+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    # Response body is only logged; HTTP/status errors are not raised or checked.
+    print(response.text)
+
+
+def timestamp_format(timestamp: str) -> str:
+    """Format an epoch-seconds string as 'Asia/Shanghai' local time.
+
+    Returns '%Y-%m-%d %H:%M:%S' on success; returns the input string unchanged
+    when it cannot be parsed as an int.
+    """
+    try:
+        # NOTE(review): datetime.utcfromtimestamp is deprecated since Python 3.12;
+        # datetime.fromtimestamp(int(timestamp), tz=pytz.UTC) is the modern form.
+        return (datetime.utcfromtimestamp(int(timestamp))
+                .replace(tzinfo=pytz.UTC)
+                .astimezone(pytz.timezone('Asia/Shanghai'))
+                .strftime('%Y-%m-%d %H:%M:%S')
+                )
+    except ValueError as e:
+        # NOTE(review): `e` is unused; OSError/OverflowError from out-of-range
+        # timestamps are not caught — confirm inputs are sane epoch seconds.
+        return timestamp
+
+
+def train_info_parse(train_info_file: str) -> dict:
+    """Parse a training-log file of 'key:value' lines into a dict.
+
+    Keys and values are stripped of surrounding whitespace. Later duplicate keys
+    overwrite earlier ones.
+    """
+    train_info = {}
+    with open(train_info_file, 'r') as f:
+        for line in f:
+            # NOTE(review): a value containing ':' (e.g. a clock time) splits into
+            # more than two parts and the line is silently dropped — consider
+            # line.split(":", 1) if such values can occur.
+            split = line.split(":")
+            if len(split) == 2:
+                key = split[0].strip()
+                value = split[1].strip()
+                train_info[key] = value
+    return train_info
+
+
+def seconds_convert(seconds):
+    """Format a duration given in seconds as 'H小时 M分钟 S秒' (hours/minutes/seconds).
+
+    Assumes a non-negative integer number of seconds — floor division would
+    produce float components for float input.
+    """
+    hours = seconds // 3600
+    minutes = (seconds % 3600) // 60
+    seconds = seconds % 60
+    return f"{hours}小时 {minutes}分钟 {seconds}秒"
+
+
+def join_end_time(msg_text: str, train_info: dict) -> str:
+    """Append a formatted '结束时间' (end time) line to msg_text when present in train_info."""
+    if "结束时间" in train_info:
+        msg_text = f"{msg_text}" \
+                   f"\n- 结束时间: {timestamp_format(train_info['结束时间'])}"
+    return msg_text
+
+
+def join_start_time(msg_text: str, train_info: dict) -> str:
+    """Append a formatted '开始时间' (start time) line to msg_text when present in train_info."""
+    if "开始时间" in train_info:
+        msg_text = f"{msg_text}" \
+                   f"\n- 开始时间: {timestamp_format(train_info['开始时间'])}"
+    return msg_text
+
+def join_running_time(msg_text: str, train_info: dict) -> str:
+    """Append a running-time line to msg_text.
+
+    NOTE(review): unimplemented stub — returns msg_text unchanged. Presumably it
+    should compute 结束时间 - 开始时间 and format it via seconds_convert; confirm
+    intent before wiring it in.
+    """
+
+    return msg_text
+
+def _monitor(model_train_info: str):
+    """Parse the training-info file and push a Feishu card notification.
+
+    model_train_info -- path to the 'key:value' training log consumed by
+    train_info_parse. Level (info/error) is chosen from the task-result field.
+    """
+    msg_text = f"- 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+    train_info = train_info_parse(model_train_info)
+    # NOTE(review): `start` is never used, and join_start_time/join_end_time/
+    # join_running_time are never called — the card body only contains the
+    # current time. Confirm whether those lines were meant to be appended here.
+    start = train_info['开始时间']
+    # NOTE(review): direct indexing raises KeyError if the log is missing either
+    # key — intentional fail-fast? Verify against the producer of this file.
+    result = train_info['广告模型自动更新任务结果']
+    level = "error" if result == '失败' else "info"
+
+    # Feishu interactive-card payload (schema 2.0): colored header + one
+    # markdown element carrying msg_text.
+    card_json = {
+        "schema": "2.0",
+        "header": {
+            "title": {
+                "tag": "plain_text",
+                "content": level_header_title_content_map[level]
+            },
+            "template": level_header_template_map[level]
+        },
+        "body": {
+            "elements": [
+                {
+                    "tag": "markdown",
+                    "content": msg_text,
+                    "text_align": "left",
+                    "text_size": "normal",
+                    "element_id": "overview"
+                }
+            ]
+        }
+    }
+    # NOTE(review): dead commented-out code below (collapsible "Top10" panel
+    # referencing a `top10` variable that no longer exists) — delete in a
+    # follow-up commit rather than keeping it here.
+    # if top10 is not None and len(top10) > 0:
+    #     collapsible_panel = {
+    #         "tag": "collapsible_panel",
+    #         "header": {
+    #             "title": {
+    #                 "tag": "markdown",
+    #                 "content": "**Top10差异详情**"
+    #             },
+    #             "vertical_align": "center",
+    #             "padding": "4px 0px 4px 8px"
+    #         },
+    #         "border": {
+    #             "color": "grey",
+    #             "corner_radius": "5px"
+    #         },
+    #         "element_id": "detail",
+    #         "elements": [
+    #             {
+    #                 "tag": "markdown",
+    #                 "content": top10.replace("\\n", "\n").replace("\\t", "\t"),
+    #                 "element_id": "Top10CID"
+    #             }
+    #         ]
+    #     }
+    #     card_json['body']['elements'].append(collapsible_panel)
+
+    send_card_msg_to_feishu(
+        webhook=server_robot.get('webhook'),
+        card_json=card_json
+    )
     )
-    df.to_csv("/Users/zhao/Desktop/3.csv", index=False)
-
-    grouped_df = (
-        df.groupby('cid', as_index=False)  # 按 CID 分组
-        .agg(day_recall_v2=('day_recall_v2', 'any'))  # 只要有一个为 True,就为 True
-    )
-
-    return grouped_df
-
-
-def read_day_recall() -> pd.DataFrame:
-    df = pd.read_csv(day_oss)
-    df['cpm_rank'] = df.groupby("type")['score'].rank(method='first', ascending=False).astype(int)
-
-    df['view_rank'] = df.groupby("type")['view_rate'].rank(method='first', ascending=False).astype(int)
-
-    df['day_recall_v1'] = np.where(
-        ((df['type'] == '14d') & ((df['cpm_rank'] <= 30) | (df['view_rank'] <= 20))) |
-        ((df['type'] == '3d') & ((df['cpm_rank'] <= 50) | (df['view_rank'] <= 30))) |
-        ((df['type'] == '1d') & ((df['cpm_rank'] <= 80) | (df['view_rank'] <= 50))),
-        True,
-        False
-    )
-
-    df.to_csv("/Users/zhao/Desktop/2.csv", index=False)
-
-    grouped_df = (
-        df.groupby('cid', as_index=False)  # 按 CID 分组
-        .agg(day_recall_v1=('day_recall_v1', 'any'))  # 只要有一个为 True,就为 True
-    )
-
-    return grouped_df
-
-
-def _main():
-    day_recall = read_day_recall()
-
-    day_recall_v2 = read_day_recall_v2()
-
-    recall = pd.read_csv(recall_result)
-    recall['base_diff'] = recall['产品-17-前五组'] - recall['产品-35-前五组']
-
-    recall = (pd.merge(recall, day_recall, on='cid', how='left')
-              .merge(day_recall_v2, on='cid', how='left'))
-
-    recall.to_csv("/Users/zhao/Desktop/1.csv", index=False)
-
-    print(recall)
 
 
 if __name__ == '__main__':
-    _main()
+    # CLI entry point: --tif points at the training-info file to report on.
+    parser = argparse.ArgumentParser(description='告警Utils')
+    # NOTE(review): help text '模型模型过程记录文件' duplicates "模型" — likely a typo
+    # in the runtime help string; fix in a follow-up change (not altered here).
+    parser.add_argument("--tif", type=str, help='模型模型过程记录文件', required=True)
+    args = parser.parse_args()
+
+    _monitor(model_train_info=args.tif)

+ 21 - 14
vov/data_download.py

@@ -1,3 +1,5 @@
+import math
+
 import pandas as pd
 
 from client import ODPSClient
@@ -66,26 +68,30 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 
 
 def ad_download():
-    # 日期列表
-    date_list = ["20241123", "20241124", "20241125", "20241126", "20241127", "20241128"]
+    batch_size = 10000
+    # 计算总页数
+    total_pages = math.ceil(7633541 / batch_size)
+    print(f"总页数: {total_pages}")
     # 最大线程数
     max_workers = 24
     # SQL 文件路径
-    sql_file_path = "/Users/zhao/Desktop/广告分析.sql"
+    sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"
     # 线程池
     with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        # 存储任务
-        future_tasks = {}
+        for page in range(total_pages):
+            offset = page * batch_size
+            print(f"正在下载第 {page + 1}/{total_pages} 页,记录范围: {offset} - {offset + batch_size}")
+
+            # 存储任务
+            future_tasks = {}
 
-        for date in date_list:
             params = {
-                "bizdate": date,
-                "hh": "23",
-                "hhl": "00",
-                "filter_id": "XXXXXXXX",
-                "apptype": "3,36,6,17"
+                "start_bizdate": "20240909",
+                "end_bizdate": "20241208",
+                "offset": str(offset),
+                "size": str(batch_size),
             }
-            result_file_path = f"/Users/zhao/Desktop/{date}.csv"
+            result_file_path = f"/Users/zhao/Desktop/{page}.csv"
 
             # 提交任务
             future = executor.submit(
@@ -102,9 +108,10 @@ def ad_download():
             try:
                 # 获取任务执行结果
                 future.result()
-                print(f"Completed: {result_file_path} for date {params['bizdate']}")
+                print(f"Completed: {result_file_path} for date {page}")
             except Exception as exc:
-                print(f"Error: {exc} for date {params['bizdate']}")
+                print(f"Error: {exc} for date {page}")
+    print("数据下载完成。")
 
 
 def _main():