import json
from tqdm import tqdm
from typing import Dict, List
from app.core.database import DatabaseManager
from app.core.observability import LogService
from ._const import DecodeArticleConst
from ._mapper import (
    AdPlatformArticlesDecodeTaskMapper,
    InnerArticlesDecodeTaskMapper,
)
from ._utils import AdPlatformArticlesDecodeUtils, InnerArticlesDecodeUtils


class CreateAdPlatformArticlesDecodeTask(DecodeArticleConst):
    """Create decode tasks for ad-platform articles.

    Workflow (see ``deal``): lock pending articles (decode_status
    INIT -> PROCESSING), submit them in a batch to the decode service,
    then per-article either persist an already-available result, insert
    a task record for later polling, or roll the lock back to INIT so
    the article is retried on the next run.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Wire up the DB mapper and decode utilities against *pool*."""
        self.pool = pool
        self.log_service = log_service
        self.mapper = AdPlatformArticlesDecodeTaskMapper(self.pool)
        self.tool = AdPlatformArticlesDecodeUtils()

    async def _acquire_articles(self) -> List[Dict]:
        """Fetch articles awaiting decode and lock them (decode_status INIT -> PROCESSING).

        Returns only the articles whose status update succeeded; articles
        whose lock could not be acquired (presumably taken by a concurrent
        worker — confirm against mapper semantics) are logged and skipped.
        """
        article_list = await self.mapper.fetch_decode_articles()
        locked = []
        for article in article_list:
            article_id = article["id"]
            # Conditional status update acts as the lock: it only succeeds
            # when the row is still in INIT state.
            acquired = await self.mapper.update_article_decode_status(
                article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
            )
            if acquired:
                locked.append(article)
            else:
                await self.log_service.log(
                    contents={
                        "article_id": article_id,
                        "task": "create_decode_task_v2",
                        "status": "skip",
                        "message": "acquire lock failed",
                    }
                )
        return locked

    async def _submit_and_record(self, articles: List[Dict]):
        """Submit *articles* as one decode batch and persist per-article outcomes.

        Per-article handling, keyed by the submit response status:
        - no response      -> roll lock back to INIT (retry next run)
        - FAILED           -> roll lock back to INIT (retry next run)
        - SUCCESS          -> query the result now; persist it if ready,
                              otherwise insert a task record for polling
        - PENDING          -> insert a task record for polling
        - anything else    -> roll lock back to INIT
        """
        if not articles:
            return
        posts = self.tool.prepare_posts(articles)
        submit_results = await self.tool.submit_decode_batch(posts)
        # Index the submitted payloads by channelContentId; for ad-platform
        # articles this id appears to equal wx_sn (results are looked up by
        # wx_sn below) — TODO confirm against prepare_posts.
        posts_by_wx = {p["channelContentId"]: p for p in posts}
        for article in articles:
            wx_sn = article["wx_sn"]
            article_id = article["id"]
            result = submit_results.get(wx_sn)
            if not result:
                # No entry for this article in the batch response: release
                # the lock so the article is retried later.
                await self.mapper.update_article_decode_status(
                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                await self.log_service.log(
                    contents={
                        "article_id": article_id,
                        "wx_sn": wx_sn,
                        "task": "create_decode_task_v2",
                        "status": "fail",
                        "message": "no response for channel_content_id, rolled back to INIT",
                    }
                )
                continue
            status = result.get("status")
            if status == self.SubmitStatus.FAILED:
                # Submission rejected: release the lock for a later retry.
                await self.mapper.update_article_decode_status(
                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                await self.log_service.log(
                    contents={
                        "article_id": article_id,
                        "wx_sn": wx_sn,
                        "task": "create_decode_task_v2",
                        "status": "fail",
                        "data": result,
                    }
                )
                continue
            if status == self.SubmitStatus.SUCCESS:
                # A decode result already exists; query it immediately and
                # persist it without waiting for the polling job.
                query_results = await self.tool.query_decode_results_batch([wx_sn])
                result_data = query_results.get(wx_sn)
                if result_data and result_data.get("status") == self.QueryStatus.SUCCESS:
                    # Fall back to an empty JSON object so set_decode_result
                    # always stores valid JSON.
                    data_content = result_data.get("dataContent") or "{}"
                    html = result_data.get("html")
                    # Ordering matters: task row first, then its result,
                    # then the article status flip.
                    await self.mapper.insert_decode_task(
                        channel_content_id=wx_sn,
                        content_id=article_id,
                        source=self.SourceType.AD_PLATFORM,
                        payload=json.dumps(
                            posts_by_wx.get(wx_sn, {}), ensure_ascii=False
                        ),
                        remark="提交时已有解构结果,直接落库",
                    )
                    await self.mapper.set_decode_result(
                        channel_content_id=wx_sn,
                        result=json.dumps(
                            {"dataContent": data_content, "html": html},
                            ensure_ascii=False,
                        ),
                        remark="提交时已返回 SUCCESS,结果已落库",
                    )
                    await self.mapper.update_article_decode_status(
                        article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                    )
                    await self.log_service.log(
                        contents={
                            "article_id": article_id,
                            "wx_sn": wx_sn,
                            "task": "create_decode_task_v2",
                            "status": "success",
                            "message": "decode result already available on submit",
                        }
                    )
                else:
                    # Submit reported SUCCESS but the result is not queryable
                    # yet: insert a task record and let the polling job pick
                    # it up.
                    await self.mapper.insert_decode_task(
                        channel_content_id=wx_sn,
                        content_id=article_id,
                        source=self.SourceType.AD_PLATFORM,
                        payload=json.dumps(
                            posts_by_wx.get(wx_sn, {}), ensure_ascii=False
                        ),
                        remark="提交返回SUCCESS,查询未果,等待轮询",
                    )
                    # NOTE(review): the article is marked SUCCESS here even
                    # though the decode result is not yet stored — the task
                    # row presumably tracks completion from this point on;
                    # confirm with the polling job.
                    await self.mapper.update_article_decode_status(
                        article_id,
                        self.TaskStatus.PROCESSING,
                        self.TaskStatus.SUCCESS,
                    )
                    await self.log_service.log(
                        contents={
                            "article_id": article_id,
                            "wx_sn": wx_sn,
                            "task": "create_decode_task_v2",
                            "status": "pending",
                            "message": "submit SUCCESS but query not ready, inserted for polling",
                        }
                    )
            elif status == self.SubmitStatus.PENDING:
                # Task accepted by the decode service; record it for polling.
                await self.mapper.insert_decode_task(
                    channel_content_id=wx_sn,
                    content_id=article_id,
                    source=self.SourceType.AD_PLATFORM,
                    payload=json.dumps(
                        posts_by_wx.get(wx_sn, {}), ensure_ascii=False
                    ),
                    remark="任务已提交,等待轮询",
                )
                await self.mapper.update_article_decode_status(
                    article_id,
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.SUCCESS,
                )
                await self.log_service.log(
                    contents={
                        "article_id": article_id,
                        "wx_sn": wx_sn,
                        "task": "create_decode_task_v2",
                        "status": "pending",
                        "message": "task submitted, waiting for polling",
                    }
                )
            else:
                # Unknown status value: treat like a failure and release the
                # lock so the article is retried.
                await self.mapper.update_article_decode_status(
                    article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                await self.log_service.log(
                    contents={
                        "article_id": article_id,
                        "wx_sn": wx_sn,
                        "task": "create_decode_task_v2",
                        "status": "fail",
                        "message": f"unexpected submit status: {status}, rolled back to INIT",
                        "data": result,
                    }
                )

    async def deal(self):
        """Entry point: acquire locked articles, submit them, log a summary."""
        article_list = await self._acquire_articles()
        if not article_list:
            await self.log_service.log(
                contents={
                    "task": "create_decode_task_v2",
                    "message": "No more articles to decode",
                }
            )
            return
        await self._submit_and_record(article_list)
        await self.log_service.log(
            contents={
                "task": "create_decode_task_v2",
                "message": f"Processed {len(article_list)} articles",
            }
        )


class CreateInnerArticlesDecodeTask(DecodeArticleConst):
    """Create decode tasks for internal (inner) articles.

    Similar to the ad-platform flow, but additionally filters out
    articles that already have a task record, enriches each article with
    produce-detail info before submission, and submits against the
    class-level CONFIG_ID (inherited — defined outside this file).
    """

    # When True, skips locking, dedup filtering, and status updates —
    # used for dry-run/testing.
    _TEST_MODE = False

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Wire up the inner-article mapper and decode utilities against *pool*."""
        self.pool = pool
        self.log_service = log_service
        self.mapper = InnerArticlesDecodeTaskMapper(self.pool)
        self.tool = InnerArticlesDecodeUtils()

    async def _acquire_articles(self) -> List[Dict]:
        """Fetch pending inner articles and lock them (status INIT -> PROCESSING).

        In test mode the raw fetch result is returned without locking.
        """
        article_list = await self.mapper.fetch_inner_articles()
        if self._TEST_MODE:
            return article_list
        locked = []
        for article in article_list:
            article_id = article["id"]
            # Conditional status flip acts as the per-article lock.
            acquired = await self.mapper.update_inner_article_status(
                article_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
            )
            if acquired:
                locked.append(article)
            else:
                await self.log_service.log(
                    contents={
                        "article_id": article_id,
                        "task": "create_inner_decode_task",
                        "status": "skip",
                        "message": "acquire lock failed",
                    }
                )
        return locked

    async def _handle_result(
        self,
        article: Dict,
        channel_content_id: str,
        result: Dict,
        posts_by_cid: Dict,
        config_id: int,
    ):
        """Persist the outcome of one submitted article.

        Args:
            article: the source article row (uses ``wx_sn`` and ``source_id``).
            channel_content_id: id the decode service keys results by
                (callers pass the article's wx_sn).
            result: this article's entry from the batch submit response,
                or None/empty when the service returned nothing for it.
            posts_by_cid: submitted payloads indexed by channelContentId,
                stored as the task payload for traceability.
            config_id: decode-service config used for result queries.

        Note: unlike the ad-platform flow, this method only logs/persists;
        the article status rollback/advance is done by the caller.
        """
        wx_sn = article["wx_sn"]
        if not result:
            await self.log_service.log(
                contents={
                    "wx_sn": wx_sn,
                    "task": "create_inner_decode_task",
                    "status": "fail",
                    "message": "no response for channel_content_id",
                }
            )
            return
        status = result.get("status")
        if status == self.SubmitStatus.FAILED:
            # Failure is only logged here; the caller rolls the article
            # back to INIT based on this same status.
            await self.log_service.log(
                contents={
                    "wx_sn": wx_sn,
                    "task": "create_inner_decode_task",
                    "status": "fail",
                    "data": result,
                }
            )
        elif status == self.SubmitStatus.PENDING:
            # Accepted; record the task so the polling job can complete it.
            await self.mapper.insert_decode_task(
                channel_content_id=channel_content_id,
                content_id=str(article.get("source_id", "")),
                source=self.SourceType.INNER,
                payload=json.dumps(
                    posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
                ),
                remark="内部文章解构任务已提交",
            )
        elif status == self.SubmitStatus.SUCCESS:
            # Result may already be available — try to fetch it right away.
            query_results = await self.tool.query_decode_results_batch(
                [channel_content_id], config_id=config_id
            )
            result_data = query_results.get(channel_content_id)
            data_content = result_data.get("dataContent") if result_data else None
            if data_content:
                # Task row first, then its result (same ordering as the
                # ad-platform flow).
                await self.mapper.insert_decode_task(
                    channel_content_id=channel_content_id,
                    content_id=str(article.get("source_id", "")),
                    source=self.SourceType.INNER,
                    payload=json.dumps(
                        posts_by_cid.get(channel_content_id, {}), ensure_ascii=False
                    ),
                    remark="内部文章解构结果已获取",
                )
                await self.mapper.set_decode_result(
                    channel_content_id=channel_content_id,
                    result=json.dumps(
                        {"dataContent": data_content}, ensure_ascii=False
                    ),
                )
            else:
                # Submit said SUCCESS but no result yet: store the submit
                # response itself as payload and wait for polling.
                await self.mapper.insert_decode_task(
                    channel_content_id=channel_content_id,
                    content_id=str(article.get("source_id", "")),
                    source=self.SourceType.INNER,
                    payload=json.dumps(result, ensure_ascii=False),
                    remark="提交返回SUCCESS,查询未果,等待轮询",
                )
        else:
            # Unknown status; log only. NOTE(review): the caller's
            # success check is `status != FAILED`, so this article will
            # still be marked SUCCESS — confirm that is intended.
            await self.log_service.log(
                contents={
                    "wx_sn": wx_sn,
                    "task": "create_inner_decode_task",
                    "status": "fail",
                    "message": f"unexpected submit status: {status}",
                    "data": result,
                }
            )

    async def _submit_and_record(self, articles: List[Dict]):
        """Dedup-filter, enrich, batch-submit *articles*, and record outcomes."""
        if not articles:
            return
        # Filter out articles that already have a task record (skipped in
        # test mode).
        if not self._TEST_MODE:
            all_wx_sns = [a["wx_sn"] for a in articles]
            existing = await self.mapper.fetch_existing_channel_content_ids(all_wx_sns)
            new_articles = [a for a in articles if a["wx_sn"] not in existing]
            skipped = len(articles) - len(new_articles)
            if skipped > 0:
                await self.log_service.log(
                    contents={
                        "task": "create_inner_decode_task",
                        "message": f"Skipped {skipped} already-submitted articles",
                    }
                )
            # Already-submitted articles are considered done: advance their
            # lock to SUCCESS. NOTE(review): `article not in new_articles`
            # is an O(n^2) dict-in-list membership test; a set of wx_sn
            # would be cheaper — left as-is to preserve behavior.
            for article in articles:
                if article not in new_articles:
                    await self.mapper.update_inner_article_status(
                        article["id"], self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                    )
        else:
            new_articles = articles
        if not new_articles:
            return
        # Batch-fetch produce info for each remaining article, keyed by wx_sn.
        produce_info_map: Dict[str, list] = {}
        for article in new_articles:
            source_id = article["source_id"]
            produce_info = await self.mapper.fetch_inner_articles_produce_detail(
                source_id
            )
            produce_info_map[article["wx_sn"]] = produce_info
        posts = self.tool.prepare_posts(new_articles, produce_info_map)
        submit_results = await self.tool.submit_decode_batch(
            posts, config_id=self.CONFIG_ID, skip_completed=True
        )
        posts_by_cid = {p["channelContentId"]: p for p in posts}
        for article in tqdm(new_articles):
            wx_sn = article["wx_sn"]
            article_id = article["id"]
            result = submit_results.get(wx_sn)
            await self._handle_result(
                article, wx_sn, result, posts_by_cid, self.CONFIG_ID
            )
            if not self._TEST_MODE:
                # Any response other than an explicit FAILED counts as ok
                # (see the review note in _handle_result about unexpected
                # statuses).
                ok = result and result.get("status") != self.SubmitStatus.FAILED
                if ok:
                    await self.mapper.update_inner_article_status(
                        article_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                    )
                else:
                    # Submit failed or no response: release the lock back to
                    # INIT so the article is retried on the next run.
                    await self.mapper.update_inner_article_status(
                        article_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                    )

    async def deal(self):
        """Entry point: acquire locked articles, submit them, log a summary."""
        article_list = await self._acquire_articles()
        if not article_list:
            await self.log_service.log(
                contents={
                    "task": "create_inner_decode_task",
                    "message": "No more articles to decode",
                }
            )
            return
        await self._submit_and_record(article_list)
        await self.log_service.log(
            contents={
                "task": "create_inner_decode_task",
                "message": f"Processed {len(article_list)} articles",
            }
        )


__all__ = ["CreateAdPlatformArticlesDecodeTask", "CreateInnerArticlesDecodeTask"]