| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- import json
- from typing import Dict
- from tqdm import tqdm
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.shared import run_tasks_with_asyncio_task_group
- from ._const import AdPlatformArticlesDecodeConst
- from ._mapper import AdPlatformArticlesDecodeMapper
- from ._util import AdPlatformArticlesDecodeUtil
- class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
- def __init__(self, pool: DatabaseManager, log_service: LogService):
- self.pool = pool
- self.log_service = log_service
- self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
- self.tool = AdPlatformArticlesDecodeUtil()
- async def create_single_decode_task(self, article: Dict):
- # Acquire Lock
- article_id = article["id"]
- acquire_lock = await self.mapper.update_article_decode_status(
- article_id, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "skip",
- "message": "acquire lock failed",
- }
- )
- return
- # 与解构系统交互,创建解构任务
- response = await self.tool.create_decode_task(article)
- response_code = response.get("code")
- if response_code != self.SUCCESS_CODE:
- # 解构任务创建失败
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "fail",
- "data": response,
- }
- )
- return
- task_id = response.get("data", {}).get("task_id") or response.get(
- "data", {}
- ).get("taskId")
- if not task_id:
- # 解构任务创建失败
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "fail",
- "data": response,
- }
- )
- return
- # 创建 decode 任务成功
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "create_decode_task",
- "status": "success",
- "data": response,
- }
- )
- wx_sn = article["wx_sn"]
- remark = f"task_id: {task_id}-创建解构任务"
- record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
- if not record_row:
- # 记录解构任务失败
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- await self.log_service.log(
- contents={
- "article_id": article_id,
- "task": "record_decode_task",
- "status": "fail",
- "message": "创建 decode 记录失败",
- "data": response,
- }
- )
- return
- # 记录创建成功
- await self.mapper.update_article_decode_status(
- article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
- )
- async def fetch_single_task(self, task: Dict):
- task_id = task["task_id"]
- # acquire lock
- acquire_lock = await self.mapper.update_decode_task_status(
- task_id, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- return
- response = await self.tool.fetch_decode_result(task_id)
- if not response:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark="获取解构结果失败,服务异常,已回滚状态",
- )
- return
- # 请求成功
- response_code = response.get("code")
- if response_code != self.SUCCESS_CODE:
- # 解构任务获取失败
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.FAILED_STATUS,
- remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
- )
- return
- response_data = response.get("data", {})
- response_task_id = response_data.get("taskId") or response_data.get("task_id")
- if task_id != response_task_id:
- # 解构任务获取失败
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.FAILED_STATUS,
- remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
- )
- return
- status = response_data.get("status")
- match status:
- case self.PENDING:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark=f"解构任务状态为PENDING,继续轮询",
- )
- case self.RUNNING:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark=f"解构任务状态为RUNNING,继续轮询",
- )
- case self.SUCCESS:
- await self.mapper.set_decode_result(
- task_id=task_id,
- result=json.dumps(response_data, ensure_ascii=False),
- )
- case self.FAILED:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.FAILED_STATUS,
- remark=f"解构任务状态为FAILED,标记为失败",
- )
- case _:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.PROCESSING_STATUS,
- new_status=self.INIT_STATUS,
- remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
- )
- await self.log_service.log(
- contents={
- "task": "fetch_single_task",
- "task_id": task_id,
- "status": "unknown",
- "message": f"unexpected decode status: {status}",
- "data": response_data,
- }
- )
- async def extract_single_result(self, task):
- task_id = task["id"]
- # acquire lock by extract_status
- acquire_lock = await self.mapper.update_extract_status(
- task_id, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- return
- try:
- result = json.loads(task["result"])["result"]
- except (TypeError, KeyError, json.JSONDecodeError) as e:
- await self.mapper.update_extract_status(
- task_id,
- self.PROCESSING_STATUS,
- self.FAILED_STATUS,
- )
- await self.log_service.log(
- contents={
- "task": "extract_single_result",
- "task_id": task_id,
- "status": "fail",
- "message": f"parse decode result error: {e}",
- "raw": task.get("result"),
- }
- )
- return
- detail = self.tool.extract_decode_result(result)
- # 如果工具返回错误信息,直接标记为失败
- if detail.get("error"):
- await self.mapper.update_extract_status(
- task_id,
- self.PROCESSING_STATUS,
- self.FAILED_STATUS,
- )
- await self.log_service.log(
- contents={
- "task": "extract_single_result",
- "task_id": task_id,
- "status": "fail",
- "message": detail["error"],
- }
- )
- return
- # 写入明细表
- saved = await self.mapper.record_extract_detail(task_id, detail)
- if not saved:
- await self.mapper.update_extract_status(
- task_id,
- self.PROCESSING_STATUS,
- self.FAILED_STATUS,
- )
- await self.log_service.log(
- contents={
- "task": "extract_single_result",
- "task_id": task_id,
- "status": "fail",
- "message": "insert long_articles_decode_task_detail failed",
- "detail": detail,
- }
- )
- return
- # 写入成功,更新状态为成功
- await self.mapper.update_extract_status(
- task_id,
- self.PROCESSING_STATUS,
- self.SUCCESS_STATUS,
- )
- async def create_tasks(self):
- article_list = await self.mapper.fetch_decode_articles()
- if not article_list:
- await self.log_service.log(
- contents={
- "task": "create_tasks",
- "message": "No more articles to decode",
- }
- )
- return
- for article in tqdm(article_list, desc="Creating decode tasks"):
- await self.create_single_decode_task(article)
- async def fetch_results(self):
- decoding_tasks = await self.mapper.fetch_decoding_tasks()
- if not decoding_tasks:
- await self.log_service.log(
- contents={"task": "fetch_results", "message": "No more tasks to fetch"}
- )
- return
- for task in decoding_tasks:
- await self.fetch_single_task(task)
- async def extract_task(self):
- tasks = await self.mapper.fetch_extract_tasks()
- await run_tasks_with_asyncio_task_group(
- task_list=tasks,
- handler=self.extract_single_result,
- description="批量解析结构结果",
- unit="task",
- )
- async def deal(self, task_name):
- match task_name:
- case "create_tasks":
- await self.create_tasks()
- case "fetch_results":
- await self.fetch_results()
- case "extract":
- await self.extract_task()
# Names exported by `from <this module> import *`: only the task class.
__all__ = [
    "AdPlatformArticlesDecodeTask",
]
|