# create_decode_tasks.py
  1. import json
  2. from tqdm import tqdm
  3. from typing import Dict, List
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._const import DecodeArticleConst
  7. from ._mapper import (
  8. AdPlatformArticlesDecodeTaskMapper,
  9. InnerArticlesDecodeTaskMapper,
  10. )
  11. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  12. class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
  13. def __init__(self, pool: DatabaseManager, log_service: LogService):
  14. self.pool = pool
  15. self.log_service = log_service
  16. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  17. self.tool = AdPlatformArticlesDecodeUtils()
  18. async def _acquire_articles(self) -> List[Dict]:
  19. """获取待解构文章,并加锁(decode_status INIT → PROCESSING)"""
  20. article_list = await self.mapper.fetch_decode_articles()
  21. locked = []
  22. for article in article_list:
  23. article_id = article["id"]
  24. acquired = await self.mapper.update_article_decode_status(
  25. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  26. )
  27. if acquired:
  28. locked.append(article)
  29. else:
  30. await self.log_service.log(
  31. contents={
  32. "article_id": article_id,
  33. "task": "create_decode_task_v2",
  34. "status": "skip",
  35. "message": "acquire lock failed",
  36. }
  37. )
  38. return locked
  39. async def _submit_and_record(self, articles: List[Dict]):
  40. if not articles:
  41. return
  42. posts = self.tool.prepare_posts(articles)
  43. submit_results = await self.tool.submit_decode_batch(posts)
  44. posts_by_wx = {p["channelContentId"]: p for p in posts}
  45. for article in articles:
  46. wx_sn = article["wx_sn"]
  47. article_id = article["id"]
  48. result = submit_results.get(wx_sn)
  49. if not result:
  50. await self.mapper.update_article_decode_status(
  51. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  52. )
  53. await self.log_service.log(
  54. contents={
  55. "article_id": article_id,
  56. "wx_sn": wx_sn,
  57. "task": "create_decode_task_v2",
  58. "status": "fail",
  59. "message": "no response for content_id, rolled back to INIT",
  60. }
  61. )
  62. continue
  63. status = result.get("status")
  64. if status == self.SubmitStatus.FAILED:
  65. await self.mapper.update_article_decode_status(
  66. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  67. )
  68. await self.log_service.log(
  69. contents={
  70. "article_id": article_id,
  71. "wx_sn": wx_sn,
  72. "task": "create_decode_task_v2",
  73. "status": "fail",
  74. "data": result,
  75. }
  76. )
  77. continue
  78. if status == self.SubmitStatus.SUCCESS:
  79. # 已有解构结果,直接查询结果并落库
  80. query_results = await self.tool.query_decode_results_batch([wx_sn])
  81. result_data = query_results.get(wx_sn)
  82. if (
  83. result_data
  84. and result_data.get("status") == self.QueryStatus.SUCCESS
  85. ):
  86. data_content = result_data.get("dataContent") or "{}"
  87. html = result_data.get("html")
  88. await self.mapper.insert_decode_task(
  89. source_id=wx_sn,
  90. source=self.SourceType.AD_PLATFORM,
  91. payload=json.dumps(
  92. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  93. ),
  94. remark="提交时已有解构结果,直接落库",
  95. )
  96. await self.mapper.set_decode_result(
  97. source_id=wx_sn,
  98. result=json.dumps(
  99. {"dataContent": data_content, "html": html},
  100. ensure_ascii=False,
  101. ),
  102. remark="提交时已返回 SUCCESS,结果已落库",
  103. )
  104. await self.mapper.update_article_decode_status(
  105. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  106. )
  107. await self.log_service.log(
  108. contents={
  109. "article_id": article_id,
  110. "wx_sn": wx_sn,
  111. "task": "create_decode_task_v2",
  112. "status": "success",
  113. "message": "decode result already available on submit",
  114. }
  115. )
  116. else:
  117. # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
  118. await self.mapper.insert_decode_task(
  119. source_id=wx_sn,
  120. source=self.SourceType.AD_PLATFORM,
  121. payload=json.dumps(
  122. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  123. ),
  124. remark="提交返回SUCCESS,查询未果,等待轮询",
  125. status=self.TaskStatus.PROCESSING,
  126. )
  127. await self.mapper.update_article_decode_status(
  128. article_id,
  129. self.TaskStatus.PROCESSING,
  130. self.TaskStatus.SUCCESS,
  131. )
  132. await self.log_service.log(
  133. contents={
  134. "article_id": article_id,
  135. "wx_sn": wx_sn,
  136. "task": "create_decode_task_v2",
  137. "status": "pending",
  138. "message": "submit SUCCESS but query not ready, inserted for polling",
  139. }
  140. )
  141. elif status == self.SubmitStatus.PENDING:
  142. await self.mapper.insert_decode_task(
  143. source_id=wx_sn,
  144. source=self.SourceType.AD_PLATFORM,
  145. payload=json.dumps(posts_by_wx.get(wx_sn, {}), ensure_ascii=False),
  146. remark="任务已提交,等待轮询",
  147. status=self.TaskStatus.PROCESSING,
  148. )
  149. await self.mapper.update_article_decode_status(
  150. article_id,
  151. self.TaskStatus.PROCESSING,
  152. self.TaskStatus.SUCCESS,
  153. )
  154. await self.log_service.log(
  155. contents={
  156. "article_id": article_id,
  157. "wx_sn": wx_sn,
  158. "task": "create_decode_task_v2",
  159. "status": "pending",
  160. "message": "task submitted, waiting for polling",
  161. }
  162. )
  163. else:
  164. await self.mapper.update_article_decode_status(
  165. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  166. )
  167. await self.log_service.log(
  168. contents={
  169. "article_id": article_id,
  170. "wx_sn": wx_sn,
  171. "task": "create_decode_task_v2",
  172. "status": "fail",
  173. "message": f"unexpected submit status: {status}, rolled back to INIT",
  174. "data": result,
  175. }
  176. )
  177. async def deal(self):
  178. article_list = await self._acquire_articles()
  179. if not article_list:
  180. await self.log_service.log(
  181. contents={
  182. "task": "create_decode_task_v2",
  183. "message": "No more articles to decode",
  184. }
  185. )
  186. return
  187. await self._submit_and_record(article_list)
  188. await self.log_service.log(
  189. contents={
  190. "task": "create_decode_task_v2",
  191. "message": f"Processed {len(article_list)} articles",
  192. }
  193. )
  194. class CreateInnerArticlesDecodeTask(DecodeArticleConst):
  195. _TEST_MODE = False
  196. def __init__(self, pool: DatabaseManager, log_service: LogService):
  197. self.pool = pool
  198. self.log_service = log_service
  199. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  200. self.tool = InnerArticlesDecodeUtils()
  201. async def deal(self):
  202. article_list = await self._acquire_articles()
  203. if not article_list:
  204. await self.log_service.log(
  205. contents={
  206. "task": "create_inner_decode_task",
  207. "message": "No more articles to decode",
  208. }
  209. )
  210. return
  211. await self._submit_and_record(article_list)
  212. await self.log_service.log(
  213. contents={
  214. "task": "create_inner_decode_task",
  215. "message": f"Processed {len(article_list)} articles",
  216. }
  217. )
  218. async def _acquire_articles(self) -> List[Dict]:
  219. """获取待解构文章,并加锁(status INIT → PROCESSING)
  220. 先检查队列中已有多少进行中任务(INIT + PROCESSING),控制消费端压力:
  221. - 软上限:可创建数 ≈ TASK_BATCH - 已有进行中任务数
  222. - 计数与取数之间不原子,其他 worker 可能同时插入,由后续 CAS 锁兜底
  223. - 若队列已满(pending >= TASK_BATCH),跳过本轮
  224. """
  225. pending_count = await self.mapper.count_pending_tasks(
  226. source=self.SourceType.INNER
  227. )
  228. available_slots = max(0, self.TASK_BATCH - pending_count)
  229. if available_slots == 0:
  230. await self.log_service.log(
  231. contents={
  232. "task": "create_inner_decode_task",
  233. "message": f"队列已满:进行中任务 {pending_count} >= {self.TASK_BATCH},跳过本轮创建",
  234. }
  235. )
  236. return []
  237. article_list = await self.mapper.fetch_inner_articles(limit=available_slots)
  238. if self._TEST_MODE:
  239. return article_list
  240. locked = []
  241. for article in article_list:
  242. article_id = article["id"]
  243. acquired = await self.mapper.update_inner_article_status(
  244. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  245. )
  246. if acquired:
  247. locked.append(article)
  248. else:
  249. await self.log_service.log(
  250. contents={
  251. "article_id": article_id,
  252. "task": "create_inner_decode_task",
  253. "status": "skip",
  254. "message": "acquire lock failed",
  255. }
  256. )
  257. return locked
  258. async def _handle_result(
  259. self,
  260. article: Dict,
  261. source_id: str,
  262. result: Dict,
  263. posts_by_cid: Dict,
  264. config_id: int,
  265. ):
  266. if not result:
  267. await self.log_service.log(
  268. contents={
  269. "source_id": source_id,
  270. "task": "create_inner_decode_task",
  271. "status": "fail",
  272. "message": "no response for source_id",
  273. }
  274. )
  275. return
  276. status = result.get("status")
  277. if status == self.SubmitStatus.FAILED:
  278. await self.log_service.log(
  279. contents={
  280. "source_id": source_id,
  281. "task": "create_inner_decode_task",
  282. "status": "fail",
  283. "data": result,
  284. }
  285. )
  286. elif status == self.SubmitStatus.PENDING:
  287. await self.mapper.insert_decode_task(
  288. source_id=source_id,
  289. source=self.SourceType.INNER,
  290. payload=json.dumps(posts_by_cid.get(source_id, {}), ensure_ascii=False),
  291. remark="内部文章解构任务已提交",
  292. status=self.TaskStatus.PROCESSING,
  293. )
  294. elif status == self.SubmitStatus.SUCCESS:
  295. query_results = await self.tool.query_decode_results_batch(
  296. [source_id], config_id=config_id
  297. )
  298. result_data = query_results.get(source_id)
  299. data_content = result_data.get("dataContent") if result_data else None
  300. if data_content:
  301. await self.mapper.insert_decode_task(
  302. source_id=source_id,
  303. source=self.SourceType.INNER,
  304. payload=json.dumps(
  305. posts_by_cid.get(source_id, {}), ensure_ascii=False
  306. ),
  307. remark="内部文章解构结果已获取",
  308. )
  309. await self.mapper.set_decode_result(
  310. source_id=source_id,
  311. result=json.dumps(
  312. {"dataContent": data_content}, ensure_ascii=False
  313. ),
  314. )
  315. else:
  316. await self.mapper.insert_decode_task(
  317. source_id=source_id,
  318. source=self.SourceType.INNER,
  319. payload=json.dumps(result, ensure_ascii=False),
  320. remark="提交返回SUCCESS,查询未果,等待轮询",
  321. status=self.TaskStatus.PROCESSING,
  322. )
  323. else:
  324. await self.log_service.log(
  325. contents={
  326. "source_id": source_id,
  327. "task": "create_inner_decode_task",
  328. "status": "fail",
  329. "message": f"unexpected submit status: {status}",
  330. "data": result,
  331. }
  332. )
  333. async def _submit_and_record(self, articles: List[Dict]):
  334. if not articles:
  335. return
  336. # 过滤已有任务记录的文章(测试模式跳过)
  337. if not self._TEST_MODE:
  338. all_source_ids = [str(a["source_id"]) for a in articles]
  339. existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
  340. new_articles = [a for a in articles if str(a["source_id"]) not in existing]
  341. skipped = len(articles) - len(new_articles)
  342. if skipped > 0:
  343. await self.log_service.log(
  344. contents={
  345. "task": "create_inner_decode_task",
  346. "message": f"Skipped {skipped} already-submitted articles",
  347. }
  348. )
  349. for article in articles:
  350. if article not in new_articles:
  351. await self.mapper.update_inner_article_status(
  352. article["id"],
  353. self.TaskStatus.PROCESSING,
  354. self.TaskStatus.SUCCESS,
  355. )
  356. else:
  357. new_articles = articles
  358. if not new_articles:
  359. return
  360. # 批量获取 produce 信息
  361. produce_info_map: Dict[str, list] = {}
  362. for article in new_articles:
  363. source_id = article["source_id"]
  364. produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  365. source_id
  366. )
  367. produce_info_map[str(article["source_id"])] = produce_info
  368. posts = self.tool.prepare_posts(new_articles, produce_info_map)
  369. submit_results = await self.tool.submit_decode_batch(
  370. posts, config_id=self.CONFIG_ID, skip_completed=True
  371. )
  372. posts_by_cid = {p["channelContentId"]: p for p in posts}
  373. for article in tqdm(new_articles):
  374. source_id = str(article["source_id"])
  375. article_id = article["id"]
  376. result = submit_results.get(source_id)
  377. await self._handle_result(
  378. article, source_id, result, posts_by_cid, self.CONFIG_ID
  379. )
  380. if not self._TEST_MODE:
  381. ok = result and result.get("status") != self.SubmitStatus.FAILED
  382. if ok:
  383. await self.mapper.update_inner_article_status(
  384. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  385. )
  386. else:
  387. # 提交失败或无响应,回锁为 INIT 等待下次重试
  388. await self.mapper.update_inner_article_status(
  389. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  390. )
  391. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]