task_handler.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. from datetime import datetime
  2. from typing import Callable, Dict, Optional
  3. from app.core.config import GlobalConfigSettings
  4. from app.domains.analysis_task import CrawlerDetailDeal
  5. from app.domains.analysis_task import AccountPositionReadRateAvg
  6. from app.domains.analysis_task import AccountPositionReadAvg
  7. from app.domains.analysis_task import AccountPositionOpenRateAvg
  8. from app.domains.algorithm_tasks import AccountCategoryAnalysis
  9. from app.domains.cold_start_tasks import ArticlePoolColdStart
  10. from app.domains.crawler_tasks import CrawlerToutiao
  11. from app.domains.crawler_tasks import WeixinAccountManager
  12. from app.domains.crawler_tasks import CrawlerGzhAccountArticles
  13. from app.domains.crawler_tasks import CrawlerGzhSearchArticles
  14. from app.domains.data_recycle_tasks import ArticleDetailStat
  15. from app.domains.data_recycle_tasks import RecycleDailyPublishArticlesTask
  16. from app.domains.data_recycle_tasks import RecycleOutsideAccountArticlesTask
  17. from app.domains.data_recycle_tasks import CheckDailyPublishArticlesTask
  18. from app.domains.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  19. from app.domains.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  20. from app.domains.data_recycle_tasks import RecycleMiniProgramDetailTask
  21. from app.domains.data_recycle_tasks import (
  22. UpdateOutsideRootSourceIdAndUpdateTimeTask,
  23. )
  24. from app.domains.llm_tasks import TitleRewrite
  25. from app.domains.llm_tasks import ArticlePoolCategoryGeneration
  26. from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
  27. from app.domains.llm_tasks import ExtractTitleFeatures
  28. from app.domains.monitor_tasks import AutoReplyCardsMonitor
  29. from app.domains.monitor_tasks import check_kimi_balance
  30. from app.domains.monitor_tasks import GetOffVideos
  31. from app.domains.monitor_tasks import CheckVideoAuditStatus
  32. from app.domains.monitor_tasks import CooperateAccountsMonitorTask
  33. from app.domains.monitor_tasks import InnerGzhArticlesMonitor
  34. from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
  35. from app.domains.monitor_tasks import OutsideGzhArticlesCollector
  36. from app.domains.monitor_tasks import TaskProcessingMonitor
  37. from app.domains.monitor_tasks import LimitedAccountAnalysisTask
  38. from app.jobs.task_config import TaskStatus
  39. from app.jobs.task_utils import TaskValidationError
  40. _TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}
  41. def register(task_name: str):
  42. def decorator(func):
  43. _TASK_HANDLER_REGISTRY[task_name] = func
  44. return func
  45. return decorator
  46. class TaskHandler:
  47. """任务处理器基类 - 使用装饰器模式自动注册任务"""
  48. # 任务注册表
  49. _handlers = _TASK_HANDLER_REGISTRY
  50. def __init__(
  51. self,
  52. data: dict,
  53. log_service,
  54. db_client,
  55. trace_id: str,
  56. config: GlobalConfigSettings,
  57. ):
  58. self.data = data
  59. self.log_client = log_service
  60. self.db_client = db_client
  61. self.trace_id = trace_id
  62. self.config = config
  63. @classmethod
  64. def get_handler(cls, task_name: str) -> Optional[Callable]:
  65. """获取任务处理器"""
  66. return cls._handlers.get(task_name)
  67. @classmethod
  68. def list_registered_tasks(cls) -> list:
  69. """列出所有已注册的任务"""
  70. return list(cls._handlers.keys())
  71. async def _log_task_event(self, event_type: str, **kwargs):
  72. """统一的任务日志记录"""
  73. log_data = {
  74. "timestamp": datetime.now().isoformat(),
  75. "trace_id": self.trace_id,
  76. "event_type": event_type,
  77. "task": self.data.get("task_name"),
  78. **kwargs,
  79. }
  80. await self.log_client.log(contents=log_data)
  81. # ==================== 监控类任务 ====================
  82. @register("check_kimi_balance")
  83. async def _check_kimi_balance_handler(self) -> int:
  84. """检查 Kimi 余额"""
  85. response = await check_kimi_balance()
  86. await self._log_task_event("kimi_balance_checked", data=response)
  87. return TaskStatus.SUCCESS
  88. @register("get_off_videos")
  89. async def _get_off_videos_task_handler(self) -> int:
  90. """视频下架任务"""
  91. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  92. return await sub_task.deal()
  93. @register("check_publish_video_audit_status")
  94. async def _check_video_audit_status_handler(self) -> int:
  95. """检查视频审核状态"""
  96. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  97. return await sub_task.deal()
  98. @register("task_processing_monitor")
  99. async def _task_processing_monitor_handler(self) -> int:
  100. """任务处理监控"""
  101. sub_task = TaskProcessingMonitor(self.db_client)
  102. await sub_task.deal()
  103. return TaskStatus.SUCCESS
  104. @register("inner_article_monitor")
  105. async def _inner_gzh_articles_monitor_handler(self) -> int:
  106. """内部公众号文章监控"""
  107. sub_task = InnerGzhArticlesMonitor(self.db_client)
  108. return await sub_task.deal()
  109. @register("outside_article_monitor")
  110. async def _outside_monitor_handler(self) -> int:
  111. """外部文章监控"""
  112. collector = OutsideGzhArticlesCollector(self.db_client)
  113. await collector.deal()
  114. monitor = OutsideGzhArticlesMonitor(self.db_client)
  115. return await monitor.deal()
  116. @register("cooperate_accounts_monitor")
  117. async def _cooperate_accounts_monitor_handler(self) -> int:
  118. """合作账号文章监测"""
  119. task = CooperateAccountsMonitorTask(
  120. pool=self.db_client, log_client=self.log_client
  121. )
  122. await task.deal(task_name="save_articles")
  123. return TaskStatus.SUCCESS
  124. @register("cooperate_accounts_detail")
  125. async def _cooperate_accounts_detail_handler(self) -> int:
  126. """合作账号文章详情更新"""
  127. task = CooperateAccountsMonitorTask(
  128. pool=self.db_client, log_client=self.log_client
  129. )
  130. await task.deal(task_name="get_detail")
  131. return TaskStatus.SUCCESS
  132. # ==================== 爬虫类任务 ====================
  133. @register("crawler_toutiao")
  134. async def _crawler_toutiao_handler(self) -> int:
  135. """头条文章/视频抓取"""
  136. sub_task = CrawlerToutiao(
  137. self.db_client, self.log_client, self.trace_id, self.config
  138. )
  139. method = self.data.get("method", "account")
  140. media_type = self.data.get("media_type", "article")
  141. category_list = self.data.get("category_list", [])
  142. match method:
  143. case "account":
  144. await sub_task.crawler_task(media_type=media_type)
  145. case "recommend":
  146. await sub_task.crawl_toutiao_recommend_task(category_list)
  147. case "search":
  148. await sub_task.search_candidate_accounts()
  149. case _:
  150. raise TaskValidationError(f"Unsupported method: {method}")
  151. return TaskStatus.SUCCESS
  152. @register("crawler_gzh_articles")
  153. async def _crawler_gzh_article_handler(self) -> int:
  154. """抓取公众号文章"""
  155. account_method = self.data.get("account_method")
  156. crawl_mode = self.data.get("crawl_mode")
  157. strategy = self.data.get("strategy")
  158. match crawl_mode:
  159. case "account":
  160. task = CrawlerGzhAccountArticles(
  161. self.db_client, self.log_client, self.trace_id, self.config
  162. )
  163. await task.deal(account_method, strategy)
  164. case "search":
  165. task = CrawlerGzhSearchArticles(
  166. self.db_client, self.log_client, self.trace_id, self.config
  167. )
  168. await task.deal(strategy)
  169. case _:
  170. raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
  171. return TaskStatus.SUCCESS
  172. @register("crawler_account_manager")
  173. async def _crawler_account_manager_handler(self) -> int:
  174. """账号管理任务"""
  175. platform = self.data.get("platform", "weixin")
  176. account_id_list = self.data.get("account_id_list")
  177. match platform:
  178. case "weixin":
  179. task = WeixinAccountManager(
  180. self.db_client, self.log_client, self.trace_id
  181. )
  182. await task.deal(platform=platform, account_id_list=account_id_list)
  183. case _:
  184. raise TaskValidationError(f"Unsupported platform: {platform}")
  185. return TaskStatus.SUCCESS
  186. @register("crawler_detail_analysis")
  187. async def _crawler_article_analysis_handler(self) -> int:
  188. """抓取视频/文章详情分析统计"""
  189. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  190. await task.deal(params=self.data)
  191. return TaskStatus.SUCCESS
  192. # ==================== 数据回收类任务 ====================
  193. @register("article_detail_stat")
  194. async def _article_detail_stat_handler(self) -> int:
  195. """文章详情统计"""
  196. task = ArticleDetailStat(self.db_client, self.log_client)
  197. await task.deal()
  198. return TaskStatus.SUCCESS
  199. @register("daily_publish_articles_recycle")
  200. async def _recycle_article_data_handler(self) -> int:
  201. """每日发文数据回收"""
  202. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  203. recycle = RecycleDailyPublishArticlesTask(
  204. self.db_client, self.log_client, date_str
  205. )
  206. await recycle.deal()
  207. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  208. await check.deal()
  209. return TaskStatus.SUCCESS
  210. @register("update_root_source_id")
  211. async def _update_root_source_id_handler(self) -> int:
  212. """更新 root_source_id"""
  213. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  214. await sub_task.deal()
  215. return TaskStatus.SUCCESS
  216. @register("recycle_outside_account_articles")
  217. async def _recycle_outside_account_article_handler(self) -> int:
  218. """回收外部账号文章"""
  219. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  220. task = RecycleOutsideAccountArticlesTask(
  221. pool=self.db_client, log_client=self.log_client, date_string=date_str
  222. )
  223. await task.deal()
  224. return TaskStatus.SUCCESS
  225. @register("update_outside_account_article_root_source_id")
  226. async def _update_outside_account_article_root_source_id_handler(self) -> int:
  227. """更新外部账号文章的 root_source_id"""
  228. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  229. pool=self.db_client, log_client=self.log_client
  230. )
  231. await task.deal()
  232. return TaskStatus.SUCCESS
  233. @register("fwh_daily_recycle")
  234. async def _recycle_fwh_article_handler(self) -> int:
  235. """回收服务号文章"""
  236. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  237. await task.deal()
  238. return TaskStatus.SUCCESS
  239. # ==================== 算法类任务 ====================
  240. @register("article_pool_cold_start")
  241. async def _article_pool_cold_start_handler(self) -> int:
  242. """文章池冷启动"""
  243. cold_start = ArticlePoolColdStart(
  244. self.db_client, self.log_client, self.trace_id, self.config
  245. )
  246. platform = self.data.get("platform", "weixin")
  247. crawler_methods = self.data.get("crawler_methods", [])
  248. category_list = self.data.get("category_list", [])
  249. strategy = self.data.get("strategy", "strategy_v1")
  250. await cold_start.deal(
  251. platform=platform,
  252. crawl_methods=crawler_methods,
  253. category_list=category_list,
  254. strategy=strategy,
  255. )
  256. return TaskStatus.SUCCESS
  257. @register("candidate_account_quality_analysis")
  258. async def _candidate_account_quality_score_handler(self) -> int:
  259. """候选账号质量分析"""
  260. task = CandidateAccountQualityScoreRecognizer(
  261. self.db_client, self.log_client, self.trace_id
  262. )
  263. await task.deal()
  264. return TaskStatus.SUCCESS
  265. @register("article_pool_category_generation")
  266. async def _article_pool_category_generation_handler(self) -> int:
  267. """文章池品类生成"""
  268. task = ArticlePoolCategoryGeneration(
  269. self.db_client, self.log_client, self.trace_id
  270. )
  271. limit_num = self.data.get("limit")
  272. await task.deal(limit=limit_num)
  273. return TaskStatus.SUCCESS
  274. @register("account_category_analysis")
  275. async def _account_category_analysis_handler(self) -> int:
  276. """账号品类分析"""
  277. task = AccountCategoryAnalysis(
  278. pool=self.db_client,
  279. log_client=self.log_client,
  280. trace_id=self.trace_id,
  281. data=self.data,
  282. date_string=None,
  283. )
  284. await task.deal()
  285. return TaskStatus.SUCCESS
  286. @register("update_limited_account_info")
  287. async def _update_limited_account_info_handler(self) -> int:
  288. """更新限流账号信息"""
  289. task = LimitedAccountAnalysisTask(
  290. pool=self.db_client, log_client=self.log_client
  291. )
  292. await task.deal(date_string=self.data.get("date_string"))
  293. return TaskStatus.SUCCESS
  294. # ==================== LLM 类任务 ====================
  295. @register("title_rewrite")
  296. async def _title_rewrite_handler(self) -> int:
  297. """标题重写"""
  298. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  299. return await sub_task.deal()
  300. @register("extract_title_features")
  301. async def _extract_title_features_handler(self) -> int:
  302. """提取标题特征"""
  303. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  304. await task.deal(data=self.data)
  305. return TaskStatus.SUCCESS
  306. # ==================== 统计分析类任务 ====================
  307. @register("update_account_read_rate_avg")
  308. async def _update_account_read_rate_avg_handler(self) -> int:
  309. """更新账号阅读率均值"""
  310. task = AccountPositionReadRateAvg(
  311. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  312. )
  313. await task.deal(end_date=self.data.get("end_date"))
  314. return TaskStatus.SUCCESS
  315. @register("update_account_read_avg")
  316. async def _update_account_read_avg_handler(self) -> int:
  317. """更新账号阅读均值"""
  318. task = AccountPositionReadAvg(
  319. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  320. )
  321. await task.deal(end_date=self.data.get("end_date"))
  322. return TaskStatus.SUCCESS
  323. @register("update_account_open_rate_avg")
  324. async def _update_account_open_rate_avg_handler(self) -> int:
  325. """更新账号打开率均值"""
  326. task = AccountPositionOpenRateAvg(
  327. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  328. )
  329. await task.deal(date_string=self.data.get("date_string"))
  330. return TaskStatus.SUCCESS
  331. # ==================== 自动化任务 ====================
  332. @register("auto_follow_account")
  333. async def _auto_follow_account_handler(self) -> int:
  334. """自动关注公众号"""
  335. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  336. await task.deal(task_name="follow_gzh_task")
  337. return TaskStatus.SUCCESS
  338. @register("get_follow_result")
  339. async def _get_follow_result_handler(self) -> int:
  340. """获取自动关注回复"""
  341. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  342. await task.deal(task_name="get_auto_reply_task")
  343. return TaskStatus.SUCCESS
  344. @register("extract_reply_result")
  345. async def _extract_reply_result_handler(self) -> int:
  346. """解析自动回复结果"""
  347. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  348. await task.deal(task_name="extract_task")
  349. return TaskStatus.SUCCESS
  350. # ==================== 其他任务 ====================
  351. @register("mini_program_detail_process")
  352. async def _mini_program_detail_handler(self) -> int:
  353. """更新小程序裂变信息"""
  354. task = RecycleMiniProgramDetailTask(
  355. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  356. )
  357. await task.deal(params=self.data)
  358. return TaskStatus.SUCCESS
# Public API of this module: a star-import exposes only the handler class.
__all__ = ["TaskHandler"]