task_scheduler.py 12 KB


  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. from applications.api import feishu_robot
  5. from applications.utils import task_schedule_response
  6. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  7. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  8. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  9. from applications.tasks.llm_tasks import TitleRewrite
  10. from applications.tasks.monitor_tasks import check_kimi_balance
  11. from applications.tasks.monitor_tasks import GetOffVideos
  12. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  13. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  14. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  15. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  16. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  17. from applications.tasks.task_mapper import TaskMapper
  class TaskScheduler(TaskMapper):
      """Dispatch long-articles tasks by name and track each run in the DB.

      Every accepted run inserts a row into ``long_articles_task_manager`` and
      moves its ``task_status`` from 0 to 1 (locked / processing) and finally to
      the status reported by the job. While a row for a task name is at 1, new
      runs of that task are refused by ``deal``.
      """

      # Values written to / matched against ``task_status`` by this class.
      _STATUS_INIT = 0  # status lock_task expects; presumably the column default
      _STATUS_PROCESSING = 1
      _STATUS_SUCCESS = 2  # used for jobs that do not report their own status
      _STATUS_FAIL = 99

      # asyncio.create_task() only keeps a weak reference to the task it creates,
      # so an unreferenced fire-and-forget task may be garbage collected before it
      # finishes. Strong references are held here until each task completes. The
      # set lives on the class on purpose so tasks outlive the scheduler instance
      # that started them.
      _background_tasks: set[asyncio.Task] = set()

      def __init__(self, data, log_service, db_client):
          # data: parameters read by deal(): "task_name" (required) and
          # "date_string" (optional, defaults to today).
          self.data = data
          self.log_client = log_service
          self.db_client = db_client
          self.table = "long_articles_task_manager"

      async def whether_task_processing(self, task_name: str) -> bool:
          """Return True if ``task_name`` has a row still marked as processing.

          When that row is older than the task's ``expire_duration`` (falling
          back to ``DEFAULT_TIMEOUT``), a Feishu alert is sent, but the task is
          still reported as processing; nothing in this class clears such a row.
          """
          query = f"""
              select start_timestamp from {self.table} where task_name = %s and task_status = %s;
          """
          # NOTE(review): ``error`` is ignored; a failed query presumably yields an
          # empty response and is then treated as "not processing" — confirm.
          response, error = await self.db_client.async_fetch(
              query=query, params=(task_name, self._STATUS_PROCESSING)
          )
          if not response:
              # no task is processing
              return False

          start_timestamp = response[0]["start_timestamp"]
          expire_duration = self.get_task_config(task_name).get(
              "expire_duration", self.DEFAULT_TIMEOUT
          )
          if int(time.time()) - start_timestamp >= expire_duration:
              await feishu_robot.bot(
                  title=f"{task_name} has been processing over timeout",
                  detail={"timestamp": start_timestamp},
                  env="long_articles_task",
              )
          return True

      async def record_task(self, task_name, date_string):
          """Insert a new run row for ``task_name`` on ``date_string`` stamped with now."""
          query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
          await self.db_client.async_save(
              query=query, params=(date_string, task_name, int(time.time()))
          )

      async def lock_task(self, task_name, date_string):
          """Move the run row from the initial status to processing.

          Returns whatever ``db_client.async_save`` returns for the update.
          """
          query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
          return await self.db_client.async_save(
              query=query,
              params=(self._STATUS_PROCESSING, task_name, date_string, self._STATUS_INIT),
          )

      async def release_task(self, task_name, date_string, final_status):
          """
          After the task has finished, set its status to the final
          (completed / failed) status and stamp ``finish_timestamp``.
          Only rows currently marked as processing are updated.
          """
          query = f"""
              update {self.table}
              set task_status = %s, finish_timestamp = %s
              where task_name = %s and date_string = %s and task_status = %s;
          """
          return await self.db_client.async_save(
              query=query,
              params=(
                  final_status,
                  int(time.time()),
                  task_name,
                  date_string,
                  self._STATUS_PROCESSING,
              ),
          )

      async def deal(self):
          """Validate the request, lock the task row and run the task.

          ``check_kimi_balance`` runs inline and its result is returned. Every
          other known task is started in the background; the response only
          reports that it was started, and the row is released when it ends.
          """
          task_name = self.data.get("task_name")
          date_string = self.data.get("date_string")
          if not task_name:
              await self.log_client.log(
                  contents={
                      "task": task_name,
                      "function": "task_scheduler_deal",
                      "message": "not task name in params",
                      "status": "fail",
                      "data": self.data,
                  }
              )
              return await task_schedule_response.fail_response(
                  error_code="4002", error_message="task_name must be input"
              )
          if not date_string:
              date_string = datetime.today().strftime("%Y-%m-%d")

          background_jobs = self._background_jobs(date_string)
          # Reject unknown names before touching the task table, so a typo does
          # not leave behind rows for a task that does not exist.
          if task_name != "check_kimi_balance" and task_name not in background_jobs:
              await self.log_client.log(
                  contents={
                      "task": task_name,
                      "function": "task_scheduler_deal",
                      "message": "wrong task input",
                      "status": "fail",
                      "data": self.data,
                  }
              )
              return await task_schedule_response.fail_response(
                  error_code="4001", error_message="wrong task name input"
              )

          # prepare for task
          if await self.whether_task_processing(task_name):
              return await task_schedule_response.fail_response(
                  error_code="5001", error_message="task is processing"
              )
          await self.record_task(task_name=task_name, date_string=date_string)
          # NOTE(review): the lock result is not checked, so two concurrent
          # requests that both pass the check above may both run the task.
          await self.lock_task(task_name, date_string)

          if task_name == "check_kimi_balance":
              return await self._run_check_kimi_balance(task_name, date_string)

          job, message = background_jobs[task_name]
          self._spawn_background(self._run_and_release(task_name, date_string, job))
          return await task_schedule_response.success_response(
              task_name=task_name,
              data={"code": 0, "message": message},
          )

      def _background_jobs(self, date_string):
          """Map each background task name to ``(job, start message)``.

          A job is a zero-argument callable returning an awaitable whose result
          is the final ``task_status`` passed to ``release_task``.
          """
          return {
              "get_off_videos": (
                  self._get_off_videos,
                  "get off_videos started background",
              ),
              "check_publish_video_audit_status": (
                  self._check_publish_video_audit_status,
                  "check publish video audit status started",
              ),
              "outside_article_monitor": (
                  self._outside_article_monitor,
                  "outside_article_monitor started background",
              ),
              "inner_article_monitor": (
                  self._inner_article_monitor,
                  "task started background",
              ),
              "title_rewrite": (
                  self._title_rewrite,
                  "title_rewrite started background",
              ),
              "daily_publish_articles_recycle": (
                  lambda: self._daily_publish_articles_recycle(date_string),
                  "task started background",
              ),
              "update_root_source_id": (
                  self._update_root_source_id,
                  "task started background",
              ),
              "task_processing_monitor": (
                  self._task_processing_monitor,
                  "task started background",
              ),
          }

      def _spawn_background(self, coro):
          """Schedule ``coro`` as a task and keep it referenced until it is done."""
          task = asyncio.create_task(coro)
          self._background_tasks.add(task)
          task.add_done_callback(self._background_tasks.discard)
          return task

      async def _run_and_release(self, task_name, date_string, job):
          """Await ``job()`` and always release the task row afterwards.

          If the job raises, the error is logged and the row is released with
          ``_STATUS_FAIL``. Leaving it at processing would make
          ``whether_task_processing`` refuse every later run of this task.
          """
          final_status = self._STATUS_FAIL
          try:
              final_status = await job()
          except Exception as err:
              # Top-level boundary of a fire-and-forget task: nothing awaits it,
              # so record the failure here instead of losing it.
              await self.log_client.log(
                  contents={
                      "task": task_name,
                      "function": "task_scheduler_deal",
                      "message": f"{task_name} failed in background",
                      "status": "fail",
                      "data": {"error": repr(err)},
                  }
              )
          finally:
              await self.release_task(
                  task_name=task_name,
                  date_string=date_string,
                  final_status=final_status,
              )

      async def _run_check_kimi_balance(self, task_name, date_string):
          """Run the balance check inline, release the row, return its response.

          The row is released with ``response["code"]`` on success and with
          ``_STATUS_FAIL`` if the check or its logging raises; the exception
          still propagates to the caller.
          """
          final_status = self._STATUS_FAIL
          try:
              response = await check_kimi_balance()
              final_status = response["code"]
              await self.log_client.log(
                  contents={
                      "task": task_name,
                      "function": "task_scheduler_deal",
                      "message": "check_kimi_balance task execute successfully",
                      "status": "success",
                      "data": response,
                  }
              )
          finally:
              await self.release_task(
                  task_name=task_name,
                  date_string=date_string,
                  final_status=final_status,
              )
          return await task_schedule_response.success_response(
              task_name=task_name, data=response
          )

      async def _get_off_videos(self):
          """Take off videos, then return the status reported by the check."""
          sub_task = GetOffVideos(self.db_client, self.log_client)
          await sub_task.get_off_job()
          return await sub_task.check()

      async def _check_publish_video_audit_status(self):
          """Check audit status of published videos; return the sub-task status."""
          sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
          print("start processing task status: ")
          task_status = await sub_task.deal()
          print("finish task status: ", task_status)
          return task_status

      async def _outside_article_monitor(self):
          """Collect outside articles, then return the monitor's status."""
          collect_task = OutsideGzhArticlesCollector(self.db_client)
          await collect_task.deal()
          monitor_task = OutsideGzhArticlesMonitor(self.db_client)
          return await monitor_task.deal()

      async def _inner_article_monitor(self):
          """Run the inner article monitor and return its status."""
          monitor_task = InnerGzhArticlesMonitor(self.db_client)
          return await monitor_task.deal()

      async def _title_rewrite(self):
          """Run title rewriting; it reports no status, so success is assumed."""
          sub_task = TitleRewrite(self.db_client, self.log_client)
          await sub_task.deal()
          return self._STATUS_SUCCESS

      async def _daily_publish_articles_recycle(self, date_string):
          """Recycle, then check, the articles published on ``date_string``."""
          recycle_task = RecycleDailyPublishArticlesTask(
              self.db_client, self.log_client, date_string
          )
          await recycle_task.deal()
          check_task = CheckDailyPublishArticlesTask(
              self.db_client, self.log_client, date_string
          )
          await check_task.deal()
          return self._STATUS_SUCCESS

      async def _update_root_source_id(self):
          """Run the root-source-id / update-time refresh."""
          sub_task = UpdateRootSourceIdAndUpdateTimeTask(
              self.db_client, self.log_client
          )
          await sub_task.deal()
          return self._STATUS_SUCCESS

      async def _task_processing_monitor(self):
          """Run the task processing monitor."""
          sub_task = TaskProcessingMonitor(self.db_client)
          await sub_task.deal()
          return self._STATUS_SUCCESS
  246. return await task_schedule_response.fail_response(
  247. error_code="4001", error_message="wrong task name input"
  248. )