| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- import json
- from typing import Dict
- from tqdm import tqdm
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import AdPlatformArticlesDecodeConst
- from ._mapper import AdPlatformArticlesDecodeMapper
- from ._util import AdPlatformArticlesDecodeUtil
- class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
- def __init__(self, pool: DatabaseManager, log_service: LogService):
- self.pool = pool
- self.log_service = log_service
- self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
- self.tool = AdPlatformArticlesDecodeUtil()
- async def create_single_decode_task(self, article: Dict):
- # Acquire Lock
- article_id = article["id"]
- acquire_lock = await self.mapper.update_article_decode_status(
- article_id, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "skip",
- "message": "acquire lock failed",
- }
- )
- return
- # 与解构系统交互,创建解构任务
- response = await self.tool.create_decode_task(article)
- response_code = response.get("code")
- if response_code != self.SUCCESS_CODE:
- # 解构任务创建失败
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "fail",
- "data": response,
- }
- )
- return
- task_id = response.get("data", {}).get("task_id")
- if not task_id:
- # 解构任务创建失败
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "fail",
- "data": response,
- }
- )
- return
- # 创建 decode 任务成功
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "success",
- "data": response,
- }
- )
- wx_sn = article["wx_sn"]
- remark = f"task_id: {task_id}-创建解构任务"
- record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
- if not record_row:
- # 记录解构任务失败
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "record_decode_task",
- "status": "fail",
- "message": "创建 decode 记录失败",
- "data": response,
- }
- )
- return
- # 记录创建成功
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
- )
- async def fetch_single_task(self, task: Dict):
- task_id = task["task_id"]
- # acquire lock
- acquire_lock = await self.mapper.update_decode_task_status(
- task_id, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- return
- response = await self.tool.fetch_decode_result(task_id)
- if not response:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark="获取解构结果失败,服务异常,已回滚状态",
- )
- return
- # 请求成功
- response_code = response.get("code")
- if response_code != self.SUCCESS_CODE:
- # 解构任务获取失败
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.FAILED_STATUS,
- remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
- )
- return
- response_data = response.get("data", {})
- response_task_id = response_data.get("taskId")
- if task_id != response_task_id:
- # 解构任务获取失败
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.FAILED_STATUS,
- remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
- )
- return
- status = response_data.get("status")
- match status:
- case self.PENDING:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark=f"解构任务状态为PENDING,继续轮询",
- )
- case self.RUNNING:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark=f"解构任务状态为RUNNING,继续轮询",
- )
- case self.SUCCESS:
- await self.mapper.set_decode_result(
- task_id=task_id,
- result=json.dumps(response_data, ensure_ascii=False),
- )
- case self.FAILED:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.FAILED_STATUS,
- remark=f"解构任务状态为FAILED,标记为失败",
- )
- case _:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
- )
- await self.log_service.log(
- contents={
- "task": "fetch_single_task",
- "task_id": task_id,
- "status": "unknown",
- "message": f"unexpected decode status: {status}",
- "data": response_data,
- }
- )
- async def create_tasks(self):
- article_list = await self.mapper.fetch_decode_articles()
- if not article_list:
- await self.log_service.log(
- contents={"task": "create_tasks", "message": "No more articles to decode"}
- )
- return
- for article in tqdm(article_list, desc="Creating decode tasks"):
- await self.create_single_decode_task(article)
- async def fetch_results(self):
- decoding_tasks = await self.mapper.fetch_decoding_tasks()
- if not decoding_tasks:
- await self.log_service.log(
- contents={"task": "fetch_results", "message": "No more tasks to fetch"}
- )
- return
- for task in decoding_tasks:
- await self.fetch_single_task(task)
- async def deal(self, task_name):
- match task_name:
- case "create_tasks":
- await self.create_tasks()
- case "fetch_results":
- await self.fetch_results()
- __all__ = ["AdPlatformArticlesDecodeTask"]
|