| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- from datetime import datetime
- from typing import Callable, Awaitable, Dict, Any, Optional
- from applications.tasks.analysis_task import CrawlerDetailDeal
- from applications.tasks.analysis_task import AccountPositionReadRateAvg
- from applications.tasks.analysis_task import AccountPositionReadAvg
- from applications.tasks.analysis_task import AccountPositionOpenRateAvg
- from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
- from applications.tasks.cold_start_tasks import ArticlePoolColdStart
- from applications.tasks.crawler_tasks import CrawlerToutiao
- from applications.tasks.crawler_tasks import WeixinAccountManager
- from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
- from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
- from applications.tasks.data_recycle_tasks import ArticleDetailStat
- from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
- from applications.tasks.data_recycle_tasks import RecycleOutsideAccountArticlesTask
- from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
- from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
- from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
- from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
- from applications.tasks.data_recycle_tasks import (
- UpdateOutsideRootSourceIdAndUpdateTimeTask,
- )
- from applications.tasks.llm_tasks import TitleRewrite
- from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
- from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
- from applications.tasks.llm_tasks import ExtractTitleFeatures
- from applications.tasks.monitor_tasks import AutoReplyCardsMonitor
- from applications.tasks.monitor_tasks import check_kimi_balance
- from applications.tasks.monitor_tasks import GetOffVideos
- from applications.tasks.monitor_tasks import CheckVideoAuditStatus
- from applications.tasks.monitor_tasks import CooperateAccountsMonitorTask
- from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
- from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
- from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
- from applications.tasks.monitor_tasks import TaskProcessingMonitor
- from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
- from applications.tasks.task_config import TaskStatus
- from applications.tasks.task_utils import TaskValidationError
# Module-level registry: task name -> handler function recorded by ``register``.
_TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}


def register(task_name: str):
    """Build a decorator that records a function under ``task_name``.

    The decorated function is stored in ``_TASK_HANDLER_REGISTRY`` and handed
    back unchanged, so decorating has no effect on the function itself.
    Registering a second function under the same name replaces the first.

    Args:
        task_name: Key under which the decorated function is stored.

    Returns:
        A decorator that takes a function, registers it, and returns it.
    """

    def _record(handler):
        # Plain item assignment: the most recent registration wins.
        _TASK_HANDLER_REGISTRY[task_name] = handler
        return handler

    return _record
class TaskHandler:
    """Base task handler whose handler methods self-register via ``@register``.

    Every method decorated with ``@register("<task_name>")`` is stored, as a
    plain function, in the shared module-level registry while the class body
    executes. ``get_handler`` looks a handler up by its task name.
    """

    # Task registry (the same dict object that ``register`` writes into).
    _handlers = _TASK_HANDLER_REGISTRY

    def __init__(self, data: dict, log_service, db_client, trace_id: str):
        """Keep the task payload and the clients the handlers pass along.

        Args:
            data: Task parameters; individual handlers read their options
                from this mapping.
            log_service: Logging client, stored as ``self.log_client``.
            db_client: Database client handed to the sub-tasks.
            trace_id: Identifier added to log events and passed to sub-tasks.
        """
        self.trace_id = trace_id
        self.db_client = db_client
        self.log_client = log_service
        self.data = data

    @classmethod
    def get_handler(cls, task_name: str) -> Optional[Callable]:
        """Return the handler registered for ``task_name``, or ``None``."""
        registry = cls._handlers
        return registry.get(task_name)

    @classmethod
    def list_registered_tasks(cls) -> list:
        """Return the names of all registered tasks as a new list."""
        return [name for name in cls._handlers]

    async def _log_task_event(self, event_type: str, **kwargs):
        """Send one structured task event to the log client.

        The record holds the current timestamp, the trace id, the event type
        and the payload's ``task_name``; any extra keyword arguments are
        merged in afterwards (and therefore win on a key clash).
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "trace_id": self.trace_id,
            "event_type": event_type,
            "task": self.data.get("task_name"),
        }
        record.update(kwargs)
        await self.log_client.log(contents=record)

    # ==================== Monitoring tasks ====================

    @register("check_kimi_balance")
    async def _check_kimi_balance_handler(self) -> int:
        """Check the Kimi balance and log the response as a task event."""
        balance_response = await check_kimi_balance()
        await self._log_task_event("kimi_balance_checked", data=balance_response)
        return TaskStatus.SUCCESS

    @register("get_off_videos")
    async def _get_off_videos_task_handler(self) -> int:
        """Run the video take-down task and return the sub-task's result."""
        runner = GetOffVideos(self.db_client, self.log_client, self.trace_id)
        status = await runner.deal()
        return status

    @register("check_publish_video_audit_status")
    async def _check_video_audit_status_handler(self) -> int:
        """Check video audit status and return the sub-task's result."""
        runner = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
        status = await runner.deal()
        return status

    @register("task_processing_monitor")
    async def _task_processing_monitor_handler(self) -> int:
        """Run the task-processing monitor, then report success."""
        monitor = TaskProcessingMonitor(self.db_client)
        await monitor.deal()
        return TaskStatus.SUCCESS

    @register("inner_article_monitor")
    async def _inner_gzh_articles_monitor_handler(self) -> int:
        """Monitor internal official-account articles; return the sub-task's result."""
        monitor = InnerGzhArticlesMonitor(self.db_client)
        status = await monitor.deal()
        return status

    @register("outside_article_monitor")
    async def _outside_monitor_handler(self) -> int:
        """Monitor external articles: run the collector first, then the monitor.

        Returns the monitor's result; the collector's result is discarded.
        """
        await OutsideGzhArticlesCollector(self.db_client).deal()
        return await OutsideGzhArticlesMonitor(self.db_client).deal()

    @register("cooperate_accounts_monitor")
    async def _cooperate_accounts_monitor_handler(self) -> int:
        """Monitor cooperating accounts' articles (``save_articles`` step)."""
        monitor_task = CooperateAccountsMonitorTask(
            pool=self.db_client, log_client=self.log_client
        )
        await monitor_task.deal(task_name="save_articles")
        return TaskStatus.SUCCESS

    @register("cooperate_accounts_detail")
    async def _cooperate_accounts_detail_handler(self) -> int:
        """Update cooperating accounts' article details (``get_detail`` step)."""
        detail_task = CooperateAccountsMonitorTask(
            pool=self.db_client, log_client=self.log_client
        )
        await detail_task.deal(task_name="get_detail")
        return TaskStatus.SUCCESS

    # ==================== Crawler tasks ====================

    @register("crawler_toutiao")
    async def _crawler_toutiao_handler(self) -> int:
        """Crawl Toutiao articles/videos.

        Reads ``method`` (default ``"account"``), ``media_type`` (default
        ``"article"``) and ``category_list`` (default ``[]``) from the payload
        and dispatches on ``method``.

        Raises:
            TaskValidationError: If ``method`` is not ``"account"``,
                ``"recommend"`` or ``"search"``.
        """
        crawler = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
        method = self.data.get("method", "account")
        media_type = self.data.get("media_type", "article")
        category_list = self.data.get("category_list", [])

        if method == "account":
            await crawler.crawler_task(media_type=media_type)
        elif method == "recommend":
            await crawler.crawl_toutiao_recommend_task(category_list)
        elif method == "search":
            await crawler.search_candidate_accounts()
        else:
            raise TaskValidationError(f"Unsupported method: {method}")
        return TaskStatus.SUCCESS

    @register("crawler_gzh_articles")
    async def _crawler_gzh_article_handler(self) -> int:
        """Crawl official-account articles.

        Dispatches on the payload's ``crawl_mode``: ``"account"`` runs the
        account crawler with ``account_method`` and ``strategy``;
        ``"search"`` runs the search crawler with ``strategy``.

        Raises:
            TaskValidationError: For any other ``crawl_mode``.
        """
        account_method = self.data.get("account_method")
        crawl_mode = self.data.get("crawl_mode")
        strategy = self.data.get("strategy")

        if crawl_mode == "account":
            account_crawler = CrawlerGzhAccountArticles(
                self.db_client, self.log_client, self.trace_id
            )
            await account_crawler.deal(account_method, strategy)
        elif crawl_mode == "search":
            search_crawler = CrawlerGzhSearchArticles(
                self.db_client, self.log_client, self.trace_id
            )
            await search_crawler.deal(strategy)
        else:
            raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
        return TaskStatus.SUCCESS

    @register("crawler_account_manager")
    async def _crawler_account_manager_handler(self) -> int:
        """Run the account-management task for the requested platform.

        ``platform`` defaults to ``"weixin"``, the only supported value.

        Raises:
            TaskValidationError: If ``platform`` is anything else.
        """
        platform = self.data.get("platform", "weixin")
        account_id_list = self.data.get("account_id_list")

        if platform == "weixin":
            manager = WeixinAccountManager(
                self.db_client, self.log_client, self.trace_id
            )
            await manager.deal(platform=platform, account_id_list=account_id_list)
        else:
            raise TaskValidationError(f"Unsupported platform: {platform}")
        return TaskStatus.SUCCESS

    @register("crawler_detail_analysis")
    async def _crawler_article_analysis_handler(self) -> int:
        """Run crawled video/article detail statistics with the full payload."""
        analysis = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
        await analysis.deal(params=self.data)
        return TaskStatus.SUCCESS

    # ==================== Data recycling tasks ====================

    @register("article_detail_stat")
    async def _article_detail_stat_handler(self) -> int:
        """Run the article detail statistics task."""
        stat_task = ArticleDetailStat(self.db_client, self.log_client)
        await stat_task.deal()
        return TaskStatus.SUCCESS

    @register("daily_publish_articles_recycle")
    async def _recycle_article_data_handler(self) -> int:
        """Recycle daily published-article data, then run the check task.

        Uses the payload's ``date_string``; when it is missing or falsy,
        today's date formatted as ``%Y-%m-%d`` is used instead.
        """
        date_string = self.data.get("date_string")
        if not date_string:
            date_string = datetime.now().strftime("%Y-%m-%d")

        recycler = RecycleDailyPublishArticlesTask(
            self.db_client, self.log_client, date_string
        )
        await recycler.deal()

        checker = CheckDailyPublishArticlesTask(
            self.db_client, self.log_client, date_string
        )
        await checker.deal()
        return TaskStatus.SUCCESS

    @register("update_root_source_id")
    async def _update_root_source_id_handler(self) -> int:
        """Update root_source_id (and update time) records."""
        updater = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
        await updater.deal()
        return TaskStatus.SUCCESS

    @register("recycle_outside_account_articles")
    async def _recycle_outside_account_article_handler(self) -> int:
        """Recycle articles of external accounts.

        Uses the payload's ``date_string``; when it is missing or falsy,
        today's date formatted as ``%Y-%m-%d`` is used instead.
        """
        date_string = self.data.get("date_string")
        if not date_string:
            date_string = datetime.now().strftime("%Y-%m-%d")

        recycler = RecycleOutsideAccountArticlesTask(
            pool=self.db_client, log_client=self.log_client, date_string=date_string
        )
        await recycler.deal()
        return TaskStatus.SUCCESS

    @register("update_outside_account_article_root_source_id")
    async def _update_outside_account_article_root_source_id_handler(self) -> int:
        """Update root_source_id for external-account articles."""
        updater = UpdateOutsideRootSourceIdAndUpdateTimeTask(
            pool=self.db_client, log_client=self.log_client
        )
        await updater.deal()
        return TaskStatus.SUCCESS

    @register("fwh_daily_recycle")
    async def _recycle_fwh_article_handler(self) -> int:
        """Recycle service-account (fwh) articles."""
        recycler = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
        await recycler.deal()
        return TaskStatus.SUCCESS

    # ==================== Algorithm tasks ====================

    @register("article_pool_cold_start")
    async def _article_pool_cold_start_handler(self) -> int:
        """Run the article-pool cold start.

        Payload options and their defaults: ``platform`` (``"weixin"``),
        ``crawler_methods`` (``[]``, passed on as ``crawl_methods``),
        ``category_list`` (``[]``) and ``strategy`` (``"strategy_v1"``).
        """
        starter = ArticlePoolColdStart(
            self.db_client, self.log_client, self.trace_id
        )
        options = {
            "platform": self.data.get("platform", "weixin"),
            "crawl_methods": self.data.get("crawler_methods", []),
            "category_list": self.data.get("category_list", []),
            "strategy": self.data.get("strategy", "strategy_v1"),
        }
        await starter.deal(**options)
        return TaskStatus.SUCCESS

    @register("candidate_account_quality_analysis")
    async def _candidate_account_quality_score_handler(self) -> int:
        """Run the candidate-account quality analysis."""
        recognizer = CandidateAccountQualityScoreRecognizer(
            self.db_client, self.log_client, self.trace_id
        )
        await recognizer.deal()
        return TaskStatus.SUCCESS

    @register("article_pool_category_generation")
    async def _article_pool_category_generation_handler(self) -> int:
        """Generate article-pool categories, passing the payload's ``limit``."""
        generator = ArticlePoolCategoryGeneration(
            self.db_client, self.log_client, self.trace_id
        )
        limit = self.data.get("limit")
        await generator.deal(limit=limit)
        return TaskStatus.SUCCESS

    @register("account_category_analysis")
    async def _account_category_analysis_handler(self) -> int:
        """Run the account category analysis with the full payload."""
        analysis = AccountCategoryAnalysis(
            pool=self.db_client,
            log_client=self.log_client,
            trace_id=self.trace_id,
            data=self.data,
            date_string=None,
        )
        await analysis.deal()
        return TaskStatus.SUCCESS

    @register("update_limited_account_info")
    async def _update_limited_account_info_handler(self) -> int:
        """Update rate-limited account info for the payload's ``date_string``."""
        limited_task = LimitedAccountAnalysisTask(
            pool=self.db_client, log_client=self.log_client
        )
        date_string = self.data.get("date_string")
        await limited_task.deal(date_string=date_string)
        return TaskStatus.SUCCESS

    # ==================== LLM tasks ====================

    @register("title_rewrite")
    async def _title_rewrite_handler(self) -> int:
        """Run the title rewrite task and return the sub-task's result."""
        rewriter = TitleRewrite(self.db_client, self.log_client, self.trace_id)
        status = await rewriter.deal()
        return status

    @register("extract_title_features")
    async def _extract_title_features_handler(self) -> int:
        """Extract title features, handing the full payload to the sub-task."""
        extractor = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
        await extractor.deal(data=self.data)
        return TaskStatus.SUCCESS

    # ==================== Statistics / analysis tasks ====================

    @register("update_account_read_rate_avg")
    async def _update_account_read_rate_avg_handler(self) -> int:
        """Update the per-account read-rate average up to the payload's ``end_date``."""
        avg_task = AccountPositionReadRateAvg(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        end_date = self.data.get("end_date")
        await avg_task.deal(end_date=end_date)
        return TaskStatus.SUCCESS

    @register("update_account_read_avg")
    async def _update_account_read_avg_handler(self) -> int:
        """Update the per-account read average up to the payload's ``end_date``."""
        avg_task = AccountPositionReadAvg(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        end_date = self.data.get("end_date")
        await avg_task.deal(end_date=end_date)
        return TaskStatus.SUCCESS

    @register("update_account_open_rate_avg")
    async def _update_account_open_rate_avg_handler(self) -> int:
        """Update the per-account open-rate average for the payload's ``date_string``."""
        avg_task = AccountPositionOpenRateAvg(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        date_string = self.data.get("date_string")
        await avg_task.deal(date_string=date_string)
        return TaskStatus.SUCCESS

    # ==================== Automation tasks ====================

    @register("auto_follow_account")
    async def _auto_follow_account_handler(self) -> int:
        """Automatically follow official accounts (``follow_gzh_task`` step)."""
        cards_monitor = AutoReplyCardsMonitor(
            pool=self.db_client, log_client=self.log_client
        )
        await cards_monitor.deal(task_name="follow_gzh_task")
        return TaskStatus.SUCCESS

    @register("get_follow_result")
    async def _get_follow_result_handler(self) -> int:
        """Fetch the auto-reply received after following (``get_auto_reply_task`` step)."""
        cards_monitor = AutoReplyCardsMonitor(
            pool=self.db_client, log_client=self.log_client
        )
        await cards_monitor.deal(task_name="get_auto_reply_task")
        return TaskStatus.SUCCESS

    @register("extract_reply_result")
    async def _extract_reply_result_handler(self) -> int:
        """Parse the auto-reply results (``extract_task`` step)."""
        cards_monitor = AutoReplyCardsMonitor(
            pool=self.db_client, log_client=self.log_client
        )
        await cards_monitor.deal(task_name="extract_task")
        return TaskStatus.SUCCESS

    # ==================== Other tasks ====================

    @register("mini_program_detail_process")
    async def _mini_program_detail_handler(self) -> int:
        """Update mini-program fission details, passing the full payload."""
        detail_task = RecycleMiniProgramDetailTask(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        await detail_task.deal(params=self.data)
        return TaskStatus.SUCCESS
# Public API of this module: only the handler class is exported by
# ``from <module> import *``.
__all__ = [
    "TaskHandler",
]
|