task_scheduler.py 8.6 KB


  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. from applications.api import feishu_robot
  5. from applications.utils import task_schedule_response
  6. from applications.tasks.monitor_tasks import check_kimi_balance
  7. from applications.tasks.monitor_tasks import GetOffVideos
  8. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  9. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  10. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  11. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  12. class TaskScheduler:
  13. def __init__(self, data, log_service, db_client):
  14. self.data = data
  15. self.log_client = log_service
  16. self.db_client = db_client
  17. self.table = "long_articles_task_manager"
  18. async def whether_task_processing(self, task_name: str) -> bool:
  19. """whether task is processing"""
  20. query = f"""
  21. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  22. """
  23. response, error = await self.db_client.async_fetch(
  24. query=query, params=(task_name, 1)
  25. )
  26. if not response:
  27. # no task is processing
  28. return False
  29. else:
  30. start_timestamp = response[0]["start_timestamp"]
  31. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  32. if int(time.time()) - start_timestamp >= 86400:
  33. await feishu_robot.bot(
  34. title=f"{task_name} has been processing for more than one day",
  35. detail={"timestamp": start_timestamp},
  36. env="long_articles_task",
  37. )
  38. return True
  39. async def record_task(self, task_name, date_string):
  40. """record task"""
  41. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  42. await self.db_client.async_save(
  43. query=query, params=(date_string, task_name, int(time.time()))
  44. )
  45. async def lock_task(self, task_name, date_string):
  46. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  47. return await self.db_client.async_save(
  48. query=query, params=(1, task_name, date_string, 0)
  49. )
  50. async def release_task(self, task_name, date_string, final_status):
  51. """
  52. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  53. """
  54. query = f"""
  55. update {self.table}
  56. set task_status = %s, finish_timestamp = %s
  57. where task_name = %s and date_string = %s and task_status = %s;
  58. """
  59. return await self.db_client.async_save(
  60. query=query,
  61. params=(final_status, int(time.time()), task_name, date_string, 1),
  62. )
  63. async def deal(self):
  64. task_name = self.data.get("task_name")
  65. date_string = self.data.get("date_string")
  66. if not task_name:
  67. await self.log_client.log(
  68. contents={
  69. "task": task_name,
  70. "function": "task_scheduler_deal",
  71. "message": "not task name in params",
  72. "status": "fail",
  73. "data": self.data,
  74. }
  75. )
  76. return await task_schedule_response.fail_response(
  77. error_code="4002", error_message="task_name must be input"
  78. )
  79. if not date_string:
  80. date_string = datetime.today().strftime("%Y-%m-%d")
  81. # prepare for task
  82. if await self.whether_task_processing(task_name):
  83. return await task_schedule_response.fail_response(
  84. error_code="5001", error_message="task is processing"
  85. )
  86. await self.record_task(task_name=task_name, date_string=date_string)
  87. await self.lock_task(task_name, date_string)
  88. match task_name:
  89. case "check_kimi_balance":
  90. response = await check_kimi_balance()
  91. await self.log_client.log(
  92. contents={
  93. "task": task_name,
  94. "function": "task_scheduler_deal",
  95. "message": "check_kimi_balance task execute successfully",
  96. "status": "success",
  97. "data": response,
  98. }
  99. )
  100. await self.release_task(
  101. task_name=task_name,
  102. date_string=date_string,
  103. final_status=response["code"],
  104. )
  105. return await task_schedule_response.success_response(
  106. task_name=task_name, data=response
  107. )
  108. case "get_off_videos":
  109. async def background_get_off_videos():
  110. sub_task = GetOffVideos(self.db_client, self.log_client)
  111. await sub_task.get_off_job()
  112. task_status = await sub_task.check()
  113. await self.release_task(
  114. task_name=task_name,
  115. date_string=date_string,
  116. final_status=task_status,
  117. )
  118. asyncio.create_task(background_get_off_videos())
  119. return await task_schedule_response.success_response(
  120. task_name=task_name,
  121. data={"code": 0, "message": "get off_videos started background"},
  122. )
  123. case "check_publish_video_audit_status":
  124. async def background_check_publish_video_audit_status():
  125. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
  126. print("start processing task status: ")
  127. task_status = await sub_task.deal()
  128. await self.release_task(
  129. task_name=task_name,
  130. date_string=date_string,
  131. final_status=task_status,
  132. )
  133. print("finish task status: ", task_status)
  134. asyncio.create_task(background_check_publish_video_audit_status())
  135. return await task_schedule_response.success_response(
  136. task_name=task_name,
  137. data={
  138. "code": 0,
  139. "message": "check publish video audit status started",
  140. },
  141. )
  142. case "outside_article_monitor":
  143. async def background_outside_article_monitor():
  144. collect_task = OutsideGzhArticlesCollector(self.db_client)
  145. await collect_task.deal()
  146. monitor_task = OutsideGzhArticlesMonitor(self.db_client)
  147. final_status = await monitor_task.deal()
  148. await self.release_task(
  149. task_name, date_string, final_status=final_status
  150. )
  151. asyncio.create_task(background_outside_article_monitor())
  152. return await task_schedule_response.success_response(
  153. task_name=task_name,
  154. data={
  155. "code": 0,
  156. "message": "outside_article_monitor started background",
  157. },
  158. )
  159. case "inner_article_monitor":
  160. async def background_inner_article_monitor():
  161. task = InnerGzhArticlesMonitor(self.db_client)
  162. final_status = await task.deal()
  163. await self.release_task(task_name, date_string, final_status=final_status)
  164. asyncio.create_task(background_inner_article_monitor())
  165. return await task_schedule_response.success_response(
  166. task_name=task_name,
  167. data={
  168. "code": 0,
  169. "message": "inner_article_monitor started background",
  170. }
  171. )
  172. case _:
  173. await self.log_client.log(
  174. contents={
  175. "task": task_name,
  176. "function": "task_scheduler_deal",
  177. "message": "wrong task input",
  178. "status": "success",
  179. "data": self.data,
  180. }
  181. )
  182. await self.release_task(task_name, date_string, 99)
  183. return await task_schedule_response.fail_response(
  184. error_code="4001", error_message="wrong task name input"
  185. )