rule_rank_h_by_48h.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. # -*- coding: utf-8 -*-
  2. # @ModuleName: rule_rank_h_by_48h
  3. # @Author: Liqian
  4. # @Time: 2022/8/8 15:42
  5. # @Software: PyCharm
  6. import pandas as pd
  7. import math
  8. from functools import reduce
  9. from odps import ODPS
  10. from threading import Timer
  11. from datetime import datetime, timedelta
  12. from get_data import get_data_from_odps
  13. from db_helper import RedisHelper
  14. from my_utils import filter_video_status, check_table_partition_exits
  15. from my_config import set_config
  16. from log import Log
# Module-level config and logger shared by every function below.
config_, _ = set_config()
log_ = Log()

# Feature columns read from the ODPS table. Column-name conventions:
# 人数 = distinct-user count, 次数 = event count; all stats cover the past 48h.
features = [
    'apptype',
    'videoid',
    'preview人数',  # distinct users with pre-exposure in the past 48h
    'view人数',  # distinct users with exposure in the past 48h
    'play人数',  # distinct users who played in the past 48h
    'share人数',  # distinct users who shared in the past 48h
    '回流人数',  # distinct returning users from shares within the past 48h
    'preview次数',  # pre-exposure count in the past 48h
    'view次数',  # exposure count in the past 48h
    'play次数',  # play count in the past 48h
    'share次数',  # share count in the past 48h
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]
  39. def h_data_check(project, table, now_date, now_h):
  40. """检查数据是否准备好"""
  41. odps = ODPS(
  42. access_id=config_.ODPS_CONFIG['ACCESSID'],
  43. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  44. project=project,
  45. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  46. connect_timeout=3000,
  47. read_timeout=500000,
  48. pool_maxsize=1000,
  49. pool_connections=1000
  50. )
  51. try:
  52. # 23点开始到8点之前(不含8点),全部用22点生成那个列表
  53. if now_h == 23:
  54. dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
  55. elif now_h < 8:
  56. dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
  57. else:
  58. dt = datetime.strftime(now_date, '%Y%m%d%H')
  59. check_res = check_table_partition_exits(date=dt, project=project, table=table)
  60. if check_res:
  61. sql = f'select * from {project}.{table} where dt = {dt}'
  62. with odps.execute_sql(sql=sql).open_reader() as reader:
  63. data_count = reader.count
  64. else:
  65. data_count = 0
  66. except Exception as e:
  67. data_count = 0
  68. return data_count
  69. def get_feature_data(now_date, now_h, project, table):
  70. """获取特征数据"""
  71. # 23点开始到8点之前(不含8点),全部用22点生成那个列表
  72. if now_h == 23:
  73. dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
  74. elif now_h < 8:
  75. dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
  76. else:
  77. dt = datetime.strftime(now_date, '%Y%m%d%H')
  78. log_.info({'feature_dt': dt})
  79. # dt = '20220425'
  80. records = get_data_from_odps(date=dt, project=project, table=table)
  81. feature_data = []
  82. for record in records:
  83. item = {}
  84. for feature_name in features:
  85. item[feature_name] = record[feature_name]
  86. feature_data.append(item)
  87. feature_df = pd.DataFrame(feature_data)
  88. return feature_df
  89. def cal_score1(df):
  90. # score1计算公式: score = 回流人数/(view人数+10000)
  91. df = df.fillna(0)
  92. df['score'] = df['回流人数'] / (df['view人数'] + 1000)
  93. df = df.sort_values(by=['score'], ascending=False)
  94. return df
  95. def cal_score2(df, param):
  96. # score2计算公式: score = share次数/(view+1000)+0.01*return/(share次数+100)
  97. df = df.fillna(0)
  98. if param.get('view_type', None) == 'video-show':
  99. df['share_rate'] = df['share次数'] / (df['platform_show'] + 1000)
  100. elif param.get('view_type', None) == 'preview':
  101. df['share_rate'] = df['share次数'] / (df['preview人数'] + 1000)
  102. else:
  103. df['share_rate'] = df['share次数'] / (df['view人数'] + 1000)
  104. df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
  105. df['score'] = df['share_rate'] + 0.01 * df['back_rate']
  106. df['platform_return_rate'] = df['platform_return'] / df['回流人数']
  107. df = df.sort_values(by=['score'], ascending=False)
  108. return df
  109. def video_rank_h(df, now_date, now_h, rule_key, param, app_type, data_key):
  110. """
  111. 获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
  112. :param df:
  113. :param now_date:
  114. :param now_h:
  115. :param rule_key: 天级规则数据进入条件
  116. :param param: 天级规则数据进入条件参数
  117. :param app_type:
  118. :param data_key: 使用数据标识
  119. :return:
  120. """
  121. redis_helper = RedisHelper()
  122. log_.info(f"app_type = {app_type}, videos_count = {len(df)}")
  123. # videoid重复时,保留分值高
  124. df = df.sort_values(by=['score'], ascending=False)
  125. df = df.drop_duplicates(subset=['videoid'], keep='first')
  126. df['videoid'] = df['videoid'].astype(int)
  127. # 获取符合进入召回源条件的视频
  128. return_count = param.get('return_count')
  129. if return_count:
  130. day_recall_df = df[df['回流人数'] > return_count]
  131. else:
  132. day_recall_df = df
  133. platform_return_rate = param.get('platform_return_rate', 0)
  134. day_recall_df = day_recall_df[day_recall_df['platform_return_rate'] > platform_return_rate]
  135. day_recall_videos = day_recall_df['videoid'].to_list()
  136. log_.info(f'h_by48h_recall videos count = {len(day_recall_videos)}')
  137. # 视频状态过滤
  138. filtered_videos = filter_video_status(day_recall_videos)
  139. log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
  140. # 写入对应的redis
  141. now_dt = datetime.strftime(now_date, '%Y%m%d')
  142. day_video_ids = []
  143. day_recall_result = {}
  144. for video_id in filtered_videos:
  145. score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
  146. day_recall_result[int(video_id)] = float(score)
  147. day_video_ids.append(int(video_id))
  148. h_48h_recall_key_name = \
  149. f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{app_type}:{data_key}:{rule_key}:{now_dt}:{now_h}"
  150. if len(day_recall_result) > 0:
  151. log_.info(f"count = {len(day_recall_result)}")
  152. redis_helper.add_data_with_zset(key_name=h_48h_recall_key_name, data=day_recall_result, expire_time=2 * 3600)
  153. if rule_key == 'rule1':
  154. # 去重筛选结果,保留剩余数据并写入Redis
  155. all_videos = df['videoid'].to_list()
  156. log_.info(f'h_by48h_recall all videos count = {len(all_videos)}')
  157. # 视频状态过滤
  158. all_filtered_videos = filter_video_status(all_videos)
  159. log_.info(f'all_filtered_videos count = {len(all_filtered_videos)}')
  160. # 与筛选结果去重
  161. other_videos = [video for video in all_filtered_videos if video not in day_video_ids]
  162. log_.info(f'other_videos count = {len(other_videos)}')
  163. # 写入对应的redis
  164. other_48h_recall_result = {}
  165. for video_id in other_videos:
  166. score = df[df['videoid'] == video_id]['score']
  167. other_48h_recall_result[int(video_id)] = float(score)
  168. other_h_48h_recall_key_name = \
  169. f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{app_type}:{data_key}:{rule_key}:{now_dt}:{now_h}"
  170. if len(other_48h_recall_result) > 0:
  171. log_.info(f"count = {len(other_48h_recall_result)}")
  172. redis_helper.add_data_with_zset(key_name=other_h_48h_recall_key_name, data=other_48h_recall_result,
  173. expire_time=2 * 3600)
  174. def merge_df(df_left, df_right):
  175. """
  176. df按照videoid 合并,对应特征求和
  177. :param df_left:
  178. :param df_right:
  179. :return:
  180. """
  181. df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
  182. df_merged.fillna(0, inplace=True)
  183. feature_list = ['videoid']
  184. for feature in features:
  185. if feature in ['apptype', 'videoid']:
  186. continue
  187. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  188. feature_list.append(feature)
  189. return df_merged[feature_list]
  190. def rank_by_h(now_date, now_h, rule_params, project, table):
  191. # 获取特征数据
  192. feature_df = get_feature_data(now_date=now_date, now_h=now_h, project=project, table=table)
  193. feature_df['apptype'] = feature_df['apptype'].astype(int)
  194. # rank
  195. for app_type, params in rule_params.items():
  196. log_.info(f"app_type = {app_type}")
  197. data_params_item = params.get('data_params')
  198. rule_params_item = params.get('rule_params')
  199. for param in params.get('params_list'):
  200. data_key = param.get('data')
  201. data_param = data_params_item.get(data_key)
  202. log_.info(f"data_key = {data_key}, data_param = {data_param}")
  203. df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
  204. df_merged = reduce(merge_df, df_list)
  205. rule_key = param.get('rule')
  206. rule_param = rule_params_item.get(rule_key)
  207. log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  208. # 计算score
  209. cal_score_func = rule_param.get('cal_score_func', 1)
  210. if cal_score_func == 2:
  211. score_df = cal_score2(df=df_merged, param=rule_param)
  212. else:
  213. score_df = cal_score1(df=df_merged)
  214. video_rank_h(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
  215. app_type=app_type, data_key=data_key)
  216. # # to-csv
  217. # score_filename = f"score_by48h_{key}_{datetime.strftime(now_date, '%Y%m%d%H')}.csv"
  218. # score_df.to_csv(f'./data/{score_filename}')
  219. # # to-logs
  220. # log_.info({"date": datetime.strftime(now_date, '%Y%m%d%H'),
  221. # "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_BY_48H,
  222. # "rule_key": key,
  223. # # "score_df": score_df[['videoid', 'score']]
  224. # })
  225. def h_rank_bottom(now_date, now_h, rule_params):
  226. """未按时更新数据,用模型召回数据作为当前的数据"""
  227. redis_helper = RedisHelper()
  228. if now_h == 0:
  229. redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
  230. redis_h = 23
  231. else:
  232. redis_dt = datetime.strftime(now_date, '%Y%m%d')
  233. redis_h = now_h - 1
  234. key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_48H, config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER]
  235. for app_type, params in rule_params.items():
  236. log_.info(f"app_type = {app_type}")
  237. for param in params.get('params_list'):
  238. data_key = param.get('data')
  239. rule_key = param.get('rule')
  240. log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  241. for key_prefix in key_prefix_list:
  242. key_name = f"{key_prefix}{app_type}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
  243. initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
  244. if initial_data is None:
  245. initial_data = []
  246. final_data = dict()
  247. for video_id, score in initial_data:
  248. final_data[video_id] = score
  249. # 存入对应的redis
  250. final_key_name = \
  251. f"{key_prefix}{app_type}:{data_key}:{rule_key}:{datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  252. if len(final_data) > 0:
  253. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
  254. def h_timer_check():
  255. project = config_.PROJECT_48H_APP_TYPE
  256. table = config_.TABLE_48H_APP_TYPE
  257. rule_params = config_.RULE_PARAMS_48H_APP_TYPE
  258. now_date = datetime.today()
  259. log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
  260. now_min = datetime.now().minute
  261. now_h = datetime.now().hour
  262. # 查看当前天级更新的数据是否已准备好
  263. h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
  264. if h_data_count > 0:
  265. log_.info(f'h_by48h_data_count = {h_data_count}')
  266. # 数据准备好,进行更新
  267. rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
  268. elif now_min > 50:
  269. log_.info('h_by48h_recall data is None!')
  270. h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
  271. else:
  272. # 数据没准备好,1分钟后重新检查
  273. Timer(60, h_timer_check).start()
# Script entry point: kick off the self-rescheduling hourly check.
if __name__ == '__main__':
    h_timer_check()