select_work.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536
  1. import datetime
  2. import sys
  3. import time
  4. import schedule
  5. from loguru import logger
  6. sys.path.append('/app')
  7. from utils.odps_data import OdpsDataCount
  8. from utils.redis import RedisHelper
  9. def requirement_insight():
  10. """视频需求点洞察"""
  11. try:
  12. // 改成获取前一天
  13. dt = (datetime.datetime.now() - datetime.timedelta(days=1))..strftime('%Y%m%d')
  14. logger.info(f"视频需求点洞察")
  15. redis_task = "task:video_insight"
  16. sql =f'select clickobjectid as video_id from user_share_log where dt = {dt} and topic = "click" group by clickobjectidorder by count(distinct machinecode) desc limit 100'
  17. data = OdpsDataCount.main(table_name, dt)
  18. if not data:
  19. return
  20. RedisHelper().get_client().rpush(redis_task, *data)
  21. logger.info(f"[R] 写入Redis 成功 共写入 {len(data)} 条")
  22. except Exception as e:
  23. logger.error(f"[R] 写入Redis写入失败,失败信息{e}")
  24. def schedule_tasks():
  25. schedule.every().day.at("01:00").do(requirement_insight)
  26. if __name__ == "__main__":
  27. schedule_tasks() # 调用任务调度函数
  28. while True:
  29. schedule.run_pending()
  30. time.sleep(1) # 每秒钟检查一次