@@ -5,7 +5,8 @@ import traceback
 import odps
 from odps import ODPS
 import json
-from threading import Timer
+import time
+from pymysql.cursors import DictCursor
 from datetime import datetime, timedelta
 from db_helper import MysqlHelper
 from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
@@ -41,20 +42,27 @@ STATS_PERIOD_DAYS = 5
 SEND_N = 1
 
 def get_and_update_gh_ids(run_dt):
-    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
-    gh = gh.to_pandas()
+    db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
     gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
-    gh = gh.query(f'type == {gh_type} and is_delete == 0')
+    sqlstr = f"""
+        SELECT gh_id, gh_name, category1, category2, channel,
+            video_ids, strategy_status
+        FROM {GH_DETAIL}
+        WHERE is_delete = 0 AND `type` = {gh_type}
+        """
+    account_data = db.get_data(sqlstr, DictCursor)
+    account_df = pd.DataFrame(account_data)
+
     # default单独处理
-    if 'default' not in gh['gh_id'].values:
+    if 'default' not in account_df['gh_id'].values:
         new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
                                index=[0])
-        gh = pd.concat([gh, new_row], ignore_index=True)
+        account_df = pd.concat([account_df, new_row], ignore_index=True)
 
-    gh = gh.drop_duplicates(subset=['gh_id'])
+    account_df = account_df.drop_duplicates(subset=['gh_id'])
     global GH_IDS
-    GH_IDS = tuple(gh['gh_id'])
-    return gh
+    GH_IDS = tuple(account_df['gh_id'])
+    return account_df
 
 
 def check_data_partition(project, table, data_dt, data_hr=None):
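
The rewritten get_and_update_gh_ids pulls the account list from MySQL (via MysqlHelper.get_data with a DictCursor) instead of the daily ODPS partition. A minimal sketch of the DictCursor-to-DataFrame pattern it relies on, with placeholder connection settings and table name (MysqlHelper presumably wraps these same steps around CONFIG.MYSQL_GROWTH_INFO):

    import pandas as pd
    import pymysql
    from pymysql.cursors import DictCursor

    # placeholder credentials; the job reads its own from CONFIG.MYSQL_GROWTH_INFO
    conn = pymysql.connect(host='127.0.0.1', user='user', password='pass',
                           database='growth', cursorclass=DictCursor)
    with conn.cursor() as cursor:
        cursor.execute("SELECT gh_id, gh_name, category1 FROM gh_detail WHERE is_delete = 0")
        rows = cursor.fetchall()       # list of dicts, one per row
    account_df = pd.DataFrame(rows)    # column names come from the dict keys
    print(account_df['gh_id'].tolist())
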
@@ -74,9 +82,10 @@ def get_last_strategy_result(project, rank_table, dt_version, key):
     strategy_df = get_odps_df_of_max_partition(
         project, rank_table, {'ctime': dt_version}
     ).to_pandas()
+    dt_version = strategy_df.iloc[0]['dt_version']
     sub_df = strategy_df.query(f'strategy_key == "{key}"')
     sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
-    return sub_df
+    return sub_df, dt_version
 
 
 def process_reply_stats(project, table, period, run_dt):
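
get_last_strategy_result now also returns the dt_version of the newest ctime partition it actually read, so callers unpack a tuple: delta mode keeps the value, the rank_for_* callers discard it with an underscore. A small local sketch of the new calling convention (the real function reads ODPS; the read is stubbed here so it runs standalone):

    import pandas as pd

    def load_last_strategy(key):
        # stand-in for the ODPS read of the newest ctime partition
        strategy_df = pd.DataFrame([
            {'gh_id': 'gh_x', 'video_id': 1, 'strategy_key': key, 'sort': 1,
             'dt_version': '2024052100'},
        ])
        dt_version = strategy_df.iloc[0]['dt_version']
        sub_df = strategy_df.query(f'strategy_key == "{key}"')
        return sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']], dt_version

    rows, last_dt_version = load_last_strategy('base')   # delta mode keeps dt_version
    rows, _ = load_last_strategy('base')                 # rank_for_* callers ignore it
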
@@ -154,7 +163,7 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     df = stats_df.query('send_count > 200 and score > 0')
 
     # fallback to base if necessary
-    base_strategy_df = get_last_strategy_result(
+    base_strategy_df, _ = get_last_strategy_result(
         project, rank_table, dt_version, BASE_GROUP_NAME)
 
     for gh_id in GH_IDS:
@@ -185,7 +194,7 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
     dt_version = f'{run_dt}{run_hour}'
 
     # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
-    base_strategy_df = get_last_strategy_result(
+    base_strategy_df, _ = get_last_strategy_result(
         project, rank_table, dt_version, stg_key)
 
     default_stats_df = stats_df.query('gh_id == "default"')
@@ -259,12 +268,7 @@ def postprocess_override_by_config(df, dt_version):
     return df
 
 
-def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
-    dt_version = f'{run_dt}{run_hour}'
-    dry_run = kwargs.get('dry_run', False)
-
-    gh_df = get_and_update_gh_ids(run_dt)
-
+def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
@@ -274,16 +278,7 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
     check_result_data(final_rank_df)
 
-    odps_instance = get_odps_instance(project)
-    odps_ranked_df = odps.DataFrame(final_rank_df)
-
-    video_df = get_dataframe_from_odps('videoods', 'wx_video')
-    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
-    video_df = video_df['id', 'title', 'cover_url']
-    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
-
-    final_df = final_df.to_pandas()
-    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
+    final_df = join_video_info(final_rank_df)
 
     # reverse sending order
     final_df['sort'] = SEND_N + 1 - final_df['sort']
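
The inline join removed here is moved into a new join_video_info helper (defined later in this patch): rank rows are joined against videoods.wx_video to attach title and cover_url. A pandas-only illustration of what that join produces (the real helper does it with PyODPS DataFrames on the ODPS side; the CDN operator value below is a placeholder):

    import pandas as pd

    CDN_IMG_OPERATOR = '?x-oss-process=image/resize,w_360'   # placeholder

    rank_df = pd.DataFrame([{'strategy_key': 'base', 'dt_version': '2024052100',
                             'gh_id': 'gh_x', 'sort': 1, 'video_id': 101, 'score': 0.5}])
    video_df = pd.DataFrame([{'id': 101, 'title': 'demo',
                              'cover_img_path': 'http://cdn.example.com/img.jpg'}])
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR

    final_df = rank_df.merge(video_df[['id', 'title', 'cover_url']],
                             left_on='video_id', right_on='id')
    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort',
                         'video_id', 'title', 'cover_url', 'score']]
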
@@ -321,10 +316,112 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     rows = mysql.execute(sql)
 
 
-def main_loop():
+def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
+    # 获取最新策略信息, 策略表dt_version(ctime partition)采用当前时间
+    last_strategy, last_dt_version = get_last_strategy_result(
+        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
+    account_map = { x['gh_id']: x for x in account_df.to_dict(orient='records') }
+    all_accounts = account_df['gh_id'].unique()
+    accounts_in_strategy = last_strategy['gh_id'].unique()
+    delta_accounts = [x for x in set(all_accounts) - set(accounts_in_strategy)]
+    if len(delta_accounts) > 0:
+        LOGGER.info('Found {} new accounts: {}'.format(
+            len(delta_accounts), ','.join(delta_accounts)))
+    else:
+        LOGGER.info('Found 0 new account, do nothing.')
+        return
+
+    default_video = {
+        '泛生活': [20463342],
+        '泛历史': [13586800],
+    }
+
+    # 新增账号,不存在历史,可直接忽略strategy_status字段
+    # TODO: set default by history stats
+    groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
+    rows = []
+    for gh_id in delta_accounts:
+        account_info = account_map[gh_id]
+        configured_videos = account_info['video_ids']
+        if configured_videos:
+            LOGGER.info(f'{gh_id} has configured video IDs: {configured_videos}')
+            video_ids = [int(x) for x in configured_videos.split(',')]
+        else:
+            video_ids = default_video[account_info['category1']]
+        for group_key in groups:
+            for idx in range(SEND_N):
+                row = {
+                    'strategy_key': group_key,
+                    'gh_id': gh_id,
+                    'sort': idx + 1,
+                    'video_id': video_ids[idx],
+                    'dt_version': last_dt_version,
+                    'score': 0.0
+                }
+                rows.append(row)
+    df = pd.DataFrame(rows)
+    final_df = join_video_info(df)
+    if dry_run:
+        print(final_df)
+        return
+
+    # 增量记录更新至MySQL
+    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
+    data_columns = list(final_df.columns)
+    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
+    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
+
+    # 全量记录写回ODPS
+    last_odps_df = get_odps_df_of_max_partition(
+        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
+    ).to_pandas()
+    updated_odps_df = pd.concat([final_df, last_odps_df], ignore_index=True)
+
+    odps_instance = get_odps_instance(ODS_PROJECT)
+    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
+    target_dt = last_dt_version[0:8]
+    target_hour = last_dt_version[8:10]
+    part_spec_dict = {'dt': target_dt, 'hour': target_hour, 'ctime': last_dt_version}
+    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
+    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
+        writer.write(list(updated_odps_df.itertuples(index=False)))
+
+
+def join_video_info(df):
+    odps_instance = get_odps_instance(ODS_PROJECT)
+    odps_df = odps.DataFrame(df)
+
+    video_df = get_dataframe_from_odps('videoods', 'wx_video')
+    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
+    video_df = video_df['id', 'title', 'cover_url']
+    final_df = odps_df.join(video_df, on=('video_id', 'id'))
+
+    final_df = final_df.to_pandas()
+    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
+    return final_df
+
+
+def build_and_transfer_data(run_dt, run_hour, **kwargs):
+    dt_version = f'{run_dt}{run_hour}'
+    dry_run = kwargs.get('dry_run', False)
+    mode = kwargs.get('mode')
+
+    gh_df = get_and_update_gh_ids(run_dt)
+
+    if mode == 'delta':
+        return build_and_transfer_delta_mode(gh_df, dt_version, dry_run)
+    else:
+        return build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run)
+
+
+def main():
+    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
+    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
+
     argparser = ArgumentParser()
     argparser.add_argument('-n', '--dry-run', action='store_true')
     argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
+    argparser.add_argument('--mode', default='base', choices=['base', 'delta'], help='run mode')
     args = argparser.parse_args()
 
     run_date = datetime.today()
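
In delta mode the new accounts' rows are appended to MySQL, then the full merged rank list is written back to the same ctime partition of the ODPS rank table. A sketch of the partitioned write with placeholder credentials and table name; the change above additionally passes overwrite=True so the partition is replaced rather than appended to:

    import pandas as pd
    from odps import ODPS

    o = ODPS('<access-id>', '<access-key>', 'ods_project', endpoint='<odps-endpoint>')
    t = o.get_table('ods_gh_reply_rank')          # placeholder table name

    df = pd.DataFrame([{'strategy_key': 'base', 'dt_version': '2024052100',
                        'gh_id': 'gh_x', 'sort': 1, 'video_id': 101,
                        'title': 'demo', 'cover_url': 'http://cdn.example.com/img.jpg',
                        'score': 0.5}])
    part_spec = 'dt=20240521,hour=00,ctime=2024052100'
    with t.open_writer(partition=part_spec, create_partition=True) as writer:
        writer.write(list(df.itertuples(index=False)))   # row order must match the table schema
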
@@ -340,21 +437,22 @@ def main_loop():
         last_dt = last_date.strftime("%Y%m%d")
         # 查看当前天级更新的数据是否已准备好
         # 当前上游统计表为天级更新,但字段设计为兼容小时级
-        h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
-        if h_data_count > 0:
-            LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
-            run_dt = run_date.strftime("%Y%m%d")
-            run_hour = run_date.strftime("%H")
-            LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
-            build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
-                                    dry_run=args.dry_run)
-            LOGGER.info('数据更新完成')
-        else:
-            LOGGER.info("上游数据未就绪,等待60s")
-            Timer(60, main_loop).start()
-            return
+        while True:
+            h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
+            if h_data_count > 0:
+                LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
+                run_dt = run_date.strftime("%Y%m%d")
+                run_hour = run_date.strftime("%H")
+                LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
+                build_and_transfer_data(run_dt, run_hour, dry_run=args.dry_run, mode=args.mode)
+                LOGGER.info('数据更新完成')
+                return
+            else:
+                LOGGER.info("上游数据未就绪,等待60s")
+                time.sleep(60)
     except Exception as e:
         LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        return
         if CONFIG.ENV_TEXT == '开发环境':
             return
         send_msg_to_feishu(
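
The entry point now polls for the upstream partition in-process (while True plus time.sleep(60)) instead of re-scheduling itself with threading.Timer, so a single foreground run blocks until the stats table is ready. A generic sketch of the same polling pattern, with an optional deadline that the change itself does not add:

    import time

    def wait_for_upstream(check_fn, interval_sec=60, max_wait_sec=3600):
        """Poll check_fn until it reports rows, or give up after max_wait_sec."""
        deadline = time.monotonic() + max_wait_sec
        while time.monotonic() < deadline:
            if check_fn() > 0:
                return True
            time.sleep(interval_sec)
        return False

    # e.g. wait_for_upstream(lambda: check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00'))
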
@@ -367,6 +465,4 @@ def main_loop():
 
 
 if __name__ == '__main__':
-    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
-    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
-    main_loop()
+    main()