task_scheduler.py 13 KB


  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. from applications.api import feishu_robot
  5. from applications.utils import task_schedule_response
  6. from applications.tasks.crawler_tasks import CrawlerToutiao
  7. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  8. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  9. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  10. from applications.tasks.llm_tasks import TitleRewrite
  11. from applications.tasks.monitor_tasks import check_kimi_balance
  12. from applications.tasks.monitor_tasks import GetOffVideos
  13. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  14. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  15. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  16. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  17. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  18. from applications.tasks.task_mapper import TaskMapper
  19. class TaskScheduler(TaskMapper):
  20. def __init__(self, data, log_service, db_client):
  21. self.data = data
  22. self.log_client = log_service
  23. self.db_client = db_client
  24. self.table = "long_articles_task_manager"
  25. async def whether_task_processing(self, task_name: str) -> bool:
  26. """whether task is processing"""
  27. query = f"""
  28. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  29. """
  30. response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
  31. if not response:
  32. # no task is processing
  33. return False
  34. else:
  35. start_timestamp = response[0]["start_timestamp"]
  36. if int(time.time()) - start_timestamp >= self.get_task_config(
  37. task_name
  38. ).get("expire_duration", self.DEFAULT_TIMEOUT):
  39. await feishu_robot.bot(
  40. title=f"{task_name} has been processing over timeout",
  41. detail={"timestamp": start_timestamp},
  42. env="long_articles_task",
  43. )
  44. return True
  45. async def record_task(self, task_name, date_string):
  46. """record task"""
  47. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  48. await self.db_client.async_save(
  49. query=query, params=(date_string, task_name, int(time.time()))
  50. )
  51. async def lock_task(self, task_name, date_string):
  52. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  53. return await self.db_client.async_save(
  54. query=query,
  55. params=(
  56. self.TASK_PROCESSING_STATUS,
  57. task_name,
  58. date_string,
  59. self.TASK_INIT_STATUS,
  60. ),
  61. )
  62. async def release_task(self, task_name, date_string, final_status=None):
  63. """
  64. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  65. """
  66. if not final_status:
  67. final_status = self.TASK_SUCCESS_STATUS
  68. query = f"""
  69. update {self.table}
  70. set task_status = %s, finish_timestamp = %s
  71. where task_name = %s and date_string = %s and task_status = %s;
  72. """
  73. return await self.db_client.async_save(
  74. query=query,
  75. params=(
  76. final_status,
  77. int(time.time()),
  78. task_name,
  79. date_string,
  80. self.TASK_PROCESSING_STATUS,
  81. ),
  82. )
  83. async def deal(self):
  84. task_name = self.data.get("task_name")
  85. date_string = self.data.get("date_string")
  86. if not task_name:
  87. await self.log_client.log(
  88. contents={
  89. "task": task_name,
  90. "function": "task_scheduler_deal",
  91. "message": "not task name in params",
  92. "status": "fail",
  93. "data": self.data,
  94. }
  95. )
  96. return await task_schedule_response.fail_response(
  97. error_code="4002", error_message="task_name must be input"
  98. )
  99. if not date_string:
  100. date_string = datetime.today().strftime("%Y-%m-%d")
  101. # prepare for task
  102. if await self.whether_task_processing(task_name):
  103. return await task_schedule_response.fail_response(
  104. error_code="5001", error_message="task is processing"
  105. )
  106. await self.record_task(task_name=task_name, date_string=date_string)
  107. await self.lock_task(task_name, date_string)
  108. match task_name:
  109. case "check_kimi_balance":
  110. response = await check_kimi_balance()
  111. await self.log_client.log(
  112. contents={
  113. "task": task_name,
  114. "function": "task_scheduler_deal",
  115. "message": "check_kimi_balance task execute successfully",
  116. "status": "success",
  117. "data": response,
  118. }
  119. )
  120. await self.release_task(
  121. task_name=task_name,
  122. date_string=date_string,
  123. final_status=response["code"],
  124. )
  125. return await task_schedule_response.success_response(
  126. task_name=task_name, data=response
  127. )
  128. case "get_off_videos":
  129. async def background_get_off_videos():
  130. sub_task = GetOffVideos(self.db_client, self.log_client)
  131. await sub_task.get_off_job()
  132. task_status = await sub_task.check()
  133. await self.release_task(
  134. task_name=task_name,
  135. date_string=date_string,
  136. final_status=task_status,
  137. )
  138. asyncio.create_task(background_get_off_videos())
  139. return await task_schedule_response.success_response(
  140. task_name=task_name,
  141. data={"code": 0, "message": "get off_videos started background"},
  142. )
  143. case "check_publish_video_audit_status":
  144. async def background_check_publish_video_audit_status():
  145. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
  146. print("start processing task status: ")
  147. task_status = await sub_task.deal()
  148. await self.release_task(
  149. task_name=task_name,
  150. date_string=date_string,
  151. final_status=task_status,
  152. )
  153. print("finish task status: ", task_status)
  154. asyncio.create_task(background_check_publish_video_audit_status())
  155. return await task_schedule_response.success_response(
  156. task_name=task_name,
  157. data={
  158. "code": 0,
  159. "message": "check publish video audit status started",
  160. },
  161. )
  162. case "outside_article_monitor":
  163. async def background_outside_article_monitor():
  164. collect_task = OutsideGzhArticlesCollector(self.db_client)
  165. await collect_task.deal()
  166. monitor_task = OutsideGzhArticlesMonitor(self.db_client)
  167. final_status = await monitor_task.deal()
  168. await self.release_task(
  169. task_name, date_string, final_status=final_status
  170. )
  171. asyncio.create_task(background_outside_article_monitor())
  172. return await task_schedule_response.success_response(
  173. task_name=task_name,
  174. data={
  175. "code": 0,
  176. "message": "outside_article_monitor started background",
  177. },
  178. )
  179. case "inner_article_monitor":
  180. async def background_inner_article_monitor():
  181. task = InnerGzhArticlesMonitor(self.db_client)
  182. final_status = await task.deal()
  183. await self.release_task(
  184. task_name, date_string, final_status=final_status
  185. )
  186. asyncio.create_task(background_inner_article_monitor())
  187. return await task_schedule_response.success_response(
  188. task_name=task_name,
  189. data={"code": 0, "message": "task started background"},
  190. )
  191. case "title_rewrite":
  192. async def background_title_rewrite():
  193. sub_task = TitleRewrite(self.db_client, self.log_client)
  194. await sub_task.deal()
  195. await self.release_task(
  196. task_name=task_name, date_string=date_string
  197. )
  198. asyncio.create_task(background_title_rewrite())
  199. return await task_schedule_response.success_response(
  200. task_name=task_name,
  201. data={
  202. "code": 0,
  203. "message": "inner_article_monitor started background",
  204. },
  205. )
  206. case "daily_publish_articles_recycle":
  207. async def background_daily_publish_articles_recycle():
  208. sub_task = RecycleDailyPublishArticlesTask(
  209. self.db_client, self.log_client, date_string
  210. )
  211. await sub_task.deal()
  212. task = CheckDailyPublishArticlesTask(
  213. self.db_client, self.log_client, date_string
  214. )
  215. await task.deal()
  216. await self.release_task(
  217. task_name=task_name, date_string=date_string
  218. )
  219. asyncio.create_task(background_daily_publish_articles_recycle())
  220. return await task_schedule_response.success_response(
  221. task_name=task_name,
  222. data={"code": 0, "message": "task started background"},
  223. )
  224. case "update_root_source_id":
  225. async def background_update_root_source_id():
  226. sub_task = UpdateRootSourceIdAndUpdateTimeTask(
  227. self.db_client, self.log_client
  228. )
  229. await sub_task.deal()
  230. await self.release_task(
  231. task_name=task_name, date_string=date_string
  232. )
  233. asyncio.create_task(background_update_root_source_id())
  234. return await task_schedule_response.success_response(
  235. task_name=task_name,
  236. data={"code": 0, "message": "task started background"},
  237. )
  238. case "task_processing_monitor":
  239. async def background_task_processing_monitor():
  240. sub_task = TaskProcessingMonitor(self.db_client)
  241. await sub_task.deal()
  242. await self.release_task(
  243. task_name=task_name, date_string=date_string
  244. )
  245. asyncio.create_task(background_task_processing_monitor())
  246. return await task_schedule_response.success_response(
  247. task_name=task_name,
  248. data={"code": 0, "message": "task started background"},
  249. )
  250. case "crawler_toutiao_articles":
  251. async def background_crawler_toutiao_articles():
  252. sub_task = CrawlerToutiao(self.db_client, self.log_client)
  253. await sub_task.crawler_task(
  254. media_type=self.data.get("media_type", "article")
  255. )
  256. await self.release_task(
  257. task_name=task_name, date_string=date_string
  258. )
  259. asyncio.create_task(background_crawler_toutiao_articles())
  260. return await task_schedule_response.success_response(
  261. task_name=task_name,
  262. data={"code": 0, "message": "task started background"},
  263. )
  264. case _:
  265. await self.log_client.log(
  266. contents={
  267. "task": task_name,
  268. "function": "task_scheduler_deal",
  269. "message": "wrong task input",
  270. "status": "success",
  271. "data": self.data,
  272. }
  273. )
  274. await self.release_task(task_name, date_string, self.TASK_FAILED_STATUS)
  275. return await task_schedule_response.fail_response(
  276. error_code="4001", error_message="wrong task name input"
  277. )