# create_decode_tasks.py
  1. import json
  2. from typing import Dict, List
  3. from app.core.database import DatabaseManager
  4. from app.core.observability import LogService
  5. from ._const import DecodeMaterialConst
  6. from ._mapper import MaterialDecodeTaskMapper
  7. from ._utils import MaterialDecodeUtils
  8. class CreateMaterialsDecodeTask(DecodeMaterialConst):
  9. def __init__(self, pool: DatabaseManager, log_service: LogService):
  10. self.pool = pool
  11. self.log_service = log_service
  12. self.mapper = MaterialDecodeTaskMapper(self.pool)
  13. self.tool = MaterialDecodeUtils()
  14. async def _acquire_materials(self) -> List[Dict]:
  15. """获取待解构素材并加锁(status INIT → PROCESSING)
  16. 先检查队列中已有多少进行中任务(INIT + PROCESSING),控制消费端压力:
  17. - 软上限:可创建数 ≈ TASK_BATCH - 已有进行中任务数
  18. - 计数与取数之间不原子,其他 worker 可能同时插入,由后续 CAS 锁兜底
  19. - 若队列已满(pending >= TASK_BATCH),跳过本轮
  20. """
  21. pending_count = await self.mapper.count_pending_tasks()
  22. available_slots = max(0, self.TASK_BATCH - pending_count)
  23. if available_slots == 0:
  24. await self.log_service.log(
  25. contents={
  26. "task": "create_material_decode_task",
  27. "message": f"队列已满:进行中任务 {pending_count} >= {self.TASK_BATCH},跳过本轮创建",
  28. }
  29. )
  30. return []
  31. materials = await self.mapper.fetch_materials(limit=available_slots)
  32. locked = []
  33. for m in materials:
  34. mid = m["id"]
  35. acquired = await self.mapper.update_material_status(
  36. mid, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  37. )
  38. if acquired:
  39. locked.append(m)
  40. else:
  41. await self.log_service.log(
  42. contents={
  43. "material_id": mid,
  44. "task": "create_material_decode_task",
  45. "status": "skip",
  46. "message": "acquire lock failed",
  47. }
  48. )
  49. return locked
  50. async def _submit_and_record(self, materials: List[Dict]):
  51. if not materials:
  52. return
  53. # 无 material_cover 的素材直接标记失败
  54. valid = []
  55. for m in materials:
  56. if not m.get("material_cover"):
  57. await self.mapper.update_material_status(
  58. m["id"],
  59. self.TaskStatus.PROCESSING,
  60. self.TaskStatus.FAILED,
  61. )
  62. await self.log_service.log(
  63. contents={
  64. "material_id": m["id"],
  65. "task": "create_material_decode_task",
  66. "status": "fail",
  67. "message": "material_cover is empty, marked as FAILED",
  68. }
  69. )
  70. else:
  71. valid.append(m)
  72. if not valid:
  73. return
  74. # 跨批次去重:查 DB 已有任务
  75. all_source_ids = [str(m["material_id"]) for m in valid]
  76. existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
  77. # 同批次去重:相同 material_id 只保留第一条
  78. seen = set()
  79. deduped = []
  80. dups = []
  81. for m in valid:
  82. mid = str(m["material_id"])
  83. if mid in existing or mid in seen:
  84. dups.append(m)
  85. else:
  86. seen.add(mid)
  87. deduped.append(m)
  88. if dups:
  89. await self.log_service.log(
  90. contents={
  91. "task": "create_material_decode_task",
  92. "message": f"Skipped {len(dups)} duplicate materials (already submitted or same-batch)",
  93. }
  94. )
  95. for m in dups:
  96. await self.mapper.update_material_status(
  97. m["id"],
  98. self.TaskStatus.PROCESSING,
  99. self.TaskStatus.SUCCESS,
  100. )
  101. if not deduped:
  102. return
  103. posts = self.tool.prepare_posts(deduped)
  104. submit_results = await self.tool.submit_decode_batch(posts)
  105. posts_by_cid = {p["channelContentId"]: p for p in posts}
  106. for m in deduped:
  107. source_id = str(m["material_id"])
  108. mid = m["id"]
  109. result = submit_results.get(source_id)
  110. if not result:
  111. await self.mapper.update_material_status(
  112. mid, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  113. )
  114. await self.log_service.log(
  115. contents={
  116. "material_id": mid,
  117. "source_id": source_id,
  118. "task": "create_material_decode_task",
  119. "status": "fail",
  120. "message": "no response for source_id, rolled back to INIT",
  121. }
  122. )
  123. continue
  124. status = result.get("status")
  125. if status == self.SubmitStatus.FAILED:
  126. await self.mapper.update_material_status(
  127. mid, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  128. )
  129. await self.log_service.log(
  130. contents={
  131. "material_id": mid,
  132. "source_id": source_id,
  133. "task": "create_material_decode_task",
  134. "status": "fail",
  135. "data": result,
  136. }
  137. )
  138. continue
  139. if status == self.SubmitStatus.SUCCESS:
  140. query_results = await self.tool.query_decode_results_batch([source_id])
  141. result_data = query_results.get(source_id)
  142. if (
  143. result_data
  144. and result_data.get("status") == self.QueryStatus.SUCCESS
  145. ):
  146. data_content = result_data.get("dataContent") or "{}"
  147. html = result_data.get("html")
  148. await self.mapper.insert_decode_task(
  149. source_id=source_id,
  150. payload=json.dumps(
  151. posts_by_cid.get(source_id, {}), ensure_ascii=False
  152. ),
  153. remark="提交时已有解构结果,直接落库",
  154. )
  155. await self.mapper.set_decode_result(
  156. source_id=source_id,
  157. result=json.dumps(
  158. {"dataContent": data_content, "html": html},
  159. ensure_ascii=False,
  160. ),
  161. remark="提交时已返回 SUCCESS,结果已落库",
  162. )
  163. await self.mapper.update_material_status(
  164. mid, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  165. )
  166. await self.log_service.log(
  167. contents={
  168. "material_id": mid,
  169. "source_id": source_id,
  170. "task": "create_material_decode_task",
  171. "status": "success",
  172. "message": "decode result already available on submit",
  173. }
  174. )
  175. else:
  176. await self.mapper.insert_decode_task(
  177. source_id=source_id,
  178. payload=json.dumps(
  179. posts_by_cid.get(source_id, {}), ensure_ascii=False
  180. ),
  181. remark="提交返回SUCCESS,查询未果,等待轮询",
  182. status=self.TaskStatus.PROCESSING,
  183. )
  184. await self.mapper.update_material_status(
  185. mid, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  186. )
  187. await self.log_service.log(
  188. contents={
  189. "material_id": mid,
  190. "source_id": source_id,
  191. "task": "create_material_decode_task",
  192. "status": "pending",
  193. "message": "submit SUCCESS but query not ready, inserted for polling",
  194. }
  195. )
  196. elif status == self.SubmitStatus.PENDING:
  197. await self.mapper.insert_decode_task(
  198. source_id=source_id,
  199. payload=json.dumps(
  200. posts_by_cid.get(source_id, {}), ensure_ascii=False
  201. ),
  202. remark="素材解构任务已提交,等待轮询",
  203. status=self.TaskStatus.PROCESSING,
  204. )
  205. await self.mapper.update_material_status(
  206. mid, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  207. )
  208. await self.log_service.log(
  209. contents={
  210. "material_id": mid,
  211. "source_id": source_id,
  212. "task": "create_material_decode_task",
  213. "status": "pending",
  214. "message": "task submitted, waiting for polling",
  215. }
  216. )
  217. else:
  218. await self.mapper.update_material_status(
  219. mid, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  220. )
  221. await self.log_service.log(
  222. contents={
  223. "material_id": mid,
  224. "source_id": source_id,
  225. "task": "create_material_decode_task",
  226. "status": "fail",
  227. "message": f"unexpected submit status: {status}, rolled back to INIT",
  228. "data": result,
  229. }
  230. )
  231. async def deal(self):
  232. materials = await self._acquire_materials()
  233. if not materials:
  234. await self.log_service.log(
  235. contents={
  236. "task": "create_material_decode_task",
  237. "message": "No more materials to decode",
  238. }
  239. )
  240. return
  241. await self._submit_and_record(materials)
  242. await self.log_service.log(
  243. contents={
  244. "task": "create_material_decode_task",
  245. "message": f"Processed {len(materials)} materials",
  246. }
  247. )
# Names exported by `from <module> import *`.
__all__ = ["CreateMaterialsDecodeTask"]