# -*- coding: utf-8 -*-
# @ModuleName: rule_rank_h_18_19
# @Author: Liqian
# @Time: 2022/4/21 4:31 PM
# @Software: PyCharm
import time
import datetime
import pandas as pd
import math
import random
from odps import ODPS
from threading import Timer
from get_data import get_data_from_odps
from db_helper import RedisHelper, MysqlHelper
from my_config import set_config
from log import Log
from my_utils import filter_video_status

config_, env = set_config()
log_ = Log()

features = [
    'videoid',
    'lastonehour_view',    # impressions in the last hour
    'lastonehour_play',    # plays in the last hour
    'lastonehour_share',   # shares in the last hour
    'lastonehour_return',  # returns in the last hour (users brought back via shares)
]


def h_data_check(project, table, now_date):
    """Check whether the hourly partition data is ready."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception:
        # Treat any query failure as "data not ready"; the caller will retry.
        data_count = 0
    return data_count


def get_feature_data(now_date, project, table):
    """Fetch feature data for the current hourly partition."""
    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
    # dt = '2022041310'
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def cal_score(df):
    """
    Compute the score for each video.
    :param df: feature data
    :return: DataFrame sorted by score, descending
    """
    # Score formula: sharerate * backrate * logback * ctr
    # sharerate = lastonehour_share / (lastonehour_play + 1000)
    # backrate  = lastonehour_return / (lastonehour_share + 10)
    # ctr       = lastonehour_play / (lastonehour_view + 1000), capped: K2 = 0.6 if ctr > 0.6 else ctr
    # score     = sharerate * backrate * LOG(lastonehour_return + 1) * K2

    # Filter by video status.
    log_.info(f'initial_df count = {len(df)}')
    video_ids = [int(video_id) for video_id in df['videoid']]
    valid_videos = filter_video_status(video_ids=video_ids)
    invalid_videos = set(video_ids) - set(valid_videos)
    df['videoid'] = df['videoid'].astype(int)
    # Append the invalid rows, then drop every duplicated videoid (keep=False);
    # this removes the invalid videos and keeps each valid video exactly once.
    invalid_df = df[df['videoid'].isin(invalid_videos)]
    df = pd.concat([df, invalid_df])
    df = df.drop_duplicates(['videoid'], keep=False)
    log_.info(f'filtered_df count = {len(df)}')

    # Compute the score.
    df = df.fillna(0)
    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
    df['ctr'] = df['lastonehour_play'] / (df['lastonehour_view'] + 1000)
    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
    df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
    df = df.sort_values(by=['score'], ascending=False)
    return df

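
# A minimal worked example of the score formula above, on one hypothetical row
# (the numbers are illustrative, not production data):
#   view=50000, play=20000, share=400, return=800
#   share_rate = 400 / (20000 + 1000)   ≈ 0.0190
#   back_rate  = 800 / (400 + 10)       ≈ 1.9512
#   log_back   = ln(800 + 1)            ≈ 6.6859
#   ctr        = 20000 / (50000 + 1000) ≈ 0.3922 -> below the 0.6 cap, so K2 = ctr
#   score      ≈ 0.0190 * 1.9512 * 6.6859 * 0.3922 ≈ 0.0974

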
def video_rank(app_type, df, now_date, now_h, return_count):
    """
    Re-rank videos by return count.
    :param app_type:
    :param df:
    :param now_date:
    :param now_h:
    :param return_count: hourly return-count threshold
    :return:
    """
    log_.info(f'df length = {len(df)}')
    # Videos that qualify for the recall pool: hourly returns >= return_count and score >= 0.005.
    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= 0.005)]
    h_recall_videos = h_recall_df['videoid'].to_list()
    log_.info(f'h_recall videos count = {len(h_recall_videos)}')
    # Videos that do not qualify: append the qualifying rows and drop every
    # duplicated videoid (keep=False), leaving only the non-qualifying rows.
    df = pd.concat([df, h_recall_df])
    h_else_df = df.drop_duplicates(['videoid'], keep=False)
    h_else_df = h_else_df.sort_values(by=['score'], ascending=False)
    h_else_videos = h_else_df['videoid'].to_list()
    # Merge the two lists and assign evenly spaced scores from 100 downward.
    final_videos = h_recall_videos + h_else_videos
    if not final_videos:
        return
    final_result = {}
    step = round(100 / len(final_videos), 3)
    for i, video_id in enumerate(final_videos):
        score = 100 - i * step
        final_result[int(video_id)] = score
    # Write the result to the corresponding Redis zset.
    key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(final_result) > 0:
        redis_helper = RedisHelper()
        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)

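
# Sketch of the score assignment above, assuming four videos survive ranking:
#   step = round(100 / 4, 3) = 25.0
#   scores: 100.0, 75.0, 50.0, 25.0 (qualifying videos first, then the rest)
# The resulting Redis key has the shape
#   f"{RECALL_KEY_NAME_PREFIX_APP_TYPE}18.20220421.16"
# (the prefix value and app_type 18 are illustrative; both come from my_config).

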
def rank_by_h(app_type, now_date, now_h, return_count_list, project, table):
    # Fetch feature data.
    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
    # Compute the score.
    score_df = cal_score(df=feature_df)
    # Rank once per return-count threshold.
    for cnt in return_count_list:
        log_.info(f"return_count = {cnt}")
        video_rank(app_type=app_type, df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
    # Dump the scores to CSV.
    score_filename = f"score_{app_type}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
    score_df.to_csv(f'./data/{score_filename}')


def h_rank_bottom(app_type, now_date, now_h):
    """If the data was not updated in time, reuse the previous hour's result for the current hour."""
    log_.info(f"app_type = {app_type}")
    # Read the previous hour's ROV model result.
    redis_helper = RedisHelper()
    if now_h == 0:
        redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        redis_h = 23
    else:
        redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        redis_h = now_h - 1
    key_name = f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{redis_dt}.{redis_h}"
    initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
    final_data = dict()
    for video_id, score in initial_data:
        final_data[video_id] = score
    # Write into the current hour's Redis key.
    final_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(final_data) > 0:
        redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)

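
# Key rollover example (illustrative prefix and date): at 2022-04-21 00:xx the
# fallback reads ...{app_type}.20220420.23 and writes ...{app_type}.20220421.0,
# so the midnight hour is always served from the previous day's last result.

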
def h_timer_check(app_type):
    log_.info(f"app_type = {app_type}")
    return_count_list = [20]
    now_date = datetime.datetime.today()
    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
    now_h = datetime.datetime.now().hour
    now_min = datetime.datetime.now().minute
    if now_h == 0:
        # At hour 0, skip the data check and reuse the previous day's hour-23 result.
        h_rank_bottom(app_type=app_type, now_date=now_date, now_h=now_h)
        return
    # Check whether the current hour's data is ready.
    project = config_.PREDICT_PROJECT_18_19[str(app_type)]
    table = config_.PREDICT_TABLE_18_19[str(app_type)]
    h_data_count = h_data_check(project=project, table=table, now_date=now_date)
    if h_data_count > 0:
        log_.info(f'h_data_count = {h_data_count}')
        # Data is ready: run the update.
        rank_by_h(app_type=app_type, now_date=now_date, now_h=now_h,
                  return_count_list=return_count_list, project=project, table=table)
    elif now_min > 50:
        log_.info('h_recall data is None, use bottom data!')
        h_rank_bottom(app_type=app_type, now_date=now_date, now_h=now_h)
    else:
        # Data not ready yet: re-check in one minute.
        Timer(60, h_timer_check, args=[app_type]).start()

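
# Timeline sketch for one hour (illustrative): the check starts at HH:MM; while
# the ODPS partition is still empty it re-schedules itself every 60s via
# threading.Timer; once the wall clock passes HH:50 with no data, it gives up
# and falls back to the previous hour's Redis result via h_rank_bottom.

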
def predict(app_type_list):
    for app_type in app_type_list:
        h_timer_check(app_type=app_type)


def predict_test(app_type_list, count):
    now_date = datetime.datetime.today()
    now_h = datetime.datetime.now().hour
    log_.info(f"now_date = {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, now_h = {now_h}")
    # Fetch the 40000 most recently published videos in the test environment.
    sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)
    video_ids = [int(video[0]) for video in data]
    # Filter by video status.
    filtered_videos = filter_video_status(video_ids)
    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
    for app_type in app_type_list:
        log_.info(f"app_type = {app_type}")
        videos_temp = random.sample(filtered_videos, count)
        redis_data_temp = {}
        csv_data_temp = []
        for video_id in videos_temp:
            score = random.uniform(0, 100)
            redis_data_temp[video_id] = score
            csv_data_temp.append({'video_id': video_id, 'rov_score': score})
        # Pack the mock predictions into a CSV.
        score_df = pd.DataFrame(data=csv_data_temp, columns=['video_id', 'rov_score'])
        score_df = score_df.sort_values(by=['rov_score'], ascending=False)
        score_filename = f"score_{app_type}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
        score_df.to_csv(f'./data/{score_filename}', index=False)
        # Write into the corresponding Redis key.
        key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
        redis_helper = RedisHelper()
        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data_temp)
        log_.info('data to redis finished!')


if __name__ == '__main__':
    app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
    log_.info(f'appType: {app_type_list} predict start...')
    predict_start = time.time()
    if env in ['dev', 'test']:
        predict_test(app_type_list=app_type_list, count=300)
    elif env in ['pre', 'pro']:
        predict(app_type_list=app_type_list)
    else:
        log_.error('env error')
    predict_end = time.time()
    log_.info(f'appType: {app_type_list} predict end, execute time = {(predict_end - predict_start) * 1000}ms')