Browse Source

新增处理标题接口

luojunhui 1 month ago
parent
commit
92b6839216

+ 27 - 5
applications/tasks/llm_tasks/process_title.py

@@ -38,6 +38,8 @@ class Const:
 
     BATCH_SIZE = 20
 
+    PROCESS_NUM = 1000
+
 
 class TitleProcess(Const):
     def __init__(self, pool, aliyun_log, trace_id):
@@ -469,7 +471,7 @@ class ArticlePoolCategoryGeneration(TitleProcess):
             ),
         )
 
-    async def get_task_list(self, limit=1000):
+    async def get_task_list(self, limit):
         query = f"""
             select article_id, title from long_articles.crawler_meta_article
             where category_status = %s and status = %s and score > %s
@@ -564,14 +566,34 @@ class ArticlePoolCategoryGeneration(TitleProcess):
         else:
             return
 
-    async def deal(self):
-
+    async def deal(self, limit):  # entry point: fetch up to `limit` tasks and process them batch by batch
         await self._roll_back_lock_tasks(table_name="crawler_meta_article")  # NOTE(review): presumably releases rows left locked by an earlier run - confirm against the helper
 
-        task_list = await self.get_task_list()
+        if not limit:  # any falsy limit (None, 0, "") falls back to the class default
+            limit = self.PROCESS_NUM
+
+        task_list = await self.get_task_list(limit=limit)
         task_batch_list = yield_batch(data=task_list, batch_size=self.BATCH_SIZE)  # chunks of at most BATCH_SIZE tasks (assumed from the argument names - confirm in yield_batch)
+        batch_index = 0  # 1-based batch counter, used only in the failure log message below
         for task_batch in task_batch_list:
-            await self.process_each_batch(task_batch)
+            batch_index += 1
+            try:
+                await self.process_each_batch(task_batch)
+
+            except Exception as e:  # deliberately broad: the failure is logged, not re-raised, so the loop moves on to the next batch
+                await self.aliyun_log.log(
+                    contents={
+                        "task": "ArticlePoolCategoryGeneration",
+                        "function": "deal",
+                        "message": f"batch {batch_index} processed failed",
+                        "status": "fail",
+                        "trace_id": self.trace_id,
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),  # full stack trace of the exception currently being handled
+                        }
+                    }
+                )
 
 
 

+ 9 - 0
applications/tasks/task_handler.py

@@ -6,6 +6,7 @@ from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTas
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
 from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
 from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
@@ -110,3 +111,11 @@ class TaskHandler(TaskMapper):
         )
         await task.deal()
         return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_category_generation_handler(self) -> int:  # registered under the task name "article_pool_category_generation"
+        task = ArticlePoolCategoryGeneration(
+            self.db_client, self.log_client, self.trace_id
+        )
+        limit_num = self.data.get("limit")  # optional; assumes self.data is a dict-like request payload, so a missing key gives None and deal() then uses its default
+        await task.deal(limit=limit_num)
+        return self.TASK_SUCCESS_STATUS  # returned whenever deal() completes; per-batch failures are logged inside deal() and not raised

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -179,6 +179,8 @@ class TaskScheduler(TaskHandler):
             "task_processing_monitor": self._task_processing_monitor_handler,
             # 候选账号质量分析
             "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
+            # 文章内容池--标题品类处理
+            "article_pool_category_generation": self._article_pool_category_generation_handler,
         }
 
         if task_name not in handlers: