import asyncio import time from typing import Dict import json import aiohttp from tqdm import tqdm from app.core.database import DatabaseManager from app.core.observability import LogService from app.infra.shared.async_tasks import run_tasks_with_asyncio_task_group from ._const import DecodeTaskConst from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool) self.tool = AdPlatformArticlesDecodeUtils() async def create_single_decode_task(self, article: Dict): # Acquire Lock article_id = article["id"] acquire_lock = await self.mapper.update_article_decode_status( article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING ) if not acquire_lock: await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "skip", "message": "acquire lock failed", } ) return # 与解构系统交互,创建解构任务 response = await self.tool.create_decode_task(article) response_code = response.get("code") if response_code != self.RequestDecode.SUCCESS: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return task_id = response.get("data", {}).get("task_id") or response.get( "data", {} ).get("taskId") if not task_id: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return # 创建 decode 任务成功 await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "success", "data": response, } ) wx_sn = article["wx_sn"] remark = f"task_id: {task_id}-创建解构任务" record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark) if not record_row: # 记录解构任务失败 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED ) await self.log_service.log( contents={ "article_id": article_id, "task": "record_decode_task", "status": "fail", "message": "创建 decode 记录失败", "data": response, } ) return # 记录创建成功 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS ) async def create_tasks(self): article_list = await self.mapper.fetch_decode_articles() if not article_list: await self.log_service.log( contents={ "task": "create_tasks", "message": "No more articles to decode", } ) return for article in tqdm(article_list, desc="Creating decode tasks"): await self.create_single_decode_task(article) async def deal(self): await self.create_tasks() class CreateInnerArticlesDecodeTask(DecodeTaskConst): CREATE_TASK_NAME = "create_inner_articles_decode_task" MAX_CREATE_RETRY_TIMES = 3 LOCK_TIMEOUT_SECONDS = 30 * 60 CREATE_MAX_CONCURRENCY = 5 def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = InnerArticlesDecodeTaskMapper(self.pool) self.tool = InnerArticlesDecodeUtils() async def _log_create_event(self, **contents): await self.log_service.log( contents={"task": self.CREATE_TASK_NAME, **contents} ) @staticmethod def _trim_error_message(message: str, limit: int = 500): if not message: return "" return message[:limit] async def _mark_retry_or_failed( self, source_id: str, task_type: int, error_message: str, retryable: bool, state: Dict | None, ): now_ts = int(time.time()) retry_count = (state or {}).get("retry_count", 0) should_retry = retryable and retry_count < self.MAX_CREATE_RETRY_TIMES error_message = self._trim_error_message(error_message) if should_retry: await self.mapper.mark_create_retry( source_id=source_id, task_type=task_type, now_ts=now_ts, error_message=error_message, ) await self._log_create_event( source_id=source_id, task_type=task_type, status="retry", retry_count=retry_count + 1, message=error_message, ) return await self.mapper.mark_create_failed( source_id=source_id, task_type=task_type, now_ts=now_ts, error_message=error_message, ) await self._log_create_event( source_id=source_id, task_type=task_type, status="failed", retry_count=retry_count + 1, message=error_message, ) async def _build_payload(self, article: Dict, task_type: int): source_id = article["source_id"] match task_type: case self.TaskType.PUBLISH_TITLE_COVER: return { "source_id": source_id, "title": article["title"], "cover_img": article["cover_img_url"], "channel_content_id": article.get("wx_sn", source_id), "content_type": self.ContentType.TITLE_COVER } case self.TaskType.SOURCE_IMAGES_TEXT: crawl_source_info = await self.mapper.fetch_article_crawler_source_info(source_id) if not crawl_source_info: raise ValueError("未找到文章抓取源信息") crawl_info = crawl_source_info[0] channel_content_id = crawl_info["channel_content_id"] raw_body_text = crawl_info["body_text"] body_text, images = self.tool.extract_body_text_and_images(raw_body_text) if not body_text and not images: raise ValueError("文章正文和图片均为空,无法创建解构任务") return { "source_id": source_id, "images": images, "body_text": body_text, "channel_content_id": channel_content_id or source_id, "content_type": self.ContentType.LONG_ARTICLE } case self.TaskType.MINI_TITLE_CARD: raise NotImplementedError("MINI_TITLE_CARD 数据未完善") case _: raise ValueError(f"unsupported task type: {task_type}") async def create_single_decode_task(self, task: Dict): article = task["article"] task_type = task["task_type"] source_id = article["source_id"] now_ts = int(time.time()) lock_expire_before = now_ts - self.LOCK_TIMEOUT_SECONDS await self.mapper.init_create_state(source_id, task_type, now_ts) acquire_lock = await self.mapper.acquire_create_lock( source_id=source_id, task_type=task_type, now_ts=now_ts, max_retry_times=self.MAX_CREATE_RETRY_TIMES, lock_expire_before=lock_expire_before, ) if not acquire_lock: await self._log_create_event( source_id=source_id, task_type=task_type, status="skip", message="acquire create lock failed", ) return state = await self.mapper.fetch_create_state(source_id, task_type) try: # exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type) # if exist_task: # await self.mapper.mark_create_success( # source_id=source_id, # task_type=task_type, # remote_task_id="existing_task", # now_ts=int(time.time()), # remark="任务已存在,跳过重复创建", # ) # await self._log_create_event( # source_id=source_id, # task_type=task_type, # status="skip", # message="decode task already exists", # ) # return payload = await self._build_payload(article, task_type) response = await self.tool.create_decode_task_with_retry( payload=payload, retry_times=self.MAX_CREATE_RETRY_TIMES, ) response_code = response.get("code") if response_code != self.RequestDecode.SUCCESS: await self._mark_retry_or_failed( source_id=source_id, task_type=task_type, error_message=f"解构任务创建失败: {json.dumps(response, ensure_ascii=False)}", retryable=False, state=state, ) return task_id = response.get("data", {}).get("task_id") or response.get( "data", {} ).get("taskId") if not task_id: await self._mark_retry_or_failed( source_id=source_id, task_type=task_type, error_message=f"解构任务返回缺少 task_id: {json.dumps(response, ensure_ascii=False)}", retryable=False, state=state, ) return remark = f"task_id: {task_id}-创建解构任务" record_row = await self.mapper.record_decode_task_if_absent( task_id=task_id, content_id=source_id, task_type=task_type, payload=json.dumps(payload, ensure_ascii=False), remark=remark, ) if record_row not in (0, 1): await self._mark_retry_or_failed( source_id=source_id, task_type=task_type, error_message="创建 decode 记录失败", retryable=True, state=state, ) return await self.mapper.mark_create_success( source_id=source_id, task_type=task_type, remote_task_id=task_id, now_ts=int(time.time()), remark=remark, ) await self._log_create_event( source_id=source_id, task_type=task_type, status="success", retry_count=(state or {}).get("retry_count", 0), data=response, ) except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError) as exc: await self._mark_retry_or_failed( source_id=source_id, task_type=task_type, error_message=f"解构服务调用异常: {exc}", retryable=True, state=state, ) except (ValueError, NotImplementedError) as exc: await self._mark_retry_or_failed( source_id=source_id, task_type=task_type, error_message=str(exc), retryable=False, state=state, ) except Exception as exc: await self._mark_retry_or_failed( source_id=source_id, task_type=task_type, error_message=f"创建解构任务异常: {exc}", retryable=True, state=state, ) async def create_tasks(self, date_string: str = None, max_concurrency: int = None): article_list = await self.mapper.fetch_inner_articles(date_string or "20260401") if not article_list: await self._log_create_event( status="empty", message="No more articles to decode", ) return decode_types = [ self.TaskType.SOURCE_IMAGES_TEXT, # self.TaskType.PUBLISH_TITLE_COVER ] task_list = [ {"article": article, "task_type": task_type} for article in article_list for task_type in decode_types ] result = await run_tasks_with_asyncio_task_group( task_list=task_list, handler=self.create_single_decode_task, description="Creating inner decode tasks", unit="task", max_concurrency=max_concurrency or self.CREATE_MAX_CONCURRENCY, fail_fast=False, ) if result["errors"]: await self._log_create_event( status="partial_error", message="some inner decode tasks raised uncaught errors", data={ "total_task": result["total_task"], "processed_task": result["processed_task"], "error_count": len(result["errors"]), }, ) async def deal(self, date_string: str = None, max_concurrency: int = None): await self.create_tasks( date_string=date_string, max_concurrency=max_concurrency, ) __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]