task_scheduler.py 9.3 KB


  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. from applications.api import feishu_robot
  5. from applications.tasks.llm_tasks import TitleRewrite
  6. from applications.utils import task_schedule_response
  7. from applications.tasks.llm_tasks import TitleRewrite
  8. from applications.tasks.monitor_tasks import check_kimi_balance
  9. from applications.tasks.monitor_tasks import GetOffVideos
  10. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  11. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  12. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  13. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  14. class TaskScheduler:
  15. def __init__(self, data, log_service, db_client):
  16. self.data = data
  17. self.log_client = log_service
  18. self.db_client = db_client
  19. self.table = "long_articles_task_manager"
  20. async def whether_task_processing(self, task_name: str) -> bool:
  21. """whether task is processing"""
  22. query = f"""
  23. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  24. """
  25. response, error = await self.db_client.async_fetch(
  26. query=query, params=(task_name, 1)
  27. )
  28. if not response:
  29. # no task is processing
  30. return False
  31. else:
  32. start_timestamp = response[0]["start_timestamp"]
  33. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  34. if int(time.time()) - start_timestamp >= 86400:
  35. await feishu_robot.bot(
  36. title=f"{task_name} has been processing for more than one day",
  37. detail={"timestamp": start_timestamp},
  38. env="long_articles_task",
  39. )
  40. return True
  41. async def record_task(self, task_name, date_string):
  42. """record task"""
  43. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  44. await self.db_client.async_save(
  45. query=query, params=(date_string, task_name, int(time.time()))
  46. )
  47. async def lock_task(self, task_name, date_string):
  48. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  49. return await self.db_client.async_save(
  50. query=query, params=(1, task_name, date_string, 0)
  51. )
  52. async def release_task(self, task_name, date_string, final_status):
  53. """
  54. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  55. """
  56. query = f"""
  57. update {self.table}
  58. set task_status = %s, finish_timestamp = %s
  59. where task_name = %s and date_string = %s and task_status = %s;
  60. """
  61. return await self.db_client.async_save(
  62. query=query,
  63. params=(final_status, int(time.time()), task_name, date_string, 1),
  64. )
  65. async def deal(self):
  66. task_name = self.data.get("task_name")
  67. date_string = self.data.get("date_string")
  68. if not task_name:
  69. await self.log_client.log(
  70. contents={
  71. "task": task_name,
  72. "function": "task_scheduler_deal",
  73. "message": "not task name in params",
  74. "status": "fail",
  75. "data": self.data,
  76. }
  77. )
  78. return await task_schedule_response.fail_response(
  79. error_code="4002", error_message="task_name must be input"
  80. )
  81. if not date_string:
  82. date_string = datetime.today().strftime("%Y-%m-%d")
  83. # prepare for task
  84. if await self.whether_task_processing(task_name):
  85. return await task_schedule_response.fail_response(
  86. error_code="5001", error_message="task is processing"
  87. )
  88. await self.record_task(task_name=task_name, date_string=date_string)
  89. await self.lock_task(task_name, date_string)
  90. match task_name:
  91. case "check_kimi_balance":
  92. response = await check_kimi_balance()
  93. await self.log_client.log(
  94. contents={
  95. "task": task_name,
  96. "function": "task_scheduler_deal",
  97. "message": "check_kimi_balance task execute successfully",
  98. "status": "success",
  99. "data": response,
  100. }
  101. )
  102. await self.release_task(
  103. task_name=task_name,
  104. date_string=date_string,
  105. final_status=response["code"],
  106. )
  107. return await task_schedule_response.success_response(
  108. task_name=task_name, data=response
  109. )
  110. case "get_off_videos":
  111. async def background_get_off_videos():
  112. sub_task = GetOffVideos(self.db_client, self.log_client)
  113. await sub_task.get_off_job()
  114. task_status = await sub_task.check()
  115. await self.release_task(
  116. task_name=task_name,
  117. date_string=date_string,
  118. final_status=task_status,
  119. )
  120. asyncio.create_task(background_get_off_videos())
  121. return await task_schedule_response.success_response(
  122. task_name=task_name,
  123. data={"code": 0, "message": "get off_videos started background"},
  124. )
  125. case "check_publish_video_audit_status":
  126. async def background_check_publish_video_audit_status():
  127. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
  128. print("start processing task status: ")
  129. task_status = await sub_task.deal()
  130. await self.release_task(
  131. task_name=task_name,
  132. date_string=date_string,
  133. final_status=task_status,
  134. )
  135. print("finish task status: ", task_status)
  136. asyncio.create_task(background_check_publish_video_audit_status())
  137. return await task_schedule_response.success_response(
  138. task_name=task_name,
  139. data={
  140. "code": 0,
  141. "message": "check publish video audit status started",
  142. },
  143. )
  144. case "outside_article_monitor":
  145. async def background_outside_article_monitor():
  146. collect_task = OutsideGzhArticlesCollector(self.db_client)
  147. await collect_task.deal()
  148. monitor_task = OutsideGzhArticlesMonitor(self.db_client)
  149. final_status = await monitor_task.deal()
  150. await self.release_task(
  151. task_name, date_string, final_status=final_status
  152. )
  153. asyncio.create_task(background_outside_article_monitor())
  154. return await task_schedule_response.success_response(
  155. task_name=task_name,
  156. data={
  157. "code": 0,
  158. "message": "outside_article_monitor started background",
  159. },
  160. )
  161. case "inner_article_monitor":
  162. async def background_inner_article_monitor():
  163. task = InnerGzhArticlesMonitor(self.db_client)
  164. final_status = await task.deal()
  165. await self.release_task(task_name, date_string, final_status=final_status)
  166. asyncio.create_task(background_inner_article_monitor())
  167. return await task_schedule_response.success_response(
  168. task_name=task_name,
  169. data={
  170. "code": 0,
  171. "message": "inner_article_monitor started background",
  172. }
  173. )
  174. case "title_rewrite":
  175. async def background_title_rewrite():
  176. sub_task = TitleRewrite(self.db_client, self.log_client)
  177. await sub_task.deal()
  178. await self.release_task(task_name=task_name, date_string=date_string, final_status=2)
  179. asyncio.create_task(background_title_rewrite())
  180. return await task_schedule_response.success_response(
  181. task_name=task_name,
  182. data={
  183. "code": 0,
  184. "message": "inner_article_monitor started background",
  185. }
  186. )
  187. case _:
  188. await self.log_client.log(
  189. contents={
  190. "task": task_name,
  191. "function": "task_scheduler_deal",
  192. "message": "wrong task input",
  193. "status": "success",
  194. "data": self.data,
  195. }
  196. )
  197. await self.release_task(task_name, date_string, 99)
  198. return await task_schedule_response.fail_response(
  199. error_code="4001", error_message="wrong task name input"
  200. )