app_rank_h.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # -*- coding: utf-8 -*-
  2. # @ModuleName: app_rank_h
  3. # @Author: Liqian
  4. # @Time: 2022/3/18 下午2:03
  5. # @Software: PyCharm
  6. import datetime
  7. from datetime import datetime as dt
  8. from threading import Timer
  9. from log import Log
  10. from db_helper import RedisHelper
  11. from config import set_config
  12. from odps import ODPS
# Module-wide logger instance; functions below call log_.info on it.
log_ = Log()
# set_config() yields the configuration object read throughout this module
# (ODPS credentials, project/table names, Redis key prefixes) plus a second
# value bound to `env` — presumably the environment name; it is not
# referenced anywhere in the visible code.
config_, env = set_config()
  15. def op_data_check(project, table, hour):
  16. """检查数据是否准备好"""
  17. odps = ODPS(
  18. access_id=config_.ODPS_CONFIG['ACCESSID'],
  19. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  20. project=project,
  21. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  22. connect_timeout=3000,
  23. read_timeout=500000,
  24. pool_maxsize=1000,
  25. pool_connections=1000
  26. )
  27. try:
  28. sql = f'select * from {project}.{table} where hour = {hour}'
  29. with odps.execute_sql(sql=sql).open_reader() as reader:
  30. data_count = reader.count
  31. except Exception as e:
  32. data_count = 0
  33. return data_count
  34. def get_op_data(now_date, project, table):
  35. """
  36. 获取运营提供的数据
  37. """
  38. hour = dt.strftime(now_date, '%Y%m%d%H')
  39. odps = ODPS(
  40. access_id=config_.ODPS_CONFIG['ACCESSID'],
  41. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  42. project=project,
  43. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  44. connect_timeout=3000,
  45. read_timeout=500000,
  46. pool_maxsize=1000,
  47. pool_connections=1000
  48. )
  49. records = odps.read_table(name=table, partition='hour=%s' % hour)
  50. op_data = []
  51. for record in records:
  52. item = {'videoid': record['videoid'], 'rank': record['rank']}
  53. # print(item)
  54. op_data.append(item)
  55. # print(op_data)
  56. return op_data
  57. def app_rank_op(now_date, now_h):
  58. """
  59. 票圈视频App推荐列表小时级更新,小时级数据由运营提供
  60. """
  61. log_.info("now date: {}".format(now_date))
  62. # 获取rov模型结果
  63. redis_helper = RedisHelper()
  64. key_name = get_redis_key_date(now_date=now_date)
  65. initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
  66. # 获取当前小时op更新的数据
  67. # op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  68. # op_data = redis_helper.get_data_zset_with_index(key_name=op_key_name, start=0, end=-1, with_scores=True)
  69. op_data = get_op_data(now_date=now_date, project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE)
  70. # 倒序排序
  71. op_data.sort(key=lambda x: x['rank'], reverse=True)
  72. op_video_ids = [int(item['videoid']) for item in op_data]
  73. # 对op更新数据给定score
  74. final_data = dict()
  75. for i, video_id in enumerate(op_video_ids):
  76. score = 1000 + i + 1
  77. final_data[video_id] = score
  78. # 合并结果
  79. for video_id, score in initial_data:
  80. if int(video_id) not in op_video_ids:
  81. final_data[int(video_id)] = score
  82. # print(video_id, score)
  83. # print(op_data)
  84. # print(final_data)
  85. # 存入对应的redis
  86. final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  87. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
  88. def get_redis_key_date(now_date):
  89. # 获取rov模型结果存放key
  90. redis_helper = RedisHelper()
  91. now_dt = dt.strftime(now_date, '%Y%m%d')
  92. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{now_dt}'
  93. if not redis_helper.key_exists(key_name=key_name):
  94. pre_dt = dt.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  95. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{pre_dt}'
  96. return key_name
  97. def app_rank_bottom(now_date, now_h):
  98. """运营未按时更新数据,用rov模型结果作为当前小时的数据"""
  99. # 获取rov模型结果
  100. redis_helper = RedisHelper()
  101. key_name = get_redis_key_date(now_date=now_date)
  102. initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
  103. final_data = dict()
  104. for video_id, score in initial_data:
  105. final_data[video_id] = score
  106. # 存入对应的redis
  107. final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  108. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
  109. def app_timer_check():
  110. now_date = dt.today()
  111. print(dt.strftime(now_date, '%Y%m%d%H'))
  112. now_h = dt.now().hour
  113. now_min = dt.now().minute
  114. # 查看当前小时op更新的数据是否已准备好
  115. # op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  116. op_data_count = op_data_check(project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE,
  117. hour=dt.strftime(now_date, '%Y%m%d%H'))
  118. if op_data_count > 0:
  119. # 数据准备好,进行更新
  120. app_rank_op(now_date=now_date, now_h=now_h)
  121. elif now_min > 50:
  122. app_rank_bottom(now_date=now_date, now_h=now_h)
  123. else:
  124. # 数据没准备好,1分钟后重新检查
  125. Timer(60, app_timer_check).start()
if __name__ == '__main__':
    # Manual debugging snippets, left disabled:
    # now_date = dt.today()
    # print(dt.strftime(now_date, '%Y%m%d%H'))
    # get_op_data(now_date=now_date)
    # now_h = dt.now().hour
    # app_rank_op(now_date=now_date, now_h=now_h)
    # key_name = get_redis_key_date(now_date=now_date)
    # print(key_name)
    # Script entry point: start one check cycle; app_timer_check re-schedules
    # itself via Timer while the data is not ready.
    app_timer_check()