# task_handler.py
  1. from datetime import datetime
  2. from typing import Callable, Dict, Optional
  3. from app.core.config import GlobalConfigSettings
  4. from app.domains.analysis_task import CrawlerDetailDeal
  5. from app.domains.analysis_task import AccountPositionReadRateAvg
  6. from app.domains.analysis_task import AccountPositionReadAvg
  7. from app.domains.analysis_task import AccountPositionOpenRateAvg
  8. from app.domains.analysis_task import RateLimitedArticleFilter
  9. from app.domains.algorithm_tasks import AccountCategoryAnalysis
  10. from app.domains.cold_start_tasks import ArticlePoolColdStart
  11. from app.domains.cold_start_tasks import AdPlatformArticlePublishTask
  12. from app.domains.crawler_tasks import CrawlerToutiao
  13. from app.domains.crawler_tasks import WeixinAccountManager
  14. from app.domains.crawler_tasks import CrawlerGzhAccountArticles
  15. from app.domains.crawler_tasks import CrawlerGzhSearchArticles
  16. from app.domains.data_recycle_tasks import ArticleDetailStat
  17. from app.domains.data_recycle_tasks import RecycleDailyPublishArticlesTask
  18. from app.domains.data_recycle_tasks import RecycleOutsideAccountArticlesTask
  19. from app.domains.data_recycle_tasks import CheckDailyPublishArticlesTask
  20. from app.domains.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  21. from app.domains.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  22. from app.domains.data_recycle_tasks import RecycleMiniProgramDetailTask
  23. from app.domains.data_recycle_tasks import (
  24. UpdateOutsideRootSourceIdAndUpdateTimeTask,
  25. )
  26. from app.domains.llm_tasks.aigc_decode_task import CreateAdPlatformArticlesDecodeTask
  27. from app.domains.llm_tasks.aigc_decode_task import CreateInnerArticlesDecodeTask
  28. from app.domains.llm_tasks.aigc_decode_task import FetchDecodeResults
  29. from app.domains.llm_tasks.aigc_decode_task import ExtractDecodeTaskDetail
  30. from app.domains.llm_tasks import TitleRewrite
  31. from app.domains.llm_tasks import ArticlePoolCategoryGeneration
  32. from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
  33. from app.domains.llm_tasks import ExtractTitleFeatures
  34. from app.domains.monitor_tasks import AutoReplyCardsMonitor
  35. from app.domains.monitor_tasks import check_kimi_balance
  36. from app.domains.monitor_tasks import GetOffVideos
  37. from app.domains.monitor_tasks import CheckVideoAuditStatus
  38. from app.domains.monitor_tasks import CooperateAccountsMonitorTask
  39. from app.domains.monitor_tasks import InnerGzhArticlesMonitor
  40. from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
  41. from app.domains.monitor_tasks import OutsideGzhArticlesCollector
  42. from app.domains.monitor_tasks import TaskProcessingMonitor
  43. from app.domains.monitor_tasks import LimitedAccountAnalysisTask
  44. from app.domains.monitor_tasks import AdPlatformAccountsMonitorTask
  45. from app.jobs.task_config import TaskStatus
  46. from app.jobs.task_utils import TaskValidationError
  47. _TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}
  48. def register(task_name: str):
  49. def decorator(func):
  50. _TASK_HANDLER_REGISTRY[task_name] = func
  51. return func
  52. return decorator
  53. class TaskHandler:
  54. """任务处理器基类 - 使用装饰器模式自动注册任务"""
  55. # 任务注册表
  56. _handlers = _TASK_HANDLER_REGISTRY
  57. def __init__(
  58. self,
  59. data: dict,
  60. log_service,
  61. db_client,
  62. trace_id: str,
  63. config: GlobalConfigSettings,
  64. ):
  65. self.data = data
  66. self.log_client = log_service
  67. self.db_client = db_client
  68. self.trace_id = trace_id
  69. self.config = config
  70. @classmethod
  71. def get_handler(cls, task_name: str) -> Optional[Callable]:
  72. """获取任务处理器"""
  73. return cls._handlers.get(task_name)
  74. @classmethod
  75. def list_registered_tasks(cls) -> list:
  76. """列出所有已注册的任务"""
  77. return list(cls._handlers.keys())
  78. async def _log_task_event(self, event_type: str, **kwargs):
  79. """统一的任务日志记录"""
  80. log_data = {
  81. "timestamp": datetime.now().isoformat(),
  82. "trace_id": self.trace_id,
  83. "event_type": event_type,
  84. "task": self.data.get("task_name"),
  85. **kwargs,
  86. }
  87. await self.log_client.log(contents=log_data)
  88. # ==================== 监控类任务 ====================
  89. @register("check_kimi_balance")
  90. async def _check_kimi_balance_handler(self) -> int:
  91. """检查 Kimi 余额"""
  92. response = await check_kimi_balance()
  93. await self._log_task_event("kimi_balance_checked", data=response)
  94. return TaskStatus.SUCCESS
  95. @register("get_off_videos")
  96. async def _get_off_videos_task_handler(self) -> int:
  97. """视频下架任务"""
  98. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  99. return await sub_task.deal()
  100. @register("check_publish_video_audit_status")
  101. async def _check_video_audit_status_handler(self) -> int:
  102. """检查视频审核状态"""
  103. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  104. return await sub_task.deal()
  105. @register("task_processing_monitor")
  106. async def _task_processing_monitor_handler(self) -> int:
  107. """任务处理监控"""
  108. sub_task = TaskProcessingMonitor(self.db_client)
  109. await sub_task.deal()
  110. return TaskStatus.SUCCESS
  111. @register("inner_article_monitor")
  112. async def _inner_gzh_articles_monitor_handler(self) -> int:
  113. """内部公众号文章监控"""
  114. sub_task = InnerGzhArticlesMonitor(self.db_client)
  115. return await sub_task.deal()
  116. @register("outside_article_monitor")
  117. async def _outside_monitor_handler(self) -> int:
  118. """外部文章监控"""
  119. collector = OutsideGzhArticlesCollector(self.db_client)
  120. await collector.deal()
  121. monitor = OutsideGzhArticlesMonitor(self.db_client)
  122. return await monitor.deal()
  123. @register("cooperate_accounts_monitor")
  124. async def _cooperate_accounts_monitor_handler(self) -> int:
  125. """合作账号文章监测"""
  126. task = CooperateAccountsMonitorTask(
  127. pool=self.db_client, log_client=self.log_client
  128. )
  129. await task.deal(task_name="save_articles")
  130. return TaskStatus.SUCCESS
  131. @register("cooperate_accounts_detail")
  132. async def _cooperate_accounts_detail_handler(self) -> int:
  133. """合作账号文章详情更新"""
  134. task = CooperateAccountsMonitorTask(
  135. pool=self.db_client, log_client=self.log_client
  136. )
  137. await task.deal(task_name="get_detail")
  138. return TaskStatus.SUCCESS
  139. @register("ad_platform_accounts_crawler")
  140. async def _ad_platform_accounts_crawler_handler(self) -> int:
  141. """广告平台账号监测"""
  142. task = AdPlatformAccountsMonitorTask(
  143. pool=self.db_client, log_service=self.log_client
  144. )
  145. # 抓账号内所有的文章
  146. await task.deal(task_name="crawl_articles")
  147. # 抓完之后, 计算账号的阅读量中位数
  148. await task.deal(task_name="cal_read_median")
  149. return TaskStatus.SUCCESS
  150. @register("ad_platform_article_detail")
  151. async def _ad_platform_article_detail_handler(self) -> int:
  152. """广告平台文章详情更新"""
  153. task = AdPlatformAccountsMonitorTask(
  154. pool=self.db_client, log_service=self.log_client
  155. )
  156. await task.deal(task_name="get_detail")
  157. return TaskStatus.SUCCESS
  158. # ==================== 爬虫类任务 ====================
  159. @register("crawler_toutiao")
  160. async def _crawler_toutiao_handler(self) -> int:
  161. """头条文章/视频抓取"""
  162. sub_task = CrawlerToutiao(
  163. self.db_client, self.log_client, self.trace_id, self.config
  164. )
  165. method = self.data.get("method", "account")
  166. media_type = self.data.get("media_type", "article")
  167. category_list = self.data.get("category_list", [])
  168. match method:
  169. case "account":
  170. await sub_task.crawler_task(media_type=media_type)
  171. case "recommend":
  172. await sub_task.crawl_toutiao_recommend_task(category_list)
  173. case "search":
  174. await sub_task.search_candidate_accounts()
  175. case _:
  176. raise TaskValidationError(f"Unsupported method: {method}")
  177. return TaskStatus.SUCCESS
  178. @register("crawler_gzh_articles")
  179. async def _crawler_gzh_article_handler(self) -> int:
  180. """抓取公众号文章"""
  181. account_method = self.data.get("account_method")
  182. crawl_mode = self.data.get("crawl_mode")
  183. strategy = self.data.get("strategy")
  184. match crawl_mode:
  185. case "account":
  186. task = CrawlerGzhAccountArticles(
  187. self.db_client, self.log_client, self.trace_id, self.config
  188. )
  189. await task.deal(account_method, strategy)
  190. case "search":
  191. task = CrawlerGzhSearchArticles(
  192. self.db_client, self.log_client, self.trace_id, self.config
  193. )
  194. await task.deal(strategy)
  195. case _:
  196. raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
  197. return TaskStatus.SUCCESS
  198. @register("crawler_account_manager")
  199. async def _crawler_account_manager_handler(self) -> int:
  200. """账号管理任务"""
  201. platform = self.data.get("platform", "weixin")
  202. account_id_list = self.data.get("account_id_list")
  203. match platform:
  204. case "weixin":
  205. task = WeixinAccountManager(
  206. self.db_client, self.log_client, self.trace_id
  207. )
  208. await task.deal(platform=platform, account_id_list=account_id_list)
  209. case _:
  210. raise TaskValidationError(f"Unsupported platform: {platform}")
  211. return TaskStatus.SUCCESS
  212. @register("crawler_detail_analysis")
  213. async def _crawler_article_analysis_handler(self) -> int:
  214. """抓取视频/文章详情分析统计"""
  215. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  216. await task.deal(params=self.data)
  217. return TaskStatus.SUCCESS
  218. # ==================== 数据回收类任务 ====================
  219. @register("article_detail_stat")
  220. async def _article_detail_stat_handler(self) -> int:
  221. """文章详情统计"""
  222. task = ArticleDetailStat(self.db_client, self.log_client)
  223. await task.deal()
  224. return TaskStatus.SUCCESS
  225. @register("daily_publish_articles_recycle")
  226. async def _recycle_article_data_handler(self) -> int:
  227. """每日发文数据回收"""
  228. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  229. recycle = RecycleDailyPublishArticlesTask(
  230. self.db_client, self.log_client, date_str
  231. )
  232. await recycle.deal()
  233. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  234. await check.deal()
  235. return TaskStatus.SUCCESS
  236. @register("update_root_source_id")
  237. async def _update_root_source_id_handler(self) -> int:
  238. """更新 root_source_id"""
  239. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  240. await sub_task.deal()
  241. return TaskStatus.SUCCESS
  242. @register("recycle_outside_account_articles")
  243. async def _recycle_outside_account_article_handler(self) -> int:
  244. """回收外部账号文章"""
  245. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  246. task = RecycleOutsideAccountArticlesTask(
  247. pool=self.db_client, log_client=self.log_client, date_string=date_str
  248. )
  249. await task.deal()
  250. return TaskStatus.SUCCESS
  251. @register("update_outside_account_article_root_source_id")
  252. async def _update_outside_account_article_root_source_id_handler(self) -> int:
  253. """更新外部账号文章的 root_source_id"""
  254. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  255. pool=self.db_client, log_client=self.log_client
  256. )
  257. await task.deal()
  258. return TaskStatus.SUCCESS
  259. @register("fwh_daily_recycle")
  260. async def _recycle_fwh_article_handler(self) -> int:
  261. """回收服务号文章"""
  262. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  263. await task.deal()
  264. return TaskStatus.SUCCESS
  265. # ==================== 算法类任务 ====================
  266. @register("article_pool_cold_start")
  267. async def _article_pool_cold_start_handler(self) -> int:
  268. """文章池冷启动"""
  269. cold_start = ArticlePoolColdStart(
  270. self.db_client, self.log_client, self.trace_id, self.config
  271. )
  272. platform = self.data.get("platform", "weixin")
  273. crawler_methods = self.data.get("crawler_methods", [])
  274. category_list = self.data.get("category_list", [])
  275. strategy = self.data.get("strategy", "strategy_v1")
  276. await cold_start.deal(
  277. platform=platform,
  278. crawl_methods=crawler_methods,
  279. category_list=category_list,
  280. strategy=strategy,
  281. )
  282. return TaskStatus.SUCCESS
  283. @register("ad_platform_article_publish")
  284. async def _ad_platform_article_publish(self) -> int:
  285. """文章池冷启动"""
  286. task = AdPlatformArticlePublishTask(self.db_client, self.log_client)
  287. await task.deal()
  288. return TaskStatus.SUCCESS
  289. @register("candidate_account_quality_analysis")
  290. async def _candidate_account_quality_score_handler(self) -> int:
  291. """候选账号质量分析"""
  292. task = CandidateAccountQualityScoreRecognizer(
  293. self.db_client, self.log_client, self.trace_id
  294. )
  295. await task.deal()
  296. return TaskStatus.SUCCESS
  297. @register("article_pool_category_generation")
  298. async def _article_pool_category_generation_handler(self) -> int:
  299. """文章池品类生成"""
  300. task = ArticlePoolCategoryGeneration(
  301. self.db_client, self.log_client, self.trace_id
  302. )
  303. limit_num = self.data.get("limit")
  304. await task.deal(limit=limit_num)
  305. return TaskStatus.SUCCESS
  306. @register("account_category_analysis")
  307. async def _account_category_analysis_handler(self) -> int:
  308. """账号品类分析"""
  309. task = AccountCategoryAnalysis(
  310. pool=self.db_client,
  311. log_client=self.log_client,
  312. trace_id=self.trace_id,
  313. data=self.data,
  314. date_string=None,
  315. )
  316. await task.deal()
  317. return TaskStatus.SUCCESS
  318. @register("update_limited_account_info")
  319. async def _update_limited_account_info_handler(self) -> int:
  320. """更新限流账号信息"""
  321. task = LimitedAccountAnalysisTask(
  322. pool=self.db_client, log_client=self.log_client
  323. )
  324. await task.deal(date_string=self.data.get("date_string"))
  325. return TaskStatus.SUCCESS
  326. # ==================== LLM 类任务 ====================
  327. @register("title_rewrite")
  328. async def _title_rewrite_handler(self) -> int:
  329. """标题重写"""
  330. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  331. return await sub_task.deal()
  332. @register("extract_title_features")
  333. async def _extract_title_features_handler(self) -> int:
  334. """提取标题特征"""
  335. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  336. await task.deal(data=self.data)
  337. return TaskStatus.SUCCESS
  338. # ==================== 统计分析类任务 ====================
  339. @register("update_account_read_rate_avg")
  340. async def _update_account_read_rate_avg_handler(self) -> int:
  341. """更新账号阅读率均值"""
  342. task = AccountPositionReadRateAvg(
  343. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  344. )
  345. await task.deal(end_date=self.data.get("end_date"))
  346. return TaskStatus.SUCCESS
  347. @register("update_account_read_avg")
  348. async def _update_account_read_avg_handler(self) -> int:
  349. """更新账号阅读均值"""
  350. task = AccountPositionReadAvg(
  351. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  352. )
  353. await task.deal(end_date=self.data.get("end_date"))
  354. return TaskStatus.SUCCESS
  355. @register("update_account_open_rate_avg")
  356. async def _update_account_open_rate_avg_handler(self) -> int:
  357. """更新账号打开率均值"""
  358. task = AccountPositionOpenRateAvg(
  359. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  360. )
  361. await task.deal(date_string=self.data.get("date_string"))
  362. return TaskStatus.SUCCESS
  363. # ==================== 自动化任务 ====================
  364. @register("auto_follow_account")
  365. async def _auto_follow_account_handler(self) -> int:
  366. """自动关注公众号"""
  367. task = AutoReplyCardsMonitor(
  368. pool=self.db_client, log_service=self.log_client, config=self.config
  369. )
  370. await task.deal(task_name="follow_gzh_task")
  371. return TaskStatus.SUCCESS
  372. @register("get_follow_result")
  373. async def _get_follow_result_handler(self) -> int:
  374. """获取自动关注回复"""
  375. task = AutoReplyCardsMonitor(
  376. pool=self.db_client, log_service=self.log_client, config=self.config
  377. )
  378. await task.deal(task_name="get_auto_reply_task")
  379. return TaskStatus.SUCCESS
  380. @register("extract_reply_result")
  381. async def _extract_reply_result_handler(self) -> int:
  382. """解析自动回复结果"""
  383. task = AutoReplyCardsMonitor(
  384. pool=self.db_client, log_service=self.log_client, config=self.config
  385. )
  386. await task.deal(task_name="extract_task")
  387. return TaskStatus.SUCCESS
  388. # ==================== 其他任务 ====================
  389. @register("mini_program_detail_process")
  390. async def _mini_program_detail_handler(self) -> int:
  391. """更新小程序裂变信息"""
  392. task = RecycleMiniProgramDetailTask(
  393. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  394. )
  395. await task.deal(params=self.data)
  396. return TaskStatus.SUCCESS
  397. @register("rate_limited_article_filter")
  398. async def _rate_limited_article_filter(self) -> int:
  399. """限流文章删除"""
  400. task = RateLimitedArticleFilter(pool=self.db_client, config=self.config)
  401. await task.deal()
  402. return TaskStatus.SUCCESS
  403. @register("create_ad_platform_accounts_decode_task")
  404. async def _create_ad_platform_accounts_decode_task(self) -> int:
  405. """创建解构任务"""
  406. task = CreateAdPlatformArticlesDecodeTask(
  407. pool=self.db_client, log_service=self.log_client
  408. )
  409. await task.deal()
  410. return TaskStatus.SUCCESS
  411. @register("fetch_decode_result")
  412. async def _fetch_decode_result(self) -> int:
  413. """获取解构任务结果"""
  414. task = FetchDecodeResults(
  415. pool=self.db_client, log_service=self.log_client
  416. )
  417. await task.deal()
  418. return TaskStatus.SUCCESS
  419. @register("extract_decode_result")
  420. async def _extract_ad_platform_accounts_decode_result(self) -> int:
  421. """提取解构任务结果"""
  422. task = ExtractDecodeTaskDetail(
  423. pool=self.db_client, log_service=self.log_client
  424. )
  425. await task.deal()
  426. return TaskStatus.SUCCESS
  427. __all__ = ["TaskHandler"]