@@ -14,6 +14,7 @@ from my_config import set_config
 import numpy as np
 from log import Log
 import os
+from argparse import ArgumentParser
 
 CONFIG, _ = set_config()
 LOGGER = Log()
@@ -22,7 +23,8 @@ BASE_GROUP_NAME = 'stg0909-base'
 EXPLORE1_GROUP_NAME = 'stg0909-explore1'
 EXPLORE2_GROUP_NAME = 'stg0909-explore2'
 #TODO: fetch gh_id from external data source
-GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1', 'gh_68e7fdc09fe4')
+GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1',
+          'gh_68e7fdc09fe4', 'gh_b181786a6c8c')
 CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
 
 ODS_PROJECT = "loghubods"
@@ -30,7 +32,7 @@ EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_gh_reply_video_stats'
 ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
-STATS_PERIOD_DAYS = 3
+STATS_PERIOD_DAYS = 5
 SEND_N = 2
 
 def check_data_partition(project, table, data_dt, data_hr=None):
@@ -80,7 +82,7 @@ def process_reply_stats(project, table, period, run_dt):
 
     merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
 
-    merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 1000)
+    merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500)
     return merged_df
 
 
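
The score in this hunk has the shape of additive smoothing, day0_return / (send_count + k). Halving k from 1000 to 500 weakens the shrinkage toward zero, so low-traffic videos recover ranking weight from fewer sends, while high-traffic videos barely move. A standalone sketch with made-up counts:

# Standalone sketch of the smoothing change; the counts are made up.
def score(day0_return, send_count, k):
    return day0_return / (send_count + k)

# Low-volume video (20 returns / 100 sends): weight roughly doubles.
print(round(score(20, 100, 1000), 4))   # 0.0182 (old constant)
print(round(score(20, 100, 500), 4))    # 0.0333 (new constant)

# High-volume video (2000 returns / 10000 sends): nearly unchanged.
print(round(score(2000, 10000, 1000), 4))  # 0.1818
print(round(score(2000, 10000, 500), 4))   # 0.1905
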
@@ -174,7 +176,7 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
             base_strategy_df,
             on=['gh_id', 'video_id'],
             how='left') \
-        .query('strategy_key.notna() or day0_return > 100')
+        .query('strategy_key.notna() or score > 0.1')
 
     # merge default and per-account stats
     grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
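
The base-rank filter now keys off the smoothed score (score > 0.1) instead of the raw day0_return > 100, keeping the threshold consistent with the new smoothing constant above. Depending on the pandas version and whether numexpr is installed, method calls like .notna() inside query may require engine='python'; the toy check below passes it explicitly to be safe, and its values are hypothetical:

# Toy check of the new filter; values are hypothetical.
import pandas as pd

df = pd.DataFrame({
    'strategy_key': ['base', None, None],
    'score': [0.02, 0.25, 0.05],
})

# engine='python' is the safe choice for method calls like .notna(),
# which some pandas/numexpr combinations reject under the default engine.
kept = df.query('strategy_key.notna() or score > 0.1', engine='python')
print(kept)  # rows 0 and 1 survive; row 2 has no strategy and too low a score
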
@@ -201,8 +203,9 @@ def check_result_data(df):
             raise Exception(f"Result not enough for gh_id[{gh_id}] group[{group}]")
 
 
-def build_and_transfer_data(run_dt, run_hour, project):
+def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     dt_version = f'{run_dt}{run_hour}'
+    dry_run = kwargs.get('dry_run', False)
 
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
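
Threading the flag through **kwargs with kwargs.get('dry_run', False) keeps every existing caller of build_and_transfer_data working unchanged; only call sites that opt in pass the keyword. A minimal sketch of the pattern (the function here is a stand-in, not the real one):

# Minimal sketch of the kwargs-with-default pattern; the function is a stand-in.
def transfer(run_dt, run_hour, project, **kwargs):
    dry_run = kwargs.get('dry_run', False)
    return 'dry run: skipped write' if dry_run else 'wrote partition'

print(transfer('20240909', '12', 'loghubods'))                # old call sites unchanged
print(transfer('20240909', '12', 'loghubods', dry_run=True))  # new flag opts in

An explicit keyword-only parameter (def build_and_transfer_data(..., *, dry_run=False)) would be equally backward-compatible and more self-documenting; **kwargs trades that discoverability for not touching the signature again when further flags arrive.
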
@@ -226,6 +229,10 @@ def build_and_transfer_data(run_dt, run_hour, project):
     # reverse sending order
     final_df['sort'] = SEND_N + 1 - final_df['sort']
 
+    if dry_run:
+        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
+        return
+
     # save to ODPS
     t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
     part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
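
The dry-run gate prints the final ranking and returns before the ODPS save below, so no partition is written. One practical caveat: print() elides columns and rows past pandas' default display limits on a wide frame, while to_string() renders everything, as sketched here (df stands in for final_df):

# df stands in for final_df; to_string() avoids pandas' display truncation.
import pandas as pd

df = pd.DataFrame({'gh_id': ['gh_x', 'gh_x'], 'sort': [2, 1], 'score': [0.31, 0.27]})
print(df.to_string(index=False))
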
@@ -241,6 +248,10 @@ def build_and_transfer_data(run_dt, run_hour, project):
 
 
 def main_loop():
+    argparser = ArgumentParser()
+    argparser.add_argument('-n', '--dry-run', action='store_true')
+    args = argparser.parse_args()
+
     try:
         now_date = datetime.today()
         LOGGER.info(f"Job started: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
@@ -255,7 +266,8 @@ def main_loop():
                 LOGGER.info('Upstream table returned {} rows; starting computation'.format(h_data_count))
                 run_dt = now_date.strftime("%Y%m%d")
                 LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
-                build_and_transfer_data(run_dt, now_hour, ODS_PROJECT)
+                build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
+                                        dry_run=args.dry_run)
                 LOGGER.info('Data update complete')
             else:
                 LOGGER.info("Upstream data not ready; waiting 60s")
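
End to end, the new -n/--dry-run flag is parsed in main_loop and handed to build_and_transfer_data as dry_run=args.dry_run. argparse maps the dashed option name to the dry_run attribute, and action='store_true' makes it default to False, so production runs behave exactly as before. A self-contained check (the argv list simulates a command line; the real script name does not appear in these hunks):

from argparse import ArgumentParser

argparser = ArgumentParser()
argparser.add_argument('-n', '--dry-run', action='store_true')

# Simulate `<script> -n`; in main_loop, parse_args() reads the real argv.
args = argparser.parse_args(['-n'])
print(args.dry_run)  # True

# Without the flag the attribute defaults to False.
print(argparser.parse_args([]).dry_run)  # False
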