# -*- coding: utf-8 -*-
import pandas as pd
import traceback
import odps
from odps import ODPS
import json
from threading import Timer
from datetime import datetime, timedelta
from db_helper import MysqlHelper
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
from my_config import set_config
import numpy as np
from log import Log
import os
from argparse import ArgumentParser
from constants import AutoReplyAccountType

CONFIG, _ = set_config()
LOGGER = Log()

BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'
# GH_IDS will be updated by get_and_update_gh_ids
GH_IDS = ('default',)
pd.set_option('display.max_rows', None)

# accounts that receive the designated video in rank_for_base_designate
TARGET_GH_IDS = (
    'gh_250c51d5ce69',
    'gh_8a29eebc2012',
    'gh_ff16c412ab97',
    'gh_1014734791e0',
    'gh_570967881eae',
    'gh_a7c21403c493',
    'gh_7f062810b4e7',
    'gh_c8060587e6d1',
    'gh_1da8f62f4a0d',
    'gh_56b65b7d4520',
    'gh_eeec7c2e28a5',
    'gh_7c89d5a3e745',
    'gh_ee5b4b07ed8b',
    'gh_0d3c97cc30cc',
    'gh_c783350a9660',
)

CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
STATS_PERIOD_DAYS = 5
# number of videos sent per account per strategy
SEND_N = 1


def get_and_update_gh_ids(run_dt):
    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
    gh = gh.to_pandas()
    gh = gh[gh['type'] == AutoReplyAccountType.EXTERNAL_GZH.value]
    # the "default" account is handled separately
    if 'default' not in gh['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2],
                                'category1': ['泛生活']},
                               index=[0])
        gh = pd.concat([gh, new_row], ignore_index=True)
    gh = gh.drop_duplicates(subset=['gh_id'])
    global GH_IDS
    GH_IDS = tuple(gh['gh_id'])
    return gh


def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the data partition is ready; return its row count (0 if missing or on error)."""
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception:
        data_count = 0
    return data_count


def get_last_strategy_result(project, rank_table, dt_version, key):
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df


def process_reply_stats(project, table, period, run_dt):
    # fetch several days of same-day return stats for aggregation
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()
    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]

    # aggregate within each account
    df = df.groupby(['video_id', 'gh_id']).agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()

    # aggregate across all accounts as the "default" account
    default_stats_df = df.groupby('video_id').agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    default_stats_df['gh_id'] = 'default'

    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
    merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500)
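    # Note: the +500 in the denominator acts as additive smoothing, so a video with
    # very few sends cannot rank highly on a lucky streak. For example (hypothetical
    # numbers): 10 returns / (20 + 500) ≈ 0.019 scores below 100 returns / (2000 + 500)
    # = 0.04, even though the raw return-per-send ratio of the first is higher.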
    return merged_df


def rank_for_layer1(run_dt, run_hour, project, table, gh):
    # TODO: add content review & retirement filtering
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    # seed with dt_version so that reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: revise the weight calculation strategy
    df['score'] = df['ros']
    # weighted random sampling within each category1 group
    sampled_df = df.groupby('category1').apply(
        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
    # sort by score within each category
    sampled_df = sampled_df.sort_values(by=['category1', 'score'],
                                        ascending=[True, False]).reset_index(drop=True)
    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version
    # merging on category1 fans each category's sampled videos out to every account in that category
    extend_df = sampled_df.merge(gh, on='category1')
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df


def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # seed with dt_version so that reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: compute cross-account correlation (see the illustrative sketch after this function)
    ## pair accounts, take the intersection of videos that have RoVn values, and build
    ## each account's vector of (smoothed) RoVn over those videos
    ## compute the correlation coefficient or cosine similarity of the vectors
    ## then take a weighted sum of each video's RoVn across accounts
    # current implementation is the basic version: the layer-2 explore score is only
    # ranked within each account
    sampled_dfs = []

    # handle the "default" account (default-explore2)
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)

    # basic per-account filtering
    df = stats_df.query('send_count > 200 and score > 0')

    # fallback to base if necessary
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)

    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < SEND_N:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"')
            sub_df['score'] = sub_df['sort']
        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
        sampled_df['sort'] = range(1, len(sampled_df) + 1)
        sampled_dfs.append(sampled_df)

    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df
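

# Illustrative sketch only, not wired into the pipeline: one possible realization of the
# cross-account similarity described in the TODO inside rank_for_layer2. The helper name
# and the use of smoothed day0_return / send_count as a stand-in for RoVn are assumptions;
# the real stats table may expose the metric differently.
def account_similarity(stats_df, gh_a, gh_b, smooth=500):
    """Cosine similarity of two accounts' smoothed return vectors over their shared videos."""
    a = stats_df.query(f'gh_id == "{gh_a}"').set_index('video_id')
    b = stats_df.query(f'gh_id == "{gh_b}"').set_index('video_id')
    shared = a.index.intersection(b.index)
    if len(shared) == 0:
        return 0.0
    va = (a.loc[shared, 'day0_return'] / (a.loc[shared, 'send_count'] + smooth)).to_numpy()
    vb = (b.loc[shared, 'day0_return'] / (b.loc[shared, 'send_count'] + smooth)).to_numpy()
    denom = np.linalg.norm(va) * np.linalg.norm(vb)
    return float(np.dot(va, vb) / denom) if denom > 0 else 0.0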


def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # TODO: support setting the base manually
    dt_version = f'{run_dt}{run_hour}'

    # fetch the current base strategy; the strategy table's dt_version (ctime partition)
    # uses the current time
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)

    default_stats_df = stats_df.query('gh_id == "default"')

    # rank within each account to decide the base content used by that account (including default);
    # the current base strategy must take part in the ranking, so join first, then filter
    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')

    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='outer') \
        .query('strategy_key.notna() or (send_count > 500 and score > 0.05)') \
        .fillna({'score': 0.0})

    # combine the default and per-account data
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n)
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df


def check_result_data(df):
    for gh_id in GH_IDS:
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            n_records = len(sub_df)
            if n_records != SEND_N:
                raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")


def postprocess_override_by_config(df, dt_version):
    with open("configs/3rd_gh_reply_video.json") as f:
        config = json.load(f)
    override_data = {
        'strategy_key': [],
        'gh_id': [],
        'sort': [],
        'video_id': []
    }
    for gh_id in config:
        gh_config = config[gh_id]
        for key in gh_config:
            for video_config in gh_config[key]:
                # drop whatever currently occupies this position
                position = video_config['position']
                video_id = video_config['video_id']
                df = df.drop(df.query(
                    f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {position}'
                ).index)
                override_data['strategy_key'].append(key)
                override_data['gh_id'].append(gh_id)
                override_data['sort'].append(position)
                override_data['video_id'].append(video_id)
    n_records = len(override_data['strategy_key'])
    override_data['dt_version'] = [dt_version] * n_records
    override_data['score'] = [0.0] * n_records
    df_to_append = pd.DataFrame(override_data)
    df = pd.concat([df, df_to_append], ignore_index=True)
    return df
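

# Expected shape of configs/3rd_gh_reply_video.json, as implied by the parsing in
# postprocess_override_by_config (the gh_id and video_id below are placeholders):
# {
#     "gh_xxxxxxxxxxxx": {
#         "3rd-party-base": [
#             {"position": 1, "video_id": 12345678}
#         ]
#     }
# }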


def rank_for_base_designate(run_dt, run_hour, stg_key):
    dt_version = f'{run_dt}{run_hour}'
    ranked_df = pd.DataFrame()  # start from an empty DataFrame
    for gh_id in GH_IDS:
        if gh_id in TARGET_GH_IDS:
            temp_df = pd.DataFrame({
                'strategy_key': [stg_key],
                'dt_version': [dt_version],
                'gh_id': [gh_id],
                'sort': [1],
                'video_id': [13586800],
                'score': [0.5]
            })
        else:
            temp_df = pd.DataFrame({
                'strategy_key': [stg_key],
                'dt_version': [dt_version],
                'gh_id': [gh_id],
                'sort': [1],
                'video_id': [20463342],
                'score': [0.5]
            })
        ranked_df = pd.concat([ranked_df, temp_df], ignore_index=True)
    return ranked_df


def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)

    gh_df = get_and_update_gh_ids(run_dt)

    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,
                              BASE_GROUP_NAME)
    # layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
    # base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)

    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
    check_result_data(final_rank_df)

    # join video title and cover image via ODPS
    odps_instance = get_odps_instance(project)
    odps_ranked_df = odps.DataFrame(final_rank_df)
    video_df = get_dataframe_from_odps('videoods', 'wx_video')
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    video_df = video_df['id', 'title', 'cover_url']
    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
    final_df = final_df.to_pandas()
    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]

    # reverse sending order: with SEND_N items, sort 1 becomes SEND_N, ..., SEND_N becomes 1
    final_df['sort'] = SEND_N + 1 - final_df['sort']

    if dry_run:
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
              .sort_values(by=['strategy_key', 'gh_id', 'sort']))
        return

    # save to ODPS
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)


def main_loop():
    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
    args = argparser.parse_args()

    run_date = datetime.today()
    if args.run_at:
        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")

    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
        last_date = run_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")

        # check whether the day-level upstream data is ready;
        # the upstream stats table is currently updated daily, but its schema is
        # designed to be hour-level compatible
        h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
        if h_data_count > 0:
            LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
            run_dt = run_date.strftime("%Y%m%d")
            run_hour = run_date.strftime("%H")
            LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
            build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
                                    dry_run=args.dry_run)
            LOGGER.info('数据更新完成')
        else:
            LOGGER.info("上游数据未就绪,等待60s")
            # upstream data not ready: retry in 60 seconds
            Timer(60, main_loop).start()
        return
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境':
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
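

# Example invocation (assumed usage, based on the CLI flags defined in main_loop):
#   python alg_growth_3rd_gh_reply_video_v1.py --dry-run --run-at 2024061510
# --dry-run prints the final ranking instead of writing to ODPS / MySQL;
# --run-at pretends the job runs at the given yyyyMMddHH, which is useful for reruns and backfills.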


if __name__ == '__main__':
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
    main_loop()