task_scheduler.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. from applications.api import feishu_robot
  5. from applications.utils import task_schedule_response
  6. from applications.tasks.monitor_tasks import check_kimi_balance
  7. from applications.tasks.monitor_tasks import GetOffVideos
  8. class TaskScheduler:
  9. def __init__(self, data, log_service, db_client):
  10. self.data = data
  11. self.log_client = log_service
  12. self.db_client = db_client
  13. self.table = "long_articles_task_manager"
  14. async def whether_task_processing(self, task_name: str) -> bool:
  15. """whether task is processing"""
  16. query = f"""
  17. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  18. """
  19. response, error = await self.db_client.async_fetch(
  20. query=query, params=(task_name, 1)
  21. )
  22. if not response:
  23. # no task is processing
  24. return False
  25. else:
  26. start_timestamp = response[0]["start_timestamp"]
  27. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  28. if int(time.time()) - start_timestamp >= 86400:
  29. await feishu_robot.bot(
  30. title=f"{task_name} has been processing for more than one day",
  31. detail={"timestamp": start_timestamp},
  32. env="long_articles_task",
  33. )
  34. return True
  35. async def record_task(self, task_name, date_string):
  36. """record task"""
  37. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  38. await self.db_client.async_save(
  39. query=query, params=(date_string, task_name, int(time.time()))
  40. )
  41. async def lock_task(self, task_name, date_string):
  42. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  43. return await self.db_client.async_save(
  44. query=query, params=(1, task_name, date_string, 0)
  45. )
  46. async def release_task(self, task_name, date_string, final_status):
  47. """
  48. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  49. """
  50. query = f"""
  51. update {self.table}
  52. set task_status = %s, finish_timestamp = %s
  53. where task_name = %s and date_string = %s and task_status = %s;
  54. """
  55. return await self.db_client.async_save(
  56. query=query,
  57. params=(final_status, int(time.time()), task_name, date_string, 1),
  58. )
  59. async def deal(self):
  60. task_name = self.data.get("task_name")
  61. date_string = self.data.get("date_string")
  62. if not task_name:
  63. await self.log_client.log(
  64. contents={
  65. "task": task_name,
  66. "function": "task_scheduler_deal",
  67. "message": "not task name in params",
  68. "status": "fail",
  69. "data": self.data,
  70. }
  71. )
  72. return await task_schedule_response.fail_response(
  73. error_code="4002", error_message="task_name must be input"
  74. )
  75. if not date_string:
  76. date_string = datetime.today().strftime("%Y-%m-%d")
  77. # prepare for task
  78. if await self.whether_task_processing(task_name):
  79. return await task_schedule_response.fail_response(
  80. error_code="5001", error_message="task is processing"
  81. )
  82. await self.record_task(task_name=task_name, date_string=date_string)
  83. await self.lock_task(task_name, date_string)
  84. match task_name:
  85. case "check_kimi_balance":
  86. response = await check_kimi_balance()
  87. await self.log_client.log(
  88. contents={
  89. "task": task_name,
  90. "function": "task_scheduler_deal",
  91. "message": "check_kimi_balance task execute successfully",
  92. "status": "success",
  93. "data": response,
  94. }
  95. )
  96. return await task_schedule_response.success_response(
  97. task_name=task_name, data=response
  98. )
  99. case "get_off_videos":
  100. async def background_get_off_videos():
  101. sub_task = GetOffVideos(self.db_client, self.log_client)
  102. await sub_task.get_off_job()
  103. task_status = await sub_task.check()
  104. await self.release_task(
  105. task_name=task_name,
  106. date_string=date_string,
  107. final_status=task_status,
  108. )
  109. asyncio.create_task(background_get_off_videos())
  110. return await task_schedule_response.success_response(
  111. task_name=task_name,
  112. data={"code": 0, "message": "get off_videos started background"},
  113. )
  114. case _:
  115. await self.log_client.log(
  116. contents={
  117. "task": task_name,
  118. "function": "task_scheduler_deal",
  119. "message": "wrong task input",
  120. "status": "success",
  121. "data": self.data,
  122. }
  123. )
  124. await self.release_task(task_name, date_string, 99)
  125. return await task_schedule_response.fail_response(
  126. error_code="4001", error_message="wrong task name input"
  127. )