task_scheduler.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import time
  2. from datetime import datetime
  3. from applications.api import feishu_robot
  4. from applications.utils import task_schedule_response
  5. from applications.tasks.monitor_tasks import check_kimi_balance
  6. class TaskScheduler:
  7. def __init__(self, data, log_service, db_client):
  8. self.data = data
  9. self.log_client = log_service
  10. self.db_client = db_client
  11. self.table = "long_articles_task_manager"
  12. async def whether_task_processing(self, task_name: str) -> bool:
  13. """whether task is processing"""
  14. query = f"""
  15. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  16. """
  17. response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
  18. if not response:
  19. # no task is processing
  20. return False
  21. else:
  22. start_timestamp = response[0]["start_timestamp"]
  23. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  24. if int(time.time()) - start_timestamp >= 86400:
  25. feishu_robot.bot(
  26. title=f"{task_name} has been processing for more than one day",
  27. detail={"timestamp": start_timestamp},
  28. )
  29. return True
  30. async def record_task(self, task_name, date_string):
  31. """record task"""
  32. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  33. await self.db_client.async_save(
  34. query=query, params=(date_string, task_name, int(time.time()))
  35. )
  36. async def lock_task(self, task_name, date_string):
  37. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  38. return await self.db_client.async_save(
  39. query=query, params=(1, task_name, date_string, 0)
  40. )
  41. async def release_task(self, task_name, date_string, final_status):
  42. """
  43. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  44. """
  45. query = f"""
  46. update {self.table} set task_status = %s, finish_timestamp = %s
  47. where task_name = %s and date_string = %s and task_status = %s;
  48. """
  49. return await self.db_client.async_save(
  50. query=query, params=(final_status, int(time.time()), task_name, date_string, 1)
  51. )
  52. async def deal(self):
  53. task_name = self.data.get("task_name")
  54. date_string = self.data.get("date_string")
  55. if not task_name:
  56. await self.log_client.log(
  57. contents={
  58. "task": task_name,
  59. "function": "task_scheduler_deal",
  60. "message": "not task name in params",
  61. "status": "fail",
  62. "data": self.data,
  63. }
  64. )
  65. return await task_schedule_response.fail_response(
  66. error_code="4002", error_message="task_name must be input"
  67. )
  68. if not date_string:
  69. date_string = datetime.today().strftime("%Y-%m-%d")
  70. match task_name:
  71. case "check_kimi_balance":
  72. if await self.whether_task_processing(task_name):
  73. return await task_schedule_response.fail_response(
  74. error_code="5001", error_message="task is processing"
  75. )
  76. await self.record_task(task_name=task_name, date_string=date_string)
  77. await self.lock_task(task_name, date_string)
  78. response = await check_kimi_balance()
  79. await self.log_client.log(
  80. contents={
  81. "task": task_name,
  82. "function": "task_scheduler_deal",
  83. "message": "check_kimi_balance task execute successfully",
  84. "status": "success",
  85. "data": response,
  86. }
  87. )
  88. await self.release_task(task_name=task_name, date_string=date_string, final_status=response['code'])
  89. return await task_schedule_response.success_response(
  90. task_name=task_name, data=response
  91. )
  92. case _:
  93. await self.log_client.log(
  94. contents={
  95. "task": task_name,
  96. "function": "task_scheduler_deal",
  97. "message": "wrong task input",
  98. "status": "success",
  99. "data": self.data,
  100. }
  101. )
  102. return await task_schedule_response.fail_response(
  103. error_code="4001", error_message="wrong task name input"
  104. )