task_scheduler.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import time
  2. from datetime import datetime
  3. from applications.api import feishu_robot
  4. from applications.utils import task_schedule_response
  5. from applications.tasks.monitor_tasks import check_kimi_balance
  6. from applications.tasks.monitor_tasks import GetOffVideos
  7. class TaskScheduler:
  8. def __init__(self, data, log_service, db_client):
  9. self.data = data
  10. self.log_client = log_service
  11. self.db_client = db_client
  12. self.table = "long_articles_task_manager"
  13. async def whether_task_processing(self, task_name: str) -> bool:
  14. """whether task is processing"""
  15. query = f"""
  16. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  17. """
  18. response, error = await self.db_client.async_fetch(
  19. query=query, params=(task_name, 1)
  20. )
  21. if not response:
  22. # no task is processing
  23. return False
  24. else:
  25. start_timestamp = response[0]["start_timestamp"]
  26. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  27. if int(time.time()) - start_timestamp >= 86400:
  28. await feishu_robot.bot(
  29. title=f"{task_name} has been processing for more than one day",
  30. detail={"timestamp": start_timestamp},
  31. env="long_articles_task",
  32. )
  33. return True
  34. async def record_task(self, task_name, date_string):
  35. """record task"""
  36. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  37. await self.db_client.async_save(
  38. query=query, params=(date_string, task_name, int(time.time()))
  39. )
  40. async def lock_task(self, task_name, date_string):
  41. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  42. return await self.db_client.async_save(
  43. query=query, params=(1, task_name, date_string, 0)
  44. )
  45. async def release_task(self, task_name, date_string, final_status):
  46. """
  47. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  48. """
  49. query = f"""
  50. update {self.table} set task_status = %s, finish_timestamp = %s
  51. where task_name = %s and date_string = %s and task_status = %s;
  52. """
  53. return await self.db_client.async_save(
  54. query=query,
  55. params=(final_status, int(time.time()), task_name, date_string, 1),
  56. )
  57. async def deal(self):
  58. task_name = self.data.get("task_name")
  59. date_string = self.data.get("date_string")
  60. if not task_name:
  61. await self.log_client.log(
  62. contents={
  63. "task": task_name,
  64. "function": "task_scheduler_deal",
  65. "message": "not task name in params",
  66. "status": "fail",
  67. "data": self.data,
  68. }
  69. )
  70. return await task_schedule_response.fail_response(
  71. error_code="4002", error_message="task_name must be input"
  72. )
  73. if not date_string:
  74. date_string = datetime.today().strftime("%Y-%m-%d")
  75. # prepare for task
  76. if await self.whether_task_processing(task_name):
  77. return await task_schedule_response.fail_response(
  78. error_code="5001", error_message="task is processing"
  79. )
  80. await self.record_task(task_name=task_name, date_string=date_string)
  81. await self.lock_task(task_name, date_string)
  82. match task_name:
  83. case "check_kimi_balance":
  84. response = await check_kimi_balance()
  85. await self.log_client.log(
  86. contents={
  87. "task": task_name,
  88. "function": "task_scheduler_deal",
  89. "message": "check_kimi_balance task execute successfully",
  90. "status": "success",
  91. "data": response,
  92. }
  93. )
  94. await self.release_task(task_name, date_string, response["code"])
  95. return await task_schedule_response.success_response(
  96. task_name=task_name, data=response
  97. )
  98. case "get_off_videos":
  99. sub_task = GetOffVideos(self.db_client, self.log_client)
  100. await sub_task.get_off_job()
  101. response_code = await sub_task.check()
  102. await self.release_task(task_name, date_string, response_code)
  103. return await task_schedule_response.success_response(
  104. task_name=task_name, data={"code": response_code, "message": "get off_videos finished"}
  105. )
  106. case _:
  107. await self.log_client.log(
  108. contents={
  109. "task": task_name,
  110. "function": "task_scheduler_deal",
  111. "message": "wrong task input",
  112. "status": "success",
  113. "data": self.data,
  114. }
  115. )
  116. await self.release_task(task_name, date_string, 99)
  117. return await task_schedule_response.fail_response(
  118. error_code="4001", error_message="wrong task name input"
  119. )