|
|
@@ -1,4 +1,5 @@
|
|
|
from datetime import datetime
|
|
|
+from typing import Callable, Awaitable, Dict, Any, Optional
|
|
|
|
|
|
from applications.tasks.analysis_task import CrawlerDetailDeal
|
|
|
from applications.tasks.analysis_task import AccountPositionReadRateAvg
|
|
|
@@ -40,71 +41,119 @@ from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
|
|
|
from applications.tasks.monitor_tasks import TaskProcessingMonitor
|
|
|
from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
|
|
|
|
|
|
-from applications.tasks.task_mapper import TaskMapper
|
|
|
+from applications.tasks.task_config import TaskStatus
|
|
|
+from applications.tasks.task_utils import TaskValidationError
|
|
|
|
|
|
|
|
|
-class TaskHandler(TaskMapper):
|
|
|
- def __init__(self, data, log_service, db_client, trace_id):
|
|
|
+_TASK_HANDLER_REGISTRY: Dict[str, Callable] = {}
|
|
|
+
|
|
|
+
|
|
|
+def register(task_name: str):
|
|
|
+ def decorator(func):
|
|
|
+ _TASK_HANDLER_REGISTRY[task_name] = func
|
|
|
+ return func
|
|
|
+
|
|
|
+ return decorator
|
|
|
+
|
|
|
+
|
|
|
+class TaskHandler:
|
|
|
+ """任务处理器基类 - 使用装饰器模式自动注册任务"""
|
|
|
+
|
|
|
+ # 任务注册表
|
|
|
+ _handlers = _TASK_HANDLER_REGISTRY
|
|
|
+
|
|
|
+ def __init__(self, data: dict, log_service, db_client, trace_id: str):
|
|
|
self.data = data
|
|
|
self.log_client = log_service
|
|
|
self.db_client = db_client
|
|
|
self.trace_id = trace_id
|
|
|
|
|
|
- # ---------- 下面是若干复合任务的局部实现 ----------
|
|
|
+ @classmethod
|
|
|
+ def get_handler(cls, task_name: str) -> Optional[Callable]:
|
|
|
+ """获取任务处理器"""
|
|
|
+ return cls._handlers.get(task_name)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def list_registered_tasks(cls) -> list:
|
|
|
+ """列出所有已注册的任务"""
|
|
|
+ return list(cls._handlers.keys())
|
|
|
+
|
|
|
+ async def _log_task_event(self, event_type: str, **kwargs):
|
|
|
+ """统一的任务日志记录"""
|
|
|
+ log_data = {
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "trace_id": self.trace_id,
|
|
|
+ "event_type": event_type,
|
|
|
+ "task": self.data.get("task_name"),
|
|
|
+ **kwargs,
|
|
|
+ }
|
|
|
+ await self.log_client.log(contents=log_data)
|
|
|
+
|
|
|
+ # ==================== 监控类任务 ====================
|
|
|
+
|
|
|
+ @register("check_kimi_balance")
|
|
|
async def _check_kimi_balance_handler(self) -> int:
|
|
|
+ """检查 Kimi 余额"""
|
|
|
response = await check_kimi_balance()
|
|
|
- await self.log_client.log(
|
|
|
- contents={
|
|
|
- "trace_id": self.trace_id,
|
|
|
- "task": "check_kimi_balance",
|
|
|
- "data": response,
|
|
|
- }
|
|
|
- )
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ await self._log_task_event("kimi_balance_checked", data=response)
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
+ @register("get_off_videos")
|
|
|
async def _get_off_videos_task_handler(self) -> int:
|
|
|
+ """视频下架任务"""
|
|
|
sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
|
|
|
return await sub_task.deal()
|
|
|
|
|
|
+ @register("check_publish_video_audit_status")
|
|
|
async def _check_video_audit_status_handler(self) -> int:
|
|
|
+ """检查视频审核状态"""
|
|
|
sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
|
|
|
return await sub_task.deal()
|
|
|
|
|
|
+ @register("task_processing_monitor")
|
|
|
async def _task_processing_monitor_handler(self) -> int:
|
|
|
+ """任务处理监控"""
|
|
|
sub_task = TaskProcessingMonitor(self.db_client)
|
|
|
await sub_task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
+ @register("inner_article_monitor")
|
|
|
async def _inner_gzh_articles_monitor_handler(self) -> int:
|
|
|
+ """内部公众号文章监控"""
|
|
|
sub_task = InnerGzhArticlesMonitor(self.db_client)
|
|
|
return await sub_task.deal()
|
|
|
|
|
|
- async def _title_rewrite_handler(self):
|
|
|
- sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
|
|
|
- return await sub_task.deal()
|
|
|
-
|
|
|
- async def _update_root_source_id_handler(self) -> int:
|
|
|
- sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
|
|
|
- await sub_task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
-
|
|
|
+ @register("outside_article_monitor")
|
|
|
async def _outside_monitor_handler(self) -> int:
|
|
|
+ """外部文章监控"""
|
|
|
collector = OutsideGzhArticlesCollector(self.db_client)
|
|
|
await collector.deal()
|
|
|
monitor = OutsideGzhArticlesMonitor(self.db_client)
|
|
|
- return await monitor.deal() # 应返回 SUCCESS / FAILED
|
|
|
+ return await monitor.deal()
|
|
|
|
|
|
- async def _recycle_article_data_handler(self) -> int:
|
|
|
- date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
|
|
|
- recycle = RecycleDailyPublishArticlesTask(
|
|
|
- self.db_client, self.log_client, date_str
|
|
|
+ @register("cooperate_accounts_monitor")
|
|
|
+ async def _cooperate_accounts_monitor_handler(self) -> int:
|
|
|
+ """合作账号文章监测"""
|
|
|
+ task = CooperateAccountsMonitorTask(
|
|
|
+ pool=self.db_client, log_client=self.log_client
|
|
|
)
|
|
|
- await recycle.deal()
|
|
|
- check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
|
|
|
- await check.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ await task.deal(task_name="save_articles")
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ @register("cooperate_accounts_detail")
|
|
|
+ async def _cooperate_accounts_detail_handler(self) -> int:
|
|
|
+ """合作账号文章详情更新"""
|
|
|
+ task = CooperateAccountsMonitorTask(
|
|
|
+ pool=self.db_client, log_client=self.log_client
|
|
|
+ )
|
|
|
+ await task.deal(task_name="get_detail")
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
+ # ==================== 爬虫类任务 ====================
|
|
|
+
|
|
|
+ @register("crawler_toutiao")
|
|
|
async def _crawler_toutiao_handler(self) -> int:
|
|
|
+ """头条文章/视频抓取"""
|
|
|
sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
|
|
|
method = self.data.get("method", "account")
|
|
|
media_type = self.data.get("media_type", "article")
|
|
|
@@ -118,60 +167,17 @@ class TaskHandler(TaskMapper):
|
|
|
case "search":
|
|
|
await sub_task.search_candidate_accounts()
|
|
|
case _:
|
|
|
- raise ValueError(f"Unsupported method {method}")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ raise TaskValidationError(f"Unsupported method: {method}")
|
|
|
|
|
|
- async def _article_pool_cold_start_handler(self) -> int:
|
|
|
- cold_start = ArticlePoolColdStart(
|
|
|
- self.db_client, self.log_client, self.trace_id
|
|
|
- )
|
|
|
- platform = self.data.get("platform", "weixin")
|
|
|
- crawler_methods = self.data.get("crawler_methods", [])
|
|
|
- category_list = self.data.get("category_list", [])
|
|
|
- strategy = self.data.get("strategy", "strategy_v1")
|
|
|
- await cold_start.deal(
|
|
|
- platform=platform,
|
|
|
- crawl_methods=crawler_methods,
|
|
|
- category_list=category_list,
|
|
|
- strategy=strategy,
|
|
|
- )
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
-
|
|
|
- async def _candidate_account_quality_score_handler(self) -> int:
|
|
|
- task = CandidateAccountQualityScoreRecognizer(
|
|
|
- self.db_client, self.log_client, self.trace_id
|
|
|
- )
|
|
|
- await task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- async def _article_pool_category_generation_handler(self) -> int:
|
|
|
- task = ArticlePoolCategoryGeneration(
|
|
|
- self.db_client, self.log_client, self.trace_id
|
|
|
- )
|
|
|
- limit_num = self.data.get("limit")
|
|
|
- await task.deal(limit=limit_num)
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
-
|
|
|
- async def _crawler_account_manager_handler(self) -> int:
|
|
|
- platform = self.data.get("platform", "weixin")
|
|
|
- account_id_list = self.data.get("account_id_list")
|
|
|
-
|
|
|
- match platform:
|
|
|
- case "weixin":
|
|
|
- task = WeixinAccountManager(
|
|
|
- self.db_client, self.log_client, self.trace_id
|
|
|
- )
|
|
|
- case _:
|
|
|
- raise ValueError(f"Unsupported platform {platform}")
|
|
|
-
|
|
|
- await task.deal(platform=platform, account_id_list=account_id_list)
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
-
|
|
|
- # 抓取公众号文章
|
|
|
+ @register("crawler_gzh_articles")
|
|
|
async def _crawler_gzh_article_handler(self) -> int:
|
|
|
+ """抓取公众号文章"""
|
|
|
account_method = self.data.get("account_method")
|
|
|
crawl_mode = self.data.get("crawl_mode")
|
|
|
strategy = self.data.get("strategy")
|
|
|
+
|
|
|
match crawl_mode:
|
|
|
case "account":
|
|
|
task = CrawlerGzhAccountArticles(
|
|
|
@@ -184,131 +190,220 @@ class TaskHandler(TaskMapper):
|
|
|
)
|
|
|
await task.deal(strategy)
|
|
|
case _:
|
|
|
- raise ValueError(f"Unsupported crawl mode {crawl_mode}")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ raise TaskValidationError(f"Unsupported crawl mode: {crawl_mode}")
|
|
|
|
|
|
- # 回收服务号文章
|
|
|
- async def _recycle_fwh_article_handler(self) -> int:
|
|
|
- task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
|
|
|
- await task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 账号品类处理任务
|
|
|
- async def _account_category_analysis_handler(self) -> int:
|
|
|
- task = AccountCategoryAnalysis(
|
|
|
- pool=self.db_client,
|
|
|
- log_client=self.log_client,
|
|
|
- trace_id=self.trace_id,
|
|
|
- data=self.data,
|
|
|
- date_string=None,
|
|
|
- )
|
|
|
- await task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ @register("crawler_account_manager")
|
|
|
+ async def _crawler_account_manager_handler(self) -> int:
|
|
|
+ """账号管理任务"""
|
|
|
+ platform = self.data.get("platform", "weixin")
|
|
|
+ account_id_list = self.data.get("account_id_list")
|
|
|
|
|
|
- # 抓取视频/文章详情分析统计
|
|
|
+ match platform:
|
|
|
+ case "weixin":
|
|
|
+ task = WeixinAccountManager(
|
|
|
+ self.db_client, self.log_client, self.trace_id
|
|
|
+ )
|
|
|
+ await task.deal(platform=platform, account_id_list=account_id_list)
|
|
|
+ case _:
|
|
|
+ raise TaskValidationError(f"Unsupported platform: {platform}")
|
|
|
+
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ @register("crawler_detail_analysis")
|
|
|
async def _crawler_article_analysis_handler(self) -> int:
|
|
|
+ """抓取视频/文章详情分析统计"""
|
|
|
task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
|
|
|
await task.deal(params=self.data)
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 更新小程序裂变信息
|
|
|
- async def _mini_program_detail_handler(self) -> int:
|
|
|
- task = RecycleMiniProgramDetailTask(
|
|
|
- pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
|
|
|
+ # ==================== 数据回收类任务 ====================
|
|
|
+
|
|
|
+ @register("daily_publish_articles_recycle")
|
|
|
+ async def _recycle_article_data_handler(self) -> int:
|
|
|
+ """每日发文数据回收"""
|
|
|
+ date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
|
|
|
+ recycle = RecycleDailyPublishArticlesTask(
|
|
|
+ self.db_client, self.log_client, date_str
|
|
|
)
|
|
|
- await task.deal(params=self.data)
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ await recycle.deal()
|
|
|
+ check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
|
|
|
+ await check.deal()
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 提取标题特征
|
|
|
- async def _extract_title_features_handler(self) -> int:
|
|
|
- task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
|
|
|
- await task.deal(data=self.data)
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ @register("update_root_source_id")
|
|
|
+ async def _update_root_source_id_handler(self) -> int:
|
|
|
+ """更新 root_source_id"""
|
|
|
+ sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
|
|
|
+ await sub_task.deal()
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 回收外部账号文章
|
|
|
+ @register("recycle_outside_account_articles")
|
|
|
async def _recycle_outside_account_article_handler(self) -> int:
|
|
|
+ """回收外部账号文章"""
|
|
|
date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
|
|
|
task = RecycleOutsideAccountArticlesTask(
|
|
|
pool=self.db_client, log_client=self.log_client, date_string=date_str
|
|
|
)
|
|
|
await task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 更新外部账号文章的root_source_id和update_time
|
|
|
- async def _update_outside_account_article_root_source_id_and_update_time_handler(
|
|
|
- self,
|
|
|
- ) -> int:
|
|
|
+ @register("update_outside_account_article_root_source_id")
|
|
|
+ async def _update_outside_account_article_root_source_id_handler(self) -> int:
|
|
|
+ """更新外部账号文章的 root_source_id"""
|
|
|
task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
|
|
|
pool=self.db_client, log_client=self.log_client
|
|
|
)
|
|
|
await task.deal()
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ @register("fwh_daily_recycle")
|
|
|
+ async def _recycle_fwh_article_handler(self) -> int:
|
|
|
+ """回收服务号文章"""
|
|
|
+ task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
|
|
|
+ await task.deal()
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ # ==================== 算法类任务 ====================
|
|
|
+
|
|
|
+ @register("article_pool_cold_start")
|
|
|
+ async def _article_pool_cold_start_handler(self) -> int:
|
|
|
+ """文章池冷启动"""
|
|
|
+ cold_start = ArticlePoolColdStart(
|
|
|
+ self.db_client, self.log_client, self.trace_id
|
|
|
+ )
|
|
|
+ platform = self.data.get("platform", "weixin")
|
|
|
+ crawler_methods = self.data.get("crawler_methods", [])
|
|
|
+ category_list = self.data.get("category_list", [])
|
|
|
+ strategy = self.data.get("strategy", "strategy_v1")
|
|
|
+
|
|
|
+ await cold_start.deal(
|
|
|
+ platform=platform,
|
|
|
+ crawl_methods=crawler_methods,
|
|
|
+ category_list=category_list,
|
|
|
+ strategy=strategy,
|
|
|
+ )
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ @register("candidate_account_quality_analysis")
|
|
|
+ async def _candidate_account_quality_score_handler(self) -> int:
|
|
|
+ """候选账号质量分析"""
|
|
|
+ task = CandidateAccountQualityScoreRecognizer(
|
|
|
+ self.db_client, self.log_client, self.trace_id
|
|
|
+ )
|
|
|
+ await task.deal()
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ @register("article_pool_category_generation")
|
|
|
+ async def _article_pool_category_generation_handler(self) -> int:
|
|
|
+ """文章池品类生成"""
|
|
|
+ task = ArticlePoolCategoryGeneration(
|
|
|
+ self.db_client, self.log_client, self.trace_id
|
|
|
+ )
|
|
|
+ limit_num = self.data.get("limit")
|
|
|
+ await task.deal(limit=limit_num)
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 更新限流账号信息
|
|
|
+ @register("account_category_analysis")
|
|
|
+ async def _account_category_analysis_handler(self) -> int:
|
|
|
+ """账号品类分析"""
|
|
|
+ task = AccountCategoryAnalysis(
|
|
|
+ pool=self.db_client,
|
|
|
+ log_client=self.log_client,
|
|
|
+ trace_id=self.trace_id,
|
|
|
+ data=self.data,
|
|
|
+ date_string=None,
|
|
|
+ )
|
|
|
+ await task.deal()
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ @register("update_limited_account_info")
|
|
|
async def _update_limited_account_info_handler(self) -> int:
|
|
|
+ """更新限流账号信息"""
|
|
|
task = LimitedAccountAnalysisTask(
|
|
|
pool=self.db_client, log_client=self.log_client
|
|
|
)
|
|
|
await task.deal(date_string=self.data.get("date_string"))
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ # ==================== LLM 类任务 ====================
|
|
|
|
|
|
- # 更新账号阅读率均值
|
|
|
+ @register("title_rewrite")
|
|
|
+ async def _title_rewrite_handler(self) -> int:
|
|
|
+ """标题重写"""
|
|
|
+ sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
|
|
|
+ return await sub_task.deal()
|
|
|
+
|
|
|
+ @register("extract_title_features")
|
|
|
+ async def _extract_title_features_handler(self) -> int:
|
|
|
+ """提取标题特征"""
|
|
|
+ task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
|
|
|
+ await task.deal(data=self.data)
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ # ==================== 统计分析类任务 ====================
|
|
|
+
|
|
|
+ @register("update_account_read_rate_avg")
|
|
|
async def _update_account_read_rate_avg_handler(self) -> int:
|
|
|
+ """更新账号阅读率均值"""
|
|
|
task = AccountPositionReadRateAvg(
|
|
|
pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
|
|
|
)
|
|
|
await task.deal(end_date=self.data.get("end_date"))
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 更新账号阅读均值
|
|
|
+ @register("update_account_read_avg")
|
|
|
async def _update_account_read_avg_handler(self) -> int:
|
|
|
+ """更新账号阅读均值"""
|
|
|
task = AccountPositionReadAvg(
|
|
|
pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
|
|
|
)
|
|
|
await task.deal(end_date=self.data.get("end_date"))
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 更新账号打开率均值
|
|
|
+ @register("update_account_open_rate_avg")
|
|
|
async def _update_account_open_rate_avg_handler(self) -> int:
|
|
|
+ """更新账号打开率均值"""
|
|
|
task = AccountPositionOpenRateAvg(
|
|
|
pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
|
|
|
)
|
|
|
await task.deal(date_string=self.data.get("date_string"))
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
+
|
|
|
+ # ==================== 自动化任务 ====================
|
|
|
|
|
|
- # 自动关注公众号账号
|
|
|
+ @register("auto_follow_account")
|
|
|
async def _auto_follow_account_handler(self) -> int:
|
|
|
+ """自动关注公众号"""
|
|
|
task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
|
|
|
await task.deal(task_name="follow_gzh_task")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 获取自动关注回复
|
|
|
+ @register("get_follow_result")
|
|
|
async def _get_follow_result_handler(self) -> int:
|
|
|
+ """获取自动关注回复"""
|
|
|
task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
|
|
|
await task.deal(task_name="get_auto_reply_task")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 解析自动回复结果
|
|
|
+ @register("extract_reply_result")
|
|
|
async def _extract_reply_result_handler(self) -> int:
|
|
|
+ """解析自动回复结果"""
|
|
|
task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
|
|
|
await task.deal(task_name="extract_task")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
- # 定时获取外部文章
|
|
|
- async def _cooperate_accounts_monitor_handler(self) -> int:
|
|
|
- task = CooperateAccountsMonitorTask(
|
|
|
- pool=self.db_client, log_client=self.log_client
|
|
|
- )
|
|
|
- await task.deal(task_name="save_articles")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ # ==================== 其他任务 ====================
|
|
|
|
|
|
- # 定时更新详情
|
|
|
- async def _cooperate_accounts_detail_handler(self) -> int:
|
|
|
- task = CooperateAccountsMonitorTask(
|
|
|
- pool=self.db_client, log_client=self.log_client
|
|
|
+ @register("mini_program_detail_process")
|
|
|
+ async def _mini_program_detail_handler(self) -> int:
|
|
|
+ """更新小程序裂变信息"""
|
|
|
+ task = RecycleMiniProgramDetailTask(
|
|
|
+ pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
|
|
|
)
|
|
|
- await task.deal(task_name="get_detail")
|
|
|
- return self.TASK_SUCCESS_STATUS
|
|
|
+ await task.deal(params=self.data)
|
|
|
+ return TaskStatus.SUCCESS
|
|
|
|
|
|
|
|
|
__all__ = ["TaskHandler"]
|