
Merge branch 'feature/20240906-gh-autoreply-strategy' of algorithm/rov-offline into master

fengzhoutian 7 months ago
parent
commit
14e5c8c824
5 changed files with 411 additions and 0 deletions
  1. alg_growth_gh_reply_video_v1.py  +257  -0
  2. db_helper.py  +26  -0
  3. my_config.py  +36  -0
  4. my_utils.py  +91  -0
  5. requirements.txt  +1  -0

+ 257 - 0
alg_growth_gh_reply_video_v1.py

@@ -0,0 +1,257 @@
+# -*- coding: utf-8 -*-
+
+import pandas as pd
+import traceback
+import odps
+from odps import ODPS
+from threading import Timer
+from datetime import datetime, timedelta
+from db_helper import MysqlHelper
+from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
+    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
+from my_utils import request_post, send_msg_to_feishu
+from my_config import set_config
+import numpy as np
+from log import Log
+import os
+
+CONFIG, _ = set_config()
+LOGGER = Log()
+
+BASE_GROUP_NAME = 'stg0909-base'
+EXPLORE1_GROUP_NAME = 'stg0909-explore1'
+EXPLORE2_GROUP_NAME = 'stg0909-explore2'
+#TODO: fetch gh_id from external data source
+GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1', 'gh_68e7fdc09fe4')
+CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
+
+ODS_PROJECT = "loghubods"
+EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
+GH_REPLY_STATS_TABLE = 'alg_growth_gh_reply_video_stats'
+ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
+RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
+STATS_PERIOD_DAYS = 3
+SEND_N = 2
+
+def check_data_partition(project, table, data_dt, data_hr=None):
+    """检查数据是否准备好"""
+    try:
+        partition_spec = {'dt': data_dt}
+        if data_hr:
+            partition_spec['hour'] = data_hr
+        part_exist, data_count = check_table_partition_exits_v2(
+            project, table, partition_spec)
+    except Exception as e:
+        data_count = 0
+    return data_count
+
+
+def process_reply_stats(project, table, period, run_dt):
+    # fetch several days of day-0 return stats for aggregation
+    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
+    df = df.to_pandas()
+
+    df['video_id'] = df['video_id'].astype('int64')
+    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
+
+    # aggregate within each account
+    df = df.groupby(['video_id', 'gh_id']).agg({
+        'send_count': 'sum',
+        'first_visit_uv': 'sum',
+        'day0_return': 'sum'
+    }).reset_index()
+
+    # aggregate over all accounts to build the default stats
+    default_stats_df = df.groupby('video_id').agg({
+        'send_count': 'sum',
+        'first_visit_uv': 'sum',
+        'day0_return': 'sum'
+    }).reset_index()
+    default_stats_df['gh_id'] = 'default'
+
+    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
+
+    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 1000)
+    return merged_df
+
+
+def rank_for_layer1(run_dt, run_hour, project, table):
+    # TODO: add review filtering & retirement
+    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
+    df = df.to_pandas()
+    # ensure reruns produce consistent results
+    dt_version = f'{run_dt}{run_hour}'
+    np.random.seed(int(dt_version)+1)
+
+    # TODO: revise the weight calculation strategy
+    sample_weights = df['rov']
+
+    sampled_df = df.sample(n=SEND_N, weights=sample_weights)
+    sampled_df['sort'] = range(1, len(sampled_df) + 1)
+    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
+    sampled_df['dt_version'] = dt_version
+
+    gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default', )})
+    sampled_df['_tmpkey'] = 1
+    gh_name_df['_tmpkey'] = 1
+    extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
+
+    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
+    return result_df
+
+def rank_for_layer2(run_dt, run_hour, project, table):
+    stats_df = process_reply_stats(project, table, STATS_PERIOD_DAYS, run_dt)
+
+    # ensure reruns produce consistent results
+    dt_version = f'{run_dt}{run_hour}'
+    np.random.seed(int(dt_version)+1)
+    # TODO: compute inter-account correlation
+    ## for each pair of accounts, take the intersection of videos that have RoVn values;
+    ## the (smoothed) RoVn values within one account form a vector
+    ## compute the correlation coefficient or cosine similarity between vectors
+    ## then take a weighted sum of RoVn per video
+    # current implementation is the basic version: the layer-2 exploration score is ranked within each account only
+
+    sampled_dfs = []
+    # handle the default logic (default-explore2)
+    default_stats_df = stats_df.query('gh_id == "default"')
+    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
+    sampled_df['sort'] = range(1, len(sampled_df) + 1)
+    sampled_dfs.append(sampled_df)
+
+    # basic per-account filtering
+    df = stats_df.query('day0_return > 100')
+    # TODO: fetch send_count
+    # TODO: fallback logic when there are not enough videos
+    for gh_id in GH_IDS:
+        sub_df = df.query(f'gh_id == "{gh_id}"')
+        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
+        sampled_df['sort'] = range(1, len(sampled_df) + 1)
+        sampled_dfs.append(sampled_df)
+        if len(sampled_df) != SEND_N:
+            raise Exception(f"not enough candidate videos for gh_id[{gh_id}]: got {len(sampled_df)}, expect {SEND_N}")
+
+    extend_df = pd.concat(sampled_dfs)
+    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
+    extend_df['dt_version'] = dt_version
+    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
+    return result_df
+
+def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
+    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
+
+    #TODO: support to set base manually
+    dt_version = f'{run_dt}{run_hour}'
+
+    # fetch the current base strategy; the strategy table's dt_version (ctime partition) uses the current time
+    strategy_df = get_odps_df_of_max_partition(
+        project, rank_table, { 'ctime': dt_version }
+    ).to_pandas()
+    base_strategy_df = strategy_df.query('strategy_key.str.contains("base")')
+    base_strategy_df = base_strategy_df[['gh_id', 'video_id', 'strategy_key']].drop_duplicates()
+
+    default_stats_df = stats_df.query('gh_id == "default"')
+
+    # rank within each account (including default) to decide its base exploitation content
+    # make sure the current base strategy takes part in the ranking, so join first, then filter
+    gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
+    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
+
+    stats_with_strategy_df = stats_df \
+        .merge(
+            base_strategy_df,
+            on=['gh_id', 'video_id'],
+            how='left') \
+        .query('strategy_key.notna() or day0_return > 100')
+
+    # merge default and per-account data
+    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
+
+    def set_top_n(group, n=2):
+        group_sorted = group.sort_values(by='score', ascending=False)
+        top_n = group_sorted.head(n)
+        top_n['sort'] = range(1, len(top_n) + 1)
+        return top_n
+    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
+    ranked_df = ranked_df.reset_index(drop=True)
+    #ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
+    ranked_df['strategy_key'] = BASE_GROUP_NAME
+    ranked_df['dt_version'] = dt_version
+    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
+    return ranked_df
+
+
+def build_and_transfer_data(run_dt, run_hour, project):
+    dt_version = f'{run_dt}{run_hour}'
+
+    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
+    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE)
+    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
+                              GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
+    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
+
+    odps_instance = get_odps_instance(project)
+    odps_ranked_df = odps.DataFrame(final_rank_df)
+
+    video_df = get_dataframe_from_odps('videoods', 'wx_video')
+    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
+    video_df = video_df['id', 'title', 'cover_url']
+    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
+
+    final_df = final_df.to_pandas()
+    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url']]
+
+    # reverse sending order
+    final_df['sort'] = SEND_N + 1 - final_df['sort']
+
+    # save to ODPS
+    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
+    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
+    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
+    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
+        writer.write(list(final_df.itertuples(index=False)))
+
+    # sync to MySQL
+    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
+    data_columns = list(final_df.columns)
+    mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
+    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
+
+
+def main_loop():
+    try:
+        now_date = datetime.today()
+        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
+        now_hour = now_date.strftime("%H")
+
+        last_date = now_date - timedelta(1)
+        last_dt = last_date.strftime("%Y%m%d")
+        # check whether the daily-updated upstream data is ready
+        # the upstream stats table is currently updated daily, but its fields are designed to be hour-compatible
+        h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
+        if h_data_count > 0:
+            LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
+            run_dt = now_date.strftime("%Y%m%d")
+            LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
+            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT)
+            LOGGER.info('数据更新完成')
+        else:
+            LOGGER.info("上游数据未就绪,等待60s")
+            Timer(60, main_loop).start()
+        return
+    except Exception as e:
+        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        if CONFIG.ENV_TEXT == '开发环境':
+            return
+        send_msg_to_feishu(
+            webhook=CONFIG.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=CONFIG.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == '__main__':
+    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
+    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
+    main_loop()
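A minimal, self-contained sketch (toy numbers, hypothetical gh_id and video ids) of how the smoothed score in process_reply_stats and the sort reversal in build_and_transfer_data behave; it only illustrates the formulas above and is not part of this commit:

import pandas as pd

SEND_N = 2
toy = pd.DataFrame({
    'gh_id': ['gh_demo'] * 3,               # hypothetical account id
    'video_id': [101, 102, 103],
    'first_visit_uv': [5000, 800, 50],
    'day0_return': [600, 200, 40],
})
# score = day0_return / (first_visit_uv + 1000); the +1000 damps videos with little exposure
toy['score'] = toy['day0_return'] / (toy['first_visit_uv'] + 1000)
top = toy.sort_values('score', ascending=False).head(SEND_N).reset_index(drop=True)
top['sort'] = range(1, len(top) + 1)
# reverse sending order, as done before the final result is written
top['sort'] = SEND_N + 1 - top['sort']
print(top[['video_id', 'score', 'sort']])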

+ 26 - 0
db_helper.py

@@ -362,6 +362,32 @@ class MysqlHelper(object):
         conn.close()
         return data
 
+    def batch_insert(self, table, data, columns=None):
+        """
+        data: data, list[tuple] or list[dict]
+        columns: column names, list, required if data is list[tuple]
+        """
+        if data is None or len(data) == 0:
+            return
+        conn = pymysql.connect(**self.mysql_info)
+        try:
+            if columns is not None:
+                if len(data[0]) != len(columns):
+                    raise Exception("data length != column length")
+            elif isinstance(data[0], dict):
+                columns = list(data[0].keys())
+            else:
+                raise Exception("columns is required when data is list[tuple]")
+            columns_str = ','.join(columns)
+            # dict rows are converted to tuples so that %s placeholders work with executemany
+            if isinstance(data[0], dict):
+                data = [tuple(row[col] for col in columns) for row in data]
+            placeholders_str = ','.join(['%s'] * len(columns))
+            with conn.cursor() as cursor:
+                sql_str = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders_str})"
+                cursor.executemany(sql_str, data)
+                conn.commit()
+        except pymysql.MySQLError as e:
+            print(f"Error in batch_insert: {e}")
+            conn.rollback()
+        conn.close()
+
 
 if __name__ == '__main__':
     redis_helper = RedisHelper()
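A usage sketch for the new batch_insert, assuming the rank-table columns written by alg_growth_gh_reply_video_v1.py; the row values are illustrative only:

from db_helper import MysqlHelper
from my_config import set_config

CONFIG, _ = set_config()
mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
columns = ['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url']
rows = [
    # hypothetical values, in the column order above
    ('stg0909-base', '2024091000', 'gh_ac43e43b253b', 2, 12345, 'demo title', 'https://example.com/cover.jpg'),
]
mysql.batch_insert('alg_gh_autoreply_video_rank_data', rows, columns)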

+ 36 - 0
my_config.py

@@ -2430,6 +2430,15 @@ class DevelopmentConfig(BaseConfig):
         'charset': 'utf8'
     }
 
+    MYSQL_CRAWLER_INFO = {
+        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'crawler',
+        'password': 'crawler123456@',
+        'db': 'piaoquan-crawler',
+        'charset': 'utf8mb4'
+    }
+
     # 测试环境 过滤用mysql地址
     FILTER_MYSQL_INFO = {
         # 'host': 'am-bp1g3ys9u00u483uc131930.ads.aliyuncs.com',
@@ -2522,6 +2531,15 @@ class TestConfig(BaseConfig):
         'charset': 'utf8'
     }
 
+    MYSQL_CRAWLER_INFO = {
+        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'crawler',
+        'password': 'crawler123456@',
+        'db': 'piaoquan-crawler',
+        'charset': 'utf8mb4'
+    }
+
     # 测试环境 过滤用mysql地址
     FILTER_MYSQL_INFO = {
         # 'host': 'am-bp1g3ys9u00u483uc131930.ads.aliyuncs.com',
@@ -2613,6 +2631,15 @@ class PreProductionConfig(BaseConfig):
         'charset': 'utf8'
     }
 
+    MYSQL_CRAWLER_INFO = {
+        'host': 'rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'crawler',
+        'password': 'crawler123456@',
+        'db': 'piaoquan-crawler',
+        'charset': 'utf8mb4'
+    }
+
     # 生产环境 过滤用mysql地址
     FILTER_MYSQL_INFO = {
         'host': 'am-bp15tqt957i3b3sgi131950.ads.aliyuncs.com',
@@ -2697,6 +2724,15 @@ class ProductionConfig(BaseConfig):
         'charset': 'utf8'
     }
 
+    MYSQL_CRAWLER_INFO = {
+        'host': 'rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+        'port': 3306,
+        'user': 'crawler',
+        'password': 'crawler123456@',
+        'db': 'piaoquan-crawler',
+        'charset': 'utf8mb4'
+    }
+
     # 生产环境 过滤用mysql地址
     FILTER_MYSQL_INFO = {
         'host': 'am-bp15tqt957i3b3sgi131950.ads.aliyuncs.com',
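The new MYSQL_CRAWLER_INFO blocks are plain pymysql connection kwargs, consumed via pymysql.connect(**self.mysql_info) in MysqlHelper.batch_insert. A minimal connectivity check (a sketch, not part of this commit):

import pymysql
from my_config import set_config

CONFIG, _ = set_config()
conn = pymysql.connect(**CONFIG.MYSQL_CRAWLER_INFO)
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT 1")
        print(cursor.fetchone())
finally:
    conn.close()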

+ 91 - 0
my_utils.py

@@ -7,6 +7,7 @@ import traceback
 import pandas as pd
 
 from odps import ODPS
+from odps.df import DataFrame
 from my_config import set_config
 from db_helper import HologresHelper, MysqlHelper, RedisHelper
 from log import Log
@@ -16,6 +17,16 @@ config_, env = set_config()
 log_ = Log()
 
 
+def get_odps_instance(project):
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+    )
+    return odps
+
+
 def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                        pool_maxsize=1000, pool_connections=1000):
     odps = ODPS(
@@ -72,6 +83,65 @@ def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=
     return records
 
 
+def get_dataframe_from_odps(project, table, partition_spec_dict=None):
+    """
+    Fetch data from ODPS as a DataFrame.
+    :param partition_spec_dict: partition spec, type dict
+    :param project: project name, type string
+    :param table: table name, type string
+    :return: odps.DataFrame
+    """
+    odps = get_odps_instance(project)
+    if partition_spec_dict:
+        spec = ','.join(['{}={}'.format(k, partition_spec_dict[k]) for k in
+                         partition_spec_dict.keys()])
+        return DataFrame(odps.get_table(name=table)).filter_parts(spec)
+    else:
+        return DataFrame(odps.get_table(name=table))
+
+
+def get_odps_df_of_max_partition(project, table, rb_spec=None):
+    """
+    rb_spec: spec for right bound of partition names. type-dict
+    return odps.DataFrame
+    """
+    odps = get_odps_instance(project)
+    t = odps.get_table(table)
+    df = DataFrame(odps.get_table(table))
+    if rb_spec is None:
+        return df.filter_parts(t.get_max_partition().partition_spec)
+    else:
+        spec = ','.join(['{}<{}'.format(k, rb_spec[k]) for k in rb_spec.keys()])
+        part_iter = t.iterate_partitions(spec=spec, reverse=True)
+        try:
+            partition = next(part_iter)
+            return df.filter_parts(partition)
+        except StopIteration:
+            return None
+
+def get_odps_df_of_recent_partitions(project, table, n=1, rb_spec=None):
+    """
+    rb_spec: spec for right bound of partition names. type-dict
+    return odps.DataFrame
+    """
+    odps = get_odps_instance(project)
+    t = odps.get_table(table)
+    df = DataFrame(odps.get_table(table))
+    spec = None
+    if rb_spec:
+        spec = ','.join(['{}<{}'.format(k, rb_spec[k]) for k in rb_spec.keys()])
+    part_iter = t.iterate_partitions(spec=spec, reverse=True)
+    selected_parts = []
+    try:
+        for i in range(0, n):
+            partition = next(part_iter)
+            selected_parts.append(partition)
+            log_.info(f"table: {table}, selected part: {partition.name}")
+    except StopIteration:
+        log_.info(f"table: {table}, no more parts to iterate")
+    return df.filter_parts(selected_parts)
+
+
 def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                 pool_maxsize=1000, pool_connections=1000):
     """
@@ -99,6 +169,27 @@ def check_table_partition_exits(date, project, table, connect_timeout=3000, read
     return t.exist_partition(partition_spec=f'dt={date}')
 
 
+def check_table_partition_exits_v2(project, table, partition_spec_dict):
+    """
+    Check whether the specified partition exists in the table and return its record count.
+    Note: newer ODPS versions removed parameters such as timeout.
+    :param project: project name, type string
+    :param table: table name, type string
+    :param partition_spec_dict: partition spec, type dict
+    :return: if_exist, num_records
+    """
+    odps = get_odps_instance(project)
+    t = odps.get_table(name=table)
+    spec = ','.join(['{}={}'.format(k, partition_spec_dict[k]) for k in
+                     partition_spec_dict.keys()])
+    if t.exist_partition(partition_spec=spec):
+        with t.open_reader(partition=spec) as reader:
+            count = reader.count
+        return True, count
+    else:
+        return False, 0
+
+
 def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
     """
     将数据写入pickle文件中
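A usage sketch for the new partition helpers; the project and table come from this commit, but the dates are illustrative only (rb_spec is an exclusive right bound, since the helpers filter partitions with '<'):

from my_utils import (get_odps_df_of_max_partition,
                      get_odps_df_of_recent_partitions,
                      check_table_partition_exits_v2)

# latest partition with dt strictly below 20240910
df = get_odps_df_of_max_partition(
    'loghubods', 'alg_growth_gh_reply_video_stats', rb_spec={'dt': '20240910'})

# the 3 most recent partitions below the same bound, returned as one odps DataFrame
recent_df = get_odps_df_of_recent_partitions(
    'loghubods', 'alg_growth_gh_reply_video_stats', n=3, rb_spec={'dt': '20240910'})

# existence check plus record count for an exact partition
exists, num_records = check_table_partition_exits_v2(
    'loghubods', 'alg_growth_gh_reply_video_stats', {'dt': '20240909'})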

+ 1 - 0
requirements.txt

@@ -8,3 +8,4 @@ requests==2.24.0
 odps==3.5.1
 psycopg2==2.9.1
 scikit_learn==1.0.1
+aliyun-log-python-sdk==0.9.11