alg_recsys_recall_1h_region.py 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221
  1. # -*- coding: utf-8 -*-
  2. import multiprocessing
  3. import traceback
  4. import gevent
  5. import datetime
  6. import pandas as pd
  7. import math
  8. from functools import reduce
  9. from odps import ODPS
  10. from threading import Timer
  11. from my_utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
  12. check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
  13. from my_config import set_config
  14. from log import Log
  15. from check_video_limit_distribute import update_limit_video_score
# Module-level initialisation: project config, logger, and region code map.
config_, _ = set_config()
log_ = Log()
region_code = config_.REGION_CODE

# Recall rule configuration:
#   'rule_params'  - rule key -> scoring/filter parameters for this task
#   'data_params'  - data key -> app-type grouping (from project config)
#   'params_list'  - the (data, rule) combinations that actually run
RULE_PARAMS = {
    'rule_params': {
        'rule66': {
            'view_type': 'video-show-region',
            # 'score_func': '20240223',
            # 'lastonehour_allreturn': "1",
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
        },
        'rule67': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66', 'h_rule_key': 'rule66'
        },
        'rule68': {
            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66',
            'score_func': 'back_rate_exponential_weighting1'
        },
    },
    'data_params': config_.DATA_PARAMS,
    'params_list': [
        # 532
        {'data': 'data66', 'rule': 'rule66'},  # 523-> 523 & 518
        # {'data': 'data66', 'rule': 'rule67'},  # 523->510
        # {'data': 'data66', 'rule': 'rule68'},  # 523->514
        # {'data': 'data66', 'rule': 'rule69'},  # 523->518
    ],
}
# Feature columns read from the ODPS hour-level table (one row per
# apptype/region/video). "region-specific" means the metric is computed
# within the region group of the row.
features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',  # users pre-exposed in the past 1h - region-specific
    'lastonehour_view',  # users exposed in the past 1h - region-specific
    'lastonehour_play',  # users who played in the past 1h - region-specific
    'lastonehour_share',  # users who shared in the past 1h - region-specific
    'lastonehour_return',  # shared in past 1h, users returned in past 1h - region-specific
    'lastonehour_preview_total',  # pre-exposure count in the past 1h - region-specific
    'lastonehour_view_total',  # exposure count in the past 1h - region-specific
    'lastonehour_play_total',  # play count in the past 1h - region-specific
    'lastonehour_share_total',  # share count in the past 1h - region-specific
    'platform_return',
    'lastonehour_show',  # not region-specific
    'lastonehour_show_region',  # grouped by region
    'lasttwohour_share',  # users who shared at h-2
    'lasttwohour_return_now',  # shared at h-2, returned in the past 1h
    'lasttwohour_return',  # shared at h-2, returned at h-2
    'lastthreehour_share',  # users who shared at h-3
    'lastthreehour_return_now',  # shared at h-3, returned in the past 1h
    'lastthreehour_return',  # shared at h-3, returned at h-3
    'lastonehour_return_new',  # shared in past 1h, returned in past 1h (share limited to region, return unrestricted)
    'lasttwohour_return_now_new',  # shared at h-2, returned in past 1h (share limited to region, return unrestricted)
    'lasttwohour_return_new',  # shared at h-2, returned at h-2 (share limited to region, return unrestricted)
    'lastthreehour_return_now_new',  # shared at h-3, returned in past 1h (share limited to region, return unrestricted)
    'lastthreehour_return_new',  # shared at h-3, returned at h-3 (share limited to region, return unrestricted)
    'platform_return_new',  # platform distribution return (share limited to region, return unrestricted)
    'lastonehour_allreturn',
    'lastonehour_allreturn_sharecnt'
]
  77. def get_region_code(region):
  78. """获取省份对应的code"""
  79. mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
  80. sql = f"SELECT ad_code FROM region_adcode WHERE parent_id = 0 AND region LIKE '{region}%';"
  81. ad_code = mysql_helper.get_data(sql=sql)
  82. return ad_code[0][0]
  83. def h_data_check(project, table, now_date):
  84. """检查数据是否准备好"""
  85. odps = ODPS(
  86. access_id=config_.ODPS_CONFIG['ACCESSID'],
  87. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  88. project=project,
  89. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  90. connect_timeout=3000,
  91. read_timeout=500000,
  92. pool_maxsize=1000,
  93. pool_connections=1000
  94. )
  95. try:
  96. dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
  97. # 测试 张博
  98. check_res = check_table_partition_exits(date=dt, project=project, table=table)
  99. if check_res:
  100. sql = f'select * from {project}.{table} where dt = "{dt}"'
  101. print("zhangbo-sql-是否有数据")
  102. print(sql)
  103. with odps.execute_sql(sql=sql).open_reader() as reader:
  104. data_count = reader.count
  105. else:
  106. data_count = 0
  107. except Exception as e:
  108. data_count = 0
  109. return data_count
  110. def get_rov_redis_key(now_date):
  111. """获取rov模型结果存放key"""
  112. redis_helper = RedisHelper()
  113. now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  114. key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
  115. if not redis_helper.key_exists(key_name=key_name):
  116. pre_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  117. key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
  118. return key_name
  119. def get_day_30day_videos(now_date, data_key, rule_key):
  120. """获取天级更新相对30天的视频id"""
  121. redis_helper = RedisHelper()
  122. day_30day_recall_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
  123. now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  124. day_30day_recall_key_name = f"{day_30day_recall_key_prefix}{data_key}:{rule_key}:{now_dt}"
  125. if not redis_helper.key_exists(key_name=day_30day_recall_key_name):
  126. redis_dt = datetime.datetime.strftime((now_date - datetime.timedelta(days=1)), '%Y%m%d')
  127. day_30day_recall_key_name = f"{day_30day_recall_key_prefix}{data_key}:{rule_key}:{redis_dt}"
  128. data = redis_helper.get_all_data_from_zset(key_name=day_30day_recall_key_name, with_scores=True)
  129. if data is None:
  130. return None
  131. video_ids = [int(video_id) for video_id, _ in data]
  132. return video_ids
  133. def get_feature_data(project, table, now_date):
  134. """获取特征数据"""
  135. dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
  136. # 张博 测试
  137. records = get_data_from_odps(date=dt, project=project, table=table)
  138. feature_data = []
  139. for record in records:
  140. item = {}
  141. for feature_name in features:
  142. item[feature_name] = record[feature_name]
  143. feature_data.append(item)
  144. feature_df = pd.DataFrame(feature_data)
  145. return feature_df
  146. def cal_score_initial_20240223(df, param):
  147. """
  148. 计算score
  149. :param df: 特征数据
  150. :param param: 规则参数
  151. :return:
  152. """
  153. log_.info("进入了cal_score_initial_20240223")
  154. df = df.fillna(0)
  155. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  156. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  157. df['back_rate_new'] = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
  158. df['back_rate_all'] = df['lastonehour_allreturn'] / (df['lastonehour_allreturn_sharecnt'] + 10)
  159. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  160. df['log_back_all'] = (df['lastonehour_allreturn'] + 1).apply(math.log)
  161. if param.get('view_type', None) == 'video-show':
  162. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  163. elif param.get('view_type', None) == 'video-show-region':
  164. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  165. else:
  166. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  167. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  168. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  169. df['score'] = df['share_rate'] * (
  170. df['back_rate_new'] + 0.01 * df['back_rate_all']
  171. ) * (
  172. df['log_back'] + 0.01 * df['log_back_all']
  173. ) * df['K2']
  174. df = df.sort_values(by=['score'], ascending=False)
  175. return df
  176. def cal_score_initial(df, param):
  177. """
  178. 计算score
  179. :param df: 特征数据
  180. :param param: 规则参数
  181. :return:
  182. """
  183. # score计算公式: sharerate*backrate*logback*ctr
  184. # sharerate = lastonehour_share/(lastonehour_play+1000)
  185. # backrate = lastonehour_return/(lastonehour_share+10)
  186. # ctr = lastonehour_play/(lastonehour_preview+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  187. # score = sharerate * backrate * LOG(lastonehour_return+1) * K2
  188. df = df.fillna(0)
  189. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  190. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  191. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  192. if param.get('view_type', None) == 'video-show':
  193. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  194. elif param.get('view_type', None) == 'video-show-region':
  195. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  196. else:
  197. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  198. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  199. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  200. df['score1'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
  201. click_score_rate = param.get('click_score_rate', None)
  202. back_score_rate = param.get('click_score_rate', None)
  203. if click_score_rate is not None:
  204. df['score'] = (1 - click_score_rate) * df['score1'] + click_score_rate * df['K2']
  205. elif back_score_rate is not None:
  206. df['score'] = (1 - back_score_rate) * df['score1'] + back_score_rate * df['back_rate']
  207. else:
  208. df['score'] = df['score1']
  209. df = df.sort_values(by=['score'], ascending=False)
  210. return df
  211. def cal_score_add_return(df, param):
  212. # score计算公式: sharerate*(backrate*logback + backrate2*logback_now2 + backrate3*logback_now3)*ctr
  213. # sharerate = lastonehour_share/(lastonehour_play+1000)
  214. # backrate = lastonehour_return/(lastonehour_share+10)
  215. # backrate2 = lasttwohour_return_now/(lasttwohour_share+10)
  216. # backrate3 = lastthreehour_return_now/(lastthreehour_share+10)
  217. # ctr = lastonehour_play/(lastonehour_preview+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  218. # score = k2 * sharerate * (backrate * LOG(lastonehour_return+1) + backrate_2 * LOG(lasttwohour_return_now+1) + backrate_3 * LOG(lastthreehour_return_now+1))
  219. df = df.fillna(0)
  220. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  221. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  222. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  223. df['back_rate2'] = df['lasttwohour_return_now'] / (df['lasttwohour_share'] + 10)
  224. df['log_back2'] = (df['lasttwohour_return_now'] + 1).apply(math.log)
  225. df['back_rate3'] = df['lastthreehour_return_now'] / (df['lastthreehour_share'] + 10)
  226. df['log_back3'] = (df['lastthreehour_return_now'] + 1).apply(math.log)
  227. if param.get('view_type', None) == 'video-show':
  228. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  229. elif param.get('view_type', None) == 'video-show-region':
  230. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  231. else:
  232. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  233. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  234. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  235. df['score'] = df['K2'] * df['share_rate'] * (
  236. df['back_rate'] * df['log_back'] +
  237. df['back_rate2'] * df['log_back2'] +
  238. df['back_rate3'] * df['log_back3']
  239. )
  240. df = df.sort_values(by=['score'], ascending=False)
  241. return df
  242. def cal_score_multiply_return_retention(df, param):
  243. # score计算公式: k2 * sharerate * backrate * LOG(lastonehour_return+1) * 前两小时回流留存
  244. # sharerate = lastonehour_share/(lastonehour_play+1000)
  245. # backrate = lastonehour_return/(lastonehour_share+10)
  246. # ctr = lastonehour_play/(lastonehour_preview+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  247. # 前两小时回流留存 return_retention_initial = (lasttwohour_return_now + lastthreehour_return_now)/(lasttwohour_return + lastthreehour_return + 1)
  248. # return_retention = 0.5 if return_retention_initial == 0 else return_retention_initial
  249. # score = k2 * sharerate * backrate * LOG(lastonehour_return+1) * return_retention
  250. df = df.fillna(0)
  251. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  252. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  253. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  254. if param.get('view_type', None) == 'video-show':
  255. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  256. elif param.get('view_type', None) == 'video-show-region':
  257. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  258. else:
  259. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  260. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  261. df['return_retention_initial'] = (df['lasttwohour_return_now'] + df['lastthreehour_return_now']) / \
  262. (df['lasttwohour_return'] + df['lastthreehour_return'] + 1)
  263. df['return_retention'] = df['return_retention_initial'].apply(lambda x: 0.5 if x == 0 else x)
  264. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  265. df['score'] = df['K2'] * df['share_rate'] * df['back_rate'] * df['log_back'] * df['return_retention']
  266. df = df.sort_values(by=['score'], ascending=False)
  267. return df
  268. def cal_score_update_backrate(df, param):
  269. # score计算公式: k2 * sharerate * (backrate + backrate * backrate_2 * backrate_3) * LOG(lastonehour_return+1)
  270. # sharerate = lastonehour_share/(lastonehour_play+1000)
  271. # backrate = lastonehour_return/(lastonehour_share+10)
  272. # backrate2 = lasttwohour_return_now/(lasttwohour_share+10)
  273. # backrate3 = lastthreehour_return_now/(lastthreehour_share+10)
  274. # ctr = lastonehour_play/(lastonehour_preview+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  275. # backrate1_3_initial = backrate * backrate_2 * backrate_3
  276. # backrate1_3 = 0.02 if backrate1_3_initial == 0 else backrate1_3_initial
  277. # score = k2 * sharerate * (backrate + backrate1_3) * LOG(lastonehour_return+1)
  278. df = df.fillna(0)
  279. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  280. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  281. df['back_rate2'] = df['lasttwohour_return_now'] / (df['lasttwohour_share'] + 10)
  282. df['back_rate3'] = df['lastthreehour_return_now'] / (df['lastthreehour_share'] + 10)
  283. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  284. if param.get('view_type', None) == 'video-show':
  285. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  286. elif param.get('view_type', None) == 'video-show-region':
  287. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  288. else:
  289. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  290. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  291. df['backrate1_3_initial'] = df['back_rate'] * df['back_rate2'] * df['back_rate3']
  292. df['backrate1_3'] = df['backrate1_3_initial'].apply(lambda x: 0.02 if x == 0 else x)
  293. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  294. df['score'] = df['K2'] * df['share_rate'] * (df['back_rate'] + df['backrate1_3']) * df['log_back']
  295. df = df.sort_values(by=['score'], ascending=False)
  296. return df
  297. def cal_score_with_new_return(df, param):
  298. # 回流数据使用 分享限制地域,回流不限制地域 统计数据
  299. # score计算公式: sharerate*backrate*logback*ctr
  300. # sharerate = lastonehour_share/(lastonehour_play+1000)
  301. # backrate = lastonehour_return_new/(lastonehour_share+10)
  302. # ctr = lastonehour_play/(lastonehour_preview+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  303. # score = sharerate * backrate * LOG(lastonehour_return_new+1) * K2
  304. df = df.fillna(0)
  305. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  306. df['back_rate'] = df['lastonehour_return_new'] / (df['lastonehour_share'] + 10)
  307. df['log_back'] = (df['lastonehour_return_new'] + 1).apply(math.log)
  308. if param.get('view_type', None) == 'video-show':
  309. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  310. elif param.get('view_type', None) == 'video-show-region':
  311. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  312. else:
  313. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  314. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  315. df['platform_return_rate'] = df['platform_return_new'] / df['lastonehour_return_new']
  316. df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
  317. df = df.sort_values(by=['score'], ascending=False)
  318. return df
  319. def cal_score_multiply_return_retention_with_new_return(df, param):
  320. # 回流数据使用 分享限制地域,回流不限制地域 统计数据
  321. # score计算公式: k2 * sharerate * backrate * LOG(lastonehour_return_new+1) * 前两小时回流留存
  322. # sharerate = lastonehour_share/(lastonehour_play+1000)
  323. # backrate = lastonehour_return_new/(lastonehour_share+10)
  324. # ctr = lastonehour_play/(lastonehour_preview+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  325. # 前两小时回流留存 return_retention_initial = (lasttwohour_return_now_new + lastthreehour_return_now_new)/(lasttwohour_return_new + lastthreehour_return_new + 1)
  326. # return_retention = 0.5 if return_retention_initial == 0 else return_retention_initial
  327. # score = k2 * sharerate * backrate * LOG(lastonehour_return_new+1) * return_retention
  328. df = df.fillna(0)
  329. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  330. df['back_rate'] = df['lastonehour_return_new'] / (df['lastonehour_share'] + 10)
  331. df['log_back'] = (df['lastonehour_return_new'] + 1).apply(math.log)
  332. if param.get('view_type', None) == 'video-show':
  333. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  334. elif param.get('view_type', None) == 'video-show-region':
  335. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  336. else:
  337. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  338. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  339. df['return_retention_initial'] = (df['lasttwohour_return_now_new'] + df['lastthreehour_return_now_new']) / \
  340. (df['lasttwohour_return_new'] + df['lastthreehour_return_new'] + 1)
  341. df['return_retention'] = df['return_retention_initial'].apply(lambda x: 0.5 if x == 0 else x)
  342. df['platform_return_rate'] = df['platform_return_new'] / df['lastonehour_return_new']
  343. df['score'] = df['K2'] * df['share_rate'] * df['back_rate'] * df['log_back'] * df['return_retention']
  344. df = df.sort_values(by=['score'], ascending=False)
  345. return df
  346. def cal_score_with_back_view0(df, param):
  347. # score = sharerate*backrate*log(return+1)*CTR,
  348. # sharerate=(lastonehour_share+1)/(lastonehour_play+1000)
  349. # backrate=(lastonehour_return+1)/(lastonehour_share+10)
  350. # CTR=(lastonehour_play+1)/(lastonehour_view+100), ctr不进行校正
  351. df = df.fillna(0)
  352. df['share_rate'] = (df['lastonehour_share'] + 1) / (df['lastonehour_play'] + 1000)
  353. df['back_rate'] = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
  354. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  355. df['ctr'] = (df['lastonehour_play'] + 1) / (df['lastonehour_view'] + 100)
  356. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  357. df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['ctr']
  358. df = df.sort_values(by=['score'], ascending=False)
  359. return df
  360. def cal_score_with_back_view1(df, param):
  361. # score = back_play_rate*log(return+1)*CTR,
  362. # back_play_rate=(lastonehour_return+1)/(lastonehour_play+1000)
  363. # CTR=(lastonehour_play+1)/(lastonehour_view+100), ctr不进行校正
  364. df = df.fillna(0)
  365. df['back_play_rate'] = (df['lastonehour_return'] + 1) / (df['lastonehour_play'] + 1000)
  366. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  367. df['ctr'] = (df['lastonehour_play'] + 1) / (df['lastonehour_view'] + 100)
  368. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  369. df['score'] = df['back_play_rate'] * df['log_back'] * df['ctr']
  370. df = df.sort_values(by=['score'], ascending=False)
  371. return df
  372. def cal_score_with_back_rate_exponential_weighting1(df, param):
  373. """
  374. 计算score
  375. :param df: 特征数据
  376. :param param: 规则参数
  377. :return:
  378. """
  379. # score计算公式: score = sharerate * backrate ^ 2 * LOG(lastonehour_return + 1) * K2
  380. # sharerate = lastonehour_share / (lastonehour_play + 1000)
  381. # backrate = lastonehour_return / (lastonehour_share + 10)
  382. # ctr = lastonehour_play / (lastonehour_show + 1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  383. df = df.fillna(0)
  384. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  385. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  386. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  387. if param.get('view_type', None) == 'video-show':
  388. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  389. elif param.get('view_type', None) == 'video-show-region':
  390. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  391. else:
  392. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  393. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  394. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  395. df['score'] = df['share_rate'] * df['back_rate'] ** 2 * df['log_back'] * df['K2']
  396. df = df.sort_values(by=['score'], ascending=False)
  397. return df
  398. def cal_score_with_back_rate_exponential_weighting2(df, param):
  399. """
  400. 计算score
  401. :param df: 特征数据
  402. :param param: 规则参数
  403. :return:
  404. """
  405. # score计算公式: score = sharerate ^ 0.5 * backrate ^ 2 * LOG(lastonehour_return + 1) * K2 ^ 0.5
  406. # sharerate = lastonehour_share / (lastonehour_play + 1000)
  407. # backrate = lastonehour_return / (lastonehour_share + 10)
  408. # ctr = lastonehour_play / (lastonehour_show + 1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  409. df = df.fillna(0)
  410. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  411. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  412. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  413. if param.get('view_type', None) == 'video-show':
  414. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  415. elif param.get('view_type', None) == 'video-show-region':
  416. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  417. else:
  418. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  419. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  420. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  421. df['score'] = df['share_rate'] ** 0.5 * df['back_rate'] ** 2 * df['log_back'] * df['K2'] ** 0.5
  422. df = df.sort_values(by=['score'], ascending=False)
  423. return df
  424. def cal_score_with_back_rate_by_rank_weighting(df, param):
  425. """
  426. add by sunmingze 20231123
  427. 计算score
  428. :param df: 特征数据
  429. :param param: 规则参数
  430. :return:
  431. """
  432. # score计算公式: score = 1 / sharerate(rank)^0.5 + 5 / backrate(rank)^0.5 + 10 / LOG(lastonehour_return +1)(rank) ^0.5
  433. # + 1 / K2(rank)^0.5
  434. # sharerate = lastonehour_share / (lastonehour_play + 1000)
  435. # backrate = lastonehour_return / (lastonehour_share + 10)
  436. # ctr = lastonehour_play / (lastonehour_show + 1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
  437. df = df.fillna(0)
  438. df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
  439. df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
  440. df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
  441. if param.get('view_type', None) == 'video-show':
  442. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
  443. elif param.get('view_type', None) == 'video-show-region':
  444. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
  445. else:
  446. df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
  447. df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
  448. df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
  449. # 分别的得到sharerate、backrate、K值、return人数的序关系
  450. df['rank_by_sharerate'] = df['share_rate'].rank(ascending=0, method='dense')
  451. df['rank_by_backrate'] = df['back_rate'].rank(ascending=0, method='dense')
  452. df['rank_by_K2'] = df['K2'].rank(ascending=0, method='dense')
  453. df['rank_by_logback'] = df['log_back'].rank(ascending=0, method='dense')
  454. # 计算基于序的加法关系函数
  455. df['score'] = 1/(df['rank_by_sharerate'] + 10) + 5/(df['rank_by_backrate'] + 10)
  456. df['score'] = df['score'] + 5/(df['rank_by_logback'] + 10) + 1/(df['rank_by_K2'] + 10)
  457. df = df.sort_values(by=['score'], ascending=False)
  458. return df
  459. def cal_score(df, param):
  460. if param.get('return_data', None) == 'share_region_return':
  461. if param.get('score_func', None) == 'multiply_return_retention':
  462. df = cal_score_multiply_return_retention_with_new_return(df=df, param=param)
  463. else:
  464. df = cal_score_with_new_return(df=df, param=param)
  465. else:
  466. if param.get('score_func', None) == 'add_backrate*log(return+1)':
  467. df = cal_score_add_return(df=df, param=param)
  468. elif param.get('score_func', None) == 'multiply_return_retention':
  469. df = cal_score_multiply_return_retention(df=df, param=param)
  470. elif param.get('score_func', None) == 'update_backrate':
  471. df = cal_score_update_backrate(df=df, param=param)
  472. elif param.get('score_func', None) == 'back_view0':
  473. df = cal_score_with_back_view0(df=df, param=param)
  474. elif param.get('score_func', None) == 'back_view1':
  475. df = cal_score_with_back_view1(df=df, param=param)
  476. elif param.get('score_func', None) == 'back_rate_exponential_weighting1':
  477. df = cal_score_with_back_rate_exponential_weighting1(df=df, param=param)
  478. elif param.get('score_func', None) == 'back_rate_exponential_weighting2':
  479. df = cal_score_with_back_rate_exponential_weighting2(df=df, param=param)
  480. elif param.get('score_func', None) == 'back_rate_rank_weighting':
  481. df = cal_score_with_back_rate_by_rank_weighting(df=df, param=param)
  482. elif param.get('score_func', None) == '20240223':
  483. df = cal_score_initial_20240223(df=df, param=param)
  484. else:
  485. df = cal_score_initial(df=df, param=param)
  486. return df
  487. def add_func1(initial_df, pre_h_df):
  488. """当前小时级数据与前几个小时数据合并"""
  489. score_list = initial_df['score'].to_list()
  490. if len(score_list) > 0:
  491. min_score = min(score_list)
  492. else:
  493. min_score = 0
  494. pre_h_df = pre_h_df[pre_h_df['score'] > min_score]
  495. df = pd.concat([initial_df, pre_h_df], ignore_index=True)
  496. # videoid去重,保留分值高
  497. df['videoid'] = df['videoid'].astype(int)
  498. df = df.sort_values(by=['score'], ascending=False)
  499. df = df.drop_duplicates(subset=['videoid'], keep="first")
  500. return df
  501. def add_func2(initial_df, pre_h_df):
  502. """当前小时级数据与前几个小时数据合并: 当前小时存在的视频以当前小时为准,否则以高分为主"""
  503. score_list = initial_df['score'].to_list()
  504. if len(score_list) > 0:
  505. min_score = min(score_list)
  506. else:
  507. min_score = 0
  508. initial_video_id_list = initial_df['videoid'].to_list()
  509. pre_h_df = pre_h_df[pre_h_df['score'] > min_score]
  510. pre_h_df = pre_h_df[~pre_h_df['videoid'].isin(initial_video_id_list)]
  511. df = pd.concat([initial_df, pre_h_df], ignore_index=True)
  512. # videoid去重,保留分值高
  513. df['videoid'] = df['videoid'].astype(int)
  514. df = df.sort_values(by=['score'], ascending=False)
  515. df = df.drop_duplicates(subset=['videoid'], keep="first")
  516. return df
  517. def add_videos(initial_df, now_date, rule_key, region, data_key, hour_count, top, add_func):
  518. """
  519. 地域小时级数据列表中增加前6h优质视频
  520. :param initial_df: 地域小时级筛选结果
  521. :param now_date:
  522. :param data_key:
  523. :param region:
  524. :param rule_key:
  525. :param hour_count: 前几个小时, type-int
  526. :param top: type-int
  527. :return: df
  528. """
  529. redis_helper = RedisHelper()
  530. pre_h_data = []
  531. for i in range(1, hour_count+1):
  532. pre_date = now_date - datetime.timedelta(hours=i)
  533. pre_h = pre_date.hour
  534. pre_h_recall_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
  535. f"{datetime.datetime.strftime(pre_date, '%Y%m%d')}:{pre_h}"
  536. pre_h_top_data = redis_helper.get_data_zset_with_index(key_name=pre_h_recall_key_name,
  537. start=0, end=top-1,
  538. desc=True, with_scores=True)
  539. if pre_h_top_data is None:
  540. continue
  541. pre_h_data.extend(pre_h_top_data)
  542. pre_h_df = pd.DataFrame(data=pre_h_data, columns=['videoid', 'score'])
  543. if add_func == 'func2':
  544. df = add_func2(initial_df=initial_df, pre_h_df=pre_h_df)
  545. else:
  546. df = add_func1(initial_df=initial_df, pre_h_df=pre_h_df)
  547. return df
def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank_h_flag,
               add_videos_with_pre_h=False, hour_count=0):
    """
    Pick the videos that qualify for the recall pool, merge them with the daily
    rov-model result list, and write the final ranking into redis.
    :param df: scored feature dataframe for one region
    :param now_date:
    :param now_h:
    :param rule_key: hour-level qualification rule name
    :param param: hour-level qualification rule parameters (dict)
    :param region: region code this slice belongs to
    :param data_key: data source key (drives app vs applet status filtering)
    :param rule_rank_h_flag: '24h'/'48h' flag (only used by the commented-out
        dup_to_redis step at the bottom)
    :param add_videos_with_pre_h: if True, dredge top videos from the previous
        `hour_count` hours into the result
    :param hour_count: how many previous hours to dredge from
    :return: None -- results are persisted to redis
    """
    redis_helper = RedisHelper()
    # Qualification thresholds, e.g. hourly returns >= 20 && score >= 0.005.
    return_count = param.get('return_count', 1)
    score_value = param.get('score_rule', 0)
    platform_return_rate = param.get('platform_return_rate', 0)
    # h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
    #                  & (df['platform_return_rate'] >= platform_return_rate)]
    h_recall_df = df[
        (df['lastonehour_return'] >= return_count) &
        (df['score'] >= score_value) &
        (df['platform_return_rate'] >= platform_return_rate)
    ]
    # Alternative filter: when configured, this REPLACES (does not refine) the
    # threshold filter above with a simple allreturn > 0 condition.
    if "lastonehour_allreturn" in param.keys():
        log_.info("采用 lastonehour_allreturn 过滤")
        h_recall_df = df[
            (df['lastonehour_allreturn'] > 0)
        ]
    # try:
    #     if "return_countv2" in param.keys() and "platform_return_ratev2" in param.keys():
    #         return_countv2 = param["return_countv2"]
    #         platform_return_ratev2 = param["platform_return_ratev2"]
    #         h_recall_df = h_recall_df[
    #             df['platform_return_rate'] >= platform_return_ratev2 |
    #             (df['platform_return_rate'] < platform_return_ratev2 & df['lastonehour_return'] > return_countv2)
    #         ]
    # except Exception as e:
    #     log_.error("return_countv2 is wrong with{}".format(e))
    # When a videoid appears more than once, keep the highest-scored row.
    h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
    h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
    log_.info(f"各种规则过滤后,一共有多少个视频 = {len(h_recall_df)}")
    # Dredge high-quality videos from the previous hours into the result.
    if add_videos_with_pre_h is True:
        add_func = param.get('add_func', None)
        h_recall_df = add_videos(initial_df=h_recall_df, now_date=now_date, rule_key=rule_key,
                                 region=region, data_key=data_key, hour_count=hour_count, top=10, add_func=add_func)
        log_.info(f"打捞优质视频完成")
    h_recall_videos = h_recall_df['videoid'].to_list()
    log_.info(f"各种规则增加后,一共有多少个视频 = {len(h_recall_videos)}")
    # Video status filtering ('data7' uses the app-side status check).
    if data_key in ['data7', ]:
        filtered_videos = filter_video_status_app(h_recall_videos)
    else:
        filtered_videos = filter_video_status(h_recall_videos)
    # Shield-video filtering.
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    shield_key_name_list = shield_config.get(region, None)
    if shield_key_name_list is not None:
        filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
    # Political-video filtering.
    political_filter = param.get('political_filter', None)
    if political_filter is True:
        filtered_videos = filter_political_videos(video_ids=filtered_videos)
    log_.info(f"视频状态-涉政等-过滤后,一共有多少个视频 = {len(filtered_videos)}")
    h_video_ids = []
    by_30day_rule_key = param.get('30day_rule_key', None)
    if by_30day_rule_key is not None:
        # Dedup against the rolling-30-day list.
        h_video_ids = get_day_30day_videos(now_date=now_date, data_key=data_key, rule_key=by_30day_rule_key)
        if h_video_ids is not None:
            filtered_videos = [video_id for video_id in filtered_videos if int(video_id) not in h_video_ids]
    # Build the {videoid: score} payload and write it to the redis zset.
    h_recall_result = {}
    for video_id in filtered_videos:
        # NOTE(review): the score lookup relies on `video_id` comparing equal
        # to the int `videoid` column -- confirm the filter helpers return
        # ints, otherwise this yields an empty Series and float() raises.
        score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
        h_recall_result[int(video_id)] = float(score)
        h_video_ids.append(int(video_id))
    h_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    log_.info("打印地域1小时的某个地域{},redis key:{}".format(region, h_recall_key_name))
    if len(h_recall_result) > 0:
        log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
        redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 24 * 3600)
        # Adjust scores of rate-limited videos.
        tmp = update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
        if tmp:
            log_.info(f"走了限流逻辑后:count = {len(h_recall_result)}, key = {h_recall_key_name}")
        else:
            log_.info("走了限流逻辑,但没更改redis,未生效。")
        # Clear the online filter list (kept for reference).
        # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
    else:
        log_.info(f"无数据,不写入。")
    # h_rule_key = param.get('h_rule_key', None)
    # region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
    # by_24h_rule_key = param.get('24h_rule_key', None)
    # by_48h_rule_key = param.get('48h_rule_key', None)
    # dup_remove = param.get('dup_remove', True)
    # # Dedup against the other recall pools and store to redis (disabled).
    # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
    #              region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
    #              by_48h_rule_key=by_48h_rule_key, region=region, data_key=data_key,
    #              rule_rank_h_flag=rule_rank_h_flag, political_filter=political_filter,
    #              shield_config=shield_config, dup_remove=dup_remove)
  656. def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
  657. redis_helper = RedisHelper()
  658. if redis_helper.key_exists(key_name=initial_key_name):
  659. initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
  660. # 屏蔽视频过滤
  661. initial_video_ids = [int(video_id) for video_id, _ in initial_data]
  662. shield_key_name_list = shield_config.get(region, None)
  663. if shield_key_name_list is not None:
  664. initial_video_ids = filter_shield_video(video_ids=initial_video_ids,
  665. shield_key_name_list=shield_key_name_list)
  666. # 涉政视频过滤
  667. if political_filter is True:
  668. initial_video_ids = filter_political_videos(video_ids=initial_video_ids)
  669. dup_data = {}
  670. # 视频去重逻辑
  671. if dup_remove is True:
  672. for video_id, score in initial_data:
  673. if int(video_id) not in h_video_ids and int(video_id) in initial_video_ids:
  674. dup_data[int(video_id)] = score
  675. h_video_ids.append(int(video_id))
  676. else:
  677. for video_id, score in initial_data:
  678. if int(video_id) in initial_video_ids:
  679. dup_data[int(video_id)] = score
  680. if len(dup_data) > 0:
  681. redis_helper.add_data_with_zset(key_name=dup_key_name, data=dup_data, expire_time=2 * 24 * 3600)
  682. # 限流视频score调整
  683. update_limit_video_score(initial_videos=dup_data, key_name=dup_key_name)
  684. return h_video_ids
def dup_to_redis(h_video_ids, now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key,
                 region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove):
    """Dedup the region-grouped hour-level data against the other recall video
    pools and store each deduped pool under its own redis key.

    Pools are processed in order, each call threading the accumulated
    `h_video_ids` into the next `dup_data` step.
    """
    # ##### Dedup against the region-agnostic hour-level list; store separately in redis.
    if h_rule_key is not None:
        h_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{h_rule_key}:" \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H_H}{region}:{data_key}:{rule_key}:" \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_key_name,
                               dup_key_name=h_dup_key_name, region=region, political_filter=political_filter,
                               shield_config=shield_config, dup_remove=dup_remove)
    # ##### Dedup against the region-grouped relative-24h list; store separately in redis.
    region_24h_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    region_24h_dup_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name,
                           dup_key_name=region_24h_dup_key_name, region=region, political_filter=political_filter,
                           shield_config=shield_config, dup_remove=dup_remove)
    if rule_rank_h_flag == '48h':
        # ##### Dedup against the applet relative-48h result; store separately in redis.
        h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{data_key}:{by_48h_rule_key}:" \
                         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_48h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name,
                               dup_key_name=h_48h_dup_key_name, region=region, political_filter=political_filter,
                               shield_config=shield_config, dup_remove=dup_remove)
        # ##### Dedup against the applet relative-48h post-filter remainder; store separately in redis.
        if by_48h_rule_key == 'rule1':
            other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{data_key}:" \
                                   f"{by_48h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
            other_h_48h_dup_key_name = \
                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name,
                                   dup_key_name=other_h_48h_dup_key_name, region=region,
                                   political_filter=political_filter, shield_config=shield_config,
                                   dup_remove=dup_remove)
    else:
        # ##### Dedup against the applet relative-24h result; store separately in redis.
        h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:" \
                         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_24h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name,
                               dup_key_name=h_24h_dup_key_name, region=region, political_filter=political_filter,
                               shield_config=shield_config, dup_remove=dup_remove)
        # ##### Dedup against the applet relative-24h post-filter remainder; store separately in redis.
        # if by_24h_rule_key in ['rule3', 'rule4']:
        other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
                               f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        other_h_24h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
                               dup_key_name=other_h_24h_dup_key_name, region=region, political_filter=political_filter,
                               shield_config=shield_config, dup_remove=dup_remove)
    # ##### Dedup against the applet model result; store separately in redis (disabled).
    # model_key_name = get_rov_redis_key(now_date=now_date)
    # model_data_dup_key_name = \
    #     f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}:{data_key}:{rule_key}:" \
    #     f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
    # h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=model_key_name,
    #                        dup_key_name=model_data_dup_key_name, region=region)
  757. def merge_df(df_left, df_right):
  758. """
  759. df按照videoid, code 合并,对应特征求和
  760. :param df_left:
  761. :param df_right:
  762. :return:
  763. """
  764. df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
  765. df_merged.fillna(0, inplace=True)
  766. feature_list = ['videoid', 'code']
  767. for feature in features:
  768. if feature in ['apptype', 'videoid', 'code']:
  769. continue
  770. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  771. feature_list.append(feature)
  772. return df_merged[feature_list]
  773. def merge_df_with_score(df_left, df_right):
  774. """
  775. df 按照[videoid, code]合并,平台回流人数、回流人数、分数 分别求和
  776. :param df_left:
  777. :param df_right:
  778. :return:
  779. """
  780. df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
  781. df_merged.fillna(0, inplace=True)
  782. feature_list = ['videoid', 'code', 'lastonehour_return', 'platform_return', 'score']
  783. for feature in feature_list[2:]:
  784. df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
  785. return df_merged[feature_list]
  786. def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
  787. rule_rank_h_flag, add_videos_with_pre_h, hour_count):
  788. log_.info(f"多协程的region = {region} 开始执行")
  789. region_df = df_merged[df_merged['code'] == region]
  790. log_.info(f'该区域region = {region}, 下有多少数据量 = {len(region_df)}')
  791. score_df = cal_score(df=region_df, param=rule_param)
  792. video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
  793. region=region, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag,
  794. add_videos_with_pre_h=add_videos_with_pre_h, hour_count=hour_count)
  795. log_.info(f"多协程的region = {region} 完成执行")
  796. def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
  797. rule_rank_h_flag, add_videos_with_pre_h, hour_count):
  798. log_.info(f"region = {region} start...")
  799. region_score_df = df_merged[df_merged['code'] == region]
  800. log_.info(f'region = {region}, region_score_df count = {len(region_score_df)}')
  801. video_rank(df=region_score_df, now_date=now_date, now_h=now_h, region=region,
  802. rule_key=rule_key, param=rule_param, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag,
  803. add_videos_with_pre_h=add_videos_with_pre_h, hour_count=hour_count)
  804. log_.info(f"region = {region} end!")
  805. def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag):
  806. log_.info(f"app_type = {app_type} start...")
  807. data_params_item = params.get('data_params')
  808. rule_params_item = params.get('rule_params')
  809. task_list = []
  810. for param in params.get('params_list'):
  811. data_key = param.get('data')
  812. data_param = data_params_item.get(data_key)
  813. log_.info(f"data_key = {data_key}, data_param = {data_param}")
  814. df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
  815. df_merged = reduce(merge_df, df_list)
  816. rule_key = param.get('rule')
  817. rule_param = rule_params_item.get(rule_key)
  818. log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  819. task_list.extend(
  820. [
  821. gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
  822. now_date, now_h, rule_rank_h_flag)
  823. for region in region_code_list
  824. ]
  825. )
  826. gevent.joinall(task_list)
  827. log_.info(f"app_type = {app_type} end!")
  828. # log_.info(f"app_type = {app_type}")
  829. # task_list = []
  830. # for data_key, data_param in params['data_params'].items():
  831. # log_.info(f"data_key = {data_key}, data_param = {data_param}")
  832. # df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
  833. # df_merged = reduce(merge_df, df_list)
  834. # for rule_key, rule_param in params['rule_params'].items():
  835. # log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  836. # task_list.extend(
  837. # [
  838. # gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
  839. # now_date, now_h)
  840. # for region in region_code_list
  841. # ]
  842. # )
  843. # gevent.joinall(task_list)
  844. def copy_data_for_city(region, city_code, data_key, rule_key, now_date, now_h, shield_config):
  845. """copy 对应数据到城市对应redis,并做相应屏蔽视频过滤"""
  846. log_.info(f"city_code = {city_code} start ...")
  847. redis_helper = RedisHelper()
  848. key_prefix_list = [
  849. config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, # 地域小时级
  850. config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域相对24h
  851. config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, # 不区分地域相对24h
  852. config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, # 不区分地域相对24h筛选后
  853. config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H, # rov大列表
  854. ]
  855. for key_prefix in key_prefix_list:
  856. region_key = f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  857. city_key = f"{key_prefix}{city_code}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  858. if not redis_helper.key_exists(key_name=region_key):
  859. continue
  860. region_data = redis_helper.get_all_data_from_zset(key_name=region_key, with_scores=True)
  861. if not region_data:
  862. continue
  863. # 屏蔽视频过滤
  864. region_video_ids = [int(video_id) for video_id, _ in region_data]
  865. shield_key_name_list = shield_config.get(city_code, None)
  866. # shield_key_name_list = config_.SHIELD_CONFIG.get(city_code, None)
  867. if shield_key_name_list is not None:
  868. filtered_video_ids = filter_shield_video(video_ids=region_video_ids,
  869. shield_key_name_list=shield_key_name_list)
  870. else:
  871. filtered_video_ids = region_video_ids
  872. city_data = {}
  873. for video_id, score in region_data:
  874. if int(video_id) in filtered_video_ids:
  875. city_data[int(video_id)] = score
  876. if len(city_data) > 0:
  877. redis_helper.add_data_with_zset(key_name=city_key, data=city_data, expire_time=2 * 24 * 3600)
  878. log_.info(f"city_code = {city_code} end!")
def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag):
    """Worker for one params_list entry: merge the apptype slices per the
    rule's merge strategy, then rank every region concurrently via gevent.
    :param param: one entry of params_list ({'data': ..., 'rule': ...})
    :param data_params_item: data_key -> apptype config mapping
    :param rule_params_item: rule_key -> rule parameter mapping
    :param region_code_list: region codes to process
    :param feature_df: hour-level feature dataframe
    :param now_date:
    :param now_h:
    :param rule_rank_h_flag: '24h'/'48h' flag forwarded to video_rank
    """
    data_key = param.get('data')
    data_param = data_params_item.get(data_key)
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    merge_func = rule_param.get('merge_func', None)
    log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
    log_.info("具体的规则是:{}.".format(rule_param))
    # Whether to dredge prior-hour quality videos into the region hour data.
    add_videos_with_pre_h = rule_param.get('add_videos_with_pre_h', False)
    hour_count = rule_param.get('hour_count', 0)
    if merge_func == 2:
        # merge_func 2: score each apptype slice separately, apply its weight,
        # then sum the weighted scores across slices.
        score_df_list = []
        for apptype, weight in data_param.items():
            df = feature_df[feature_df['apptype'] == apptype]
            # Compute the score for this slice.
            score_df = cal_score(df=df, param=rule_param)
            score_df['score'] = score_df['score'] * weight
            score_df_list.append(score_df)
        # Merge the weighted scores.
        df_merged = reduce(merge_df_with_score, score_df_list)
        # Refresh the platform return rate after summing.
        # NOTE(review): division yields inf/NaN when lastonehour_return is 0
        # -- confirm downstream threshold filters tolerate that.
        df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastonehour_return']
        task_list = [
            gevent.spawn(process_with_region2,
                         region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag,
                         add_videos_with_pre_h, hour_count)
            for region in region_code_list
        ]
    else:
        # Default: sum raw features across apptype slices, score per region.
        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
        df_merged = reduce(merge_df, df_list)
        task_list = [
            gevent.spawn(process_with_region,
                         region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag,
                         add_videos_with_pre_h, hour_count)
            for region in region_code_list
        ]
    gevent.joinall(task_list)
    # Special-city data preparation (disabled).
    # Shield-video filtering.
    # shield_config = rule_param.get('shield_config', config_.SHIELD_CONFIG)
    # for region, city_list in config_.REGION_CITY_MAPPING.items():
    #     t = [
    #         gevent.spawn(
    #             copy_data_for_city,
    #             region, city_code, data_key, rule_key, now_date, now_h, shield_config
    #         )
    #         for city_code in city_list
    #     ]
    #     gevent.joinall(t)
    log_.info(f"多进程的 param = {param} 完成执行!")
  931. def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list, rule_rank_h_flag):
  932. # 获取特征数据
  933. feature_df = get_feature_data(project=project, table=table, now_date=now_date)
  934. feature_df['apptype'] = feature_df['apptype'].astype(int)
  935. data_params_item = rule_params.get('data_params')
  936. rule_params_item = rule_params.get('rule_params')
  937. params_list = rule_params.get('params_list')
  938. pool = multiprocessing.Pool(processes=len(params_list))
  939. for param in params_list:
  940. pool.apply_async(
  941. func=process_with_param,
  942. args=(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag)
  943. )
  944. pool.close()
  945. pool.join()
def h_bottom_process(param, rule_params_item, region_code_list, key_prefix, redis_dt, redis_h,
                     now_date, now_h, rule_rank_h_flag):
    """Bottom-fill worker: copy one param's previous-hour redis data into the
    current hour's keys, then dedup against the other recall pools and copy
    data for the special cities.
    :param param: one entry of params_list ({'data': ..., 'rule': ...})
    :param rule_params_item: rule_key -> rule parameter mapping
    :param region_code_list: region codes to process
    :param key_prefix: redis key prefix of the region hour-level pool
    :param redis_dt: date string ('%Y%m%d') of the source (previous) hour
    :param redis_h: source hour
    :param now_date: target date
    :param now_h: target hour
    :param rule_rank_h_flag: '24h'/'48h' flag forwarded to dup_to_redis
    """
    redis_helper = RedisHelper()
    data_key = param.get('data')
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
    h_rule_key = rule_param.get('h_rule_key', None)
    region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
    by_24h_rule_key = rule_param.get('24h_rule_key', None)
    by_48h_rule_key = rule_param.get('48h_rule_key', None)
    # Political-video filtering option.
    # NOTE(review): these three options are read from `param`, while the rule
    # keys above come from `rule_param` -- confirm this asymmetry is intended.
    political_filter = param.get('political_filter', None)
    # Shield-video filtering option.
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    dup_remove = param.get('dup_remove', True)
    for region in region_code_list:
        log_.info(f"region = {region}")
        # Previous hour's zset for this (region, data, rule).
        key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
        initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
        if initial_data is None:
            initial_data = []
        final_data = dict()
        h_video_ids = []
        for video_id, score in initial_data:
            final_data[video_id] = score
            h_video_ids.append(int(video_id))
        # Persist the copy as the current hour's data.
        final_key_name = \
            f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
        if len(final_data) > 0:
            redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
        # Dedup against the other recall pools and store to redis.
        dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
                     region_24h_rule_key=region_24h_rule_key, region=region,
                     data_key=data_key, by_24h_rule_key=by_24h_rule_key,
                     by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
                     political_filter=political_filter, shield_config=shield_config, dup_remove=dup_remove)
    # Prepare data for the special cities.
    for region, city_list in config_.REGION_CITY_MAPPING.items():
        t = [
            gevent.spawn(
                copy_data_for_city,
                region, city_code, data_key, rule_key, now_date, now_h, shield_config
            )
            for city_code in city_list
        ]
        gevent.joinall(t)
  994. def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_flag):
  995. """未按时更新数据,用上一小时结果作为当前小时的数据"""
  996. # 获取rov模型结果
  997. # redis_helper = RedisHelper()
  998. if now_h == 0:
  999. redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  1000. redis_h = 23
  1001. else:
  1002. redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  1003. redis_h = now_h - 1
  1004. # 以上一小时的地域分组数据作为当前小时的数据
  1005. key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
  1006. rule_params_item = rule_params.get('rule_params')
  1007. params_list = rule_params.get('params_list')
  1008. pool = multiprocessing.Pool(processes=len(params_list))
  1009. for param in params_list:
  1010. pool.apply_async(
  1011. func=h_bottom_process,
  1012. args=(param, rule_params_item, region_code_list, key_prefix, redis_dt, redis_h, now_date, now_h, rule_rank_h_flag)
  1013. )
  1014. pool.close()
  1015. pool.join()
def h_timer_check():
    """Scheduler entry point: decide whether to run this hour's update, fall
    back to the previous hour's data, or wait one minute and re-check.

    On any exception, logs the traceback and alerts via Feishu.
    """
    try:
        rule_rank_h_flag = "24h"
        rule_params = RULE_PARAMS
        project = config_.PROJECT_REGION_APP_TYPE
        table = config_.TABLE_REGION_APP_TYPE
        region_code_list = [code for region, code in region_code.items()]
        now_date = datetime.datetime.today()
        log_.info(f"开始执行: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
        now_h = datetime.datetime.now().hour
        now_min = datetime.datetime.now().minute
        # Hour 0 always bottom-fills from hour 23 of the previous day.
        if now_h == 0:
            log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                          rule_rank_h_flag=rule_rank_h_flag)
            log_.info("----------当前时间{}小时,使用bottom的data,完成----------".format(now_h))
            return
        # Check whether this hour's upstream data is ready.
        h_data_count = h_data_check(project=project, table=table, now_date=now_date)
        if h_data_count > 0:
            log_.info('上游数据表查询数据条数 h_data_count = {},开始计算。'.format(h_data_count))
            # Data is ready -> run the full update.
            rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                      project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
            log_.info("数据1----------正常完成----------")
        elif now_min > 40:
            # Too late in the hour to finish -> bottom-fill instead.
            log_.info('当前分钟超过40,预计执行无法完成,使用 bottom data!')
            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                          rule_rank_h_flag=rule_rank_h_flag)
            log_.info('----------当前分钟超过40,使用bottom的data,完成----------')
        else:
            # Data not ready -> re-check in 60 seconds via a one-shot Timer.
            log_.info("上游数据未就绪,等待...")
            Timer(60, h_timer_check).start()
        # send_msg_to_feishu(
        #     webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        #     key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        #     msg_text=f"rov-offline{config_.ENV_TEXT} - 推荐视频数据更新完成\n"
        #              f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
        #              f"now_h: {now_h}\n"
        #              f"finished time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}"
        # )
    except Exception as e:
        log_.error(f"地域分组小时级数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 地域分组小时级数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
if __name__ == '__main__':
    # Entry point: kick off the hourly region recall update; h_timer_check
    # re-schedules itself via Timer until the upstream data is ready.
    log_.info("文件alg_recsys_recall_1h_region.py:「1小时地域」 开始执行")
    h_timer_check()