# entrance.py
import json
from typing import Dict

from tqdm import tqdm

from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.shared import run_tasks_with_asyncio_task_group

from ._const import InnerArticlesDecodeConst
from ._mapper import InnerArticlesDecodeMapper
from ._util import InnerArticlesDecodeUtil
  10. class InnerArticlesDecodeTask(InnerArticlesDecodeConst):
  11. def __init__(self, pool: DatabaseManager, log_service: LogService):
  12. self.pool = pool
  13. self.log_service = log_service
  14. self.mapper = InnerArticlesDecodeMapper(self.pool)
  15. self.tool = InnerArticlesDecodeUtil()
  16. async def create_single_decode_task(self, article: Dict):
  17. # Acquire Lock
  18. source_id = article["source_id"]
  19. article_produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  20. source_id
  21. )
  22. # 与解构系统交互,创建解构任务
  23. response = await self.tool.create_decode_task(article, article_produce_info)
  24. response_code = response.get("code")
  25. if response_code != self.RequestDecode.SUCCESS:
  26. return
  27. task_id = response.get("data", {}).get("task_id") or response.get(
  28. "data", {}
  29. ).get("taskId")
  30. if not task_id:
  31. return
  32. wx_sn = article["wx_sn"]
  33. remark = f"task_id: {task_id}-创建解构任务"
  34. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  35. if not record_row:
  36. return
  37. async def fetch_single_task(self, task: Dict):
  38. task_id = task["task_id"]
  39. # acquire lock
  40. acquire_lock = await self.mapper.update_decode_task_status(
  41. task_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  42. )
  43. if not acquire_lock:
  44. return
  45. response = await self.tool.fetch_decode_result(task_id)
  46. if not response:
  47. await self.mapper.update_decode_task_status(
  48. task_id=task_id,
  49. ori_status=self.TaskStatus.PROCESSING,
  50. new_status=self.TaskStatus.INIT,
  51. remark="获取解构结果失败,服务异常,已回滚状态",
  52. )
  53. return
  54. # 请求成功
  55. response_code = response.get("code")
  56. if response_code != self.RequestDecode.SUCCESS:
  57. # 解构任务获取失败
  58. await self.mapper.update_decode_task_status(
  59. task_id=task_id,
  60. ori_status=self.TaskStatus.PROCESSING,
  61. new_status=self.TaskStatus.FAILED,
  62. remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
  63. )
  64. return
  65. response_data = response.get("data", {})
  66. response_task_id = response_data.get("taskId") or response_data.get("task_id")
  67. if task_id != response_task_id:
  68. # 解构任务获取失败
  69. await self.mapper.update_decode_task_status(
  70. task_id=task_id,
  71. ori_status=self.TaskStatus.PROCESSING,
  72. new_status=self.TaskStatus.FAILED,
  73. remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
  74. )
  75. return
  76. status = response_data.get("status")
  77. match status:
  78. case self.DecodeStatus.PENDING:
  79. await self.mapper.update_decode_task_status(
  80. task_id=task_id,
  81. ori_status=self.TaskStatus.PROCESSING,
  82. new_status=self.TaskStatus.INIT,
  83. remark=f"解构任务状态为PENDING,继续轮询",
  84. )
  85. case self.DecodeStatus.RUNNING:
  86. await self.mapper.update_decode_task_status(
  87. task_id=task_id,
  88. ori_status=self.TaskStatus.PROCESSING,
  89. new_status=self.TaskStatus.INIT,
  90. remark=f"解构任务状态为RUNNING,继续轮询",
  91. )
  92. case self.DecodeStatus.SUCCESS:
  93. await self.mapper.set_decode_result(
  94. task_id=task_id,
  95. result=json.dumps(response_data, ensure_ascii=False),
  96. )
  97. case self.DecodeStatus.FAILED:
  98. await self.mapper.update_decode_task_status(
  99. task_id=task_id,
  100. ori_status=self.TaskStatus.PROCESSING,
  101. new_status=self.TaskStatus.FAILED,
  102. remark=f"解构任务状态为FAILED,标记为失败",
  103. )
  104. case _:
  105. await self.mapper.update_decode_task_status(
  106. task_id=task_id,
  107. ori_status=self.TaskStatus.PROCESSING,
  108. new_status=self.TaskStatus.INIT,
  109. remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
  110. )
  111. await self.log_service.log(
  112. contents={
  113. "task": "fetch_single_task",
  114. "task_id": task_id,
  115. "status": "unknown",
  116. "message": f"unexpected decode status: {status}",
  117. "data": response_data,
  118. }
  119. )
  120. async def extract_single_result(self, task):
  121. task_id = task["id"]
  122. # acquire lock by extract_status
  123. acquire_lock = await self.mapper.update_extract_status(
  124. task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
  125. )
  126. if not acquire_lock:
  127. return
  128. try:
  129. result = json.loads(task["result"])["result"]
  130. except (TypeError, KeyError, json.JSONDecodeError) as e:
  131. await self.mapper.update_extract_status(
  132. task_id,
  133. self.ExtractStatus.PROCESSING,
  134. self.ExtractStatus.FAILED,
  135. )
  136. await self.log_service.log(
  137. contents={
  138. "task": "extract_single_result",
  139. "task_id": task_id,
  140. "status": "fail",
  141. "message": f"parse decode result error: {e}",
  142. "raw": task.get("result"),
  143. }
  144. )
  145. return
  146. detail = self.tool.extract_decode_result(result)
  147. # 如果工具返回错误信息,直接标记为失败
  148. if detail.get("error"):
  149. await self.mapper.update_extract_status(
  150. task_id,
  151. self.ExtractStatus.PROCESSING,
  152. self.ExtractStatus.FAILED,
  153. )
  154. await self.log_service.log(
  155. contents={
  156. "task": "extract_single_result",
  157. "task_id": task_id,
  158. "status": "fail",
  159. "message": detail["error"],
  160. }
  161. )
  162. return
  163. # 写入明细表
  164. saved = await self.mapper.record_extract_detail(task_id, detail)
  165. if not saved:
  166. await self.mapper.update_extract_status(
  167. task_id,
  168. self.ExtractStatus.PROCESSING,
  169. self.ExtractStatus.FAILED,
  170. )
  171. await self.log_service.log(
  172. contents={
  173. "task": "extract_single_result",
  174. "task_id": task_id,
  175. "status": "fail",
  176. "message": "insert long_articles_decode_task_detail failed",
  177. "detail": detail,
  178. }
  179. )
  180. return
  181. # 写入成功,更新状态为成功
  182. await self.mapper.update_extract_status(
  183. task_id,
  184. self.ExtractStatus.PROCESSING,
  185. self.ExtractStatus.SUCCESS,
  186. )
  187. async def create_tasks(self):
  188. article_list = await self.mapper.fetch_inner_articles()
  189. if not article_list:
  190. await self.log_service.log(
  191. contents={
  192. "task": "create_tasks",
  193. "message": "No more articles to decode",
  194. }
  195. )
  196. return
  197. for article in tqdm(article_list[1:], desc="Creating decode tasks"):
  198. await self.create_single_decode_task(article)
  199. async def fetch_results(self):
  200. decoding_tasks = await self.mapper.fetch_decoding_tasks()
  201. if not decoding_tasks:
  202. await self.log_service.log(
  203. contents={"task": "fetch_results", "message": "No more tasks to fetch"}
  204. )
  205. return
  206. for task in tqdm(decoding_tasks, desc="Fetching decode results"):
  207. await self.fetch_single_task(task)
  208. async def extract_task(self):
  209. tasks = await self.mapper.fetch_extract_tasks()
  210. await run_tasks_with_asyncio_task_group(
  211. task_list=tasks,
  212. handler=self.extract_single_result,
  213. description="批量解析结构结果",
  214. unit="task",
  215. )
  216. async def deal(self, task_name):
  217. match task_name:
  218. case "create_tasks":
  219. await self.create_tasks()
  220. # case "fetch_results":
  221. # await self.fetch_results()
  222. #
  223. # case "extract":
  224. # await self.extract_task()
  225. __all__ = ["InnerArticlesDecodeTask"]