create_decode_tasks.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. import json
  2. from tqdm import tqdm
  3. from typing import Dict, List
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._const import DecodeArticleConst
  7. from ._mapper import (
  8. AdPlatformArticlesDecodeTaskMapper,
  9. InnerArticlesDecodeTaskMapper,
  10. )
  11. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  12. class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
  13. def __init__(self, pool: DatabaseManager, log_service: LogService):
  14. self.pool = pool
  15. self.log_service = log_service
  16. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  17. self.tool = AdPlatformArticlesDecodeUtils()
  18. async def _acquire_articles(self) -> List[Dict]:
  19. """获取待解构文章,并加锁(decode_status INIT → PROCESSING)"""
  20. article_list = await self.mapper.fetch_decode_articles()
  21. locked = []
  22. for article in article_list:
  23. article_id = article["id"]
  24. acquired = await self.mapper.update_article_decode_status(
  25. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  26. )
  27. if acquired:
  28. locked.append(article)
  29. else:
  30. await self.log_service.log(
  31. contents={
  32. "article_id": article_id,
  33. "task": "create_decode_task_v2",
  34. "status": "skip",
  35. "message": "acquire lock failed",
  36. }
  37. )
  38. return locked
  39. async def _submit_and_record(self, articles: List[Dict]):
  40. if not articles:
  41. return
  42. posts = self.tool.prepare_posts(articles)
  43. submit_results = await self.tool.submit_decode_batch(posts)
  44. posts_by_wx = {p["channelContentId"]: p for p in posts}
  45. for article in articles:
  46. wx_sn = article["wx_sn"]
  47. article_id = article["id"]
  48. result = submit_results.get(wx_sn)
  49. if not result:
  50. await self.mapper.update_article_decode_status(
  51. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  52. )
  53. await self.log_service.log(
  54. contents={
  55. "article_id": article_id,
  56. "wx_sn": wx_sn,
  57. "task": "create_decode_task_v2",
  58. "status": "fail",
  59. "message": "no response for channel_content_id, rolled back to INIT",
  60. }
  61. )
  62. continue
  63. status = result.get("status")
  64. if status == self.SubmitStatus.FAILED:
  65. await self.mapper.update_article_decode_status(
  66. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  67. )
  68. await self.log_service.log(
  69. contents={
  70. "article_id": article_id,
  71. "wx_sn": wx_sn,
  72. "task": "create_decode_task_v2",
  73. "status": "fail",
  74. "data": result,
  75. }
  76. )
  77. continue
  78. if status == self.SubmitStatus.SUCCESS:
  79. # 已有解构结果,直接查询结果并落库
  80. query_results = await self.tool.query_decode_results_batch([wx_sn])
  81. result_data = query_results.get(wx_sn)
  82. if result_data and result_data.get("status") == self.QueryStatus.SUCCESS:
  83. data_content = result_data.get("dataContent") or "{}"
  84. html = result_data.get("html")
  85. await self.mapper.insert_decode_task(
  86. channel_content_id=wx_sn,
  87. content_id=article_id,
  88. source=self.SourceType.AD_PLATFORM,
  89. payload=json.dumps(
  90. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  91. ),
  92. remark="提交时已有解构结果,直接落库",
  93. )
  94. await self.mapper.set_decode_result(
  95. channel_content_id=wx_sn,
  96. result=json.dumps(
  97. {"dataContent": data_content, "html": html},
  98. ensure_ascii=False,
  99. ),
  100. remark="提交时已返回 SUCCESS,结果已落库",
  101. )
  102. await self.mapper.update_article_decode_status(
  103. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  104. )
  105. await self.log_service.log(
  106. contents={
  107. "article_id": article_id,
  108. "wx_sn": wx_sn,
  109. "task": "create_decode_task_v2",
  110. "status": "success",
  111. "message": "decode result already available on submit",
  112. }
  113. )
  114. else:
  115. # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
  116. await self.mapper.insert_decode_task(
  117. channel_content_id=wx_sn,
  118. content_id=article_id,
  119. source=self.SourceType.AD_PLATFORM,
  120. payload=json.dumps(
  121. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  122. ),
  123. remark="提交返回SUCCESS,查询未果,等待轮询",
  124. )
  125. await self.mapper.update_article_decode_status(
  126. article_id,
  127. self.TaskStatus.PROCESSING,
  128. self.TaskStatus.SUCCESS,
  129. )
  130. await self.log_service.log(
  131. contents={
  132. "article_id": article_id,
  133. "wx_sn": wx_sn,
  134. "task": "create_decode_task_v2",
  135. "status": "pending",
  136. "message": "submit SUCCESS but query not ready, inserted for polling",
  137. }
  138. )
  139. elif status == self.SubmitStatus.PENDING:
  140. await self.mapper.insert_decode_task(
  141. channel_content_id=wx_sn,
  142. content_id=article_id,
  143. source=self.SourceType.AD_PLATFORM,
  144. payload=json.dumps(
  145. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  146. ),
  147. remark="任务已提交,等待轮询",
  148. )
  149. await self.mapper.update_article_decode_status(
  150. article_id,
  151. self.TaskStatus.PROCESSING,
  152. self.TaskStatus.SUCCESS,
  153. )
  154. await self.log_service.log(
  155. contents={
  156. "article_id": article_id,
  157. "wx_sn": wx_sn,
  158. "task": "create_decode_task_v2",
  159. "status": "pending",
  160. "message": "task submitted, waiting for polling",
  161. }
  162. )
  163. else:
  164. await self.mapper.update_article_decode_status(
  165. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  166. )
  167. await self.log_service.log(
  168. contents={
  169. "article_id": article_id,
  170. "wx_sn": wx_sn,
  171. "task": "create_decode_task_v2",
  172. "status": "fail",
  173. "message": f"unexpected submit status: {status}, rolled back to INIT",
  174. "data": result,
  175. }
  176. )
  177. async def deal(self):
  178. article_list = await self._acquire_articles()
  179. if not article_list:
  180. await self.log_service.log(
  181. contents={
  182. "task": "create_decode_task_v2",
  183. "message": "No more articles to decode",
  184. }
  185. )
  186. return
  187. await self._submit_and_record(article_list)
  188. await self.log_service.log(
  189. contents={
  190. "task": "create_decode_task_v2",
  191. "message": f"Processed {len(article_list)} articles",
  192. }
  193. )
  194. class CreateInnerArticlesDecodeTask(DecodeArticleConst):
  195. _TEST_MODE = False
  196. def __init__(self, pool: DatabaseManager, log_service: LogService):
  197. self.pool = pool
  198. self.log_service = log_service
  199. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  200. self.tool = InnerArticlesDecodeUtils()
  201. async def _acquire_articles(self) -> List[Dict]:
  202. """获取待解构文章,并加锁(status INIT → PROCESSING)"""
  203. article_list = await self.mapper.fetch_inner_articles()
  204. if self._TEST_MODE:
  205. return article_list
  206. locked = []
  207. for article in article_list:
  208. article_id = article["id"]
  209. acquired = await self.mapper.update_inner_article_status(
  210. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  211. )
  212. if acquired:
  213. locked.append(article)
  214. else:
  215. await self.log_service.log(
  216. contents={
  217. "article_id": article_id,
  218. "task": "create_inner_decode_task",
  219. "status": "skip",
  220. "message": "acquire lock failed",
  221. }
  222. )
  223. return locked
  224. async def _handle_result(
  225. self,
  226. article: Dict,
  227. channel_content_id: str,
  228. result: Dict,
  229. posts_by_cid: Dict,
  230. config_id: int,
  231. ):
  232. wx_sn = article["wx_sn"]
  233. if not result:
  234. await self.log_service.log(
  235. contents={
  236. "wx_sn": wx_sn,
  237. "task": "create_inner_decode_task",
  238. "status": "fail",
  239. "message": "no response for channel_content_id",
  240. }
  241. )
  242. return
  243. status = result.get("status")
  244. if status == self.SubmitStatus.FAILED:
  245. await self.log_service.log(
  246. contents={
  247. "wx_sn": wx_sn,
  248. "task": "create_inner_decode_task",
  249. "status": "fail",
  250. "data": result,
  251. }
  252. )
  253. elif status == self.SubmitStatus.PENDING:
  254. await self.mapper.insert_decode_task(
  255. channel_content_id=channel_content_id,
  256. content_id=str(article.get("source_id", "")),
  257. source=self.SourceType.INNER,
  258. payload=json.dumps(
  259. posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
  260. ),
  261. remark="内部文章解构任务已提交",
  262. )
  263. elif status == self.SubmitStatus.SUCCESS:
  264. query_results = await self.tool.query_decode_results_batch(
  265. [channel_content_id], config_id=config_id
  266. )
  267. result_data = query_results.get(channel_content_id)
  268. data_content = result_data.get("dataContent") if result_data else None
  269. if data_content:
  270. await self.mapper.insert_decode_task(
  271. channel_content_id=channel_content_id,
  272. content_id=str(article.get("source_id", "")),
  273. source=self.SourceType.INNER,
  274. payload=json.dumps(
  275. posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
  276. ),
  277. remark="内部文章解构结果已获取",
  278. )
  279. await self.mapper.set_decode_result(
  280. channel_content_id=channel_content_id,
  281. result=json.dumps(
  282. {"dataContent": data_content}, ensure_ascii=False
  283. ),
  284. )
  285. else:
  286. await self.mapper.insert_decode_task(
  287. channel_content_id=channel_content_id,
  288. content_id=str(article.get("source_id", "")),
  289. source=self.SourceType.INNER,
  290. payload=json.dumps(result, ensure_ascii=False),
  291. remark="提交返回SUCCESS,查询未果,等待轮询",
  292. )
  293. else:
  294. await self.log_service.log(
  295. contents={
  296. "wx_sn": wx_sn,
  297. "task": "create_inner_decode_task",
  298. "status": "fail",
  299. "message": f"unexpected submit status: {status}",
  300. "data": result,
  301. }
  302. )
  303. async def _submit_and_record(self, articles: List[Dict]):
  304. if not articles:
  305. return
  306. # 过滤已有任务记录的文章(测试模式跳过)
  307. if not self._TEST_MODE:
  308. all_wx_sns = [a["wx_sn"] for a in articles]
  309. existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
  310. new_articles = [a for a in articles if a["wx_sn"] not in existing]
  311. skipped = len(articles) - len(new_articles)
  312. if skipped > 0:
  313. await self.log_service.log(
  314. contents={
  315. "task": "create_inner_decode_task",
  316. "message": f"Skipped {skipped} already-submitted articles",
  317. }
  318. )
  319. for article in articles:
  320. if article not in new_articles:
  321. await self.mapper.update_inner_article_status(
  322. article["id"], self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  323. )
  324. else:
  325. new_articles = articles
  326. if not new_articles:
  327. return
  328. # 批量获取 produce 信息
  329. produce_info_map: Dict[str, list] = {}
  330. for article in new_articles:
  331. source_id = article["source_id"]
  332. produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  333. source_id
  334. )
  335. produce_info_map[article["wx_sn"]] = produce_info
  336. posts = self.tool.prepare_posts(new_articles, produce_info_map)
  337. submit_results = await self.tool.submit_decode_batch(
  338. posts, config_id=self.CONFIG_ID, skip_completed=True
  339. )
  340. posts_by_cid = {p["channelContentId"]: p for p in posts}
  341. for article in tqdm(new_articles):
  342. wx_sn = article["wx_sn"]
  343. article_id = article["id"]
  344. result = submit_results.get(wx_sn)
  345. await self._handle_result(
  346. article, wx_sn, result, posts_by_cid, self.CONFIG_ID
  347. )
  348. if not self._TEST_MODE:
  349. ok = result and result.get("status") != self.SubmitStatus.FAILED
  350. if ok:
  351. await self.mapper.update_inner_article_status(
  352. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  353. )
  354. else:
  355. # 提交失败或无响应,回锁为 INIT 等待下次重试
  356. await self.mapper.update_inner_article_status(
  357. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
  358. )
  359. async def deal(self):
  360. article_list = await self._acquire_articles()
  361. if not article_list:
  362. await self.log_service.log(
  363. contents={
  364. "task": "create_inner_decode_task",
  365. "message": "No more articles to decode",
  366. }
  367. )
  368. return
  369. await self._submit_and_record(article_list)
  370. await self.log_service.log(
  371. contents={
  372. "task": "create_inner_decode_task",
  373. "message": f"Processed {len(article_list)} articles",
  374. }
  375. )
  376. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]