import asyncio import time from datetime import datetime from applications.api import feishu_robot from applications.utils import task_schedule_response from applications.tasks.monitor_tasks import check_kimi_balance from applications.tasks.monitor_tasks import GetOffVideos class TaskScheduler: def __init__(self, data, log_service, db_client): self.data = data self.log_client = log_service self.db_client = db_client self.table = "long_articles_task_manager" async def whether_task_processing(self, task_name: str) -> bool: """whether task is processing""" query = f""" select start_timestamp from {self.table} where task_name = %s and task_status = %s; """ response, error = await self.db_client.async_fetch( query=query, params=(task_name, 1) ) if not response: # no task is processing return False else: start_timestamp = response[0]["start_timestamp"] # todo: every task should has a unique expire timestamp, remember to write that in a task config file if int(time.time()) - start_timestamp >= 86400: await feishu_robot.bot( title=f"{task_name} has been processing for more than one day", detail={"timestamp": start_timestamp}, env="long_articles_task", ) return True async def record_task(self, task_name, date_string): """record task""" query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);""" await self.db_client.async_save( query=query, params=(date_string, task_name, int(time.time())) ) async def lock_task(self, task_name, date_string): query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;""" return await self.db_client.async_save( query=query, params=(1, task_name, date_string, 0) ) async def release_task(self, task_name, date_string, final_status): """ 任务执行完成之后,将任务状态设置为完成状态/失败状态 """ query = f""" update {self.table} set task_status = %s, finish_timestamp = %s where task_name = %s and date_string = %s and task_status = %s; """ return await self.db_client.async_save( query=query, params=(final_status, int(time.time()), task_name, date_string, 1), ) async def deal(self): task_name = self.data.get("task_name") date_string = self.data.get("date_string") if not task_name: await self.log_client.log( contents={ "task": task_name, "function": "task_scheduler_deal", "message": "not task name in params", "status": "fail", "data": self.data, } ) return await task_schedule_response.fail_response( error_code="4002", error_message="task_name must be input" ) if not date_string: date_string = datetime.today().strftime("%Y-%m-%d") # prepare for task if await self.whether_task_processing(task_name): return await task_schedule_response.fail_response( error_code="5001", error_message="task is processing" ) await self.record_task(task_name=task_name, date_string=date_string) await self.lock_task(task_name, date_string) match task_name: case "check_kimi_balance": response = await check_kimi_balance() await self.log_client.log( contents={ "task": task_name, "function": "task_scheduler_deal", "message": "check_kimi_balance task execute successfully", "status": "success", "data": response, } ) return await task_schedule_response.success_response( task_name=task_name, data=response ) case "get_off_videos": async def background_get_off_videos(): sub_task = GetOffVideos(self.db_client, self.log_client) await sub_task.get_off_job() task_status = await sub_task.check() await self.release_task( task_name=task_name, date_string=date_string, final_status=task_status, ) asyncio.create_task(background_get_off_videos()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "get off_videos started background"}, ) case _: await self.log_client.log( contents={ "task": task_name, "function": "task_scheduler_deal", "message": "wrong task input", "status": "success", "data": self.data, } ) await self.release_task(task_name, date_string, 99) return await task_schedule_response.fail_response( error_code="4001", error_message="wrong task name input" )