123456789101112131415161718192021222324252627282930313233343536373839 |
import datetime
import os
import sys
import time

import schedule
from loguru import logger

# File sink for this job's logs; loguru rotates the file at 10 MB.
logger.add("/app/logs/select.log", rotation="10 MB")

# NOTE(review): presumably needed so the `utils` imports below resolve when the
# script is started from another directory — keep this ahead of those imports.
sys.path.append('/app')

print("Current working directory:", os.getcwd())

from utils.odps_data import OdpsDataCount
from utils.redis import RedisHelper
def requirement_insight():
    """Queue yesterday's most-clicked shared videos for demand-point insight.

    Builds a query over ``loghubods.user_share_log`` for yesterday's ``dt``
    partition (local clock, formatted ``YYYYMMDD``) that selects up to 1000
    ``clickobjectid`` values of ``click`` events, ordered by the number of
    distinct ``machinecode`` values, runs it through ``OdpsDataCount.main``
    and appends the result to the Redis list ``task:video_insight``.

    Returns:
        None. Nothing is pushed when the query returns an empty result.

    Any exception raised along the way is caught and logged here (with its
    traceback); it is not re-raised to the caller.
    """
    redis_task = "task:video_insight"
    try:
        dt = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
        logger.info("视频需求点洞察")
        sql = f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 1000'
        # NOTE(review): assumes OdpsDataCount.main returns a flat sequence of
        # values that Redis can store directly — TODO confirm against utils.
        data = OdpsDataCount.main(sql)
        if not data:
            # Leave a trace in the log instead of exiting silently, so an
            # empty partition can be told apart from a job that never ran.
            logger.warning(f"[R] 未查询到数据,跳过写入Redis dt={dt}")
            return
        RedisHelper().get_client().rpush(redis_task, *data)
        logger.info(f"[R] 写入Redis 成功 共写入 {len(data)} 条")
    except Exception as e:
        # logger.exception logs at ERROR level like before, but also records
        # the traceback, which matters because this handler also covers
        # failures of the ODPS query, not only of the Redis write.
        logger.exception(f"[R] 写入Redis写入失败,失败信息{e}")
def schedule_tasks():
    """Register ``requirement_insight`` to run every day at 01:00."""
    daily_at_one = schedule.every().day.at("01:00")
    daily_at_one.do(requirement_insight)
if __name__ == "__main__":
    # Register the jobs once, then poll the scheduler forever.
    schedule_tasks()
    poll_interval_seconds = 1  # check for due jobs once per second
    while True:
        schedule.run_pending()
        time.sleep(poll_interval_seconds)
|