123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from datetime import datetime
- from applications.tasks.cold_start_tasks import ArticlePoolColdStart
- from applications.tasks.crawler_tasks import CrawlerToutiao
- from applications.tasks.crawler_tasks import WeixinAccountManager
- from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
- from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
- from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
- from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
- from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
- from applications.tasks.llm_tasks import TitleRewrite
- from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
- from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
- from applications.tasks.monitor_tasks import check_kimi_balance
- from applications.tasks.monitor_tasks import GetOffVideos
- from applications.tasks.monitor_tasks import CheckVideoAuditStatus
- from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
- from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
- from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
- from applications.tasks.monitor_tasks import TaskProcessingMonitor
- from applications.tasks.task_mapper import TaskMapper
class TaskHandler(TaskMapper):
    """Async handlers that adapt request data to the concrete task classes.

    Each ``_*_handler`` coroutine builds one or more task objects from the
    shared clients stored on the instance, awaits them, and returns a status
    value. ``TASK_SUCCESS_STATUS`` is not defined in this class; it is
    expected to be provided by ``TaskMapper``.
    """

    def __init__(self, data, log_service, db_client, trace_id):
        """Store the request payload and the shared clients.

        Args:
            data: Request payload; handlers read options from it via ``.get``.
            log_service: Logging client, kept as ``self.log_client``.
            db_client: Database client passed on to the task objects.
            trace_id: Identifier passed on to task objects and log records.
        """
        self.trace_id = trace_id
        self.db_client = db_client
        self.log_client = log_service
        self.data = data

    # ---------- Handlers for the individual (partly composite) tasks ----------

    async def _check_kimi_balance_handler(self) -> int:
        """Call ``check_kimi_balance`` and log its response.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` once the log call has completed.
        """
        balance_info = await check_kimi_balance()
        log_record = {
            "trace_id": self.trace_id,
            "task": "check_kimi_balance",
            "data": balance_info,
        }
        await self.log_client.log(contents=log_record)
        return self.TASK_SUCCESS_STATUS

    async def _get_off_videos_task_handler(self) -> int:
        """Run ``GetOffVideos`` and return whatever its ``deal()`` returns."""
        return await GetOffVideos(
            self.db_client, self.log_client, self.trace_id
        ).deal()

    async def _check_video_audit_status_handler(self) -> int:
        """Run ``CheckVideoAuditStatus`` and return the result of ``deal()``."""
        return await CheckVideoAuditStatus(
            self.db_client, self.log_client, self.trace_id
        ).deal()

    async def _task_processing_monitor_handler(self) -> int:
        """Run ``TaskProcessingMonitor``; its result is discarded.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after ``deal()`` completes.
        """
        await TaskProcessingMonitor(self.db_client).deal()
        return self.TASK_SUCCESS_STATUS

    async def _inner_gzh_articles_monitor_handler(self) -> int:
        """Run ``InnerGzhArticlesMonitor`` and return the result of ``deal()``."""
        return await InnerGzhArticlesMonitor(self.db_client).deal()

    async def _title_rewrite_handler(self):
        """Run ``TitleRewrite`` and return the result of ``deal()`` unchanged."""
        return await TitleRewrite(
            self.db_client, self.log_client, self.trace_id
        ).deal()

    async def _update_root_source_id_handler(self) -> int:
        """Run ``UpdateRootSourceIdAndUpdateTimeTask``; its result is discarded.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after ``deal()`` completes.
        """
        await UpdateRootSourceIdAndUpdateTimeTask(
            self.db_client, self.log_client
        ).deal()
        return self.TASK_SUCCESS_STATUS

    async def _outside_monitor_handler(self) -> int:
        """Run the outside-GZH collector first, then the monitor.

        Returns:
            The value returned by ``OutsideGzhArticlesMonitor.deal()``.
        """
        await OutsideGzhArticlesCollector(self.db_client).deal()
        # The monitor is expected to return SUCCESS / FAILED.
        return await OutsideGzhArticlesMonitor(self.db_client).deal()

    async def _recycle_article_data_handler(self) -> int:
        """Recycle, then check, the daily published articles for one date.

        The date comes from ``data["date_string"]``; when that is missing or
        falsy, today's local date formatted as ``%Y-%m-%d`` is used.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after both tasks complete.
        """
        target_date = self.data.get("date_string")
        if not target_date:
            target_date = datetime.now().strftime("%Y-%m-%d")

        # Order matters: the recycle step runs to completion before the check.
        for task_cls in (
            RecycleDailyPublishArticlesTask,
            CheckDailyPublishArticlesTask,
        ):
            await task_cls(self.db_client, self.log_client, target_date).deal()
        return self.TASK_SUCCESS_STATUS

    async def _crawler_toutiao_handler(self) -> int:
        """Dispatch a Toutiao crawl according to ``data["method"]``.

        Options read from ``self.data``: ``method`` (default ``"account"``),
        ``media_type`` (default ``"article"``) and ``category_list``
        (default ``[]``).

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after the selected crawl completes.

        Raises:
            ValueError: If ``method`` is not ``"account"``, ``"recommend"``
                or ``"search"``.
        """
        crawler = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
        crawl_method = self.data.get("method", "account")
        media = self.data.get("media_type", "article")
        categories = self.data.get("category_list", [])

        if crawl_method == "account":
            await crawler.crawler_task(media_type=media)
        elif crawl_method == "recommend":
            await crawler.crawl_toutiao_recommend_task(categories)
        elif crawl_method == "search":
            await crawler.search_candidate_accounts()
        else:
            raise ValueError(f"Unsupported method {crawl_method}")
        return self.TASK_SUCCESS_STATUS

    async def _article_pool_cold_start_handler(self) -> int:
        """Run ``ArticlePoolColdStart`` with options from the request data.

        ``platform`` defaults to ``"weixin"`` and ``crawler_methods`` to
        ``[]``; the latter is passed on as the ``crawl_methods`` keyword.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after ``deal()`` completes.
        """
        starter = ArticlePoolColdStart(
            self.db_client, self.log_client, self.trace_id
        )
        await starter.deal(
            platform=self.data.get("platform", "weixin"),
            crawl_methods=self.data.get("crawler_methods", []),
        )
        return self.TASK_SUCCESS_STATUS

    async def _candidate_account_quality_score_handler(self) -> int:
        """Run ``CandidateAccountQualityScoreRecognizer``; result is discarded.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after ``deal()`` completes.
        """
        await CandidateAccountQualityScoreRecognizer(
            self.db_client, self.log_client, self.trace_id
        ).deal()
        return self.TASK_SUCCESS_STATUS

    async def _article_pool_category_generation_handler(self) -> int:
        """Run ``ArticlePoolCategoryGeneration`` with ``data["limit"]``.

        ``limit`` is forwarded as-is, so it is ``None`` when the key is absent.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after ``deal()`` completes.
        """
        generator = ArticlePoolCategoryGeneration(
            self.db_client, self.log_client, self.trace_id
        )
        await generator.deal(limit=self.data.get("limit"))
        return self.TASK_SUCCESS_STATUS

    async def _crawler_account_manager_handler(self) -> int:
        """Run the account manager for the requested platform.

        ``platform`` defaults to ``"weixin"``, which is the only value handled
        here; ``account_id_list`` is forwarded as-is (``None`` when absent).

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after ``deal()`` completes.

        Raises:
            ValueError: If ``platform`` is anything other than ``"weixin"``.
        """
        target_platform = self.data.get("platform", "weixin")
        account_ids = self.data.get("account_id_list")

        if target_platform == "weixin":
            manager = WeixinAccountManager(
                self.db_client, self.log_client, self.trace_id
            )
        else:
            raise ValueError(f"Unsupported platform {target_platform}")

        await manager.deal(platform=target_platform, account_id_list=account_ids)
        return self.TASK_SUCCESS_STATUS

    async def _crawler_gzh_article_handler(self) -> int:
        """Crawl GZH articles in either account mode or search mode.

        Options read from ``self.data`` (all ``None`` when absent):
        ``account_method``, ``crawl_mode`` and ``strategy``. Account mode
        passes ``account_method`` and ``strategy`` to the task; search mode
        passes only ``strategy``.

        Returns:
            ``self.TASK_SUCCESS_STATUS`` after the selected crawl completes.

        Raises:
            ValueError: If ``crawl_mode`` is neither ``"account"`` nor
                ``"search"`` (including when it is missing).
        """
        account_method = self.data.get("account_method")
        mode = self.data.get("crawl_mode")
        strategy = self.data.get("strategy")

        if mode == "account":
            account_crawler = CrawlerGzhAccountArticles(
                self.db_client, self.log_client, self.trace_id
            )
            await account_crawler.deal(account_method, strategy)
        elif mode == "search":
            search_crawler = CrawlerGzhSearchArticles(
                self.db_client, self.log_client, self.trace_id
            )
            await search_crawler.deal(strategy)
        else:
            raise ValueError(f"Unsupported crawl mode {mode}")
        return self.TASK_SUCCESS_STATUS
|