# alg_recsys_recall_1h_noregion.py
  1. import pandas as pd
  2. import math
  3. import traceback
  4. from functools import reduce
  5. from odps import ODPS
  6. from threading import Timer
  7. from datetime import datetime, timedelta
  8. from get_data import get_data_from_odps
  9. from db_helper import RedisHelper
  10. from utils import filter_video_status, check_table_partition_exits, filter_video_status_app, send_msg_to_feishu
  11. from config import set_config
  12. from log import Log
# Environment-specific configuration and module-level logger, created once at import time.
config_, _ = set_config()
log_ = Log()

# Job configuration:
#   rule_params: per-rule scoring knobs — 'view_type' selects which exposure
#                metric feeds CTR; 'platform_return_rate' is the recall filter
#                threshold used by video_rank_h.
#   data_params: per-data-key {apptype: weight} mapping, taken from global config.
#   params_list: the (data, rule) combinations this job actually executes.
RULE_PARAMS = {
    'rule_params': {
        'rule66': {'view_type': 'video-show', 'platform_return_rate': 0.001},
    },
    'data_params': config_.DATA_PARAMS,
    'params_list': [
        {'data': 'data66', 'rule': 'rule66'},
    ],
}

# Feature columns read from the upstream ODPS hourly table.
features = [
    'apptype',
    'videoid',
    'lastonehour_preview',  # users with pre-exposure in the past hour - region-specific
    'lastonehour_view',  # users exposed in the past hour - region-specific
    'lastonehour_play',  # users who played in the past hour - region-specific
    'lastonehour_share',  # users who shared in the past hour - region-specific
    'lastonehour_return',  # shared in past hour, returned in past hour (users) - region-specific
    'lastonehour_preview_total',  # pre-exposure count in the past hour - region-specific
    'lastonehour_view_total',  # exposure count in the past hour - region-specific
    'lastonehour_play_total',  # play count in the past hour - region-specific
    'lastonehour_share_total',  # share count in the past hour - region-specific
    'platform_return',
    'lastonehour_show',  # not split by region
    'lasttwohour_share',  # users who shared at hour h-2
    'lasttwohour_return_now',  # shared at h-2, returned in the past hour
    'lasttwohour_return',  # shared at h-2, returned at h-2
    'lastthreehour_share',  # users who shared at hour h-3
    'lastthreehour_return_now',  # shared at h-3, returned in the past hour
    'lastthreehour_return',  # shared at h-3, returned at h-3
    'lastonehour_return_new',  # shared/returned in past hour (returns attributed to region-restricted shares; returns themselves not region-restricted)
    'lasttwohour_return_now_new',  # shared at h-2, returned in past hour (same attribution: shares region-restricted, returns not)
    'lasttwohour_return_new',  # shared at h-2, returned at h-2 (same attribution: shares region-restricted, returns not)
    'lastthreehour_return_now_new',  # shared at h-3, returned in past hour (same attribution: shares region-restricted, returns not)
    'lastthreehour_return_new',  # shared at h-3, returned at h-3 (same attribution: shares region-restricted, returns not)
    'platform_return_new',  # platform-distributed returns (same attribution: shares region-restricted, returns not)
]
  51. def h_data_check(project, table, now_date):
  52. """检查数据是否准备好"""
  53. odps = ODPS(
  54. access_id=config_.ODPS_CONFIG['ACCESSID'],
  55. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  56. project=project,
  57. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  58. connect_timeout=3000,
  59. read_timeout=500000,
  60. pool_maxsize=1000,
  61. pool_connections=1000
  62. )
  63. try:
  64. dt = datetime.strftime(now_date, '%Y%m%d%H')
  65. check_res = check_table_partition_exits(date=dt, project=project, table=table)
  66. if check_res:
  67. sql = f'select * from {project}.{table} where dt = {dt}'
  68. with odps.execute_sql(sql=sql).open_reader() as reader:
  69. data_count = reader.count
  70. else:
  71. data_count = 0
  72. except Exception as e:
  73. data_count = 0
  74. return data_count
  75. def h_rank_bottom(now_date, now_h, rule_params):
  76. """未按时更新数据,用上一小时结果作为当前小时的数据"""
  77. redis_helper = RedisHelper()
  78. if now_h == 0:
  79. redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
  80. redis_h = 23
  81. else:
  82. redis_dt = datetime.strftime(now_date, '%Y%m%d')
  83. redis_h = now_h - 1
  84. key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H_H
  85. for param in rule_params.get('params_list'):
  86. data_key = param.get('data')
  87. rule_key = param.get('rule')
  88. log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  89. key_name = f"{key_prefix}{data_key}:{rule_key}:{redis_dt}:{redis_h}"
  90. initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
  91. if initial_data is None:
  92. initial_data = []
  93. final_data = dict()
  94. for video_id, score in initial_data:
  95. final_data[video_id] = score
  96. # 存入对应的redis
  97. final_key_name = \
  98. f"{key_prefix}{data_key}:{rule_key}:{datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  99. if len(final_data) > 0:
  100. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
  101. def get_feature_data(project, table, now_date):
  102. """获取特征数据"""
  103. dt = datetime.strftime(now_date, '%Y%m%d%H')
  104. records = get_data_from_odps(date=dt, project=project, table=table)
  105. feature_data = []
  106. for record in records:
  107. item = {}
  108. for feature_name in features:
  109. item[feature_name] = record[feature_name]
  110. feature_data.append(item)
  111. feature_df = pd.DataFrame(feature_data)
  112. return feature_df
  113. def cal_score(df, param):
  114. # score = sharerate * backrate * LOG(lastonehour_return + 1) * K2
  115. # sharerate = lastonehour_share / (lastonehour_play + 1000)
  116. # backrate = lastonehour_return / (lastonehour_share + 10)
  117. # ctr = lastonehour_play / (lastonehour_show + 1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  118. df = df.fillna(0)
  119. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  120. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  121. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  122. if param.get('view_type', None) == 'video-show':
  123. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  124. else:
  125. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  126. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  127. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  128. df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
  129. df = df.sort_values(by=['score'], ascending=False)
  130. return df
  131. def merge_df(df_left, df_right):
  132. """
  133. df按照videoid 合并,对应特征求和
  134. :param df_left:
  135. :param df_right:
  136. :return:
  137. """
  138. df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
  139. df_merged.fillna(0, inplace=True)
  140. feature_list = ['videoid']
  141. for feature in features:
  142. if feature in ['apptype', 'videoid']:
  143. continue
  144. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  145. feature_list.append(feature)
  146. return df_merged[feature_list]
  147. def merge_df_with_score(df_left, df_right):
  148. """
  149. df 按照videoid合并,平台回流人数、回流人数、分数 分别求和
  150. :param df_left:
  151. :param df_right:
  152. :return:
  153. """
  154. df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
  155. df_merged.fillna(0, inplace=True)
  156. feature_list = ['videoid', 'lastonehour_return', 'platform_return', 'score']
  157. for feature in feature_list[1:]:
  158. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  159. return df_merged[feature_list]
  160. def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
  161. """
  162. 获取符合进入召回源条件的视频
  163. """
  164. redis_helper = RedisHelper()
  165. log_.info(f"一共有多少个视频 = {len(df)}")
  166. # videoid重复时,保留分值高
  167. df = df.sort_values(by=['score'], ascending=False)
  168. df = df.drop_duplicates(subset=['videoid'], keep='first')
  169. df['videoid'] = df['videoid'].astype(int)
  170. # 获取符合进入召回源条件的视频
  171. platform_return_rate = param.get('platform_return_rate', 0)
  172. h_recall_df = df[df['platform_return_rate'] > platform_return_rate]
  173. h_recall_videos = h_recall_df['videoid'].to_list()
  174. log_.info(f"回流率-过滤后,一共有多少个视频 = {len(h_recall_videos)}")
  175. # 视频状态过滤
  176. if data_key in ['data7', ]:
  177. filtered_videos = filter_video_status_app(h_recall_videos)
  178. else:
  179. filtered_videos = filter_video_status(h_recall_videos)
  180. log_.info(f"视频状态-过滤后,一共有多少个视频 = {len(filtered_videos)}")
  181. # 写入对应的redis
  182. now_dt = datetime.strftime(now_date, '%Y%m%d')
  183. h_video_ids = []
  184. h_recall_result = {}
  185. for video_id in filtered_videos:
  186. score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
  187. h_recall_result[int(video_id)] = float(score)
  188. h_video_ids.append(int(video_id))
  189. # recall:item:score:h:
  190. h_recall_key_name = \
  191. f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{rule_key}:{now_dt}:{now_h}"
  192. log_.info("打印非地域24小时redis key:{}".format(h_recall_key_name))
  193. if len(h_recall_result) > 0:
  194. log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
  195. redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 3600)
  196. else:
  197. log_.info(f"无数据,不写入。")
  198. def rank_by_h(now_date, now_h, rule_params, project, table):
  199. # 获取特征数据
  200. feature_df = get_feature_data(now_date=now_date, project=project, table=table)
  201. feature_df['apptype'] = feature_df['apptype'].astype(int)
  202. # rank
  203. data_params_item = rule_params.get('data_params')
  204. rule_params_item = rule_params.get('rule_params')
  205. for param in rule_params.get('params_list'):
  206. score_df_list = []
  207. data_key = param.get('data')
  208. data_param = data_params_item.get(data_key)
  209. log_.info(f"data_key = {data_key}, data_param = {data_param}")
  210. rule_key = param.get('rule')
  211. rule_param = rule_params_item.get(rule_key)
  212. log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  213. merge_func = rule_param.get('merge_func', 1)
  214. log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
  215. log_.info("具体的规则是:{}.".format(rule_param))
  216. if merge_func == 2:
  217. for apptype, weight in data_param.items():
  218. df = feature_df[feature_df['apptype'] == apptype]
  219. # 计算score
  220. score_df = cal_score(df=df, param=rule_param)
  221. score_df['score'] = score_df['score'] * weight
  222. score_df_list.append(score_df)
  223. # 分数合并
  224. df_merged = reduce(merge_df_with_score, score_df_list)
  225. # 更新平台回流比
  226. df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastonehour_return']
  227. video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
  228. rule_key=rule_key, param=rule_param, data_key=data_key)
  229. else:
  230. df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
  231. df_merged = reduce(merge_df, df_list)
  232. score_df = cal_score(df=df_merged, param=rule_param)
  233. video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
  234. rule_key=rule_key, param=rule_param, data_key=data_key)
  235. def h_timer_check():
  236. try:
  237. project = config_.PROJECT_H_APP_TYPE
  238. table = config_.TABLE_H_APP_TYPE
  239. rule_params = RULE_PARAMS
  240. now_date = datetime.today()
  241. log_.info(f"开始执行: {datetime.strftime(now_date, '%Y%m%d%H')}")
  242. now_min = datetime.now().minute
  243. now_h = datetime.now().hour
  244. if now_h == 0:
  245. log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
  246. h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
  247. log_.info("----------当前时间{}小时,使用bottom的data,完成----------".format(now_h))
  248. return
  249. # 查看当前小时级更新的数据是否已准备好
  250. h_data_count = h_data_check(project=project, table=table, now_date=now_date)
  251. if h_data_count > 0:
  252. log_.info('上游数据表查询数据条数 h_data_count = {},开始计算。'.format(h_data_count))
  253. # 数据准备好,进行更新
  254. rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
  255. log_.info("数据4----------正常完成----------")
  256. elif now_min > 40:
  257. log_.info('当前分钟超过40,预计执行无法完成,使用 bottom data!')
  258. h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
  259. log_.info('----------当前分钟超过40,使用bottom的data,完成----------')
  260. else:
  261. log_.info("上游数据未就绪,等待...")
  262. Timer(60, h_timer_check).start()
  263. except Exception as e:
  264. log_.error(f"不区分地域小时级数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  265. send_msg_to_feishu(
  266. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  267. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  268. msg_text=f"rov-offline{config_.ENV_TEXT} - 不区分地域小时级数据更新失败\n"
  269. f"exception: {e}\n"
  270. f"traceback: {traceback.format_exc()}"
  271. )
# Script entry point: start the hourly-update check loop.
if __name__ == '__main__':
    log_.info("文件alg_recsys_recall_1h_noregion.py:「1小时无地域」 开始执行")
    h_timer_check()