create_decode_tasks.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. import json
  2. from tqdm import tqdm
  3. from typing import Dict, List
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._const import DecodeArticleConst
  7. from ._mapper import (
  8. AdPlatformArticlesDecodeTaskMapper,
  9. InnerArticlesDecodeTaskMapper,
  10. )
  11. from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
  12. class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
  13. def __init__(self, pool: DatabaseManager, log_service: LogService):
  14. self.pool = pool
  15. self.log_service = log_service
  16. self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
  17. self.tool = AdPlatformArticlesDecodeUtils()
  18. async def _acquire_articles(self) -> List[Dict]:
  19. """获取待解构文章,并加锁(decode_status INIT → PROCESSING)"""
  20. article_list = await self.mapper.fetch_decode_articles()
  21. locked = []
  22. for article in article_list:
  23. article_id = article["id"]
  24. acquired = await self.mapper.update_article_decode_status(
  25. article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  26. )
  27. if acquired:
  28. locked.append(article)
  29. else:
  30. await self.log_service.log(
  31. contents={
  32. "article_id": article_id,
  33. "task": "create_decode_task_v2",
  34. "status": "skip",
  35. "message": "acquire lock failed",
  36. }
  37. )
  38. return locked
  39. async def _submit_and_record(self, articles: List[Dict]):
  40. if not articles:
  41. return
  42. posts = self.tool.prepare_posts(articles)
  43. submit_results = await self.tool.submit_decode_batch(posts)
  44. posts_by_wx = {p["channelContentId"]: p for p in posts}
  45. for article in articles:
  46. wx_sn = article["wx_sn"]
  47. article_id = article["id"]
  48. result = submit_results.get(wx_sn)
  49. if not result:
  50. await self.mapper.update_article_decode_status(
  51. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  52. )
  53. await self.log_service.log(
  54. contents={
  55. "article_id": article_id,
  56. "wx_sn": wx_sn,
  57. "task": "create_decode_task_v2",
  58. "status": "fail",
  59. "message": "no response for channel_content_id",
  60. }
  61. )
  62. continue
  63. status = result.get("status")
  64. if status == self.SubmitStatus.FAILED:
  65. await self.mapper.update_article_decode_status(
  66. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  67. )
  68. await self.log_service.log(
  69. contents={
  70. "article_id": article_id,
  71. "wx_sn": wx_sn,
  72. "task": "create_decode_task_v2",
  73. "status": "fail",
  74. "data": result,
  75. }
  76. )
  77. continue
  78. if status == self.SubmitStatus.SUCCESS:
  79. # 已有解构结果,直接查询结果并落库
  80. query_results = await self.tool.query_decode_results_batch([wx_sn])
  81. result_data = query_results.get(wx_sn)
  82. if result_data and result_data.get("status") == self.QueryStatus.SUCCESS:
  83. data_content = result_data.get("dataContent") or "{}"
  84. html = result_data.get("html")
  85. await self.mapper.insert_decode_task(
  86. channel_content_id=wx_sn,
  87. content_id=article_id,
  88. source=self.SourceType.AD_PLATFORM,
  89. payload=json.dumps(
  90. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  91. ),
  92. remark="提交时已有解构结果,直接落库",
  93. )
  94. await self.mapper.set_decode_result(
  95. channel_content_id=wx_sn,
  96. result=json.dumps(
  97. {"dataContent": data_content, "html": html},
  98. ensure_ascii=False,
  99. ),
  100. remark="提交时已返回 SUCCESS,结果已落库",
  101. )
  102. await self.mapper.update_article_decode_status(
  103. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
  104. )
  105. await self.log_service.log(
  106. contents={
  107. "article_id": article_id,
  108. "wx_sn": wx_sn,
  109. "task": "create_decode_task_v2",
  110. "status": "success",
  111. "message": "decode result already available on submit",
  112. }
  113. )
  114. else:
  115. # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
  116. await self.mapper.insert_decode_task(
  117. channel_content_id=wx_sn,
  118. content_id=article_id,
  119. source=self.SourceType.AD_PLATFORM,
  120. payload=json.dumps(
  121. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  122. ),
  123. remark="提交返回SUCCESS,查询未果,等待轮询",
  124. )
  125. await self.mapper.update_article_decode_status(
  126. article_id,
  127. self.TaskStatus.PROCESSING,
  128. self.TaskStatus.SUCCESS,
  129. )
  130. await self.log_service.log(
  131. contents={
  132. "article_id": article_id,
  133. "wx_sn": wx_sn,
  134. "task": "create_decode_task_v2",
  135. "status": "pending",
  136. "message": "submit SUCCESS but query not ready, inserted for polling",
  137. }
  138. )
  139. elif status == self.SubmitStatus.PENDING:
  140. await self.mapper.insert_decode_task(
  141. channel_content_id=wx_sn,
  142. content_id=article_id,
  143. source=self.SourceType.AD_PLATFORM,
  144. payload=json.dumps(
  145. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  146. ),
  147. remark="任务已提交,等待轮询",
  148. )
  149. await self.mapper.update_article_decode_status(
  150. article_id,
  151. self.TaskStatus.PROCESSING,
  152. self.TaskStatus.SUCCESS,
  153. )
  154. await self.log_service.log(
  155. contents={
  156. "article_id": article_id,
  157. "wx_sn": wx_sn,
  158. "task": "create_decode_task_v2",
  159. "status": "pending",
  160. "message": "task submitted, waiting for polling",
  161. }
  162. )
  163. else:
  164. await self.mapper.update_article_decode_status(
  165. article_id, self.TaskStatus.PROCESSING, self.TaskStatus.FAILED
  166. )
  167. await self.log_service.log(
  168. contents={
  169. "article_id": article_id,
  170. "wx_sn": wx_sn,
  171. "task": "create_decode_task_v2",
  172. "status": "fail",
  173. "message": f"unexpected submit status: {status}",
  174. "data": result,
  175. }
  176. )
  177. async def deal(self):
  178. article_list = await self._acquire_articles()
  179. if not article_list:
  180. await self.log_service.log(
  181. contents={
  182. "task": "create_decode_task_v2",
  183. "message": "No more articles to decode",
  184. }
  185. )
  186. return
  187. await self._submit_and_record(article_list)
  188. await self.log_service.log(
  189. contents={
  190. "task": "create_decode_task_v2",
  191. "message": f"Processed {len(article_list)} articles",
  192. }
  193. )
  194. class CreateInnerArticlesDecodeTask(DecodeArticleConst):
  195. def __init__(self, pool: DatabaseManager, log_service: LogService):
  196. self.pool = pool
  197. self.log_service = log_service
  198. self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
  199. self.tool = InnerArticlesDecodeUtils()
  200. async def deal(self):
  201. article_list = await self.mapper.fetch_inner_articles()
  202. if not article_list:
  203. await self.log_service.log(
  204. contents={
  205. "task": "create_inner_decode_task_v2",
  206. "message": "No more articles to decode",
  207. }
  208. )
  209. return
  210. # 过滤已有任务记录的文章
  211. all_wx_sns = [a["wx_sn"] for a in article_list]
  212. existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
  213. new_articles = [a for a in article_list if a["wx_sn"] not in existing]
  214. skipped = len(article_list) - len(new_articles)
  215. if skipped > 0:
  216. await self.log_service.log(
  217. contents={
  218. "task": "create_inner_decode_task_v2",
  219. "message": f"Skipped {skipped} already-submitted articles",
  220. }
  221. )
  222. if not new_articles:
  223. await self.log_service.log(
  224. contents={
  225. "task": "create_inner_decode_task_v2",
  226. "message": "All articles already submitted",
  227. }
  228. )
  229. return
  230. # 批量获取 produce 信息
  231. produce_info_map: Dict[str, list] = {}
  232. for article in new_articles:
  233. source_id = article["source_id"]
  234. produce_info = await self.mapper.fetch_inner_articles_produce_detail(
  235. source_id
  236. )
  237. produce_info_map[article["wx_sn"]] = produce_info
  238. posts = self.tool.prepare_posts(new_articles, produce_info_map)
  239. submit_results = await self.tool.submit_decode_batch(posts)
  240. posts_by_wx = {p["channelContentId"]: p for p in posts}
  241. for article in tqdm(new_articles):
  242. wx_sn = article["wx_sn"]
  243. result = submit_results.get(wx_sn)
  244. if not result:
  245. await self.log_service.log(
  246. contents={
  247. "wx_sn": wx_sn,
  248. "task": "create_inner_decode_task_v2",
  249. "status": "fail",
  250. "message": "no response for channel_content_id",
  251. }
  252. )
  253. continue
  254. status = result.get("status")
  255. if status == self.SubmitStatus.FAILED:
  256. await self.log_service.log(
  257. contents={
  258. "wx_sn": wx_sn,
  259. "task": "create_inner_decode_task_v2",
  260. "status": "fail",
  261. "data": result,
  262. }
  263. )
  264. elif status == self.SubmitStatus.PENDING:
  265. await self.mapper.insert_decode_task(
  266. channel_content_id=wx_sn,
  267. content_id=str(article.get("source_id", "")),
  268. source=self.SourceType.INNER,
  269. payload=json.dumps(
  270. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  271. ),
  272. remark="内部文章解构任务已提交",
  273. )
  274. elif status == self.SubmitStatus.SUCCESS:
  275. query_results = await self.tool.query_decode_results_batch([wx_sn])
  276. result_data = query_results.get(wx_sn)
  277. data_content = result_data.get("dataContent") if result_data else None
  278. if data_content:
  279. await self.mapper.insert_decode_task(
  280. channel_content_id=wx_sn,
  281. content_id=str(article.get("source_id", "")),
  282. source=self.SourceType.INNER,
  283. payload=json.dumps(
  284. posts_by_wx.get(wx_sn, {}), ensure_ascii=False
  285. ),
  286. remark="内部文章解构结果已获取",
  287. )
  288. await self.mapper.set_decode_result(
  289. channel_content_id=wx_sn,
  290. result=json.dumps(
  291. {"dataContent": data_content}, ensure_ascii=False
  292. ),
  293. )
  294. else:
  295. await self.mapper.insert_decode_task(
  296. channel_content_id=wx_sn,
  297. content_id=str(article.get("source_id", "")),
  298. source=self.SourceType.INNER,
  299. payload=json.dumps(result, ensure_ascii=False),
  300. remark="提交返回SUCCESS,查询未果,等待轮询",
  301. )
  302. else:
  303. await self.log_service.log(
  304. contents={
  305. "wx_sn": wx_sn,
  306. "task": "create_inner_decode_task_v2",
  307. "status": "fail",
  308. "message": f"unexpected submit status: {status}",
  309. "data": result,
  310. }
  311. )
  312. await self.log_service.log(
  313. contents={
  314. "task": "create_inner_decode_task_v2",
  315. "message": f"Processed {len(new_articles)} articles",
  316. }
  317. )
  318. __all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]