|
@@ -1,321 +0,0 @@
|
|
|
-import json
|
|
|
|
|
-from typing import Dict
|
|
|
|
|
-from tqdm import tqdm
|
|
|
|
|
-
|
|
|
|
|
-from app.core.database import DatabaseManager
|
|
|
|
|
-from app.core.observability import LogService
|
|
|
|
|
-
|
|
|
|
|
-from app.infra.shared import run_tasks_with_asyncio_task_group
|
|
|
|
|
-
|
|
|
|
|
-from ._const import AdPlatformArticlesDecodeConst
|
|
|
|
|
-from ._mapper import AdPlatformArticlesDecodeMapper
|
|
|
|
|
-from ._util import AdPlatformArticlesDecodeUtil
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
|
|
|
|
|
- def __init__(self, pool: DatabaseManager, log_service: LogService):
|
|
|
|
|
- self.pool = pool
|
|
|
|
|
- self.log_service = log_service
|
|
|
|
|
- self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
|
|
|
|
|
- self.tool = AdPlatformArticlesDecodeUtil()
|
|
|
|
|
-
|
|
|
|
|
- async def create_single_decode_task(self, article: Dict):
|
|
|
|
|
- # Acquire Lock
|
|
|
|
|
- article_id = article["id"]
|
|
|
|
|
- acquire_lock = await self.mapper.update_article_decode_status(
|
|
|
|
|
- article_id, self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
|
|
- )
|
|
|
|
|
- if not acquire_lock:
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "article_id": article_id,
|
|
|
|
|
- "task": "create_decode_task",
|
|
|
|
|
- "status": "skip",
|
|
|
|
|
- "message": "acquire lock failed",
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- # 与解构系统交互,创建解构任务
|
|
|
|
|
- response = await self.tool.create_decode_task(article)
|
|
|
|
|
- response_code = response.get("code")
|
|
|
|
|
- if response_code != self.SUCCESS_CODE:
|
|
|
|
|
- # 解构任务创建失败
|
|
|
|
|
- await self.mapper.update_article_decode_status(
|
|
|
|
|
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "article_id": article_id,
|
|
|
|
|
- "task": "create_decode_task",
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "data": response,
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- task_id = response.get("data", {}).get("task_id") or response.get(
|
|
|
|
|
- "data", {}
|
|
|
|
|
- ).get("taskId")
|
|
|
|
|
- if not task_id:
|
|
|
|
|
- # 解构任务创建失败
|
|
|
|
|
- await self.mapper.update_article_decode_status(
|
|
|
|
|
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "article_id": article_id,
|
|
|
|
|
- "task": "create_decode_task",
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "data": response,
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- # 创建 decode 任务成功
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "article_id": article_id,
|
|
|
|
|
- "task": "create_decode_task",
|
|
|
|
|
- "status": "success",
|
|
|
|
|
- "data": response,
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- wx_sn = article["wx_sn"]
|
|
|
|
|
- remark = f"task_id: {task_id}-创建解构任务"
|
|
|
|
|
- record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
|
|
|
|
|
- if not record_row:
|
|
|
|
|
- # 记录解构任务失败
|
|
|
|
|
- await self.mapper.update_article_decode_status(
|
|
|
|
|
- article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "article_id": article_id,
|
|
|
|
|
- "task": "record_decode_task",
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "message": "创建 decode 记录失败",
|
|
|
|
|
- "data": response,
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- # 记录创建成功
|
|
|
|
|
- await self.mapper.update_article_decode_status(
|
|
|
|
|
- article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- async def fetch_single_task(self, task: Dict):
|
|
|
|
|
- task_id = task["task_id"]
|
|
|
|
|
-
|
|
|
|
|
- # acquire lock
|
|
|
|
|
- acquire_lock = await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id, self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
|
|
- )
|
|
|
|
|
- if not acquire_lock:
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- response = await self.tool.fetch_decode_result(task_id)
|
|
|
|
|
- if not response:
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.INIT_STATUS,
|
|
|
|
|
- remark="获取解构结果失败,服务异常,已回滚状态",
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- # 请求成功
|
|
|
|
|
- response_code = response.get("code")
|
|
|
|
|
- if response_code != self.SUCCESS_CODE:
|
|
|
|
|
- # 解构任务获取失败
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.FAILED_STATUS,
|
|
|
|
|
- remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- response_data = response.get("data", {})
|
|
|
|
|
- response_task_id = response_data.get("taskId") or response_data.get("task_id")
|
|
|
|
|
- if task_id != response_task_id:
|
|
|
|
|
- # 解构任务获取失败
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.FAILED_STATUS,
|
|
|
|
|
- remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- status = response_data.get("status")
|
|
|
|
|
- match status:
|
|
|
|
|
- case self.PENDING:
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.INIT_STATUS,
|
|
|
|
|
- remark=f"解构任务状态为PENDING,继续轮询",
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- case self.RUNNING:
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.INIT_STATUS,
|
|
|
|
|
- remark=f"解构任务状态为RUNNING,继续轮询",
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- case self.SUCCESS:
|
|
|
|
|
- await self.mapper.set_decode_result(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- result=json.dumps(response_data, ensure_ascii=False),
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- case self.FAILED:
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.FAILED_STATUS,
|
|
|
|
|
- remark=f"解构任务状态为FAILED,标记为失败",
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- case _:
|
|
|
|
|
- await self.mapper.update_decode_task_status(
|
|
|
|
|
- task_id=task_id,
|
|
|
|
|
- ori_status=self.PROCESSING_STATUS,
|
|
|
|
|
- new_status=self.INIT_STATUS,
|
|
|
|
|
- remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "fetch_single_task",
|
|
|
|
|
- "task_id": task_id,
|
|
|
|
|
- "status": "unknown",
|
|
|
|
|
- "message": f"unexpected decode status: {status}",
|
|
|
|
|
- "data": response_data,
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- async def extract_single_result(self, task):
|
|
|
|
|
- task_id = task["id"]
|
|
|
|
|
-
|
|
|
|
|
- # acquire lock by extract_status
|
|
|
|
|
- acquire_lock = await self.mapper.update_extract_status(
|
|
|
|
|
- task_id, self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
|
|
- )
|
|
|
|
|
- if not acquire_lock:
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- result = json.loads(task["result"])["result"]
|
|
|
|
|
- except (TypeError, KeyError, json.JSONDecodeError) as e:
|
|
|
|
|
- await self.mapper.update_extract_status(
|
|
|
|
|
- task_id,
|
|
|
|
|
- self.PROCESSING_STATUS,
|
|
|
|
|
- self.FAILED_STATUS,
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "extract_single_result",
|
|
|
|
|
- "task_id": task_id,
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "message": f"parse decode result error: {e}",
|
|
|
|
|
- "raw": task.get("result"),
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- detail = self.tool.extract_decode_result(result)
|
|
|
|
|
- # 如果工具返回错误信息,直接标记为失败
|
|
|
|
|
- if detail.get("error"):
|
|
|
|
|
- await self.mapper.update_extract_status(
|
|
|
|
|
- task_id,
|
|
|
|
|
- self.PROCESSING_STATUS,
|
|
|
|
|
- self.FAILED_STATUS,
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "extract_single_result",
|
|
|
|
|
- "task_id": task_id,
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "message": detail["error"],
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- # 写入明细表
|
|
|
|
|
- saved = await self.mapper.record_extract_detail(task_id, detail)
|
|
|
|
|
- if not saved:
|
|
|
|
|
- await self.mapper.update_extract_status(
|
|
|
|
|
- task_id,
|
|
|
|
|
- self.PROCESSING_STATUS,
|
|
|
|
|
- self.FAILED_STATUS,
|
|
|
|
|
- )
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "extract_single_result",
|
|
|
|
|
- "task_id": task_id,
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "message": "insert long_articles_decode_task_detail failed",
|
|
|
|
|
- "detail": detail,
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- # 写入成功,更新状态为成功
|
|
|
|
|
- await self.mapper.update_extract_status(
|
|
|
|
|
- task_id,
|
|
|
|
|
- self.PROCESSING_STATUS,
|
|
|
|
|
- self.SUCCESS_STATUS,
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- async def create_tasks(self):
|
|
|
|
|
- article_list = await self.mapper.fetch_decode_articles()
|
|
|
|
|
- if not article_list:
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "create_tasks",
|
|
|
|
|
- "message": "No more articles to decode",
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- for article in tqdm(article_list, desc="Creating decode tasks"):
|
|
|
|
|
- await self.create_single_decode_task(article)
|
|
|
|
|
-
|
|
|
|
|
- async def fetch_results(self):
|
|
|
|
|
- decoding_tasks = await self.mapper.fetch_decoding_tasks()
|
|
|
|
|
- if not decoding_tasks:
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={"task": "fetch_results", "message": "No more tasks to fetch"}
|
|
|
|
|
- )
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- for task in decoding_tasks:
|
|
|
|
|
- await self.fetch_single_task(task)
|
|
|
|
|
-
|
|
|
|
|
- async def extract_task(self):
|
|
|
|
|
- tasks = await self.mapper.fetch_extract_tasks()
|
|
|
|
|
- await run_tasks_with_asyncio_task_group(
|
|
|
|
|
- task_list=tasks,
|
|
|
|
|
- handler=self.extract_single_result,
|
|
|
|
|
- description="批量解析结构结果",
|
|
|
|
|
- unit="task",
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- async def deal(self, task_name):
|
|
|
|
|
- match task_name:
|
|
|
|
|
- case "create_tasks":
|
|
|
|
|
- await self.create_tasks()
|
|
|
|
|
-
|
|
|
|
|
- case "fetch_results":
|
|
|
|
|
- await self.fetch_results()
|
|
|
|
|
-
|
|
|
|
|
- case "extract":
|
|
|
|
|
- await self.extract_task()
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
# Public API of this module: only the task class is exported.
__all__ = [
    "AdPlatformArticlesDecodeTask",
]
|
|
|