alg_recsys_recall_24h_noregion.py

# -*- coding: utf-8 -*-
import pandas as pd
import traceback
from functools import reduce
from odps import ODPS
from threading import Timer
from datetime import datetime, timedelta
from get_data import get_data_from_odps
from db_helper import RedisHelper
from my_utils import filter_video_status, check_table_partition_exits, filter_video_status_app, \
    request_post, send_msg_to_feishu
from my_config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()

RULE_PARAMS = {
    'rule_params': {
        'rule66': {'cal_score_func': 2, 'return_count': 100, 'platform_return_rate': 0.001, 'view_type': 'preview'},
    },
    'data_params': config_.DATA_PARAMS,
    'params_list': [
        {'data': 'data66', 'rule': 'rule66'},
    ]
}
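# Descriptive note on the wiring above: each entry in 'params_list' pairs a data
# key (an apptype->weight mapping looked up in 'data_params', which comes from
# config_.DATA_PARAMS) with a rule key (scoring and filter thresholds looked up
# in 'rule_params'). rank_by_h() iterates over 'params_list' and runs one
# ranking pass per pair.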
# Feature columns read from the ODPS table; the Chinese column names are the
# upstream table's own field names.
features = [
    'apptype',
    'videoid',
    'preview人数',  # users with pre-exposure in the past 24h
    'view人数',  # users with exposure in the past 24h
    'play人数',  # users who played in the past 24h
    'share人数',  # users who shared in the past 24h
    '回流人数',  # users who shared in the past 24h and returned in the past 24h
    'preview次数',  # pre-exposure count in the past 24h
    'view次数',  # exposure count in the past 24h
    'play次数',  # play count in the past 24h
    'share次数',  # share count in the past 24h
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]
def get_rov_redis_key(now_date):
    # Get the redis key where the daily rov model results are stored;
    # fall back to the previous day's key if today's does not exist yet.
    redis_helper = RedisHelper()
    now_dt = datetime.strftime(now_date, '%Y%m%d')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
    if not redis_helper.key_exists(key_name=key_name):
        pre_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
        key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
    return key_name


def h_data_check(project, table, now_date, now_h):
    """Check whether the upstream data is ready; return the partition's row count (0 if not ready)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        # From 23:00 until (but not including) 08:00, use the list generated at 22:00.
        if now_h == 23:
            dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
        elif now_h < 8:
            dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
        else:
            dt = datetime.strftime(now_date, '%Y%m%d%H')
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        # Treat any query or connection failure as "data not ready".
        data_count = 0
    return data_count
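# Worked example of the partition choice above (illustrative only): if the job
# runs at 2022-04-25 03:xx, now_h < 8, so dt = '2022042422' (yesterday's 22:00
# partition); at 2022-04-25 23:xx, dt = '2022042522'; at 2022-04-25 10:xx,
# dt = '2022042510'.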
def get_feature_data(now_date, now_h, project, table):
    """Fetch the feature data from ODPS and return it as a DataFrame."""
    # From 23:00 until (but not including) 08:00, use the list generated at 22:00.
    if now_h == 23:
        dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
    elif now_h < 8:
        dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
    else:
        dt = datetime.strftime(now_date, '%Y%m%d%H')
    log_.info({'feature_dt': dt})
    # dt = '20220425'
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df
def cal_score1(df):
    # score1 formula: score = 回流人数 / (view人数 + 1000)
    df = df.fillna(0)
    df['score'] = df['回流人数'] / (df['view人数'] + 1000)
    df = df.sort_values(by=['score'], ascending=False)
    return df
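# Worked example for cal_score1 (illustrative only): with 回流人数 = 50 and
# view人数 = 9000, score = 50 / (9000 + 1000) = 0.005.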
def cal_score2(df, param):
    # score2 formula: score = share次数 / (view + 1000) + 0.01 * return / (share次数 + 100),
    # where the view denominator depends on param['view_type'].
    df = df.fillna(0)
    if param.get('view_type', None) == 'video-show':
        df['share_rate'] = df['share次数'] / (df['platform_show'] + 1000)
    elif param.get('view_type', None) == 'preview':
        df['share_rate'] = df['share次数'] / (df['preview人数'] + 1000)
    else:
        df['share_rate'] = df['share次数'] / (df['view人数'] + 1000)
    df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
    df['score'] = df['share_rate'] + 0.01 * df['back_rate']
    # Note: with 回流人数 == 0 pandas yields NaN (0/0) or inf here rather than raising.
    df['platform_return_rate'] = df['platform_return'] / df['回流人数']
    df = df.sort_values(by=['score'], ascending=False)
    return df
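# Worked example for cal_score2 (illustrative only), with view_type='preview':
# share次数 = 200, preview人数 = 9000, 回流人数 = 30 gives
#   share_rate = 200 / (9000 + 1000) = 0.02
#   back_rate  = 30 / (200 + 100)   = 0.1
#   score      = 0.02 + 0.01 * 0.1  = 0.021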
def cal_score(df, param):
    # score formula:
    #   score1 = share次数 / (view + 1000) + 0.01 * return / (share次数 + 100)
    #   ctr = lastonehour_play / (lastonehour_preview + 1000), capped at 0.6: K2 = 0.6 if ctr > 0.6 else ctr
    #   score = 0.3 * score1 + 0.7 * K2 (with click_score_rate = 0.7)
    df = df.fillna(0)
    if param.get('view_type', None) == 'video-show':
        df['share_rate'] = df['share次数'] / (df['platform_show'] + 1000)
        df['ctr'] = df['play人数'] / (df['platform_show'] + 1000)
    elif param.get('view_type', None) == 'preview':
        df['share_rate'] = df['share次数'] / (df['preview人数'] + 1000)
        df['ctr'] = df['play人数'] / (df['preview人数'] + 1000)
    else:
        df['share_rate'] = df['share次数'] / (df['platform_show'] + 1000)
        df['ctr'] = df['play人数'] / (df['platform_show'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
    df['platform_return_rate'] = df['platform_return'] / df['回流人数']
    df['score1'] = df['share_rate'] + 0.01 * df['back_rate']
    click_score_rate = param.get('click_score_rate', None)
    back_score_rate = param.get('back_score_rate', None)
    if click_score_rate is not None:
        df['score'] = (1 - click_score_rate) * df['score1'] + click_score_rate * df['K2']
    elif back_score_rate is not None:
        df['score'] = (1 - back_score_rate) * df['score1'] + back_score_rate * df['back_rate']
    else:
        df['score'] = df['score1']
    df = df.sort_values(by=['score'], ascending=False)
    return df
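# Worked example for cal_score (illustrative only), with view_type='preview'
# and click_score_rate = 0.7: share次数 = 200, preview人数 = 9000, play人数 = 800,
# 回流人数 = 30 gives
#   share_rate = 200 / 10000 = 0.02, back_rate = 30 / 300 = 0.1
#   score1 = 0.02 + 0.01 * 0.1 = 0.021
#   ctr = 800 / 10000 = 0.08, so K2 = 0.08 (below the 0.6 cap)
#   score = 0.3 * 0.021 + 0.7 * 0.08 = 0.0623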
def video_rank_h(df, now_date, now_h, rule_key, param, data_key, notify_backend):
    """
    Select the videos that qualify for the recall pool and merge them with the
    daily-updated rov model result list.
    :param df:
    :param now_date:
    :param now_h:
    :param rule_key: key of the day-level entry rule
    :param param: parameters of the day-level entry rule
    :param data_key: identifier of the data source used
    :param notify_backend: whether to sync the result to the backend
    :return:
    """
    redis_helper = RedisHelper()
    log_.info(f"total videos = {len(df)}")
    # For duplicate videoids, keep the row with the higher score.
    df = df.sort_values(by=['score'], ascending=False)
    df = df.drop_duplicates(subset=['videoid'], keep='first')
    df['videoid'] = df['videoid'].astype(int)
    # Select the videos that qualify for the recall pool.
    return_count = param.get('return_count')
    if return_count:
        day_recall_df = df[df['回流人数'] > return_count]
    else:
        day_recall_df = df
    log_.info(f"videos left after return-count filter = {len(day_recall_df)}")
    platform_return_rate = param.get('platform_return_rate', 0)
    day_recall_df = day_recall_df[day_recall_df['platform_return_rate'] > platform_return_rate]
    day_recall_videos = day_recall_df['videoid'].to_list()
    log_.info(f"videos left after return-rate filter = {len(day_recall_videos)}")
    # Filter by video status.
    if data_key in ['data7', ]:
        filtered_videos = filter_video_status_app(day_recall_videos)
    else:
        filtered_videos = filter_video_status(day_recall_videos)
    log_.info(f"videos left after status filter = {len(filtered_videos)}")
    # Write the result to the corresponding redis key.
    now_dt = datetime.strftime(now_date, '%Y%m%d')
    day_video_ids = []
    day_recall_result = {}
    for video_id in filtered_videos:
        score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
        day_recall_result[int(video_id)] = float(score.iloc[0])
        day_video_ids.append(int(video_id))
    # recall:item:score:24h:
    h_24h_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{rule_key}:{now_dt}:{now_h}"
  198. log_.info("打印非地域24小时redis key:{}".format(h_24h_recall_key_name))
  199. if len(day_recall_result) > 0:
  200. log_.info(f"开始写入头部数据:count = {len(day_recall_result)}, key = {h_24h_recall_key_name}")
  201. redis_helper.add_data_with_zset(key_name=h_24h_recall_key_name, data=day_recall_result, expire_time=2 * 3600)
  202. else:
  203. log_.info(f"无数据,不写入。")
  204. # ---------------处理剩余结果---------------
  205. log_.info('开始处理剩余结果other')
  206. all_videos = df['videoid'].to_list()
  207. if data_key in ['data7', ]:
  208. all_filtered_videos = filter_video_status_app(all_videos)
  209. else:
  210. all_filtered_videos = filter_video_status(all_videos)
  211. other_videos = [video for video in all_filtered_videos if video not in day_video_ids]
  212. log_.info(f'过滤后剩余视频数量 count = {len(other_videos)}')
  213. # 写入对应的redis
  214. other_24h_recall_result = {}
  215. json_data = []
  216. for video_id in other_videos:
  217. score = df[df['videoid'] == video_id]['score']
  218. other_24h_recall_result[int(video_id)] = float(score)
  219. json_data.append({'videoId': video_id, 'rovScore': float(score)})
  220. # recall:item:score:24h:other:
  221. other_h_24h_recall_key_name = \
  222. f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:{rule_key}:{now_dt}:{now_h}"
  223. log_.info("打印非地域24小时(剩余)redis key:{}".format(other_h_24h_recall_key_name))
  224. if len(other_24h_recall_result) > 0:
  225. log_.info(f"开始写入尾部数据:count = {len(other_24h_recall_result)}, key = {other_h_24h_recall_key_name}")
  226. redis_helper.add_data_with_zset(key_name=other_h_24h_recall_key_name, data=other_24h_recall_result,
  227. expire_time=2 * 3600)
  228. else:
  229. log_.info(f"无尾部数据,不写入。")
  230. # 通知后端更新兜底视频数据
  231. if notify_backend is True:
  232. log_.info('json_data count = {}'.format(len(json_data[:5000])))
  233. result = request_post(request_url=config_.NOTIFY_BACKEND_updateFallBackVideoList_URL,
  234. request_data={'videos': json_data[:5000]})
  235. if result is None:
  236. log_.error('notify backend updateFallBackVideoList fail!')
  237. elif result['code'] == 0:
  238. log_.info('notify backend updateFallBackVideoList success!')
  239. else:
  240. log_.error('notify backend updateFallBackVideoList fail!')
  241. # 去重更新rov模型结果,并另存为redis中
  242. # initial_data_dup = {}
  243. # for video_id, score in initial_data:
  244. # if int(video_id) not in day_video_ids:
  245. # initial_data_dup[int(video_id)] = score
  246. # log_.info(f"initial data dup count = {len(initial_data_dup)}")
  247. #
  248. # initial_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_24H}{rule_key}.{now_dt}.{now_h}"
  249. # if len(initial_data_dup) > 0:
  250. # redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
def merge_df(df_left, df_right):
    """
    Merge two DataFrames on videoid, summing each feature column.
    :param df_left:
    :param df_right:
    :return:
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid']
    for feature in features:
        if feature in ['apptype', 'videoid']:
            continue
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
        feature_list.append(feature)
    return df_merged[feature_list]
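# Illustrative merge_df behavior: if df_left has (videoid=1, share次数=3) and
# df_right has (videoid=1, share次数=5), the merged row has share次数 = 8; a
# videoid present on only one side keeps its own counts, since the outer merge
# fills the missing side with 0 before summing.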
def merge_df_with_score(df_left, df_right):
    """
    Merge two DataFrames on videoid, summing 回流人数, platform_return and score.
    :param df_left:
    :param df_right:
    :return:
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', '回流人数', 'platform_return', 'score']
    for feature in feature_list[1:]:
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
    return df_merged[feature_list]
def rank_by_h(now_date, now_h, rule_params, project, table):
    # Fetch feature data.
    feature_df = get_feature_data(now_date=now_date, now_h=now_h, project=project, table=table)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    # rank
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    for param in rule_params.get('params_list'):
        score_df_list = []
        notify_backend = param.get('notify_backend', False)
        data_key = param.get('data')
        data_param = data_params_item.get(data_key)
        rule_key = param.get('rule')
        rule_param = rule_params_item.get(rule_key)
        merge_func = rule_param.get('merge_func', 1)
        log_.info("data key: {}, rule key: {}.".format(data_key, rule_key))
        log_.info("rule detail: {}.".format(rule_param))
        if merge_func == 2:
            # merge_func == 2: score each apptype separately, weight the scores,
            # then sum the weighted scores per videoid.
            for apptype, weight in data_param.items():
                df = feature_df[feature_df['apptype'] == apptype]
                # Compute score.
                score_df = cal_score(df=df, param=rule_param)
                score_df['score'] = score_df['score'] * weight
                score_df_list.append(score_df)
            # Merge the scores.
            df_merged = reduce(merge_df_with_score, score_df_list)
            # Recompute the platform return rate on the merged counts.
            df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
            video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
                         rule_key=rule_key, param=rule_param, data_key=data_key,
                         notify_backend=notify_backend)
        else:
            # Default: sum the raw feature counts across apptypes first, then score once.
            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
            df_merged = reduce(merge_df, df_list)
            score_df = cal_score(df=df_merged, param=rule_param)
            video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
                         rule_key=rule_key, param=rule_param, data_key=data_key,
                         notify_backend=notify_backend)

        # # to-csv
        # score_filename = f"score_by24h_{key}_{datetime.strftime(now_date, '%Y%m%d%H')}.csv"
        # score_df.to_csv(f'./data/{score_filename}')
        # # to-logs
        # log_.info({"date": datetime.strftime(now_date, '%Y%m%d%H'),
        #            "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_BY_24H,
        #            "rule_key": key,
        #            # "score_df": score_df[['videoid', 'score']]
        #            })
def h_rank_bottom(now_date, now_h, rule_params):
    """Data was not updated in time; reuse the previous hour's recall data as the current hour's data."""
    redis_helper = RedisHelper()
    if now_h == 0:
        redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
        redis_h = 23
    else:
        redis_dt = datetime.strftime(now_date, '%Y%m%d')
        redis_h = now_h - 1
    key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER]
    for param in rule_params.get('params_list'):
        data_key = param.get('data')
        rule_key = param.get('rule')
        log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
        for key_prefix in key_prefix_list:
            key_name = f"{key_prefix}{data_key}:{rule_key}:{redis_dt}:{redis_h}"
            initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
            if initial_data is None:
                initial_data = []
            final_data = dict()
            for video_id, score in initial_data:
                final_data[video_id] = score
            # Write to the current hour's redis key.
            final_key_name = \
                f"{key_prefix}{data_key}:{rule_key}:{datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
            if len(final_data) > 0:
                redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
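# Illustrative fallback (based on the rollback logic above): at 2022-04-25
# 00:xx, h_rank_bottom copies ...:data66:rule66:20220424:23 into
# ...:data66:rule66:20220425:0; at 10:xx it copies the same day's :9 key into
# the :10 key.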
def h_timer_check():
    try:
        project = config_.PROJECT_24H_APP_TYPE
        table = config_.TABLE_24H_APP_TYPE
        rule_params = RULE_PARAMS
        now_date = datetime.today()
        log_.info(f"start running: {datetime.strftime(now_date, '%Y%m%d%H')}")
        now_min = datetime.now().minute
        now_h = datetime.now().hour
        # Check whether the day-level data is ready.
        h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
        if now_h == 23 or now_h < 8:
            log_.info("current hour is {}, using bottom data, start.".format(now_h))
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
            log_.info("---------- current hour is {}, using bottom data, done ----------".format(now_h))
        elif h_data_count > 0:
            log_.info('upstream table row count h_data_count = {}, start computing.'.format(h_data_count))
            rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
            log_.info("---------- finished normally ----------")
        elif now_min > 40:
            log_.info('past minute 40 of the hour, the run is unlikely to finish in time, using bottom data!')
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
            log_.info('---------- past minute 40, using bottom data, done ----------')
        else:
            # Data is not ready yet; check again in 1 minute.
            log_.info("upstream data not ready, waiting...")
            Timer(60, h_timer_check).start()
    except Exception as e:
        log_.error(f"non-region 24h data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - non-region 24h data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info("alg_recsys_recall_24h_noregion.py: '24h no-region' job starting")
    h_timer_check()