import traceback

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import AdPlatformArticlePublishConst
from ._utils import AdPlatformArticlePublishUtils
from ._mapper import AdPlatformArticlePublishMapper

# Value of the "task" field for every log record emitted by this module.
# The success record previously used "article_pool_cold_start", which did not
# match the failure records of the same task.
_LOG_TASK_NAME = "ad_platform_article_cold_start"


class AdPlatformArticlePublishTask(AdPlatformArticlePublishConst):
    """Cold-start task: turn candidate articles into a crawler plan per plan key.

    For each entry of ``PLAN_LIST`` the task fetches candidate articles for the
    mapped category, creates a crawler plan from them, records the plan, binds
    it to a generate task and tracks progress through the cold-start status.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Wire up the mapper (backed by ``pool``), the utils and the logger."""
        self.mapper = AdPlatformArticlePublishMapper(pool)
        self.tool = AdPlatformArticlePublishUtils()
        self.log_service = log_service

    async def _process_single_plan(self, plan: str) -> None:
        """Run the cold-start flow for one plan.

        Args:
            plan: Key into ``CATEGORY_MAP``; also passed to the bind step.

        Raises:
            KeyError: If ``plan`` is not in ``CATEGORY_MAP`` (looked up outside
                the ``try`` block, so it is not handled here).
            Exception: Anything raised by the failure-reporting calls inside
                the ``except`` handler propagates to the caller.
        """
        category = self.CATEGORY_MAP[plan]
        id_list: list = []
        try:
            # Fetch candidate articles; nothing to do when there are none.
            candidate_articles = await self.mapper.fetch_candidate_articles(
                category=category
            )
            if not candidate_articles:
                return

            id_list = [article["id"] for article in candidate_articles]

            # Move the candidates from INIT to PROCESSING.
            await self.mapper.update_cold_start_status(
                id_list=id_list,
                ori_status=self.ColdStartStatus.INIT,
                new_status=self.ColdStartStatus.PROCESSING,
            )

            # Create the crawler plan.
            plan_response = await self.tool.create_crawler_plan(
                category=category, article_list=candidate_articles
            )

            # Persist the plan info.
            plan_info = self.tool.process_plan_info(plan_response)
            await self.mapper.record_crawler_plan(plan_info)

            # Bind the crawler plan to the generate task.
            response = await self.tool.bind_to_generate_task(plan, plan_response)
            await self.log_service.log(
                contents={
                    "task": _LOG_TASK_NAME,
                    "status": "success",
                    "message": "绑定至生成计划成功",
                    "data": response,
                }
            )

            # Mark the candidates as done.
            await self.mapper.update_cold_start_status(
                id_list=id_list,
                ori_status=self.ColdStartStatus.PROCESSING,
                new_status=self.ColdStartStatus.DONE,
            )

        except Exception as e:
            # Capture once, before the nested try, so the log record and the
            # alert carry exactly the same text for the original failure.
            error_text = str(e)
            trace_text = traceback.format_exc()

            # Roll back: articles already moved to PROCESSING are marked FAILED
            # so they can be retried or investigated.
            # NOTE(review): this also runs when a step after the bind fails
            # (e.g. the success log or the DONE update) — confirm that marking
            # such articles FAILED cannot lead to duplicate plans on retry.
            if id_list:
                try:
                    await self.mapper.update_cold_start_status(
                        id_list=id_list,
                        ori_status=self.ColdStartStatus.PROCESSING,
                        new_status=self.ColdStartStatus.FAILED,
                    )
                except Exception as rollback_err:
                    await self.log_service.log(
                        contents={
                            "task": _LOG_TASK_NAME,
                            "status": "fail",
                            "message": "回滚冷启状态失败",
                            "rollback_error": str(rollback_err),
                        }
                    )

            await self.log_service.log(
                contents={
                    "task": _LOG_TASK_NAME,
                    "status": "fail",
                    "plan": plan,
                    "category": category,
                    "error": error_text,
                    "traceback": trace_text,
                }
            )
            await self.tool.alert(
                title="冷启动任务异常",
                detail={
                    "plan": plan,
                    "category": category,
                    "error": error_text,
                    "traceback": trace_text,
                },
            )

    async def deal(self) -> None:
        """Entry point: process every plan in ``PLAN_LIST`` sequentially.

        Raises:
            Exception: Any exception escaping ``_process_single_plan`` is
                logged, alerted on and then re-raised; remaining plans are
                not processed.
        """
        try:
            for plan in self.PLAN_LIST:
                await self._process_single_plan(plan)
        except Exception as e:
            error_text = str(e)
            trace_text = traceback.format_exc()
            await self.log_service.log(
                contents={
                    "task": _LOG_TASK_NAME,
                    "status": "fail",
                    "message": "deal 入口异常",
                    "error": error_text,
                    "traceback": trace_text,
                }
            )
            await self.tool.alert(
                title="互选平台文章冷启动任务-入口异常",
                detail={
                    "error": error_text,
                    "traceback": trace_text,
                },
            )
            raise


__all__ = ["AdPlatformArticlePublishTask"]