from datetime import datetime
from typing import Callable, Dict, Optional

from app.core.config import GlobalConfigSettings

from app.domains.analysis_task import CrawlerDetailDeal
from app.domains.analysis_task import AccountPositionReadRateAvg
from app.domains.analysis_task import AccountPositionReadAvg
from app.domains.analysis_task import AccountPositionOpenRateAvg
from app.domains.analysis_task import RateLimitedArticleFilter

from app.domains.algorithm_tasks import AccountCategoryAnalysis

from app.domains.cold_start_tasks import ArticlePoolColdStart

from app.domains.crawler_tasks import CrawlerToutiao
from app.domains.crawler_tasks import WeixinAccountManager
from app.domains.crawler_tasks import CrawlerGzhAccountArticles
from app.domains.crawler_tasks import CrawlerGzhSearchArticles

from app.domains.data_recycle_tasks import ArticleDetailStat
from app.domains.data_recycle_tasks import RecycleDailyPublishArticlesTask
from app.domains.data_recycle_tasks import RecycleOutsideAccountArticlesTask
from app.domains.data_recycle_tasks import CheckDailyPublishArticlesTask
from app.domains.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
from app.domains.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
from app.domains.data_recycle_tasks import RecycleMiniProgramDetailTask
from app.domains.data_recycle_tasks import (
    UpdateOutsideRootSourceIdAndUpdateTimeTask,
)

from app.domains.llm_tasks import TitleRewrite
from app.domains.llm_tasks import ArticlePoolCategoryGeneration
from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
from app.domains.llm_tasks import ExtractTitleFeatures

from app.domains.monitor_tasks import AutoReplyCardsMonitor
from app.domains.monitor_tasks import check_kimi_balance
from app.domains.monitor_tasks import GetOffVideos
from app.domains.monitor_tasks import CheckVideoAuditStatus
from app.domains.monitor_tasks import CooperateAccountsMonitorTask
from app.domains.monitor_tasks import InnerGzhArticlesMonitor
from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
from app.domains.monitor_tasks import OutsideGzhArticlesCollector
from app.domains.monitor_tasks import TaskProcessingMonitor
from app.domains.monitor_tasks import LimitedAccountAnalysisTask

from app.jobs.task_config import TaskStatus
from app.jobs.task_utils import TaskValidationError

# Module-level mapping of task name -> handler function. It is filled in by
# the ``register`` decorator while the ``TaskHandler`` class body executes.
_TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}


def register(task_name: str):
    """Return a decorator that stores the decorated function under ``task_name``.

    The function is written into ``_TASK_HANDLER_REGISTRY`` and returned
    unchanged. Registering the same name again replaces the earlier entry.
    """

    def _bind(handler):
        _TASK_HANDLER_REGISTRY[task_name] = handler
        return handler

    return _bind


class TaskHandler:
    """Base task handler; handlers are registered automatically via a decorator.

    Every method decorated with ``@register("<name>")`` is stored as a plain
    (unbound) function in the shared registry, so whoever retrieves it through
    ``get_handler`` has to supply the ``TaskHandler`` instance itself.
    """

    # Task registry (the very same dict object as the module-level registry).
    _handlers = _TASK_HANDLER_REGISTRY

    def __init__(
        self,
        data: dict,
        log_service,
        db_client,
        trace_id: str,
        config: GlobalConfigSettings,
    ):
        """Keep the task payload and the collaborators the handlers rely on.

        Args:
            data: Task payload; handlers read their parameters from it.
            log_service: Logging client, stored as ``self.log_client``.
            db_client: Database client handed to the sub-tasks.
            trace_id: Trace identifier attached to log events and sub-tasks.
            config: Global configuration settings.
        """
        self.config = config
        self.trace_id = trace_id
        self.db_client = db_client
        self.log_client = log_service
        self.data = data

    @classmethod
    def get_handler(cls, task_name: str) -> Optional[Callable]:
        """Return the handler registered for ``task_name``, or ``None``."""
        handler = cls._handlers.get(task_name)
        return handler

    @classmethod
    def list_registered_tasks(cls) -> list:
        """Return the names of all registered tasks, in registration order."""
        return [*cls._handlers]

    async def _log_task_event(self, event_type: str, **kwargs):
        """Write one task log record through the log client.

        The record carries the current timestamp, the trace id, the event
        type and the ``task_name`` found in the payload; any extra keyword
        arguments are merged in afterwards and win on key collisions.
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "trace_id": self.trace_id,
            "event_type": event_type,
            "task": self.data.get("task_name"),
        }
        record.update(kwargs)
        await self.log_client.log(contents=record)

    # ==================== Monitoring tasks ====================

    @register("check_kimi_balance")
    async def _check_kimi_balance_handler(self) -> int:
        """Check the Kimi balance and log the response."""
        balance_info = await check_kimi_balance()
        await self._log_task_event("kimi_balance_checked", data=balance_info)
        return TaskStatus.SUCCESS

    @register("get_off_videos")
    async def _get_off_videos_task_handler(self) -> int:
        """Video take-down task; returns whatever the sub-task returns."""
        outcome = await GetOffVideos(
            self.db_client, self.log_client, self.trace_id
        ).deal()
        return outcome

    @register("check_publish_video_audit_status")
    async def _check_video_audit_status_handler(self) -> int:
        """Check video audit status; returns whatever the sub-task returns."""
        outcome = await CheckVideoAuditStatus(
            self.db_client, self.log_client, self.trace_id
        ).deal()
        return outcome

    @register("task_processing_monitor")
    async def _task_processing_monitor_handler(self) -> int:
        """Task-processing monitor."""
        await TaskProcessingMonitor(self.db_client).deal()
        return TaskStatus.SUCCESS

    @register("inner_article_monitor")
    async def _inner_gzh_articles_monitor_handler(self) -> int:
        """Monitor articles of internal official accounts."""
        outcome = await InnerGzhArticlesMonitor(self.db_client).deal()
        return outcome

    @register("outside_article_monitor")
    async def _outside_monitor_handler(self) -> int:
        """Monitor external articles: run the collector, then the monitor."""
        await OutsideGzhArticlesCollector(self.db_client).deal()
        # The monitor is only constructed once the collector has finished.
        outcome = await OutsideGzhArticlesMonitor(self.db_client).deal()
        return outcome

    @register("cooperate_accounts_monitor")
    async def _cooperate_accounts_monitor_handler(self) -> int:
        """Monitor articles of cooperating accounts."""
        monitor = CooperateAccountsMonitorTask(
            pool=self.db_client, log_client=self.log_client
        )
        await monitor.deal(task_name="save_articles")
        return TaskStatus.SUCCESS

    @register("cooperate_accounts_detail")
    async def _cooperate_accounts_detail_handler(self) -> int:
        """Update article details of cooperating accounts."""
        monitor = CooperateAccountsMonitorTask(
            pool=self.db_client, log_client=self.log_client
        )
        await monitor.deal(task_name="get_detail")
        return TaskStatus.SUCCESS

    # ==================== Crawler tasks ====================

    @register("crawler_toutiao")
    async def _crawler_toutiao_handler(self) -> int:
        """Crawl Toutiao articles/videos.

        Payload keys: ``method`` (default ``"account"``), ``media_type``
        (default ``"article"``) and ``category_list`` (default ``[]``).

        Raises:
            TaskValidationError: If ``method`` is not one of ``account``,
                ``recommend`` or ``search``.
        """
        # The crawler is built before the method is validated.
        crawler = CrawlerToutiao(
            self.db_client, self.log_client, self.trace_id, self.config
        )
        mode = self.data.get("method", "account")
        media_type = self.data.get("media_type", "article")
        categories = self.data.get("category_list", [])

        if mode == "account":
            await crawler.crawler_task(media_type=media_type)
        elif mode == "recommend":
            await crawler.crawl_toutiao_recommend_task(categories)
        elif mode == "search":
            await crawler.search_candidate_accounts()
        else:
            raise TaskValidationError(f"Unsupported method: {mode}")
        return TaskStatus.SUCCESS

    @register("crawler_gzh_articles")
    async def _crawler_gzh_article_handler(self) -> int:
        """Crawl official-account (GZH) articles.

        Payload keys: ``crawl_mode`` (``account`` or ``search``),
        ``account_method`` (only passed on in ``account`` mode) and
        ``strategy``.

        Raises:
            TaskValidationError: If ``crawl_mode`` is neither ``account`` nor
                ``search``.
        """
        account_method = self.data.get("account_method")
        mode = self.data.get("crawl_mode")
        strategy = self.data.get("strategy")

        if mode == "account":
            crawler = CrawlerGzhAccountArticles(
                self.db_client, self.log_client, self.trace_id, self.config
            )
            await crawler.deal(account_method, strategy)
        elif mode == "search":
            crawler = CrawlerGzhSearchArticles(
                self.db_client, self.log_client, self.trace_id, self.config
            )
            await crawler.deal(strategy)
        else:
            raise TaskValidationError(f"Unsupported crawl mode: {mode}")
        return TaskStatus.SUCCESS

    @register("crawler_account_manager")
    async def _crawler_account_manager_handler(self) -> int:
        """Account management task.

        Payload keys: ``platform`` (default ``"weixin"``) and
        ``account_id_list``.

        Raises:
            TaskValidationError: If ``platform`` is anything but ``weixin``.
        """
        platform = self.data.get("platform", "weixin")
        account_ids = self.data.get("account_id_list")

        if platform != "weixin":
            raise TaskValidationError(f"Unsupported platform: {platform}")

        manager = WeixinAccountManager(
            self.db_client, self.log_client, self.trace_id
        )
        await manager.deal(platform=platform, account_id_list=account_ids)
        return TaskStatus.SUCCESS

    @register("crawler_detail_analysis")
    async def _crawler_article_analysis_handler(self) -> int:
        """Analysis/statistics of crawled video and article details."""
        analysis = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
        await analysis.deal(params=self.data)
        return TaskStatus.SUCCESS

    # ==================== Data recycling tasks ====================

    @register("article_detail_stat")
    async def _article_detail_stat_handler(self) -> int:
        """Article detail statistics."""
        await ArticleDetailStat(self.db_client, self.log_client).deal()
        return TaskStatus.SUCCESS

    @register("daily_publish_articles_recycle")
    async def _recycle_article_data_handler(self) -> int:
        """Recycle daily published article data, then run the check task.

        Uses ``date_string`` from the payload, falling back to today's date
        (``%Y-%m-%d``) when it is missing or falsy.
        """
        day = self.data.get("date_string")
        if not day:
            day = datetime.now().strftime("%Y-%m-%d")

        await RecycleDailyPublishArticlesTask(
            self.db_client, self.log_client, day
        ).deal()
        # The check task is only constructed after the recycle step is done.
        await CheckDailyPublishArticlesTask(
            self.db_client, self.log_client, day
        ).deal()
        return TaskStatus.SUCCESS

    @register("update_root_source_id")
    async def _update_root_source_id_handler(self) -> int:
        """Update root_source_id."""
        updater = UpdateRootSourceIdAndUpdateTimeTask(
            self.db_client, self.log_client
        )
        await updater.deal()
        return TaskStatus.SUCCESS

    @register("recycle_outside_account_articles")
    async def _recycle_outside_account_article_handler(self) -> int:
        """Recycle articles of external accounts.

        Uses ``date_string`` from the payload, falling back to today's date
        (``%Y-%m-%d``) when it is missing or falsy.
        """
        day = self.data.get("date_string")
        if not day:
            day = datetime.now().strftime("%Y-%m-%d")

        recycler = RecycleOutsideAccountArticlesTask(
            pool=self.db_client, log_client=self.log_client, date_string=day
        )
        await recycler.deal()
        return TaskStatus.SUCCESS

    @register("update_outside_account_article_root_source_id")
    async def _update_outside_account_article_root_source_id_handler(self) -> int:
        """Update root_source_id for articles of external accounts."""
        updater = UpdateOutsideRootSourceIdAndUpdateTimeTask(
            pool=self.db_client, log_client=self.log_client
        )
        await updater.deal()
        return TaskStatus.SUCCESS

    @register("fwh_daily_recycle")
    async def _recycle_fwh_article_handler(self) -> int:
        """Recycle service-account (FWH) articles."""
        await RecycleFwhDailyPublishArticlesTask(
            self.db_client, self.log_client
        ).deal()
        return TaskStatus.SUCCESS

    # ==================== Algorithm tasks ====================

    @register("article_pool_cold_start")
    async def _article_pool_cold_start_handler(self) -> int:
        """Article pool cold start.

        Payload keys: ``platform`` (default ``"weixin"``),
        ``crawler_methods`` (default ``[]``), ``category_list`` (default
        ``[]``) and ``strategy`` (default ``"strategy_v1"``).
        """
        # Construct the sub-task first, then collect its call arguments.
        starter = ArticlePoolColdStart(
            self.db_client, self.log_client, self.trace_id, self.config
        )
        call_args = {
            "platform": self.data.get("platform", "weixin"),
            "crawl_methods": self.data.get("crawler_methods", []),
            "category_list": self.data.get("category_list", []),
            "strategy": self.data.get("strategy", "strategy_v1"),
        }
        await starter.deal(**call_args)
        return TaskStatus.SUCCESS

    @register("candidate_account_quality_analysis")
    async def _candidate_account_quality_score_handler(self) -> int:
        """Candidate account quality analysis."""
        await CandidateAccountQualityScoreRecognizer(
            self.db_client, self.log_client, self.trace_id
        ).deal()
        return TaskStatus.SUCCESS

    @register("article_pool_category_generation")
    async def _article_pool_category_generation_handler(self) -> int:
        """Article pool category generation; ``limit`` comes from the payload."""
        generator = ArticlePoolCategoryGeneration(
            self.db_client, self.log_client, self.trace_id
        )
        await generator.deal(limit=self.data.get("limit"))
        return TaskStatus.SUCCESS

    @register("account_category_analysis")
    async def _account_category_analysis_handler(self) -> int:
        """Account category analysis."""
        analysis = AccountCategoryAnalysis(
            pool=self.db_client,
            log_client=self.log_client,
            trace_id=self.trace_id,
            data=self.data,
            date_string=None,
        )
        await analysis.deal()
        return TaskStatus.SUCCESS

    @register("update_limited_account_info")
    async def _update_limited_account_info_handler(self) -> int:
        """Update rate-limited account information."""
        analysis = LimitedAccountAnalysisTask(
            pool=self.db_client, log_client=self.log_client
        )
        await analysis.deal(date_string=self.data.get("date_string"))
        return TaskStatus.SUCCESS

    # ==================== LLM tasks ====================

    @register("title_rewrite")
    async def _title_rewrite_handler(self) -> int:
        """Title rewriting; returns whatever the sub-task returns."""
        outcome = await TitleRewrite(
            self.db_client, self.log_client, self.trace_id
        ).deal()
        return outcome

    @register("extract_title_features")
    async def _extract_title_features_handler(self) -> int:
        """Extract title features."""
        extractor = ExtractTitleFeatures(
            self.db_client, self.log_client, self.trace_id
        )
        await extractor.deal(data=self.data)
        return TaskStatus.SUCCESS

    # ==================== Statistical analysis tasks ====================

    @register("update_account_read_rate_avg")
    async def _update_account_read_rate_avg_handler(self) -> int:
        """Update the average read rate of accounts."""
        stat = AccountPositionReadRateAvg(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        await stat.deal(end_date=self.data.get("end_date"))
        return TaskStatus.SUCCESS

    @register("update_account_read_avg")
    async def _update_account_read_avg_handler(self) -> int:
        """Update the average read count of accounts."""
        stat = AccountPositionReadAvg(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        await stat.deal(end_date=self.data.get("end_date"))
        return TaskStatus.SUCCESS

    @register("update_account_open_rate_avg")
    async def _update_account_open_rate_avg_handler(self) -> int:
        """Update the average open rate of accounts."""
        stat = AccountPositionOpenRateAvg(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        await stat.deal(date_string=self.data.get("date_string"))
        return TaskStatus.SUCCESS

    # ==================== Automation tasks ====================

    @register("auto_follow_account")
    async def _auto_follow_account_handler(self) -> int:
        """Automatically follow official accounts."""
        monitor = AutoReplyCardsMonitor(
            pool=self.db_client, log_service=self.log_client, config=self.config
        )
        await monitor.deal(task_name="follow_gzh_task")
        return TaskStatus.SUCCESS

    @register("get_follow_result")
    async def _get_follow_result_handler(self) -> int:
        """Fetch the auto-reply received after following."""
        monitor = AutoReplyCardsMonitor(
            pool=self.db_client, log_service=self.log_client, config=self.config
        )
        await monitor.deal(task_name="get_auto_reply_task")
        return TaskStatus.SUCCESS

    @register("extract_reply_result")
    async def _extract_reply_result_handler(self) -> int:
        """Parse the auto-reply results."""
        monitor = AutoReplyCardsMonitor(
            pool=self.db_client, log_service=self.log_client, config=self.config
        )
        await monitor.deal(task_name="extract_task")
        return TaskStatus.SUCCESS

    # ==================== Other tasks ====================

    @register("mini_program_detail_process")
    async def _mini_program_detail_handler(self) -> int:
        """Update mini-program fission information."""
        recycler = RecycleMiniProgramDetailTask(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        await recycler.deal(params=self.data)
        return TaskStatus.SUCCESS

    @register("rate_limited_article_filter")
    async def _rate_limited_article_filter(self) -> int:
        """Delete rate-limited articles."""
        article_filter = RateLimitedArticleFilter(
            pool=self.db_client, config=self.config
        )
        await article_filter.deal()
        return TaskStatus.SUCCESS


__all__ = ["TaskHandler"]