# entrance.py
  1. import traceback
  2. from app.core.database import DatabaseManager
  3. from app.core.observability import LogService
  4. from ._const import AdPlatformArticlePublishConst
  5. from ._utils import AdPlatformArticlePublishUtils
  6. from ._mapper import AdPlatformArticlePublishMapper
  7. class AdPlatformArticlePublishTask(AdPlatformArticlePublishConst):
  8. def __init__(self, pool: DatabaseManager, log_service: LogService):
  9. self.mapper = AdPlatformArticlePublishMapper(pool)
  10. self.tool = AdPlatformArticlePublishUtils()
  11. self.log_service = log_service
  12. async def _process_single_plan(self, plan: str):
  13. category = self.CATEGORY_MAP[plan]
  14. id_list: list = []
  15. try:
  16. # fetch candidate articles
  17. candidate_articles = await self.mapper.fetch_candidate_articles(
  18. category=category
  19. )
  20. if not candidate_articles:
  21. return
  22. id_list = [article["id"] for article in candidate_articles]
  23. # update cold start status
  24. await self.mapper.update_cold_start_status(
  25. id_list=id_list,
  26. ori_status=self.ColdStartStatus.INIT,
  27. new_status=self.ColdStartStatus.PROCESSING,
  28. )
  29. # start process
  30. plan_response = await self.tool.create_crawler_plan(
  31. category=category, article_list=candidate_articles
  32. )
  33. # save to db
  34. plan_info = self.tool.process_plan_info(plan_response)
  35. await self.mapper.record_crawler_plan(plan_info)
  36. # bind to generate plan
  37. response = await self.tool.bind_to_generate_task(plan, plan_response)
  38. await self.log_service.log(
  39. contents={
  40. "task": "article_pool_cold_start",
  41. "status": "success",
  42. "message": "绑定至生成计划成功",
  43. "data": response,
  44. }
  45. )
  46. # 修改状态
  47. await self.mapper.update_cold_start_status(
  48. id_list=id_list,
  49. ori_status=self.ColdStartStatus.PROCESSING,
  50. new_status=self.ColdStartStatus.DONE,
  51. )
  52. except Exception as e:
  53. # 回滚状态:若已更新为 PROCESSING 则改为 FAILED,便于重试或排查
  54. if id_list:
  55. try:
  56. await self.mapper.update_cold_start_status(
  57. id_list=id_list,
  58. ori_status=self.ColdStartStatus.PROCESSING,
  59. new_status=self.ColdStartStatus.FAILED,
  60. )
  61. except Exception as rollback_err:
  62. await self.log_service.log(
  63. contents={
  64. "task": "ad_platform_article_cold_start",
  65. "status": "fail",
  66. "message": "回滚冷启状态失败",
  67. "rollback_error": str(rollback_err),
  68. }
  69. )
  70. await self.log_service.log(
  71. contents={
  72. "task": "ad_platform_article_cold_start",
  73. "status": "fail",
  74. "plan": plan,
  75. "category": category,
  76. "error": str(e),
  77. "traceback": traceback.format_exc(),
  78. }
  79. )
  80. await self.tool.alert(
  81. title="冷启动任务异常",
  82. detail={
  83. "plan": plan,
  84. "category": category,
  85. "error": str(e),
  86. "traceback": traceback.format_exc(),
  87. },
  88. )
  89. async def deal(self):
  90. """函数入口"""
  91. try:
  92. for plan in self.PLAN_LIST:
  93. await self._process_single_plan(plan)
  94. except Exception as e:
  95. await self.log_service.log(
  96. contents={
  97. "task": "ad_platform_article_cold_start",
  98. "status": "fail",
  99. "message": "deal 入口异常",
  100. "error": str(e),
  101. "traceback": traceback.format_exc(),
  102. }
  103. )
  104. await self.tool.alert(
  105. title="互选平台文章冷启动任务-入口异常",
  106. detail={
  107. "error": str(e),
  108. "traceback": traceback.format_exc(),
  109. },
  110. )
  111. raise
  112. __all__ = ["AdPlatformArticlePublishTask"]