"""Hourly job that collects task records and fans them out to Redis and Feishu.

At minute 55 of every hour the scheduler runs ``StartGetRecommend.run``, which
asks ``OdpsDataCount`` for the records of the previous hour, pushes them onto
a Redis list and writes one Feishu sheet row per record.
"""
import asyncio
import datetime
import sys
import time  # no longer used in the code below; kept in case other code relies on it

import orjson
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from loguru import logger

# Must run before the ``utils`` imports so the project packages resolve.
sys.path.append('/app')
from utils.feishu_utils import Feishu
from utils.odps_data import OdpsDataCount
from utils.redis import RedisHelper

# Redis list that receives the raw task payloads.
SCAN_TASKS_KEY = 'gong_ji_heng_ceng:scan_tasks'

# Feishu spreadsheet / sheet that receives one row per task.
FEISHU_SPREADSHEET_TOKEN = "JY4esfYvShLbTkthHEqcTw5qnsh"
FEISHU_SHEET_ID = "f53916"
FEISHU_TARGET_RANGE = "A2:Z2"

# Pause between consecutive Feishu calls, in seconds.
# NOTE(review): presumably there to stay under an API rate limit - confirm.
FEISHU_CALL_INTERVAL = 0.5


class StartGetRecommend(object):
    """Fetches the previous hour's tasks and publishes them downstream."""

    @classmethod
    async def run(cls) -> None:
        """Fetch the previous hour's tasks, push them to Redis, log them to Feishu.

        Does nothing beyond logging the count when no tasks are returned.
        """
        # The partition to query is the previous hour, formatted as YYYYMMDDHH.
        dt = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime('%Y%m%d%H')
        tasks = OdpsDataCount.get_data_count(dt)
        logger.info(f"[获取] {dt}时间,共获取到{len(tasks)} 条")
        if not tasks:
            return

        await RedisHelper().get_client().rpush(SCAN_TASKS_KEY, *tasks)
        logger.info(f"[获取] {dt}时间,共获取到{len(tasks)}条,写入redis成功")

        await cls._write_tasks_to_feishu(tasks)

    @classmethod
    async def _write_tasks_to_feishu(cls, tasks) -> None:
        """Insert one Feishu sheet row per task, newest row at the top."""
        logger.info("[获取] 开始写入飞书表格")
        # Every row of this batch carries the same write timestamp.
        formatted_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        for raw_task in tasks:
            # Non-blocking pause: ``time.sleep`` here would freeze the event
            # loop, and with it the scheduler that shares the loop.
            await asyncio.sleep(FEISHU_CALL_INTERVAL)
            record = orjson.loads(raw_task)
            values = [[
                record['partition'],
                record['video_id'],
                record['channel'],
                record['time'],
                record["type"],
                formatted_time,
            ]]
            # Open a blank row first, then fill it through the target range.
            # NOTE(review): the Feishu helpers are called synchronously and
            # still block the loop for the duration of each call.
            Feishu.insert_columns(FEISHU_SPREADSHEET_TOKEN, FEISHU_SHEET_ID, "ROWS", 1, 2)
            await asyncio.sleep(FEISHU_CALL_INTERVAL)
            Feishu.update_values(
                FEISHU_SPREADSHEET_TOKEN, FEISHU_SHEET_ID, FEISHU_TARGET_RANGE, values
            )
            logger.info("[处理] 写入飞书表格一条成功")
        logger.info("[处理] 写入飞书表格全部成功")


async def run() -> None:
    """Start the hourly scheduler and keep the process alive until interrupted."""
    scheduler = AsyncIOScheduler()
    try:
        # Fires once per hour, at minute 55 second 0.
        scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(minute=55, second=0))
        scheduler.start()
        # Park forever; the scheduler does its work on this same event loop.
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        logger.info("[scheduler] interrupted, shutting down")
    except Exception:
        # Log with traceback instead of silently dropping the failure; the
        # process still exits through the shutdown path below.
        logger.exception("[scheduler] unexpected error, shutting down")
    finally:
        # shutdown() raises if the scheduler never started, which would hide
        # the original error.
        if scheduler.running:
            scheduler.shutdown()


if __name__ == '__main__':
    # For a one-off manual run without the scheduler:
    #     asyncio.run(StartGetRecommend.run())
    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this service.
        pass