task_scheduler.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import asyncio
  2. import json
  3. import time
  4. import traceback
  5. from datetime import datetime, timedelta
  6. from typing import Awaitable, Callable, Dict
  7. from applications.api import feishu_robot
  8. from applications.utils import task_schedule_response
  9. from applications.tasks.task_handler import TaskHandler
  class TaskScheduler(TaskHandler):
      """Unified scheduling entry point.

      Callers only need::

          await TaskScheduler(data, log_service, db_client, trace_id).deal()

      ``data`` must contain a ``task_name`` key and may contain ``date_string``.
      """

      # Strong references to in-flight background jobs started by
      # ``_run_with_guard``. The asyncio event loop keeps only weak references to
      # tasks, so a task nobody references can be garbage-collected before it
      # finishes. Class-level on purpose: it must outlive any single scheduler
      # instance (the instance is typically dropped right after ``deal`` returns).
      _background_tasks: set = set()

      # ---------- initialisation ----------
      def __init__(self, data, log_service, db_client, trace_id):
          super().__init__(data, log_service, db_client, trace_id)
          self.data = data
          self.log_client = log_service
          self.db_client = db_client
          self.table = "long_articles_task_manager"
          self.trace_id = trace_id

      # ---------- shared database helpers ----------
      async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
          """Insert a task record in the initial state.

          Uses ``INSERT IGNORE``, so when a row with the same unique key already
          exists the statement is silently a no-op.

          Args:
              task_name: Name of the task being scheduled.
              date_str: Business date of the run (``%Y-%m-%d`` when defaulted
                  by ``deal``; otherwise whatever the caller supplied).
          """
          query = (
              f"insert ignore into {self.table} "
              "(date_string, task_name, start_timestamp, task_status, trace_id, data) "
              "values (%s, %s, %s, %s, %s, %s);"
          )
          await self.db_client.async_save(
              query=query,
              params=(
                  date_str,
                  task_name,
                  int(time.time()),
                  self.TASK_INIT_STATUS,
                  self.trace_id,
                  # Persist the raw request payload for later inspection.
                  json.dumps(self.data, ensure_ascii=False),
              ),
          )

      async def _try_lock_task(self) -> bool:
          """Try to acquire the task lock with a single conditional UPDATE.

          The row for this ``trace_id`` is moved from INIT to PROCESSING only if
          it is still in INIT, so at most one caller can win.

          Returns:
              True if the lock was acquired (the UPDATE touched a row).
          """
          query = (
              f"update {self.table} "
              "set task_status = %s "
              "where trace_id = %s and task_status = %s;"
          )
          res = await self.db_client.async_save(
              query=query,
              params=(
                  self.TASK_PROCESSING_STATUS,
                  self.trace_id,
                  self.TASK_INIT_STATUS,
              ),
          )
          # NOTE(review): assumes async_save returns the affected-row count.
          return bool(res)

      async def _release_task(self, status: int) -> None:
          """Release the lock by recording the final status and finish time.

          Only a row that is still PROCESSING for this ``trace_id`` is updated.

          Args:
              status: Final task status to store.
          """
          query = (
              f"update {self.table} set task_status=%s, finish_timestamp=%s "
              "where trace_id=%s and task_status=%s;"
          )
          await self.db_client.async_save(
              query=query,
              params=(
                  status,
                  int(time.time()),
                  self.trace_id,
                  self.TASK_PROCESSING_STATUS,
              ),
          )

      async def _is_processing_overtime(self, task_name) -> bool:
          """Check whether too many runs of ``task_name`` are already PROCESSING.

          Despite its name, this does not look at elapsed time. It counts the
          PROCESSING rows for ``task_name`` and compares that count with the
          task's ``task_max_num`` config (falling back to ``self.TASK_MAX_NUM``).
          When the limit is reached, a Feishu alert is sent.

          Args:
              task_name: Task to check.

          Returns:
              True if the concurrency limit has been reached.
          """
          query = f"select trace_id from {self.table} where task_status = %s and task_name = %s;"
          rows = await self.db_client.async_fetch(
              query=query, params=(self.TASK_PROCESSING_STATUS, task_name)
          )
          if not rows:
              return False

          max_parallel = self.get_task_config(task_name).get(
              "task_max_num", self.TASK_MAX_NUM
          )
          if len(rows) >= max_parallel:
              await feishu_robot.bot(
                  title=f"multi {task_name} is processing ",
                  detail={"detail": rows},
              )
              return True
          return False

      async def _run_with_guard(
          self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
      ):
          """Check limits, create the record, take the lock and run in background.

          Args:
              task_name: Name of the task to run.
              date_str: Business date stored with the task record.
              task_coro: Zero-argument coroutine function returning the final
                  integer task status.

          Returns:
              A ``task_schedule_response`` failure ("5005" concurrency limit
              reached, "5001" lock not acquired) or a success response once the
              background job has been started.
          """
          # 1. Concurrency-limit check.
          if await self._is_processing_overtime(task_name):
              return await task_schedule_response.fail_response(
                  "5005", "muti tasks with same task_name is processing"
              )

          # 2. Create the record and try to grab the lock.
          await self._insert_or_ignore_task(task_name, date_str)
          if not await self._try_lock_task():
              return await task_schedule_response.fail_response(
                  "5001", "task is processing"
              )

          # 3. Run the real task as a background asyncio task so the scheduling
          #    entry point returns immediately.
          async def _wrapper():
              # Default to FAILED so the finally-block records a failure if the
              # handler raises, including on cancellation (CancelledError is a
              # BaseException and bypasses the except clause).
              status = self.TASK_FAILED_STATUS
              try:
                  status = await task_coro()
              except Exception as e:
                  await self.log_client.log(
                      contents={
                          "trace_id": self.trace_id,
                          "function": "cor_wrapper",
                          "task": task_name,
                          "error": str(e),
                      }
                  )
                  await feishu_robot.bot(
                      title=f"{task_name} is failed",
                      detail={
                          "task": task_name,
                          "err": str(e),
                          "traceback": traceback.format_exc(),
                      },
                  )
              finally:
                  # Always release the lock, whatever happened above.
                  await self._release_task(status)

          job = asyncio.create_task(_wrapper(), name=task_name)
          # Hold a strong reference until the job completes; otherwise the task
          # may be garbage-collected mid-execution. Drop it once done.
          self._background_tasks.add(job)
          job.add_done_callback(self._background_tasks.discard)

          return await task_schedule_response.success_response(
              task_name=task_name,
              data={"code": 0, "message": "task started", "trace_id": self.trace_id},
          )

      # ---------- main entry ----------
      async def deal(self):
          """Validate the request and dispatch it to the registered handler.

          Returns:
              A ``task_schedule_response`` failure ("4003" when ``task_name``
              is missing, "4001" when it is not registered), otherwise the
              result of ``_run_with_guard``.
          """
          # Local import keeps the module's import block unchanged.
          from datetime import timezone

          task_name: str | None = self.data.get("task_name")
          if not task_name:
              return await task_schedule_response.fail_response(
                  "4003", "task_name must be input"
              )

          # Default to today's date in UTC+8 (presumably China Standard Time).
          # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
          # in a fixed +08:00 zone yields the same calendar date.
          date_str = self.data.get("date_string") or datetime.now(
              timezone(timedelta(hours=8))
          ).strftime("%Y-%m-%d")

          # === Every task is registered here: name -> async callable returning
          #     an int status code ===
          handlers: Dict[str, Callable[[], Awaitable[int]]] = {
              # Check the Kimi account balance
              "check_kimi_balance": self._check_kimi_balance_handler,
              # Take long-article videos offline three days after publishing
              "get_off_videos": self._get_off_videos_task_handler,
              # Keep long-article videos visible for three days after publishing
              "check_publish_video_audit_status": self._check_video_audit_status_handler,
              # Monitor articles published by external service accounts
              "outside_article_monitor": self._outside_monitor_handler,
              # Monitor in-house published articles
              "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
              # Title rewriting (pending testing)
              "title_rewrite": self._title_rewrite_handler,
              # Daily recycling of published-article data
              "daily_publish_articles_recycle": self._recycle_article_data_handler,
              # Update root_source_id for daily published articles
              "update_root_source_id": self._update_root_source_id_handler,
              # Crawl Toutiao articles and videos
              "crawler_toutiao": self._crawler_toutiao_handler,
              # Cold-start publishing from the article pool
              "article_pool_cold_start": self._article_pool_cold_start_handler,
              # Task timeout monitoring
              "task_processing_monitor": self._task_processing_monitor_handler,
              # Candidate account quality analysis
              "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
              # Article content pool -- title category processing
              "article_pool_category_generation": self._article_pool_category_generation_handler,
              # Crawler account management
              "crawler_account_manager": self._crawler_account_manager_handler,
              # Crawl WeChat official-account articles
              "crawler_gzh_articles": self._crawler_gzh_article_handler,
              # Recycle articles published by service accounts
              "fwh_daily_recycle": self._recycle_fwh_article_handler,
              # Category analysis of publishing accounts
              "account_category_analysis": self._account_category_analysis_handler,
              # Analysis of crawled article/video counts
              "crawler_detail_analysis": self._crawler_article_analysis_handler,
              # Mini-program share/fission detail processing
              "mini_program_detail_process": self._mini_program_detail_handler,
          }

          handler = handlers.get(task_name)
          if handler is None:
              return await task_schedule_response.fail_response(
                  "4001", "wrong task name input"
              )
          return await self._run_with_guard(task_name, date_str, handler)


  __all__ = ["TaskScheduler"]
  185. )
  186. return await self._run_with_guard(task_name, date_str, handlers[task_name])
  187. __all__ = ["TaskScheduler"]