# entrance.py (8.2 KB)

  1. import json
  2. from typing import Dict
  3. from tqdm import tqdm
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._const import AdPlatformArticlesDecodeConst
  7. from ._mapper import AdPlatformArticlesDecodeMapper
  8. from ._util import AdPlatformArticlesDecodeUtil
  9. class AdPlatformArticlesDecodeTask(AdPlatformArticlesDecodeConst):
  10. def __init__(self, pool: DatabaseManager, log_service: LogService):
  11. self.pool = pool
  12. self.log_service = log_service
  13. self.mapper = AdPlatformArticlesDecodeMapper(self.pool)
  14. self.tool = AdPlatformArticlesDecodeUtil()
  15. async def create_single_decode_task(self, article: Dict):
  16. # Acquire Lock
  17. article_id = article["id"]
  18. acquire_lock = await self.mapper.update_article_decode_status(
  19. article_id, self.INIT_STATUS, self.PROCESSING_STATUS
  20. )
  21. if not acquire_lock:
  22. await self.log_service.log(
  23. contents={
  24. "article_id": article_id,
  25. "task": "create_decode_task",
  26. "status": "skip",
  27. "message": "acquire lock failed",
  28. }
  29. )
  30. return
  31. # 与解构系统交互,创建解构任务
  32. response = await self.tool.create_decode_task(article)
  33. response_code = response.get("code")
  34. if response_code != self.SUCCESS_CODE:
  35. # 解构任务创建失败
  36. await self.mapper.update_article_decode_status(
  37. article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  38. )
  39. await self.log_service.log(
  40. contents={
  41. "article_id": article_id,
  42. "task": "create_decode_task",
  43. "status": "fail",
  44. "data": response,
  45. }
  46. )
  47. return
  48. task_id = response.get("data", {}).get("task_id")
  49. if not task_id:
  50. # 解构任务创建失败
  51. await self.mapper.update_article_decode_status(
  52. article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  53. )
  54. await self.log_service.log(
  55. contents={
  56. "article_id": article_id,
  57. "task": "create_decode_task",
  58. "status": "fail",
  59. "data": response,
  60. }
  61. )
  62. return
  63. # 创建 decode 任务成功
  64. await self.log_service.log(
  65. contents={
  66. "article_id": article_id,
  67. "task": "create_decode_task",
  68. "status": "success",
  69. "data": response,
  70. }
  71. )
  72. wx_sn = article["wx_sn"]
  73. remark = f"task_id: {task_id}-创建解构任务"
  74. record_row = await self.mapper.record_decode_task(task_id, wx_sn, remark)
  75. if not record_row:
  76. # 记录解构任务失败
  77. await self.mapper.update_article_decode_status(
  78. article_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  79. )
  80. await self.log_service.log(
  81. contents={
  82. "article_id": article_id,
  83. "task": "record_decode_task",
  84. "status": "fail",
  85. "message": "创建 decode 记录失败",
  86. "data": response,
  87. }
  88. )
  89. return
  90. # 记录创建成功
  91. await self.mapper.update_article_decode_status(
  92. article_id, self.PROCESSING_STATUS, self.SUCCESS_STATUS
  93. )
  94. async def fetch_single_task(self, task: Dict):
  95. task_id = task["task_id"]
  96. # acquire lock
  97. acquire_lock = await self.mapper.update_decode_task_status(
  98. task_id, self.INIT_STATUS, self.PROCESSING_STATUS
  99. )
  100. if not acquire_lock:
  101. return
  102. response = await self.tool.fetch_decode_result(task_id)
  103. if not response:
  104. await self.mapper.update_decode_task_status(
  105. task_id=task_id,
  106. ori_status=self.PROCESSING_STATUS,
  107. new_status=self.INIT_STATUS,
  108. remark="获取解构结果失败,服务异常,已回滚状态",
  109. )
  110. return
  111. # 请求成功
  112. response_code = response.get("code")
  113. if response_code != self.SUCCESS_CODE:
  114. # 解构任务获取失败
  115. await self.mapper.update_decode_task_status(
  116. task_id=task_id,
  117. ori_status=self.PROCESSING_STATUS,
  118. new_status=self.FAILED_STATUS,
  119. remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
  120. )
  121. return
  122. response_data = response.get("data", {})
  123. response_task_id = response_data.get("taskId")
  124. if task_id != response_task_id:
  125. # 解构任务获取失败
  126. await self.mapper.update_decode_task_status(
  127. task_id=task_id,
  128. ori_status=self.PROCESSING_STATUS,
  129. new_status=self.FAILED_STATUS,
  130. remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
  131. )
  132. return
  133. status = response_data.get("status")
  134. match status:
  135. case self.PENDING:
  136. await self.mapper.update_decode_task_status(
  137. task_id=task_id,
  138. ori_status=self.PROCESSING_STATUS,
  139. new_status=self.INIT_STATUS,
  140. remark=f"解构任务状态为PENDING,继续轮询",
  141. )
  142. case self.RUNNING:
  143. await self.mapper.update_decode_task_status(
  144. task_id=task_id,
  145. ori_status=self.PROCESSING_STATUS,
  146. new_status=self.INIT_STATUS,
  147. remark=f"解构任务状态为RUNNING,继续轮询",
  148. )
  149. case self.SUCCESS:
  150. await self.mapper.set_decode_result(
  151. task_id=task_id,
  152. result=json.dumps(response_data, ensure_ascii=False),
  153. )
  154. case self.FAILED:
  155. await self.mapper.update_decode_task_status(
  156. task_id=task_id,
  157. ori_status=self.PROCESSING_STATUS,
  158. new_status=self.FAILED_STATUS,
  159. remark=f"解构任务状态为FAILED,标记为失败",
  160. )
  161. case _:
  162. await self.mapper.update_decode_task_status(
  163. task_id=task_id,
  164. ori_status=self.PROCESSING_STATUS,
  165. new_status=self.INIT_STATUS,
  166. remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
  167. )
  168. await self.log_service.log(
  169. contents={
  170. "task": "fetch_single_task",
  171. "task_id": task_id,
  172. "status": "unknown",
  173. "message": f"unexpected decode status: {status}",
  174. "data": response_data,
  175. }
  176. )
  177. async def create_tasks(self):
  178. article_list = await self.mapper.fetch_decode_articles()
  179. if not article_list:
  180. await self.log_service.log(
  181. contents={"task": "create_tasks", "message": "No more articles to decode"}
  182. )
  183. return
  184. for article in tqdm(article_list, desc="Creating decode tasks"):
  185. await self.create_single_decode_task(article)
  186. async def fetch_results(self):
  187. decoding_tasks = await self.mapper.fetch_decoding_tasks()
  188. if not decoding_tasks:
  189. await self.log_service.log(
  190. contents={"task": "fetch_results", "message": "No more tasks to fetch"}
  191. )
  192. return
  193. for task in decoding_tasks:
  194. await self.fetch_single_task(task)
  195. async def deal(self, task_name):
  196. match task_name:
  197. case "create_tasks":
  198. await self.create_tasks()
  199. case "fetch_results":
  200. await self.fetch_results()
# Public API of this module: only the task class is listed for export.
__all__ = ["AdPlatformArticlesDecodeTask"]