123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- import time
- from datetime import datetime
- from applications.api import feishu_robot
- from applications.utils import task_schedule_response
- from applications.tasks.monitor_tasks import check_kimi_balance
- class TaskScheduler:
- def __init__(self, data, log_service, db_client):
- self.data = data
- self.log_client = log_service
- self.db_client = db_client
- self.table = "long_articles_task_manager"
- async def whether_task_processing(self, task_name: str) -> bool:
- """whether task is processing"""
- query = f"""
- select start_timestamp from {self.table} where task_name = %s and task_status = %s;
- """
- response, error = await self.db_client.async_fetch(
- query=query, params=(task_name, 1)
- )
- if not response:
- # no task is processing
- return False
- else:
- start_timestamp = response[0]["start_timestamp"]
- # todo: every task should has a unique expire timestamp, remember to write that in a task config file
- if int(time.time()) - start_timestamp >= 86400:
- feishu_robot.bot(
- title=f"{task_name} has been processing for more than one day",
- detail={"timestamp": start_timestamp},
- )
- return True
- async def record_task(self, task_name, date_string):
- """record task"""
- query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
- await self.db_client.async_save(
- query=query, params=(date_string, task_name, int(time.time()))
- )
- async def lock_task(self, task_name, date_string):
- query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
- return await self.db_client.async_save(
- query=query, params=(1, task_name, date_string, 0)
- )
- async def release_task(self, task_name, date_string, final_status):
- """
- 任务执行完成之后,将任务状态设置为完成状态/失败状态
- """
- query = f"""
- update {self.table} set task_status = %s, finish_timestamp = %s
- where task_name = %s and date_string = %s and task_status = %s;
- """
- return await self.db_client.async_save(
- query=query,
- params=(final_status, int(time.time()), task_name, date_string, 1),
- )
- async def deal(self):
- task_name = self.data.get("task_name")
- date_string = self.data.get("date_string")
- if not task_name:
- await self.log_client.log(
- contents={
- "task": task_name,
- "function": "task_scheduler_deal",
- "message": "not task name in params",
- "status": "fail",
- "data": self.data,
- }
- )
- return await task_schedule_response.fail_response(
- error_code="4002", error_message="task_name must be input"
- )
- if not date_string:
- date_string = datetime.today().strftime("%Y-%m-%d")
- match task_name:
- case "check_kimi_balance":
- if await self.whether_task_processing(task_name):
- return await task_schedule_response.fail_response(
- error_code="5001", error_message="task is processing"
- )
- await self.record_task(task_name=task_name, date_string=date_string)
- await self.lock_task(task_name, date_string)
- response = await check_kimi_balance()
- await self.log_client.log(
- contents={
- "task": task_name,
- "function": "task_scheduler_deal",
- "message": "check_kimi_balance task execute successfully",
- "status": "success",
- "data": response,
- }
- )
- await self.release_task(
- task_name=task_name,
- date_string=date_string,
- final_status=response["code"],
- )
- return await task_schedule_response.success_response(
- task_name=task_name, data=response
- )
- case _:
- await self.log_client.log(
- contents={
- "task": task_name,
- "function": "task_scheduler_deal",
- "message": "wrong task input",
- "status": "success",
- "data": self.data,
- }
- )
- return await task_schedule_response.fail_response(
- error_code="4001", error_message="wrong task name input"
- )
|