alg_growth_we_com_reply_video_v1.py

# -*- coding: utf-8 -*-
import pandas as pd
import traceback
import odps
from odps import ODPS
from threading import Timer
from datetime import datetime, timedelta
from db_helper import MysqlHelper
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
from my_config import set_config
import numpy as np
from log import Log
import os
from argparse import ArgumentParser
from constants import AutoReplyAccountType
from alg_growth_common import check_unsafe_video, filter_unsafe_video
CONFIG, _ = set_config()
LOGGER = Log()

BASE_GROUP_NAME = 'we-com-base'
EXPLORE1_GROUP_NAME = 'we-com-explore1'
EXPLORE2_GROUP_NAME = 'we-com-explore2'

# TODO: fetch gh_id from external data source
GH_IDS = ('SongYi', 'XinYi', '17512006748', '18810931977', '15146364945',
          'lky', '19270839710', 'ManShiGuang', 'ShengHuoLeQu', '16524700048', '16584214894')

CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
# ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
ODPS_WE_COM_RANK_RESULT_TABLE = 'alg_we_com_autoreply_video_rank_data'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
STATS_PERIOD_DAYS = 5
SEND_N = 2
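
# The TODO above notes that GH_IDS should eventually come from an external data
# source rather than being hard-coded. The helper below is a minimal sketch of
# that idea, reusing get_dataframe_from_odps() as it is used elsewhere in this
# file. The table name 'we_com_account_info' and its 'gh_id' column are
# assumptions for illustration only; the function is not wired into the pipeline.
def fetch_gh_ids_from_odps(project=ODS_PROJECT, table='we_com_account_info'):
    """Sketch: load enterprise WeChat account ids from an (assumed) ODPS table."""
    df = get_dataframe_from_odps(project, table).to_pandas()
    # Deduplicate and keep the tuple format used by GH_IDS above.
    return tuple(df['gh_id'].drop_duplicates())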


def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the required data partition is ready."""
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception as e:
        data_count = 0
    return data_count


def get_last_strategy_result(project, rank_table, dt_version, key):
    """Fetch the most recently persisted ranking rows for the given strategy key."""
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df


def process_reply_stats(project, table, period, run_dt):
    # Fetch several days of same-day-return stats for aggregation
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()
    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
    # Aggregate within each account
    df = df.groupby(['video_id', 'gh_id']).agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    # Aggregate over all accounts as the "default" entry
    default_stats_df = df.groupby('video_id').agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    default_stats_df['gh_id'] = 'default'
    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
    # Smoothed conversion score: the +500 in the denominator damps videos with few sends
    merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500)
    return merged_df


def rank_for_layer1(run_dt, run_hour, project, table):
    # TODO: add content review filtering and retirement from the explore pool
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    # Seed the RNG with dt_version so reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: revise the weighting strategy
    df['score'] = df['ros']
    sampled_df = df.sample(n=SEND_N, weights=df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version
    # Cross join so every account (plus "default") gets the same sampled videos
    gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default',)})
    sampled_df['_tmpkey'] = 1
    gh_name_df['_tmpkey'] = 1
    extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df


def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # Seed the RNG with dt_version so reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: compute inter-account correlation (see the account_similarity sketch after this function)
    ## For each pair of accounts, take the intersection of videos that have RoVn values;
    ## the (smoothed) RoVn values within one account form a vector.
    ## Compute the correlation coefficient or cosine similarity between those vectors,
    ## then take a weighted sum of each video's RoVn.
    # Current basic implementation: the layer-2 exploration score is computed within each account only
    sampled_dfs = []
    # Handle the default account (default-explore2)
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)
    # Basic per-account filtering
    df = stats_df.query('day0_return > 100')
    # fallback to base if necessary
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)
    for gh_id in GH_IDS:
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < SEND_N:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"')
            sub_df['score'] = sub_df['sort']
        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
        sampled_df['sort'] = range(1, len(sampled_df) + 1)
        sampled_dfs.append(sampled_df)
    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df
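

# Illustrative sketch for the inter-account correlation TODO inside rank_for_layer2:
# for a pair of accounts, take the videos they have in common, treat each account's
# smoothed per-video 'score' column (from process_reply_stats, used here as a
# stand-in for the smoothed RoVn the comment mentions) as a vector over that
# intersection, and compute cosine similarity. This helper is an assumption-laden
# sketch only; it is not called anywhere in the current pipeline, and how the
# similarity would feed into a weighted RoVn sum is left open.
def account_similarity(stats_df, gh_id_a, gh_id_b):
    """Cosine similarity between two accounts' smoothed per-video scores."""
    a = stats_df.query(f'gh_id == "{gh_id_a}"').set_index('video_id')['score']
    b = stats_df.query(f'gh_id == "{gh_id_b}"').set_index('video_id')['score']
    common = a.index.intersection(b.index)
    if len(common) == 0:
        return 0.0
    va, vb = a.loc[common].to_numpy(), b.loc[common].to_numpy()
    denom = np.linalg.norm(va) * np.linalg.norm(vb)
    return float(np.dot(va, vb) / denom) if denom > 0 else 0.0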


def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # TODO: support to set base manually
    dt_version = f'{run_dt}{run_hour}'
    # Fetch the current base strategy; the strategy table's dt_version (ctime partition) uses the current time
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)
    default_stats_df = stats_df.query('gh_id == "default"')
    # Rank within each account (including "default") to decide that account's base (exploitation) content.
    # The current base strategy must take part in the ranking, so join first and filter afterwards.
    gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='left') \
        .query('strategy_key.notna() or score > 0.1')
    # Merge default and per-account data
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n)
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
    ranked_df = ranked_df.reset_index(drop=True)
    # ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df


def check_result_data(df):
    check_unsafe_video(df, False)
    for gh_id in GH_IDS + ('default',):
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            if len(sub_df) != SEND_N:
                raise Exception(f"Result not enough for gh_id[{gh_id}]")


def rank_for_base_designate(run_dt, run_hour, stg_key):
    dt_version = f'{run_dt}{run_hour}'
    ranked_df = pd.DataFrame()  # start from an empty DataFrame
    # Designated (video_id, score, sort) entries per gh_id
    gh_id_data = {
        'SongYi': [
            (21613310, 0.6, 1),
            (21816708, 0.5, 2)
        ],
        'XinYi': [
            (21176894, 0.6, 1),
            (21072591, 0.5, 2)
        ],
        '17512006748': [
            (21176894, 0.6, 1),
            (21816708, 0.5, 2)
        ],
        '18810931977': [
            (21176894, 0.6, 1),
            (21072591, 0.5, 2)
        ],
        '15146364945': [
            (23970780, 0.6, 1),
            (36695948, 0.5, 2)
        ],
        'lky': [
            (21176894, 0.6, 1),
            (21072591, 0.5, 2)
        ],
        '19270839710': [
            (12794884, 0.6, 1),
            (13437896, 0.5, 2)
        ],
        'ManShiGuang': [
            (21613310, 0.6, 1),
            (21816708, 0.5, 2)
        ],
        'ShengHuoLeQu': [
            (21613310, 0.6, 1),
            (21816708, 0.5, 2)
        ],
        '16524700048': [
            (21613310, 0.6, 1),
            (21816708, 0.5, 2)
        ],
        '16584214894': [
            (23970780, 0.6, 1),
            (36695948, 0.5, 2)
        ]
    }
    # Default video entries for accounts without designated videos (and for "default")
    default_data = [
        (12794884, 0.6, 1),
        (13788955, 0.5, 2)
    ]
    # Iterate over the gh_id list (plus "default")
    for gh_id in GH_IDS + ('default',):
        if gh_id in gh_id_data:
            data_to_use = gh_id_data[gh_id]
        else:
            data_to_use = default_data
        # Build a one-row DataFrame per entry and append it
        for video_id, score, sort in data_to_use:
            temp_df = pd.DataFrame({
                'strategy_key': [stg_key],
                'dt_version': [dt_version],
                'gh_id': [gh_id],
                'sort': [sort],
                'video_id': [video_id],
                'score': [score]
            })
            ranked_df = pd.concat([ranked_df, temp_df], ignore_index=True)
    return ranked_df


def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)

    # layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
    # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_WE_COM_RANK_RESULT_TABLE)
    # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_WE_COM_RANK_RESULT_TABLE, BASE_GROUP_NAME)
    layer1_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE1_GROUP_NAME)
    layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
    base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)

    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    check_result_data(final_rank_df)

    odps_instance = get_odps_instance(project)
    odps_ranked_df = odps.DataFrame(final_rank_df)

    video_df = get_dataframe_from_odps('videoods', 'wx_video')
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    video_df = video_df['id', 'title', 'cover_url']
    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
    final_df = final_df.to_pandas()
    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]

    # reverse sending order
    final_df['sort'] = SEND_N + 1 - final_df['sort']

    if dry_run:
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
        return

    # save to ODPS
    t = odps_instance.get_table(ODPS_WE_COM_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)


def main_loop():
    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    args = argparser.parse_args()
    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
        now_hour = now_date.strftime("%H")
        last_date = now_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")
        # Check whether the daily-updated upstream data is ready.
        # The upstream stats table is updated daily, but its fields are designed to also support hourly updates.
        h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
        if h_data_count > 0:
            LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
            run_dt = now_date.strftime("%Y%m%d")
            LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
                                    dry_run=args.dry_run)
            LOGGER.info('数据更新完成')
        else:
            LOGGER.info("上游数据未就绪,等待60s")
            # Retry in 60 seconds without blocking the current thread
            Timer(60, main_loop).start()
        return
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境' or args.dry_run:
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
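

# Usage note (the flag is defined in main_loop above):
#   python alg_growth_we_com_reply_video_v1.py            # full run: writes to ODPS and MySQL
#   python alg_growth_we_com_reply_video_v1.py --dry-run  # prints the final ranking, skips all writes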


if __name__ == '__main__':
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
    main_loop()