entrance.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import json
  2. from typing import Dict
  3. from tqdm import tqdm
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from app.infra.shared import run_tasks_with_asyncio_task_group
  7. from ._const import AdPlatformArticlesDecodeConst
  8. from ._mapper import AdPlatformArticlesDecodeMapper
  9. from ._util import AdPlatformArticlesDecodeUtil
  10. class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
  11. def __init__(self, pool: DatabaseManager, log_service: LogService):
  12. self.pool = pool
  13. self.log_service = log_service
  14. self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
  15. self.tool = AdPlatformArticlesDecodeUtil()
  16. async def create_single_decode_task(self, article: Dict):
  17. # Acquire Lock
  18. article_id = article["id"]
  19. acquire_lock = await self.mapper.update_article_decode_status(
  20. article_id, self.INIT_STATUS, self.PROCESSING_STATUS
  21. )
  22. if not acquire_lock:
  23. await self.log_service.log(
  24. contents={
  25. "article_id": article_id,
  26. "task": "create_decode_task",
  27. "status": "skip",
  28. "message": "acquire lock failed",
  29. }
  30. )
  31. return
  32. # 与解构系统交互,创建解构任务
  33. response = await self.tool.create_decode_task(article)
  34. response_code = response.get("code")
  35. if response_code != self.SUCCESS_CODE:
  36. # 解构任务创建失败
  37. await self.mapper.update_article_decode_status(
  38. article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  39. )
  40. await self.log_service.log(
  41. contents={
  42. "article_id": article_id,
  43. "task": "create_decode_task",
  44. "status": "fail",
  45. "data": response,
  46. }
  47. )
  48. return
  49. task_id = response.get("data", {}).get("task_id") or response.get(
  50. "data", {}
  51. ).get("taskId")
  52. if not task_id:
  53. # 解构任务创建失败
  54. await self.mapper.update_article_decode_status(
  55. article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  56. )
  57. await self.log_service.log(
  58. contents={
  59. "article_id": article_id,
  60. "task": "create_decode_task",
  61. "status": "fail",
  62. "data": response,
  63. }
  64. )
  65. return
  66. # 创建 decode 任务成功
  67. await self.log_service.log(
  68. contents={
  69. "article_id": article_id,
  70. "task": "create_decode_task",
  71. "status": "success",
  72. "data": response,
  73. }
  74. )
  75. wx_sn = article["wx_sn"]
  76. remark = f"task_id: {task_id}-创建解构任务"
  77. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  78. if not record_row:
  79. # 记录解构任务失败
  80. await self.mapper.update_article_decode_status(
  81. article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  82. )
  83. await self.log_service.log(
  84. contents={
  85. "article_id": article_id,
  86. "task": "record_decode_task",
  87. "status": "fail",
  88. "message": "创建 decode 记录失败",
  89. "data": response,
  90. }
  91. )
  92. return
  93. # 记录创建成功
  94. await self.mapper.update_article_decode_status(
  95. article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
  96. )
  97. async def fetch_single_task(self, task: Dict):
  98. task_id = task["task_id"]
  99. # acquire lock
  100. acquire_lock = await self.mapper.update_decode_task_status(
  101. task_id, self.INIT_STATUS, self.PROCESSING_STATUS
  102. )
  103. if not acquire_lock:
  104. return
  105. response = await self.tool.fetch_decode_result(task_id)
  106. if not response:
  107. await self.mapper.update_decode_task_status(
  108. task_id=task_id,
  109. ori_status=self.PROCESSING_STATUS,
  110. new_status=self.INIT_STATUS,
  111. remark="获取解构结果失败,服务异常,已回滚状态",
  112. )
  113. return
  114. # 请求成功
  115. response_code = response.get("code")
  116. if response_code != self.SUCCESS_CODE:
  117. # 解构任务获取失败
  118. await self.mapper.update_decode_task_status(
  119. task_id=task_id,
  120. ori_status=self.PROCESSING_STATUS,
  121. new_status=self.FAILED_STATUS,
  122. remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
  123. )
  124. return
  125. response_data = response.get("data", {})
  126. response_task_id = response_data.get("taskId") or response_data.get("task_id")
  127. if task_id != response_task_id:
  128. # 解构任务获取失败
  129. await self.mapper.update_decode_task_status(
  130. task_id=task_id,
  131. ori_status=self.PROCESSING_STATUS,
  132. new_status=self.FAILED_STATUS,
  133. remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
  134. )
  135. return
  136. status = response_data.get("status")
  137. match status:
  138. case self.PENDING:
  139. await self.mapper.update_decode_task_status(
  140. task_id=task_id,
  141. ori_status=self.PROCESSING_STATUS,
  142. new_status=self.INIT_STATUS,
  143. remark=f"解构任务状态为PENDING,继续轮询",
  144. )
  145. case self.RUNNING:
  146. await self.mapper.update_decode_task_status(
  147. task_id=task_id,
  148. ori_status=self.PROCESSING_STATUS,
  149. new_status=self.INIT_STATUS,
  150. remark=f"解构任务状态为RUNNING,继续轮询",
  151. )
  152. case self.SUCCESS:
  153. await self.mapper.set_decode_result(
  154. task_id=task_id,
  155. result=json.dumps(response_data, ensure_ascii=False),
  156. )
  157. case self.FAILED:
  158. await self.mapper.update_decode_task_status(
  159. task_id=task_id,
  160. ori_status=self.PROCESSING_STATUS,
  161. new_status=self.FAILED_STATUS,
  162. remark=f"解构任务状态为FAILED,标记为失败",
  163. )
  164. case _:
  165. await self.mapper.update_decode_task_status(
  166. task_id=task_id,
  167. ori_status=self.PROCESSING_STATUS,
  168. new_status=self.INIT_STATUS,
  169. remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
  170. )
  171. await self.log_service.log(
  172. contents={
  173. "task": "fetch_single_task",
  174. "task_id": task_id,
  175. "status": "unknown",
  176. "message": f"unexpected decode status: {status}",
  177. "data": response_data,
  178. }
  179. )
  180. async def extract_single_result(self, task):
  181. task_id = task["id"]
  182. # acquire lock by extract_status
  183. acquire_lock = await self.mapper.update_extract_status(
  184. task_id, self.INIT_STATUS, self.PROCESSING_STATUS
  185. )
  186. if not acquire_lock:
  187. return
  188. try:
  189. result = json.loads(task["result"])["result"]
  190. except (TypeError, KeyError, json.JSONDecodeError) as e:
  191. await self.mapper.update_extract_status(
  192. task_id,
  193. self.PROCESSING_STATUS,
  194. self.FAILED_STATUS,
  195. )
  196. await self.log_service.log(
  197. contents={
  198. "task": "extract_single_result",
  199. "task_id": task_id,
  200. "status": "fail",
  201. "message": f"parse decode result error: {e}",
  202. "raw": task.get("result"),
  203. }
  204. )
  205. return
  206. detail = self.tool.extract_decode_result(result)
  207. # 如果工具返回错误信息,直接标记为失败
  208. if detail.get("error"):
  209. await self.mapper.update_extract_status(
  210. task_id,
  211. self.PROCESSING_STATUS,
  212. self.FAILED_STATUS,
  213. )
  214. await self.log_service.log(
  215. contents={
  216. "task": "extract_single_result",
  217. "task_id": task_id,
  218. "status": "fail",
  219. "message": detail["error"],
  220. }
  221. )
  222. return
  223. # 写入明细表
  224. saved = await self.mapper.record_extract_detail(task_id, detail)
  225. if not saved:
  226. await self.mapper.update_extract_status(
  227. task_id,
  228. self.PROCESSING_STATUS,
  229. self.FAILED_STATUS,
  230. )
  231. await self.log_service.log(
  232. contents={
  233. "task": "extract_single_result",
  234. "task_id": task_id,
  235. "status": "fail",
  236. "message": "insert long_articles_decode_task_detail failed",
  237. "detail": detail,
  238. }
  239. )
  240. return
  241. # 写入成功,更新状态为成功
  242. await self.mapper.update_extract_status(
  243. task_id,
  244. self.PROCESSING_STATUS,
  245. self.SUCCESS_STATUS,
  246. )
  247. async def create_tasks(self):
  248. article_list = await self.mapper.fetch_decode_articles()
  249. if not article_list:
  250. await self.log_service.log(
  251. contents={
  252. "task": "create_tasks",
  253. "message": "No more articles to decode",
  254. }
  255. )
  256. return
  257. for article in tqdm(article_list, desc="Creating decode tasks"):
  258. await self.create_single_decode_task(article)
  259. async def fetch_results(self):
  260. decoding_tasks = await self.mapper.fetch_decoding_tasks()
  261. if not decoding_tasks:
  262. await self.log_service.log(
  263. contents={"task": "fetch_results", "message": "No more tasks to fetch"}
  264. )
  265. return
  266. for task in decoding_tasks:
  267. await self.fetch_single_task(task)
  268. async def extract_task(self):
  269. tasks = await self.mapper.fetch_extract_tasks()
  270. await run_tasks_with_asyncio_task_group(
  271. task_list=tasks,
  272. handler=self.extract_single_result,
  273. description="批量解析结构结果",
  274. unit="task",
  275. )
  276. async def deal(self, task_name):
  277. match task_name:
  278. case "create_tasks":
  279. await self.create_tasks()
  280. case "fetch_results":
  281. await self.fetch_results()
  282. case "extract":
  283. await self.extract_task()
# Public API of this module: only the task class is exported.
__all__ = ["AdPlatformArticlesDecodeTask"]