task_scheduler_v2.py

import asyncio
import time
import traceback
from datetime import datetime
from typing import Awaitable, Callable, Dict

from applications.api import feishu_robot
from applications.utils import task_schedule_response, generate_task_trace_id
from applications.tasks.cold_start_tasks import ArticlePoolColdStart
from applications.tasks.crawler_tasks import CrawlerToutiao
from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
from applications.tasks.llm_tasks import TitleRewrite
from applications.tasks.monitor_tasks import check_kimi_balance
from applications.tasks.monitor_tasks import GetOffVideos
from applications.tasks.monitor_tasks import CheckVideoAuditStatus
from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
from applications.tasks.monitor_tasks import TaskProcessingMonitor
from applications.tasks.task_mapper import TaskMapper


class TaskScheduler(TaskMapper):
    """Unified scheduling entry point: callers only need `await TaskScheduler(data, log_cli, db_cli).deal()`."""

    # ---------- Initialization ----------
    def __init__(self, data, log_service, db_client):
        self.data = data
        self.log_client = log_service
        self.db_client = db_client
        self.table = "long_articles_task_manager"
        self.trace_id = generate_task_trace_id()

    # ---------- Shared database helpers ----------
    async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
        """Create a task record (ignored if one with the same key already exists)."""
        query = (
            f"insert ignore into {self.table} "
            "(date_string, task_name, start_timestamp, task_status, trace_id) "
            "values (%s, %s, %s, %s, %s);"
        )
        await self.db_client.async_save(
            query=query,
            params=(
                date_str,
                task_name,
                int(time.time()),
                self.TASK_INIT_STATUS,
                self.trace_id,
            ),
        )
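
    # Locking pattern: `_insert_or_ignore_task` guarantees exactly one row per
    # (task_name, date_string), and `_try_lock_task` below flips that row from the
    # INIT status to PROCESSING with a conditional UPDATE. When several callers
    # race, at most one UPDATE reports an affected row, so at most one wins the lock.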

    async def _try_lock_task(self, task_name: str, date_str: str) -> bool:
        """Grab the lock with a single UPDATE; returns True if the lock was acquired."""
        query = (
            f"update {self.table} "
            "set task_status = %s "
            "where task_name = %s and date_string = %s and task_status = %s;"
        )
        res = await self.db_client.async_save(
            query=query,
            params=(
                self.TASK_PROCESSING_STATUS,
                task_name,
                date_str,
                self.TASK_INIT_STATUS,
            ),
        )
        return bool(res)

    async def _release_task(self, task_name: str, date_str: str, status: int) -> None:
        query = (
            f"update {self.table} set task_status=%s, finish_timestamp=%s "
            "where task_name=%s and date_string=%s and task_status=%s;"
        )
        await self.db_client.async_save(
            query=query,
            params=(
                status,
                int(time.time()),
                task_name,
                date_str,
                self.TASK_PROCESSING_STATUS,
            ),
        )

    async def _is_processing_overtime(self, task_name: str) -> bool:
        """Check whether a task with the same name is already running and has timed out; send a Feishu alert if so."""
        query = f"select start_timestamp from {self.table} where task_name=%s and task_status=%s"
        rows = await self.db_client.async_fetch(
            query=query, params=(task_name, self.TASK_PROCESSING_STATUS)
        )
        if not rows:
            return False
        start_ts = rows[0]["start_timestamp"]
        if int(time.time()) - start_ts >= self.get_task_config(task_name).get(
            "expire_duration", self.DEFAULT_TIMEOUT
        ):
            await feishu_robot.bot(
                title=f"{task_name} is overtime",
                detail={"start_ts": start_ts},
            )
            return True
        return False

    async def _run_with_guard(
        self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
    ):
        """Shared flow: check, create the record, grab the lock, then run in the background."""
        # 1. Timeout check (reject if a task with the same name is already running)
        if await self._is_processing_overtime(task_name):
            return await task_schedule_response.fail_response(
                "5001", "task is processing"
            )

        # 2. Record the task and try to grab the lock
        await self._insert_or_ignore_task(task_name, date_str)
        if not await self._try_lock_task(task_name, date_str):
            return await task_schedule_response.fail_response(
                "5001", "task is processing"
            )

        # 3. Actually run the task -- a background coroutine keeps the scheduling entry point from blocking
        async def _wrapper():
            status = self.TASK_FAILED_STATUS
            try:
                status = await task_coro()
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "trace_id": self.trace_id,
                        "function": "_wrapper",
                        "task": task_name,
                        "error": str(e),
                    }
                )
                await feishu_robot.bot(
                    title=f"{task_name} is failed",
                    detail={
                        "task": task_name,
                        "err": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )
            finally:
                await self._release_task(task_name, date_str, status)

        asyncio.create_task(_wrapper(), name=task_name)
        return await task_schedule_response.success_response(
            task_name=task_name,
            data={"code": 0, "message": "task started", "trace_id": self.trace_id},
        )

    # ---------- Main entry point ----------
    async def deal(self):
        task_name: str | None = self.data.get("task_name")
        if not task_name:
            return await task_schedule_response.fail_response(
                "4002", "task_name must be input"
            )
        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")

        # === All tasks are registered here: each maps to an async function that returns an int status code ===
        handlers: Dict[str, Callable[[], Awaitable[int]]] = {
            # Check the Kimi account balance
            "check_kimi_balance": self._check_kimi_balance_handler,
            # Take long-article videos offline three days after publication
            "get_off_videos": self._get_off_videos_task_handler,
            # Keep long-article videos visible for three days after publication
            "check_publish_video_audit_status": self._check_video_audit_status_handler,
            # Monitor article publishing by external service accounts
            "outside_article_monitor": self._outside_monitor_handler,
            # Monitor on-site article publishing
            "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
            # Title rewriting (pending testing)
            "title_rewrite": self._title_rewrite_handler,
            # Recycle daily publication data
            "daily_publish_articles_recycle": self._recycle_article_data_handler,
            # Update root_source_id for daily published articles
            "update_root_source_id": self._update_root_source_id_handler,
            # Crawl Toutiao articles and videos
            "crawler_toutiao_articles": self._crawler_toutiao_handler,
            # Cold-start publishing from the article pool
            "article_pool_pool_cold_start": self._article_pool_cold_start_handler,
            # Monitor running tasks for timeouts
            "task_processing_monitor": self._task_processing_monitor_handler,
        }
        if task_name not in handlers:
            return await task_schedule_response.fail_response(
                "4001", "wrong task name input"
            )
        return await self._run_with_guard(task_name, date_str, handlers[task_name])
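
    # Example request payloads (illustrative only; the keys are the ones read via
    # `self.data.get(...)` in the handlers below, and the values are hypothetical):
    #   {"task_name": "daily_publish_articles_recycle", "date_string": "2025-01-01"}
    #   {"task_name": "crawler_toutiao_articles", "method": "recommend",
    #    "media_type": "article", "category_list": ["tech"]}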

    # ---------- Concrete implementations of the composite tasks ----------
    async def _check_kimi_balance_handler(self) -> int:
        response = await check_kimi_balance()
        await self.log_client.log(
            contents={
                "trace_id": self.trace_id,
                "task": "check_kimi_balance",
                "data": response,
            }
        )
        return self.TASK_SUCCESS_STATUS

    async def _get_off_videos_task_handler(self) -> int:
        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
        return await sub_task.deal()

    async def _check_video_audit_status_handler(self) -> int:
        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
        return await sub_task.deal()

    async def _task_processing_monitor_handler(self) -> int:
        sub_task = TaskProcessingMonitor(self.db_client)
        await sub_task.deal()
        return self.TASK_SUCCESS_STATUS

    async def _inner_gzh_articles_monitor_handler(self) -> int:
        sub_task = InnerGzhArticlesMonitor(self.db_client)
        return await sub_task.deal()

    async def _title_rewrite_handler(self) -> int:
        sub_task = TitleRewrite(self.db_client, self.log_client)
        return await sub_task.deal()

    async def _update_root_source_id_handler(self) -> int:
        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
        await sub_task.deal()
        return self.TASK_SUCCESS_STATUS

    async def _outside_monitor_handler(self) -> int:
        collector = OutsideGzhArticlesCollector(self.db_client)
        await collector.deal()
        monitor = OutsideGzhArticlesMonitor(self.db_client)
        return await monitor.deal()  # expected to return SUCCESS / FAILED

    async def _recycle_article_data_handler(self) -> int:
        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
        recycle = RecycleDailyPublishArticlesTask(
            self.db_client, self.log_client, date_str
        )
        await recycle.deal()
        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
        await check.deal()
        return self.TASK_SUCCESS_STATUS

    async def _crawler_toutiao_handler(self) -> int:
        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
        media_type = self.data.get("media_type", "article")
        method = self.data.get("method", "account")
        category_list = self.data.get("category_list", [])
        if method == "account":
            await sub_task.crawler_task(media_type=media_type)
        elif method == "recommend":
            await sub_task.crawl_toutiao_recommend_task(category_list)
        else:
            raise ValueError(f"Unsupported method {method}")
        return self.TASK_SUCCESS_STATUS

    async def _article_pool_cold_start_handler(self) -> int:
        cold_start = ArticlePoolColdStart(
            self.db_client, self.log_client, self.trace_id
        )
        platform = self.data.get("platform", "weixin")
        crawler_methods = self.data.get("crawler_methods", [])
        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
        return self.TASK_SUCCESS_STATUS
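

# ---------- Usage sketch (illustrative only) ----------
# A minimal sketch of how `deal()` might be invoked, assuming the rest of the
# `applications` package is importable. The stub clients below are assumptions
# for illustration: they implement only the methods this module actually calls
# (`async_save`, `async_fetch`, `log`); in production the real database and
# logging services are injected by the caller.
if __name__ == "__main__":

    class _StubDBClient:
        async def async_save(self, query, params):
            # Pretend every INSERT/UPDATE affects one row, so the lock is always won.
            return 1

        async def async_fetch(self, query, params):
            # Pretend no task with the same name is currently processing.
            return []

    class _StubLogClient:
        async def log(self, contents):
            print(contents)

    async def _demo():
        # Hypothetical request payload; "check_kimi_balance" is one of the registered task names.
        data = {"task_name": "check_kimi_balance"}
        scheduler = TaskScheduler(data, _StubLogClient(), _StubDBClient())
        print(await scheduler.deal())
        # Give the fire-and-forget background task a moment to finish before the loop closes.
        await asyncio.sleep(1)

    asyncio.run(_demo())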