# region_rule_rank_h_by24h.py
# -*- coding: utf-8 -*-
# @ModuleName: region_rule_rank_h
# @Author: Liqian
# @Time: 2022/5/5 15:54
# @Software: PyCharm
import time
import multiprocessing
import os
import traceback
import gevent
import datetime
import pandas as pd
import math
from functools import reduce
from odps import ODPS
from threading import Timer, Thread
from my_utils import RedisHelper, get_data_from_odps, filter_video_status, check_table_partition_exits, \
    filter_video_status_app, send_msg_to_feishu
from my_config import set_config
from log import Log

# os.environ['NUMEXPR_MAX_THREADS'] = '16'

config_, _ = set_config()
log_ = Log()

region_code = config_.REGION_CODE

features = [
    'apptype',
    'code',  # province code
    'videoid',
    'lastday_preview',  # number of users pre-exposed yesterday
    'lastday_view',  # number of users exposed yesterday
    'lastday_play',  # number of users who played yesterday
    'lastday_share',  # number of users who shared yesterday
    'lastday_return',  # number of returning users yesterday
    'lastday_preview_total',  # pre-exposure count yesterday
    'lastday_view_total',  # exposure count yesterday
    'lastday_play_total',  # play count yesterday
    'lastday_share_total',  # share count yesterday
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]
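# Note: every name in `features` is read as a column from the ODPS records in
# get_feature_data() below, so this list must match the source table's schema.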


def get_rov_redis_key(now_date):
    """Get the Redis key under which the ROV model results are stored."""
    redis_helper = RedisHelper()
    now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
    if not redis_helper.key_exists(key_name=key_name):
        pre_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
    return key_name


def data_check(project, table, now_date):
    """Check whether the hourly partition is ready; return its row count (0 if not)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        data_count = 0
    return data_count


def get_feature_data(project, table, now_date):
    """Fetch the feature data for the current hour as a DataFrame."""
    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
    # dt = '2022041310'
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def cal_score(df, param):
    """
    Compute the score for each video.
    :param df: feature data
    :param param: rule parameters
    :return: df sorted by score, descending
    """
    # score formula: sharerate * backrate * logback * ctr
    # sharerate = lastday_share / (lastday_play + 1000)
    # backrate = lastday_return / (lastday_share + 10)
    # ctr = lastday_play / (lastday_preview + 1000), capped: K2 = 0.6 if ctr > 0.6 else ctr
    # score = sharerate * backrate * LOG(lastday_return + 1) * K2
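    # A worked example with hypothetical numbers: lastday_share=50, lastday_play=4000,
    # lastday_return=200, lastday_preview=9000 gives
    #   share_rate = 50 / (4000 + 1000)    = 0.01
    #   back_rate  = 200 / (50 + 10)       ≈ 3.333
    #   log_back   = ln(200 + 1)           ≈ 5.303
    #   ctr        = 4000 / (9000 + 1000)  = 0.4  -> K2 = 0.4 (below the 0.6 cap)
    #   score1     = 0.01 * 3.333 * 5.303 * 0.4 ≈ 0.0707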
    df = df.fillna(0)
    df['share_rate'] = df['lastday_share'] / (df['lastday_play'] + 1000)
    df['back_rate'] = df['lastday_return'] / (df['lastday_share'] + 10)
    df['log_back'] = (df['lastday_return'] + 1).apply(math.log)
    if param.get('view_type', None) == 'video-show':
        df['ctr'] = df['lastday_play'] / (df['platform_show'] + 1000)
    else:
        df['ctr'] = df['lastday_play'] / (df['lastday_preview'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    df['platform_return_rate'] = df['platform_return'] / df['lastday_return']
    df['score1'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
    click_score_rate = param.get('click_score_rate', None)
    back_score_rate = param.get('back_score_rate', None)
    if click_score_rate is not None:
        df['score'] = (1 - click_score_rate) * df['score1'] + click_score_rate * df['K2']
    elif back_score_rate is not None:
        df['score'] = (1 - back_score_rate) * df['score1'] + back_score_rate * df['back_rate']
    else:
        df['score'] = df['score1']
    df = df.sort_values(by=['score'], ascending=False)
    return df


def video_rank(df, now_date, now_h, rule_key, param, region, data_key):
    """
    Select the videos that qualify for this recall source.
    :param df: scored feature data
    :param now_date:
    :param now_h:
    :param rule_key: entry rule for the hourly data
    :param param: parameters of the entry rule
    :param region: region the data belongs to
    :return:
    """
    redis_helper = RedisHelper()
    # videos that satisfy the entry conditions
    return_count = param.get('return_count', 1)
    score_value = param.get('score_rule', 0)
    platform_return_rate = param.get('platform_return_rate', 0)
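    # Hypothetical example: with param = {'return_count': 100, 'score_rule': 0.005},
    # only videos with at least 100 returning users yesterday and a score >= 0.005
    # pass the filter below; unset thresholds fall back to the permissive defaults.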
    h_recall_df = df[(df['lastday_return'] >= return_count) & (df['score'] >= score_value)
                     & (df['platform_return_rate'] >= platform_return_rate)]
    log_.info(f'h_recall_df count = {len(h_recall_df)}')
    # when a videoid appears more than once, keep the row with the highest score
    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
    h_recall_videos = h_recall_df['videoid'].to_list()
    log_.info(f'h_recall_videos count = {len(h_recall_videos)}')
    # log_.info('h_recall_videos:{}'.format('-'.join([str(i) for i in h_recall_videos])))
    # filter by video status
    if data_key in ['data7', ]:
        filtered_videos = filter_video_status_app(h_recall_videos)
    else:
        filtered_videos = filter_video_status(h_recall_videos)
    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
    # write the results to the corresponding Redis key
    h_video_ids = []
    day_recall_result = {}
    for video_id in filtered_videos:
        score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
        # print(score)
        # exactly one row per videoid after drop_duplicates above
        day_recall_result[int(video_id)] = float(score.iloc[0])
        h_video_ids.append(int(video_id))
    day_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    # log_.info("day_recall_result.type:{}".format(str(type(day_recall_result))))
    # log_.info("begin to write redis for day_recall_key_name:{} with {}".format(day_recall_key_name,
    #           str(len(day_recall_result))))
    if len(day_recall_result) > 0:
        redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=2 * 3600)
        # clear the online filter list
        # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}")
    # dedupe against the other recall pools and store to the corresponding Redis keys
    # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)


def merge_df(df_left, df_right):
    """
    Merge two DataFrames on [videoid, code] and sum the corresponding features.
    :param df_left:
    :param df_right:
    :return:
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', 'code']
    for feature in features:
        if feature in ['apptype', 'videoid', 'code']:
            continue
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
        feature_list.append(feature)
    return df_merged[feature_list]


def merge_df_with_score(df_left, df_right):
    """
    Merge two DataFrames on [videoid, code], summing platform returns, returns and score.
    :param df_left:
    :param df_right:
    :return:
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', 'code', 'lastday_return', 'platform_return', 'score']
    for feature in feature_list[2:]:
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
    return df_merged[feature_list]


def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
    log_.info(f"region = {region} start...")
    # compute the score
    region_df = df_merged[df_merged['code'] == region]
    log_.info(f'region = {region}, region_df count = {len(region_df)}')
    score_df = cal_score(df=region_df, param=rule_param)
    video_rank(df=score_df, now_date=now_date, now_h=now_h, region=region,
               rule_key=rule_key, param=rule_param, data_key=data_key)
    log_.info(f"region = {region} end!")


def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
    log_.info(f"region = {region} start...")
    region_score_df = df_merged[df_merged['code'] == region]
    log_.info(f'region = {region}, region_score_df count = {len(region_score_df)}')
    video_rank(df=region_score_df, now_date=now_date, now_h=now_h, region=region,
               rule_key=rule_key, param=rule_param, data_key=data_key)
    log_.info(f"region = {region} end!")


def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
    log_.info(f"app_type = {app_type} start...")
    data_params_item = params.get('data_params')
    rule_params_item = params.get('rule_params')
    for param in params.get('params_list'):
        data_key = param.get('data')
        data_param = data_params_item.get(data_key)
        log_.info(f"data_key = {data_key}, data_param = {data_param}")
        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
        df_merged = reduce(merge_df, df_list)
        rule_key = param.get('rule')
        rule_param = rule_params_item.get(rule_key)
        log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
        task_list = [
            gevent.spawn(process_with_region, region, df_merged, data_key, rule_key, rule_param,
                         now_date, now_h)
            for region in region_code_list
        ]
        gevent.joinall(task_list)
    log_.info(f"app_type = {app_type} end!")


def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h):
    log_.info(f"param = {param} start...")
    data_key = param.get('data')
    data_param = data_params_item.get(data_key)
    log_.info(f"data_key = {data_key}, data_param = {data_param}")
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
    merge_func = rule_param.get('merge_func', None)
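    # Two merge strategies (inferred from the branches below):
    #   merge_func == 2: score each apptype's data separately, weight each score by its
    #                    configured weight, then sum scores per [videoid, code];
    #   otherwise:       sum the raw features across apptypes first, then score once.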
    if merge_func == 2:
        score_df_list = []
        for apptype, weight in data_param.items():
            df = feature_df[feature_df['apptype'] == apptype]
            # compute the score
            score_df = cal_score(df=df, param=rule_param)
            score_df['score'] = score_df['score'] * weight
            score_df_list.append(score_df)
        # merge the scores
        df_merged = reduce(merge_df_with_score, score_df_list)
        # refresh the platform return rate
        df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastday_return']
        task_list = [
            gevent.spawn(process_with_region2, region, df_merged, data_key, rule_key, rule_param, now_date, now_h)
            for region in region_code_list
        ]
    else:
        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
        df_merged = reduce(merge_df, df_list)
        task_list = [
            gevent.spawn(process_with_region, region, df_merged, data_key, rule_key, rule_param, now_date, now_h)
            for region in region_code_list
        ]
    gevent.joinall(task_list)
    log_.info(f"param = {param} end!")


def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
    # fetch the feature data
    feature_df = get_feature_data(project=project, table=table, now_date=now_date)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    # rank
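    # Concurrency layout: one worker process per entry in params_list; within each
    # process, process_with_param fans out one gevent greenlet per region.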
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list')
    pool = multiprocessing.Pool(processes=len(params_list))
    for param in params_list:
        pool.apply_async(
            func=process_with_param,
            args=(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h)
        )
    pool.close()
    pool.join()
  292. """
  293. pool = multiprocessing.Pool(processes=len(config_.APP_TYPE))
  294. for app_type, params in rule_params.items():
  295. pool.apply_async(func=process_with_app_type,
  296. args=(app_type, params, region_code_list, feature_df, now_date, now_h))
  297. pool.close()
  298. pool.join()
  299. """
  300. # for app_type, params in rule_params.items():
  301. # log_.info(f"app_type = {app_type}")
  302. # for data_key, data_param in params['data_params'].items():
  303. # log_.info(f"data_key = {data_key}, data_param = {data_param}")
  304. # df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
  305. # df_merged = reduce(merge_df, df_list)
  306. # for rule_key, rule_param in params['rule_params'].items():
  307. # log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  308. # task_list = [
  309. # gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
  310. # now_date, now_h)
  311. # for region in region_code_list
  312. # ]
  313. # gevent.joinall(task_list)
  314. # for key, value in rule_params.items():
  315. # log_.info(f"rule = {key}, param = {value}")
  316. # for region in region_code_list:
  317. # log_.info(f"region = {region}")
  318. # # 计算score
  319. # region_df = feature_df[feature_df['code'] == region]
  320. # log_.info(f'region_df count = {len(region_df)}')
  321. # score_df = cal_score(df=region_df, param=value)
  322. # video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=key, param=value, region=region)
  323. # # to-csv
  324. # score_filename = f"score_24h_{region}_{key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
  325. # score_df.to_csv(f'./data/{score_filename}')
  326. # # to-logs
  327. # log_.info({"date": datetime.datetime.strftime(now_date, '%Y%m%d%H'),
  328. # "region_code": region,
  329. # "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H,
  330. # "rule_key": key,
  331. # # "score_df": score_df[['videoid', 'score']]
  332. # })


def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region):
    """Dedupe the hourly regional data against the other recall pools and store it in Redis."""
    redis_helper = RedisHelper()
    # ##### dedupe against the mini-program daily results and store separately in Redis
    day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}rule2.{datetime.datetime.strftime(now_date, '%Y%m%d')}"
    if redis_helper.key_exists(key_name=day_key_name):
        day_data = redis_helper.get_all_data_from_zset(key_name=day_key_name, with_scores=True)
        log_.info(f'day data count = {len(day_data)}')
        day_dup = {}
        for video_id, score in day_data:
            if int(video_id) not in h_video_ids:
                day_dup[int(video_id)] = score
                h_video_ids.append(int(video_id))
        log_.info(f"day data dup count = {len(day_dup)}")
        day_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H}{region}.{rule_key}." \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
        if len(day_dup) > 0:
            redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)
    # ##### dedupe against the mini-program ROV model results and store separately in Redis
    model_key_name = get_rov_redis_key(now_date=now_date)
    model_data = redis_helper.get_all_data_from_zset(key_name=model_key_name, with_scores=True)
    log_.info(f'model data count = {len(model_data)}')
    model_data_dup = {}
    for video_id, score in model_data:
        if int(video_id) not in h_video_ids:
            model_data_dup[int(video_id)] = score
            h_video_ids.append(int(video_id))
    log_.info(f"model data dup count = {len(model_data_dup)}")
    model_data_dup_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H}{region}.{rule_key}." \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(model_data_dup) > 0:
        redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)


def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
    """When the data is not updated in time, reuse the previous hour's results for the current hour."""
    redis_helper = RedisHelper()
    if now_h == 0:
        redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        redis_h = 23
    else:
        redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        redis_h = now_h - 1
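    # Concrete example: at now_h = 0 on 2022-05-05 the fallback reads yesterday's
    # last hour, i.e. redis_dt = '20220504', redis_h = 23.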
    # use the previous hour's regional data as the current hour's data
    key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H
    for param in rule_params.get('params_list'):
        data_key = param.get('data')
        rule_key = param.get('rule')
        log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
        for region in region_code_list:
            log_.info(f"region = {region}")
            key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
            initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
            if initial_data is None:
                initial_data = []
            final_data = dict()
            h_video_ids = []
            for video_id, score in initial_data:
                final_data[video_id] = score
                h_video_ids.append(int(video_id))
            # store to the corresponding Redis key
            final_key_name = \
                f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
            if len(final_data) > 0:
                redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
  398. """
  399. for app_type, params in rule_params.items():
  400. log_.info(f"app_type = {app_type}")
  401. for param in params.get('params_list'):
  402. data_key = param.get('data')
  403. rule_key = param.get('rule')
  404. log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  405. for region in region_code_list:
  406. log_.info(f"region = {region}")
  407. key_name = f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
  408. initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
  409. if initial_data is None:
  410. initial_data = []
  411. final_data = dict()
  412. h_video_ids = []
  413. for video_id, score in initial_data:
  414. final_data[video_id] = score
  415. h_video_ids.append(int(video_id))
  416. # 存入对应的redis
  417. final_key_name = \
  418. f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  419. if len(final_data) > 0:
  420. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
  421. # 清空线上过滤应用列表
  422. # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}")
  423. # 与其他召回视频池去重,存入对应的redis
  424. # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
  425. """


def h_timer_check():
    try:
        rule_params = config_.RULE_PARAMS_REGION_24H_APP_TYPE
        project = config_.PROJECT_REGION_24H_APP_TYPE
        table = config_.TABLE_REGION_24H_APP_TYPE
        region_code_list = [code for region, code in region_code.items() if code != '-1']
        now_date = datetime.datetime.today()
        now_h = datetime.datetime.now().hour
        now_min = datetime.datetime.now().minute
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
        redis_helper = RedisHelper()
        # check whether today's update data is ready
        h_data_count = data_check(project=project, table=table, now_date=now_date)
        if h_data_count > 0:
            log_.info(f'region_24h_data_count = {h_data_count}')
            # data is ready, run the update
            rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                        project=project, table=table, region_code_list=region_code_list)
            log_.info("region_24h_data end!")
            redis_helper.set_data_to_redis(
                key_name=f"{config_.REGION_24H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}",
                value='1',
                expire_time=2 * 3600
            )
            log_.info("region_24h_data status update to '1' finished!")
        elif now_min > 40:
            log_.info('24h_recall data is None, use bottom data!')
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
            log_.info("region_24h_data end!")
            redis_helper.set_data_to_redis(
                key_name=f"{config_.REGION_24H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}",
                value='1',
                expire_time=2 * 3600
            )
            log_.info("region_24h_data status update to '1' finished!")
        else:
            # data not ready, check again in 1 minute
            Timer(60, h_timer_check).start()
    except Exception as e:
        log_.error(f"Regional 24h data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - regional 24h data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info("region_24h_data start...")
    h_timer_check()