import datetime import sys import time import schedule from loguru import logger logger.add("/app/logs/select.log", rotation="10 MB") sys.path.append('/app') import os print("Current working directory:", os.getcwd()) from utils.odps_data import OdpsDataCount from utils.redis import RedisHelper def requirement_insight(): """视频需求点洞察""" try: dt = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d') logger.info(f"视频需求点洞察") redis_task = "task:video_insight" sql =f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 1000' data = OdpsDataCount.main(sql) if not data: return RedisHelper().get_client().rpush(redis_task, *data) logger.info(f"[R] 写入Redis 成功 共写入 {len(data)} 条") except Exception as e: logger.error(f"[R] 写入Redis写入失败,失败信息{e}") def schedule_tasks(): schedule.every().day.at("01:00").do(requirement_insight) if __name__ == "__main__": schedule_tasks() # 调用任务调度函数 while True: schedule.run_pending() time.sleep(1) # 每秒钟检查一次