# task_scheduler.py
import asyncio
import time
import traceback
from datetime import datetime

from applications.api import feishu_robot
from applications.utils import task_schedule_response

from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask

from applications.tasks.llm_tasks import TitleRewrite

from applications.tasks.monitor_tasks import check_kimi_balance
from applications.tasks.monitor_tasks import GetOffVideos
from applications.tasks.monitor_tasks import CheckVideoAuditStatus
from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector


  16. class TaskScheduler:
  17. def __init__(self, data, log_service, db_client):
  18. self.data = data
  19. self.log_client = log_service
  20. self.db_client = db_client
  21. self.table = "long_articles_task_manager"
  22. async def whether_task_processing(self, task_name: str) -> bool:
  23. """whether task is processing"""
  24. query = f"""
  25. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  26. """
  27. response, error = await self.db_client.async_fetch(
  28. query=query, params=(task_name, 1)
  29. )
  30. if not response:
  31. # no task is processing
  32. return False
  33. else:
  34. start_timestamp = response[0]["start_timestamp"]
  35. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  36. if int(time.time()) - start_timestamp >= 86400:
  37. await feishu_robot.bot(
  38. title=f"{task_name} has been processing for more than one day",
  39. detail={"timestamp": start_timestamp},
  40. env="long_articles_task",
  41. )
  42. return True
  43. async def record_task(self, task_name, date_string):
  44. """record task"""
  45. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  46. await self.db_client.async_save(
  47. query=query, params=(date_string, task_name, int(time.time()))
  48. )
  49. async def lock_task(self, task_name, date_string):
  50. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  51. return await self.db_client.async_save(
  52. query=query, params=(1, task_name, date_string, 0)
  53. )
  54. async def release_task(self, task_name, date_string, final_status):
  55. """
  56. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  57. """
  58. query = f"""
  59. update {self.table}
  60. set task_status = %s, finish_timestamp = %s
  61. where task_name = %s and date_string = %s and task_status = %s;
  62. """
  63. return await self.db_client.async_save(
  64. query=query,
  65. params=(final_status, int(time.time()), task_name, date_string, 1),
  66. )
  67. async def deal(self):
  68. task_name = self.data.get("task_name")
  69. date_string = self.data.get("date_string")
  70. if not task_name:
  71. await self.log_client.log(
  72. contents={
  73. "task": task_name,
  74. "function": "task_scheduler_deal",
  75. "message": "not task name in params",
  76. "status": "fail",
  77. "data": self.data,
  78. }
  79. )
  80. return await task_schedule_response.fail_response(
  81. error_code="4002", error_message="task_name must be input"
  82. )
  83. if not date_string:
  84. date_string = datetime.today().strftime("%Y-%m-%d")
  85. # prepare for task
  86. if await self.whether_task_processing(task_name):
  87. return await task_schedule_response.fail_response(
  88. error_code="5001", error_message="task is processing"
  89. )
  90. await self.record_task(task_name=task_name, date_string=date_string)
  91. await self.lock_task(task_name, date_string)
  92. match task_name:
  93. case "check_kimi_balance":
  94. response = await check_kimi_balance()
  95. await self.log_client.log(
  96. contents={
  97. "task": task_name,
  98. "function": "task_scheduler_deal",
  99. "message": "check_kimi_balance task execute successfully",
  100. "status": "success",
  101. "data": response,
  102. }
  103. )
  104. await self.release_task(
  105. task_name=task_name,
  106. date_string=date_string,
  107. final_status=response["code"],
  108. )
  109. return await task_schedule_response.success_response(
  110. task_name=task_name, data=response
  111. )
  112. case "get_off_videos":
  113. async def background_get_off_videos():
  114. sub_task = GetOffVideos(self.db_client, self.log_client)
  115. await sub_task.get_off_job()
  116. task_status = await sub_task.check()
  117. await self.release_task(
  118. task_name=task_name,
  119. date_string=date_string,
  120. final_status=task_status,
  121. )
  122. asyncio.create_task(background_get_off_videos())
  123. return await task_schedule_response.success_response(
  124. task_name=task_name,
  125. data={"code": 0, "message": "get off_videos started background"},
  126. )
  127. case "check_publish_video_audit_status":
  128. async def background_check_publish_video_audit_status():
  129. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
  130. print("start processing task status: ")
  131. task_status = await sub_task.deal()
  132. await self.release_task(
  133. task_name=task_name,
  134. date_string=date_string,
  135. final_status=task_status,
  136. )
  137. print("finish task status: ", task_status)
  138. asyncio.create_task(background_check_publish_video_audit_status())
  139. return await task_schedule_response.success_response(
  140. task_name=task_name,
  141. data={
  142. "code": 0,
  143. "message": "check publish video audit status started",
  144. },
  145. )
  146. case "outside_article_monitor":
  147. async def background_outside_article_monitor():
  148. collect_task = OutsideGzhArticlesCollector(self.db_client)
  149. await collect_task.deal()
  150. monitor_task = OutsideGzhArticlesMonitor(self.db_client)
  151. final_status = await monitor_task.deal()
  152. await self.release_task(
  153. task_name, date_string, final_status=final_status
  154. )
  155. asyncio.create_task(background_outside_article_monitor())
  156. return await task_schedule_response.success_response(
  157. task_name=task_name,
  158. data={
  159. "code": 0,
  160. "message": "outside_article_monitor started background",
  161. },
  162. )
  163. case "inner_article_monitor":
  164. async def background_inner_article_monitor():
  165. task = InnerGzhArticlesMonitor(self.db_client)
  166. final_status = await task.deal()
  167. await self.release_task(
  168. task_name, date_string, final_status=final_status
  169. )
  170. asyncio.create_task(background_inner_article_monitor())
  171. return await task_schedule_response.success_response(
  172. task_name=task_name,
  173. data={"code": 0, "message": "task started background"},
  174. )
  175. case "title_rewrite":
  176. async def background_title_rewrite():
  177. sub_task = TitleRewrite(self.db_client, self.log_client)
  178. await sub_task.deal()
  179. await self.release_task(
  180. task_name=task_name, date_string=date_string, final_status=2
  181. )
  182. asyncio.create_task(background_title_rewrite())
  183. return await task_schedule_response.success_response(
  184. task_name=task_name,
  185. data={
  186. "code": 0,
  187. "message": "inner_article_monitor started background",
  188. },
  189. )
  190. case "daily_publish_articles_recycle":
  191. async def background_daily_publish_articles_recycle():
  192. sub_task = RecycleDailyPublishArticlesTask(
  193. self.db_client, self.log_client, date_string
  194. )
  195. await sub_task.deal()
  196. task = CheckDailyPublishArticlesTask(
  197. self.db_client, self.log_client, date_string
  198. )
  199. await task.deal()
  200. await self.release_task(
  201. task_name=task_name, date_string=date_string, final_status=2
  202. )
  203. asyncio.create_task(background_daily_publish_articles_recycle())
  204. return await task_schedule_response.success_response(
  205. task_name=task_name,
  206. data={"code": 0, "message": "task started background"},
  207. )
  208. case "update_root_source_id":
  209. async def background_update_root_source_id():
  210. sub_task = UpdateRootSourceIdAndUpdateTimeTask(
  211. self.db_client, self.log_client
  212. )
  213. await sub_task.deal()
  214. await self.release_task(
  215. task_name=task_name, date_string=date_string, final_status=2
  216. )
  217. asyncio.create_task(background_update_root_source_id())
  218. return await task_schedule_response.success_response(
  219. task_name=task_name,
  220. data={"code": 0, "message": "task started background"},
  221. )
  222. case _:
  223. await self.log_client.log(
  224. contents={
  225. "task": task_name,
  226. "function": "task_scheduler_deal",
  227. "message": "wrong task input",
  228. "status": "success",
  229. "data": self.data,
  230. }
  231. )
  232. await self.release_task(task_name, date_string, 99)
  233. return await task_schedule_response.fail_response(
  234. error_code="4001", error_message="wrong task name input"
  235. )