
feat: reorganize directories

zhaohaipeng committed 5 months ago
commit 94725765a0
9 changed files with 413 additions and 389 deletions
  1. client/OSSClient.py (+37 -0)
  2. helper/MySQLHelper.py (+109 -0)
  3. model/auc.py (+0 -6)
  4. model/crowd_choose_offline_check.py (+245 -0)
  5. model/feature.py (+2 -1)
  6. model/model_predict_analyse_20241101.py (+0 -198)
  7. vov/data_download.py → script/data_download.py (+20 -18)
  8. script/t.py (+0 -9)
  9. t.py (+0 -157)

+ 37 - 0
client/OSSClient.py

@@ -0,0 +1,37 @@
+import logging
+
+import oss2
+from oss2.credentials import StaticCredentialsProvider
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+
+class OSSClient(object):
+    def __init__(self, access_key, access_secret, endpoint, region):
+        logging.info(f"oss2.version: {oss2.__version__}")
+        self.access_key = access_key
+        self.access_secret = access_secret
+        self.endpoint = endpoint
+        self.region = region
+        self.auth = oss2.ProviderAuthV4(StaticCredentialsProvider(self.access_key, self.access_secret))
+
+    def download_file(self, bucket_name, oss_file_path, local_file_path):
+        bucket = self._get_bucket(bucket_name)
+        bucket.get_object_to_file(oss_file_path, local_file_path)
+
+    def file_is_exist(self, bucket_name, oss_file_path) -> bool:
+        """
+        Check whether a file exists on Aliyun OSS.
+        :param bucket_name: OSS bucket name
+        :param oss_file_path: object path, e.g. 'folder/file.txt'
+        :return: True if the object exists, False otherwise
+        """
+        try:
+            bucket = self._get_bucket(bucket_name)
+            exists = bucket.object_exists(oss_file_path)
+            return exists
+        except Exception as e:
+            logging.warning(f"object_exists check failed for {oss_file_path}: {e}")
+            return False
+
+    def _get_bucket(self, bucket_name: str):
+        return oss2.Bucket(auth=self.auth, endpoint=self.endpoint, bucket_name=bucket_name, region=self.region)
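
For reference, a minimal usage sketch of the new OSSClient (not part of the commit; the import path is assumed from the file location, and the credentials, endpoint, region, bucket and object names are placeholders):

    from client.OSSClient import OSSClient

    # Placeholder credentials and endpoint; substitute real values.
    client = OSSClient(
        access_key="<ACCESS_KEY>",
        access_secret="<ACCESS_SECRET>",
        endpoint="https://oss-cn-hangzhou.aliyuncs.com",
        region="cn-hangzhou",
    )
    if client.file_is_exist("my-bucket", "folder/file.txt"):
        client.download_file("my-bucket", "folder/file.txt", "/tmp/file.txt")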

+ 109 - 0
helper/MySQLHelper.py

@@ -0,0 +1,109 @@
+import logging
+
+import pymysql
+import pymysql.cursors
+
+
+class MySQLHelper:
+    def __init__(self, host, username, password, database, **kwargs):
+        """
+        Initialize a MySQLHelper instance.
+
+        :param host: database host
+        :param username: database user name
+        :param password: database password
+        :param database: database name
+        :param kwargs: optional extra connection arguments (port, timeouts, etc.)
+        """
+        self.connection_args = {
+            'host': host,
+            'user': username,
+            'password': password,
+            'database': database,
+            'charset': 'utf8mb4',
+            'cursorclass': pymysql.cursors.DictCursor,
+        }
+        self.connection_args.update(kwargs)
+        self.conn = None
+        # Configure logging before the first connection attempt so its INFO message is visible.
+        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+        self._connect()
+
+    def _connect(self):
+        """Establish the database connection, reconnecting automatically if it was closed."""
+        try:
+            if self.conn is None or not self.conn.open:
+                self.conn = pymysql.connect(**self.connection_args)
+                logging.info("Connected to the database.")
+        except pymysql.MySQLError as e:
+            logging.error(f"Failed to connect to the database: {e}")
+            raise
+
+    def execute_query(self, sql: str, params: tuple = None) -> list[dict]:
+        """
+        Execute a SELECT query.
+
+        :param sql: SQL statement to run
+        :param params: optional SQL parameters
+        :return: list of result rows, one dict per row
+        """
+        self._connect()
+        try:
+            with self.conn.cursor() as cursor:
+                cursor.execute(sql, params)
+                results = cursor.fetchall()
+                logging.info(f"Query executed successfully: {sql}")
+                return results
+        except pymysql.MySQLError as e:
+            logging.error(f"Query failed: {e} -> SQL: {sql}")
+            raise
+
+    def execute_update(self, sql: str, params: tuple = None) -> int:
+        """
+        Execute an INSERT/UPDATE/DELETE statement.
+
+        :param sql: SQL statement to run
+        :param params: optional SQL parameters
+        :return: number of affected rows
+        """
+        self._connect()
+        try:
+            with self.conn.cursor() as cursor:
+                cursor.execute(sql, params)
+                self.conn.commit()
+                logging.info(f"Update executed successfully: {sql}")
+                return cursor.rowcount
+        except pymysql.MySQLError as e:
+            self.conn.rollback()
+            logging.error(f"Update failed: {e} -> SQL: {sql}")
+            raise
+
+    def execute_many(self, sql: str, param_list: list[tuple]) -> int:
+        """
+        Execute the same SQL statement against a batch of parameter tuples.
+
+        :param sql: SQL statement to run
+        :param param_list: list of parameter tuples, one per row
+        :return: total number of affected rows
+        """
+        self._connect()
+        try:
+            with self.conn.cursor() as cursor:
+                rowcount = cursor.executemany(sql, param_list)
+                self.conn.commit()
+                logging.info(f"Batch statement executed successfully, {rowcount} rows affected.")
+                return rowcount
+        except pymysql.MySQLError as e:
+            self.conn.rollback()
+            logging.error(f"Batch execution failed: {e} -> SQL: {sql}")
+            raise
+
+    def close(self):
+        """Close the database connection."""
+        if getattr(self, "conn", None) and self.conn.open:
+            self.conn.close()
+            logging.info("Database connection closed.")
+
+    def __del__(self):
+        """Make sure the connection is closed when the object is garbage-collected."""
+        self.close()
+
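
A minimal usage sketch of MySQLHelper (not part of the commit; host, credentials, database, and table names are placeholders, and extra keyword arguments such as port are passed straight through to pymysql.connect):

    from helper.MySQLHelper import MySQLHelper

    db = MySQLHelper(host="127.0.0.1", username="user", password="secret", database="demo", port=3306)
    rows = db.execute_query("SELECT id, name FROM t_demo WHERE id = %s", (1,))
    affected = db.execute_update("UPDATE t_demo SET name = %s WHERE id = %s", ("new", 1))
    inserted = db.execute_many("INSERT INTO t_demo (id, name) VALUES (%s, %s)", [(2, "a"), (3, "b")])
    db.close()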

+ 0 - 6
model/auc.py

@@ -1,6 +0,0 @@
-import os
-import sys
-
-if __name__ == '__main__':
-    print("a b c")
-    print(os.environ.get("PREDICT_CACHE_PATH"))

+ 245 - 0
model/crowd_choose_offline_check.py

@@ -0,0 +1,245 @@
+import hashlib
+import json
+import time
+import traceback
+import urllib.request
+import datetime
+import ssl
+from contextlib import contextmanager
+
+from sqlalchemy import Column, Float, Integer, String, create_engine
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.declarative import declarative_base
+
+# Disable TLS certificate verification for the urllib requests below.
+ssl._create_default_https_context = ssl._create_unverified_context
+
+HOST = 'rm-bp1nx318263k95yo3318.mysql.rds.aliyuncs.com'
+PORT = '3306'
+DATABASE = 'uservideo_bi'
+USERNAME = 'majin'
+PASSWORD = 'E5d2c960fdf3f5f0be5a27eea2f906ef'
+DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,
+                                                                                            password=PASSWORD,
+                                                                                            host=HOST, port=PORT,
+                                                                                            db=DATABASE)
+
+# HOST = 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com'
+# PORT = '3306'
+# DATABASE = 'mpad'
+# USERNAME = 'majin'
+# PASSWORD = 'e5d2c960fdf3f5f0be5a27eea2f906ef'
+# DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,
+#                                                                                         password=PASSWORD,
+#                                                                                         host=HOST, port=PORT,
+#                                                                                         db=DATABASE)
+
+
+Base = declarative_base()
+
+class WECHAT_AD_PUBLISHER_ADUNIT_GENERAL(Base):
+    __tablename__ = 'wechat_ad_publisher_adunit_general'
+    id = Column(Integer, primary_key=True)
+    ad_unit_id = Column(String(1000))
+    ad_unit_name = Column(String(1000))
+    ad_slot = Column(String(1000))
+    click_count = Column(Integer, default=0)
+    click_rate = Column(Float, default=0.0)
+    date = Column(String(1000))
+    ecpm = Column(String(1000))
+    exposure_count = Column(Integer, default=0)
+    exposure_rate = Column(Float, default=0.0)
+    income = Column(Integer, default=0)
+    req_succ_count = Column(Integer, default=0)
+    app_type = Column(Integer, default=0)
+    slot_str = Column(String(1000))
+    date_str = Column(String(1000))
+
+    def __init__(self):
+        print(f"AUNIT_GENERAL.init app_type = {self.app_type}, ad_unit_id = {self.ad_unit_id}")
+
+    def __repr__(self):
+        return '<WECHAT_AD_PUBLISHER_ADUNIT_GENERAL %r>' % self.ad_unit_id
+
+class WECHAT_AD_PUBLISHER_ADPOS_GENERAL(Base):
+    __tablename__ = 'wechat_ad_publisher_adpos_general'
+    id = Column(Integer, primary_key=True)
+    ad_slot = Column(String(1000))
+    click_count = Column(Integer, default=0)
+    click_rate = Column(Float, default=0.0)
+    date = Column(String(1000))
+    ecpm = Column(String(1000))
+    exposure_count = Column(Integer, default=0)
+    exposure_rate = Column(Float, default=0.0)
+    income = Column(Integer, default=0)
+    req_succ_count = Column(Integer, default=0)
+    app_type = Column(Integer, default=0)
+    slot_str = Column(String(1000))
+    date_str = Column(String(1000))
+
+    def __init__(self):
+        print(f"ADPOS_GENERAL.init app_type = {self.app_type}, ad_slot = {self.ad_slot}")
+
+    def __repr__(self):
+        return '<wechat_ad_publisher_adpos_general %r>' % self.ad_slot
+
+
+@contextmanager
+def session_maker(session=None, session_factory=None):
+    try:
+        if session_factory is None:
+            engine = create_engine(DB_URI)
+            session_factory = sessionmaker(bind=engine)
+        if session is None:
+            session = session_factory()
+        yield session
+    except:
+        session.rollback()
+        raise
+    else:
+        session.commit()
+        # logger.debug('session.commit(){}'.format(session))
+    finally:
+        session.close()
+        # logger.debug('session.close(){}'.format(session))
+
+def add_ad_data(data, app_type):
+    print(f'add_ad_data: app_type = {app_type}, data = {data}')
+    stat_item = data['stat_item']
+    try:
+        with session_maker() as session:
+            wechat_ad_publisher_adunit_general = WECHAT_AD_PUBLISHER_ADUNIT_GENERAL()
+            wechat_ad_publisher_adunit_general.ad_unit_id = data['ad_unit_id']
+            wechat_ad_publisher_adunit_general.ad_unit_name = data['ad_unit_name']
+            wechat_ad_publisher_adunit_general.ad_slot = stat_item['ad_slot']
+            wechat_ad_publisher_adunit_general.click_count = stat_item['click_count']
+            wechat_ad_publisher_adunit_general.click_rate = stat_item['click_rate']
+            wechat_ad_publisher_adunit_general.date = stat_item['date']
+            wechat_ad_publisher_adunit_general.ecpm = stat_item['ecpm']
+            wechat_ad_publisher_adunit_general.exposure_count = stat_item['exposure_count']
+            wechat_ad_publisher_adunit_general.exposure_rate = stat_item['exposure_rate']
+            wechat_ad_publisher_adunit_general.income = stat_item['income']
+            wechat_ad_publisher_adunit_general.req_succ_count = stat_item['req_succ_count']
+            wechat_ad_publisher_adunit_general.slot_str = stat_item['slot_str']
+            wechat_ad_publisher_adunit_general.date_str = stat_item['date'].replace('-','')
+            wechat_ad_publisher_adunit_general.app_type = app_type
+            session.add(wechat_ad_publisher_adunit_general)
+            print(f'add_ad_data is OK!; app_type = {app_type}')
+    except Exception as e:
+        traceback.print_exc()
+        print(f"add_ad_data error: app_type = {app_type}; traceback.format_exc = {traceback.format_exc()}")
+
+def add_ad_adpos_data(stat_item, app_type):
+    print(f'add_ad_adpos_data: app_type = {app_type}, stat_item = {stat_item}')
+    try:
+        with session_maker() as session:
+            wechat_ad_publisher_adpos_general = WECHAT_AD_PUBLISHER_ADPOS_GENERAL()
+            wechat_ad_publisher_adpos_general.ad_slot = stat_item['ad_slot']
+            wechat_ad_publisher_adpos_general.click_count = stat_item['click_count']
+            wechat_ad_publisher_adpos_general.click_rate = stat_item['click_rate']
+            wechat_ad_publisher_adpos_general.date = stat_item['date']
+            wechat_ad_publisher_adpos_general.ecpm = stat_item['ecpm']
+            wechat_ad_publisher_adpos_general.exposure_count = stat_item['exposure_count']
+            wechat_ad_publisher_adpos_general.exposure_rate = stat_item['exposure_rate']
+            wechat_ad_publisher_adpos_general.income = stat_item['income']
+            wechat_ad_publisher_adpos_general.req_succ_count = stat_item['req_succ_count']
+            wechat_ad_publisher_adpos_general.slot_str = stat_item['slot_str']
+            wechat_ad_publisher_adpos_general.date_str = stat_item['date'].replace('-','')
+            wechat_ad_publisher_adpos_general.app_type = app_type
+            session.add(wechat_ad_publisher_adpos_general)
+            print(f'add_ad_adpos_data is OK; app_type = {app_type}')
+    except Exception as e:
+        traceback.print_exc()
+        print(f"add_ad_adpos_data error: app_type = {app_type}; traceback.format_exc = {traceback.format_exc()}")
+
+
+
+def post_inform(url, content_text):
+    data = json.dumps(content_text)
+    data = bytes(data, 'utf8')
+    print(f"post_inform data = {data}")
+    headers = {"Content-Type": 'application/json'}
+    req = urllib.request.Request(url=url, headers=headers, data=data)
+    try:
+        resp = urllib.request.urlopen(req, timeout=10).read()
+        print(f"post_inform resp = {resp.decode('utf-8')}")
+        return resp.decode('utf-8')
+    except Exception as e:
+        print(e)
+
+def get_inform(url):
+    headers = {"Content-Type": 'application/json'}
+    print(f"get_inform url = {url}")
+    req = urllib.request.Request(url=url, headers=headers)
+    try:
+        resp = urllib.request.urlopen(req, timeout=10).read()
+        print(f"get_inform resp = {resp.decode('utf-8')}")
+        return resp.decode('utf-8')
+    except Exception as e:
+        print(e)
+
+def get_mp_info(app_type):
+    datestr = datetime.datetime.strftime(datetime.datetime.now() - datetime.timedelta(days=+1), '%Y-%m-%d')
+    print(f"get_mp_info: app_type = {app_type} date = {datestr}")
+    time_str = time.strftime("%Y:%m:%d %H")
+    print(f"get_mp_info: app_type= {app_type} time = {time_str}")
+    md5 = hashlib.md5('{}'.format(time_str).encode(encoding='UTF-8')).hexdigest()
+    print(f"get_mp_info: app_type = {app_type} md5 = {md5}")
+
+    getliveaccesstoken_url = "https://longvideoapi.piaoquantv.com/longvideoapi/weixin/getWxAccessToken/{}".format(app_type)
+    print(f"get_mp_info getliveaccesstoken_url = {getliveaccesstoken_url}")
+    ret = get_inform(getliveaccesstoken_url)
+    data = json.loads(ret).get('data',{})
+    print(f"get_mp_info app_type = {app_type} getWxAccessToken data = {data}")
+    with session_maker() as session:
+        task = session.query(WECHAT_AD_PUBLISHER_ADUNIT_GENERAL).filter_by(date=datestr,app_type=app_type).first()
+        if task is None:
+            getweanalysisappiddailyvisittrend_url = 'https://api.weixin.qq.com/publisher/stat?action=publisher_adunit_general&access_token={}&page=1&page_size=100&start_date={}&end_date={}'.format(
+                data, datestr, datestr)
+            print(f"get_mp_info app_type = {app_type} publisher/stat adunit = {getweanalysisappiddailyvisittrend_url}")
+            ret = get_inform(getweanalysisappiddailyvisittrend_url)
+            print(f"get_mp_info app_type = {app_type} publisher/stat adunit result = {ret}")
+            items = json.loads(ret).get('list', [])
+            for item in items:
+                add_ad_data(item, app_type)
+
+        task = session.query(WECHAT_AD_PUBLISHER_ADPOS_GENERAL).filter_by(date=datestr, app_type=app_type).first()
+        if task is None:
+            getweanalysisappiddailyvisittrend_url = 'https://api.weixin.qq.com/publisher/stat?action=publisher_adpos_general&access_token={}&page=1&page_size=100&start_date={}&end_date={}'.format(
+                data, datestr, datestr)
+            print(f"get_mp_info app_type = {app_type} publisher/stat adpos = {getweanalysisappiddailyvisittrend_url}")
+            ret = get_inform(getweanalysisappiddailyvisittrend_url)
+            print(f"get_mp_info app_type = {app_type} publisher/stat adpos result = {ret}")
+            items = json.loads(ret).get('list', [])
+            for item in items:
+                add_ad_adpos_data(item, app_type)
+            summary = json.loads(ret)['summary']
+            summary['ad_slot'] = 'SLOT_ID_WEAPP_ALL'
+            summary['date'] = datestr
+            summary['slot_str'] = 'summary'
+            add_ad_adpos_data(summary, app_type)
+
+if __name__ == '__main__':
+    app_type_list = [0,2,3,4,5,6,17,18,19,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36]
+    # app_type_list = [2,23,24,25]
+    for app_type in app_type_list:
+        print(f"start app_type = {app_type}")
+        try:
+            get_mp_info(app_type)
+        except Exception as e:
+            print(f"app_type {app_type} get data error: {traceback.format_exc()}")
+        print(f"end app_type = {app_type}")
+        print("")
+

+ 2 - 1
model/feature.py

@@ -1,5 +1,6 @@
 import glob
 import os.path
+from datetime import timedelta
 
 import numpy as np
 import pandas as pd
@@ -391,7 +392,7 @@ def _multi_importance_flat_map(importance_map: dict):
 
 
 def _main():
-    model_path = "/Users/zhao/Desktop/tzld/XGB/35_ad_model"
+    model_path = "/Users/zhao/Desktop/tzld/ad/model"
     all_model = glob.glob(f"{model_path}/*")
     model_dict = {}
     for e in all_model:

+ 0 - 198
model/model_predict_analyse_20241101.py

@@ -1,198 +0,0 @@
-# import argparse
-# import gzip
-# import os.path
-# from collections import OrderedDict
-#
-# import pandas as pd
-# from hdfs import InsecureClient
-#
-# client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
-#
-# SEGMENT_BASE_PATH = os.environ.get("SEGMENT_BASE_PATH", "/dw/recommend/model/36_score_calibration_file")
-# PREDICT_CACHE_PATH = os.environ.get("PREDICT_CACHE_PATH", "/root/zhaohp/XGB/predict_cache")
-#
-#
-# def read_predict_from_local_txt(txt_file) -> list:
-#     result = []
-#     with open(txt_file, "r") as f:
-#         for line in f.readlines():
-#             sp = line.replace("\n", "").split("\t")
-#             if len(sp) == 4:
-#                 label = int(sp[0])
-#                 cid = sp[3].split("_")[0]
-#                 score = float(sp[2].replace("[", "").replace("]", "").split(",")[1])
-#                 result.append({
-#                     "label": label,
-#                     "cid": cid,
-#                     "score": score
-#                 })
-#     return result
-#
-#
-# def read_predict_from_hdfs(hdfs_path: str) -> list:
-#     if not hdfs_path.endswith("/"):
-#         hdfs_path += "/"
-#     result = []
-#     for file in client.list(hdfs_path):
-#         with client.read(hdfs_path + file) as reader:
-#             with gzip.GzipFile(fileobj=reader, mode="rb") as gz_file:
-#                 for line in gz_file.read().decode("utf-8").split("\n"):
-#                     split = line.split("\t")
-#                     if len(split) == 4:
-#                         cid = split[3].split("_")[0]
-#                         label = int(split[0])
-#                         score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
-#                         result.append({
-#                             "cid": cid,
-#                             "label": label,
-#                             "score": score
-#                         })
-#
-#     return result
-#
-#
-# def _segment_v1(scores, step):
-#     bins = []
-#     for i in range(0, len(scores), int((len(scores) / step))):
-#         if i == 0:
-#             bins.append(0)
-#         else:
-#             bins.append(scores[i])
-#     bins.append(1)
-#     return list(OrderedDict.fromkeys(bins))
-#
-#
-# def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, step=100) -> [pd.DataFrame, pd.DataFrame]:
-#     sored_df = df.sort_values(by=['score'])
-#     # 评估分数分段
-#     scores = sored_df['score'].values
-#
-#     bins = _segment_v1(scores, step)
-#
-#     # 等分分桶
-#     # split_indices = np.array_split(np.arange(len(scores)), step)
-#     # bins = [scores[index[0]] for index in split_indices] + [scores[split_indices[-1][-1]]]
-#
-#     sored_df['score_segment'] = pd.cut(sored_df['score'], bins=bins)
-#
-#     # 计算分段内分数的差异
-#     group_df = sored_df.groupby("score_segment", observed=True).agg(
-#         segment_label_sum=('label', 'sum'),
-#         segment_label_cnt=('label', 'count'),
-#         segment_score_avg=('score', 'mean'),
-#     ).reset_index()
-#     group_df['segment_true_score'] = group_df['segment_label_sum'] / group_df['segment_label_cnt']
-#     group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
-#
-#     # 完整的分段文件保存
-#     csv_data = group_df.to_csv(sep="\t", index=False)
-#     with client.write(segment_file_path, encoding='utf-8', overwrite=True) as writer:
-#         writer.write(csv_data)
-#
-#     filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
-#     filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
-#     # 每条曝光数据添加对应分数的diff
-#     merged_df = pd.merge(sored_df, filtered_df, on="score_segment", how="left")
-#
-#     merged_df['segment_diff_rate'] = merged_df['segment_diff_rate'].fillna(0)
-#     return merged_df, filtered_df
-#
-#
-# def read_and_calibration_predict(predict_path: str) -> [pd.DataFrame, pd.DataFrame, pd.DataFrame]:
-#     """
-#     读取评估结果,并进行校准
-#     """
-#     # 本地调试使用
-#     # predicts = read_predict_from_local_txt(predict_path)
-#     predicts = read_predict_from_hdfs(predict_path)
-#     df = pd.DataFrame(predicts)
-#
-#     # 模型分分段计算与真实ctcvr的dff_rate
-#     predict_basename = os.path.basename(predict_path)
-#     if predict_basename.endswith("/"):
-#         predict_basename = predict_basename[:-1]
-#     df, segment_df = segment_calc_diff_rate_by_score(df, segment_file_path=f"{SEGMENT_BASE_PATH}/{predict_basename}.txt", step=100)
-#
-#     # 生成校准后的分数
-#     df['score_2'] = df['score'] / (1 + df['segment_diff_rate'])
-#
-#     # 按CID统计真实ctcvr和校准前后的平均模型分
-#     grouped_df = df.groupby("cid").agg(
-#         view=('cid', 'size'),
-#         conv=('label', 'sum'),
-#         score_avg=('score', lambda x: round(x.mean(), 6)),
-#         score_2_avg=('score_2', lambda x: round(x.mean(), 6)),
-#     ).reset_index()
-#     grouped_df['true_ctcvr'] = grouped_df['conv'] / grouped_df['view']
-#
-#     return df, grouped_df, segment_df
-#
-#
-# def predict_local_save_for_auc(old_df: pd.DataFrame, new_df: pd.DataFrame):
-#     """
-#     本地保存一份评估结果, 计算AUC使用
-#     """
-#     d = {"old": old_df, "new": new_df}
-#     for key in d:
-#         df = d[key][['label', "score"]]
-#         df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_1.txt", sep="\t", index=False, header=False)
-#         df = d[key][['label', "score_2"]]
-#         df.to_csv(f"{PREDICT_CACHE_PATH}/{key}_2.txt", sep="\t", index=False, header=False)
-#
-#
-# def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
-#     old_df, old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path)
-#     new_df, new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path)
-#
-#     predict_local_save_for_auc(old_df, new_df)
-#
-#     # 分段文件保存, 此处保留的最后使用的分段文件,不是所有的分段
-#     new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
-#
-#     # 字段重命名,和列过滤
-#     old_group_df.rename(columns={'score_avg': 'old_score_avg', 'score_2_avg': 'old_score_2_avg'}, inplace=True)
-#     new_group_df.rename(columns={'score_avg': 'new_score_avg', 'score_2_avg': 'new_score_2_avg'}, inplace=True)
-#     old_group_df = old_group_df[['cid', 'view', 'conv', 'true_ctcvr', 'old_score_avg', 'old_score_2_avg']]
-#     new_group_df = new_group_df[['cid', 'new_score_avg', 'new_score_2_avg']]
-#
-#     merged = pd.merge(old_group_df, new_group_df, on='cid', how='left')
-#
-#     # 计算与真实ctcvr的差异值
-#     merged["(new-true)/true"] = (merged['new_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-#     merged["(old-true)/true"] = (merged['old_score_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-#
-#     # 计算校准后的模型分与ctcvr的差异值
-#     merged["(new2-true)/true"] = (merged['new_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-#     merged["(old2-true)/true"] = (merged['old_score_2_avg'] / merged['true_ctcvr'] - 1).mask(merged['true_ctcvr'] == 0, 0)
-#
-#     # 按照曝光排序,写入本地文件
-#     merged = merged.sort_values(by=['view'], ascending=False)
-#     merged = merged[[
-#         'cid', 'view', "conv", "true_ctcvr",
-#         "old_score_avg", "new_score_avg", "(old-true)/true", "(new-true)/true",
-#         "old_score_2_avg", "new_score_2_avg", "(old2-true)/true", "(new2-true)/true",
-#     ]]
-#
-#     # 根据文件名保存不同的格式
-#     if analyse_file.endswith(".csv"):
-#         merged.to_csv(analyse_file, index=False)
-#     else:
-#         with open(analyse_file, "w") as writer:
-#             writer.write(merged.to_string(index=False))
-#     print("0")
-#
-#
-# if __name__ == '__main__':
-#     parser = argparse.ArgumentParser(description="model_predict_analyse_20241101.py")
-#     parser.add_argument("-op", "--old_predict_path", required=True, help="老模型评估结果")
-#     parser.add_argument("-np", "--new_predict_path", required=True, help="新模型评估结果")
-#     parser.add_argument("-af", "--analyse_file", required=True, help="最后计算结果的保存路径")
-#     parser.add_argument("-cf", "--calibration_file", required=True, help="线上使用的segment文件保存路径")
-#     args = parser.parse_args()
-#
-#     _main(
-#         old_predict_path=args.old_predict_path,
-#         new_predict_path=args.new_predict_path,
-#         calibration_file=args.calibration_file,
-#         analyse_file=args.analyse_file
-#     )

+ 20 - 18
vov/data_download.py → script/data_download.py

@@ -1,4 +1,3 @@
-import math
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from typing import Callable, Sequence
@@ -47,23 +46,28 @@ def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None
 
 
 def ad_download() -> None:
-    batch_size = 10000
-    total_records = 6731154
-    max_workers = 100
-    sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"
-
-    def create_task(task_index: int) -> Callable[[], None]:
+    max_workers = 24
+    sql_file_path = "/Users/zhao/Desktop/tzld/ad/sql/特征平均值.sql"
+    dts = ["20241206",
+           "20241207",
+           "20241208",
+           "20241209",
+           "20241210",
+           "20241211",
+           "20241212",
+           "20241213",
+           "20241214",
+           "20241215",
+           "20241216"]
+
+    def create_task(dt: str) -> Callable[[], None]:
         def task() -> None:
-            offset = task_index * batch_size
             params = {
-                "start_bizdate": "20240909",
-                "end_bizdate": "20241208",
-                "l_bizdate": "20241209",
-                "offset": str(offset),
-                "size": str(batch_size),
+                "dt_1": dt,
+                "dt_2": dt
             }
-            result_file_path = f"/Users/zhao/Desktop/tzld/ad/人群选择/csv/{task_index}.csv"
-            print(f"准备任务: {task_index + 1} 页,记录范围: {offset} - {offset + batch_size}")
+            result_file_path = f"/Users/zhao/Desktop/tzld/ad/特征/{dt}.csv"
+            print(f"Preparing task: {dt}")
             odps_client.execute_sql_file_result_save_fle(
                 sql_file_path,
                 params,
@@ -72,10 +76,8 @@ def ad_download() -> None:
 
         return task
 
-    total_pages = math.ceil(total_records / batch_size)
-    tasks = [create_task(task_index) for task_index in range(total_pages)]
+    tasks = [create_task(dt) for dt in dts]
 
-    print(f"总页数: {total_pages}")
     process_tasks(tasks, max_workers)
     print("数据下载完成。")
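
process_tasks itself is untouched by this commit and its body is not shown in the hunk above; for context, here is a sketch of what an implementation with that signature typically looks like, built on the ThreadPoolExecutor/as_completed imports already in the file (an assumption, not necessarily the repository's actual code):

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import Callable, Sequence

    def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
        # Run the zero-argument tasks concurrently and re-raise the first failure.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(task) for task in tasks]
            for future in as_completed(futures):
                future.result()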
 

+ 0 - 9
script/t.py

@@ -1,9 +0,0 @@
-import json
-
-with open("/Users/zhao/Desktop/1.json", 'r') as reader:
-    j = reader.read()
-
-creative_list = json.loads(j)
-for c in creative_list:
-    for item in c['creativeConfigList']:
-        print(item['creativeCode'])

+ 0 - 157
t.py

@@ -1,157 +0,0 @@
-# -*- coding: utf-8 -*-
-import argparse
-import json
-from datetime import datetime
-
-import pytz
-import requests
-
-server_robot = {
-    'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/926982f5-e7af-40f5-81fd-27d8f42718e4',
-}
-
-level_header_template_map = {
-    "info": "turquoise",
-    "error": "red",
-    "warn": "yellow"
-}
-
-level_header_title_content_map = {
-    "info": "广告模型自动更新通知",
-    "error": "广告模型自动更新告警",
-    "warn": "广告模型自动更新告警"
-}
-
-level_task_status_map = {
-    "info": "任务执行成功",
-    "error": "任务执行失败",
-    "warn": "任务执行失败",
-}
-
-
-def send_card_msg_to_feishu(webhook, card_json):
-    """发送消息到飞书"""
-    headers = {'Content-Type': 'application/json'}
-    payload_message = {
-        "msg_type": "interactive",
-        "card": card_json
-    }
-    print(f"推送飞书消息内容: {json.dumps(payload_message)}")
-    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
-    print(response.text)
-
-
-def timestamp_format(timestamp: str) -> str:
-    try:
-        return (datetime.utcfromtimestamp(int(timestamp))
-                .replace(tzinfo=pytz.UTC)
-                .astimezone(pytz.timezone('Asia/Shanghai'))
-                .strftime('%Y-%m-%d %H:%M:%S')
-                )
-    except ValueError as e:
-        return timestamp
-
-
-def train_info_parse(train_info_file: str) -> dict:
-    train_info = {}
-    with open(train_info_file, 'r') as f:
-        for line in f:
-            split = line.split(":")
-            if len(split) == 2:
-                key = split[0].strip()
-                value = split[1].strip()
-                train_info[key] = value
-    return train_info
-
-
-def seconds_convert(seconds):
-    hours = seconds // 3600
-    minutes = (seconds % 3600) // 60
-    seconds = seconds % 60
-    return f"{hours}小时 {minutes}分钟 {seconds}秒"
-
-
-def join_end_time(msg_text: str, train_info: dict) -> str:
-    if "结束时间" in train_info:
-        msg_text = f"{msg_text}" \
-                   f"\n- 结束时间: {timestamp_format(train_info['结束时间'])}"
-    return msg_text
-
-
-def join_start_time(msg_text: str, train_info: dict) -> str:
-    if "开始时间" in train_info:
-        msg_text = f"{msg_text}" \
-                   f"\n- 开始时间: {timestamp_format(train_info['开始时间'])}"
-    return msg_text
-
-def join_running_time(msg_text: str, train_info: dict) -> str:
-
-    return msg_text
-
-def _monitor(model_train_info: str):
-    """消息推送"""
-    msg_text = f"- 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
-    train_info = train_info_parse(model_train_info)
-    start = train_info['开始时间']
-    result = train_info['广告模型自动更新任务结果']
-    level = "error" if result == '失败' else "info"
-
-    card_json = {
-        "schema": "2.0",
-        "header": {
-            "title": {
-                "tag": "plain_text",
-                "content": level_header_title_content_map[level]
-            },
-            "template": level_header_template_map[level]
-        },
-        "body": {
-            "elements": [
-                {
-                    "tag": "markdown",
-                    "content": msg_text,
-                    "text_align": "left",
-                    "text_size": "normal",
-                    "element_id": "overview"
-                }
-            ]
-        }
-    }
-    # if top10 is not None and len(top10) > 0:
-    #     collapsible_panel = {
-    #         "tag": "collapsible_panel",
-    #         "header": {
-    #             "title": {
-    #                 "tag": "markdown",
-    #                 "content": "**Top10差异详情**"
-    #             },
-    #             "vertical_align": "center",
-    #             "padding": "4px 0px 4px 8px"
-    #         },
-    #         "border": {
-    #             "color": "grey",
-    #             "corner_radius": "5px"
-    #         },
-    #         "element_id": "detail",
-    #         "elements": [
-    #             {
-    #                 "tag": "markdown",
-    #                 "content": top10.replace("\\n", "\n").replace("\\t", "\t"),
-    #                 "element_id": "Top10CID"
-    #             }
-    #         ]
-    #     }
-    #     card_json['body']['elements'].append(collapsible_panel)
-
-    send_card_msg_to_feishu(
-        webhook=server_robot.get('webhook'),
-        card_json=card_json
-    )
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='告警Utils')
-    parser.add_argument("--tif", type=str, help='模型模型过程记录文件', required=True)
-    args = parser.parse_args()
-
-    _monitor(model_train_info=args.tif)