فهرست منبع

Merge branch 'feature/20241022-gh-3rd-reply-manual-explore' of algorithm/rov-offline into master

fengzhoutian 6 ماه پیش
والد
کامیت
3ed0751c5d
4فایلهای تغییر یافته به همراه301 افزوده شده و 236 حذف شده
  1. 57 234
      alg_growth_3rd_gh_reply_video_v1.py
  2. 4 2
      alg_growth_gh_reply_video_v1.py
  3. 226 0
      configs/3rd_gh_reply_video.json
  4. 14 0
      constants.py

+ 57 - 234
alg_growth_3rd_gh_reply_video_v1.py

@@ -4,6 +4,7 @@ import pandas as pd
 import traceback
 import odps
 from odps import ODPS
+import json
 from threading import Timer
 from datetime import datetime, timedelta
 from db_helper import MysqlHelper
@@ -23,225 +24,8 @@ LOGGER = Log()
 BASE_GROUP_NAME = '3rd-party-base'
 EXPLORE1_GROUP_NAME = '3rd-party-explore1'
 EXPLORE2_GROUP_NAME = '3rd-party-explore2'
-# TODO: fetch gh_id from external data source
-GH_IDS = (
-    'gh_2863155cedcb',
-    'gh_c1acd6bac0f8',
-    'gh_da993c5f7f64',
-    'gh_495d71abbda2',
-    'gh_e2318164f869',
-    'gh_fc4ec610756e',
-    'gh_2450ad774945',
-    'gh_175925e40318',
-    'gh_994adaf7a539',
-    'gh_250c51d5ce69',
-    'gh_37adcd637351',
-    'gh_a36405f4e5d3',
-    'gh_ee5b4b07ed8b',
-    'gh_11debb2c8392',
-    'gh_d645c1ef7fb0',
-    'gh_1899b728af86',
-    'gh_059a27ea86b2',
-    'gh_6454c103be14',
-    'gh_63745bad4f21',
-    'gh_8a29eebc2012',
-    'gh_57bc9846c86a',
-    'gh_570967881eae',
-    'gh_197a0d8caa31',
-    'gh_93af434e3f47',
-    'gh_184f2d765f55',
-    'gh_8157d8fd284e',
-    'gh_8e1d1f19d44f',
-    'gh_1da8f62f4a0d',
-    'gh_fd4df7c45bb9',
-    'gh_dcfcf74b0846',
-    'gh_3afc3a8b8a3d',
-    'gh_ef699270bf64',
-    'gh_ba870f8b178b',
-    'gh_58cdb2f1f0d0',
-    'gh_3dce33de6994',
-    'gh_543e6d7e15f3',
-    'gh_0d55fee6b78d',
-    'gh_1c11651e0df4',
-    'gh_7f4741dd5fea',
-    'gh_33e9e4cbed84',
-    'gh_23fa67ebb016',
-    'gh_33c26df4eab0',
-    'gh_01c93b07605f',
-    'gh_c655e3c8a121',
-    'gh_83adb78f4ede',
-    'gh_0cc7c6d712eb',
-    'gh_1b8bfb5c4ffd',
-    'gh_e4fb77b1023b',
-    'gh_f994f5d9a4b6',
-    'gh_e0ca8ba4ed91',
-    'gh_b2b4d5aa6b49',
-    'gh_53759f90b0c5',
-    'gh_d219a0cc8a35',
-    'gh_930b5ef5a185',
-    'gh_22cad14dc4ec',
-    'gh_8734c02f2983',
-    'gh_8d68e68f2d08',
-    'gh_c603033bf881',
-    'gh_55ac4e447179',
-    'gh_8b5c838ac19a',
-    'gh_aed71f26e7e6',
-    'gh_330ef0db846d',
-    'gh_87eca527c626',
-    'gh_7a14b4c15090',
-    'gh_b74693fed783',
-    'gh_e1594e6db64b',
-    'gh_d32daba8ccf8',
-    'gh_23e084923def',
-    'gh_148aa4a776ce',
-    'gh_0df4d6b647ea',
-    'gh_041d1c819c30',
-    'gh_7e33fbba4398',
-    'gh_354ab82cf9b3',
-    'gh_b1f1d7a1f351',
-    'gh_793647539ef5',
-    'gh_1ff0c29f8408',
-    'gh_ecef1c08bcf4',
-    'gh_22f53f6a2b5d',
-    'gh_34820675d0fc',
-    'gh_4175a8f745f6',
-    'gh_81145598368a',
-    'gh_5f0bb5822e10',
-    'gh_65d8db4e97ca',
-    'gh_a09594d52fda',
-    'gh_4411cf1e5f4e',
-    'gh_9ee5f5e8425f',
-    'gh_df24adad2521',
-    'gh_30b472377707',
-    'gh_bb6775b47656',
-    'gh_69808935bba0',
-    'gh_fb77872bf907',
-    'gh_830c4aa1b262',
-    'gh_b5393e35caa4',
-    'gh_fa7dceae7c9d',
-    'gh_449fb0c2d817',
-    'gh_d6e75ad9094f',
-    'gh_1cbb453a800e',
-    'gh_1b243f162cbd',
-    'gh_50db6881c86e',
-    'gh_9d94775e8137',
-    'gh_d37101fb9b98',
-    'gh_ed86d05703eb',
-    'gh_ac4072121e24',
-    'gh_620af8e24fb9',
-    'gh_ee4783ded544',
-    'gh_d2bb5f1b9498',
-    'gh_5044de6e1597',
-    'gh_d94de77a8d08',
-    'gh_98624814f69a',
-    'gh_4c38b9d4474a',
-    'gh_f2a6c90c56cb',
-    'gh_26f1353fda5a',
-    'gh_143743361496',
-    'gh_126c99b39cea',
-    'gh_53e6e0a1b1bd',
-    'gh_859aafbcda3d',
-    'gh_cfce2617bd82',
-    'gh_db8ea2bc6687',
-    'gh_c4708b8cfe39',
-    'gh_57d2388bd01d',
-    'gh_5fffe35cc12a',
-    'gh_45980a6448f3',
-    'gh_f5120c12ee23',
-    'gh_bf79e3645d7a',
-    'gh_6c6d81dd642d',
-    'gh_57ee6a6ef204',
-    'gh_45be25d1c06b',
-    'gh_3ee85ba7c3ae',
-    'gh_7c89d5a3e745',
-    'gh_c46be9ea4eef',
-    'gh_cedc3c4eb48b',
-    'gh_8a91fa7f32aa',
-    'gh_5207b355776f',
-    'gh_6c7f73de400b',
-    'gh_d2f3805f8fa3',
-    'gh_7dd47f8aca4e',
-    'gh_967f9abb9ccd',
-    'gh_f46c6c9b53fa',
-    'gh_086abf2a536b',
-    'gh_6e11282216f3',
-    'gh_f5332b8dfb63',
-    'gh_f78610e292ba',
-    'gh_06699758fa4b',
-    'gh_92323d0bea11',
-    'gh_517aed4e8197',
-    'gh_c80462b5a330',
-    'gh_1b1c3ced734e',
-    'gh_dd54e30b03ad',
-    'gh_cadd0ea4fab3',
-    'gh_ef07a709127e',
-    'gh_ab6ca922e605',
-    'gh_8b69b67ea723',
-    'gh_363c54315788',
-    'gh_a363987c60bf',
-    'gh_86ca35774fcf',
-    'gh_518694803ae7',
-    'gh_f98d5f17e9ea',
-    'gh_5e0cd3f7b457',
-    'gh_9e0d149e2c0a',
-    'gh_7e77b09bb4f5',
-    'gh_261bbd99a906',
-    'gh_2dc8e3a7b6c9',
-    'gh_1ec8dae66c97',
-    'gh_7f062810b4e7',
-    'gh_3c112c0c9c8b',
-    'gh_01cd19465b39',
-    'gh_8cc8ae6eb9a5',
-    'gh_210f7ce6f418',
-    'gh_04804a94e325',
-    'gh_4685665647f0',
-    'gh_d7fa96aeb839',
-    'gh_210cb680d83d',
-    'gh_862b00a394e3',
-    'gh_3cf7b310906a',
-    'gh_669555ebea28',
-    'gh_aaac62205137',
-    'gh_0a03f8fa63ba',
-    'gh_b8b2d4184832',
-    'gh_819a632d4bb1',
-    'gh_db09b87a0fc9',
-    'gh_b673c01e7bd8',
-    'gh_6da61a15044a',
-    'gh_2f1fab4efaef',
-    'gh_da22f64152d5',
-    'gh_ff9fe99f2097',
-    'gh_33731afddbdb',
-    'gh_4d2f75c3c3fe',
-    'gh_40ff43c50773',
-    'gh_56b65b7d4520',
-    'gh_ff16c412ab97',
-    'gh_8bf689ae15cc',
-    'gh_650b17dbba8f',
-    'gh_b63b9dde3f4b',
-    'gh_36e74017026e',
-    'gh_a8851bfa953b',
-    'gh_ec5beb465640',
-    'gh_133c36b99b14',
-    'gh_b144210318e5',
-    'gh_3bffce62dbb4',
-    'gh_2fbff340c683',
-    'gh_3ceae370dcf5',
-    'gh_530b634707b0',
-    'gh_b7cdece20099',
-    'gh_9e0c7a370aaf',
-    'gh_96412c0393e3',
-    'gh_c8060587e6d1',
-    'gh_0d3c97cc30cc',
-    'gh_491189a534f2',
-    'gh_fe9620386c2c',
-    'gh_9d50b7067f07',
-    'gh_e1331141406a',
-    'gh_d6db13fcf14d',
-    'gh_5522900b6a67',
-    'gh_a7c21403c493',
-    'gh_eeec7c2e28a5',
-    'gh_c783350a9660',
-)
+# GH_IDS will be updated by get_and_update_gh_ids
+GH_IDS = ('default',)
 
 TARGET_GH_IDS = (
     'gh_250c51d5ce69',
@@ -266,8 +50,7 @@ CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/forma
 ODS_PROJECT = "loghubods"
 EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
-# ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
-ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
+ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
 GH_DETAIL = 'gh_detail'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 STATS_PERIOD_DAYS = 5
@@ -424,7 +207,8 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
 
     # 在账号内排序,决定该账号(包括default)的base利用内容
     # 排序过程中,确保当前base策略参与排序,因此先关联再过滤
-    gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
+    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
+    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
     stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
 
     stats_with_strategy_df = stats_df \
@@ -455,8 +239,38 @@ def check_result_data(df):
     for gh_id in GH_IDS:
         for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
             sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
-            if len(sub_df) != SEND_N:
-                raise Exception(f"Result not enough for gh_id[{gh_id}], group[{key}]")
+            n_records = len(sub_df)
+            if n_records != SEND_N:
+                raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
+
+
+def postprocess_override_by_config(df, dt_version):
+    config = json.load(open("configs/3rd_gh_reply_video.json"))
+    override_data = {
+        'strategy_key': [],
+        'gh_id': [],
+        'sort': [],
+        'video_id': []
+    }
+
+    for gh_id in config:
+        gh_config = config[gh_id]
+        for key in gh_config:
+            for video_config in gh_config[key]:
+                # remove current
+                position = video_config['position']
+                video_id = video_config['video_id']
+                df = df.drop(df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {position}').index)
+                override_data['strategy_key'].append(key)
+                override_data['gh_id'].append(gh_id)
+                override_data['sort'].append(position)
+                override_data['video_id'].append(video_id)
+    n_records = len(override_data['strategy_key'])
+    override_data['dt_version'] = [dt_version] * n_records
+    override_data['score'] = [0.0] * n_records
+    df_to_append = pd.DataFrame(override_data)
+    df = pd.concat([df, df_to_append], ignore_index=True)
+    return df
 
 
 def rank_for_base_designate(run_dt, run_hour, stg_key):
@@ -492,13 +306,15 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 
     gh_df = get_and_update_gh_ids(run_dt)
 
-    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, ,gh_df)
-    # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE)
-    # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE,BASE_GROUP_NAME)
+    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
+    # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
+    # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
     layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
     base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)
 
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
+
+    final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
     check_result_data(final_rank_df)
 
     odps_instance = get_odps_instance(project)
@@ -516,11 +332,12 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     final_df['sort'] = SEND_N + 1 - final_df['sort']
 
     if dry_run:
-        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
+        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
+              .sort_values(by=['strategy_key', 'gh_id', 'sort']))
         return
 
     # save to ODPS
-    t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE)
+    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
     part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
     part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
     with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
@@ -536,23 +353,29 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 def main_loop():
     argparser = ArgumentParser()
     argparser.add_argument('-n', '--dry-run', action='store_true')
+    argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
     args = argparser.parse_args()
 
+    run_date = datetime.today()
+    if args.run_at:
+        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
+        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")
+
     try:
         now_date = datetime.today()
         LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
-        now_hour = now_date.strftime("%H")
 
-        last_date = now_date - timedelta(1)
+        last_date = run_date - timedelta(1)
         last_dt = last_date.strftime("%Y%m%d")
         # 查看当前天级更新的数据是否已准备好
         # 当前上游统计表为天级更新,但字段设计为兼容小时级
         h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
         if h_data_count > 0:
             LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
-            run_dt = now_date.strftime("%Y%m%d")
-            LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
-            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
+            run_dt = run_date.strftime("%Y%m%d")
+            run_hour = run_date.strftime("%H")
+            LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
+            build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
                                     dry_run=args.dry_run)
             LOGGER.info('数据更新完成')
         else:

+ 4 - 2
alg_growth_gh_reply_video_v1.py

@@ -193,7 +193,8 @@ def rank_for_base(run_dt, run_hour, rank_table):
 
     # 在账号内排序,决定该账号(包括default)的base利用内容
     # 排序过程中,确保当前base策略参与排序,因此先关联再过滤
-    gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
+    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
+    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
     stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
 
     stats_with_strategy_df = stats_df \
@@ -256,7 +257,8 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     final_df['sort'] = SEND_N + 1 - final_df['sort']
 
     if dry_run:
-        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
+        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
+              .sort_values(by=['strategy_key', 'gh_id', 'sort']))
         return
 
     # save to ODPS

+ 226 - 0
configs/3rd_gh_reply_video.json

@@ -0,0 +1,226 @@
+{
+  "gh_a7c21403c493": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_e0ca8ba4ed91": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_859aafbcda3d": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 20463342
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 20463342
+      }
+    ]
+  },
+  "gh_1014734791e0": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_7e33fbba4398": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_aed71f26e7e6": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_c783350a9660": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_b2b4d5aa6b49": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_9a5351c164b4": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_ff16c412ab97": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 20463342
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 20463342
+      }
+    ]
+  },
+  "gh_1b8bfb5c4ffd": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_98624814f69a": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_143743361496": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_7f062810b4e7": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 20463342
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 20463342
+      }
+    ]
+  },
+  "gh_2caed26a859c": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  },
+  "gh_eeec7c2e28a5": {
+    "3rd-party-explore2": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ],
+    "3rd-party-base": [
+      {
+        "position": 1,
+        "video_id": 13586800
+      }
+    ]
+  }
+}

+ 14 - 0
constants.py

@@ -0,0 +1,14 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+"""
+Constant values.
+"""
+
+from enum import Enum
+
+class AutoReplyAccountType(Enum):
+    SELF_OWNED_GZH = 1
+    EXTERNAL_GZH = 2
+    SELF_OWNED_QYWX = 3