app_rank_h.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # -*- coding: utf-8 -*-
  2. # @ModuleName: app_rank_h
  3. # @Author: Liqian
  4. # @Time: 2022/3/18 下午2:03
  5. # @Software: PyCharm
  6. import datetime
  7. from datetime import datetime as dt
  8. from threading import Timer
  9. from log import Log
  10. from db_helper import RedisHelper
  11. from config import set_config
  12. from odps import ODPS
  13. log_ = Log()
  14. config_, env = set_config()
  15. def op_data_check(project, table, hour):
  16. """检查数据是否准备好"""
  17. odps = ODPS(
  18. access_id=config_.ODPS_CONFIG['ACCESSID'],
  19. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  20. project=project,
  21. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  22. connect_timeout=3000,
  23. read_timeout=500000,
  24. pool_maxsize=1000,
  25. pool_connections=1000
  26. )
  27. try:
  28. sql = f'select * from {project}.{table} where hour = {hour}'
  29. with odps.execute_sql(sql=sql).open_reader() as reader:
  30. data_count = reader.count
  31. except Exception as e:
  32. data_count = 0
  33. return data_count
  34. def get_op_data(now_date, project, table):
  35. """
  36. 获取运营提供的数据
  37. """
  38. hour = dt.strftime(now_date, '%Y%m%d%H')
  39. odps = ODPS(
  40. access_id=config_.ODPS_CONFIG['ACCESSID'],
  41. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  42. project=project,
  43. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  44. connect_timeout=3000,
  45. read_timeout=500000,
  46. pool_maxsize=1000,
  47. pool_connections=1000
  48. )
  49. records = odps.read_table(name=table, partition='hour=%s' % hour)
  50. op_data = []
  51. for record in records:
  52. item = {'videoid': record['videoid'], 'rank': record['rank']}
  53. # print(item)
  54. op_data.append(item)
  55. # print(op_data)
  56. return op_data
  57. def app_rank_op(now_date, now_h):
  58. """
  59. 票圈视频App推荐列表小时级更新,小时级数据由运营提供
  60. """
  61. log_.info("now date: {}".format(now_date))
  62. # 获取rov模型结果
  63. redis_helper = RedisHelper()
  64. key_name = get_redis_key_date(now_date=now_date)
  65. initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
  66. log_.info(f'initial data count = {len(initial_data)}')
  67. # 获取当前小时op更新的数据
  68. # op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  69. # op_data = redis_helper.get_data_zset_with_index(key_name=op_key_name, start=0, end=-1, with_scores=True)
  70. op_data = get_op_data(now_date=now_date, project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE)
  71. # 倒序排序
  72. op_data.sort(key=lambda x: int(x['rank']), reverse=True)
  73. op_video_ids = [int(item['videoid']) for item in op_data]
  74. log_.info(f'op video count = {len(op_video_ids)}')
  75. # 对op更新数据给定score
  76. final_data = dict()
  77. for i, video_id in enumerate(op_video_ids):
  78. score = 1000 + i + 1
  79. final_data[video_id] = score
  80. # 合并结果
  81. for video_id, score in initial_data:
  82. if int(video_id) not in op_video_ids:
  83. final_data[int(video_id)] = score
  84. # print(video_id, score)
  85. # print(op_data)
  86. # print(final_data)
  87. log_.info(f'final data count = {len(final_data)}')
  88. # 存入对应的redis
  89. final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  90. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
  91. def get_redis_key_date(now_date):
  92. # 获取rov模型结果存放key
  93. redis_helper = RedisHelper()
  94. now_dt = dt.strftime(now_date, '%Y%m%d')
  95. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{now_dt}'
  96. if not redis_helper.key_exists(key_name=key_name):
  97. pre_dt = dt.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  98. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP}{pre_dt}'
  99. return key_name
  100. def app_rank_bottom(now_date, now_h):
  101. """运营未按时更新数据,用rov模型结果作为当前小时的数据"""
  102. # 获取rov模型结果
  103. redis_helper = RedisHelper()
  104. key_name = get_redis_key_date(now_date=now_date)
  105. initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
  106. final_data = dict()
  107. for video_id, score in initial_data:
  108. final_data[video_id] = score
  109. # 存入对应的redis
  110. final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  111. redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
  112. def app_timer_check():
  113. now_date = dt.today()
  114. log_.info(f"now_date: {dt.strftime(now_date, '%Y%m%d')}")
  115. now_h = dt.now().hour
  116. now_min = dt.now().minute
  117. # 查看当前小时op更新的数据是否已准备好
  118. # op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
  119. op_data_count = op_data_check(project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE,
  120. hour=dt.strftime(now_date, '%Y%m%d%H'))
  121. if op_data_count > 0:
  122. # 数据准备好,进行更新
  123. app_rank_op(now_date=now_date, now_h=now_h)
  124. elif now_min > 50:
  125. log_.info('op data is None, use bottom data!')
  126. app_rank_bottom(now_date=now_date, now_h=now_h)
  127. else:
  128. # 数据没准备好,1分钟后重新检查
  129. Timer(60, app_timer_check).start()
  130. if __name__ == '__main__':
  131. # now_date = dt.today()
  132. # print(dt.strftime(now_date, '%Y%m%d%H'))
  133. # get_op_data(now_date=now_date)
  134. # now_h = dt.now().hour
  135. # app_rank_op(now_date=now_date, now_h=now_h)
  136. # key_name = get_redis_key_date(now_date=now_date)
  137. # print(key_name)
  138. app_timer_check()