浏览代码

Merge branch 'dev' of Server/LongArticleTaskServer into master

luojunhui 4 周之前
父节点
当前提交
71b9803713

+ 46 - 19
applications/tasks/llm_tasks/candidate_account_process.py

@@ -1,9 +1,10 @@
 import json
 import traceback
-from typing import List, Dict
-from tqdm import tqdm
+from typing import List, Dict, Optional
+from tqdm.asyncio import tqdm
 
 from applications.api import fetch_deepseek_completion
+from applications.api import feishu_robot
 from applications.utils import ci_lower
 
 
@@ -64,6 +65,10 @@ class CandidateAccountProcessConst:
 
 
 class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
+    """
+    description: analyze the quality of accounts in the candidate account pool;
+                accounts that meet the quality bar are added to the crawl account table
+    """
     def __init__(self, pool, log_client, trace_id):
         self.pool = pool
         self.log_client = log_client
@@ -73,28 +78,30 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
         """
         get account tasks from the database
         """
-        fetch_query = f"""
+        query = """
             select id, title_list, platform, account_id, account_name
             from crawler_candidate_account_pool
-            where avg_score is null and status = {self.INIT_STATUS} and title_list is not null;
+            where avg_score is null and status = %s and title_list is not null;
         """
-        fetch_response = await self.pool.async_fetch(
-            fetch_query,
+        response = await self.pool.async_fetch(query=query, params=(self.INIT_STATUS, ))
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "message": f"获取账号数量: {len(response)}",
+            }
         )
-        return fetch_response
+        return response
 
     async def update_account_status(
         self, account_id: int, ori_status: int, new_status: int
     ) -> int:
         """update account status"""
-        update_query = f"""
-                update crawler_candidate_account_pool
-                set status = %s
-                where id = %s and status = %s;
-            """
-        return await self.pool.async_save(
-            update_query, (new_status, account_id, ori_status)
-        )
+        query = """
+            update crawler_candidate_account_pool
+            set status = %s
+            where id = %s and status = %s;
+        """
+        return await self.pool.async_save(query, (new_status, account_id, ori_status))
 
     async def insert_account_into_crawler_queue(
         self, score_list: List[int], account: dict
@@ -103,9 +110,9 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
         计算账号的得分置信区间下限,若置信区间下限的分数大于阈值,则认为是好的账号
         """
         if ci_lower(score_list) > self.AVG_SCORE_THRESHOLD:
-            query = f"""
-                insert into article_meta_accounts (platform, account_id, account_name, account_source, status)
-                values (%s, %s, %s, %s, %s);
+            query = """
+                insert into article_meta_accounts (platform, account_id, account_name, account_source, status, trace_id)
+                values (%s, %s, %s, %s, %s, %s);
             """
             await self.pool.async_save(
                 query=query,
@@ -115,6 +122,7 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                     account["account_name"],
                     "ai_recognize",
                     self.ACCOUNT_GOOD_STATUS,
+                    self.trace_id,
                 ),
             )
 
@@ -151,7 +159,7 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                 model="DeepSeek-V3", prompt=prompt, output_type="json"
             )
             avg_score = sum(completion) / len(completion)
-            query = f"""
+            query = """
                 update crawler_candidate_account_pool
                 set score_list = %s, avg_score = %s, status = %s
                 where id = %s and status = %s;
@@ -189,6 +197,13 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                 account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
             )
 
+    async def get_task_execute_detail(self) -> Optional[Dict]:
+        query = """
+            select count(1) as new_mining_account from article_meta_accounts
+            where trace_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(self.trace_id,))
+
     async def deal(self):
         task_list = await self.get_task_list()
         for task in tqdm(task_list, desc="use llm to analysis each account"):
@@ -208,3 +223,15 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
                         },
                     }
                 )
+
+        # summarize this run (accounts processed / newly added) and report it via feishu_robot
+        execute_response = await self.get_task_execute_detail()
+        detail = {
+            "total_execute_acounts": len(task_list),
+            "new_mining_account": execute_response[0]["new_mining_account"],
+        }
+        await feishu_robot.bot(
+            title="执行账号质量分析任务",
+            detail=detail,
+            mention=False
+        )

+ 5 - 7
applications/tasks/llm_tasks/process_title.py

@@ -441,7 +441,7 @@ class ArticlePoolCategoryGeneration(TitleProcess):
             limit = self.PROCESS_NUM
 
         task_list = await self.get_task_list(limit=limit)
-        print(task_list)
+        print(type(task_list))
         await self.aliyun_log.log(
             contents={
                 "task": "ArticlePoolCategoryGeneration",
@@ -450,6 +450,7 @@ class ArticlePoolCategoryGeneration(TitleProcess):
                 "message": f"总共获取{len(task_list)}条文章",
             }
         )
+        print(task_list)
         task_batch_list = yield_batch(data=task_list, batch_size=self.BATCH_SIZE)
         batch_index = 0
         for task_batch in task_batch_list:
@@ -482,12 +483,9 @@ class TitleRewrite(TitleProcess):
         query = f"""
             select id, title_rewrite_status_update_timestamp
             from publish_single_video_source
-            where title_rewrite_status = {self.TITLE_REWRITE_LOCK_STATUS};
+            where title_rewrite_status = %s;
         """
-        article_list = await self.pool.async_fetch(
-            query=query,
-            db_name="long_articles",
-        )
+        article_list = await self.pool.async_fetch(query=query, params=(self.TITLE_REWRITE_LOCK_STATUS, ))
         if article_list:
             blocked_id_list = [
                 i["id"]
@@ -496,7 +494,7 @@ class TitleRewrite(TitleProcess):
                 > self.TITLE_REWRITE_LOCK_TIME
             ]
             if blocked_id_list:
-                update_query = f"""
+                update_query = """
                     update publish_single_video_source
                     set title_rewrite_status = %s
                     where id in %s and title_rewrite_status = %s;

+ 1 - 1
applications/tasks/task_handler.py

@@ -58,7 +58,7 @@ class TaskHandler(TaskMapper):
         return await sub_task.deal()
 
     async def _title_rewrite_handler(self):
-        sub_task = TitleRewrite(self.db_client, self.log_client)
+        sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
         return await sub_task.deal()
 
     async def _update_root_source_id_handler(self) -> int:

+ 0 - 14
dev.py

@@ -1,14 +0,0 @@
-import asyncio
-
-from applications.api import fetch_deepseek_completion
-
-
-prompt = "你好"
-
-res = fetch_deepseek_completion(model="defa", prompt=prompt)
-
-print(res)
-"""
-curl -X POST http://127.0.0.1:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_gzh_articles", "account_method": "search", "crawl_mode": "search", "strategy": "V1"}'
-
-"""

+ 14 - 0
run_dev_server.sh

@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+set -e
+
+# Change to the directory containing this script (so relative paths resolve correctly)
+cd "$(dirname "$0")"
+
+# Start the Hypercorn server
+hypercorn task_app:app \
+  --reload \
+  --bind 0.0.0.0:6062 \
+  --workers 1 \
+  --keep-alive 120 \
+  --graceful-timeout 30 \
+  --log-level info