1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- import asyncio
- import datetime
- import sys
- import time
- import orjson
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.cron import CronTrigger
- from loguru import logger
- sys.path.append('/app')
- from utils.feishu_utils import Feishu
- from utils.odps_data import OdpsDataCount
- from utils.redis import RedisHelper
- class StartGetRecommend(object):
- @classmethod
- async def run(cls):
- dt = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime('%Y%m%d%H') # 获取前一小时
- tasks = OdpsDataCount.get_data_count(dt)
- logger.info(f"[获取] {dt}时间,共获取到{len(tasks)} 条")
- if len(tasks) > 0:
- await RedisHelper().get_client().rpush('gong_ji_heng_ceng:scan_tasks', *tasks)
- logger.info(f"[获取] {dt}时间,共获取到{len(tasks)}条,写入redis成功")
- logger.info(f"[获取] 开始写入飞书表格")
- current_time = datetime.datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- for task in tasks:
- time.sleep(0.5)
- task = orjson.loads(task)
- values = [[task['partition'], task['video_id'], task['channel'], task['time'], task["type"],formatted_time]]
- Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "f53916", "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "f53916", "A2:Z2", values)
- logger.info(f"[处理] 写入飞书表格一条成功")
- logger.info(f"[处理] 写入飞书表格全部成功")
- async def run():
- scheduler = AsyncIOScheduler()
- try:
- scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(minute=55, second=0)) # 每小时获取一次
- scheduler.start()
- await asyncio.Event().wait()
- except KeyboardInterrupt:
- pass
- except Exception as e:
- pass
- finally:
- scheduler.shutdown()
- if __name__ == '__main__':
- # asyncio.run(StartGetRecommend.run())
- loop = asyncio.get_event_loop()
- loop.run_until_complete(run())
|