task_handler.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. from datetime import datetime
  2. from typing import Callable, Dict, Optional
  3. from app.core.config import GlobalConfigSettings
  4. from app.jobs.domains import *
  5. from app.jobs.task_config import TaskStatus
  6. from app.jobs.task_utils import TaskValidationError
  7. _TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}
  8. def register(task_name: str):
  9. def decorator(func):
  10. _TASK_HANDLER_REGISTRY[task_name] = func
  11. return func
  12. return decorator
  13. class TaskHandler:
  14. """任务处理器基类 - 使用装饰器模式自动注册任务"""
  15. # 任务注册表
  16. _handlers = _TASK_HANDLER_REGISTRY
  17. def __init__(
  18. self,
  19. data: dict,
  20. log_service,
  21. db_client,
  22. trace_id: str,
  23. config: GlobalConfigSettings,
  24. ):
  25. self.data = data
  26. self.log_client = log_service
  27. self.db_client = db_client
  28. self.trace_id = trace_id
  29. self.config = config
  30. @classmethod
  31. def get_handler(cls, task_name: str) -> Optional[Callable]:
  32. """获取任务处理器"""
  33. return cls._handlers.get(task_name)
  34. @classmethod
  35. def list_registered_tasks(cls) -> list:
  36. """列出所有已注册的任务"""
  37. return list(cls._handlers.keys())
  38. async def _log_task_event(self, event_type: str, **kwargs):
  39. """统一的任务日志记录"""
  40. log_data = {
  41. "timestamp": datetime.now().isoformat(),
  42. "trace_id": self.trace_id,
  43. "event_type": event_type,
  44. "task": self.data.get("task_name"),
  45. **kwargs,
  46. }
  47. await self.log_client.log(contents=log_data)
  48. # ==================== 监控类任务 ====================
  49. @register("check_kimi_balance")
  50. async def _check_kimi_balance_handler(self) -> int:
  51. """检查 Kimi 余额"""
  52. response = await check_kimi_balance()
  53. await self._log_task_event("kimi_balance_checked", data=response)
  54. return TaskStatus.SUCCESS
  55. @register("get_off_videos")
  56. async def _get_off_videos_task_handler(self) -> int:
  57. """视频下架任务"""
  58. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  59. return await sub_task.deal()
  60. @register("check_publish_video_audit_status")
  61. async def _check_video_audit_status_handler(self) -> int:
  62. """检查视频审核状态"""
  63. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  64. return await sub_task.deal()
  65. @register("task_processing_monitor")
  66. async def _task_processing_monitor_handler(self) -> int:
  67. """任务处理监控"""
  68. sub_task = TaskProcessingMonitor(self.db_client)
  69. await sub_task.deal()
  70. return TaskStatus.SUCCESS
  71. @register("inner_article_monitor")
  72. async def _inner_gzh_articles_monitor_handler(self) -> int:
  73. """内部公众号文章监控"""
  74. sub_task = InnerGzhArticlesMonitor(self.db_client)
  75. return await sub_task.deal()
  76. @register("outside_article_monitor")
  77. async def _outside_monitor_handler(self) -> int:
  78. """外部文章监控"""
  79. collector = OutsideGzhArticlesCollector(self.db_client)
  80. await collector.deal()
  81. monitor = OutsideGzhArticlesMonitor(self.db_client)
  82. return await monitor.deal()
  83. @register("cooperate_accounts_monitor")
  84. async def _cooperate_accounts_monitor_handler(self) -> int:
  85. """合作账号文章监测"""
  86. task = CooperateAccountsMonitorTask(
  87. pool=self.db_client, log_client=self.log_client
  88. )
  89. await task.deal(task_name="save_articles")
  90. return TaskStatus.SUCCESS
  91. @register("cooperate_accounts_detail")
  92. async def _cooperate_accounts_detail_handler(self) -> int:
  93. """合作账号文章详情更新"""
  94. task = CooperateAccountsMonitorTask(
  95. pool=self.db_client, log_client=self.log_client
  96. )
  97. await task.deal(task_name="get_detail")
  98. return TaskStatus.SUCCESS
  99. @register("ad_platform_accounts_crawler")
  100. async def _ad_platform_accounts_crawler_handler(self) -> int:
  101. """广告平台账号监测"""
  102. task = AdPlatformAccountsMonitorTask(
  103. pool=self.db_client, log_service=self.log_client
  104. )
  105. # 抓账号内所有的文章
  106. await task.deal(task_name="crawl_articles")
  107. # 抓完之后, 计算账号的阅读量中位数
  108. await task.deal(task_name="cal_read_median")
  109. return TaskStatus.SUCCESS
  110. @register("ad_platform_article_detail")
  111. async def _ad_platform_article_detail_handler(self) -> int:
  112. """广告平台文章详情更新"""
  113. task = AdPlatformAccountsMonitorTask(
  114. pool=self.db_client, log_service=self.log_client
  115. )
  116. await task.deal(task_name="get_detail")
  117. return TaskStatus.SUCCESS
  118. # ==================== 爬虫类任务 ====================
  119. @register("crawler_toutiao")
  120. async def _crawler_toutiao_handler(self) -> int:
  121. """头条文章/视频抓取"""
  122. sub_task = CrawlerToutiao(
  123. self.db_client, self.log_client, self.trace_id, self.config
  124. )
  125. method = self.data.get("method", "account")
  126. media_type = self.data.get("media_type", "article")
  127. category_list = self.data.get("category_list", [])
  128. match method:
  129. case "account":
  130. await sub_task.crawler_task(media_type=media_type)
  131. case "recommend":
  132. await sub_task.crawl_toutiao_recommend_task(category_list)
  133. case "search":
  134. await sub_task.search_candidate_accounts()
  135. case _:
  136. raise TaskValidationError(f"Unsupported method: {method}")
  137. return TaskStatus.SUCCESS
  138. @register("crawler_gzh_articles")
  139. async def _crawler_gzh_article_handler(self) -> int:
  140. """抓取公众号文章"""
  141. account_method = self.data.get("account_method")
  142. crawl_mode = self.data.get("crawl_mode")
  143. strategy = self.data.get("strategy")
  144. match crawl_mode:
  145. case "account":
  146. task = CrawlerGzhAccountArticles(
  147. self.db_client, self.log_client, self.trace_id, self.config
  148. )
  149. await task.deal(account_method, strategy)
  150. case "search":
  151. task = CrawlerGzhSearchArticles(
  152. self.db_client, self.log_client, self.trace_id, self.config
  153. )
  154. await task.deal(strategy)
  155. case _:
  156. raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
  157. return TaskStatus.SUCCESS
  158. @register("crawler_account_manager")
  159. async def _crawler_account_manager_handler(self) -> int:
  160. """账号管理任务"""
  161. platform = self.data.get("platform", "weixin")
  162. account_id_list = self.data.get("account_id_list")
  163. match platform:
  164. case "weixin":
  165. task = WeixinAccountManager(
  166. self.db_client, self.log_client, self.trace_id
  167. )
  168. await task.deal(platform=platform, account_id_list=account_id_list)
  169. case _:
  170. raise TaskValidationError(f"Unsupported platform: {platform}")
  171. return TaskStatus.SUCCESS
  172. @register("crawler_detail_analysis")
  173. async def _crawler_article_analysis_handler(self) -> int:
  174. """抓取视频/文章详情分析统计"""
  175. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  176. await task.deal(params=self.data)
  177. return TaskStatus.SUCCESS
  178. # ==================== 数据回收类任务 ====================
  179. @register("article_detail_stat")
  180. async def _article_detail_stat_handler(self) -> int:
  181. """文章详情统计"""
  182. task = ArticleDetailStat(self.db_client, self.log_client)
  183. await task.deal()
  184. return TaskStatus.SUCCESS
  185. @register("daily_publish_articles_recycle")
  186. async def _recycle_article_data_handler(self) -> int:
  187. """每日发文数据回收"""
  188. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  189. recycle = RecycleDailyPublishArticlesTask(
  190. self.db_client, self.log_client, date_str
  191. )
  192. await recycle.deal()
  193. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  194. await check.deal()
  195. return TaskStatus.SUCCESS
  196. @register("update_root_source_id")
  197. async def _update_root_source_id_handler(self) -> int:
  198. """更新 root_source_id"""
  199. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  200. await sub_task.deal()
  201. return TaskStatus.SUCCESS
  202. @register("recycle_outside_account_articles")
  203. async def _recycle_outside_account_article_handler(self) -> int:
  204. """回收外部账号文章"""
  205. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  206. task = RecycleOutsideAccountArticlesTask(
  207. pool=self.db_client, log_client=self.log_client, date_string=date_str
  208. )
  209. await task.deal()
  210. return TaskStatus.SUCCESS
  211. @register("update_outside_account_article_root_source_id")
  212. async def _update_outside_account_article_root_source_id_handler(self) -> int:
  213. """更新外部账号文章的 root_source_id"""
  214. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  215. pool=self.db_client, log_client=self.log_client
  216. )
  217. await task.deal()
  218. return TaskStatus.SUCCESS
  219. @register("fwh_daily_recycle")
  220. async def _recycle_fwh_article_handler(self) -> int:
  221. """回收服务号文章"""
  222. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  223. await task.deal()
  224. return TaskStatus.SUCCESS
  225. # ==================== 算法类任务 ====================
  226. @register("article_pool_cold_start")
  227. async def _article_pool_cold_start_handler(self) -> int:
  228. """文章池冷启动"""
  229. cold_start = ArticlePoolColdStart(
  230. self.db_client, self.log_client, self.trace_id, self.config
  231. )
  232. platform = self.data.get("platform", "weixin")
  233. crawler_methods = self.data.get("crawler_methods", [])
  234. category_list = self.data.get("category_list", [])
  235. strategy = self.data.get("strategy", "strategy_v1")
  236. await cold_start.deal(
  237. platform=platform,
  238. crawl_methods=crawler_methods,
  239. category_list=category_list,
  240. strategy=strategy,
  241. )
  242. return TaskStatus.SUCCESS
  243. @register("ad_platform_article_publish")
  244. async def _ad_platform_article_publish(self) -> int:
  245. """文章池冷启动"""
  246. task = AdPlatformArticlePublishTask(self.db_client, self.log_client)
  247. await task.deal()
  248. return TaskStatus.SUCCESS
  249. @register("candidate_account_quality_analysis")
  250. async def _candidate_account_quality_score_handler(self) -> int:
  251. """候选账号质量分析"""
  252. task = CandidateAccountQualityScoreRecognizer(
  253. self.db_client, self.log_client, self.trace_id
  254. )
  255. await task.deal()
  256. return TaskStatus.SUCCESS
  257. @register("article_pool_category_generation")
  258. async def _article_pool_category_generation_handler(self) -> int:
  259. """文章池品类生成"""
  260. task = ArticlePoolCategoryGeneration(
  261. self.db_client, self.log_client, self.trace_id
  262. )
  263. limit_num = self.data.get("limit")
  264. await task.deal(limit=limit_num)
  265. return TaskStatus.SUCCESS
  266. @register("account_category_analysis")
  267. async def _account_category_analysis_handler(self) -> int:
  268. """账号品类分析"""
  269. task = AccountCategoryAnalysis(
  270. pool=self.db_client,
  271. log_client=self.log_client,
  272. trace_id=self.trace_id,
  273. data=self.data,
  274. date_string=None,
  275. )
  276. await task.deal()
  277. return TaskStatus.SUCCESS
  278. @register("update_limited_account_info")
  279. async def _update_limited_account_info_handler(self) -> int:
  280. """更新限流账号信息"""
  281. task = LimitedAccountAnalysisTask(
  282. pool=self.db_client, log_client=self.log_client
  283. )
  284. await task.deal(date_string=self.data.get("date_string"))
  285. return TaskStatus.SUCCESS
  286. # ==================== LLM 类任务 ====================
  287. @register("title_rewrite")
  288. async def _title_rewrite_handler(self) -> int:
  289. """标题重写"""
  290. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  291. return await sub_task.deal()
  292. @register("extract_title_features")
  293. async def _extract_title_features_handler(self) -> int:
  294. """提取标题特征"""
  295. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  296. await task.deal(data=self.data)
  297. return TaskStatus.SUCCESS
  298. # ==================== 统计分析类任务 ====================
  299. @register("update_account_read_rate_avg")
  300. async def _update_account_read_rate_avg_handler(self) -> int:
  301. """更新账号阅读率均值"""
  302. task = AccountPositionReadRateAvg(
  303. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  304. )
  305. await task.deal(end_date=self.data.get("end_date"))
  306. return TaskStatus.SUCCESS
  307. @register("update_account_read_avg")
  308. async def _update_account_read_avg_handler(self) -> int:
  309. """更新账号阅读均值"""
  310. task = AccountPositionReadAvg(
  311. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  312. )
  313. await task.deal(end_date=self.data.get("end_date"))
  314. return TaskStatus.SUCCESS
  315. @register("update_account_open_rate_avg")
  316. async def _update_account_open_rate_avg_handler(self) -> int:
  317. """更新账号打开率均值"""
  318. task = AccountPositionOpenRateAvg(
  319. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  320. )
  321. await task.deal(date_string=self.data.get("date_string"))
  322. return TaskStatus.SUCCESS
  323. # ==================== 自动化任务 ====================
  324. @register("auto_follow_account")
  325. async def _auto_follow_account_handler(self) -> int:
  326. """自动关注公众号"""
  327. task = AutoReplyCardsMonitor(
  328. pool=self.db_client, log_service=self.log_client, config=self.config
  329. )
  330. await task.deal(task_name="follow_gzh_task")
  331. return TaskStatus.SUCCESS
  332. @register("get_follow_result")
  333. async def _get_follow_result_handler(self) -> int:
  334. """获取自动关注回复"""
  335. task = AutoReplyCardsMonitor(
  336. pool=self.db_client, log_service=self.log_client, config=self.config
  337. )
  338. await task.deal(task_name="get_auto_reply_task")
  339. return TaskStatus.SUCCESS
  340. @register("extract_reply_result")
  341. async def _extract_reply_result_handler(self) -> int:
  342. """解析自动回复结果"""
  343. task = AutoReplyCardsMonitor(
  344. pool=self.db_client, log_service=self.log_client, config=self.config
  345. )
  346. await task.deal(task_name="extract_task")
  347. return TaskStatus.SUCCESS
  348. # ==================== 其他任务 ====================
  349. @register("mini_program_detail_process")
  350. async def _mini_program_detail_handler(self) -> int:
  351. """更新小程序裂变信息"""
  352. task = RecycleMiniProgramDetailTask(
  353. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  354. )
  355. await task.deal(params=self.data)
  356. return TaskStatus.SUCCESS
  357. @register("rate_limited_article_filter")
  358. async def _rate_limited_article_filter(self) -> int:
  359. """限流文章删除"""
  360. task = RateLimitedArticleFilter(pool=self.db_client, config=self.config)
  361. await task.deal()
  362. return TaskStatus.SUCCESS
  363. @register("create_ad_platform_accounts_decode_task")
  364. async def _create_ad_platform_accounts_decode_task(self) -> int:
  365. """创建解构任务"""
  366. task = CreateAdPlatformArticlesDecodeTask(
  367. pool=self.db_client, log_service=self.log_client
  368. )
  369. await task.deal()
  370. return TaskStatus.SUCCESS
  371. @register("fetch_decode_result")
  372. async def _fetch_decode_result(self) -> int:
  373. """获取解构任务结果"""
  374. task = FetchDecodeResults(
  375. pool=self.db_client, log_service=self.log_client
  376. )
  377. await task.deal()
  378. return TaskStatus.SUCCESS
  379. @register("extract_decode_result")
  380. async def _extract_ad_platform_accounts_decode_result(self) -> int:
  381. """提取解构任务结果"""
  382. task = ExtractDecodeTaskDetail(
  383. pool=self.db_client, log_service=self.log_client
  384. )
  385. await task.deal()
  386. return TaskStatus.SUCCESS
  387. # ====================== Recommend Tasks=====================
  388. @register("i2i_recommend_data_sync")
  389. async def _i2i_recommend_data_sync_handler(self) -> int:
  390. task = I2IRecommendDataSyncTask(
  391. pool=self.db_client, log_service=self.log_client
  392. )
  393. await task.deal()
  394. return TaskStatus.SUCCESS
# Public API: a star-import of this module exposes only ``TaskHandler``.
__all__ = ["TaskHandler"]