# create_decode_tasks.py
import json
from typing import Dict, List, Tuple

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import DecodeMaterialConst
from ._mapper import MaterialDecodeTaskMapper
from ._utils import MaterialDecodeUtils


  8. class CreateMaterialsDecodeTask(DecodeMaterialConst):
  9. def __init__(self, pool: DatabaseManager, log_service: LogService):
  10. self.pool = pool
  11. self.log_service = log_service
  12. self.mapper = MaterialDecodeTaskMapper(self.pool)
  13. self.tool = MaterialDecodeUtils()
  14. async def _acquire_materials(self) -> List[Dict]:
  15. """获取待解构素材并加锁(status INIT → PROCESSING)"""
  16. materials = await self.mapper.fetch_materials()
  17. locked = []
  18. for m in materials:
  19. mid = m["id"]
  20. acquired = await self.mapper.update_material_status(
  21. mid, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  22. )
  23. if acquired:
  24. locked.append(m)
  25. else:
  26. await self.log_service.log(
  27. contents={
  28. "material_id": mid,
  29. "task": "create_material_decode_task",
  30. "status": "skip",
  31. "message": "acquire lock failed",
  32. }
  33. )
  34. return locked
  35. async def _submit_and_record(self, materials: List[Dict]):
  36. if not materials:
  37. return
  38. # 无 material_cover 的素材直接标记失败
  39. valid = []
  40. for m in materials:
  41. if not m.get("material_cover"):
  42. await self.mapper.update_material_status(
  43. m["id"],
  44. self.TaskStatus.PROCESSING,
  45. self.TaskStatus.FAILED,
  46. )
  47. await self.log_service.log(
  48. contents={
  49. "material_id": m["id"],
  50. "task": "create_material_decode_task",
  51. "status": "fail",
  52. "message": "material_cover is empty, marked as FAILED",
  53. }
  54. )
  55. else:
  56. valid.append(m)
  57. if not valid:
  58. return
  59. # 跨批次去重:查 DB 已有任务
  60. all_source_ids = [str(m["material_id"]) for m in valid]
  61. existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
  62. # 同批次去重:相同 material_id 只保留第一条
  63. seen = set()
  64. deduped = []
  65. dups = []
  66. for m in valid:
  67. mid = str(m["material_id"])
  68. if mid in existing or mid in seen:
  69. dups.append(m)
  70. else:
  71. seen.add(mid)
  72. deduped.append(m)
  73. if dups:
  74. await self.log_service.log(
  75. contents={
  76. "task": "create_material_decode_task",
  77. "message": f"Skipped {len(dups)} duplicate materials (already submitted or same-batch)",
  78. }
  79. )
  80. for m in dups:
  81. await self.mapper.update_material_status(
  82. m["id"],
  83. self.TaskStatus.PROCESSING,
  84. self.TaskStatus.SUCCESS,
  85. )
  86. if not deduped:
  87. return
  88. posts = self.tool.prepare_posts(deduped)
  89. submit_results = await self.tool.submit_decode_batch(posts)
  90. posts_by_cid = {p["channelContentId"]: p for p in posts}
  91. for m in deduped:
  92. source_id = str(m["material_id"])
  93. mid = m["id"]
  94. result = submit_results.get(source_id)
  95. if not result:
  96. await self.mapper.update_material_status(
  97. mid, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  98. )
  99. await self.log_service.log(
  100. contents={
  101. "material_id": mid,
  102. "source_id": source_id,
  103. "task": "create_material_decode_task",
  104. "status": "fail",
  105. "message": "no response for source_id, rolled back to INIT",
  106. }
  107. )
  108. continue
  109. status = result.get("status")
  110. if status == self.SubmitStatus.FAILED:
  111. await self.mapper.update_material_status(
  112. mid, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  113. )
  114. await self.log_service.log(
  115. contents={
  116. "material_id": mid,
  117. "source_id": source_id,
  118. "task": "create_material_decode_task",
  119. "status": "fail",
  120. "data": result,
  121. }
  122. )
  123. continue
  124. if status == self.SubmitStatus.SUCCESS:
  125. query_results = await self.tool.query_decode_results_batch([source_id])
  126. result_data = query_results.get(source_id)
  127. if (
  128. result_data
  129. and result_data.get("status") == self.QueryStatus.SUCCESS
  130. ):
  131. data_content = result_data.get("dataContent") or "{}"
  132. html = result_data.get("html")
  133. await self.mapper.insert_decode_task(
  134. source_id=source_id,
  135. payload=json.dumps(
  136. posts_by_cid.get(source_id, {}), ensure_ascii=False
  137. ),
  138. remark="提交时已有解构结果,直接落库",
  139. )
  140. await self.mapper.set_decode_result(
  141. source_id=source_id,
  142. result=json.dumps(
  143. {"dataContent": data_content, "html": html},
  144. ensure_ascii=False,
  145. ),
  146. remark="提交时已返回 SUCCESS,结果已落库",
  147. )
  148. await self.mapper.update_material_status(
  149. mid, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  150. )
  151. await self.log_service.log(
  152. contents={
  153. "material_id": mid,
  154. "source_id": source_id,
  155. "task": "create_material_decode_task",
  156. "status": "success",
  157. "message": "decode result already available on submit",
  158. }
  159. )
  160. else:
  161. await self.mapper.insert_decode_task(
  162. source_id=source_id,
  163. payload=json.dumps(
  164. posts_by_cid.get(source_id, {}), ensure_ascii=False
  165. ),
  166. remark="提交返回SUCCESS,查询未果,等待轮询",
  167. status=self.TaskStatus.PROCESSING,
  168. )
  169. await self.mapper.update_material_status(
  170. mid, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  171. )
  172. await self.log_service.log(
  173. contents={
  174. "material_id": mid,
  175. "source_id": source_id,
  176. "task": "create_material_decode_task",
  177. "status": "pending",
  178. "message": "submit SUCCESS but query not ready, inserted for polling",
  179. }
  180. )
  181. elif status == self.SubmitStatus.PENDING:
  182. await self.mapper.insert_decode_task(
  183. source_id=source_id,
  184. payload=json.dumps(
  185. posts_by_cid.get(source_id, {}), ensure_ascii=False
  186. ),
  187. remark="素材解构任务已提交,等待轮询",
  188. status=self.TaskStatus.PROCESSING,
  189. )
  190. await self.mapper.update_material_status(
  191. mid, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  192. )
  193. await self.log_service.log(
  194. contents={
  195. "material_id": mid,
  196. "source_id": source_id,
  197. "task": "create_material_decode_task",
  198. "status": "pending",
  199. "message": "task submitted, waiting for polling",
  200. }
  201. )
  202. else:
  203. await self.mapper.update_material_status(
  204. mid, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  205. )
  206. await self.log_service.log(
  207. contents={
  208. "material_id": mid,
  209. "source_id": source_id,
  210. "task": "create_material_decode_task",
  211. "status": "fail",
  212. "message": f"unexpected submit status: {status}, rolled back to INIT",
  213. "data": result,
  214. }
  215. )
  216. async def deal(self):
  217. materials = await self._acquire_materials()
  218. if not materials:
  219. await self.log_service.log(
  220. contents={
  221. "task": "create_material_decode_task",
  222. "message": "No more materials to decode",
  223. }
  224. )
  225. return
  226. await self._submit_and_record(materials)
  227. await self.log_service.log(
  228. contents={
  229. "task": "create_material_decode_task",
  230. "message": f"Processed {len(materials)} materials",
  231. }
  232. )
  233. __all__ = ["CreateMaterialsDecodeTask"]