|
|
@@ -0,0 +1,409 @@
|
|
|
+import json
|
|
|
+
|
|
|
+from tqdm import tqdm
|
|
|
+from typing import Dict, List
|
|
|
+
|
|
|
+from app.core.database import DatabaseManager
|
|
|
+from app.core.observability import LogService
|
|
|
+
|
|
|
+from ._const import DecodeArticleConst
|
|
|
+from ._mapper import (
|
|
|
+ AdPlatformArticlesDecodeTaskMapper,
|
|
|
+ InnerArticlesDecodeTaskMapper,
|
|
|
+)
|
|
|
+from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
|
|
|
+
|
|
|
+
|
|
|
+class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
|
|
|
+ def __init__(self, pool: DatabaseManager, log_service: LogService):
|
|
|
+ self.pool = pool
|
|
|
+ self.log_service = log_service
|
|
|
+ self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
|
|
|
+ self.tool = AdPlatformArticlesDecodeUtils()
|
|
|
+
|
|
|
+ async def _acquire_articles(self) -> List[Dict]:
|
|
|
+ """获取待解构文章,并加锁(decode_status INIT → PROCESSING)"""
|
|
|
+ article_list = await self.mapper.fetch_decode_articles()
|
|
|
+ locked = []
|
|
|
+ for article in article_list:
|
|
|
+ article_id = article["id"]
|
|
|
+ acquired = await self.mapper.update_article_decode_status(
|
|
|
+ article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
|
|
|
+ )
|
|
|
+ if acquired:
|
|
|
+ locked.append(article)
|
|
|
+ else:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "skip",
|
|
|
+ "message": "acquire lock failed",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return locked
|
|
|
+
|
|
|
+ async def _submit_and_record(self, articles: List[Dict]):
|
|
|
+ if not articles:
|
|
|
+ return
|
|
|
+
|
|
|
+ posts = self.tool.prepare_posts(articles)
|
|
|
+ submit_results = await self.tool.submit_decode_batch(posts)
|
|
|
+ posts_by_wx = {p["channelContentId"]: p for p in posts}
|
|
|
+
|
|
|
+ for article in articles:
|
|
|
+ wx_sn = article["wx_sn"]
|
|
|
+ article_id = article["id"]
|
|
|
+ result = submit_results.get(wx_sn)
|
|
|
+
|
|
|
+ if not result:
|
|
|
+ await self.mapper.update_article_decode_status(
|
|
|
+ article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
|
|
|
+ )
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "wx_sn": wx_sn,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "fail",
|
|
|
+ "message": "no response for content_id, rolled back to INIT",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ continue
|
|
|
+
|
|
|
+ status = result.get("status")
|
|
|
+ if status == self.SubmitStatus.FAILED:
|
|
|
+ await self.mapper.update_article_decode_status(
|
|
|
+ article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
|
|
|
+ )
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "wx_sn": wx_sn,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "fail",
|
|
|
+ "data": result,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ continue
|
|
|
+
|
|
|
+ if status == self.SubmitStatus.SUCCESS:
|
|
|
+ # 已有解构结果,直接查询结果并落库
|
|
|
+ query_results = await self.tool.query_decode_results_batch([wx_sn])
|
|
|
+ result_data = query_results.get(wx_sn)
|
|
|
+ if (
|
|
|
+ result_data
|
|
|
+ and result_data.get("status") == self.QueryStatus.SUCCESS
|
|
|
+ ):
|
|
|
+ data_content = result_data.get("dataContent") or "{}"
|
|
|
+ html = result_data.get("html")
|
|
|
+ await self.mapper.insert_decode_task(
|
|
|
+ source_id=wx_sn,
|
|
|
+ source=self.SourceType.AD_PLATFORM,
|
|
|
+ payload=json.dumps(
|
|
|
+ posts_by_wx.get(wx_sn, {}), ensure_ascii=False
|
|
|
+ ),
|
|
|
+ remark="提交时已有解构结果,直接落库",
|
|
|
+ )
|
|
|
+ await self.mapper.set_decode_result(
|
|
|
+ source_id=wx_sn,
|
|
|
+ result=json.dumps(
|
|
|
+ {"dataContent": data_content, "html": html},
|
|
|
+ ensure_ascii=False,
|
|
|
+ ),
|
|
|
+ remark="提交时已返回 SUCCESS,结果已落库",
|
|
|
+ )
|
|
|
+ await self.mapper.update_article_decode_status(
|
|
|
+ article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
|
|
|
+ )
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "wx_sn": wx_sn,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "success",
|
|
|
+ "message": "decode result already available on submit",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 提交返回 SUCCESS 但查询不到结果,插入记录等待轮询
|
|
|
+ await self.mapper.insert_decode_task(
|
|
|
+ source_id=wx_sn,
|
|
|
+ source=self.SourceType.AD_PLATFORM,
|
|
|
+ payload=json.dumps(
|
|
|
+ posts_by_wx.get(wx_sn, {}), ensure_ascii=False
|
|
|
+ ),
|
|
|
+ remark="提交返回SUCCESS,查询未果,等待轮询",
|
|
|
+ status=self.TaskStatus.PROCESSING,
|
|
|
+ )
|
|
|
+ await self.mapper.update_article_decode_status(
|
|
|
+ article_id,
|
|
|
+ self.TaskStatus.PROCESSING,
|
|
|
+ self.TaskStatus.SUCCESS,
|
|
|
+ )
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "wx_sn": wx_sn,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "pending",
|
|
|
+ "message": "submit SUCCESS but query not ready, inserted for polling",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ elif status == self.SubmitStatus.PENDING:
|
|
|
+ await self.mapper.insert_decode_task(
|
|
|
+ source_id=wx_sn,
|
|
|
+ source=self.SourceType.AD_PLATFORM,
|
|
|
+ payload=json.dumps(posts_by_wx.get(wx_sn, {}), ensure_ascii=False),
|
|
|
+ remark="任务已提交,等待轮询",
|
|
|
+ status=self.TaskStatus.PROCESSING,
|
|
|
+ )
|
|
|
+ await self.mapper.update_article_decode_status(
|
|
|
+ article_id,
|
|
|
+ self.TaskStatus.PROCESSING,
|
|
|
+ self.TaskStatus.SUCCESS,
|
|
|
+ )
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "wx_sn": wx_sn,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "pending",
|
|
|
+ "message": "task submitted, waiting for polling",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ await self.mapper.update_article_decode_status(
|
|
|
+ article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
|
|
|
+ )
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "wx_sn": wx_sn,
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "status": "fail",
|
|
|
+ "message": f"unexpected submit status: {status}, rolled back to INIT",
|
|
|
+ "data": result,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ async def deal(self):
|
|
|
+ article_list = await self._acquire_articles()
|
|
|
+ if not article_list:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "message": "No more articles to decode",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ await self._submit_and_record(article_list)
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "task": "create_decode_task_v2",
|
|
|
+ "message": f"Processed {len(article_list)} articles",
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+class CreateInnerArticlesDecodeTask(DecodeArticleConst):
|
|
|
+ _TEST_MODE = False
|
|
|
+
|
|
|
+ def __init__(self, pool: DatabaseManager, log_service: LogService):
|
|
|
+ self.pool = pool
|
|
|
+ self.log_service = log_service
|
|
|
+ self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
|
|
|
+ self.tool = InnerArticlesDecodeUtils()
|
|
|
+
|
|
|
+ async def _acquire_articles(self) -> List[Dict]:
|
|
|
+ """获取待解构文章,并加锁(status INIT → PROCESSING)"""
|
|
|
+ article_list = await self.mapper.fetch_inner_articles()
|
|
|
+ if self._TEST_MODE:
|
|
|
+ return article_list
|
|
|
+
|
|
|
+ locked = []
|
|
|
+ for article in article_list:
|
|
|
+ article_id = article["id"]
|
|
|
+ acquired = await self.mapper.update_inner_article_status(
|
|
|
+ article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
|
|
|
+ )
|
|
|
+ if acquired:
|
|
|
+ locked.append(article)
|
|
|
+ else:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "article_id": article_id,
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "status": "skip",
|
|
|
+ "message": "acquire lock failed",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return locked
|
|
|
+
|
|
|
+ async def _handle_result(
|
|
|
+ self,
|
|
|
+ article: Dict,
|
|
|
+ source_id: str,
|
|
|
+ result: Dict,
|
|
|
+ posts_by_cid: Dict,
|
|
|
+ config_id: int,
|
|
|
+ ):
|
|
|
+ if not result:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "source_id": source_id,
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "status": "fail",
|
|
|
+ "message": "no response for source_id",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ status = result.get("status")
|
|
|
+ if status == self.SubmitStatus.FAILED:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "source_id": source_id,
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "status": "fail",
|
|
|
+ "data": result,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ elif status == self.SubmitStatus.PENDING:
|
|
|
+ await self.mapper.insert_decode_task(
|
|
|
+ source_id=source_id,
|
|
|
+ source=self.SourceType.INNER,
|
|
|
+ payload=json.dumps(posts_by_cid.get(source_id, {}), ensure_ascii=False),
|
|
|
+ remark="内部文章解构任务已提交",
|
|
|
+ status=self.TaskStatus.PROCESSING,
|
|
|
+ )
|
|
|
+ elif status == self.SubmitStatus.SUCCESS:
|
|
|
+ query_results = await self.tool.query_decode_results_batch(
|
|
|
+ [source_id], config_id=config_id
|
|
|
+ )
|
|
|
+ result_data = query_results.get(source_id)
|
|
|
+ data_content = result_data.get("dataContent") if result_data else None
|
|
|
+ if data_content:
|
|
|
+ await self.mapper.insert_decode_task(
|
|
|
+ source_id=source_id,
|
|
|
+ source=self.SourceType.INNER,
|
|
|
+ payload=json.dumps(
|
|
|
+ posts_by_cid.get(source_id, {}), ensure_ascii=False
|
|
|
+ ),
|
|
|
+ remark="内部文章解构结果已获取",
|
|
|
+ )
|
|
|
+ await self.mapper.set_decode_result(
|
|
|
+ source_id=source_id,
|
|
|
+ result=json.dumps(
|
|
|
+ {"dataContent": data_content}, ensure_ascii=False
|
|
|
+ ),
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ await self.mapper.insert_decode_task(
|
|
|
+ source_id=source_id,
|
|
|
+ source=self.SourceType.INNER,
|
|
|
+ payload=json.dumps(result, ensure_ascii=False),
|
|
|
+ remark="提交返回SUCCESS,查询未果,等待轮询",
|
|
|
+ status=self.TaskStatus.PROCESSING,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "source_id": source_id,
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "status": "fail",
|
|
|
+ "message": f"unexpected submit status: {status}",
|
|
|
+ "data": result,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ async def _submit_and_record(self, articles: List[Dict]):
|
|
|
+ if not articles:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 过滤已有任务记录的文章(测试模式跳过)
|
|
|
+ if not self._TEST_MODE:
|
|
|
+ all_source_ids = [str(a["source_id"]) for a in articles]
|
|
|
+ existing = await self.mapper.fetch_existing_source_ids(all_source_ids)
|
|
|
+ new_articles = [a for a in articles if str(a["source_id"]) not in existing]
|
|
|
+ skipped = len(articles) - len(new_articles)
|
|
|
+ if skipped > 0:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "message": f"Skipped {skipped} already-submitted articles",
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ for article in articles:
|
|
|
+ if article not in new_articles:
|
|
|
+ await self.mapper.update_inner_article_status(
|
|
|
+ article["id"],
|
|
|
+ self.TaskStatus.PROCESSING,
|
|
|
+ self.TaskStatus.SUCCESS,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ new_articles = articles
|
|
|
+
|
|
|
+ if not new_articles:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 批量获取 produce 信息
|
|
|
+ produce_info_map: Dict[str, list] = {}
|
|
|
+ for article in new_articles:
|
|
|
+ source_id = article["source_id"]
|
|
|
+ produce_info = await self.mapper.fetch_inner_articles_produce_detail(
|
|
|
+ source_id
|
|
|
+ )
|
|
|
+ produce_info_map[str(article["source_id"])] = produce_info
|
|
|
+
|
|
|
+ posts = self.tool.prepare_posts(new_articles, produce_info_map)
|
|
|
+
|
|
|
+ submit_results = await self.tool.submit_decode_batch(
|
|
|
+ posts, config_id=self.CONFIG_ID, skip_completed=True
|
|
|
+ )
|
|
|
+ posts_by_cid = {p["channelContentId"]: p for p in posts}
|
|
|
+
|
|
|
+ for article in tqdm(new_articles):
|
|
|
+ source_id = str(article["source_id"])
|
|
|
+ article_id = article["id"]
|
|
|
+
|
|
|
+ result = submit_results.get(source_id)
|
|
|
+ await self._handle_result(
|
|
|
+ article, source_id, result, posts_by_cid, self.CONFIG_ID
|
|
|
+ )
|
|
|
+
|
|
|
+ if not self._TEST_MODE:
|
|
|
+ ok = result and result.get("status") != self.SubmitStatus.FAILED
|
|
|
+ if ok:
|
|
|
+ await self.mapper.update_inner_article_status(
|
|
|
+ article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 提交失败或无响应,回锁为 INIT 等待下次重试
|
|
|
+ await self.mapper.update_inner_article_status(
|
|
|
+ article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
|
|
|
+ )
|
|
|
+
|
|
|
+ async def deal(self):
|
|
|
+ article_list = await self._acquire_articles()
|
|
|
+ if not article_list:
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "message": "No more articles to decode",
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ await self._submit_and_record(article_list)
|
|
|
+ await self.log_service.log(
|
|
|
+ contents={
|
|
|
+ "task": "create_inner_decode_task",
|
|
|
+ "message": f"Processed {len(article_list)} articles",
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+__all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]
|