Explorar o código

账号质量分析

luojunhui hai 4 semanas
pai
achega
e2206e4400
Modificáronse 1 ficheiros con 46 adicións e 19 borrados
  1. 46 19
      applications/tasks/llm_tasks/candidate_account_process.py

+ 46 - 19
applications/tasks/llm_tasks/candidate_account_process.py

@@ -1,9 +1,10 @@
 import json
 import traceback
-from typing import List, Dict
-from tqdm import tqdm
+from typing import List, Dict, Optional
+from tqdm.asyncio import tqdm
 
 from applications.api import fetch_deepseek_completion
+from applications.api import feishu_robot
 from applications.utils import ci_lower
 
 
@@ -64,6 +65,10 @@ class CandidateAccountProcessConst:
 
 
 class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
+    """
+    description: 对候选账号池内的账号进行质量分析
+                对于满足质量的账号,添加到抓取账号表里面
+    """
     def __init__(self, pool, log_client, trace_id):
         self.pool = pool
         self.log_client = log_client
@@ -73,28 +78,30 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
         """
         get account tasks from the database
         """
-        fetch_query = f"""
+        query = """
             select id, title_list, platform, account_id, account_name
             from crawler_candidate_account_pool
-            where avg_score is null and status = {self.INIT_STATUS} and title_list is not null;
+            where avg_score is null and status = %s and title_list is not null;
         """
-        fetch_response = await self.pool.async_fetch(
-            fetch_query,
+        response = await self.pool.async_fetch(query=query, params=(self.INIT_STATUS, ))
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "message": f"获取账号数量: {len(response)}",
+            }
         )
-        return fetch_response
+        return response
 
     async def update_account_status(
         self, account_id: int, ori_status: int, new_status: int
     ) -> int:
         """update account status"""
-        update_query = f"""
-                update crawler_candidate_account_pool
-                set status = %s
-                where id = %s and status = %s;
-            """
-        return await self.pool.async_save(
-            update_query, (new_status, account_id, ori_status)
-        )
+        query = """
+            update crawler_candidate_account_pool
+            set status = %s
+            where id = %s and status = %s;
+        """
+        return await self.pool.async_save(query, (new_status, account_id, ori_status))
 
     async def insert_account_into_crawler_queue(
         self, score_list: List[int], account: dict
@@ -103,9 +110,9 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
         计算账号的得分置信区间下限,若置信区间下限的分数大于阈值,则认为是好的账号
         """
         if ci_lower(score_list) > self.AVG_SCORE_THRESHOLD:
-            query = f"""
-                insert into article_meta_accounts (platform, account_id, account_name, account_source, status)
-                values (%s, %s, %s, %s, %s);
+            query = """
+                insert into article_meta_accounts (platform, account_id, account_name, account_source, status, trace_id)
+                values (%s, %s, %s, %s, %s, %s);
             """
             await self.pool.async_save(
                 query=query,
@@ -115,6 +122,7 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                     account["account_name"],
                     "ai_recognize",
                     self.ACCOUNT_GOOD_STATUS,
+                    self.trace_id,
                 ),
             )
 
@@ -151,7 +159,7 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                 model="DeepSeek-V3", prompt=prompt, output_type="json"
             )
             avg_score = sum(completion) / len(completion)
-            query = f"""
+            query = """
                 update crawler_candidate_account_pool
                 set score_list = %s, avg_score = %s, status = %s
                 where id = %s and status = %s;
@@ -189,6 +197,13 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                 account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
             )
 
+    async def get_task_execute_detail(self) -> Optional[Dict]:
+        query = """
+            select count(1) as new_mining_account from article_meta_accounts
+            where trace_id = %s;
+        """
+        return await self.pool.async_fetch(query, (self.trace_id,))
+
     async def deal(self):
         task_list = await self.get_task_list()
         for task in tqdm(task_list, desc="use llm to analysis each account"):
@@ -208,3 +223,15 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                         },
                     }
                 )
+
+        # analysis
+        execute_response = await self.get_task_execute_detail()
+        detail = {
+            "total_execute_acounts": len(task_list),
+            "new_mining_account": execute_response[0]["new_mining_account"],
+        }
+        await feishu_robot.bot(
+            title="执行账号质量分析任务",
+            detail=detail,
+            mention=False
+        )