job_top_day_redis.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import json
  2. import time
  3. from datetime import datetime
  4. from collections import defaultdict
  5. import schedule
  6. from loguru import logger
  7. import json
  8. from common.feishu_utils import Feishu
  9. from common.redis import insert_job_data, get_top_data, get_llen_list, get_lrange_list
  10. def jab_day_recommend():
  11. """获取每日每小时top前100数据"""
  12. try:
  13. logger.info(f"开始获取溯源到的数据")
  14. list_task = [{'task:top_data_ks_gjc': '77618314'}, {'task:top_data_dy_gjc':'77618313'}, {'task:top_data_ks_gz':'77618315'}, {'task:top_data_dy_gz':'77618317'}]
  15. for task in list_task:
  16. for key, value in task.items():
  17. count = get_llen_list(key)
  18. if count > 0:
  19. result = []
  20. channel = None
  21. tag_transport_channel = None
  22. top_task = get_lrange_list(key,count)
  23. for top in top_task:
  24. data = json.loads(top)
  25. channel_account_id = data.get("channel_account_id", "")
  26. tag_transport_channel = data.get("tag_transport_channel", "")
  27. channel = data.get("channel", "")
  28. result.append(channel_account_id)
  29. result_string = ",".join(result)
  30. result_string = ",".join(dict.fromkeys(result_string.split(",")))
  31. values = [
  32. [
  33. "",
  34. tag_transport_channel,
  35. result_string,
  36. "lev-供给,rol-机器,#str-搬运搜索top视频溯源账号_42,genMod-账号",
  37. value,
  38. "2",
  39. "15",
  40. "通用-分享到群",
  41. "AI片尾引导",
  42. "zhifeng_emo,sijia",
  43. "",
  44. "",
  45. "AI标题",
  46. "1",
  47. "",
  48. "王雪珂",
  49. channel
  50. ]
  51. ]
  52. Feishu.insert_columns("YqiSsMvvMhr5tCttL7tcRbHTnrd", "d79d48", "ROWS", 1, 2)
  53. time.sleep(0.5)
  54. Feishu.update_values("YqiSsMvvMhr5tCttL7tcRbHTnrd", "d79d48", "A2:Z2", values)
  55. logger.info(f"[+] 成功写入改造飞书表格")
  56. else:
  57. continue
  58. logger.info(f"写入top前100数据成功")
  59. return
  60. except Exception as e:
  61. logger.error(f"写入飞书异常,异常信息{e}")
  62. return
  63. def schedule_tasks():
  64. schedule.every().day.at("00:40").do(jab_day_recommend)
  65. if __name__ == "__main__":
  66. # jab_day_recommend()
  67. schedule_tasks() # 调用任务调度函数
  68. while True:
  69. schedule.run_pending()
  70. time.sleep(1) # 每秒钟检查一次