# task_handler.py (16 KB)
  1. from datetime import datetime
  2. from typing import Callable, Awaitable, Dict, Any, Optional
  3. from applications.tasks.analysis_task import CrawlerDetailDeal
  4. from applications.tasks.analysis_task import AccountPositionReadRateAvg
  5. from applications.tasks.analysis_task import AccountPositionReadAvg
  6. from applications.tasks.analysis_task import AccountPositionOpenRateAvg
  7. from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
  8. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  9. from applications.tasks.crawler_tasks import CrawlerToutiao
  10. from applications.tasks.crawler_tasks import WeixinAccountManager
  11. from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
  12. from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
  13. from applications.tasks.data_recycle_tasks import ArticleDetailStat
  14. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  15. from applications.tasks.data_recycle_tasks import RecycleOutsideAccountArticlesTask
  16. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  17. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  18. from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  19. from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
  20. from applications.tasks.data_recycle_tasks import (
  21. UpdateOutsideRootSourceIdAndUpdateTimeTask,
  22. )
  23. from applications.tasks.llm_tasks import TitleRewrite
  24. from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
  25. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  26. from applications.tasks.llm_tasks import ExtractTitleFeatures
  27. from applications.tasks.monitor_tasks import AutoReplyCardsMonitor
  28. from applications.tasks.monitor_tasks import check_kimi_balance
  29. from applications.tasks.monitor_tasks import GetOffVideos
  30. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  31. from applications.tasks.monitor_tasks import CooperateAccountsMonitorTask
  32. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  33. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  34. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  35. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  36. from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
  37. from applications.tasks.task_config import TaskStatus
  38. from applications.tasks.task_utils import TaskValidationError
  39. _TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}
  40. def register(task_name: str):
  41. def decorator(func):
  42. _TASK_HANDLER_REGISTRY[task_name] = func
  43. return func
  44. return decorator
  45. class TaskHandler:
  46. """任务处理器基类 - 使用装饰器模式自动注册任务"""
  47. # 任务注册表
  48. _handlers = _TASK_HANDLER_REGISTRY
  49. def __init__(self, data: dict, log_service, db_client, trace_id: str):
  50. self.data = data
  51. self.log_client = log_service
  52. self.db_client = db_client
  53. self.trace_id = trace_id
  54. @classmethod
  55. def get_handler(cls, task_name: str) -> Optional[Callable]:
  56. """获取任务处理器"""
  57. return cls._handlers.get(task_name)
  58. @classmethod
  59. def list_registered_tasks(cls) -> list:
  60. """列出所有已注册的任务"""
  61. return list(cls._handlers.keys())
  62. async def _log_task_event(self, event_type: str, **kwargs):
  63. """统一的任务日志记录"""
  64. log_data = {
  65. "timestamp": datetime.now().isoformat(),
  66. "trace_id": self.trace_id,
  67. "event_type": event_type,
  68. "task": self.data.get("task_name"),
  69. **kwargs,
  70. }
  71. await self.log_client.log(contents=log_data)
  72. # ==================== 监控类任务 ====================
  73. @register("check_kimi_balance")
  74. async def _check_kimi_balance_handler(self) -> int:
  75. """检查 Kimi 余额"""
  76. response = await check_kimi_balance()
  77. await self._log_task_event("kimi_balance_checked", data=response)
  78. return TaskStatus.SUCCESS
  79. @register("get_off_videos")
  80. async def _get_off_videos_task_handler(self) -> int:
  81. """视频下架任务"""
  82. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  83. return await sub_task.deal()
  84. @register("check_publish_video_audit_status")
  85. async def _check_video_audit_status_handler(self) -> int:
  86. """检查视频审核状态"""
  87. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  88. return await sub_task.deal()
  89. @register("task_processing_monitor")
  90. async def _task_processing_monitor_handler(self) -> int:
  91. """任务处理监控"""
  92. sub_task = TaskProcessingMonitor(self.db_client)
  93. await sub_task.deal()
  94. return TaskStatus.SUCCESS
  95. @register("inner_article_monitor")
  96. async def _inner_gzh_articles_monitor_handler(self) -> int:
  97. """内部公众号文章监控"""
  98. sub_task = InnerGzhArticlesMonitor(self.db_client)
  99. return await sub_task.deal()
  100. @register("outside_article_monitor")
  101. async def _outside_monitor_handler(self) -> int:
  102. """外部文章监控"""
  103. collector = OutsideGzhArticlesCollector(self.db_client)
  104. await collector.deal()
  105. monitor = OutsideGzhArticlesMonitor(self.db_client)
  106. return await monitor.deal()
  107. @register("cooperate_accounts_monitor")
  108. async def _cooperate_accounts_monitor_handler(self) -> int:
  109. """合作账号文章监测"""
  110. task = CooperateAccountsMonitorTask(
  111. pool=self.db_client, log_client=self.log_client
  112. )
  113. await task.deal(task_name="save_articles")
  114. return TaskStatus.SUCCESS
  115. @register("cooperate_accounts_detail")
  116. async def _cooperate_accounts_detail_handler(self) -> int:
  117. """合作账号文章详情更新"""
  118. task = CooperateAccountsMonitorTask(
  119. pool=self.db_client, log_client=self.log_client
  120. )
  121. await task.deal(task_name="get_detail")
  122. return TaskStatus.SUCCESS
  123. # ==================== 爬虫类任务 ====================
  124. @register("crawler_toutiao")
  125. async def _crawler_toutiao_handler(self) -> int:
  126. """头条文章/视频抓取"""
  127. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  128. method = self.data.get("method", "account")
  129. media_type = self.data.get("media_type", "article")
  130. category_list = self.data.get("category_list", [])
  131. match method:
  132. case "account":
  133. await sub_task.crawler_task(media_type=media_type)
  134. case "recommend":
  135. await sub_task.crawl_toutiao_recommend_task(category_list)
  136. case "search":
  137. await sub_task.search_candidate_accounts()
  138. case _:
  139. raise TaskValidationError(f"Unsupported method: {method}")
  140. return TaskStatus.SUCCESS
  141. @register("crawler_gzh_articles")
  142. async def _crawler_gzh_article_handler(self) -> int:
  143. """抓取公众号文章"""
  144. account_method = self.data.get("account_method")
  145. crawl_mode = self.data.get("crawl_mode")
  146. strategy = self.data.get("strategy")
  147. match crawl_mode:
  148. case "account":
  149. task = CrawlerGzhAccountArticles(
  150. self.db_client, self.log_client, self.trace_id
  151. )
  152. await task.deal(account_method, strategy)
  153. case "search":
  154. task = CrawlerGzhSearchArticles(
  155. self.db_client, self.log_client, self.trace_id
  156. )
  157. await task.deal(strategy)
  158. case _:
  159. raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
  160. return TaskStatus.SUCCESS
  161. @register("crawler_account_manager")
  162. async def _crawler_account_manager_handler(self) -> int:
  163. """账号管理任务"""
  164. platform = self.data.get("platform", "weixin")
  165. account_id_list = self.data.get("account_id_list")
  166. match platform:
  167. case "weixin":
  168. task = WeixinAccountManager(
  169. self.db_client, self.log_client, self.trace_id
  170. )
  171. await task.deal(platform=platform, account_id_list=account_id_list)
  172. case _:
  173. raise TaskValidationError(f"Unsupported platform: {platform}")
  174. return TaskStatus.SUCCESS
  175. @register("crawler_detail_analysis")
  176. async def _crawler_article_analysis_handler(self) -> int:
  177. """抓取视频/文章详情分析统计"""
  178. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  179. await task.deal(params=self.data)
  180. return TaskStatus.SUCCESS
  181. # ==================== 数据回收类任务 ====================
  182. @register("article_detail_stat")
  183. async def _article_detail_stat_handler(self) -> int:
  184. """文章详情统计"""
  185. task = ArticleDetailStat(self.db_client, self.log_client)
  186. await task.deal()
  187. return TaskStatus.SUCCESS
  188. @register("daily_publish_articles_recycle")
  189. async def _recycle_article_data_handler(self) -> int:
  190. """每日发文数据回收"""
  191. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  192. recycle = RecycleDailyPublishArticlesTask(
  193. self.db_client, self.log_client, date_str
  194. )
  195. await recycle.deal()
  196. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  197. await check.deal()
  198. return TaskStatus.SUCCESS
  199. @register("update_root_source_id")
  200. async def _update_root_source_id_handler(self) -> int:
  201. """更新 root_source_id"""
  202. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  203. await sub_task.deal()
  204. return TaskStatus.SUCCESS
  205. @register("recycle_outside_account_articles")
  206. async def _recycle_outside_account_article_handler(self) -> int:
  207. """回收外部账号文章"""
  208. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  209. task = RecycleOutsideAccountArticlesTask(
  210. pool=self.db_client, log_client=self.log_client, date_string=date_str
  211. )
  212. await task.deal()
  213. return TaskStatus.SUCCESS
  214. @register("update_outside_account_article_root_source_id")
  215. async def _update_outside_account_article_root_source_id_handler(self) -> int:
  216. """更新外部账号文章的 root_source_id"""
  217. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  218. pool=self.db_client, log_client=self.log_client
  219. )
  220. await task.deal()
  221. return TaskStatus.SUCCESS
  222. @register("fwh_daily_recycle")
  223. async def _recycle_fwh_article_handler(self) -> int:
  224. """回收服务号文章"""
  225. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  226. await task.deal()
  227. return TaskStatus.SUCCESS
  228. # ==================== 算法类任务 ====================
  229. @register("article_pool_cold_start")
  230. async def _article_pool_cold_start_handler(self) -> int:
  231. """文章池冷启动"""
  232. cold_start = ArticlePoolColdStart(
  233. self.db_client, self.log_client, self.trace_id
  234. )
  235. platform = self.data.get("platform", "weixin")
  236. crawler_methods = self.data.get("crawler_methods", [])
  237. category_list = self.data.get("category_list", [])
  238. strategy = self.data.get("strategy", "strategy_v1")
  239. await cold_start.deal(
  240. platform=platform,
  241. crawl_methods=crawler_methods,
  242. category_list=category_list,
  243. strategy=strategy,
  244. )
  245. return TaskStatus.SUCCESS
  246. @register("candidate_account_quality_analysis")
  247. async def _candidate_account_quality_score_handler(self) -> int:
  248. """候选账号质量分析"""
  249. task = CandidateAccountQualityScoreRecognizer(
  250. self.db_client, self.log_client, self.trace_id
  251. )
  252. await task.deal()
  253. return TaskStatus.SUCCESS
  254. @register("article_pool_category_generation")
  255. async def _article_pool_category_generation_handler(self) -> int:
  256. """文章池品类生成"""
  257. task = ArticlePoolCategoryGeneration(
  258. self.db_client, self.log_client, self.trace_id
  259. )
  260. limit_num = self.data.get("limit")
  261. await task.deal(limit=limit_num)
  262. return TaskStatus.SUCCESS
  263. @register("account_category_analysis")
  264. async def _account_category_analysis_handler(self) -> int:
  265. """账号品类分析"""
  266. task = AccountCategoryAnalysis(
  267. pool=self.db_client,
  268. log_client=self.log_client,
  269. trace_id=self.trace_id,
  270. data=self.data,
  271. date_string=None,
  272. )
  273. await task.deal()
  274. return TaskStatus.SUCCESS
  275. @register("update_limited_account_info")
  276. async def _update_limited_account_info_handler(self) -> int:
  277. """更新限流账号信息"""
  278. task = LimitedAccountAnalysisTask(
  279. pool=self.db_client, log_client=self.log_client
  280. )
  281. await task.deal(date_string=self.data.get("date_string"))
  282. return TaskStatus.SUCCESS
  283. # ==================== LLM 类任务 ====================
  284. @register("title_rewrite")
  285. async def _title_rewrite_handler(self) -> int:
  286. """标题重写"""
  287. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  288. return await sub_task.deal()
  289. @register("extract_title_features")
  290. async def _extract_title_features_handler(self) -> int:
  291. """提取标题特征"""
  292. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  293. await task.deal(data=self.data)
  294. return TaskStatus.SUCCESS
  295. # ==================== 统计分析类任务 ====================
  296. @register("update_account_read_rate_avg")
  297. async def _update_account_read_rate_avg_handler(self) -> int:
  298. """更新账号阅读率均值"""
  299. task = AccountPositionReadRateAvg(
  300. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  301. )
  302. await task.deal(end_date=self.data.get("end_date"))
  303. return TaskStatus.SUCCESS
  304. @register("update_account_read_avg")
  305. async def _update_account_read_avg_handler(self) -> int:
  306. """更新账号阅读均值"""
  307. task = AccountPositionReadAvg(
  308. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  309. )
  310. await task.deal(end_date=self.data.get("end_date"))
  311. return TaskStatus.SUCCESS
  312. @register("update_account_open_rate_avg")
  313. async def _update_account_open_rate_avg_handler(self) -> int:
  314. """更新账号打开率均值"""
  315. task = AccountPositionOpenRateAvg(
  316. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  317. )
  318. await task.deal(date_string=self.data.get("date_string"))
  319. return TaskStatus.SUCCESS
  320. # ==================== 自动化任务 ====================
  321. @register("auto_follow_account")
  322. async def _auto_follow_account_handler(self) -> int:
  323. """自动关注公众号"""
  324. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  325. await task.deal(task_name="follow_gzh_task")
  326. return TaskStatus.SUCCESS
  327. @register("get_follow_result")
  328. async def _get_follow_result_handler(self) -> int:
  329. """获取自动关注回复"""
  330. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  331. await task.deal(task_name="get_auto_reply_task")
  332. return TaskStatus.SUCCESS
  333. @register("extract_reply_result")
  334. async def _extract_reply_result_handler(self) -> int:
  335. """解析自动回复结果"""
  336. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  337. await task.deal(task_name="extract_task")
  338. return TaskStatus.SUCCESS
  339. # ==================== 其他任务 ====================
  340. @register("mini_program_detail_process")
  341. async def _mini_program_detail_handler(self) -> int:
  342. """更新小程序裂变信息"""
  343. task = RecycleMiniProgramDetailTask(
  344. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  345. )
  346. await task.deal(params=self.data)
  347. return TaskStatus.SUCCESS
# Public API of this module: only the handler class is exported by ``import *``.
__all__ = ["TaskHandler"]