region_rule_rank_h_new.py

# -*- coding: utf-8 -*-
# @ModuleName: region_rule_rank_h
# @Author: Liqian
# @Time: 2022/5/5 15:54
# @Software: PyCharm
import json
import multiprocessing
import os
import sys
import traceback
import gevent
import datetime
import pandas as pd
import math
from functools import reduce
from odps import ODPS
from threading import Timer, Thread
from my_utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
    check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
from my_config import set_config
from log import Log
from check_video_limit_distribute_new import update_limit_video_score

# os.environ['NUMEXPR_MAX_THREADS'] = '16'

config_, _ = set_config()
log_ = Log()
region_code = config_.REGION_CODE

features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',        # users with pre-exposure in the past hour
    'lastonehour_view',           # users with exposure in the past hour
    'lastonehour_play',           # users who played in the past hour
    'lastonehour_share',          # users who shared in the past hour
    'lastonehour_return',         # users who returned in the past hour from past-hour shares
    'lastonehour_preview_total',  # pre-exposure count in the past hour
    'lastonehour_view_total',     # exposure count in the past hour
    'lastonehour_play_total',     # play count in the past hour
    'lastonehour_share_total',    # share count in the past hour
    'platform_return',
    'lastonehour_show',           # shows, not split by region
    'lastonehour_show_region',    # shows, grouped by region
]


def data2file(data, filepath):
    """Write data to a local file, creating the directory if needed."""
    filedir = '/'.join(filepath.split('/')[:-1])
    if not os.path.exists(filedir):
        os.makedirs(filedir)
    with open(filepath, 'w') as wf:
        wf.write(data)


def get_region_code(region):
    """Look up the ad_code for a province."""
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    sql = f"SELECT ad_code FROM region_adcode WHERE parent_id = 0 AND region LIKE '{region}%';"
    ad_code = mysql_helper.get_data(sql=sql)
    return ad_code[0][0]


def h_data_check(project, table, now_date):
    """Check whether the hourly partition is ready; return its row count (0 if not)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        data_count = 0
    return data_count


def get_rov_redis_key(now_date):
    """Get the Redis key holding ROV model results, falling back to the previous day."""
    redis_helper = RedisHelper()
    now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
    if not redis_helper.key_exists(key_name=key_name):
        pre_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
    return key_name


def get_day_30day_videos(now_date, data_key, rule_key):
    """Get video ids from the daily-updated 30-day recall, falling back to the previous day."""
    redis_helper = RedisHelper()
    day_30day_recall_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
    now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    day_30day_recall_key_name = f"{day_30day_recall_key_prefix}{data_key}:{rule_key}:{now_dt}"
    if not redis_helper.key_exists(key_name=day_30day_recall_key_name):
        redis_dt = datetime.datetime.strftime((now_date - datetime.timedelta(days=1)), '%Y%m%d')
        day_30day_recall_key_name = f"{day_30day_recall_key_prefix}{data_key}:{rule_key}:{redis_dt}"
    data = redis_helper.get_all_data_from_zset(key_name=day_30day_recall_key_name, with_scores=True)
    if data is None:
        return None
    video_ids = [int(video_id) for video_id, _ in data]
    return video_ids


def get_feature_data(project, table, now_date):
    """Read the hourly feature partition from ODPS into a DataFrame."""
    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
    # dt = '2022041310'
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def cal_score(df, param):
    """
    Compute the recall score.
    :param df: feature data
    :param param: rule parameters
    :return: df sorted by score, descending
    """
    # Score formula: sharerate * backrate * logback * ctr
    #   sharerate = lastonehour_share / (lastonehour_play + 1000)
    #   backrate  = lastonehour_return / (lastonehour_share + 10)
    #   ctr       = lastonehour_play / (lastonehour_preview + 1000), capped: K2 = 0.6 if ctr > 0.6 else ctr
    #   score     = sharerate * backrate * LOG(lastonehour_return + 1) * K2
    df = df.fillna(0)
    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
    if param.get('view_type', None) == 'video-show':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
    elif param.get('view_type', None) == 'video-show-region':
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
    else:
        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
    df['score1'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
    click_score_rate = param.get('click_score_rate', None)
    back_score_rate = param.get('back_score_rate', None)
    if click_score_rate is not None:
        df['score'] = (1 - click_score_rate) * df['score1'] + click_score_rate * df['K2']
    elif back_score_rate is not None:
        df['score'] = (1 - back_score_rate) * df['score1'] + back_score_rate * df['back_rate']
    else:
        df['score'] = df['score1']
    df = df.sort_values(by=['score'], ascending=False)
    return df
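
# A worked example of the score formula above, with illustrative numbers
# (default view_type, neither click_score_rate nor back_score_rate set):
#   lastonehour_share=50, lastonehour_play=2000, lastonehour_return=100, lastonehour_preview=4000
#   share_rate = 50 / (2000 + 1000)   ≈ 0.0167
#   back_rate  = 100 / (50 + 10)      ≈ 1.6667
#   log_back   = ln(100 + 1)          ≈ 4.6151
#   ctr        = 2000 / (4000 + 1000) = 0.4  -> K2 = 0.4 (under the 0.6 cap)
#   score = score1 = 0.0167 * 1.6667 * 4.6151 * 0.4 ≈ 0.0513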


def add_func1(initial_df, pre_h_df):
    """Merge the current hour's data with earlier hours: the highest score wins."""
    score_list = initial_df['score'].to_list()
    if len(score_list) > 0:
        min_score = min(score_list)
    else:
        min_score = 0
    pre_h_df = pre_h_df[pre_h_df['score'] > min_score]
    df = pd.concat([initial_df, pre_h_df], ignore_index=True)
    # De-duplicate by videoid, keeping the higher score
    df['videoid'] = df['videoid'].astype(int)
    df = df.sort_values(by=['score'], ascending=False)
    df = df.drop_duplicates(subset=['videoid'], keep="first")
    return df


def add_func2(initial_df, pre_h_df):
    """Merge the current hour's data with earlier hours: the current hour wins for
    videos present in it; otherwise the highest score wins."""
    score_list = initial_df['score'].to_list()
    if len(score_list) > 0:
        min_score = min(score_list)
    else:
        min_score = 0
    initial_video_id_list = initial_df['videoid'].to_list()
    pre_h_df = pre_h_df[pre_h_df['score'] > min_score]
    pre_h_df = pre_h_df[~pre_h_df['videoid'].isin(initial_video_id_list)]
    df = pd.concat([initial_df, pre_h_df], ignore_index=True)
    # De-duplicate by videoid, keeping the higher score
    df['videoid'] = df['videoid'].astype(int)
    df = df.sort_values(by=['score'], ascending=False)
    df = df.drop_duplicates(subset=['videoid'], keep="first")
    return df
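
# Illustrative contrast between the two merge strategies (hypothetical rows):
#   initial_df: videoid=1 score=0.5
#   pre_h_df:   videoid=1 score=0.8, videoid=2 score=0.6
# add_func1 keeps videoid=1 at 0.8 (highest score across both frames) and videoid=2 at 0.6;
# add_func2 keeps videoid=1 at 0.5 (current hour wins) and videoid=2 at 0.6.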


def add_videos(initial_df, now_date, rule_key, region, data_key, hour_count, top, add_func):
    """
    Backfill the regional hourly list with top videos from the preceding hours.
    :param initial_df: regional hourly filtering result
    :param now_date:
    :param data_key:
    :param region:
    :param rule_key:
    :param hour_count: how many preceding hours to scan, type-int
    :param top: how many top videos to take per hour, type-int
    :return: df
    """
    redis_helper = RedisHelper()
    pre_h_data = []
    for i in range(1, hour_count + 1):
        pre_date = now_date - datetime.timedelta(hours=i)
        pre_h = pre_date.hour
        pre_h_recall_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
                                f"{datetime.datetime.strftime(pre_date, '%Y%m%d')}:{pre_h}"
        pre_h_top_data = redis_helper.get_data_zset_with_index(key_name=pre_h_recall_key_name,
                                                               start=0, end=top - 1,
                                                               desc=True, with_scores=True)
        if pre_h_top_data is None:
            continue
        pre_h_data.extend(pre_h_top_data)
    pre_h_df = pd.DataFrame(data=pre_h_data, columns=['videoid', 'score'])
    if add_func == 'func2':
        df = add_func2(initial_df=initial_df, pre_h_df=pre_h_df)
    else:
        df = add_func1(initial_df=initial_df, pre_h_df=pre_h_df)
    return df


def video_rank(df, now_date, now_h, rule_key, param, region, data_key, add_videos_with_pre_h=False, hour_count=0,
               videos_count=40):
    """
    Select videos that qualify for the recall source and merge them with the
    daily-updated ROV model result list.
    :param hour_count:
    :param add_videos_with_pre_h:
    :param data_key:
    :param df:
    :param now_date:
    :param now_h:
    :param rule_key: entry rule for the hourly data
    :param param: entry-rule parameters for the hourly data
    :param region: region the data belongs to
    :return:
    """
    redis_helper = RedisHelper()
    # Entry condition (thresholds come from param), e.g. hourly returns >= 20 && score >= 0.005
    return_count = param.get('return_count', 1)
    score_value = param.get('score_rule', 0)
    platform_return_rate = param.get('platform_return_rate', 0)
    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
                     & (df['platform_return_rate'] >= platform_return_rate)]
    # When a videoid appears more than once, keep the higher score
    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
    # Backfill high-quality videos from preceding hours
    if add_videos_with_pre_h is True:
        add_func = param.get('add_func', None)
        h_recall_df = add_videos(initial_df=h_recall_df, now_date=now_date, rule_key=rule_key,
                                 region=region, data_key=data_key, hour_count=hour_count, top=10, add_func=add_func)
    h_recall_videos = h_recall_df['videoid'].to_list()
    # log_.info(f'h_recall videos count = {len(h_recall_videos)}')
    # Filter by video status
    if data_key in ['data7', ]:
        filtered_videos = filter_video_status_app(h_recall_videos)
    else:
        filtered_videos = filter_video_status(h_recall_videos)
    # log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
    # Filter shielded videos
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    shield_key_name_list = shield_config.get(region, None)
    if shield_key_name_list is not None:
        filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
    # log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
    # Filter politically sensitive videos
    political_filter = param.get('political_filter', None)
    if political_filter is True:
        log_.info(f"political filter videos count = {len(filtered_videos)}")
        filtered_videos = filter_political_videos(video_ids=filtered_videos)
        log_.info(f"political filtered videos count = {len(filtered_videos)}")
    h_video_ids = []
    h_recall_result_mapping = {}
    for video_id in filtered_videos:
        score = h_recall_df[h_recall_df['videoid'] == video_id]['score'].iloc[0]
        h_recall_result_mapping[int(video_id)] = float(score)
        h_video_ids.append(int(video_id))
    # Adjust scores of rate-limited videos
    h_recall_result_mapping = update_limit_video_score(initial_videos=h_recall_result_mapping)
    if h_recall_result_mapping:
        # Sort by score
        h_recall_result = [(int(vid), float(score)) for vid, score in h_recall_result_mapping.items()]
        h_recall_result = sorted(h_recall_result, key=lambda x: x[1], reverse=True)
        h_recall_result = [(vid, round(score, 10)) for vid, score in h_recall_result]
        h_recall_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}"
        log_.info(f"h_recall_result count = {len(h_recall_result)}")
        # Write to the corresponding Redis key
        redis_helper.set_data_to_redis(
            key_name=h_recall_key_name, value=json.dumps(h_recall_result[:videos_count]), expire_time=30 * 24 * 3600
        )
        # Write to a local file
        filename = f"{region}_{data_key}_{rule_key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.txt"
        data2file(data=json.dumps(h_recall_result), filepath=f"./data/region_h/{filename}")
    region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
    by_24h_rule_key = param.get('24h_rule_key', None)
    # De-duplicate against other recall pools and store to the corresponding Redis keys
    dup_to_redis(now_date=now_date, now_h=now_h, rule_key=rule_key,
                 region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
                 region=region, data_key=data_key, political_filter=political_filter,
                 shield_config=shield_config)
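
# A sketch of the rule-param fields consumed by video_rank/cal_score (values are
# illustrative; the real dicts live in config as RULE_PARAMS_REGION_APP_TYPE):
# rule_param = {
#     'view_type': 'video-show',       # which denominator cal_score uses for ctr
#     'return_count': 20,              # min hourly returns to enter the recall pool
#     'score_rule': 0.005,             # min score to enter the recall pool
#     'platform_return_rate': 0,       # min platform return rate
#     'add_videos_with_pre_h': True,   # backfill top videos from preceding hours
#     'hour_count': 6,                 # how many preceding hours to scan
#     'add_func': 'func2',             # backfill merge strategy ('func1' is the default)
#     'political_filter': True,        # drop politically sensitive videos
#     'shield_config': {},             # region -> shield key list, defaults to config_.SHIELD_CONFIG
#     'region_24h_rule_key': 'rule1',  # companion regional-24h rule
#     '24h_rule_key': 'rule1',         # companion app-wide-24h rule
# }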


def dup_data(initial_key_name, dup_key_name, region, political_filter, shield_config, filepath, videos_count):
    """Filter one recall pool, adjust its scores, and store the result to Redis and a local file."""
    redis_helper = RedisHelper()
    if redis_helper.key_exists(key_name=initial_key_name):
        initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
        # Filter shielded videos
        initial_video_ids = [int(video_id) for video_id, _ in initial_data]
        shield_key_name_list = shield_config.get(region, None)
        if shield_key_name_list is not None:
            initial_video_ids = filter_shield_video(video_ids=initial_video_ids,
                                                    shield_key_name_list=shield_key_name_list)
        # Filter politically sensitive videos
        if political_filter is True:
            initial_video_ids = filter_political_videos(video_ids=initial_video_ids)
        dup_map = {}
        for video_id, score in initial_data:
            if int(video_id) in initial_video_ids:
                dup_map[int(video_id)] = score
        # Adjust scores of rate-limited videos
        dup_map = update_limit_video_score(initial_videos=dup_map)
        if dup_map:
            # Sort by score
            data = [(int(vid), float(score)) for vid, score in dup_map.items()]
            data = sorted(data, key=lambda x: x[1], reverse=True)
            data = [(vid, round(score, 10)) for vid, score in data]
            log_.info(f"data count = {len(data)}")
            # Write to the corresponding Redis key
            redis_helper.set_data_to_redis(
                key_name=dup_key_name, value=json.dumps(data[:videos_count]), expire_time=30 * 24 * 3600
            )
            # Write to a local file
            data2file(data=json.dumps(data), filepath=filepath)


def dup_to_redis(now_date, now_h, rule_key, region_24h_rule_key, by_24h_rule_key,
                 region, data_key, political_filter, shield_config):
    """De-duplicate the regional hourly data against other recall pools and store to Redis."""
    # ##### Update the regional 24h list and store it separately in Redis
    region_24h_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    region_24h_dup_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}"
    filename = f"{region}_{data_key}_{rule_key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.txt"
    dup_data(initial_key_name=region_24h_key_name, dup_key_name=region_24h_dup_key_name, region=region,
             political_filter=political_filter, shield_config=shield_config, filepath=f"./data/region24h/{filename}",
             videos_count=100)
    # ##### Update the mini-program app-wide 24h result and store it separately in Redis
    h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:" \
                     f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    h_24h_dup_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}"
    filename = f"{region}_{data_key}_{rule_key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.txt"
    dup_data(initial_key_name=h_24h_key_name, dup_key_name=h_24h_dup_key_name, region=region,
             political_filter=political_filter, shield_config=shield_config, filepath=f"./data/24h/{filename}",
             videos_count=100)
    # ##### Update the app-wide 24h leftover (post-filter) result and store it separately in Redis
    other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
                           f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    other_h_24h_dup_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}"
    filename = f"{region}_{data_key}_{rule_key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.txt"
    dup_data(initial_key_name=other_h_24h_key_name, dup_key_name=other_h_24h_dup_key_name, region=region,
             political_filter=political_filter, shield_config=shield_config, filepath=f"./data/24h_other/{filename}",
             videos_count=200)


def merge_df(df_left, df_right):
    """
    Merge two DataFrames on [videoid, code], summing the corresponding features.
    :param df_left:
    :param df_right:
    :return:
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', 'code']
    for feature in features:
        if feature in ['apptype', 'videoid', 'code']:
            continue
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
        feature_list.append(feature)
    return df_merged[feature_list]
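
# Example of the outer-merge-and-sum behaviour (hypothetical rows):
#   left : videoid=1 code='110000' lastonehour_play=10
#   right: videoid=1 code='110000' lastonehour_play=5;  videoid=2 code='110000' lastonehour_play=7
#   out  : videoid=1 -> lastonehour_play=15 (summed);   videoid=2 -> lastonehour_play=7 (missing side filled with 0)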


def merge_df_with_score(df_left, df_right):
    """
    Merge two DataFrames on [videoid, code], summing platform returns, returns and score.
    :param df_left:
    :param df_right:
    :return:
    """
    df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
    df_merged.fillna(0, inplace=True)
    feature_list = ['videoid', 'code', 'lastonehour_return', 'platform_return', 'score']
    for feature in feature_list[2:]:
        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
    return df_merged[feature_list]
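
# Same outer-join-and-sum idea as merge_df, but restricted to the three numeric
# columns needed downstream; e.g. two per-apptype scores 0.02 and 0.03 that were
# already weighted in process_with_param combine to 0.05 for the same (videoid, code).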


def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
                        add_videos_with_pre_h, hour_count):
    log_.info(f"region = {region} start...")
    # Compute scores
    region_df = df_merged[df_merged['code'] == region]
    log_.info(f'region = {region}, region_df count = {len(region_df)}')
    score_df = cal_score(df=region_df, param=rule_param)
    video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
               region=region, data_key=data_key,
               add_videos_with_pre_h=add_videos_with_pre_h, hour_count=hour_count)
    log_.info(f"region = {region} end!")


def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
                         add_videos_with_pre_h, hour_count):
    log_.info(f"region = {region} start...")
    # Scores were already computed and merged upstream
    region_score_df = df_merged[df_merged['code'] == region]
    log_.info(f'region = {region}, region_score_df count = {len(region_score_df)}')
    video_rank(df=region_score_df, now_date=now_date, now_h=now_h, region=region,
               rule_key=rule_key, param=rule_param, data_key=data_key,
               add_videos_with_pre_h=add_videos_with_pre_h, hour_count=hour_count)
    log_.info(f"region = {region} end!")


def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag):
    log_.info(f"app_type = {app_type} start...")
    data_params_item = params.get('data_params')
    rule_params_item = params.get('rule_params')
    task_list = []
    for param in params.get('params_list'):
        data_key = param.get('data')
        data_param = data_params_item.get(data_key)
        log_.info(f"data_key = {data_key}, data_param = {data_param}")
        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
        df_merged = reduce(merge_df, df_list)
        rule_key = param.get('rule')
        rule_param = rule_params_item.get(rule_key)
        log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
        # Whether to backfill high-quality videos from preceding hours
        add_videos_with_pre_h = rule_param.get('add_videos_with_pre_h', False)
        hour_count = rule_param.get('hour_count', 0)
        task_list.extend(
            [
                gevent.spawn(process_with_region, region, df_merged, data_key, rule_key, rule_param,
                             now_date, now_h, add_videos_with_pre_h, hour_count)
                for region in region_code_list
            ]
        )
    gevent.joinall(task_list)
    log_.info(f"app_type = {app_type} end!")

    # log_.info(f"app_type = {app_type}")
    # task_list = []
    # for data_key, data_param in params['data_params'].items():
    #     log_.info(f"data_key = {data_key}, data_param = {data_param}")
    #     df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
    #     df_merged = reduce(merge_df, df_list)
    #     for rule_key, rule_param in params['rule_params'].items():
    #         log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
    #         task_list.extend(
    #             [
    #                 gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
    #                              now_date, now_h)
    #                 for region in region_code_list
    #             ]
    #         )
    # gevent.joinall(task_list)


def copy_data_for_city(region, city_code, data_key, rule_key, shield_config, now_date):
    """Copy regional data to the city's Redis keys, applying the city's shield filters."""
    log_.info(f"city_code = {city_code} start ...")
    redis_helper = RedisHelper()
    key_prefix_list = [
        (config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, 'region_h'),          # regional hourly
        (config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, 'region_24h'),  # regional 24h
        (config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, '24h'),         # app-wide 24h
        (config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, '24h_other')    # app-wide 24h leftovers
    ]
    for key_prefix, file_folder in key_prefix_list:
        region_key = f"{key_prefix}{region}:{data_key}:{rule_key}"
        city_key = f"{key_prefix}{city_code}:{data_key}:{rule_key}"
        if not redis_helper.key_exists(key_name=region_key):
            continue
        region_data = redis_helper.get_data_from_redis(key_name=region_key)
        if not region_data:
            continue
        # Filter shielded videos
        region_video_ids = [int(video_id) for video_id, _ in json.loads(region_data)]
        shield_key_name_list = shield_config.get(city_code, None)
        # shield_key_name_list = config_.SHIELD_CONFIG.get(city_code, None)
        if shield_key_name_list is not None:
            filtered_video_ids = filter_shield_video(video_ids=region_video_ids,
                                                     shield_key_name_list=shield_key_name_list)
        else:
            filtered_video_ids = region_video_ids
        city_data = []
        for video_id, score in json.loads(region_data):
            if int(video_id) in filtered_video_ids:
                city_data.append((int(video_id), score))
        if len(city_data) > 0:
            redis_helper.set_data_to_redis(key_name=city_key, value=json.dumps(city_data), expire_time=30 * 24 * 3600)
            # Write to a local file
            filename = f"{region}_{data_key}_{rule_key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.txt"
            data2file(data=json.dumps(city_data), filepath=f"./data/{file_folder}/{filename}")
    log_.info(f"city_code = {city_code} end!")


def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h):
    log_.info(f"param = {param} start...")
    data_key = param.get('data')
    data_param = data_params_item.get(data_key)
    log_.info(f"data_key = {data_key}, data_param = {data_param}")
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
    merge_func = rule_param.get('merge_func', None)
    # Whether to backfill high-quality videos from preceding hours into the regional hourly data
    add_videos_with_pre_h = rule_param.get('add_videos_with_pre_h', False)
    hour_count = rule_param.get('hour_count', 0)
    if merge_func == 2:
        score_df_list = []
        for apptype, weight in data_param.items():
            df = feature_df[feature_df['apptype'] == apptype]
            # Compute scores
            score_df = cal_score(df=df, param=rule_param)
            score_df['score'] = score_df['score'] * weight
            score_df_list.append(score_df)
        # Merge the weighted scores
        df_merged = reduce(merge_df_with_score, score_df_list)
        # Recompute the platform return rate
        df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastonehour_return']
        task_list = [
            gevent.spawn(process_with_region2,
                         region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
                         add_videos_with_pre_h, hour_count)
            for region in region_code_list
        ]
    else:
        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
        df_merged = reduce(merge_df, df_list)
        task_list = [
            gevent.spawn(process_with_region,
                         region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
                         add_videos_with_pre_h, hour_count)
            for region in region_code_list
        ]
    gevent.joinall(task_list)
    # Prepare data for the special cities
    # Filter shielded videos
    shield_config = rule_param.get('shield_config', config_.SHIELD_CONFIG)
    for region, city_list in config_.REGION_CITY_MAPPING.items():
        t = [
            gevent.spawn(
                copy_data_for_city,
                region, city_code, data_key, rule_key, shield_config, now_date
            )
            for city_code in city_list
        ]
        gevent.joinall(t)
    log_.info(f"param = {param} end!")


def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
    # Fetch the feature data
    feature_df = get_feature_data(project=project, table=table, now_date=now_date)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list_new')
    pool = multiprocessing.Pool(processes=len(params_list))
    for param in params_list:
        pool.apply_async(
            func=process_with_param,
            args=(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h)
        )
    pool.close()
    pool.join()
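
# The rule_params argument is expected to look roughly like the sketch below
# (key names are the ones read above; concrete values are illustrative):
# rule_params = {
#     'data_params': {'data1': [0, 4]},                        # data_key -> apptype list
#                                                              # (or apptype -> weight dicts when merge_func == 2)
#     'rule_params': {'rule1': {'view_type': 'video-show'}},   # rule_key -> rule_param (see video_rank)
#     'params_list_new': [{'data': 'data1', 'rule': 'rule1'}], # one worker process per entry
# }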


def h_bottom_process(param, rule_params_item, region_code_list, now_date, now_h):
    data_key = param.get('data')
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
    region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
    by_24h_rule_key = rule_param.get('24h_rule_key', None)
    # Filter politically sensitive videos
    political_filter = param.get('political_filter', None)
    # Filter shielded videos
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    for region in region_code_list:
        log_.info(f"region = {region}")
        # De-duplicate against other recall pools and store to the corresponding Redis keys
        dup_to_redis(now_date=now_date, now_h=now_h, rule_key=rule_key,
                     region_24h_rule_key=region_24h_rule_key, region=region,
                     data_key=data_key, by_24h_rule_key=by_24h_rule_key,
                     political_filter=political_filter, shield_config=shield_config)
    # Prepare data for the special cities
    for region, city_list in config_.REGION_CITY_MAPPING.items():
        t = [
            gevent.spawn(
                copy_data_for_city,
                region, city_code, data_key, rule_key, shield_config, now_date
            )
            for city_code in city_list
        ]
        gevent.joinall(t)


def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
    """Data was not ready in time: reuse the previous hour's result for the current hour."""
    # Use the previous hour's regional data as the current hour's data
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list_new')
    pool = multiprocessing.Pool(processes=len(params_list))
    for param in params_list:
        pool.apply_async(
            func=h_bottom_process,
            args=(param, rule_params_item, region_code_list, now_date, now_h)
        )
    pool.close()
    pool.join()


def h_timer_check():
    try:
        rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
        project = config_.PROJECT_REGION_APP_TYPE
        table = config_.TABLE_REGION_APP_TYPE
        region_code_list = [code for region, code in region_code.items()]
        now_date = datetime.datetime.today()
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
        now_h = datetime.datetime.now().hour
        now_min = datetime.datetime.now().minute
        if now_h == 0:
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
            log_.info(f"region_h_data end!")
            return
        # Check whether the current hour's data is ready
        h_data_count = h_data_check(project=project, table=table, now_date=now_date)
        if h_data_count > 0:
            log_.info(f'region_h_data_count = {h_data_count}')
            # Data is ready: run the update
            rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                      project=project, table=table, region_code_list=region_code_list)
            log_.info(f"region_h_data end!")
        elif now_min > 40:
            log_.info('h_recall data is None, use bottom data!')
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
            log_.info(f"region_h_data end!")
        else:
            # Data is not ready: re-check in 1 minute
            Timer(60, h_timer_check).start()
    except Exception as e:
        log_.error(f"Regional hourly data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - regional hourly data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info(f"region_h_data start...")
    h_timer_check()