|
@@ -0,0 +1,260 @@
|
|
|
|
|
+import json
|
|
|
|
|
+from typing import Dict
|
|
|
|
|
+from tqdm import tqdm
|
|
|
|
|
+
|
|
|
|
|
+from app.core.database import DatabaseManager
|
|
|
|
|
+from app.core.observability import LogService
|
|
|
|
|
+
|
|
|
|
|
+from app.infra.shared import run_tasks_with_asyncio_task_group
|
|
|
|
|
+
|
|
|
|
|
+from ._const import InnerArticlesDecodeConst
|
|
|
|
|
+from ._mapper import InnerArticlesDecodeMapper
|
|
|
|
|
+from ._util import InnerArticlesDecodeUtil
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class InnerArticlesDecodeTask(InnerArticlesDecodeConst):
    """Pipeline driving the inner-articles decode workflow.

    Stages (each invoked via :meth:`deal`):
      1. create decode tasks on the remote decode service,
      2. poll those tasks for results,
      3. extract and persist the returned details.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        # Injected infrastructure: shared DB pool and structured logger.
        self.log_service = log_service
        self.pool = pool
        # Data-access layer bound to the same pool, plus the decode-service client.
        self.mapper = InnerArticlesDecodeMapper(pool)
        self.tool = InnerArticlesDecodeUtil()
|
|
|
|
|
+
|
|
|
|
|
+ async def create_single_decode_task(self, article: Dict):
|
|
|
|
|
+ # Acquire Lock
|
|
|
|
|
+ source_id = article["source_id"]
|
|
|
|
|
+ article_produce_info = await self.mapper.fetch_inner_articles_produce_detail(
|
|
|
|
|
+ source_id
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 与解构系统交互,创建解构任务
|
|
|
|
|
+ response = await self.tool.create_decode_task(article, article_produce_info)
|
|
|
|
|
+ response_code = response.get("code")
|
|
|
|
|
+ if response_code != self.RequestDecode.SUCCESS:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ task_id = response.get("data", {}).get("task_id") or response.get(
|
|
|
|
|
+ "data", {}
|
|
|
|
|
+ ).get("taskId")
|
|
|
|
|
+ if not task_id:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ wx_sn = article["wx_sn"]
|
|
|
|
|
+ remark = f"task_id: {task_id}-创建解构任务"
|
|
|
|
|
+ record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
|
|
|
|
|
+ if not record_row:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ async def fetch_single_task(self, task: Dict):
|
|
|
|
|
+ task_id = task["task_id"]
|
|
|
|
|
+
|
|
|
|
|
+ # acquire lock
|
|
|
|
|
+ acquire_lock = await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
|
|
|
|
|
+ )
|
|
|
|
|
+ if not acquire_lock:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ response = await self.tool.fetch_decode_result(task_id)
|
|
|
|
|
+ if not response:
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.INIT,
|
|
|
|
|
+ remark="获取解构结果失败,服务异常,已回滚状态",
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # 请求成功
|
|
|
|
|
+ response_code = response.get("code")
|
|
|
|
|
+ if response_code != self.RequestDecode.SUCCESS:
|
|
|
|
|
+ # 解构任务获取失败
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.FAILED,
|
|
|
|
|
+ remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ response_data = response.get("data", {})
|
|
|
|
|
+ response_task_id = response_data.get("taskId") or response_data.get("task_id")
|
|
|
|
|
+ if task_id != response_task_id:
|
|
|
|
|
+ # 解构任务获取失败
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.FAILED,
|
|
|
|
|
+ remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ status = response_data.get("status")
|
|
|
|
|
+ match status:
|
|
|
|
|
+ case self.DecodeStatus.PENDING:
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.INIT,
|
|
|
|
|
+ remark=f"解构任务状态为PENDING,继续轮询",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case self.DecodeStatus.RUNNING:
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.INIT,
|
|
|
|
|
+ remark=f"解构任务状态为RUNNING,继续轮询",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case self.DecodeStatus.SUCCESS:
|
|
|
|
|
+ await self.mapper.set_decode_result(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ result=json.dumps(response_data, ensure_ascii=False),
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case self.DecodeStatus.FAILED:
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.FAILED,
|
|
|
|
|
+ remark=f"解构任务状态为FAILED,标记为失败",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case _:
|
|
|
|
|
+ await self.mapper.update_decode_task_status(
|
|
|
|
|
+ task_id=task_id,
|
|
|
|
|
+ ori_status=self.TaskStatus.PROCESSING,
|
|
|
|
|
+ new_status=self.TaskStatus.INIT,
|
|
|
|
|
+ remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
|
|
|
|
|
+ )
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "fetch_single_task",
|
|
|
|
|
+ "task_id": task_id,
|
|
|
|
|
+ "status": "unknown",
|
|
|
|
|
+ "message": f"unexpected decode status: {status}",
|
|
|
|
|
+ "data": response_data,
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ async def extract_single_result(self, task):
|
|
|
|
|
+ task_id = task["id"]
|
|
|
|
|
+
|
|
|
|
|
+ # acquire lock by extract_status
|
|
|
|
|
+ acquire_lock = await self.mapper.update_extract_status(
|
|
|
|
|
+ task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
|
|
|
|
|
+ )
|
|
|
|
|
+ if not acquire_lock:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ result = json.loads(task["result"])["result"]
|
|
|
|
|
+ except (TypeError, KeyError, json.JSONDecodeError) as e:
|
|
|
|
|
+ await self.mapper.update_extract_status(
|
|
|
|
|
+ task_id,
|
|
|
|
|
+ self.ExtractStatus.PROCESSING,
|
|
|
|
|
+ self.ExtractStatus.FAILED,
|
|
|
|
|
+ )
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "extract_single_result",
|
|
|
|
|
+ "task_id": task_id,
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"parse decode result error: {e}",
|
|
|
|
|
+ "raw": task.get("result"),
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ detail = self.tool.extract_decode_result(result)
|
|
|
|
|
+ # 如果工具返回错误信息,直接标记为失败
|
|
|
|
|
+ if detail.get("error"):
|
|
|
|
|
+ await self.mapper.update_extract_status(
|
|
|
|
|
+ task_id,
|
|
|
|
|
+ self.ExtractStatus.PROCESSING,
|
|
|
|
|
+ self.ExtractStatus.FAILED,
|
|
|
|
|
+ )
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "extract_single_result",
|
|
|
|
|
+ "task_id": task_id,
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": detail["error"],
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # 写入明细表
|
|
|
|
|
+ saved = await self.mapper.record_extract_detail(task_id, detail)
|
|
|
|
|
+ if not saved:
|
|
|
|
|
+ await self.mapper.update_extract_status(
|
|
|
|
|
+ task_id,
|
|
|
|
|
+ self.ExtractStatus.PROCESSING,
|
|
|
|
|
+ self.ExtractStatus.FAILED,
|
|
|
|
|
+ )
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "extract_single_result",
|
|
|
|
|
+ "task_id": task_id,
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": "insert long_articles_decode_task_detail failed",
|
|
|
|
|
+ "detail": detail,
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # 写入成功,更新状态为成功
|
|
|
|
|
+ await self.mapper.update_extract_status(
|
|
|
|
|
+ task_id,
|
|
|
|
|
+ self.ExtractStatus.PROCESSING,
|
|
|
|
|
+ self.ExtractStatus.SUCCESS,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ async def create_tasks(self):
|
|
|
|
|
+ article_list = await self.mapper.fetch_inner_articles()
|
|
|
|
|
+ if not article_list:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "create_tasks",
|
|
|
|
|
+ "message": "No more articles to decode",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ for article in tqdm(article_list[1:], desc="Creating decode tasks"):
|
|
|
|
|
+ await self.create_single_decode_task(article)
|
|
|
|
|
+
|
|
|
|
|
+ async def fetch_results(self):
|
|
|
|
|
+ decoding_tasks = await self.mapper.fetch_decoding_tasks()
|
|
|
|
|
+ if not decoding_tasks:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={"task": "fetch_results", "message": "No more tasks to fetch"}
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ for task in tqdm(decoding_tasks, desc="Fetching decode results"):
|
|
|
|
|
+ await self.fetch_single_task(task)
|
|
|
|
|
+
|
|
|
|
|
+ async def extract_task(self):
|
|
|
|
|
+ tasks = await self.mapper.fetch_extract_tasks()
|
|
|
|
|
+ await run_tasks_with_asyncio_task_group(
|
|
|
|
|
+ task_list=tasks,
|
|
|
|
|
+ handler=self.extract_single_result,
|
|
|
|
|
+ description="批量解析结构结果",
|
|
|
|
|
+ unit="task",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ async def deal(self, task_name):
|
|
|
|
|
+ match task_name:
|
|
|
|
|
+ case "create_tasks":
|
|
|
|
|
+ await self.create_tasks()
|
|
|
|
|
+
|
|
|
|
|
+ # case "fetch_results":
|
|
|
|
|
+ # await self.fetch_results()
|
|
|
|
|
+ #
|
|
|
|
|
+ # case "extract":
|
|
|
|
|
+ # await self.extract_task()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
# Explicit public API of this module: only the task class is exported.
__all__ = ["InnerArticlesDecodeTask"]
|