|  | @@ -15,6 +15,7 @@ import numpy as np
 | 
											
												
													
														|  |  from log import Log
 |  |  from log import Log
 | 
											
												
													
														|  |  import os
 |  |  import os
 | 
											
												
													
														|  |  from argparse import ArgumentParser
 |  |  from argparse import ArgumentParser
 | 
											
												
													
														|  | 
 |  | +from constants import AutoReplyAccountType
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  CONFIG, _ = set_config()
 |  |  CONFIG, _ = set_config()
 | 
											
												
													
														|  |  LOGGER = Log()
 |  |  LOGGER = Log()
 | 
											
										
											
												
													
														|  | @@ -22,7 +23,6 @@ LOGGER = Log()
 | 
											
												
													
														|  |  BASE_GROUP_NAME = 'stg0909-base'
 |  |  BASE_GROUP_NAME = 'stg0909-base'
 | 
											
												
													
														|  |  EXPLORE1_GROUP_NAME = 'stg0909-explore1'
 |  |  EXPLORE1_GROUP_NAME = 'stg0909-explore1'
 | 
											
												
													
														|  |  EXPLORE2_GROUP_NAME = 'stg0909-explore2'
 |  |  EXPLORE2_GROUP_NAME = 'stg0909-explore2'
 | 
											
												
													
														|  | -#TODO: fetch gh_id from external data source
 |  | 
 | 
											
												
													
														|  |  GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1',
 |  |  GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1',
 | 
											
												
													
														|  |            'gh_68e7fdc09fe4', 'gh_b181786a6c8c')
 |  |            'gh_68e7fdc09fe4', 'gh_b181786a6c8c')
 | 
											
												
													
														|  |  CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
 |  |  CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
 | 
											
										
											
												
													
														|  | @@ -36,6 +36,24 @@ GH_DETAIL = 'gh_detail'
 | 
											
												
													
														|  |  STATS_PERIOD_DAYS = 5
 |  |  STATS_PERIOD_DAYS = 5
 | 
											
												
													
														|  |  SEND_N = 2
 |  |  SEND_N = 2
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +pd.set_option('display.max_rows', None)
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def get_and_update_gh_ids(run_dt):
 | 
											
												
													
														|  | 
 |  | +    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
 | 
											
												
													
														|  | 
 |  | +    gh = gh.to_pandas()
 | 
											
												
													
														|  | 
 |  | +    gh = gh[gh['type'] == AutoReplyAccountType.SELF_OWNED_GZH.value]
 | 
											
												
													
														|  | 
 |  | +    # default单独处理
 | 
											
												
													
														|  | 
 |  | +    if 'default' not in gh['gh_id'].values:
 | 
											
												
													
														|  | 
 |  | +        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
 | 
											
												
													
														|  | 
 |  | +                               index=[0])
 | 
											
												
													
														|  | 
 |  | +        gh = pd.concat([gh, new_row], ignore_index=True)
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    gh = gh.drop_duplicates(subset=['gh_id'])
 | 
											
												
													
														|  | 
 |  | +    global GH_IDS
 | 
											
												
													
														|  | 
 |  | +    GH_IDS = tuple(gh['gh_id'])
 | 
											
												
													
														|  | 
 |  | +    return gh
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  def check_data_partition(project, table, data_dt, data_hr=None):
 |  |  def check_data_partition(project, table, data_dt, data_hr=None):
 | 
											
												
													
														|  |      """检查数据是否准备好"""
 |  |      """检查数据是否准备好"""
 | 
											
												
													
														|  |      try:
 |  |      try:
 | 
											
										
											
												
													
														|  | @@ -87,11 +105,9 @@ def process_reply_stats(project, table, period, run_dt):
 | 
											
												
													
														|  |      return merged_df
 |  |      return merged_df
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -def rank_for_layer1(run_dt, run_hour, gh):
 |  | 
 | 
											
												
													
														|  | 
 |  | +def rank_for_layer1(run_dt, run_hour, project, table, gh_df):
 | 
											
												
													
														|  |      # TODO: 加审核&退场
 |  |      # TODO: 加审核&退场
 | 
											
												
													
														|  | -    gh = gh[gh['type'] == 1]
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    df = get_odps_df_of_max_partition(ODS_PROJECT, EXPLORE_POOL_TABLE, {'dt': run_dt})
 |  | 
 | 
											
												
													
														|  | 
 |  | +    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
 | 
											
												
													
														|  |      df = df.to_pandas()
 |  |      df = df.to_pandas()
 | 
											
												
													
														|  |      # 确保重跑时可获得一致结果
 |  |      # 确保重跑时可获得一致结果
 | 
											
												
													
														|  |      dt_version = f'{run_dt}{run_hour}'
 |  |      dt_version = f'{run_dt}{run_hour}'
 | 
											
										
											
												
													
														|  | @@ -102,13 +118,10 @@ def rank_for_layer1(run_dt, run_hour, gh):
 | 
											
												
													
														|  |      # 按照 category1 分类后进行加权随机抽样
 |  |      # 按照 category1 分类后进行加权随机抽样
 | 
											
												
													
														|  |      sampled_df = df.groupby('category1').apply(
 |  |      sampled_df = df.groupby('category1').apply(
 | 
											
												
													
														|  |          lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
 |  |          lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
 | 
											
												
													
														|  | -    # 添加 'sort' 列
 |  | 
 | 
											
												
													
														|  |      sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
 |  |      sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
 | 
											
												
													
														|  | -    # 按得分排序
 |  | 
 | 
											
												
													
														|  | -    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
 |  | 
 | 
											
												
													
														|  |      sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
 |  |      sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
 | 
											
												
													
														|  |      sampled_df['dt_version'] = dt_version
 |  |      sampled_df['dt_version'] = dt_version
 | 
											
												
													
														|  | -    extend_df = sampled_df.merge(gh, on='category1')
 |  | 
 | 
											
												
													
														|  | 
 |  | +    extend_df = sampled_df.merge(gh_df, on='category1')
 | 
											
												
													
														|  |      result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
 |  |      result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
 | 
											
												
													
														|  |      return result_df
 |  |      return result_df
 | 
											
												
													
														|  |  
 |  |  
 | 
											
										
											
												
													
														|  | @@ -137,11 +150,9 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
 | 
											
												
													
														|  |      # fallback to base if necessary
 |  |      # fallback to base if necessary
 | 
											
												
													
														|  |      base_strategy_df = get_last_strategy_result(
 |  |      base_strategy_df = get_last_strategy_result(
 | 
											
												
													
														|  |          project, rank_table, dt_version, BASE_GROUP_NAME)
 |  |          project, rank_table, dt_version, BASE_GROUP_NAME)
 | 
											
												
													
														|  | -    gh_ids = list(GH_IDS)
 |  | 
 | 
											
												
													
														|  | -    if 'default' in gh_ids:
 |  | 
 | 
											
												
													
														|  | -        gh_ids.remove('default')
 |  | 
 | 
											
												
													
														|  | -    gh_ids = tuple(gh_ids)
 |  | 
 | 
											
												
													
														|  | -    for gh_id in gh_ids:
 |  | 
 | 
											
												
													
														|  | 
 |  | +    for gh_id in GH_IDS:
 | 
											
												
													
														|  | 
 |  | +        if gh_id == 'default':
 | 
											
												
													
														|  | 
 |  | +            continue
 | 
											
												
													
														|  |          sub_df = df.query(f'gh_id == "{gh_id}"')
 |  |          sub_df = df.query(f'gh_id == "{gh_id}"')
 | 
											
												
													
														|  |          if len(sub_df) < SEND_N:
 |  |          if len(sub_df) < SEND_N:
 | 
											
												
													
														|  |              LOGGER.warning(
 |  |              LOGGER.warning(
 | 
											
										
											
												
													
														|  | @@ -205,32 +216,17 @@ def check_result_data(df):
 | 
											
												
													
														|  |          for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
 |  |          for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
 | 
											
												
													
														|  |              sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
 |  |              sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
 | 
											
												
													
														|  |              if len(sub_df) != SEND_N:
 |  |              if len(sub_df) != SEND_N:
 | 
											
												
													
														|  | -                raise Exception(f"Result not enough for gh_id[{gh_id}]")
 |  | 
 | 
											
												
													
														|  | 
 |  | +                raise Exception(f"Result not enough for gh_id[{gh_id}] group[{key}]")
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 |  |  def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 | 
											
												
													
														|  | -    now_date = datetime.today()
 |  | 
 | 
											
												
													
														|  | -    next_date = now_date + timedelta(days=1)
 |  | 
 | 
											
												
													
														|  | -    next_dt = next_date.strftime("%Y%m%d")
 |  | 
 | 
											
												
													
														|  | -    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': next_dt})
 |  | 
 | 
											
												
													
														|  | -    gh = gh.to_pandas()
 |  | 
 | 
											
												
													
														|  | -    gh = gh[gh['type'] == 1]
 |  | 
 | 
											
												
													
														|  | -    if 'default' not in gh['gh_id'].values:
 |  | 
 | 
											
												
													
														|  | -        # 如果没有,添加一行
 |  | 
 | 
											
												
													
														|  | -        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [1], 'category1': ['泛生活']},
 |  | 
 | 
											
												
													
														|  | -                               index=[0])
 |  | 
 | 
											
												
													
														|  | -        # 使用pd.concat添加新行
 |  | 
 | 
											
												
													
														|  | -        gh = pd.concat([gh, new_row], ignore_index=True)
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    gh = gh.drop_duplicates(subset=['gh_id'])
 |  | 
 | 
											
												
													
														|  | -    gh_ids = tuple(gh['gh_id'])
 |  | 
 | 
											
												
													
														|  | -    global GH_IDS
 |  | 
 | 
											
												
													
														|  | -    GH_IDS = gh_ids
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  |      dt_version = f'{run_dt}{run_hour}'
 |  |      dt_version = f'{run_dt}{run_hour}'
 | 
											
												
													
														|  |      dry_run = kwargs.get('dry_run', False)
 |  |      dry_run = kwargs.get('dry_run', False)
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -    layer1_rank = rank_for_layer1(run_dt, run_hour, gh)
 |  | 
 | 
											
												
													
														|  | 
 |  | +    next_dt = (datetime.strptime(run_dt, "%Y%m%d") + timedelta(1)).strftime("%Y%m%d")
 | 
											
												
													
														|  | 
 |  | +    gh_df = get_and_update_gh_ids(next_dt)
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
 | 
											
												
													
														|  |      layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
 |  |      layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
 | 
											
												
													
														|  |                                    GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
 |  |                                    GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
 | 
											
												
													
														|  |      base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
 |  |      base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
 | 
											
										
											
												
													
														|  | @@ -257,17 +253,17 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 | 
											
												
													
														|  |          return
 |  |          return
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      # save to ODPS
 |  |      # save to ODPS
 | 
											
												
													
														|  | -    # t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
 |  | 
 | 
											
												
													
														|  | -    # part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
 |  | 
 | 
											
												
													
														|  | -    # part_spec =','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
 |  | 
 | 
											
												
													
														|  | -    # with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
 |  | 
 | 
											
												
													
														|  | -    #     writer.write(list(final_df.itertuples(index=False)))
 |  | 
 | 
											
												
													
														|  | 
 |  | +    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
 | 
											
												
													
														|  | 
 |  | +    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
 | 
											
												
													
														|  | 
 |  | +    part_spec =','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
 | 
											
												
													
														|  | 
 |  | +    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
 | 
											
												
													
														|  | 
 |  | +        writer.write(list(final_df.itertuples(index=False)))
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      # sync to MySQL
 |  |      # sync to MySQL
 | 
											
												
													
														|  |      data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
 |  |      data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
 | 
											
												
													
														|  | -    # data_columns = list(final_df.columns)
 |  | 
 | 
											
												
													
														|  | -    # mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
 |  | 
 | 
											
												
													
														|  | -    # mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
 |  | 
 | 
											
												
													
														|  | 
 |  | +    data_columns = list(final_df.columns)
 | 
											
												
													
														|  | 
 |  | +    mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
 | 
											
												
													
														|  | 
 |  | +    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  def main_loop():
 |  |  def main_loop():
 |