task_scheduler.py 8.1 KB


  1. import asyncio
  2. import json
  3. import time
  4. import traceback
  5. from datetime import datetime, timedelta
  6. from typing import Awaitable, Callable, Dict
  7. from applications.api import feishu_robot
  8. from applications.utils import task_schedule_response
  9. from applications.tasks.task_handler import TaskHandler
  10. class TaskScheduler(TaskHandler):
  11. """统一调度入口:外部只需调用 `await TaskScheduler(data, log_cli, db_cli).deal()`"""
  12. # ---------- 初始化 ----------
  13. def __init__(self, data, log_service, db_client, trace_id):
  14. super().__init__(data, log_service, db_client, trace_id)
  15. self.data = data
  16. self.log_client = log_service
  17. self.db_client = db_client
  18. self.table = "long_articles_task_manager"
  19. self.trace_id = trace_id
  20. # ---------- 公共数据库工具 ----------
  21. async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
  22. """新建记录(若同键已存在则忽略)"""
  23. query = (
  24. f"insert ignore into {self.table} "
  25. "(date_string, task_name, start_timestamp, task_status, trace_id, data) "
  26. "values (%s, %s, %s, %s, %s, %s);"
  27. )
  28. await self.db_client.async_save(
  29. query=query,
  30. params=(
  31. date_str,
  32. task_name,
  33. int(time.time()),
  34. self.TASK_INIT_STATUS,
  35. self.trace_id,
  36. json.dumps(self.data, ensure_ascii=False),
  37. ),
  38. )
  39. async def _try_lock_task(self) -> bool:
  40. """一次 UPDATE 抢锁;返回 True 表示成功上锁"""
  41. query = (
  42. f"update {self.table} "
  43. "set task_status = %s "
  44. "where trace_id = %s and task_status = %s;"
  45. )
  46. res = await self.db_client.async_save(
  47. query=query,
  48. params=(
  49. self.TASK_PROCESSING_STATUS,
  50. self.trace_id,
  51. self.TASK_INIT_STATUS,
  52. ),
  53. )
  54. return True if res else False
  55. async def _release_task(self, status: int) -> None:
  56. query = (
  57. f"update {self.table} set task_status=%s, finish_timestamp=%s "
  58. "where trace_id=%s and task_status=%s;"
  59. )
  60. await self.db_client.async_save(
  61. query=query,
  62. params=(
  63. status,
  64. int(time.time()),
  65. self.trace_id,
  66. self.TASK_PROCESSING_STATUS,
  67. ),
  68. )
  69. async def _is_processing_overtime(self, task_name) -> bool:
  70. """检测在处理任务是否超时,或者超过最大并行数,若超时会发飞书告警"""
  71. query = f"select trace_id from {self.table} where task_status = %s and task_name = %s;"
  72. rows = await self.db_client.async_fetch(
  73. query=query, params=(self.TASK_PROCESSING_STATUS, task_name)
  74. )
  75. if not rows:
  76. return False
  77. processing_task_num = len(rows)
  78. if processing_task_num >= self.get_task_config(task_name).get(
  79. "task_max_num", self.TASK_MAX_NUM
  80. ):
  81. await feishu_robot.bot(
  82. title=f"multi {task_name} is processing ",
  83. detail={"detail": rows},
  84. )
  85. return True
  86. return False
  87. async def _run_with_guard(
  88. self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
  89. ):
  90. """公共:检查、建记录、抢锁、后台运行"""
  91. # 1. 超时检测
  92. if await self._is_processing_overtime(task_name):
  93. return await task_schedule_response.fail_response(
  94. "5005", "muti tasks with same task_name is processing"
  95. )
  96. # 2. 记录并尝试抢锁
  97. await self._insert_or_ignore_task(task_name, date_str)
  98. if not await self._try_lock_task():
  99. return await task_schedule_response.fail_response(
  100. "5001", "task is processing"
  101. )
  102. # 3. 真正执行任务 —— 使用后台协程保证不阻塞调度入口
  103. async def _wrapper():
  104. status = self.TASK_FAILED_STATUS
  105. try:
  106. status = await task_coro()
  107. except Exception as e:
  108. await self.log_client.log(
  109. contents={
  110. "trace_id": self.trace_id,
  111. "function": "cor_wrapper",
  112. "task": task_name,
  113. "error": str(e),
  114. }
  115. )
  116. await feishu_robot.bot(
  117. title=f"{task_name} is failed",
  118. detail={
  119. "task": task_name,
  120. "err": str(e),
  121. "traceback": traceback.format_exc(),
  122. },
  123. )
  124. finally:
  125. await self._release_task(status)
  126. asyncio.create_task(_wrapper(), name=task_name)
  127. return await task_schedule_response.success_response(
  128. task_name=task_name,
  129. data={"code": 0, "message": "task started", "trace_id": self.trace_id},
  130. )
  131. # ---------- 主入口 ----------
  132. async def deal(self):
  133. task_name: str | None = self.data.get("task_name")
  134. if not task_name:
  135. return await task_schedule_response.fail_response(
  136. "4003", "task_name must be input"
  137. )
  138. date_str = self.data.get("date_string") or (
  139. datetime.utcnow() + timedelta(hours=8)
  140. ).strftime("%Y-%m-%d")
  141. # === 所有任务在此注册:映射到一个返回 int 状态码的异步函数 ===
  142. handlers: Dict[str, Callable[[], Awaitable[int]]] = {
  143. # 校验kimi余额
  144. "check_kimi_balance": self._check_kimi_balance_handler,
  145. # 长文视频发布之后,三天后下架
  146. "get_off_videos": self._get_off_videos_task_handler,
  147. # 长文视频发布之后,三天内保持视频可见状态
  148. "check_publish_video_audit_status": self._check_video_audit_status_handler,
  149. # 外部服务号发文监测
  150. "outside_article_monitor": self._outside_monitor_handler,
  151. # 站内发文监测
  152. "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
  153. # 标题重写(代测试)
  154. "title_rewrite": self._title_rewrite_handler,
  155. # 每日发文数据回收
  156. "daily_publish_articles_recycle": self._recycle_article_data_handler,
  157. # 每日发文更新root_source_id
  158. "update_root_source_id": self._update_root_source_id_handler,
  159. # 头条文章,视频抓取
  160. "crawler_toutiao": self._crawler_toutiao_handler,
  161. # 文章池冷启动发布
  162. "article_pool_cold_start": self._article_pool_cold_start_handler,
  163. # 任务超时监控
  164. "task_processing_monitor": self._task_processing_monitor_handler,
  165. # 候选账号质量分析
  166. "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
  167. # 文章内容池--标题品类处理
  168. "article_pool_category_generation": self._article_pool_category_generation_handler,
  169. # 抓取账号管理
  170. "crawler_account_manager": self._crawler_account_manager_handler,
  171. # 微信公众号文章抓取
  172. "crawler_gzh_articles": self._crawler_gzh_article_handler,
  173. # 服务号发文回收
  174. "fwh_daily_recycle": self._recycle_fwh_article_handler,
  175. # 发文账号品类分析
  176. "account_category_analysis": self._account_category_analysis_handler,
  177. # 抓取 文章/视频 数量分析
  178. "crawler_detail_analysis": self._crawler_article_analysis_handler,
  179. # 小程序裂变信息处理
  180. "mini_program_detail_process": self._mini_program_detail_handler,
  181. }
  182. if task_name not in handlers:
  183. return await task_schedule_response.fail_response(
  184. "4001", "wrong task name input"
  185. )
  186. return await self._run_with_guard(task_name, date_str, handlers[task_name])
  187. __all__ = ["TaskScheduler"]