task_scheduler.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import asyncio
  2. import time
  3. from datetime import datetime
  4. from applications.api import feishu_robot
  5. from applications.utils import task_schedule_response, generate_task_trace_id
  6. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  7. from applications.tasks.crawler_tasks import CrawlerToutiao
  8. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  9. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  10. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  11. from applications.tasks.llm_tasks import TitleRewrite
  12. from applications.tasks.monitor_tasks import check_kimi_balance
  13. from applications.tasks.monitor_tasks import GetOffVideos
  14. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  15. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  16. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  17. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  18. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  19. from applications.tasks.task_mapper import TaskMapper
  20. class TaskScheduler(TaskMapper):
  21. def __init__(self, data, log_service, db_client):
  22. self.data = data
  23. self.log_client = log_service
  24. self.db_client = db_client
  25. self.table = "long_articles_task_manager"
  26. self.trace_id = generate_task_trace_id()
  27. async def whether_task_processing(self, task_name: str) -> bool:
  28. """whether task is processing"""
  29. query = f"""
  30. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  31. """
  32. response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
  33. if not response:
  34. # no task is processing
  35. return False
  36. else:
  37. start_timestamp = response[0]["start_timestamp"]
  38. if int(time.time()) - start_timestamp >= self.get_task_config(
  39. task_name
  40. ).get("expire_duration", self.DEFAULT_TIMEOUT):
  41. await feishu_robot.bot(
  42. title=f"{task_name} has been processing over timeout",
  43. detail={"timestamp": start_timestamp},
  44. env="long_articles_task",
  45. )
  46. return True
  47. async def record_task(self, task_name, date_string):
  48. """record task"""
  49. query = f"""insert into {self.table} (date_string, task_name, start_timestamp, trace_id) values (%s, %s, %s, %s);"""
  50. await self.db_client.async_save(
  51. query=query,
  52. params=(date_string, task_name, int(time.time()), self.trace_id),
  53. )
  54. async def lock_task(self, task_name, date_string):
  55. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  56. return await self.db_client.async_save(
  57. query=query,
  58. params=(
  59. self.TASK_PROCESSING_STATUS,
  60. task_name,
  61. date_string,
  62. self.TASK_INIT_STATUS,
  63. ),
  64. )
  65. async def release_task(self, task_name, date_string, final_status=None):
  66. """
  67. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  68. """
  69. if not final_status:
  70. final_status = self.TASK_SUCCESS_STATUS
  71. query = f"""
  72. update {self.table}
  73. set task_status = %s, finish_timestamp = %s
  74. where task_name = %s and date_string = %s and task_status = %s;
  75. """
  76. return await self.db_client.async_save(
  77. query=query,
  78. params=(
  79. final_status,
  80. int(time.time()),
  81. task_name,
  82. date_string,
  83. self.TASK_PROCESSING_STATUS,
  84. ),
  85. )
  86. async def deal(self):
  87. task_name = self.data.get("task_name")
  88. date_string = self.data.get("date_string")
  89. if not task_name:
  90. await self.log_client.log(
  91. contents={
  92. "task": task_name,
  93. "function": "task_scheduler_deal",
  94. "message": "not task name in params",
  95. "status": "fail",
  96. "data": self.data,
  97. }
  98. )
  99. return await task_schedule_response.fail_response(
  100. error_code="4002", error_message="task_name must be input"
  101. )
  102. if not date_string:
  103. date_string = datetime.today().strftime("%Y-%m-%d")
  104. # prepare for task
  105. if await self.whether_task_processing(task_name):
  106. return await task_schedule_response.fail_response(
  107. error_code="5001", error_message="task is processing"
  108. )
  109. await self.log_client.log(
  110. contents={
  111. "trace_id": self.trace_id,
  112. "task": task_name,
  113. "message": "start processing",
  114. "data": self.data,
  115. }
  116. )
  117. await self.record_task(task_name=task_name, date_string=date_string)
  118. await self.lock_task(task_name, date_string)
  119. match task_name:
  120. case "check_kimi_balance":
  121. response = await check_kimi_balance()
  122. await self.log_client.log(
  123. contents={
  124. "task": task_name,
  125. "function": "task_scheduler_deal",
  126. "message": "check_kimi_balance task execute successfully",
  127. "status": "success",
  128. "data": response,
  129. }
  130. )
  131. await self.release_task(
  132. task_name=task_name,
  133. date_string=date_string,
  134. final_status=response["code"],
  135. )
  136. return await task_schedule_response.success_response(
  137. task_name=task_name, data=response
  138. )
  139. case "get_off_videos":
  140. async def background_get_off_videos():
  141. sub_task = GetOffVideos(self.db_client, self.log_client)
  142. await sub_task.get_off_job()
  143. task_status = await sub_task.check()
  144. await self.release_task(
  145. task_name=task_name,
  146. date_string=date_string,
  147. final_status=task_status,
  148. )
  149. asyncio.create_task(background_get_off_videos())
  150. return await task_schedule_response.success_response(
  151. task_name=task_name,
  152. data={"code": 0, "message": "get off_videos started background"},
  153. )
  154. case "check_publish_video_audit_status":
  155. async def background_check_publish_video_audit_status():
  156. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
  157. task_status = await sub_task.deal()
  158. await self.release_task(
  159. task_name=task_name,
  160. date_string=date_string,
  161. final_status=task_status,
  162. )
  163. print("finish task status: ", task_status)
  164. asyncio.create_task(background_check_publish_video_audit_status())
  165. return await task_schedule_response.success_response(
  166. task_name=task_name,
  167. data={
  168. "code": 0,
  169. "message": "check publish video audit status started",
  170. },
  171. )
  172. case "outside_article_monitor":
  173. async def background_outside_article_monitor():
  174. collect_task = OutsideGzhArticlesCollector(self.db_client)
  175. await collect_task.deal()
  176. monitor_task = OutsideGzhArticlesMonitor(self.db_client)
  177. final_status = await monitor_task.deal()
  178. await self.release_task(
  179. task_name, date_string, final_status=final_status
  180. )
  181. asyncio.create_task(background_outside_article_monitor())
  182. return await task_schedule_response.success_response(
  183. task_name=task_name,
  184. data={
  185. "code": 0,
  186. "message": "outside_article_monitor started background",
  187. },
  188. )
  189. case "inner_article_monitor":
  190. async def background_inner_article_monitor():
  191. task = InnerGzhArticlesMonitor(self.db_client)
  192. final_status = await task.deal()
  193. await self.release_task(
  194. task_name, date_string, final_status=final_status
  195. )
  196. asyncio.create_task(background_inner_article_monitor())
  197. return await task_schedule_response.success_response(
  198. task_name=task_name,
  199. data={"code": 0, "message": "task started background"},
  200. )
  201. case "title_rewrite":
  202. async def background_title_rewrite():
  203. sub_task = TitleRewrite(self.db_client, self.log_client)
  204. await sub_task.deal()
  205. await self.release_task(
  206. task_name=task_name, date_string=date_string
  207. )
  208. asyncio.create_task(background_title_rewrite())
  209. return await task_schedule_response.success_response(
  210. task_name=task_name,
  211. data={
  212. "code": 0,
  213. "message": "inner_article_monitor started background",
  214. },
  215. )
  216. case "daily_publish_articles_recycle":
  217. async def background_daily_publish_articles_recycle():
  218. sub_task = RecycleDailyPublishArticlesTask(
  219. self.db_client, self.log_client, date_string
  220. )
  221. await sub_task.deal()
  222. task = CheckDailyPublishArticlesTask(
  223. self.db_client, self.log_client, date_string
  224. )
  225. await task.deal()
  226. await self.release_task(
  227. task_name=task_name, date_string=date_string
  228. )
  229. asyncio.create_task(background_daily_publish_articles_recycle())
  230. return await task_schedule_response.success_response(
  231. task_name=task_name,
  232. data={"code": 0, "message": "task started background"},
  233. )
  234. case "update_root_source_id":
  235. async def background_update_root_source_id():
  236. sub_task = UpdateRootSourceIdAndUpdateTimeTask(
  237. self.db_client, self.log_client
  238. )
  239. await sub_task.deal()
  240. await self.release_task(
  241. task_name=task_name, date_string=date_string
  242. )
  243. asyncio.create_task(background_update_root_source_id())
  244. return await task_schedule_response.success_response(
  245. task_name=task_name,
  246. data={"code": 0, "message": "task started background"},
  247. )
  248. case "task_processing_monitor":
  249. async def background_task_processing_monitor():
  250. sub_task = TaskProcessingMonitor(self.db_client)
  251. await sub_task.deal()
  252. await self.release_task(
  253. task_name=task_name, date_string=date_string
  254. )
  255. asyncio.create_task(background_task_processing_monitor())
  256. return await task_schedule_response.success_response(
  257. task_name=task_name,
  258. data={"code": 0, "message": "task started background"},
  259. )
  260. case "crawler_toutiao_articles":
  261. async def background_crawler_toutiao_articles():
  262. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  263. media_type = self.data.get("media_type", "article")
  264. method = self.data.get("method", "account")
  265. category_list = self.data.get("category_list", [])
  266. match method:
  267. case "account":
  268. await sub_task.crawler_task(media_type=media_type)
  269. case "recommend":
  270. await sub_task.crawl_toutiao_recommend_task(category_list=category_list)
  271. await self.release_task(
  272. task_name=task_name, date_string=date_string
  273. )
  274. asyncio.create_task(background_crawler_toutiao_articles())
  275. return await task_schedule_response.success_response(
  276. task_name=task_name,
  277. data={"code": 0, "message": "task started background"},
  278. )
  279. case "article_pool_pool_cold_start":
  280. async def background_article_pool_pool_cold_start():
  281. sub_task = ArticlePoolColdStart(
  282. self.db_client, self.log_client, self.trace_id
  283. )
  284. crawler_methods = self.data.get("crawler_methods", [])
  285. platform = self.data.get("platform", "weixin")
  286. await sub_task.deal(
  287. platform=platform, crawl_methods=crawler_methods
  288. )
  289. await self.release_task(
  290. task_name=task_name, date_string=date_string
  291. )
  292. await self.log_client.log(
  293. contents={
  294. "trace_id": self.trace_id,
  295. "task": task_name,
  296. "message": "finish processed",
  297. "data": self.data,
  298. }
  299. )
  300. asyncio.create_task(background_article_pool_pool_cold_start())
  301. return await task_schedule_response.success_response(
  302. task_name=task_name,
  303. data={"code": 0, "message": "task started background"},
  304. )
  305. case _:
  306. await self.log_client.log(
  307. contents={
  308. "task": task_name,
  309. "function": "task_scheduler_deal",
  310. "message": "wrong task input",
  311. "status": "success",
  312. "data": self.data,
  313. }
  314. )
  315. await self.release_task(task_name, date_string, self.TASK_FAILED_STATUS)
  316. return await task_schedule_response.fail_response(
  317. error_code="4001", error_message="wrong task name input"
  318. )