# rule_rank_day_by_30day.py
  1. import pandas as pd
  2. import math
  3. from functools import reduce
  4. from odps import ODPS
  5. from threading import Timer
  6. from datetime import datetime, timedelta
  7. from get_data import get_data_from_odps
  8. from db_helper import RedisHelper
  9. from utils import filter_video_status, check_table_partition_exits
  10. from config import set_config
  11. from log import Log
config_, _ = set_config()  # project configuration object
log_ = Log()  # module-level logger

# Feature columns read from the ODPS day-level table.
features = [
    'apptype',
    'videoid',
    'preview人数',  # distinct users with pre-exposure in the past 24h
    'view人数',  # distinct users with exposure in the past 24h
    'play人数',  # distinct users who played in the past 24h
    'share人数',  # distinct users who shared in the past 24h
    '回流人数',  # users who shared in the past 24h and returned in the past 24h
    'preview次数',  # pre-exposure count in the past 24h
    'view次数',  # exposure count in the past 24h
    'play次数',  # play count in the past 24h
    'share次数',  # share count in the past 24h
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]
  34. def data_check(project, table, now_date):
  35. """检查数据是否准备好"""
  36. odps = ODPS(
  37. access_id=config_.ODPS_CONFIG['ACCESSID'],
  38. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  39. project=project,
  40. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  41. connect_timeout=3000,
  42. read_timeout=500000,
  43. pool_maxsize=1000,
  44. pool_connections=1000
  45. )
  46. try:
  47. dt = datetime.strftime(now_date, '%Y%m%d')
  48. check_res = check_table_partition_exits(date=dt, project=project, table=table)
  49. if check_res:
  50. sql = f'select * from {project}.{table} where dt = {dt}'
  51. with odps.execute_sql(sql=sql).open_reader() as reader:
  52. data_count = reader.count
  53. else:
  54. data_count = 0
  55. except Exception as e:
  56. data_count = 0
  57. return data_count
  58. def get_feature_data(now_date, project, table):
  59. """获取特征数据"""
  60. dt = datetime.strftime(now_date, '%Y%m%d')
  61. records = get_data_from_odps(date=dt, project=project, table=table)
  62. feature_data = []
  63. for record in records:
  64. item = {}
  65. for feature_name in features:
  66. item[feature_name] = record[feature_name]
  67. feature_data.append(item)
  68. feature_df = pd.DataFrame(feature_data)
  69. return feature_df
  70. def cal_score(df, param):
  71. # score计算公式: score = share次数/(view+1000)+0.01*return/(share次数+100)
  72. df = df.fillna(0)
  73. if param.get('view_type', None) == 'video-show':
  74. df['share_rate'] = df['share次数'] / (df['platform_show'] + 1000)
  75. elif param.get('view_type', None) == 'preview':
  76. df['share_rate'] = df['share次数'] / (df['preview人数'] + 1000)
  77. else:
  78. df['share_rate'] = df['share次数'] / (df['view人数'] + 1000)
  79. df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
  80. df['score'] = df['share_rate'] + 0.01 * df['back_rate']
  81. df['platform_return_rate'] = df['platform_return'] / df['回流人数']
  82. df = df.sort_values(by=['score'], ascending=False)
  83. return df
  84. def video_rank_h(df, now_date, rule_key, param, data_key):
  85. """
  86. 获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
  87. :param df:
  88. :param now_date:
  89. :param rule_key: 天级规则数据进入条件
  90. :param param: 天级规则数据进入条件参数
  91. :param data_key: 使用数据标识
  92. :return:
  93. """
  94. redis_helper = RedisHelper()
  95. log_.info(f"videos_count = {len(df)}")
  96. # videoid重复时,保留分值高
  97. df = df.sort_values(by=['score'], ascending=False)
  98. df = df.drop_duplicates(subset=['videoid'], keep='first')
  99. df['videoid'] = df['videoid'].astype(int)
  100. day_recall_videos = df['videoid'].to_list()
  101. log_.info(f'day_by30day_recall videos count = {len(day_recall_videos)}')
  102. # 视频状态过滤
  103. filtered_videos = filter_video_status(day_recall_videos)
  104. log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
  105. # 获取top视频
  106. top = param.get('top')
  107. day_recall_df = df[df['videoid'].isin(filtered_videos)]
  108. day_recall_df = day_recall_df.sort_values(by=['score'], ascending=False)
  109. day_recall_df = day_recall_df[:top]
  110. # 写入对应的redis
  111. now_dt = datetime.strftime(now_date, '%Y%m%d')
  112. day_video_ids = []
  113. day_recall_result = {}
  114. for video_id in day_recall_df['videoid'].to_list():
  115. score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
  116. day_recall_result[int(video_id)] = float(score)
  117. day_video_ids.append(int(video_id))
  118. day_30day_recall_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_30DAY}{data_key}:{rule_key}:{now_dt}"
  119. if len(day_recall_result) > 0:
  120. log_.info(f"count = {len(day_recall_result)}, key = {day_30day_recall_key_name}")
  121. redis_helper.add_data_with_zset(key_name=day_30day_recall_key_name, data=day_recall_result,
  122. expire_time=2 * 24 * 3600)
  123. def merge_df(df_left, df_right):
  124. """
  125. df按照videoid 合并,对应特征求和
  126. :param df_left:
  127. :param df_right:
  128. :return:
  129. """
  130. df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
  131. df_merged.fillna(0, inplace=True)
  132. feature_list = ['videoid']
  133. for feature in features:
  134. if feature in ['apptype', 'videoid']:
  135. continue
  136. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  137. feature_list.append(feature)
  138. return df_merged[feature_list]
  139. def merge_df_with_score(df_left, df_right):
  140. """
  141. df 按照videoid合并,平台回流人数、回流人数、分数 分别求和
  142. :param df_left:
  143. :param df_right:
  144. :return:
  145. """
  146. df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
  147. df_merged.fillna(0, inplace=True)
  148. feature_list = ['videoid', '回流人数', 'platform_return', 'score']
  149. for feature in feature_list[1:]:
  150. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  151. return df_merged[feature_list]
  152. def rank(now_date, rule_params, project, table):
  153. # 获取特征数据
  154. feature_df = get_feature_data(now_date=now_date, project=project, table=table)
  155. feature_df['apptype'] = feature_df['apptype'].astype(int)
  156. # rank
  157. data_params_item = rule_params.get('data_params')
  158. rule_params_item = rule_params.get('rule_params')
  159. for param in rule_params.get('params_list'):
  160. score_df_list = []
  161. data_key = param.get('data')
  162. data_param = data_params_item.get(data_key)
  163. log_.info(f"data_key = {data_key}, data_param = {data_param}")
  164. rule_key = param.get('rule')
  165. rule_param = rule_params_item.get(rule_key)
  166. log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  167. merge_func = rule_param.get('merge_func', 1)
  168. if merge_func == 2:
  169. for apptype, weight in data_param.items():
  170. df = feature_df[feature_df['apptype'] == apptype]
  171. # 计算score
  172. score_df = cal_score(df=df, param=rule_param)
  173. score_df['score'] = score_df['score'] * weight
  174. score_df_list.append(score_df)
  175. # 分数合并
  176. df_merged = reduce(merge_df_with_score, score_df_list)
  177. # 更新平台回流比
  178. df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
  179. video_rank_h(df=df_merged, now_date=now_date, rule_key=rule_key, param=rule_param, data_key=data_key)
  180. else:
  181. df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
  182. df_merged = reduce(merge_df, df_list)
  183. score_df = cal_score(df=df_merged, param=rule_param)
  184. video_rank_h(df=score_df, now_date=now_date, rule_key=rule_key, param=rule_param, data_key=data_key)
  185. def rank_bottom(now_date, rule_params):
  186. """未按时更新数据,用前一天数据作为当前的数据"""
  187. redis_helper = RedisHelper()
  188. redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
  189. key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_30DAY]
  190. for param in rule_params.get('params_list'):
  191. data_key = param.get('data')
  192. rule_key = param.get('rule')
  193. log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  194. for key_prefix in key_prefix_list:
  195. key_name = f"{key_prefix}{data_key}:{rule_key}:{redis_dt}"
  196. initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
  197. if initial_data is None:
  198. initial_data = []
  199. final_data = dict()
  200. for video_id, score in initial_data:
  201. final_data[video_id] = score
  202. # 存入对应的redis
  203. final_key_name = \
  204. f"{key_prefix}{data_key}:{rule_key}:{datetime.strftime(now_date, '%Y%m%d')}"
  205. if len(final_data) > 0:
  206. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
  207. def timer_check():
  208. project = config_.PROJECT_30DAY_APP_TYPE
  209. table = config_.TABLE_30DAY_APP_TYPE
  210. rule_params = config_.RULE_PARAMS_30DAY_APP_TYPE
  211. now_date = datetime.today()
  212. log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d')}")
  213. now_h = datetime.now().hour
  214. # 查看当前天级更新的数据是否已准备好
  215. data_count = data_check(project=project, table=table, now_date=now_date)
  216. if data_count > 0:
  217. log_.info(f'day_by30day_data_count = {data_count}')
  218. # 数据准备好,进行更新
  219. rank(now_date=now_date, rule_params=rule_params, project=project, table=table)
  220. elif now_h > 3:
  221. log_.info('day_by30day_recall data is None!')
  222. rank_bottom(now_date=now_date, rule_params=rule_params)
  223. else:
  224. # 数据没准备好,5分钟后重新检查
  225. Timer(5 * 60, timer_check).start()
if __name__ == '__main__':
    # Script entry point: start the readiness-check / update loop.
    timer_check()