from datetime import datetime

from applications.tasks.analysis_task import CrawlerDetailDeal
from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
from applications.tasks.cold_start_tasks import ArticlePoolColdStart
from applications.tasks.crawler_tasks import (
    CrawlerToutiao,
    WeixinAccountManager,
    CrawlerGzhAccountArticles,
    CrawlerGzhSearchArticles,
)
from applications.tasks.data_recycle_tasks import (
    RecycleDailyPublishArticlesTask,
    CheckDailyPublishArticlesTask,
    UpdateRootSourceIdAndUpdateTimeTask,
    RecycleFwhDailyPublishArticlesTask,
    RecycleMiniProgramDetailTask,
)
from applications.tasks.llm_tasks import (
    TitleRewrite,
    ArticlePoolCategoryGeneration,
    CandidateAccountQualityScoreRecognizer,
)
from applications.tasks.monitor_tasks import (
    check_kimi_balance,
    GetOffVideos,
    CheckVideoAuditStatus,
    InnerGzhArticlesMonitor,
    OutsideGzhArticlesMonitor,
    OutsideGzhArticlesCollector,
    TaskProcessingMonitor,
)
from applications.tasks.task_mapper import TaskMapper


class TaskHandler(TaskMapper):
    """Bundle the concrete sub-task entry points behind one handler object.

    Every handler shares the request payload (``data``), the log client, the
    DB client, and the trace_id, and returns a task status code such as
    ``TASK_SUCCESS_STATUS``.
    """

    def __init__(self, data, log_service, db_client, trace_id):
        self.data = data
        self.log_client = log_service
        self.db_client = db_client
        self.trace_id = trace_id

    # ---------- Local implementations of the composite sub-tasks ----------
    async def _check_kimi_balance_handler(self) -> int:
        response = await check_kimi_balance()
        await self.log_client.log(
            contents={
                "trace_id": self.trace_id,
                "task": "check_kimi_balance",
                "data": response,
            }
        )
        return self.TASK_SUCCESS_STATUS

    async def _get_off_videos_task_handler(self) -> int:
        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
        return await sub_task.deal()

    async def _check_video_audit_status_handler(self) -> int:
        sub_task = CheckVideoAuditStatus(
            self.db_client, self.log_client, self.trace_id
        )
        return await sub_task.deal()

    async def _task_processing_monitor_handler(self) -> int:
        sub_task = TaskProcessingMonitor(self.db_client)
        await sub_task.deal()
        return self.TASK_SUCCESS_STATUS

    async def _inner_gzh_articles_monitor_handler(self) -> int:
        sub_task = InnerGzhArticlesMonitor(self.db_client)
        return await sub_task.deal()

    async def _title_rewrite_handler(self) -> int:
        sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
        return await sub_task.deal()

    async def _update_root_source_id_handler(self) -> int:
        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
        await sub_task.deal()
        return self.TASK_SUCCESS_STATUS

    async def _outside_monitor_handler(self) -> int:
        collector = OutsideGzhArticlesCollector(self.db_client)
        await collector.deal()
        monitor = OutsideGzhArticlesMonitor(self.db_client)
        return await monitor.deal()  # expected to return SUCCESS / FAILED

    async def _recycle_article_data_handler(self) -> int:
        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
        recycle = RecycleDailyPublishArticlesTask(
            self.db_client, self.log_client, date_str
        )
        await recycle.deal()
        check = CheckDailyPublishArticlesTask(
            self.db_client, self.log_client, date_str
        )
        await check.deal()
        return self.TASK_SUCCESS_STATUS

    async def _crawler_toutiao_handler(self) -> int:
        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
        method = self.data.get("method", "account")
        media_type = self.data.get("media_type", "article")
        category_list = self.data.get("category_list", [])
        match method:
            case "account":
                await sub_task.crawler_task(media_type=media_type)
            case "recommend":
                await sub_task.crawl_toutiao_recommend_task(category_list)
            case "search":
                await sub_task.search_candidate_accounts()
            case _:
                raise ValueError(f"Unsupported method {method}")
        return self.TASK_SUCCESS_STATUS

    async def _article_pool_cold_start_handler(self) -> int:
        cold_start = ArticlePoolColdStart(
            self.db_client, self.log_client, self.trace_id
        )
        platform = self.data.get("platform", "weixin")
        crawler_methods = self.data.get("crawler_methods", [])
        category_list = self.data.get("category_list", [])
        strategy = self.data.get("strategy", "strategy_v1")
        await cold_start.deal(
            platform=platform,
            crawl_methods=crawler_methods,
            category_list=category_list,
            strategy=strategy,
        )
        return self.TASK_SUCCESS_STATUS

    async def _candidate_account_quality_score_handler(self) -> int:
        task = CandidateAccountQualityScoreRecognizer(
            self.db_client, self.log_client, self.trace_id
        )
        await task.deal()
        return self.TASK_SUCCESS_STATUS

    async def _article_pool_category_generation_handler(self) -> int:
        task = ArticlePoolCategoryGeneration(
            self.db_client, self.log_client, self.trace_id
        )
        limit_num = self.data.get("limit")
        await task.deal(limit=limit_num)
        return self.TASK_SUCCESS_STATUS

    async def _crawler_account_manager_handler(self) -> int:
        platform = self.data.get("platform", "weixin")
        account_id_list = self.data.get("account_id_list")
        match platform:
            case "weixin":
                task = WeixinAccountManager(
                    self.db_client, self.log_client, self.trace_id
                )
            case _:
                raise ValueError(f"Unsupported platform {platform}")
        await task.deal(platform=platform, account_id_list=account_id_list)
        return self.TASK_SUCCESS_STATUS

    # Crawl WeChat official-account (gzh) articles
    async def _crawler_gzh_article_handler(self) -> int:
        account_method = self.data.get("account_method")
        crawl_mode = self.data.get("crawl_mode")
        strategy = self.data.get("strategy")
        match crawl_mode:
            case "account":
                task = CrawlerGzhAccountArticles(
                    self.db_client, self.log_client, self.trace_id
                )
                await task.deal(account_method, strategy)
            case "search":
                task = CrawlerGzhSearchArticles(
                    self.db_client, self.log_client, self.trace_id
                )
                await task.deal(strategy)
            case _:
                raise ValueError(f"Unsupported crawl mode {crawl_mode}")
        return self.TASK_SUCCESS_STATUS

    # Recycle service-account (fwh) published articles
    async def _recycle_fwh_article_handler(self) -> int:
        task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
        await task.deal()
        return self.TASK_SUCCESS_STATUS

    # Account category analysis task
    async def _account_category_analysis_handler(self) -> int:
        task = AccountCategoryAnalysis(
            pool=self.db_client,
            log_client=self.log_client,
            trace_id=self.trace_id,
            data=self.data,
            date_string=None,
        )
        await task.deal()
        return self.TASK_SUCCESS_STATUS

    # Analyze statistics for crawled video/article details
    async def _crawler_article_analysis_handler(self) -> int:
        task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
        await task.deal(params=self.data)
        return self.TASK_SUCCESS_STATUS

    # Update mini-program fission (sharing) details
    async def _mini_program_detail_handler(self) -> int:
        task = RecycleMiniProgramDetailTask(
            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
        )
        await task.deal(params=self.data)
        return self.TASK_SUCCESS_STATUS


__all__ = ["TaskHandler"]
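
# Minimal usage sketch (illustrative only): a dispatcher that owns the DB client
# and log service could build one TaskHandler per request and await the handler
# matching the submitted task name. The names used below (`payload`,
# `log_service`, `db_client`, `trace_id`) are placeholders, not defined in this
# module.
#
#     handler = TaskHandler(payload, log_service, db_client, trace_id)
#     status = await handler._recycle_article_data_handler()
#     if status == handler.TASK_SUCCESS_STATUS:
#         ...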