# create_decode_tasks.py
import json
from collections import defaultdict
from typing import Any, Dict, List, Tuple

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import DecodeCardConst
from ._mapper import CardDecodeTaskMapper
from ._utils import CardDecodeUtils
  9. class CreateCardsDecodeTask(DecodeCardConst):
  10. def __init__(self, pool: DatabaseManager, log_service: LogService):
  11. self.pool = pool
  12. self.log_service = log_service
  13. self.mapper = CardDecodeTaskMapper(self.pool)
  14. self.tool = CardDecodeUtils()
  15. async def _acquire_cards(self) -> List[Dict]:
  16. """获取待解构卡片并加锁(status INIT → PROCESSING)"""
  17. cards = await self.mapper.fetch_cards()
  18. locked = []
  19. for card in cards:
  20. card_id = card["id"]
  21. acquired = await self.mapper.update_card_status(
  22. card_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  23. )
  24. if acquired:
  25. locked.append(card)
  26. else:
  27. await self.log_service.log(
  28. contents={
  29. "card_id": card_id,
  30. "task": "create_card_decode_task",
  31. "status": "skip",
  32. "message": "acquire lock failed",
  33. }
  34. )
  35. return locked
  36. async def _ensure_cover_ids(self, cards: List[Dict]):
  37. """为缺少 card_cover_id 的卡片下载封面并计算 MD5"""
  38. for card in cards:
  39. if card.get("card_cover_id"):
  40. continue
  41. if not card.get("share_cover"):
  42. continue # 空封面留给 _submit_and_record 标记 FAILED
  43. try:
  44. cover_id = self.tool.compute_cover_md5(card["share_cover"])
  45. await self.mapper.set_card_cover_id(card["id"], cover_id)
  46. card["card_cover_id"] = cover_id
  47. await self.log_service.log(
  48. contents={
  49. "card_id": card["id"],
  50. "task": "create_card_decode_task",
  51. "status": "info",
  52. "message": f"computed cover_id={cover_id}",
  53. }
  54. )
  55. except Exception as e:
  56. await self.mapper.update_card_status(
  57. card["id"],
  58. self.TaskStatus.PROCESSING,
  59. self.TaskStatus.FAILED,
  60. )
  61. await self.log_service.log(
  62. contents={
  63. "card_id": card["id"],
  64. "share_cover": card.get("share_cover"),
  65. "task": "create_card_decode_task",
  66. "status": "fail",
  67. "message": f"compute cover md5 failed, marked as FAILED: {e}",
  68. }
  69. )
  70. @staticmethod
  71. def _group_cards_by_channel(cards: List[Dict]) -> Dict[int, List[Dict]]:
  72. """按 auto_reply_top_cards_daily.channel 分组,映射到对应的 config_id"""
  73. grouped = defaultdict(list)
  74. for card in cards:
  75. source_channel = card.get("channel", "")
  76. config_id = DecodeCardConst.CHANNEL_CONFIG_MAP.get(source_channel)
  77. if config_id:
  78. grouped[config_id].append(card)
  79. return dict(grouped)
  80. async def _submit_and_record_for_config(
  81. self, cards: List[Dict], config_id: int, source: int
  82. ):
  83. """对同一 config_id 的卡片执行提交与落库"""
  84. if not cards:
  85. return
  86. # 过滤已有任务记录的卡片(按 card_cover_id 去重)
  87. cards_with_cid = [c for c in cards if c.get("card_cover_id")]
  88. if not cards_with_cid:
  89. return
  90. # 跨批次去重:查 DB 已有任务
  91. all_source_ids = [str(c["card_cover_id"]) for c in cards_with_cid]
  92. existing = await self.mapper.fetch_existing_source_ids(
  93. all_source_ids, config_id, source
  94. )
  95. # 同批次去重:相同 card_cover_id 只保留第一条
  96. seen = set()
  97. deduped_cards = []
  98. intra_dups = []
  99. for c in cards_with_cid:
  100. cid = str(c["card_cover_id"])
  101. if cid in existing or cid in seen:
  102. intra_dups.append(c)
  103. else:
  104. seen.add(cid)
  105. deduped_cards.append(c)
  106. if intra_dups:
  107. await self.log_service.log(
  108. contents={
  109. "task": "create_card_decode_task",
  110. "config_id": config_id,
  111. "message": f"Skipped {len(intra_dups)} duplicate cards (already submitted or same-batch)",
  112. }
  113. )
  114. for card in intra_dups:
  115. await self.mapper.update_card_status(
  116. card["id"],
  117. self.TaskStatus.PROCESSING,
  118. self.TaskStatus.SUCCESS,
  119. )
  120. new_cards = deduped_cards
  121. if not new_cards:
  122. return
  123. posts = self.tool.prepare_posts(new_cards)
  124. submit_results = await self.tool.submit_decode_batch(
  125. posts, config_id=config_id
  126. )
  127. posts_by_cid = {p["channelContentId"]: p for p in posts}
  128. for card in new_cards:
  129. source_id = str(card["card_cover_id"])
  130. card_id = card["id"]
  131. result = submit_results.get(source_id)
  132. if not result:
  133. await self.mapper.update_card_status(
  134. card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  135. )
  136. await self.log_service.log(
  137. contents={
  138. "card_id": card_id,
  139. "source_id": source_id,
  140. "config_id": config_id,
  141. "task": "create_card_decode_task",
  142. "status": "fail",
  143. "message": "no response for source_id, rolled back to INIT",
  144. }
  145. )
  146. continue
  147. status = result.get("status")
  148. if status == self.SubmitStatus.FAILED:
  149. await self.mapper.update_card_status(
  150. card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  151. )
  152. await self.log_service.log(
  153. contents={
  154. "card_id": card_id,
  155. "source_id": source_id,
  156. "config_id": config_id,
  157. "task": "create_card_decode_task",
  158. "status": "fail",
  159. "data": result,
  160. }
  161. )
  162. continue
  163. if status == self.SubmitStatus.SUCCESS:
  164. query_results = await self.tool.query_decode_results_batch(
  165. [source_id], config_id=config_id
  166. )
  167. result_data = query_results.get(source_id)
  168. if (
  169. result_data
  170. and result_data.get("status") == self.QueryStatus.SUCCESS
  171. ):
  172. data_content = result_data.get("dataContent") or "{}"
  173. html = result_data.get("html")
  174. await self.mapper.insert_decode_task(
  175. source_id=source_id,
  176. config_id=config_id,
  177. source=source,
  178. payload=json.dumps(
  179. posts_by_cid.get(source_id, {}), ensure_ascii=False
  180. ),
  181. remark="提交时已有解构结果,直接落库",
  182. )
  183. await self.mapper.set_decode_result(
  184. source_id=source_id,
  185. config_id=config_id,
  186. result=json.dumps(
  187. {"dataContent": data_content, "html": html},
  188. ensure_ascii=False,
  189. ),
  190. remark="提交时已返回 SUCCESS,结果已落库",
  191. )
  192. await self.mapper.update_card_status(
  193. card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  194. )
  195. await self.log_service.log(
  196. contents={
  197. "card_id": card_id,
  198. "source_id": source_id,
  199. "config_id": config_id,
  200. "task": "create_card_decode_task",
  201. "status": "success",
  202. "message": "decode result already available on submit",
  203. }
  204. )
  205. else:
  206. await self.mapper.insert_decode_task(
  207. source_id=source_id,
  208. config_id=config_id,
  209. source=source,
  210. payload=json.dumps(
  211. posts_by_cid.get(source_id, {}), ensure_ascii=False
  212. ),
  213. remark="提交返回SUCCESS,查询未果,等待轮询",
  214. status=self.TaskStatus.PROCESSING,
  215. )
  216. await self.mapper.update_card_status(
  217. card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  218. )
  219. await self.log_service.log(
  220. contents={
  221. "card_id": card_id,
  222. "source_id": source_id,
  223. "config_id": config_id,
  224. "task": "create_card_decode_task",
  225. "status": "pending",
  226. "message": "submit SUCCESS but query not ready, inserted for polling",
  227. }
  228. )
  229. elif status == self.SubmitStatus.PENDING:
  230. await self.mapper.insert_decode_task(
  231. source_id=source_id,
  232. config_id=config_id,
  233. source=source,
  234. payload=json.dumps(
  235. posts_by_cid.get(source_id, {}), ensure_ascii=False
  236. ),
  237. remark="卡片解构任务已提交,等待轮询",
  238. status=self.TaskStatus.PROCESSING,
  239. )
  240. await self.mapper.update_card_status(
  241. card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  242. )
  243. await self.log_service.log(
  244. contents={
  245. "card_id": card_id,
  246. "source_id": source_id,
  247. "config_id": config_id,
  248. "task": "create_card_decode_task",
  249. "status": "pending",
  250. "message": "task submitted, waiting for polling",
  251. }
  252. )
  253. else:
  254. await self.mapper.update_card_status(
  255. card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  256. )
  257. await self.log_service.log(
  258. contents={
  259. "card_id": card_id,
  260. "source_id": source_id,
  261. "config_id": config_id,
  262. "task": "create_card_decode_task",
  263. "status": "fail",
  264. "message": f"unexpected submit status: {status}, rolled back to INIT",
  265. "data": result,
  266. }
  267. )
  268. async def _submit_and_record(self, cards: List[Dict]):
  269. if not cards:
  270. return
  271. # 无 share_cover 的卡片直接标记失败
  272. valid_cards = []
  273. for card in cards:
  274. if not card.get("share_cover"):
  275. await self.mapper.update_card_status(
  276. card["id"],
  277. self.TaskStatus.PROCESSING,
  278. self.TaskStatus.FAILED,
  279. )
  280. await self.log_service.log(
  281. contents={
  282. "card_id": card["id"],
  283. "task": "create_card_decode_task",
  284. "status": "fail",
  285. "message": "share_cover is empty, marked as FAILED",
  286. }
  287. )
  288. else:
  289. valid_cards.append(card)
  290. if not valid_cards:
  291. return
  292. grouped = self._group_cards_by_channel(valid_cards)
  293. for config_id, group_cards in grouped.items():
  294. source_channel = group_cards[0]["channel"]
  295. source = self.CHANNEL_SOURCE_MAP[source_channel]
  296. await self._submit_and_record_for_config(group_cards, config_id, source)
  297. # 处理不在映射表中的卡片(回滚状态)
  298. mapped_ids = {id(c) for g in grouped.values() for c in g}
  299. for card in valid_cards:
  300. if id(card) not in mapped_ids:
  301. await self.mapper.update_card_status(
  302. card["id"],
  303. self.TaskStatus.PROCESSING,
  304. self.TaskStatus.INIT,
  305. )
  306. await self.log_service.log(
  307. contents={
  308. "card_id": card["id"],
  309. "channel": card.get("channel"),
  310. "task": "create_card_decode_task",
  311. "status": "fail",
  312. "message": "unknown channel, rolled back to INIT",
  313. }
  314. )
  315. async def deal(self):
  316. cards = await self._acquire_cards()
  317. if not cards:
  318. await self.log_service.log(
  319. contents={
  320. "task": "create_card_decode_task",
  321. "message": "No more cards to decode",
  322. }
  323. )
  324. return
  325. await self._ensure_cover_ids(cards)
  326. await self._submit_and_record(cards)
  327. await self.log_service.log(
  328. contents={
  329. "task": "create_card_decode_task",
  330. "message": f"Processed {len(cards)} cards",
  331. }
  332. )
  333. __all__ = ["CreateCardsDecodeTask"]