alg_growth_gh_reply_video_v1.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # -*- coding: utf-8 -*-
  2. import pandas as pd
  3. import traceback
  4. import odps
  5. from odps import ODPS
  6. from threading import Timer
  7. from datetime import datetime, timedelta
  8. from db_helper import MysqlHelper
  9. from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
  10. get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
  11. from my_utils import request_post, send_msg_to_feishu
  12. from my_config import set_config
  13. import numpy as np
  14. from log import Log
  15. import os
  16. CONFIG, _ = set_config()
  17. LOGGER = Log()
  18. BASE_GROUP_NAME = 'stg0909-base'
  19. EXPLORE1_GROUP_NAME = 'stg0909-explore1'
  20. EXPLORE2_GROUP_NAME = 'stg0909-explore2'
  21. #TODO: fetch gh_id from external data source
  22. GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1', 'gh_68e7fdc09fe4')
  23. CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
  24. ODS_PROJECT = "loghubods"
  25. EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
  26. GH_REPLY_STATS_TABLE = 'alg_growth_gh_reply_video_stats'
  27. ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
  28. RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
  29. STATS_PERIOD_DAYS = 3
  30. def check_data_partition(project, table, data_dt, data_hr=None):
  31. """检查数据是否准备好"""
  32. try:
  33. partition_spec = {'dt': data_dt}
  34. if data_hr:
  35. partition_spec['hour'] = data_hr
  36. part_exist, data_count = check_table_partition_exits_v2(
  37. project, table, partition_spec)
  38. except Exception as e:
  39. data_count = 0
  40. return data_count
  41. def rank_for_layer1(run_dt, run_hour, project, table):
  42. # TODO: 加审核&退场
  43. df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
  44. df = df.to_pandas()
  45. # 确保重跑时可获得一致结果
  46. dt_version = f'{run_dt}{run_hour}'
  47. np.random.seed(int(dt_version)+1)
  48. # TODO: 修改权重计算策略
  49. sample_weights = df['rov']
  50. sampled_df = df.sample(n=2, weights=sample_weights)
  51. sampled_df['sort'] = range(1, len(sampled_df) + 1)
  52. sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
  53. sampled_df['dt_version'] = dt_version
  54. gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default', )})
  55. sampled_df['_tmpkey'] = 1
  56. gh_name_df['_tmpkey'] = 1
  57. extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
  58. result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
  59. return result_df
  60. def rank_for_layer2(run_dt, run_hour, project, table):
  61. df = get_odps_df_of_recent_partitions(project, table, STATS_PERIOD_DAYS, {'dt': run_dt})
  62. df = df.to_pandas()
  63. df['video_id'] = df['video_id'].astype('int64')
  64. df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
  65. df = df.groupby(['video_id', 'gh_id']).agg({
  66. 'send_count': 'sum',
  67. 'first_visit_uv': 'sum',
  68. 'day0_return': 'sum'
  69. }).reset_index()
  70. # 确保重跑时可获得一致结果
  71. dt_version = f'{run_dt}{run_hour}'
  72. np.random.seed(int(dt_version)+1)
  73. # TODO: 计算账号间相关性
  74. ## 账号两两组合,取有RoVn数值视频的交集,单个账号内的RoVn(平滑后)组成向量
  75. ## 求向量相关系数或cosine相似度
  76. ## 单个视频的RoVn加权求和
  77. # 当前实现基础版本:只在账号内求二级探索排序分
  78. sampled_dfs = []
  79. # 处理default逻辑(default-explore2)
  80. default_stats_df = df \
  81. .groupby('video_id') \
  82. .agg({'send_count': 'sum',
  83. 'first_visit_uv': 'sum',
  84. 'day0_return': 'sum'}) \
  85. .reset_index()
  86. default_stats_df['gh_id'] = 'default'
  87. default_stats_df['score'] = default_stats_df['day0_return'] / (default_stats_df['first_visit_uv'] + 1000)
  88. sampled_df = default_stats_df.sample(n=2, weights=default_stats_df['score'])
  89. sampled_df['sort'] = range(1, len(sampled_df) + 1)
  90. sampled_dfs.append(sampled_df)
  91. # 基础过滤for账号
  92. df = df.query('day0_return > 100')
  93. # TODO: fetch send_count
  94. # TODO: 个数不足时的兜底逻辑
  95. df['score'] = df['day0_return'] / (df['first_visit_uv'] + 1000)
  96. for gh_id in GH_IDS:
  97. sub_df = df.query(f'gh_id == "{gh_id}"')
  98. sampled_df = sub_df.sample(n=2, weights=sub_df['score'])
  99. sampled_df['sort'] = range(1, len(sampled_df) + 1)
  100. sampled_dfs.append(sampled_df)
  101. if len(sampled_df) != 2:
  102. raise
  103. extend_df = pd.concat(sampled_dfs)
  104. extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
  105. extend_df['dt_version'] = dt_version
  106. result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
  107. return result_df
  108. def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
  109. #TODO: support to set base manually
  110. dt_version = f'{run_dt}{run_hour}'
  111. # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
  112. strategy_df = get_odps_df_of_max_partition(
  113. project, rank_table, { 'ctime': dt_version }
  114. ).to_pandas()
  115. base_strategy_df = strategy_df.query('strategy_key.str.contains("base")')
  116. base_strategy_df = base_strategy_df[['gh_id', 'video_id', 'strategy_key']].drop_duplicates()
  117. # 获取多天即转统计数据,聚合
  118. stats_df = get_odps_df_of_recent_partitions(
  119. project, stats_table, STATS_PERIOD_DAYS, {'dt': run_dt}
  120. ).to_pandas()
  121. stats_df['video_id'] = stats_df['video_id'].astype('int64')
  122. stats_df = stats_df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
  123. stats_df = stats_df.groupby(['video_id', 'gh_id']).agg({
  124. 'send_count': 'sum',
  125. 'first_visit_uv': 'sum',
  126. 'day0_return': 'sum'
  127. }).reset_index()
  128. # 聚合所有数据作为新号base利用数据(default-base)
  129. default_stats_df = stats_df \
  130. .groupby('video_id') \
  131. .agg({'send_count': 'sum',
  132. 'first_visit_uv': 'sum',
  133. 'day0_return': 'sum'}) \
  134. .reset_index()
  135. default_stats_df['gh_id'] = 'default'
  136. # 在账号内排序,决定该账号(包括default)的base利用内容
  137. # 排序过程中,确保当前base策略参与排序,因此先关联再过滤
  138. gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
  139. stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
  140. stats_with_strategy_df = stats_df \
  141. .merge(
  142. base_strategy_df,
  143. on=['gh_id', 'video_id'],
  144. how='left') \
  145. .query('strategy_key.notna() or day0_return > 100')
  146. # 合并default和分账号数据
  147. grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
  148. grouped_stats_df['score'] = grouped_stats_df['day0_return'] / (grouped_stats_df['first_visit_uv'] + 1000)
  149. def set_top_n(group, n=2):
  150. group_sorted = group.sort_values(by='score', ascending=False)
  151. top_n = group_sorted.head(n)
  152. top_n['sort'] = range(1, n + 1)
  153. return top_n
  154. ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n)
  155. ranked_df = ranked_df.reset_index(drop=True)
  156. #ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
  157. ranked_df['strategy_key'] = BASE_GROUP_NAME
  158. ranked_df['dt_version'] = dt_version
  159. ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
  160. return ranked_df
  161. def build_and_transfer_data(run_dt, run_hour, project):
  162. dt_version = f'{run_dt}{run_hour}'
  163. layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
  164. layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE)
  165. base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
  166. GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
  167. final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
  168. odps_instance = get_odps_instance(project)
  169. odps_ranked_df = odps.DataFrame(final_rank_df)
  170. video_df = get_dataframe_from_odps('videoods', 'wx_video')
  171. video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
  172. video_df = video_df['id', 'title', 'cover_url']
  173. final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
  174. final_df = final_df.to_pandas()
  175. final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url']]
  176. # save to ODPS
  177. t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
  178. part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
  179. part_spec =','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
  180. with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
  181. writer.write(list(final_df.itertuples(index=False)))
  182. # sync to MySQL
  183. data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
  184. data_columns = list(final_df.columns)
  185. mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
  186. mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
  187. def main_loop():
  188. try:
  189. now_date = datetime.today()
  190. LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
  191. now_hour = now_date.strftime("%H")
  192. last_date = now_date - timedelta(1)
  193. last_dt = last_date.strftime("%Y%m%d")
  194. # 查看当前天级更新的数据是否已准备好
  195. # 当前上游统计表为天级更新,但字段设计为兼容小时级
  196. h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
  197. if h_data_count > 0:
  198. LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
  199. run_dt = now_date.strftime("%Y%m%d")
  200. LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
  201. build_and_transfer_data(run_dt, now_hour, ODS_PROJECT)
  202. LOGGER.info('数据更新完成')
  203. else:
  204. LOGGER.info("上游数据未就绪,等待60s")
  205. Timer(60, main_loop).start()
  206. return
  207. except Exception as e:
  208. LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  209. if CONFIG.ENV_TEXT == '开发环境':
  210. return
  211. return # 暂时不发送报警
  212. send_msg_to_feishu(
  213. webhook=CONFIG.FEISHU_ROBOT['server_robot'].get('webhook'),
  214. key_word=CONFIG.FEISHU_ROBOT['server_robot'].get('key_word'),
  215. msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
  216. f"exception: {e}\n"
  217. f"traceback: {traceback.format_exc()}"
  218. )
  219. if __name__ == '__main__':
  220. LOGGER.info("%s 开始执行" % os.path.basename(__file__))
  221. LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
  222. main_loop()