from typing import Dict from tqdm import tqdm from app.core.database import DatabaseManager from app.core.observability import LogService from ._const import DecodeTaskConst from ._mapper import AdPlatformArticlesDecodeTaskMapper, InnerArticlesDecodeTaskMapper from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils class CreateAdPlatformArticlesDecodeTask(DecodeTaskConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool) self.tool = AdPlatformArticlesDecodeUtils() async def create_single_decode_task(self, article: Dict): # Acquire Lock article_id = article["id"] acquire_lock = await self.mapper.update_article_decode_status( article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING ) if not acquire_lock: await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "skip", "message": "acquire lock failed", } ) return # 与解构系统交互,创建解构任务 response = await self.tool.create_decode_task(article) response_code = response.get("code") if response_code != self.RequestDecode.SUCCESS: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return task_id = response.get("data", {}).get("task_id") or response.get( "data", {} ).get("taskId") if not task_id: # 解构任务创建失败 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED ) await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "fail", "data": response, } ) return # 创建 decode 任务成功 await self.log_service.log( contents={ "article_id": article_id, "task": "create_decode_task", "status": "success", "data": response, } ) wx_sn = article["wx_sn"] remark = f"task_id: {task_id}-创建解构任务" record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark) if not record_row: # 记录解构任务失败 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED ) await self.log_service.log( contents={ "article_id": article_id, "task": "record_decode_task", "status": "fail", "message": "创建 decode 记录失败", "data": response, } ) return # 记录创建成功 await self.mapper.update_article_decode_status( article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS ) async def create_tasks(self): article_list = await self.mapper.fetch_decode_articles() if not article_list: await self.log_service.log( contents={ "task": "create_tasks", "message": "No more articles to decode", } ) return for article in tqdm(article_list, desc="Creating decode tasks"): await self.create_single_decode_task(article) async def deal(self): await self.create_tasks() class CreateInnerArticlesDecodeTask(DecodeTaskConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = InnerArticlesDecodeTaskMapper(self.pool) self.tool = InnerArticlesDecodeUtils() async def create_single_decode_task(self, article: Dict): # Acquire Lock source_id = article["source_id"] article_produce_info = await self.mapper.fetch_inner_articles_produce_detail( source_id ) # 与解构系统交互,创建解构任务 response = await self.tool.create_decode_task(article, article_produce_info) response_code = response.get("code") if response_code != self.RequestDecode.SUCCESS: return task_id = response.get("data", {}).get("task_id") or response.get( "data", {} ).get("taskId") if not task_id: return wx_sn = article["wx_sn"] remark = f"task_id: {task_id}-创建解构任务" record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark) if not record_row: return async def create_tasks(self): article_list = await self.mapper.fetch_inner_articles() if not article_list: await self.log_service.log( contents={ "task": "create_tasks", "message": "No more articles to decode", } ) return for article in tqdm(article_list, desc="Creating decode tasks"): await self.create_single_decode_task(article) async def deal(self): await self.create_tasks() __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]