alg_recsys_recall02_1h_region.py

# -*- coding: utf-8 -*-
import multiprocessing
import traceback
import gevent
import datetime
import pandas as pd
import math
from functools import reduce
from odps import ODPS
from threading import Timer
from my_utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
    check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
from config import set_config
from log import Log
from check_video_limit_distribute import update_limit_video_score

config_, _ = set_config()
log_ = Log()
region_code = config_.REGION_CODE
RULE_PARAMS = {
    'rule_params': {
        'rule66': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
        },
        'rule67': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66', 'h_rule_key': 'rule66'
        },
        'rule68': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66',
            'score_func': 'back_rate_exponential_weighting1'
        },
    },
    'data_params': config_.DATA_PARAMS,
    'params_list': [
        # 532
        # {'data': 'data66', 'rule': 'rule66'},  # 523 -> 523 & 518
        # {'data': 'data66', 'rule': 'rule67'},  # 523 -> 510
        # {'data': 'data66', 'rule': 'rule68'},  # 523 -> 514
        # {'data': 'data66', 'rule': 'rule69'},  # 523 -> 518
    ],
}
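
# Each params_list entry pairs a data key from DATA_PARAMS with a rule key from
# rule_params, e.g. {'data': 'data66', 'rule': 'rule66'}; the list ships with
# every entry commented out, so nothing is scheduled until one is enabled.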
features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',  # users with pre-impressions in the past hour - by region
    'lastonehour_view',  # users with impressions in the past hour - by region
    'lastonehour_play',  # users who played in the past hour - by region
    'lastonehour_share',  # users who shared in the past hour - by region
    'lastonehour_return',  # shared in the past hour, users returning in the past hour - by region
    'lastonehour_preview_total',  # pre-impression count in the past hour - by region
    'lastonehour_view_total',  # impression count in the past hour - by region
    'lastonehour_play_total',  # play count in the past hour - by region
    'lastonehour_share_total',  # share count in the past hour - by region
    'platform_return',
    'lastonehour_show',  # not split by region
    'lastonehour_show_region',  # grouped by region
    'lasttwohour_share',  # users who shared in hour h-2
    'lasttwohour_return_now',  # shared in h-2, users returning in the past hour
    'lasttwohour_return',  # shared in h-2, users returning in h-2
    'lastthreehour_share',  # users who shared in hour h-3
    'lastthreehour_return_now',  # shared in h-3, users returning in the past hour
    'lastthreehour_return',  # shared in h-3, users returning in h-3
    'lastonehour_return_new',  # shared in the past hour, returns in the past hour (returns attributed to the region's shares; shares are region-restricted, returns are not)
    'lasttwohour_return_now_new',  # shared in h-2, returns in the past hour (returns attributed to the region's shares; shares are region-restricted, returns are not)
    'lasttwohour_return_new',  # shared in h-2, returns in h-2 (returns attributed to the region's shares; shares are region-restricted, returns are not)
    'lastthreehour_return_now_new',  # shared in h-3, returns in the past hour (returns attributed to the region's shares; shares are region-restricted, returns are not)
    'lastthreehour_return_new',  # shared in h-3, returns in h-3 (returns attributed to the region's shares; shares are region-restricted, returns are not)
    'platform_return_new',  # platform-distributed returns (returns attributed to the region's shares; shares are region-restricted, returns are not)
]


def merge_df(df_left, df_right):
    """
    Merge two frames on (videoid, code) and sum the corresponding features.
    :param df_left:
    :param df_right:
    :return: merged frame with per-feature sums
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', 'code']
    for feature in features:
        if feature in ['apptype', 'videoid', 'code']:
            continue
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
        feature_list.append(feature)
    return df_merged[feature_list]
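
# Illustrative sketch (not part of the production flow, hypothetical toy data):
# merge_df outer-joins on (videoid, code) and sums each shared feature, so a
# videoid present on both sides accumulates counts while one-sided rows keep
# their own (the missing side is filled with 0). Shown here on a single column;
# the real frames carry every name in `features`, so merge_df itself is not
# called directly.
def _demo_merge_df():
    left = pd.DataFrame({'videoid': [1, 2], 'code': ['110000', '110000'],
                         'lastonehour_view': [10, 20]})
    right = pd.DataFrame({'videoid': [2, 3], 'code': ['110000', '110000'],
                          'lastonehour_view': [5, 7]})
    merged = pd.merge(left, right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    merged.fillna(0, inplace=True)
    merged['lastonehour_view'] = merged['lastonehour_view_x'] + merged['lastonehour_view_y']
    return merged[['videoid', 'code', 'lastonehour_view']]  # videoid 2 -> 25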


def video_rank(df, now_date, now_h, rule_key, param, region, data_key):
    """
    Select the videos that meet the recall-source entry conditions; they are
    later merged with the daily-updated ROV model result list.
    :param df: scored feature data
    :param now_date: current date
    :param now_h: current hour
    :param rule_key: hourly-data entry rule
    :param param: parameters of the entry rule
    :param region: region the data belongs to
    :param data_key: data-group key
    :return:
    """
    redis_helper = RedisHelper()
    # Entry conditions: lastonehour_return >= return_count && score >= score_value
    # && platform_return_rate >= the configured threshold
    return_count = param.get('return_count', 1)
    score_value = param.get('score_rule', 0)
    platform_return_rate = param.get('platform_return_rate', 0)
    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
                     & (df['platform_return_rate'] >= platform_return_rate)]
    # When a videoid appears more than once, keep the highest score
    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
    log_.info(f"video count after rule filtering = {len(h_recall_df)}")
    h_recall_videos = h_recall_df['videoid'].to_list()
    log_.info(f"video count in recall candidate list = {len(h_recall_videos)}")
    # Filter by video status
    filtered_videos = filter_video_status(h_recall_videos)
    # Filter shielded videos
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    shield_key_name_list = shield_config.get(region, None)
    if shield_key_name_list is not None:
        filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
    # Filter politically sensitive videos
    political_filter = param.get('political_filter', None)
    if political_filter is True:
        filtered_videos = filter_political_videos(video_ids=filtered_videos)
    log_.info(f"video count after status/political filtering = {len(filtered_videos)}")
    h_video_ids = []
    # Write the result to the corresponding Redis key
    h_recall_result = {}
    for video_id in filtered_videos:
        score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
        h_recall_result[int(video_id)] = float(score.iloc[0])
        h_video_ids.append(int(video_id))
    h_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  136. log_.info("打印地域1小时的某个地域{},redis key:{}".format(region, h_recall_key_name))
  137. if len(h_recall_result) > 0:
  138. log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
  139. redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 24 * 3600)
  140. # 限流视频score调整
  141. tmp = update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
  142. if tmp:
  143. log_.info(f"走了限流逻辑后:count = {len(h_recall_result)}, key = {h_recall_key_name}")
  144. else:
  145. log_.info("走了限流逻辑,但没更改redis,未生效。")
  146. # 清空线上过滤应用列表
  147. # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
  148. else:
  149. log_.info(f"无数据,不写入。")


def cal_score_initial(df, param):
    """
    Compute the score.
    :param df: feature data
    :param param: rule parameters
    :return:
    """
    # score formula: sharerate * backrate * logback * ctr
    # sharerate = lastonehour_share / (lastonehour_play + 1000)
    # backrate = lastonehour_return / (lastonehour_share + 10)
    # ctr = lastonehour_play / (lastonehour_preview + 1000), capped: K2 = 0.6 if ctr > 0.6 else ctr
    # score = sharerate * backrate * LOG(lastonehour_return + 1) * K2
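    #
    # Worked example (hypothetical numbers): lastonehour_share = 30,
    # lastonehour_play = 5000, lastonehour_return = 60, lastonehour_preview = 20000
    # (default view_type branch):
    #   share_rate = 30 / (5000 + 1000)    = 0.005
    #   back_rate  = 60 / (30 + 10)        = 1.5
    #   log_back   = ln(60 + 1)            ≈ 4.111
    #   ctr        = 5000 / (20000 + 1000) ≈ 0.238 -> K2 ≈ 0.238 (below the 0.6 cap)
    #   score1     = 0.005 * 1.5 * 4.111 * 0.238 ≈ 0.00734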
    df = df.fillna(0)
    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
    if param.get('view_type', None) == 'video-show':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
    elif param.get('view_type', None) == 'video-show-region':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
    else:
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
    df['score1'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
    click_score_rate = param.get('click_score_rate', None)
    back_score_rate = param.get('back_score_rate', None)
    if click_score_rate is not None:
        df['score'] = (1 - click_score_rate) * df['score1'] + click_score_rate * df['K2']
    elif back_score_rate is not None:
        df['score'] = (1 - back_score_rate) * df['score1'] + back_score_rate * df['back_rate']
    else:
        df['score'] = df['score1']
    df = df.sort_values(by=['score'], ascending=False)
    return df


def cal_score(df, param):
    df = cal_score_initial(df=df, param=param)
    return df


def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
    log_.info(f"greenlet for region = {region} started")
    region_df = df_merged[df_merged['code'] == region]
    log_.info(f"region = {region}, row count = {len(region_df)}")
    score_df = cal_score(df=region_df, param=rule_param)
    video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
               region=region, data_key=data_key)
    log_.info(f"greenlet for region = {region} finished")


def process_with_param(param, data_params_item, rule_params_item, region_code_list,
                       feature_df,
                       now_date, now_h):
    data_key = param.get('data')
    data_param = data_params_item.get(data_key)
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info("data key: {}, rule key: {}.".format(data_key, rule_key))
    log_.info("rule params: {}.".format(rule_param))
    df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
    df_merged = reduce(merge_df, df_list)
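    # reduce pairwise-merges the per-apptype frames, e.g.
    # reduce(merge_df, [df_a, df_b, df_c]) == merge_df(merge_df(df_a, df_b), df_c)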
    task_list = [
        gevent.spawn(process_with_region,
                     region, df_merged, data_key, rule_key, rule_param, now_date, now_h)
        for region in region_code_list
    ]
    gevent.joinall(task_list)
    log_.info(f"worker process for param = {param} finished!")


def get_feature_data(project, table, time_dt_h):
    """Pull this hour's feature records from ODPS into a DataFrame."""
    records = get_data_from_odps(date=time_dt_h, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def rank_by_h(project, table, time_dt_h, time_hour, rule_params, region_code_list):
    feature_df = get_feature_data(project=project, table=table, time_dt_h=time_dt_h)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list')
    if not params_list:
        # Pool(processes=0) would raise ValueError; with an empty params_list there is nothing to run
        log_.info("params_list is empty, nothing to rank.")
        return
    # One worker process per (data, rule) param pair
    pool = multiprocessing.Pool(processes=len(params_list))
    for param in params_list:
        pool.apply_async(
            func=process_with_param,
            args=(param, data_params_item, rule_params_item, region_code_list, feature_df, time_dt_h, time_hour)
        )
    pool.close()
    pool.join()


def h_timer_check():
    try:
        # 1. Read configuration
        rule_params = RULE_PARAMS
        project = config_.PROJECT_REGION_APP_TYPE
        table = config_.TABLE_REGION_APP_TYPE
        region_code_list = [code for region, code in region_code.items()]
        # 2. Start execution - timing bookkeeping
        time_now = datetime.datetime.today()
        time_dt = datetime.datetime.strftime(time_now, '%Y%m%d')
        time_dt_h = datetime.datetime.strftime(time_now, '%Y%m%d%H')
        time_hour = datetime.datetime.now().hour
        time_minute = datetime.datetime.now().minute
        log_.info(f"start: {time_dt_h}")
        # Check whether this hour's upstream data is ready.
        # h_data_check and h_rank_bottom are referenced but not defined in this
        # file; they are assumed to be provided elsewhere in the project.
        h_data_count = h_data_check(project=project, table=table, now_date=time_now)
        if h_data_count > 0:
            log_.info('upstream table row count h_data_count = {}, starting update.'.format(h_data_count))
            # Data is ready, run the update
            rank_by_h(time_dt_h=time_dt_h, time_hour=time_hour, rule_params=rule_params,
                      project=project, table=table, region_code_list=region_code_list)
            log_.info("data 1 ---------- finished normally ----------")
        elif time_minute > 40:
            log_.info('past minute 40, the run is unlikely to finish; falling back to bottom data!')
            # h_rank_bottom's exact signature is not visible here; passing the
            # current date/hour locals computed above is an assumption
            h_rank_bottom(now_date=time_now, now_h=time_hour, rule_params=rule_params,
                          region_code_list=region_code_list)
            log_.info('---------- past minute 40, used bottom data, done ----------')
        else:
            # Data not ready, re-check in 1 minute
            log_.info("upstream data not ready, waiting...")
            Timer(60, h_timer_check).start()
    except Exception as e:
        log_.error(f"regional hourly data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - regional hourly data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info("file 01_1h_region.py: [1-hour region] execution start")
    h_timer_check()