task_scheduler.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import time
  2. from datetime import datetime
  3. from applications.api import feishu_robot
  4. from applications.utils import task_schedule_response
  5. from applications.tasks.monitor_tasks import check_kimi_balance
  6. class TaskScheduler:
  7. def __init__(self, data, log_service, db_client):
  8. self.data = data
  9. self.log_client = log_service
  10. self.db_client = db_client
  11. self.table = "long_articles_task_manager"
  12. async def whether_task_processing(self, task_name: str) -> bool:
  13. """whether task is processing"""
  14. query = f"""
  15. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  16. """
  17. response, error = await self.db_client.async_fetch(
  18. query=query, params=(task_name, 1)
  19. )
  20. if not response:
  21. # no task is processing
  22. return False
  23. else:
  24. start_timestamp = response[0]["start_timestamp"]
  25. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  26. if int(time.time()) - start_timestamp >= 86400:
  27. feishu_robot.bot(
  28. title=f"{task_name} has been processing for more than one day",
  29. detail={"timestamp": start_timestamp},
  30. )
  31. return True
  32. async def record_task(self, task_name, date_string):
  33. """record task"""
  34. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  35. await self.db_client.async_save(
  36. query=query, params=(date_string, task_name, int(time.time()))
  37. )
  38. async def lock_task(self, task_name, date_string):
  39. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  40. return await self.db_client.async_save(
  41. query=query, params=(1, task_name, date_string, 0)
  42. )
  43. async def release_task(self, task_name, date_string, final_status):
  44. """
  45. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  46. """
  47. query = f"""
  48. update {self.table} set task_status = %s, finish_timestamp = %s
  49. where task_name = %s and date_string = %s and task_status = %s;
  50. """
  51. return await self.db_client.async_save(
  52. query=query,
  53. params=(final_status, int(time.time()), task_name, date_string, 1),
  54. )
  55. async def deal(self):
  56. task_name = self.data.get("task_name")
  57. date_string = self.data.get("date_string")
  58. if not task_name:
  59. await self.log_client.log(
  60. contents={
  61. "task": task_name,
  62. "function": "task_scheduler_deal",
  63. "message": "not task name in params",
  64. "status": "fail",
  65. "data": self.data,
  66. }
  67. )
  68. return await task_schedule_response.fail_response(
  69. error_code="4002", error_message="task_name must be input"
  70. )
  71. if not date_string:
  72. date_string = datetime.today().strftime("%Y-%m-%d")
  73. match task_name:
  74. case "check_kimi_balance":
  75. if await self.whether_task_processing(task_name):
  76. return await task_schedule_response.fail_response(
  77. error_code="5001", error_message="task is processing"
  78. )
  79. await self.record_task(task_name=task_name, date_string=date_string)
  80. await self.lock_task(task_name, date_string)
  81. response = await check_kimi_balance()
  82. await self.log_client.log(
  83. contents={
  84. "task": task_name,
  85. "function": "task_scheduler_deal",
  86. "message": "check_kimi_balance task execute successfully",
  87. "status": "success",
  88. "data": response,
  89. }
  90. )
  91. await self.release_task(
  92. task_name=task_name,
  93. date_string=date_string,
  94. final_status=response["code"],
  95. )
  96. return await task_schedule_response.success_response(
  97. task_name=task_name, data=response
  98. )
  99. case _:
  100. await self.log_client.log(
  101. contents={
  102. "task": task_name,
  103. "function": "task_scheduler_deal",
  104. "message": "wrong task input",
  105. "status": "success",
  106. "data": self.data,
  107. }
  108. )
  109. return await task_schedule_response.fail_response(
  110. error_code="4001", error_message="wrong task name input"
  111. )