entrance.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import traceback
  2. from app.core.database import DatabaseManager
  3. from app.core.observability import LogService
  4. from app.infra.external import feishu_robot
  5. from ._const import AdPlatformArticleColdStartConst
  6. from ._utils import AdPlatformArticleColdStartUtils
  7. from ._mapper import AdPlatformArticleColdStartMapper
  8. class AdPlatformArticleColdStartTask(AdPlatformArticleColdStartConst):
  9. def __init__(self, pool: DatabaseManager, log_service: LogService):
  10. self.mapper = AdPlatformArticleColdStartMapper(pool)
  11. self.tool = AdPlatformArticleColdStartUtils()
  12. self.log_service = log_service
  13. async def _process_single_plan(self, plan: str):
  14. category = self.CATEGORY_MAP[plan]
  15. id_list: list = []
  16. try:
  17. # fetch candidate articles
  18. candidate_articles = await self.mapper.fetch_candidate_articles(
  19. category=category
  20. )
  21. if not candidate_articles:
  22. return
  23. id_list = [article["id"] for article in candidate_articles]
  24. # update cold start status
  25. await self.mapper.update_cold_start_status(
  26. id_list=id_list,
  27. ori_status=self.ColdStartStatus.INIT,
  28. new_status=self.ColdStartStatus.PROCESSING,
  29. )
  30. # start process
  31. plan_response = await self.tool.create_crawler_plan(
  32. category=category, article_list=candidate_articles
  33. )
  34. print(plan_response)
  35. # save to db
  36. plan_info = self.tool.process_plan_info(plan_response)
  37. await self.mapper.record_crawler_plan(plan_info)
  38. # bind to generate plan
  39. response = await self.tool.bind_to_generate_task(plan, plan_response)
  40. await self.log_service.log(
  41. contents={
  42. "task": "article_pool_cold_start",
  43. "status": "success",
  44. "message": "绑定至生成计划成功",
  45. "data": response,
  46. }
  47. )
  48. # 修改状态
  49. await self.mapper.update_cold_start_status(
  50. id_list=id_list,
  51. ori_status=self.ColdStartStatus.PROCESSING,
  52. new_status=self.ColdStartStatus.DONE,
  53. )
  54. except Exception as e:
  55. # 回滚状态:若已更新为 PROCESSING 则改为 FAILED,便于重试或排查
  56. if id_list:
  57. try:
  58. await self.mapper.update_cold_start_status(
  59. id_list=id_list,
  60. ori_status=self.ColdStartStatus.PROCESSING,
  61. new_status=self.ColdStartStatus.FAILED,
  62. )
  63. except Exception as rollback_err:
  64. await self.log_service.log(
  65. contents={
  66. "task": "ad_platform_article_cold_start",
  67. "status": "error",
  68. "message": "回滚冷启状态失败",
  69. "rollback_error": str(rollback_err),
  70. }
  71. )
  72. await self.log_service.log(
  73. contents={
  74. "task": "ad_platform_article_cold_start",
  75. "status": "error",
  76. "plan": plan,
  77. "category": category,
  78. "error": str(e),
  79. "traceback": traceback.format_exc(),
  80. }
  81. )
  82. await feishu_robot.bot(
  83. title="互选平台文章冷启动任务异常",
  84. detail={
  85. "plan": plan,
  86. "category": category,
  87. "error": str(e),
  88. "traceback": traceback.format_exc(),
  89. },
  90. mention=False,
  91. )
  92. async def deal(self):
  93. """函数入口"""
  94. try:
  95. for plan in self.PLAN_LIST:
  96. await self._process_single_plan(plan)
  97. except Exception as e:
  98. await self.log_service.log(
  99. contents={
  100. "task": "ad_platform_article_cold_start",
  101. "status": "error",
  102. "message": "deal 入口异常",
  103. "error": str(e),
  104. "traceback": traceback.format_exc(),
  105. }
  106. )
  107. await feishu_robot.bot(
  108. title="互选平台文章冷启动任务-入口异常",
  109. detail={
  110. "error": str(e),
  111. "traceback": traceback.format_exc(),
  112. },
  113. mention=False,
  114. )
  115. raise
  116. __all__ = ["AdPlatformArticleColdStartTask"]