| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- import json
- from tqdm import tqdm
- from typing import Dict, List
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import DecodeArticleConst
- from ._mapper import (
- AdPlatformArticlesDecodeTaskMapper,
- InnerArticlesDecodeTaskMapper,
- )
- from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils
class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
    """Create decode tasks for ad-platform articles.

    ``deal`` runs one pass:

    1. ``_acquire_articles`` fetches candidate articles and locks each one by
       moving its ``decode_status`` from INIT to PROCESSING.
    2. ``_submit_and_record`` submits all locked articles in one batch and,
       per article, writes a decode-task row and/or the decode result, then
       moves the article from PROCESSING to SUCCESS or FAILED.

    If anything raises after the lock has been taken, the articles that were
    not yet finalized are moved to FAILED before the exception propagates, so
    they are not left stuck in PROCESSING.
    """

    # Value of the "task" field in every log record written by this class.
    _TASK_NAME = "create_decode_task_v2"

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
        self.tool = AdPlatformArticlesDecodeUtils()

    async def _log(self, **fields) -> None:
        """Write one structured log record tagged with this task's name."""
        await self.log_service.log(contents={"task": self._TASK_NAME, **fields})

    async def _set_status(self, article_id, new_status):
        """Move an article from PROCESSING to ``new_status``.

        Returns whatever the mapper returns. ``_acquire_articles`` treats a
        truthy value as "the row was updated".
        """
        return await self.mapper.update_article_decode_status(
            article_id, self.TaskStatus.PROCESSING, new_status
        )

    async def _insert_task(self, wx_sn, article_id, post: Dict, remark: str) -> None:
        """Insert a decode-task row whose payload is the submitted post (JSON)."""
        await self.mapper.insert_decode_task(
            channel_content_id=wx_sn,
            content_id=article_id,
            source=self.SourceType.AD_PLATFORM,
            payload=json.dumps(post, ensure_ascii=False),
            remark=remark,
        )

    async def _acquire_articles(self) -> List[Dict]:
        """Fetch articles awaiting decode and lock them (decode_status INIT -> PROCESSING).

        Articles whose INIT -> PROCESSING update does not succeed are skipped
        and logged. Presumably another worker already took them; not
        verifiable from here.

        Returns:
            The articles that were successfully locked.
        """
        article_list = await self.mapper.fetch_decode_articles() or []
        locked = []
        for article in article_list:
            article_id = article["id"]
            acquired = await self.mapper.update_article_decode_status(
                article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
            )
            if acquired:
                locked.append(article)
            else:
                await self._log(
                    article_id=article_id,
                    status="skip",
                    message="acquire lock failed",
                )
        return locked

    async def _record_article(self, article: Dict, result, post: Dict, query_result) -> None:
        """Persist the outcome of one submitted article and release its lock.

        Args:
            article: The locked article row (uses its ``id`` and ``wx_sn``).
            result: This article's submit response, or a falsy value if the
                batch response had no entry for it.
            post: The payload that was submitted for this article.
            query_result: This article's decode-result query response, or
                None. Only queried when the submit status was SUCCESS.
        """
        wx_sn = article["wx_sn"]
        article_id = article["id"]

        if not result:
            await self._set_status(article_id, self.TaskStatus.FAILED)
            await self._log(
                article_id=article_id,
                wx_sn=wx_sn,
                status="fail",
                message="no response for channel_content_id",
            )
            return

        status = result.get("status")

        if status == self.SubmitStatus.FAILED:
            await self._set_status(article_id, self.TaskStatus.FAILED)
            await self._log(
                article_id=article_id, wx_sn=wx_sn, status="fail", data=result
            )
            return

        if status == self.SubmitStatus.SUCCESS:
            if query_result and query_result.get("status") == self.QueryStatus.SUCCESS:
                # The decode result already exists: store the task row and the result.
                await self._insert_task(
                    wx_sn, article_id, post, "提交时已有解构结果,直接落库"
                )
                await self.mapper.set_decode_result(
                    channel_content_id=wx_sn,
                    result=json.dumps(
                        {
                            "dataContent": query_result.get("dataContent") or "{}",
                            "html": query_result.get("html"),
                        },
                        ensure_ascii=False,
                    ),
                    remark="提交时已返回 SUCCESS,结果已落库",
                )
                await self._set_status(article_id, self.TaskStatus.SUCCESS)
                await self._log(
                    article_id=article_id,
                    wx_sn=wx_sn,
                    status="success",
                    message="decode result already available on submit",
                )
            else:
                # Submit reported SUCCESS but no result yet: store the row for polling.
                await self._insert_task(
                    wx_sn, article_id, post, "提交返回SUCCESS,查询未果,等待轮询"
                )
                await self._set_status(article_id, self.TaskStatus.SUCCESS)
                await self._log(
                    article_id=article_id,
                    wx_sn=wx_sn,
                    status="pending",
                    message="submit SUCCESS but query not ready, inserted for polling",
                )
            return

        if status == self.SubmitStatus.PENDING:
            await self._insert_task(wx_sn, article_id, post, "任务已提交,等待轮询")
            await self._set_status(article_id, self.TaskStatus.SUCCESS)
            await self._log(
                article_id=article_id,
                wx_sn=wx_sn,
                status="pending",
                message="task submitted, waiting for polling",
            )
            return

        await self._set_status(article_id, self.TaskStatus.FAILED)
        await self._log(
            article_id=article_id,
            wx_sn=wx_sn,
            status="fail",
            message=f"unexpected submit status: {status}",
            data=result,
        )

    async def _fail_unfinished(self, articles: List[Dict]) -> None:
        """Best-effort: move still-locked articles to FAILED after an unexpected error.

        The update is PROCESSING -> FAILED, so an article already moved to
        SUCCESS/FAILED is not changed and is not logged.
        """
        for article in articles:
            try:
                released = await self._set_status(article["id"], self.TaskStatus.FAILED)
                if released:
                    await self._log(
                        article_id=article["id"],
                        wx_sn=article.get("wx_sn"),
                        status="fail",
                        message="unexpected error, article moved to FAILED",
                    )
            except Exception:
                # Cleanup must not mask the original error; the caller re-raises it.
                continue

    async def _submit_and_record(self, articles: List[Dict]):
        """Submit locked articles as one batch and record each outcome.

        Per article, by the submit response's ``status``:

        - no response, FAILED, or an unrecognised status -> article FAILED;
        - SUCCESS with a query result whose status is SUCCESS -> task row and
          decode result stored, article SUCCESS;
        - SUCCESS without such a query result, or PENDING -> task row stored
          for later polling, article SUCCESS.

        Raises:
            Whatever the submit/query/mapper calls raise. Before re-raising,
            the articles not yet recorded are moved from PROCESSING to FAILED.
        """
        if not articles:
            return
        finished = 0  # articles[:finished] have been fully recorded
        try:
            posts = self.tool.prepare_posts(articles)
            submit_results = await self.tool.submit_decode_batch(posts) or {}
            posts_by_wx = {p["channelContentId"]: p for p in posts}

            # One deduplicated query for every article whose submit already
            # reported SUCCESS, instead of one round-trip per article.
            # NOTE(review): assumes query_decode_results_batch accepts several
            # ids and returns a dict keyed by channelContentId, as the previous
            # single-id call relied on.
            ready_sns = list(
                dict.fromkeys(
                    a["wx_sn"]
                    for a in articles
                    if (submit_results.get(a["wx_sn"]) or {}).get("status")
                    == self.SubmitStatus.SUCCESS
                )
            )
            query_results = {}
            if ready_sns:
                query_results = (
                    await self.tool.query_decode_results_batch(ready_sns) or {}
                )

            for article in articles:
                wx_sn = article["wx_sn"]
                await self._record_article(
                    article,
                    submit_results.get(wx_sn),
                    posts_by_wx.get(wx_sn, {}),
                    query_results.get(wx_sn),
                )
                finished += 1
        except Exception:
            # Don't leave the remaining articles locked in PROCESSING forever.
            await self._fail_unfinished(articles[finished:])
            raise

    async def deal(self):
        """Run one pass: lock pending articles, submit them, and record results."""
        article_list = await self._acquire_articles()
        if not article_list:
            await self._log(message="No more articles to decode")
            return
        await self._submit_and_record(article_list)
        await self._log(message=f"Processed {len(article_list)} articles")
class CreateInnerArticlesDecodeTask(DecodeArticleConst):
    """Create decode tasks for inner articles.

    ``deal`` runs one pass:

    1. Fetch inner articles and drop those whose ``wx_sn`` already has a
       decode-task record.
    2. Fetch produce details per remaining article and build the posts.
    3. Submit the posts in one batch and, per article, write a decode-task
       row and/or the decode result.

    Submissions that fail are only logged; no row is written for them.
    Presumably they are therefore picked up again on a later run — confirm
    against ``fetch_inner_articles``.
    """

    # Value of the "task" field in every log record written by this class.
    _TASK_NAME = "create_inner_decode_task_v2"

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
        self.tool = InnerArticlesDecodeUtils()

    async def _log(self, **fields) -> None:
        """Write one structured log record tagged with this task's name."""
        await self.log_service.log(contents={"task": self._TASK_NAME, **fields})

    async def _insert_task(self, article: Dict, wx_sn, post: Dict, remark: str) -> None:
        """Insert a decode-task row whose payload is the submitted post (JSON)."""
        source_id = article.get("source_id")
        await self.mapper.insert_decode_task(
            channel_content_id=wx_sn,
            # Avoid storing the literal string "None" when source_id is missing/NULL.
            content_id="" if source_id is None else str(source_id),
            source=self.SourceType.INNER,
            payload=json.dumps(post, ensure_ascii=False),
            remark=remark,
        )

    async def _filter_new_articles(self, article_list: List[Dict]) -> List[Dict]:
        """Return the articles whose wx_sn has no decode-task record yet."""
        all_wx_sns = [a["wx_sn"] for a in article_list]
        # Materialize as a set so each membership test below is O(1).
        existing = set(
            await self.mapper.fetch_existing_channel_content_ids(all_wx_sns) or ()
        )
        return [a for a in article_list if a["wx_sn"] not in existing]

    async def _fetch_produce_info(self, articles: List[Dict]) -> Dict[str, list]:
        """Map each article's wx_sn to its produce details.

        This issues one query per article, sequentially.
        """
        produce_info_map: Dict[str, list] = {}
        for article in articles:
            produce_info_map[article["wx_sn"]] = (
                await self.mapper.fetch_inner_articles_produce_detail(
                    article["source_id"]
                )
            )
        return produce_info_map

    async def _record_article(self, article: Dict, result, post: Dict, query_result) -> None:
        """Persist the outcome of one submitted article.

        Args:
            article: The article row (uses ``wx_sn`` and ``source_id``).
            result: This article's submit response, or a falsy value if the
                batch response had no entry for it.
            post: The payload that was submitted for this article.
            query_result: This article's decode-result query response, or
                None. Only queried when the submit status was SUCCESS.
        """
        wx_sn = article["wx_sn"]

        if not result:
            await self._log(
                wx_sn=wx_sn,
                status="fail",
                message="no response for channel_content_id",
            )
            return

        status = result.get("status")

        if status == self.SubmitStatus.FAILED:
            await self._log(wx_sn=wx_sn, status="fail", data=result)
            return

        if status == self.SubmitStatus.PENDING:
            await self._insert_task(article, wx_sn, post, "内部文章解构任务已提交")
            return

        if status == self.SubmitStatus.SUCCESS:
            data_content = query_result.get("dataContent") if query_result else None
            if data_content:
                await self._insert_task(
                    article, wx_sn, post, "内部文章解构结果已获取"
                )
                await self.mapper.set_decode_result(
                    channel_content_id=wx_sn,
                    result=json.dumps(
                        {"dataContent": data_content}, ensure_ascii=False
                    ),
                )
            else:
                # Store the submitted post as the payload, like every other
                # branch (previously the submit response was stored here).
                await self._insert_task(
                    article, wx_sn, post, "提交返回SUCCESS,查询未果,等待轮询"
                )
            return

        await self._log(
            wx_sn=wx_sn,
            status="fail",
            message=f"unexpected submit status: {status}",
            data=result,
        )

    async def deal(self):
        """Run one pass: filter, submit, and record inner articles."""
        article_list = await self.mapper.fetch_inner_articles()
        if not article_list:
            await self._log(message="No more articles to decode")
            return

        # Skip articles that already have a task record.
        new_articles = await self._filter_new_articles(article_list)
        skipped = len(article_list) - len(new_articles)
        if skipped > 0:
            await self._log(message=f"Skipped {skipped} already-submitted articles")
        if not new_articles:
            await self._log(message="All articles already submitted")
            return

        produce_info_map = await self._fetch_produce_info(new_articles)
        posts = self.tool.prepare_posts(new_articles, produce_info_map)
        submit_results = await self.tool.submit_decode_batch(posts) or {}
        posts_by_wx = {p["channelContentId"]: p for p in posts}

        # One deduplicated query for every article whose submit already
        # reported SUCCESS, instead of one round-trip per article.
        # NOTE(review): assumes query_decode_results_batch accepts several ids
        # and returns a dict keyed by channelContentId, as the previous
        # single-id call relied on.
        ready_sns = list(
            dict.fromkeys(
                a["wx_sn"]
                for a in new_articles
                if (submit_results.get(a["wx_sn"]) or {}).get("status")
                == self.SubmitStatus.SUCCESS
            )
        )
        query_results = {}
        if ready_sns:
            query_results = await self.tool.query_decode_results_batch(ready_sns) or {}

        for article in tqdm(new_articles):
            wx_sn = article["wx_sn"]
            await self._record_article(
                article,
                submit_results.get(wx_sn),
                posts_by_wx.get(wx_sn, {}),
                query_results.get(wx_sn),
            )

        await self._log(message=f"Processed {len(new_articles)} articles")
# Public API of this module: the two decode-task creators defined above.
__all__ = [
    "CreateAdPlatformArticlesDecodeTask",
    "CreateInnerArticlesDecodeTask",
]
|