# -*- coding: utf-8 -*-
# @ModuleName: region_rule_rank_h
# @Author: Liqian
# @Time: 2022/5/5 15:54
# @Software: PyCharm
import gevent
import datetime
import pandas as pd
import math
from functools import reduce
from odps import ODPS
from threading import Timer
from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
    check_table_partition_exits
from config import set_config
from log import Log
from check_video_limit_distribute import update_limit_video_score

config_, _ = set_config()
log_ = Log()

region_code = config_.REGION_CODE

features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',  # number of users pre-exposed in the past hour
    'lastonehour_view',  # number of users exposed in the past hour
    'lastonehour_play',  # number of users who played in the past hour
    'lastonehour_share',  # number of users who shared in the past hour
    'lastonehour_return',  # number of users returning in the past hour from past-hour shares
    'lastonehour_preview_total',  # pre-exposure count in the past hour
    'lastonehour_view_total',  # exposure count in the past hour
    'lastonehour_play_total',  # play count in the past hour
    'lastonehour_share_total',  # share count in the past hour
    'platform_return',
    'lastonehour_show',  # not split by region
    'lastonehour_show_region',  # grouped by region
]

def get_region_code(region):
    """Get the ad code corresponding to a province."""
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    sql = f"SELECT ad_code FROM region_adcode WHERE parent_id = 0 AND region LIKE '{region}%';"
    ad_code = mysql_helper.get_data(sql=sql)
    return ad_code[0][0]

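# Example (hypothetical table contents): given a region_adcode row
# ('北京', ad_code='110000', parent_id=0), get_region_code('北京') would return
# '110000'. mysql_helper.get_data is assumed to return a sequence of row
# tuples, hence the ad_code[0][0] access.
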
def h_data_check(project, table, now_date):
    """Check whether the current hour's data is ready."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        data_count = 0
    return data_count

def get_rov_redis_key(now_date):
    """Get the Redis key under which the ROV model results are stored."""
    redis_helper = RedisHelper()
    now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
    if not redis_helper.key_exists(key_name=key_name):
        # Fall back to the previous day's key if today's results are not there yet
        pre_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
    return key_name

def get_feature_data(project, table, now_date):
    """Fetch the feature data for the given hour."""
    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
    # dt = '2022041310'
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df

def cal_score(df, param):
    """
    Compute the score.
    :param df: feature data
    :param param: rule parameters
    :return: df with a score column, sorted by score in descending order
    """
    # Score formula: sharerate * backrate * logback * ctr
    # sharerate = lastonehour_share / (lastonehour_play + 1000)
    # backrate = lastonehour_return / (lastonehour_share + 10)
    # ctr = lastonehour_play / (lastonehour_preview + 1000), capped at 0.6: K2 = 0.6 if ctr > 0.6 else ctr
    # score = sharerate * backrate * LOG(lastonehour_return + 1) * K2
    df = df.fillna(0)
    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
    if param.get('view_type', None) == 'video-show':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
    elif param.get('view_type', None) == 'video-show-region':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
    else:
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
    df = df.sort_values(by=['score'], ascending=False)
    return df

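# Worked example of the score formula above (hypothetical numbers): with
# lastonehour_play=3000, lastonehour_share=20, lastonehour_return=100 and
# lastonehour_preview=5000,
#   share_rate = 20 / (3000 + 1000)   = 0.005
#   back_rate  = 100 / (20 + 10)      ≈ 3.333
#   log_back   = ln(100 + 1)          ≈ 4.615
#   ctr        = 3000 / (5000 + 1000) = 0.5  (below the 0.6 cap, so K2 = 0.5)
#   score      = 0.005 * 3.333 * 4.615 * 0.5 ≈ 0.038
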
def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key):
    """
    Select videos eligible for the recall pool and merge them with the daily ROV model results.
    :param df: scored feature data
    :param now_date: current date
    :param now_h: current hour
    :param rule_key: entry rule for the hourly data
    :param param: parameters of the entry rule
    :param region: region the data belongs to
    :param app_type: app type
    :param data_key: data group key
    :return:
    """
    redis_helper = RedisHelper()
    # Select the videos eligible for the recall pool; entry condition
    # (thresholds come from param, e.g. hourly returns >= 20 and score >= 0.005)
    return_count = param.get('return_count', 1)
    score_value = param.get('score_rule', 0)
    platform_return_rate = param.get('platform_return_rate', 0)
    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
                     & (df['platform_return_rate'] >= platform_return_rate)]
    # When a videoid appears more than once, keep the row with the higher score
    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
    h_recall_videos = h_recall_df['videoid'].to_list()
    log_.info(f'h_recall videos count = {len(h_recall_videos)}')
    # Filter by video status
    filtered_videos = filter_video_status(h_recall_videos)
    log_.info(f'filtered_videos count = {len(filtered_videos)}')
    # Filter out shielded videos
    shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
    if shield_key_name_list is not None:
        filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
        log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
    # Write the result to the corresponding Redis key
    h_video_ids = []
    h_recall_result = {}
    for video_id in filtered_videos:
        score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
        h_recall_result[int(video_id)] = float(score)
        h_video_ids.append(int(video_id))
    h_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}.{app_type}.{data_key}.{rule_key}." \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(h_recall_result) > 0:
        redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
        # Adjust the scores of rate-limited videos
        update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
        # Clear the online filter list
        redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
    region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
    by_24h_rule_key = param.get('24h_rule_key', None)
    # Deduplicate against the other recall pools and write to the corresponding Redis keys
    dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
                 region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
                 region=region, app_type=app_type, data_key=data_key)

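# Example of the hourly recall key written above (hypothetical values):
#   <RECALL_KEY_NAME_PREFIX_REGION_BY_H>110000.0.data1.rule1.20220505.15
# i.e. prefix + region code + app_type + data_key + rule_key + date + hour;
# the dedup keys built in dup_to_redis below follow the same pattern.
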
def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by_24h_rule_key, region, app_type,
                 data_key):
    """Deduplicate the regional hourly data against the other recall pools and store each result in Redis."""
    redis_helper = RedisHelper()
    # # ##### Deduplicate against the regional day-level list and store separately in Redis
    # region_day_key_name = \
    #     f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_DAY}{region}.rule1." \
    #     f"{datetime.datetime.strftime(now_date, '%Y%m%d')}"
    # if redis_helper.key_exists(key_name=region_day_key_name):
    #     region_day_data = redis_helper.get_data_zset_with_index(
    #         key_name=region_day_key_name, start=0, end=-1, with_scores=True)
    #     log_.info(f'region day data count = {len(region_day_data)}')
    #     region_day_dup = {}
    #     for video_id, score in region_day_data:
    #         if int(video_id) not in h_video_ids:
    #             region_day_dup[int(video_id)] = score
    #             h_video_ids.append(int(video_id))
    #     log_.info(f"region day data dup count = {len(region_day_dup)}")
    #     region_day_dup_key_name = \
    #         f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H}{region}.{rule_key}." \
    #         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    #     if len(region_day_dup) > 0:
    #         redis_helper.add_data_with_zset(key_name=region_day_dup_key_name, data=region_day_dup, expire_time=23 * 3600)
    # ##### Deduplicate against the regional 24h list and store separately in Redis
    region_24h_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{app_type}.{data_key}.{region_24h_rule_key}." \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if redis_helper.key_exists(key_name=region_24h_key_name):
        region_24h_data = redis_helper.get_all_data_from_zset(key_name=region_24h_key_name, with_scores=True)
        log_.info(f'region 24h data count = {len(region_24h_data)}')
        # Filter out shielded videos
        region_24h_video_ids = [int(video_id) for video_id, _ in region_24h_data]
        shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
        if shield_key_name_list is not None:
            region_24h_video_ids = filter_shield_video(video_ids=region_24h_video_ids,
                                                       shield_key_name_list=shield_key_name_list)
            log_.info(f"shield filtered_videos count = {len(region_24h_video_ids)}")
        region_24h_dup = {}
        for video_id, score in region_24h_data:
            if int(video_id) not in h_video_ids and int(video_id) in region_24h_video_ids:
                region_24h_dup[int(video_id)] = score
                h_video_ids.append(int(video_id))
        log_.info(f"region 24h data dup count = {len(region_24h_dup)}")
        region_24h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
        if len(region_24h_dup) > 0:
            redis_helper.add_data_with_zset(key_name=region_24h_dup_key_name, data=region_24h_dup,
                                            expire_time=23 * 3600)
            # Adjust the scores of rate-limited videos
            update_limit_video_score(initial_videos=region_24h_dup, key_name=region_24h_dup_key_name)
        # Clear the online filter list
        # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{app_type}.{data_key}.{region}.{rule_key}")
    # ##### Deduplicate against the mini-program day-level results and store separately in Redis
    # day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}rule2.{datetime.datetime.strftime(now_date, '%Y%m%d')}"
    # if redis_helper.key_exists(key_name=day_key_name):
    #     day_data = redis_helper.get_data_zset_with_index(
    #         key_name=day_key_name, start=0, end=-1, with_scores=True)
    #     log_.info(f'day data count = {len(day_data)}')
    #     day_dup = {}
    #     for video_id, score in day_data:
    #         if int(video_id) not in h_video_ids:
    #             day_dup[int(video_id)] = score
    #             h_video_ids.append(int(video_id))
    #     log_.info(f"day data dup count = {len(day_dup)}")
    #     day_dup_key_name = \
    #         f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H}{region}.{rule_key}." \
    #         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    #     if len(day_dup) > 0:
    #         redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)
    # ##### Deduplicate against the mini-program relative-24h results and store separately in Redis
    day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{app_type}.{data_key}.{by_24h_rule_key}." \
                   f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if redis_helper.key_exists(key_name=day_key_name):
        day_data = redis_helper.get_all_data_from_zset(key_name=day_key_name, with_scores=True)
        log_.info(f'24h data count = {len(day_data)}')
        # Filter out shielded videos
        day_video_ids = [int(video_id) for video_id, _ in day_data]
        shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
        if shield_key_name_list is not None:
            day_video_ids = filter_shield_video(video_ids=day_video_ids, shield_key_name_list=shield_key_name_list)
            log_.info(f"shield filtered_videos count = {len(day_video_ids)}")
        day_dup = {}
        for video_id, score in day_data:
            if int(video_id) not in h_video_ids and int(video_id) in day_video_ids:
                day_dup[int(video_id)] = score
                h_video_ids.append(int(video_id))
        log_.info(f"24h data dup count = {len(day_dup)}")
        day_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
        if len(day_dup) > 0:
            redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)
            # Adjust the scores of rate-limited videos
            update_limit_video_score(initial_videos=day_dup, key_name=day_dup_key_name)
        # Clear the online filter list
        redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}")
    # ##### Deduplicate against the mini-program relative-24h leftover (post-filter) results and store separately in Redis
    if by_24h_rule_key == 'rule3':
        other_h_24h_recall_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{app_type}.{data_key}." \
                                      f"{by_24h_rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
        if redis_helper.key_exists(key_name=other_h_24h_recall_key_name):
            other_24h_data = redis_helper.get_all_data_from_zset(key_name=other_h_24h_recall_key_name,
                                                                 with_scores=True)
            log_.info(f'24h other data count = {len(other_24h_data)}')
            # Filter out shielded videos
            other_24h_video_ids = [int(video_id) for video_id, _ in other_24h_data]
            shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
            if shield_key_name_list is not None:
                other_24h_video_ids = filter_shield_video(video_ids=other_24h_video_ids,
                                                          shield_key_name_list=shield_key_name_list)
                log_.info(f"shield filtered_videos count = {len(other_24h_video_ids)}")
            other_24h_dup = {}
            for video_id, score in other_24h_data:
                if int(video_id) not in h_video_ids and int(video_id) in other_24h_video_ids:
                    other_24h_dup[int(video_id)] = score
                    h_video_ids.append(int(video_id))
            log_.info(f"other 24h data dup count = {len(other_24h_dup)}")
            other_24h_dup_key_name = \
                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
            if len(other_24h_dup) > 0:
                redis_helper.add_data_with_zset(key_name=other_24h_dup_key_name, data=other_24h_dup,
                                                expire_time=23 * 3600)
                # Adjust the scores of rate-limited videos
                update_limit_video_score(initial_videos=other_24h_dup, key_name=other_24h_dup_key_name)
    # ##### Deduplicate against the mini-program model results and store separately in Redis
    model_key_name = get_rov_redis_key(now_date=now_date)
    model_data = redis_helper.get_all_data_from_zset(key_name=model_key_name, with_scores=True)
    log_.info(f'model data count = {len(model_data)}')
    # Filter out shielded videos
    model_video_ids = [int(video_id) for video_id, _ in model_data]
    shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
    if shield_key_name_list is not None:
        model_video_ids = filter_shield_video(video_ids=model_video_ids, shield_key_name_list=shield_key_name_list)
        log_.info(f"shield filtered_videos count = {len(model_video_ids)}")
    model_data_dup = {}
    for video_id, score in model_data:
        if int(video_id) not in h_video_ids and int(video_id) in model_video_ids:
            model_data_dup[int(video_id)] = score
            h_video_ids.append(int(video_id))
    log_.info(f"model data dup count = {len(model_data_dup)}")
    model_data_dup_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}.{app_type}.{data_key}.{rule_key}." \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(model_data_dup) > 0:
        redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
        # Adjust the scores of rate-limited videos
        update_limit_video_score(initial_videos=model_data_dup, key_name=model_data_dup_key_name)

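# The dedup chain in dup_to_redis runs in a fixed priority order: regional 24h
# pool -> mini-program 24h pool -> (for rule3 only) 24h leftover pool -> ROV
# model results. Each stage keeps only videos not already collected in
# h_video_ids, so a video surfaces in at most one of the stored zsets per hour.
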
def merge_df(df_left, df_right):
    """
    Merge two feature DataFrames on (videoid, code), summing the remaining features.
    :param df_left:
    :param df_right:
    :return: merged DataFrame
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', 'code']
    for feature in features:
        if feature in ['apptype', 'videoid', 'code']:
            continue
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
        feature_list.append(feature)
    return df_merged[feature_list]

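# Example of merge_df on two single-row frames (hypothetical numbers): merging
# {videoid: 1, code: '110000', lastonehour_play: 10} with
# {videoid: 1, code: '110000', lastonehour_play: 5} yields one row with
# lastonehour_play = 15. Rows present on only one side contribute 0 for the
# missing side, thanks to the outer join plus fillna(0).
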
def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h):
    log_.info(f"region = {region}")
    # Compute scores for this region
    region_df = df_merged[df_merged['code'] == region]
    log_.info(f'region_df count = {len(region_df)}')
    score_df = cal_score(df=region_df, param=rule_param)
    video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
               region=region, app_type=app_type, data_key=data_key)

def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
    log_.info(f"app_type = {app_type}")
    data_params_item = params.get('data_params')
    rule_params_item = params.get('rule_params')
    task_list = []
    for param in params.get('params_list'):
        data_key = param.get('data')
        data_param = data_params_item.get(data_key)
        log_.info(f"data_key = {data_key}, data_param = {data_param}")
        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
        df_merged = reduce(merge_df, df_list)
        rule_key = param.get('rule')
        rule_param = rule_params_item.get(rule_key)
        log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
        task_list.extend(
            [
                gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
                             now_date, now_h)
                for region in region_code_list
            ]
        )
    gevent.joinall(task_list)

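# A sketch of the rule_params structure process_with_app_type expects, inferred
# from the .get() calls above (all keys and values shown are hypothetical):
#   rule_params = {
#       0: {  # app_type
#           'data_params': {'data1': [0, 2]},   # data_key -> list of apptypes to merge
#           'rule_params': {'rule1': {'return_count': 20, 'score_rule': 0.005,
#                                     'view_type': 'video-show',
#                                     'region_24h_rule_key': 'rule1'}},
#           'params_list': [{'data': 'data1', 'rule': 'rule1'}],
#       },
#   }
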
def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
    # Fetch the feature data
    feature_df = get_feature_data(project=project, table=table, now_date=now_date)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    t = [
        gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
        for app_type, params in rule_params.items()
    ]
    gevent.joinall(t)

def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
    """If the data was not updated on time, reuse the previous hour's result as the current hour's data."""
    # Get the ROV model results
    redis_helper = RedisHelper()
    if now_h == 0:
        redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        redis_h = 23
    else:
        redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        redis_h = now_h - 1
    # Use the previous hour's regional data as the current hour's data
    key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
    for app_type, params in rule_params.items():
        log_.info(f"app_type = {app_type}")
        rule_params_item = params.get('rule_params')
        for param in params.get('params_list'):
            data_key = param.get('data')
            rule_key = param.get('rule')
            rule_param = rule_params_item.get(rule_key)
            log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
            region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
            # by_24h_rule_key is required by dup_to_redis; mirror the lookup done in video_rank
            by_24h_rule_key = rule_param.get('24h_rule_key', None)
            for region in region_code_list:
                log_.info(f"region = {region}")
                key_name = f"{key_prefix}{region}.{app_type}.{data_key}.{rule_key}.{redis_dt}.{redis_h}"
                initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
                if initial_data is None:
                    initial_data = []
                final_data = dict()
                h_video_ids = []
                for video_id, score in initial_data:
                    final_data[video_id] = score
                    h_video_ids.append(int(video_id))
                # Write to the corresponding Redis key
                final_key_name = \
                    f"{key_prefix}{region}.{app_type}.{data_key}.{rule_key}." \
                    f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
                if len(final_data) > 0:
                    redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
                # Clear the online filter list
                redis_helper.del_keys(
                    key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
                # Deduplicate against the other recall pools and write to Redis
                dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
                             region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
                             region=region, app_type=app_type, data_key=data_key)

def h_timer_check():
    rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
    project = config_.PROJECT_REGION_APP_TYPE
    table = config_.TABLE_REGION_APP_TYPE
    region_code_list = [code for region, code in region_code.items()]
    now_date = datetime.datetime.today()
    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
    now_h = datetime.datetime.now().hour
    now_min = datetime.datetime.now().minute
    if now_h == 0:
        # At hour 0, fall back to the previous hour's data directly
        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
        return
    # Check whether the current hour's data is ready
    h_data_count = h_data_check(project=project, table=table, now_date=now_date)
    if h_data_count > 0:
        log_.info(f'region_h_data_count = {h_data_count}')
        # Data is ready: run the update
        rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                  project=project, table=table, region_code_list=region_code_list)
    elif now_min > 50:
        log_.info('h_recall data is None, use bottom data!')
        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
    else:
        # Data not ready yet: check again in 1 minute
        Timer(60, h_timer_check).start()


if __name__ == '__main__':
    h_timer_check()
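
# Deployment note (an assumption, not defined by this module): h_timer_check
# only re-arms its own Timer while waiting for data within the hour, so an
# external scheduler would need to launch the script once per hour, e.g. a
# hypothetical crontab entry:
#   0 * * * * python region_rule_rank_h.py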