import json from typing import Dict from tqdm import tqdm from app.core.database import DatabaseManager from app.core.observability import LogService from ._const import AdPlatformArticlesDecodeConst from ._mapper import AdPlatformArticlesDecodeMapper from ._util import AdPlatformArticlesDecodeUtil class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = AdPlatformArticlesDecodeMapper(self.pool) self.tool = AdPlatformArticlesDecodeUtil() async def create_single_decode_task(self, article: Dict): # Acquire Lock article_id = article["id"] acquire_lock = await self.mapper.update_article_decode_status( article_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "skip", "message": "acquire lock failed", } ) return # 与解构系统交互,创建解构任务 response = await self.tool.create_decode_task(article) response_code = response.get("code") if response_code != self.SUCCESS_CODE: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return task_id = response.get("data", {}).get("task_id") or response.get("data", {}).get("taskId") if not task_id: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return # 创建 decode 任务成功 await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "success", "data": response, } ) wx_sn = article["wx_sn"] remark = f"task_id: {task_id}-创建解构任务" record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark) if not record_row: # 记录解构任务失败 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) await self.log_service.log( contents={ "article_id": article_id, "task": "record_decode_task", "status": "fail", "message": "创建 decode 记录失败", "data": response, } ) return # 记录创建成功 await self.mapper.update_article_decode_status( article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS ) async def fetch_single_task(self, task: Dict): task_id = task["task_id"] # acquire lock acquire_lock = await self.mapper.update_decode_task_status( task_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: return response = await self.tool.fetch_decode_result(task_id) if not response: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark="获取解构结果失败,服务异常,已回滚状态", ) return # 请求成功 response_code = response.get("code") if response_code != self.SUCCESS_CODE: # 解构任务获取失败 await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}", ) return response_data = response.get("data", {}) response_task_id = response_data.get("taskId") or response_data.get("task_id") if task_id != response_task_id: # 解构任务获取失败 await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}", ) return status = response_data.get("status") match status: case self.PENDING: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark=f"解构任务状态为PENDING,继续轮询", ) case self.RUNNING: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark=f"解构任务状态为RUNNING,继续轮询", ) case self.SUCCESS: await self.mapper.set_decode_result( task_id=task_id, result=json.dumps(response_data, ensure_ascii=False), ) case self.FAILED: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, remark=f"解构任务状态为FAILED,标记为失败", ) case _: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.PROCESSING_STATUS, new_status=self.INIT_STATUS, remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}", ) await self.log_service.log( contents={ "task": "fetch_single_task", "task_id": task_id, "status": "unknown", "message": f"unexpected decode status: {status}", "data": response_data, } ) async def create_tasks(self): article_list = await self.mapper.fetch_decode_articles() if not article_list: await self.log_service.log( contents={"task": "create_tasks", "message": "No more articles to decode"} ) return for article in tqdm(article_list, desc="Creating decode tasks"): await self.create_single_decode_task(article) async def fetch_results(self): decoding_tasks = await self.mapper.fetch_decoding_tasks() if not decoding_tasks: await self.log_service.log( contents={"task": "fetch_results", "message": "No more tasks to fetch"} ) return for task in decoding_tasks: await self.fetch_single_task(task) async def deal(self, task_name): match task_name: case "create_tasks": await self.create_tasks() case "fetch_results": await self.fetch_results() __all__ = ["AdPlatformArticlesDecodeTask"]