| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import traceback
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import AdPlatformArticlePublishConst
- from ._utils import AdPlatformArticlePublishUtils
- from ._mapper import AdPlatformArticlePublishMapper
- class AdPlatformArticlePublishTask(AdPlatformArticlePublishConst):
- def __init__(self, pool: DatabaseManager, log_service: LogService):
- self.mapper = AdPlatformArticlePublishMapper(pool)
- self.tool = AdPlatformArticlePublishUtils()
- self.log_service = log_service
- async def _process_single_plan(self, plan: str):
- category = self.CATEGORY_MAP[plan]
- id_list: list = []
- try:
- # fetch candidate articles
- candidate_articles = await self.mapper.fetch_candidate_articles(
- category=category
- )
- if not candidate_articles:
- return
- id_list = [article["id"] for article in candidate_articles]
- # update cold start status
- await self.mapper.update_cold_start_status(
- id_list=id_list,
- ori_status=self.ColdStartStatus.INIT,
- new_status=self.ColdStartStatus.PROCESSING,
- )
- # start process
- plan_response = await self.tool.create_crawler_plan(
- category=category, article_list=candidate_articles
- )
- # save to db
- plan_info = self.tool.process_plan_info(plan_response)
- await self.mapper.record_crawler_plan(plan_info)
- # bind to generate plan
- response = await self.tool.bind_to_generate_task(plan, plan_response)
- await self.log_service.log(
- contents={
- "task": "article_pool_cold_start",
- "status": "success",
- "message": "绑定至生成计划成功",
- "data": response,
- }
- )
- # 修改状态
- await self.mapper.update_cold_start_status(
- id_list=id_list,
- ori_status=self.ColdStartStatus.PROCESSING,
- new_status=self.ColdStartStatus.DONE,
- )
- except Exception as e:
- # 回滚状态:若已更新为 PROCESSING 则改为 FAILED,便于重试或排查
- if id_list:
- try:
- await self.mapper.update_cold_start_status(
- id_list=id_list,
- ori_status=self.ColdStartStatus.PROCESSING,
- new_status=self.ColdStartStatus.FAILED,
- )
- except Exception as rollback_err:
- await self.log_service.log(
- contents={
- "task": "ad_platform_article_cold_start",
- "status": "fail",
- "message": "回滚冷启状态失败",
- "rollback_error": str(rollback_err),
- }
- )
- await self.log_service.log(
- contents={
- "task": "ad_platform_article_cold_start",
- "status": "fail",
- "plan": plan,
- "category": category,
- "error": str(e),
- "traceback": traceback.format_exc(),
- }
- )
- await self.tool.alert(
- title="冷启动任务异常",
- detail={
- "plan": plan,
- "category": category,
- "error": str(e),
- "traceback": traceback.format_exc(),
- },
- )
- async def deal(self):
- """函数入口"""
- try:
- for plan in self.PLAN_LIST:
- await self._process_single_plan(plan)
- except Exception as e:
- await self.log_service.log(
- contents={
- "task": "ad_platform_article_cold_start",
- "status": "fail",
- "message": "deal 入口异常",
- "error": str(e),
- "traceback": traceback.format_exc(),
- }
- )
- await self.tool.alert(
- title="互选平台文章冷启动任务-入口异常",
- detail={
- "error": str(e),
- "traceback": traceback.format_exc(),
- },
- )
- raise
- __all__ = ["AdPlatformArticlePublishTask"]
|