import json from typing import Dict from tqdm import tqdm from app.core.database import DatabaseManager from app.core.observability import LogService from app.infra.shared import run_tasks_with_asyncio_task_group from ._const import AdPlatformArticlesDecodeConst from ._mapper import AdPlatformArticlesDecodeMapper from ._util import AdPlatformArticlesDecodeUtil class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = AdPlatformArticlesDecodeMapper(self.pool) self.tool = AdPlatformArticlesDecodeUtil() async def create_single_decode_task(self, article: Dict): # Acquire Lock article_id = article["id"] acquire_lock = await self.mapper.update_article_decode_status( article_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "skip", "message": "acquire lock failed", } ) return # 与解构系统交互,创建解构任务 response = await self.tool.create_decode_task(article) response_code = response.get("code") if response_code != self.SUCCESS_CODE: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return task_id = response.get("data", {}).get("task_id") or response.get( "data", {} ).get("taskId") if not task_id: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return # 创建 decode 任务成功 await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "success", "data": response, } ) wx_sn = article["wx_sn"] remark = f"task_id: {task_id}-创建解构任务" record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark) if not record_row: # 记录解构任务失败 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) await self.log_service.log( contents={ "article_id": article_id, "task": "record_decode_task", "status": "fail", "message": "创建 decode 记录失败", "data": response, } ) return # 记录创建成功 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS ) async def fetch_single_task(self, task: Dict): task_id = task["task_id"] # acquire lock acquire_lock = await self.mapper.update_decode_task_status( task_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: return response = await self.tool.fetch_decode_result(task_id) if not response: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark="获取解构结果失败,服务异常,已回滚状态", ) return # 请求成功 response_code = response.get("code") if response_code != self.SUCCESS_CODE: # 解构任务获取失败 await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}", ) return response_data = response.get("data", {}) response_task_id = response_data.get("taskId") or response_data.get("task_id") if task_id != response_task_id: # 解构任务获取失败 await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}", ) return status = response_data.get("status") match status: case self.PENDING: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark=f"解构任务状态为PENDING,继续轮询", ) case self.RUNNING: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark=f"解构任务状态为RUNNING,继续轮询", ) case self.SUCCESS: await self.mapper.set_decode_result( task_id=task_id, result=json.dumps(response_data, ensure_ascii=False), ) case self.FAILED: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, remark=f"解构任务状态为FAILED,标记为失败", ) case _: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}", ) await self.log_service.log( contents={ "task": "fetch_single_task", "task_id": task_id, "status": "unknown", "message": f"unexpected decode status: {status}", "data": response_data, } ) async def extract_single_result(self, task): task_id = task["id"] # acquire lock by extract_status acquire_lock = await self.mapper.update_extract_status( task_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: return try: result = json.loads(task["result"])["result"] except (TypeError, KeyError, json.JSONDecodeError) as e: await self.mapper.update_extract_status( task_id, self.PROCESSING_STATUS, self.FAILED_STATUS, ) await self.log_service.log( contents={ "task": "extract_single_result", "task_id": task_id, "status": "fail", "message": f"parse decode result error: {e}", "raw": task.get("result"), } ) return detail = self.tool.extract_decode_result(result) # 如果工具返回错误信息,直接标记为失败 if detail.get("error"): await self.mapper.update_extract_status( task_id, self.PROCESSING_STATUS, self.FAILED_STATUS, ) await self.log_service.log( contents={ "task": "extract_single_result", "task_id": task_id, "status": "fail", "message": detail["error"], } ) return # 写入明细表 saved = await self.mapper.record_extract_detail(task_id, detail) if not saved: await self.mapper.update_extract_status( task_id, self.PROCESSING_STATUS, self.FAILED_STATUS, ) await self.log_service.log( contents={ "task": "extract_single_result", "task_id": task_id, "status": "fail", "message": "insert long_articles_decode_task_detail failed", "detail": detail, } ) return # 写入成功,更新状态为成功 await self.mapper.update_extract_status( task_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS, ) async def create_tasks(self): article_list = await self.mapper.fetch_decode_articles() if not article_list: await self.log_service.log( contents={ "task": "create_tasks", "message": "No more articles to decode", } ) return for article in tqdm(article_list, desc="Creating decode tasks"): await self.create_single_decode_task(article) async def fetch_results(self): decoding_tasks = await self.mapper.fetch_decoding_tasks() if not decoding_tasks: await self.log_service.log( contents={"task": "fetch_results", "message": "No more tasks to fetch"} ) return for task in decoding_tasks: await self.fetch_single_task(task) async def extract_task(self): tasks = await self.mapper.fetch_extract_tasks() await run_tasks_with_asyncio_task_group( task_list=tasks, handler=self.extract_single_result, description="批量解析结构结果", unit="task", ) async def deal(self, task_name): match task_name: case "create_tasks": await self.create_tasks() case "fetch_results": await self.fetch_results() case "extract": await self.extract_task() __all__ = ["AdPlatformArticlesDecodeTask"]