religion_class_videos_update.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. import datetime
  2. import sys
  3. import traceback
  4. import pandas as pd
  5. from odps import ODPS
  6. from threading import Timer
  7. from get_data import get_data_from_odps
  8. from my_utils import send_msg_to_feishu
  9. from db_helper import RedisHelper
  10. from my_config import set_config
  11. from log import Log
  12. config_, env = set_config()
  13. log_ = Log()
  14. features = ['videoid', 'play_count_total', 'gmt_create']
  15. def data_check(project, table, now_date):
  16. """检查数据是否准备好"""
  17. odps = ODPS(
  18. access_id=config_.ODPS_CONFIG['ACCESSID'],
  19. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  20. project=project,
  21. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  22. connect_timeout=3000,
  23. read_timeout=500000,
  24. pool_maxsize=1000,
  25. pool_connections=1000
  26. )
  27. try:
  28. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  29. sql = f'select * from {project}.{table} where dt = {dt}'
  30. with odps.execute_sql(sql=sql).open_reader() as reader:
  31. data_count = reader.count
  32. except Exception as e:
  33. data_count = 0
  34. return data_count
  35. def get_religion_videos(now_date, project, table, key_name_prefix):
  36. """获取宗教视频列表"""
  37. # 获取videoId
  38. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  39. records = get_data_from_odps(date=dt, project=project, table=table)
  40. feature_data = []
  41. for record in records:
  42. item = {}
  43. for feature_name in features:
  44. item[feature_name] = record[feature_name]
  45. feature_data.append(item)
  46. feature_df = pd.DataFrame(feature_data)
  47. # 按照发布时间和播放量进行倒序
  48. feature_df = feature_df.sort_values(by=['gmt_create', 'play_count_total'], ascending=False)
  49. print(feature_df)
  50. video_id_list = feature_df['videoid'].to_list()
  51. # 按照排序给定分数
  52. final_result = {}
  53. step = 100 / (len(video_id_list) * 2)
  54. for i, video_id in enumerate(video_id_list):
  55. score = 100 - i * step
  56. final_result[int(video_id)] = score
  57. # 写入对应的redis
  58. key_name = \
  59. f"{key_name_prefix}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  60. if len(final_result) > 0:
  61. redis_helper = RedisHelper()
  62. redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
  63. def timer_check(religion_name):
  64. project = config_.RELIGION_VIDEOS[religion_name]['project']
  65. table = config_.RELIGION_VIDEOS[religion_name]['table']
  66. key_name_prefix = config_.RELIGION_VIDEOS[religion_name]['key_name_prefix']
  67. now_date = datetime.datetime.today()
  68. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  69. # 查看当天更新的数据是否已准备好
  70. data_count = data_check(project=project, table=table, now_date=now_date)
  71. if data_count > 0:
  72. log_.info(f'religion_name = {religion_name}, religion_videos_count = {data_count}')
  73. # 数据准备好,进行更新
  74. get_religion_videos(now_date=now_date, project=project, table=table, key_name_prefix=key_name_prefix)
  75. else:
  76. # 数据没准备好,1分钟后重新检查
  77. Timer(5 * 60, timer_check, args=[religion_name]).start()
  78. def main():
  79. try:
  80. religion_name = sys.argv[1]
  81. timer_check(religion_name)
  82. except Exception as e:
  83. log_.error(f"宗教视频数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  84. send_msg_to_feishu(
  85. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  86. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  87. msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教视频数据更新失败\n"
  88. f"exception: {e}\n"
  89. f"traceback: {traceback.format_exc()}"
  90. )
  91. if __name__ == '__main__':
  92. main()