test_select.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import datetime
  2. import sys
  3. import time
  4. import os
  5. import schedule
  6. from loguru import logger
  7. sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
  8. logger.add("/app/logs/select.log", rotation="10 MB")
  9. sys.path.append('/app')
  10. import os
  11. print("Current working directory:", os.getcwd())
  12. from utils.odps_data import OdpsDataCount
  13. from utils.redis import RedisHelper
  14. def requirement_insight():
  15. """视频需求点洞察"""
  16. try:
  17. dt = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
  18. logger.info(f"视频需求点洞察")
  19. redis_task = "task:video_insight"
  20. redis_trigger_task = "task:video_trigger_insight"
  21. sql =f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 1'
  22. data = OdpsDataCount.main(sql)
  23. if not data:
  24. return
  25. RedisHelper().get_client().rpush(redis_task, *data)
  26. RedisHelper().get_client().rpush(redis_trigger_task, *data)
  27. logger.info(f"[R] 写入Redis 成功 共写入 {len(data)} 条")
  28. except Exception as e:
  29. logger.error(f"[R] 写入Redis写入失败,失败信息{e}")
  30. def schedule_tasks():
  31. schedule.every().day.at("01:00").do(requirement_insight)
  32. if __name__ == "__main__":
  33. requirement_insight()