create_decode_tasks.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. import json
  2. from tqdm import tqdm
  3. from typing import Dict, List
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._const import DecodeArticleConst
  7. from ._mapper import (
  8. AdPlatformArticlesDecodeTaskMapper,
  9. InnerArticlesDecodeTaskMapper,
  10. )
  11. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  12. class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
  13. def __init__(self, pool: DatabaseManager, log_service: LogService):
  14. self.pool = pool
  15. self.log_service = log_service
  16. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  17. self.tool = AdPlatformArticlesDecodeUtils()
  18. async def _acquire_articles(self) -> List[Dict]:
  19. """获取待解构文章,并加锁(decode_status INIT → PROCESSING)"""
  20. article_list = await self.mapper.fetch_decode_articles()
  21. locked = []
  22. for article in article_list:
  23. article_id = article["id"]
  24. acquired = await self.mapper.update_article_decode_status(
  25. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  26. )
  27. if acquired:
  28. locked.append(article)
  29. else:
  30. await self.log_service.log(
  31. contents={
  32. "article_id": article_id,
  33. "task": "create_decode_task_v2",
  34. "status": "skip",
  35. "message": "acquire lock failed",
  36. }
  37. )
  38. return locked
  39. async def _submit_and_record(self, articles: List[Dict]):
  40. if not articles:
  41. return
  42. posts = self.tool.prepare_posts(articles)
  43. submit_results = await self.tool.submit_decode_batch(posts)
  44. posts_by_wx = {p["channelContentId"]: p for p in posts}
  45. for article in articles:
  46. wx_sn = article["wx_sn"]
  47. article_id = article["id"]
  48. result = submit_results.get(wx_sn)
  49. if not result:
  50. await self.mapper.update_article_decode_status(
  51. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  52. )
  53. await self.log_service.log(
  54. contents={
  55. "article_id": article_id,
  56. "wx_sn": wx_sn,
  57. "task": "create_decode_task_v2",
  58. "status": "fail",
  59. "message": "no response for content_id, rolled back to INIT",
  60. }
  61. )
  62. continue
  63. status = result.get("status")
  64. if status == self.SubmitStatus.FAILED:
  65. await self.mapper.update_article_decode_status(
  66. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  67. )
  68. await self.log_service.log(
  69. contents={
  70. "article_id": article_id,
  71. "wx_sn": wx_sn,
  72. "task": "create_decode_task_v2",
  73. "status": "fail",
  74. "data": result,
  75. }
  76. )
  77. continue
  78. if status == self.SubmitStatus.SUCCESS:
  79. # 已有解构结果,直接查询结果并落库
  80. query_results = await self.tool.query_decode_results_batch([wx_sn])
  81. result_data = query_results.get(wx_sn)
  82. if (
  83. result_data
  84. and result_data.get("status") == self.QueryStatus.SUCCESS
  85. ):
  86. data_content = result_data.get("dataContent") or "{}"
  87. html = result_data.get("html")
  88. await self.mapper.insert_decode_task(
  89. source_id=wx_sn,
  90. source=self.SourceType.AD_PLATFORM,
  91. payload=json.dumps(
  92. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  93. ),
  94. remark="提交时已有解构结果,直接落库",
  95. )
  96. await self.mapper.set_decode_result(
  97. source_id=wx_sn,
  98. result=json.dumps(
  99. {"dataContent": data_content, "html": html},
  100. ensure_ascii=False,
  101. ),
  102. remark="提交时已返回 SUCCESS,结果已落库",
  103. )
  104. await self.mapper.update_article_decode_status(
  105. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  106. )
  107. await self.log_service.log(
  108. contents={
  109. "article_id": article_id,
  110. "wx_sn": wx_sn,
  111. "task": "create_decode_task_v2",
  112. "status": "success",
  113. "message": "decode result already available on submit",
  114. }
  115. )
  116. else:
  117. # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
  118. await self.mapper.insert_decode_task(
  119. source_id=wx_sn,
  120. source=self.SourceType.AD_PLATFORM,
  121. payload=json.dumps(
  122. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  123. ),
  124. remark="提交返回SUCCESS,查询未果,等待轮询",
  125. status=self.TaskStatus.PROCESSING,
  126. )
  127. await self.mapper.update_article_decode_status(
  128. article_id,
  129. self.TaskStatus.PROCESSING,
  130. self.TaskStatus.SUCCESS,
  131. )
  132. await self.log_service.log(
  133. contents={
  134. "article_id": article_id,
  135. "wx_sn": wx_sn,
  136. "task": "create_decode_task_v2",
  137. "status": "pending",
  138. "message": "submit SUCCESS but query not ready, inserted for polling",
  139. }
  140. )
  141. elif status == self.SubmitStatus.PENDING:
  142. await self.mapper.insert_decode_task(
  143. source_id=wx_sn,
  144. source=self.SourceType.AD_PLATFORM,
  145. payload=json.dumps(posts_by_wx.get(wx_sn, {}), ensure_ascii=False),
  146. remark="任务已提交,等待轮询",
  147. status=self.TaskStatus.PROCESSING,
  148. )
  149. await self.mapper.update_article_decode_status(
  150. article_id,
  151. self.TaskStatus.PROCESSING,
  152. self.TaskStatus.SUCCESS,
  153. )
  154. await self.log_service.log(
  155. contents={
  156. "article_id": article_id,
  157. "wx_sn": wx_sn,
  158. "task": "create_decode_task_v2",
  159. "status": "pending",
  160. "message": "task submitted, waiting for polling",
  161. }
  162. )
  163. else:
  164. await self.mapper.update_article_decode_status(
  165. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  166. )
  167. await self.log_service.log(
  168. contents={
  169. "article_id": article_id,
  170. "wx_sn": wx_sn,
  171. "task": "create_decode_task_v2",
  172. "status": "fail",
  173. "message": f"unexpected submit status: {status}, rolled back to INIT",
  174. "data": result,
  175. }
  176. )
  177. async def deal(self):
  178. article_list = await self._acquire_articles()
  179. if not article_list:
  180. await self.log_service.log(
  181. contents={
  182. "task": "create_decode_task_v2",
  183. "message": "No more articles to decode",
  184. }
  185. )
  186. return
  187. await self._submit_and_record(article_list)
  188. await self.log_service.log(
  189. contents={
  190. "task": "create_decode_task_v2",
  191. "message": f"Processed {len(article_list)} articles",
  192. }
  193. )
  194. class CreateInnerArticlesDecodeTask(DecodeArticleConst):
  195. _TEST_MODE = False
  196. def __init__(self, pool: DatabaseManager, log_service: LogService):
  197. self.pool = pool
  198. self.log_service = log_service
  199. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  200. self.tool = InnerArticlesDecodeUtils()
  201. async def _acquire_articles(self) -> List[Dict]:
  202. """获取待解构文章,并加锁(status INIT → PROCESSING)"""
  203. article_list = await self.mapper.fetch_inner_articles()
  204. if self._TEST_MODE:
  205. return article_list
  206. locked = []
  207. for article in article_list:
  208. article_id = article["id"]
  209. acquired = await self.mapper.update_inner_article_status(
  210. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  211. )
  212. if acquired:
  213. locked.append(article)
  214. else:
  215. await self.log_service.log(
  216. contents={
  217. "article_id": article_id,
  218. "task": "create_inner_decode_task",
  219. "status": "skip",
  220. "message": "acquire lock failed",
  221. }
  222. )
  223. return locked
  224. async def _handle_result(
  225. self,
  226. article: Dict,
  227. source_id: str,
  228. result: Dict,
  229. posts_by_cid: Dict,
  230. config_id: int,
  231. ):
  232. if not result:
  233. await self.log_service.log(
  234. contents={
  235. "source_id": source_id,
  236. "task": "create_inner_decode_task",
  237. "status": "fail",
  238. "message": "no response for source_id",
  239. }
  240. )
  241. return
  242. status = result.get("status")
  243. if status == self.SubmitStatus.FAILED:
  244. await self.log_service.log(
  245. contents={
  246. "source_id": source_id,
  247. "task": "create_inner_decode_task",
  248. "status": "fail",
  249. "data": result,
  250. }
  251. )
  252. elif status == self.SubmitStatus.PENDING:
  253. await self.mapper.insert_decode_task(
  254. source_id=source_id,
  255. source=self.SourceType.INNER,
  256. payload=json.dumps(posts_by_cid.get(source_id, {}), ensure_ascii=False),
  257. remark="内部文章解构任务已提交",
  258. status=self.TaskStatus.PROCESSING,
  259. )
  260. elif status == self.SubmitStatus.SUCCESS:
  261. query_results = await self.tool.query_decode_results_batch(
  262. [source_id], config_id=config_id
  263. )
  264. result_data = query_results.get(source_id)
  265. data_content = result_data.get("dataContent") if result_data else None
  266. if data_content:
  267. await self.mapper.insert_decode_task(
  268. source_id=source_id,
  269. source=self.SourceType.INNER,
  270. payload=json.dumps(
  271. posts_by_cid.get(source_id, {}), ensure_ascii=False
  272. ),
  273. remark="内部文章解构结果已获取",
  274. )
  275. await self.mapper.set_decode_result(
  276. source_id=source_id,
  277. result=json.dumps(
  278. {"dataContent": data_content}, ensure_ascii=False
  279. ),
  280. )
  281. else:
  282. await self.mapper.insert_decode_task(
  283. source_id=source_id,
  284. source=self.SourceType.INNER,
  285. payload=json.dumps(result, ensure_ascii=False),
  286. remark="提交返回SUCCESS,查询未果,等待轮询",
  287. status=self.TaskStatus.PROCESSING,
  288. )
  289. else:
  290. await self.log_service.log(
  291. contents={
  292. "source_id": source_id,
  293. "task": "create_inner_decode_task",
  294. "status": "fail",
  295. "message": f"unexpected submit status: {status}",
  296. "data": result,
  297. }
  298. )
  299. async def _submit_and_record(self, articles: List[Dict]):
  300. if not articles:
  301. return
  302. # 过滤已有任务记录的文章(测试模式跳过)
  303. if not self._TEST_MODE:
  304. all_source_ids = [str(a["source_id"]) for a in articles]
  305. existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
  306. new_articles = [a for a in articles if str(a["source_id"]) not in existing]
  307. skipped = len(articles) - len(new_articles)
  308. if skipped > 0:
  309. await self.log_service.log(
  310. contents={
  311. "task": "create_inner_decode_task",
  312. "message": f"Skipped {skipped} already-submitted articles",
  313. }
  314. )
  315. for article in articles:
  316. if article not in new_articles:
  317. await self.mapper.update_inner_article_status(
  318. article["id"],
  319. self.TaskStatus.PROCESSING,
  320. self.TaskStatus.SUCCESS,
  321. )
  322. else:
  323. new_articles = articles
  324. if not new_articles:
  325. return
  326. # 批量获取 produce 信息
  327. produce_info_map: Dict[str, list] = {}
  328. for article in new_articles:
  329. source_id = article["source_id"]
  330. produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  331. source_id
  332. )
  333. produce_info_map[str(article["source_id"])] = produce_info
  334. posts = self.tool.prepare_posts(new_articles, produce_info_map)
  335. submit_results = await self.tool.submit_decode_batch(
  336. posts, config_id=self.CONFIG_ID, skip_completed=True
  337. )
  338. posts_by_cid = {p["channelContentId"]: p for p in posts}
  339. for article in tqdm(new_articles):
  340. source_id = str(article["source_id"])
  341. article_id = article["id"]
  342. result = submit_results.get(source_id)
  343. await self._handle_result(
  344. article, source_id, result, posts_by_cid, self.CONFIG_ID
  345. )
  346. if not self._TEST_MODE:
  347. ok = result and result.get("status") != self.SubmitStatus.FAILED
  348. if ok:
  349. await self.mapper.update_inner_article_status(
  350. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  351. )
  352. else:
  353. # 提交失败或无响应,回锁为 INIT 等待下次重试
  354. await self.mapper.update_inner_article_status(
  355. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  356. )
  357. async def deal(self):
  358. article_list = await self._acquire_articles()
  359. if not article_list:
  360. await self.log_service.log(
  361. contents={
  362. "task": "create_inner_decode_task",
  363. "message": "No more articles to decode",
  364. }
  365. )
  366. return
  367. await self._submit_and_record(article_list)
  368. await self.log_service.log(
  369. contents={
  370. "task": "create_inner_decode_task",
  371. "message": f"Processed {len(article_list)} articles",
  372. }
  373. )
  374. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]