- import asyncio
- import time
- from typing import Dict
- import json
- import aiohttp
- from tqdm import tqdm
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.shared.async_tasks import run_tasks_with_asyncio_task_group
- from ._const import DecodeTaskConst
- from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper
- from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst):
    """Create remote decode tasks for ad-platform articles.

    Per-article workflow: acquire a row-level status lock
    (INIT -> PROCESSING), ask the decode service to create a task, record
    the returned task id locally, then finalize the status
    (PROCESSING -> SUCCESS / FAILED).
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
        self.tool = AdPlatformArticlesDecodeUtils()

    async def _fail_article(self, article_id, contents: Dict) -> None:
        # Release the PROCESSING lock as FAILED, then emit the given log entry.
        # Shared by every failure path of create_single_decode_task.
        await self.mapper.update_article_decode_status(
            article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
        )
        await self.log_service.log(contents=contents)

    async def create_single_decode_task(self, article: Dict):
        """Create one decode task for *article*.

        The conditional status update (INIT -> PROCESSING) acts as a lock,
        so concurrent callers on the same article are safe: only the caller
        that wins the update proceeds.
        """
        article_id = article["id"]
        # Acquire lock.
        acquire_lock = await self.mapper.update_article_decode_status(
            article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
        )
        if not acquire_lock:
            await self.log_service.log(
                contents={
                    "article_id": article_id,
                    "task": self.LogTaskKey.CREATE_SINGLE,
                    "status": "skip",
                    "message": "acquire lock failed",
                }
            )
            return
        # Ask the decode system to create the task.
        response = await self.tool.create_decode_task(article)
        response_code = response.get("code")
        data = response.get("data", {})
        # The service is inconsistent about the task-id key casing.
        task_id = data.get("task_id") or data.get("taskId")
        if response_code != self.RequestDecode.SUCCESS or not task_id:
            # Remote creation failed: non-success code or missing task id.
            await self._fail_article(
                article_id,
                {
                    "article_id": article_id,
                    "task": self.LogTaskKey.CREATE_SINGLE,
                    "status": "fail",
                    "data": response,
                },
            )
            return
        # Decode task created successfully.
        await self.log_service.log(
            contents={
                "article_id": article_id,
                "task": self.LogTaskKey.CREATE_SINGLE,
                "status": "success",
                "data": response,
            }
        )
        wx_sn = article["wx_sn"]
        remark = f"task_id: {task_id}-创建解构任务"
        record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
        if not record_row:
            # Local bookkeeping failed; roll the article back to FAILED.
            await self._fail_article(
                article_id,
                {
                    "article_id": article_id,
                    "task": self.LogTaskKey.RECORD_QUEUE,
                    "status": "fail",
                    "message": "创建 decode 记录失败",
                    "data": response,
                },
            )
            return
        # Creation recorded; mark the article as done.
        await self.mapper.update_article_decode_status(
            article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
        )

    async def create_tasks(self):
        """Fetch all pending articles and create decode tasks sequentially."""
        article_list = await self.mapper.fetch_decode_articles()
        if not article_list:
            await self.log_service.log(
                contents={
                    "task": self.LogTaskKey.CREATE_BATCH,
                    "message": "No more articles to decode",
                }
            )
            return
        for article in tqdm(
            article_list, desc=self.AdPlatformDecodeBatch.TQDM_DESCRIPTION
        ):
            await self.create_single_decode_task(article)

    async def deal(self):
        """Scheduler entry point; delegates to create_tasks."""
        await self.create_tasks()
class CreateInnerArticlesDecodeTask(DecodeTaskConst):
    """Create remote decode tasks for inner (in-house) articles.

    Per (source_id, task_type) pair, creation state is tracked in a state
    table with a retry counter and a time-based create lock; the batch run
    fans out over an asyncio task group.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
        self.tool = InnerArticlesDecodeUtils()

    async def _log_create_event(self, **contents) -> None:
        # Every event is tagged with the scheduler task name for filtering.
        await self.log_service.log(
            contents={"task": self.InnerDecodeCreate.SCHEDULER_TASK_NAME, **contents}
        )

    @staticmethod
    def _trim_error_message(message: str, limit: int | None = None) -> str:
        """Clamp *message* to *limit* chars (config default); '' for falsy input."""
        if limit is None:
            limit = DecodeTaskConst.InnerDecodeCreate.ERROR_MESSAGE_MAX_CHARS
        if not message:
            return ""
        return message[:limit]

    async def _mark_retry_or_failed(
        self,
        source_id: str,
        task_type: int,
        error_message: str,
        retryable: bool,
        state: Dict | None,
    ) -> None:
        """Record a failed creation attempt as either retryable or terminal.

        Marks the state row for retry when *retryable* is True and the retry
        budget (MAX_RETRY_TIMES) is not exhausted; otherwise marks it failed.
        *state* is the row fetched before the attempt (may be None) and
        supplies the current retry_count.
        """
        now_ts = int(time.time())
        retry_count = (state or {}).get("retry_count", 0)
        should_retry = (
            retryable and retry_count < self.InnerDecodeCreate.MAX_RETRY_TIMES
        )
        error_message = self._trim_error_message(error_message)
        if should_retry:
            await self.mapper.mark_create_retry(
                source_id=source_id,
                task_type=task_type,
                now_ts=now_ts,
                error_message=error_message,
            )
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="retry",
                retry_count=retry_count + 1,
                message=error_message,
            )
            return
        await self.mapper.mark_create_failed(
            source_id=source_id,
            task_type=task_type,
            now_ts=now_ts,
            error_message=error_message,
        )
        # NOTE(review): the failed branch also logs retry_count + 1, even when
        # no retry was scheduled — confirm this off-by-one is intended.
        await self._log_create_event(
            source_id=source_id,
            task_type=task_type,
            status="failed",
            retry_count=retry_count + 1,
            message=error_message,
        )

    async def _build_payload(self, article: Dict, task_type: int) -> Dict:
        """Build the decode-service request payload for *task_type*.

        Raises ValueError when source data is missing/empty or the task type
        is unsupported, and NotImplementedError for MINI_TITLE_CARD.
        """
        source_id = article["source_id"]
        match task_type:
            case self.TaskType.PUBLISH_TITLE_COVER:
                return {
                    "source_id": source_id,
                    "title": article["title"],
                    "cover_img": article["cover_img_url"],
                    # Fall back to source_id when the article has no wx_sn.
                    "channel_content_id": article.get("wx_sn", source_id),
                    "content_type": self.ContentType.TITLE_COVER,
                }
            case self.TaskType.SOURCE_IMAGES_TEXT:
                crawl_source_info = await self.mapper.fetch_article_crawler_source_info(
                    source_id
                )
                if not crawl_source_info:
                    raise ValueError("未找到文章抓取源信息")
                crawl_info = crawl_source_info[0]
                channel_content_id = crawl_info["channel_content_id"]
                raw_body_text = crawl_info["body_text"]
                body_text, images = self.tool.extract_body_text_and_images(
                    raw_body_text
                )
                if not body_text and not images:
                    raise ValueError("文章正文和图片均为空,无法创建解构任务")
                # NOTE(review): the payload carries raw_body_text; the
                # extracted body_text is only used for the emptiness check
                # above — confirm this is intended.
                return {
                    "source_id": source_id,
                    "images": images,
                    "body_text": raw_body_text,
                    "channel_content_id": channel_content_id or source_id,
                    "content_type": self.ContentType.LONG_ARTICLE,
                }
            case self.TaskType.MINI_TITLE_CARD:
                raise NotImplementedError("MINI_TITLE_CARD 数据未完善")
            case _:
                raise ValueError(f"unsupported task type: {task_type}")

    async def create_single_decode_task(self, task: Dict) -> None:
        """Create one decode task for ``task = {"article": ..., "task_type": ...}``.

        Skips duplicates, takes a time-based create lock, builds the payload,
        calls the decode service, and records the resulting remote task id.
        Every failure path funnels through _mark_retry_or_failed.
        """
        article = task["article"]
        task_type = task["task_type"]
        source_id = article["source_id"]
        # A task for this (source_id, task_type) pair already exists:
        # mark success with a sentinel remote id and skip.
        exist_task = await self.mapper.fetch_exist_source_id(source_id, task_type)
        if exist_task:
            await self.mapper.mark_create_success(
                source_id=source_id,
                task_type=task_type,
                remote_task_id=self.InnerDecodeCreate.DUPLICATE_SKIP_REMOTE_TASK_ID,
                now_ts=int(time.time()),
                remark="任务已存在,跳过重复创建",
            )
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="skip",
                message="decode task already exists",
            )
            return
        now_ts = int(time.time())
        # Locks older than the timeout are considered stale and reclaimable.
        lock_expire_before = now_ts - self.INNER_DECODE_LOCK_TIMEOUT_SECONDS
        await self.mapper.init_create_state(source_id, task_type, now_ts)
        acquire_lock = await self.mapper.acquire_create_lock(
            source_id=source_id,
            task_type=task_type,
            now_ts=now_ts,
            max_retry_times=self.InnerDecodeCreate.MAX_RETRY_TIMES,
            lock_expire_before=lock_expire_before,
        )
        if not acquire_lock:
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="skip",
                message="acquire create lock failed",
            )
            return
        # Fetch the state row (retry_count) only after holding the lock.
        state = await self.mapper.fetch_create_state(source_id, task_type)
        try:
            payload = await self._build_payload(article, task_type)
            response = await self.tool.create_decode_task_with_retry(
                payload=payload,
                retry_times=self.InnerDecodeCreate.MAX_RETRY_TIMES,
            )
            response_code = response.get("code")
            if response_code != self.RequestDecode.SUCCESS:
                # Non-success business code: the util already retried the
                # call, so this is treated as non-retryable here.
                await self._mark_retry_or_failed(
                    source_id=source_id,
                    task_type=task_type,
                    error_message=f"解构任务创建失败: {json.dumps(response, ensure_ascii=False)}",
                    retryable=False,
                    state=state,
                )
                return
            # The service is inconsistent about the task-id key casing.
            task_id = response.get("data", {}).get("task_id") or response.get(
                "data", {}
            ).get("taskId")
            if not task_id:
                await self._mark_retry_or_failed(
                    source_id=source_id,
                    task_type=task_type,
                    error_message=f"解构任务返回缺少 task_id: {json.dumps(response, ensure_ascii=False)}",
                    retryable=False,
                    state=state,
                )
                return
            remark = f"task_id: {task_id}-创建解构任务"
            # INSERT-IGNORE semantics: both "inserted" and "already present"
            # affected-row counts are acceptable outcomes.
            record_row = await self.mapper.record_decode_task_if_absent(
                task_id=task_id,
                content_id=source_id,
                task_type=task_type,
                payload=json.dumps(payload, ensure_ascii=False),
                remark=remark,
            )
            if record_row not in (
                self.InnerDecodeCreate.INSERT_IGNORE_AFFECTED_NOOP,
                self.InnerDecodeCreate.INSERT_IGNORE_AFFECTED_INSERTED,
            ):
                await self._mark_retry_or_failed(
                    source_id=source_id,
                    task_type=task_type,
                    error_message="创建 decode 记录失败",
                    retryable=True,
                    state=state,
                )
                return
            await self.mapper.mark_create_success(
                source_id=source_id,
                task_type=task_type,
                remote_task_id=task_id,
                now_ts=int(time.time()),
                remark=remark,
            )
            await self._log_create_event(
                source_id=source_id,
                task_type=task_type,
                status="success",
                retry_count=(state or {}).get("retry_count", 0),
                data=response,
            )
        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError) as exc:
            # Transport-level errors are transient; allow a retry.
            # (asyncio.TimeoutError is an alias of TimeoutError on 3.11+,
            # so the pair is redundant there but harmless.)
            await self._mark_retry_or_failed(
                source_id=source_id,
                task_type=task_type,
                error_message=f"解构服务调用异常: {exc}",
                retryable=True,
                state=state,
            )
        except (ValueError, NotImplementedError) as exc:
            # Data/configuration problems will not fix themselves: no retry.
            await self._mark_retry_or_failed(
                source_id=source_id,
                task_type=task_type,
                error_message=str(exc),
                retryable=False,
                state=state,
            )
        except Exception as exc:
            # Unknown failure: allow a retry rather than failing permanently.
            await self._mark_retry_or_failed(
                source_id=source_id,
                task_type=task_type,
                error_message=f"创建解构任务异常: {exc}",
                retryable=True,
                state=state,
            )

    async def create_tasks(
        self, date_string: str | None = None, max_concurrency: int | None = None
    ) -> None:
        """Fan decode-task creation out over all pending inner articles.

        *date_string* filters which articles the mapper fetches;
        *max_concurrency* overrides the configured default when given.
        """
        article_list = await self.mapper.fetch_inner_articles(date_string)
        if not article_list:
            await self._log_create_event(
                status="empty",
                message="No more articles to decode",
            )
            return
        # Each article gets one task per decode type (cartesian product).
        decode_types = [
            self.TaskType.SOURCE_IMAGES_TEXT,
            self.TaskType.PUBLISH_TITLE_COVER
        ]
        task_list = [
            {"article": article, "task_type": task_type}
            for article in article_list
            for task_type in decode_types
        ]
        result = await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.create_single_decode_task,
            description=self.InnerDecodeCreate.ASYNC_BATCH_DESCRIPTION,
            unit=self.InnerDecodeCreate.ASYNC_BATCH_UNIT,
            max_concurrency=max_concurrency
            or self.InnerDecodeCreate.DEFAULT_MAX_CONCURRENCY,
            fail_fast=self.InnerDecodeCreate.ASYNC_BATCH_FAIL_FAST,
        )
        # The handler catches its own exceptions; anything surfacing here
        # escaped the per-task handling inside the task group.
        if result["errors"]:
            await self._log_create_event(
                status="partial_error",
                message="some inner decode tasks raised uncaught errors",
                data={
                    "total_task": result["total_task"],
                    "processed_task": result["processed_task"],
                    "error_count": len(result["errors"]),
                },
            )

    async def deal(
        self, date_string: str | None = None, max_concurrency: int | None = None
    ) -> None:
        """Scheduler entry point; delegates to create_tasks."""
        await self.create_tasks(
            date_string=date_string,
            max_concurrency=max_concurrency,
        )
# Public API of this module.
__all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]