create_decode_tasks.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. import json
  2. from tqdm import tqdm
  3. from typing import Dict, List
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._const import DecodeArticleConst
  7. from ._mapper import (
  8. AdPlatformArticlesDecodeTaskMapper,
  9. InnerArticlesDecodeTaskMapper,
  10. )
  11. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  12. class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
  13. def __init__(self, pool: DatabaseManager, log_service: LogService):
  14. self.pool = pool
  15. self.log_service = log_service
  16. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  17. self.tool = AdPlatformArticlesDecodeUtils()
  18. async def _acquire_articles(self) -> List[Dict]:
  19. """获取待解构文章,并加锁(decode_status INIT → PROCESSING)"""
  20. article_list = await self.mapper.fetch_decode_articles()
  21. locked = []
  22. for article in article_list:
  23. article_id = article["id"]
  24. acquired = await self.mapper.update_article_decode_status(
  25. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  26. )
  27. if acquired:
  28. locked.append(article)
  29. else:
  30. await self.log_service.log(
  31. contents={
  32. "article_id": article_id,
  33. "task": "create_decode_task_v2",
  34. "status": "skip",
  35. "message": "acquire lock failed",
  36. }
  37. )
  38. return locked
  39. async def _submit_and_record(self, articles: List[Dict]):
  40. if not articles:
  41. return
  42. posts = self.tool.prepare_posts(articles)
  43. submit_results = await self.tool.submit_decode_batch(posts)
  44. posts_by_wx = {p["channelContentId"]: p for p in posts}
  45. for article in articles:
  46. wx_sn = article["wx_sn"]
  47. article_id = article["id"]
  48. result = submit_results.get(wx_sn)
  49. if not result:
  50. await self.mapper.update_article_decode_status(
  51. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  52. )
  53. await self.log_service.log(
  54. contents={
  55. "article_id": article_id,
  56. "wx_sn": wx_sn,
  57. "task": "create_decode_task_v2",
  58. "status": "fail",
  59. "message": "no response for content_id, rolled back to INIT",
  60. }
  61. )
  62. continue
  63. status = result.get("status")
  64. if status == self.SubmitStatus.FAILED:
  65. await self.mapper.update_article_decode_status(
  66. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  67. )
  68. await self.log_service.log(
  69. contents={
  70. "article_id": article_id,
  71. "wx_sn": wx_sn,
  72. "task": "create_decode_task_v2",
  73. "status": "fail",
  74. "data": result,
  75. }
  76. )
  77. continue
  78. if status == self.SubmitStatus.SUCCESS:
  79. # 已有解构结果,直接查询结果并落库
  80. query_results = await self.tool.query_decode_results_batch([wx_sn])
  81. result_data = query_results.get(wx_sn)
  82. if (
  83. result_data
  84. and result_data.get("status") == self.QueryStatus.SUCCESS
  85. ):
  86. data_content = result_data.get("dataContent") or "{}"
  87. html = result_data.get("html")
  88. await self.mapper.insert_decode_task(
  89. source_id=wx_sn,
  90. source=self.SourceType.AD_PLATFORM,
  91. payload=json.dumps(
  92. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  93. ),
  94. remark="提交时已有解构结果,直接落库",
  95. )
  96. await self.mapper.set_decode_result(
  97. source_id=wx_sn,
  98. result=json.dumps(
  99. {"dataContent": data_content, "html": html},
  100. ensure_ascii=False,
  101. ),
  102. remark="提交时已返回 SUCCESS,结果已落库",
  103. )
  104. await self.mapper.update_article_decode_status(
  105. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  106. )
  107. await self.log_service.log(
  108. contents={
  109. "article_id": article_id,
  110. "wx_sn": wx_sn,
  111. "task": "create_decode_task_v2",
  112. "status": "success",
  113. "message": "decode result already available on submit",
  114. }
  115. )
  116. else:
  117. # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
  118. await self.mapper.insert_decode_task(
  119. source_id=wx_sn,
  120. source=self.SourceType.AD_PLATFORM,
  121. payload=json.dumps(
  122. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  123. ),
  124. remark="提交返回SUCCESS,查询未果,等待轮询",
  125. status=self.TaskStatus.PROCESSING,
  126. )
  127. await self.mapper.update_article_decode_status(
  128. article_id,
  129. self.TaskStatus.PROCESSING,
  130. self.TaskStatus.SUCCESS,
  131. )
  132. await self.log_service.log(
  133. contents={
  134. "article_id": article_id,
  135. "wx_sn": wx_sn,
  136. "task": "create_decode_task_v2",
  137. "status": "pending",
  138. "message": "submit SUCCESS but query not ready, inserted for polling",
  139. }
  140. )
  141. elif status == self.SubmitStatus.PENDING:
  142. await self.mapper.insert_decode_task(
  143. source_id=wx_sn,
  144. source=self.SourceType.AD_PLATFORM,
  145. payload=json.dumps(posts_by_wx.get(wx_sn, {}), ensure_ascii=False),
  146. remark="任务已提交,等待轮询",
  147. status=self.TaskStatus.PROCESSING,
  148. )
  149. await self.mapper.update_article_decode_status(
  150. article_id,
  151. self.TaskStatus.PROCESSING,
  152. self.TaskStatus.SUCCESS,
  153. )
  154. await self.log_service.log(
  155. contents={
  156. "article_id": article_id,
  157. "wx_sn": wx_sn,
  158. "task": "create_decode_task_v2",
  159. "status": "pending",
  160. "message": "task submitted, waiting for polling",
  161. }
  162. )
  163. else:
  164. await self.mapper.update_article_decode_status(
  165. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  166. )
  167. await self.log_service.log(
  168. contents={
  169. "article_id": article_id,
  170. "wx_sn": wx_sn,
  171. "task": "create_decode_task_v2",
  172. "status": "fail",
  173. "message": f"unexpected submit status: {status}, rolled back to INIT",
  174. "data": result,
  175. }
  176. )
  177. async def deal(self):
  178. article_list = await self._acquire_articles()
  179. if not article_list:
  180. await self.log_service.log(
  181. contents={
  182. "task": "create_decode_task_v2",
  183. "message": "No more articles to decode",
  184. }
  185. )
  186. return
  187. await self._submit_and_record(article_list)
  188. await self.log_service.log(
  189. contents={
  190. "task": "create_decode_task_v2",
  191. "message": f"Processed {len(article_list)} articles",
  192. }
  193. )
  194. class CreateInnerArticlesDecodeTask(DecodeArticleConst):
  195. _TEST_MODE = False
  196. def __init__(self, pool: DatabaseManager, log_service: LogService):
  197. self.pool = pool
  198. self.log_service = log_service
  199. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  200. self.tool = InnerArticlesDecodeUtils()
  201. async def deal(self):
  202. article_list = await self._acquire_articles()
  203. if not article_list:
  204. await self.log_service.log(
  205. contents={
  206. "task": "create_inner_decode_task",
  207. "message": "No more articles to decode",
  208. }
  209. )
  210. return
  211. await self._submit_and_record(article_list)
  212. await self.log_service.log(
  213. contents={
  214. "task": "create_inner_decode_task",
  215. "message": f"Processed {len(article_list)} articles",
  216. }
  217. )
  218. async def _acquire_articles(self) -> List[Dict]:
  219. """获取待解构文章,并加锁(status INIT → PROCESSING)"""
  220. article_list = await self.mapper.fetch_inner_articles()
  221. if self._TEST_MODE:
  222. return article_list
  223. locked = []
  224. for article in article_list:
  225. article_id = article["id"]
  226. acquired = await self.mapper.update_inner_article_status(
  227. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  228. )
  229. if acquired:
  230. locked.append(article)
  231. else:
  232. await self.log_service.log(
  233. contents={
  234. "article_id": article_id,
  235. "task": "create_inner_decode_task",
  236. "status": "skip",
  237. "message": "acquire lock failed",
  238. }
  239. )
  240. return locked
  241. async def _handle_result(
  242. self,
  243. article: Dict,
  244. source_id: str,
  245. result: Dict,
  246. posts_by_cid: Dict,
  247. config_id: int,
  248. ):
  249. if not result:
  250. await self.log_service.log(
  251. contents={
  252. "source_id": source_id,
  253. "task": "create_inner_decode_task",
  254. "status": "fail",
  255. "message": "no response for source_id",
  256. }
  257. )
  258. return
  259. status = result.get("status")
  260. if status == self.SubmitStatus.FAILED:
  261. await self.log_service.log(
  262. contents={
  263. "source_id": source_id,
  264. "task": "create_inner_decode_task",
  265. "status": "fail",
  266. "data": result,
  267. }
  268. )
  269. elif status == self.SubmitStatus.PENDING:
  270. await self.mapper.insert_decode_task(
  271. source_id=source_id,
  272. source=self.SourceType.INNER,
  273. payload=json.dumps(posts_by_cid.get(source_id, {}), ensure_ascii=False),
  274. remark="内部文章解构任务已提交",
  275. status=self.TaskStatus.PROCESSING,
  276. )
  277. elif status == self.SubmitStatus.SUCCESS:
  278. query_results = await self.tool.query_decode_results_batch(
  279. [source_id], config_id=config_id
  280. )
  281. result_data = query_results.get(source_id)
  282. data_content = result_data.get("dataContent") if result_data else None
  283. if data_content:
  284. await self.mapper.insert_decode_task(
  285. source_id=source_id,
  286. source=self.SourceType.INNER,
  287. payload=json.dumps(
  288. posts_by_cid.get(source_id, {}), ensure_ascii=False
  289. ),
  290. remark="内部文章解构结果已获取",
  291. )
  292. await self.mapper.set_decode_result(
  293. source_id=source_id,
  294. result=json.dumps(
  295. {"dataContent": data_content}, ensure_ascii=False
  296. ),
  297. )
  298. else:
  299. await self.mapper.insert_decode_task(
  300. source_id=source_id,
  301. source=self.SourceType.INNER,
  302. payload=json.dumps(result, ensure_ascii=False),
  303. remark="提交返回SUCCESS,查询未果,等待轮询",
  304. status=self.TaskStatus.PROCESSING,
  305. )
  306. else:
  307. await self.log_service.log(
  308. contents={
  309. "source_id": source_id,
  310. "task": "create_inner_decode_task",
  311. "status": "fail",
  312. "message": f"unexpected submit status: {status}",
  313. "data": result,
  314. }
  315. )
  316. async def _submit_and_record(self, articles: List[Dict]):
  317. if not articles:
  318. return
  319. # 过滤已有任务记录的文章(测试模式跳过)
  320. if not self._TEST_MODE:
  321. all_source_ids = [str(a["source_id"]) for a in articles]
  322. existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
  323. new_articles = [a for a in articles if str(a["source_id"]) not in existing]
  324. skipped = len(articles) - len(new_articles)
  325. if skipped > 0:
  326. await self.log_service.log(
  327. contents={
  328. "task": "create_inner_decode_task",
  329. "message": f"Skipped {skipped} already-submitted articles",
  330. }
  331. )
  332. for article in articles:
  333. if article not in new_articles:
  334. await self.mapper.update_inner_article_status(
  335. article["id"],
  336. self.TaskStatus.PROCESSING,
  337. self.TaskStatus.SUCCESS,
  338. )
  339. else:
  340. new_articles = articles
  341. if not new_articles:
  342. return
  343. # 批量获取 produce 信息
  344. produce_info_map: Dict[str, list] = {}
  345. for article in new_articles:
  346. source_id = article["source_id"]
  347. produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  348. source_id
  349. )
  350. produce_info_map[str(article["source_id"])] = produce_info
  351. posts = self.tool.prepare_posts(new_articles, produce_info_map)
  352. submit_results = await self.tool.submit_decode_batch(
  353. posts, config_id=self.CONFIG_ID, skip_completed=True
  354. )
  355. posts_by_cid = {p["channelContentId"]: p for p in posts}
  356. for article in tqdm(new_articles):
  357. source_id = str(article["source_id"])
  358. article_id = article["id"]
  359. result = submit_results.get(source_id)
  360. await self._handle_result(
  361. article, source_id, result, posts_by_cid, self.CONFIG_ID
  362. )
  363. if not self._TEST_MODE:
  364. ok = result and result.get("status") != self.SubmitStatus.FAILED
  365. if ok:
  366. await self.mapper.update_inner_article_status(
  367. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  368. )
  369. else:
  370. # 提交失败或无响应,回锁为 INIT 等待下次重试
  371. await self.mapper.update_inner_article_status(
  372. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  373. )
  374. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]