task_handler.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. from datetime import datetime
  2. from typing import Callable, Dict, Optional
  3. from app.core.config import GlobalConfigSettings
  4. from app.domains.analysis_task import CrawlerDetailDeal
  5. from app.domains.analysis_task import AccountPositionReadRateAvg
  6. from app.domains.analysis_task import AccountPositionReadAvg
  7. from app.domains.analysis_task import AccountPositionOpenRateAvg
  8. from app.domains.analysis_task import RateLimitedArticleFilter
  9. from app.domains.algorithm_tasks import AccountCategoryAnalysis
  10. from app.domains.cold_start_tasks import ArticlePoolColdStart
  11. from app.domains.crawler_tasks import CrawlerToutiao
  12. from app.domains.crawler_tasks import WeixinAccountManager
  13. from app.domains.crawler_tasks import CrawlerGzhAccountArticles
  14. from app.domains.crawler_tasks import CrawlerGzhSearchArticles
  15. from app.domains.data_recycle_tasks import ArticleDetailStat
  16. from app.domains.data_recycle_tasks import RecycleDailyPublishArticlesTask
  17. from app.domains.data_recycle_tasks import RecycleOutsideAccountArticlesTask
  18. from app.domains.data_recycle_tasks import CheckDailyPublishArticlesTask
  19. from app.domains.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  20. from app.domains.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  21. from app.domains.data_recycle_tasks import RecycleMiniProgramDetailTask
  22. from app.domains.data_recycle_tasks import (
  23. UpdateOutsideRootSourceIdAndUpdateTimeTask,
  24. )
  25. from app.domains.decode_task import AdPlatformArticlesDecodeTask
  26. from app.domains.llm_tasks import TitleRewrite
  27. from app.domains.llm_tasks import ArticlePoolCategoryGeneration
  28. from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
  29. from app.domains.llm_tasks import ExtractTitleFeatures
  30. from app.domains.monitor_tasks import AutoReplyCardsMonitor
  31. from app.domains.monitor_tasks import check_kimi_balance
  32. from app.domains.monitor_tasks import GetOffVideos
  33. from app.domains.monitor_tasks import CheckVideoAuditStatus
  34. from app.domains.monitor_tasks import CooperateAccountsMonitorTask
  35. from app.domains.monitor_tasks import InnerGzhArticlesMonitor
  36. from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
  37. from app.domains.monitor_tasks import OutsideGzhArticlesCollector
  38. from app.domains.monitor_tasks import TaskProcessingMonitor
  39. from app.domains.monitor_tasks import LimitedAccountAnalysisTask
  40. from app.domains.monitor_tasks import AdPlatformAccountsMonitorTask
  41. from app.jobs.task_config import TaskStatus
  42. from app.jobs.task_utils import TaskValidationError
  43. _TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}
  44. def register(task_name: str):
  45. def decorator(func):
  46. _TASK_HANDLER_REGISTRY[task_name] = func
  47. return func
  48. return decorator
  49. class TaskHandler:
  50. """任务处理器基类 - 使用装饰器模式自动注册任务"""
  51. # 任务注册表
  52. _handlers = _TASK_HANDLER_REGISTRY
  53. def __init__(
  54. self,
  55. data: dict,
  56. log_service,
  57. db_client,
  58. trace_id: str,
  59. config: GlobalConfigSettings,
  60. ):
  61. self.data = data
  62. self.log_client = log_service
  63. self.db_client = db_client
  64. self.trace_id = trace_id
  65. self.config = config
  66. @classmethod
  67. def get_handler(cls, task_name: str) -> Optional[Callable]:
  68. """获取任务处理器"""
  69. return cls._handlers.get(task_name)
  70. @classmethod
  71. def list_registered_tasks(cls) -> list:
  72. """列出所有已注册的任务"""
  73. return list(cls._handlers.keys())
  74. async def _log_task_event(self, event_type: str, **kwargs):
  75. """统一的任务日志记录"""
  76. log_data = {
  77. "timestamp": datetime.now().isoformat(),
  78. "trace_id": self.trace_id,
  79. "event_type": event_type,
  80. "task": self.data.get("task_name"),
  81. **kwargs,
  82. }
  83. await self.log_client.log(contents=log_data)
  84. # ==================== 监控类任务 ====================
  85. @register("check_kimi_balance")
  86. async def _check_kimi_balance_handler(self) -> int:
  87. """检查 Kimi 余额"""
  88. response = await check_kimi_balance()
  89. await self._log_task_event("kimi_balance_checked", data=response)
  90. return TaskStatus.SUCCESS
  91. @register("get_off_videos")
  92. async def _get_off_videos_task_handler(self) -> int:
  93. """视频下架任务"""
  94. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  95. return await sub_task.deal()
  96. @register("check_publish_video_audit_status")
  97. async def _check_video_audit_status_handler(self) -> int:
  98. """检查视频审核状态"""
  99. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  100. return await sub_task.deal()
  101. @register("task_processing_monitor")
  102. async def _task_processing_monitor_handler(self) -> int:
  103. """任务处理监控"""
  104. sub_task = TaskProcessingMonitor(self.db_client)
  105. await sub_task.deal()
  106. return TaskStatus.SUCCESS
  107. @register("inner_article_monitor")
  108. async def _inner_gzh_articles_monitor_handler(self) -> int:
  109. """内部公众号文章监控"""
  110. sub_task = InnerGzhArticlesMonitor(self.db_client)
  111. return await sub_task.deal()
  112. @register("outside_article_monitor")
  113. async def _outside_monitor_handler(self) -> int:
  114. """外部文章监控"""
  115. collector = OutsideGzhArticlesCollector(self.db_client)
  116. await collector.deal()
  117. monitor = OutsideGzhArticlesMonitor(self.db_client)
  118. return await monitor.deal()
  119. @register("cooperate_accounts_monitor")
  120. async def _cooperate_accounts_monitor_handler(self) -> int:
  121. """合作账号文章监测"""
  122. task = CooperateAccountsMonitorTask(
  123. pool=self.db_client, log_client=self.log_client
  124. )
  125. await task.deal(task_name="save_articles")
  126. return TaskStatus.SUCCESS
  127. @register("cooperate_accounts_detail")
  128. async def _cooperate_accounts_detail_handler(self) -> int:
  129. """合作账号文章详情更新"""
  130. task = CooperateAccountsMonitorTask(
  131. pool=self.db_client, log_client=self.log_client
  132. )
  133. await task.deal(task_name="get_detail")
  134. return TaskStatus.SUCCESS
  135. @register("ad_platform_accounts_crawler")
  136. async def _ad_platform_accounts_crawler_handler(self) -> int:
  137. """广告平台账号监测"""
  138. task = AdPlatformAccountsMonitorTask(
  139. pool=self.db_client, log_service=self.log_client
  140. )
  141. # 抓账号内所有的文章
  142. await task.deal(task_name="crawl_articles")
  143. # 抓完之后, 计算账号的阅读量中位数
  144. await task.deal(task_name="cal_read_median")
  145. return TaskStatus.SUCCESS
  146. @register("ad_platform_article_detail")
  147. async def _ad_platform_article_detail_handler(self) -> int:
  148. """广告平台文章详情更新"""
  149. task = AdPlatformAccountsMonitorTask(
  150. pool=self.db_client, log_service=self.log_client
  151. )
  152. await task.deal(task_name="get_detail")
  153. return TaskStatus.SUCCESS
  154. # ==================== 爬虫类任务 ====================
  155. @register("crawler_toutiao")
  156. async def _crawler_toutiao_handler(self) -> int:
  157. """头条文章/视频抓取"""
  158. sub_task = CrawlerToutiao(
  159. self.db_client, self.log_client, self.trace_id, self.config
  160. )
  161. method = self.data.get("method", "account")
  162. media_type = self.data.get("media_type", "article")
  163. category_list = self.data.get("category_list", [])
  164. match method:
  165. case "account":
  166. await sub_task.crawler_task(media_type=media_type)
  167. case "recommend":
  168. await sub_task.crawl_toutiao_recommend_task(category_list)
  169. case "search":
  170. await sub_task.search_candidate_accounts()
  171. case _:
  172. raise TaskValidationError(f"Unsupported method: {method}")
  173. return TaskStatus.SUCCESS
  174. @register("crawler_gzh_articles")
  175. async def _crawler_gzh_article_handler(self) -> int:
  176. """抓取公众号文章"""
  177. account_method = self.data.get("account_method")
  178. crawl_mode = self.data.get("crawl_mode")
  179. strategy = self.data.get("strategy")
  180. match crawl_mode:
  181. case "account":
  182. task = CrawlerGzhAccountArticles(
  183. self.db_client, self.log_client, self.trace_id, self.config
  184. )
  185. await task.deal(account_method, strategy)
  186. case "search":
  187. task = CrawlerGzhSearchArticles(
  188. self.db_client, self.log_client, self.trace_id, self.config
  189. )
  190. await task.deal(strategy)
  191. case _:
  192. raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
  193. return TaskStatus.SUCCESS
  194. @register("crawler_account_manager")
  195. async def _crawler_account_manager_handler(self) -> int:
  196. """账号管理任务"""
  197. platform = self.data.get("platform", "weixin")
  198. account_id_list = self.data.get("account_id_list")
  199. match platform:
  200. case "weixin":
  201. task = WeixinAccountManager(
  202. self.db_client, self.log_client, self.trace_id
  203. )
  204. await task.deal(platform=platform, account_id_list=account_id_list)
  205. case _:
  206. raise TaskValidationError(f"Unsupported platform: {platform}")
  207. return TaskStatus.SUCCESS
  208. @register("crawler_detail_analysis")
  209. async def _crawler_article_analysis_handler(self) -> int:
  210. """抓取视频/文章详情分析统计"""
  211. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  212. await task.deal(params=self.data)
  213. return TaskStatus.SUCCESS
  214. # ==================== 数据回收类任务 ====================
  215. @register("article_detail_stat")
  216. async def _article_detail_stat_handler(self) -> int:
  217. """文章详情统计"""
  218. task = ArticleDetailStat(self.db_client, self.log_client)
  219. await task.deal()
  220. return TaskStatus.SUCCESS
  221. @register("daily_publish_articles_recycle")
  222. async def _recycle_article_data_handler(self) -> int:
  223. """每日发文数据回收"""
  224. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  225. recycle = RecycleDailyPublishArticlesTask(
  226. self.db_client, self.log_client, date_str
  227. )
  228. await recycle.deal()
  229. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  230. await check.deal()
  231. return TaskStatus.SUCCESS
  232. @register("update_root_source_id")
  233. async def _update_root_source_id_handler(self) -> int:
  234. """更新 root_source_id"""
  235. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  236. await sub_task.deal()
  237. return TaskStatus.SUCCESS
  238. @register("recycle_outside_account_articles")
  239. async def _recycle_outside_account_article_handler(self) -> int:
  240. """回收外部账号文章"""
  241. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  242. task = RecycleOutsideAccountArticlesTask(
  243. pool=self.db_client, log_client=self.log_client, date_string=date_str
  244. )
  245. await task.deal()
  246. return TaskStatus.SUCCESS
  247. @register("update_outside_account_article_root_source_id")
  248. async def _update_outside_account_article_root_source_id_handler(self) -> int:
  249. """更新外部账号文章的 root_source_id"""
  250. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  251. pool=self.db_client, log_client=self.log_client
  252. )
  253. await task.deal()
  254. return TaskStatus.SUCCESS
  255. @register("fwh_daily_recycle")
  256. async def _recycle_fwh_article_handler(self) -> int:
  257. """回收服务号文章"""
  258. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  259. await task.deal()
  260. return TaskStatus.SUCCESS
  261. # ==================== 算法类任务 ====================
  262. @register("article_pool_cold_start")
  263. async def _article_pool_cold_start_handler(self) -> int:
  264. """文章池冷启动"""
  265. cold_start = ArticlePoolColdStart(
  266. self.db_client, self.log_client, self.trace_id, self.config
  267. )
  268. platform = self.data.get("platform", "weixin")
  269. crawler_methods = self.data.get("crawler_methods", [])
  270. category_list = self.data.get("category_list", [])
  271. strategy = self.data.get("strategy", "strategy_v1")
  272. await cold_start.deal(
  273. platform=platform,
  274. crawl_methods=crawler_methods,
  275. category_list=category_list,
  276. strategy=strategy,
  277. )
  278. return TaskStatus.SUCCESS
  279. @register("candidate_account_quality_analysis")
  280. async def _candidate_account_quality_score_handler(self) -> int:
  281. """候选账号质量分析"""
  282. task = CandidateAccountQualityScoreRecognizer(
  283. self.db_client, self.log_client, self.trace_id
  284. )
  285. await task.deal()
  286. return TaskStatus.SUCCESS
  287. @register("article_pool_category_generation")
  288. async def _article_pool_category_generation_handler(self) -> int:
  289. """文章池品类生成"""
  290. task = ArticlePoolCategoryGeneration(
  291. self.db_client, self.log_client, self.trace_id
  292. )
  293. limit_num = self.data.get("limit")
  294. await task.deal(limit=limit_num)
  295. return TaskStatus.SUCCESS
  296. @register("account_category_analysis")
  297. async def _account_category_analysis_handler(self) -> int:
  298. """账号品类分析"""
  299. task = AccountCategoryAnalysis(
  300. pool=self.db_client,
  301. log_client=self.log_client,
  302. trace_id=self.trace_id,
  303. data=self.data,
  304. date_string=None,
  305. )
  306. await task.deal()
  307. return TaskStatus.SUCCESS
  308. @register("update_limited_account_info")
  309. async def _update_limited_account_info_handler(self) -> int:
  310. """更新限流账号信息"""
  311. task = LimitedAccountAnalysisTask(
  312. pool=self.db_client, log_client=self.log_client
  313. )
  314. await task.deal(date_string=self.data.get("date_string"))
  315. return TaskStatus.SUCCESS
  316. # ==================== LLM 类任务 ====================
  317. @register("title_rewrite")
  318. async def _title_rewrite_handler(self) -> int:
  319. """标题重写"""
  320. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  321. return await sub_task.deal()
  322. @register("extract_title_features")
  323. async def _extract_title_features_handler(self) -> int:
  324. """提取标题特征"""
  325. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  326. await task.deal(data=self.data)
  327. return TaskStatus.SUCCESS
  328. # ==================== 统计分析类任务 ====================
  329. @register("update_account_read_rate_avg")
  330. async def _update_account_read_rate_avg_handler(self) -> int:
  331. """更新账号阅读率均值"""
  332. task = AccountPositionReadRateAvg(
  333. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  334. )
  335. await task.deal(end_date=self.data.get("end_date"))
  336. return TaskStatus.SUCCESS
  337. @register("update_account_read_avg")
  338. async def _update_account_read_avg_handler(self) -> int:
  339. """更新账号阅读均值"""
  340. task = AccountPositionReadAvg(
  341. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  342. )
  343. await task.deal(end_date=self.data.get("end_date"))
  344. return TaskStatus.SUCCESS
  345. @register("update_account_open_rate_avg")
  346. async def _update_account_open_rate_avg_handler(self) -> int:
  347. """更新账号打开率均值"""
  348. task = AccountPositionOpenRateAvg(
  349. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  350. )
  351. await task.deal(date_string=self.data.get("date_string"))
  352. return TaskStatus.SUCCESS
  353. # ==================== 自动化任务 ====================
  354. @register("auto_follow_account")
  355. async def _auto_follow_account_handler(self) -> int:
  356. """自动关注公众号"""
  357. task = AutoReplyCardsMonitor(
  358. pool=self.db_client, log_service=self.log_client, config=self.config
  359. )
  360. await task.deal(task_name="follow_gzh_task")
  361. return TaskStatus.SUCCESS
  362. @register("get_follow_result")
  363. async def _get_follow_result_handler(self) -> int:
  364. """获取自动关注回复"""
  365. task = AutoReplyCardsMonitor(
  366. pool=self.db_client, log_service=self.log_client, config=self.config
  367. )
  368. await task.deal(task_name="get_auto_reply_task")
  369. return TaskStatus.SUCCESS
  370. @register("extract_reply_result")
  371. async def _extract_reply_result_handler(self) -> int:
  372. """解析自动回复结果"""
  373. task = AutoReplyCardsMonitor(
  374. pool=self.db_client, log_service=self.log_client, config=self.config
  375. )
  376. await task.deal(task_name="extract_task")
  377. return TaskStatus.SUCCESS
  378. # ==================== 其他任务 ====================
  379. @register("mini_program_detail_process")
  380. async def _mini_program_detail_handler(self) -> int:
  381. """更新小程序裂变信息"""
  382. task = RecycleMiniProgramDetailTask(
  383. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  384. )
  385. await task.deal(params=self.data)
  386. return TaskStatus.SUCCESS
  387. @register("rate_limited_article_filter")
  388. async def _rate_limited_article_filter(self) -> int:
  389. """限流文章删除"""
  390. task = RateLimitedArticleFilter(pool=self.db_client, config=self.config)
  391. await task.deal()
  392. return TaskStatus.SUCCESS
  393. @register("create_ad_platform_accounts_decode_task")
  394. async def _create_ad_platform_accounts_decode_task(self) -> int:
  395. """创建解构任务"""
  396. task = AdPlatformArticlesDecodeTask(
  397. pool=self.db_client, log_service=self.log_client
  398. )
  399. await task.deal(task_name="create_tasks")
  400. return TaskStatus.SUCCESS
  401. @register("fetch_ad_platform_accounts_decode_result")
  402. async def _fetch_ad_platform_accounts_decode_result(self) -> int:
  403. """获取解构任务结果"""
  404. task = AdPlatformArticlesDecodeTask(
  405. pool=self.db_client, log_service=self.log_client
  406. )
  407. await task.deal(task_name="fetch_results")
  408. return TaskStatus.SUCCESS
  409. __all__ = ["TaskHandler"]