account crawler

luojunhui 3 weeks ago
parent
commit
5c1ced70d2
3 changed files with 132 additions and 75 deletions
  1. account_crawler_task.py (+7, -9)
  2. account_quality_analysis.py (+13, -0)
  3. tasks/account_recognize_by_llm.py (+112, -66)

+ 7 - 9
account_crawler_task.py

@@ -13,7 +13,7 @@ from tasks.crawler_accounts_by_association import HaoKanAccountCrawler
 document_token = "BGQCsOXwHhVRq5tswjgcI8NInqd"
 toutiao_sheet_id = "pIJSt7"
 channels_sheet_id = "ee0163"
-haokan_sheet_id = 'tfftfD'
+haokan_sheet_id = "tfftfD"
 
 
 def insert_data_into_feishu_sheet(platform: str, data_list: list[list[str]]) -> None:
@@ -32,7 +32,7 @@ def insert_data_into_feishu_sheet(platform: str, data_list: list[list[str]]) ->
             sheet_id = toutiao_sheet_id
         case "sph":
             sheet_id = channels_sheet_id
-        case 'hksp':
+        case "hksp":
             sheet_id = haokan_sheet_id
         case _:
             raise RuntimeError("platform error")
@@ -50,6 +50,7 @@ def insert_data_into_feishu_sheet(platform: str, data_list: list[list[str]]) ->
         ranges="A2:J{}".format(2 + len(video_array)),
     )
 
+
 def deal_each_platform(platform: str) -> None:
     """
     deal each platform
@@ -68,11 +69,8 @@ def deal_each_platform(platform: str) -> None:
     # start process
     crawler.deal()
 
-if __name__ == "__main__":
-    # platform_list = ["sph", "hksp", "toutiao"]
-    # for platform_id in platform_list:
-    #     deal_each_platform(platform=platform_id)
-    HaoKanAccountCrawler().deal()
-
-
 
+if __name__ == "__main__":
+    platform_list = ["sph", "hksp", "toutiao"]
+    for platform_id in platform_list:
+        deal_each_platform(platform=platform_id)
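
Note on the Feishu export in this file: the match/case in insert_data_into_feishu_sheet reduces to a small platform-to-sheet-id mapping inside the shared document, and the write range is a contiguous A2:J block sized by the number of rows. A minimal stand-alone sketch of just that logic (the sheet IDs and range arithmetic are copied from the hunk above; resolve_sheet_range is an illustrative helper, not part of the repo):

    sheet_ids = {
        "toutiao": "pIJSt7",  # toutiao_sheet_id
        "sph": "ee0163",      # channels_sheet_id
        "hksp": "tfftfD",     # haokan_sheet_id
    }


    def resolve_sheet_range(platform: str, rows: int) -> tuple[str, str]:
        # map a crawler platform to its Feishu sheet id and compute the write
        # range, mirroring insert_data_into_feishu_sheet() above
        if platform not in sheet_ids:
            raise RuntimeError("platform error")
        return sheet_ids[platform], "A2:J{}".format(2 + rows)


    print(resolve_sheet_range("hksp", 10))  # ('tfftfD', 'A2:J12')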

+ 13 - 0
account_quality_analysis.py

@@ -0,0 +1,13 @@
+from tasks.account_recognize_by_llm import AccountRecognizer
+
+
+def main():
+    """
+    main function
+    """
+    account_recognizer = AccountRecognizer()
+    account_recognizer.deal()
+
+
+if __name__ == "__main__":
+    main()

+ 112 - 66
tasks/account_recognize_by_llm.py

@@ -1,49 +1,143 @@
 """
 use llm function to recognize the account information
 """
+
 import json
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
+from threading import local
+import concurrent
+from concurrent.futures import ThreadPoolExecutor
 
 from applications.api import fetch_deepseek_response
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
+thread_local = local()
+
+
 def generate_prompt(account_title_list):
     """
     生成prompt
     :param account_title_list:
     """
-    title_list = '\n'.join(account_title_list)
+    title_list = "\n".join(account_title_list)
     g_prompt = f"""
     ** 任务指令 **
         你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
+        
     ** 评估维度及权重 **
         1. 受众精准度(50%)
-            中老年群体:涉及存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
+            正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
+            负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养
+            
         2. 标题技法(40%)
             悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
             情感强度:使用"痛心!""寒心!"等情绪词
             数据冲击:具体数字增强可信度(例:"存款180万消失")
             口语化表达:使用"涨知识了""别不当回事"等日常用语
+            
         3. 内容调性(10%)
             煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
             警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
             历史揭秘:人物秘闻/老照片故事
             爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
+
     ** 评分规则 **
-        90-100分:同时满足3个维度且要素齐全
-        70-89分:满足2个核心维度
-        50-69分:仅满足受众群体
-        30-49分:存在关联但要素缺失
-        0-29分:完全无关
+        90-100分:同时满足3个维度且要素齐全,无负向内容
+        70-89分:满足2个核心维度,无负向内容
+        50-69分:仅满足受众群体正向匹配,无负向内容
+        30-49分:存在轻微关联但要素缺失
+        0-29分:完全无关或包含任意负向品类内容
+        
     ** 待评估标题 **
-    {title_list}
+        {title_list}
+        
     ** 输出要求 **
         仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
     """
     return g_prompt
 
+
+def get_db_client():
+    """
+    each thread get it's own db client
+    """
+    if not hasattr(thread_local, "db_client"):
+        thread_local.db_client = DatabaseConnector(long_articles_config)
+        thread_local.db_client.connect()
+    return thread_local.db_client
+
+
+def update_task_status(thread_db_client, task_id, ori_status, new_status):
+    """
+    update task status
+    """
+    update_query = """
+        update crawler_candidate_account_pool
+        set status = %s
+        where id = %s and status = %s;
+    """
+    thread_db_client.save(update_query, (new_status, task_id, ori_status))
+
+
+def recognize_each_account(thread_db_client, account):
+    """
+    recognize each account
+    """
+    task_id = account["id"]
+    # lock task
+    update_task_status(thread_db_client, task_id, 0, 1)
+
+    # process
+    title_list = json.loads(account["title_list"])
+    if len(title_list) < 15:
+        # not enough titles for this account, skip it
+        print("bad account, skip")
+        update_task_status(thread_db_client, task_id, 1, 11)
+        return
+
+    prompt = generate_prompt(title_list)
+    response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
+    response_score_str = response.strip()
+    try:
+        score_list = json.loads(response_score_str)
+        avg_score = sum(score_list) / len(score_list)
+
+    except Exception as e:
+        score_list = []
+        avg_score = 0
+
+    if score_list and avg_score:
+        update_query = """
+            update crawler_candidate_account_pool
+            set score_list = %s, avg_score = %s, status = %s
+            where id = %s and status = %s;
+        """
+        thread_db_client.save(
+            update_query, (json.dumps(score_list), avg_score, 2, task_id, 1)
+        )
+    else:
+        # LLM scores could not be parsed; mark the row as failed (status 12)
+        update_task_status(thread_db_client, task_id, 1, 12)
+
+
+def recognize_task_thread(task):
+    """
+    recognize thread
+    """
+    thread_db_client = get_db_client()
+    try:
+        recognize_each_account(thread_db_client, task)
+    except Exception as e:
+        print(e)
+        update_task_status(
+            thread_db_client=thread_db_client,
+            task_id=task["id"],
+            ori_status=1,
+            new_status=13,
+        )
+
+
 class AccountRecognizer:
     def __init__(self):
         self.db_client = DatabaseConnector(long_articles_config)
@@ -60,64 +154,16 @@ class AccountRecognizer:
         fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
         return fetch_response
 
-    def update_task_status(self, task_id, ori_status, new_status):
-        """
-        update task status
-        """
-        update_query = f"""
-            update crawler_candidate_account_pool
-            set status = %s
-            where id = %s and status = %s;  
-        """
-        self.db_client.save(update_query, (new_status, task_id, ori_status))
-
-    def recognize_each_account(self, account):
-        """
-        recognize each account
-        """
-        task_id = account['id']
-        # lock task
-        self.update_task_status(task_id, 0, 1)
-
-        # process
-        title_list = json.loads(account["title_list"])
-        if len(title_list) < 15:
-            # 账号数量不足,直接跳过
-            print("bad account, skip")
-            self.update_task_status(task_id, 1, 11)
-            return
-
-        prompt = generate_prompt(title_list)
-        response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
-        response_score_str = response.strip()
-        try:
-            score_list = json.loads(response_score_str)
-            avg_score = sum(score_list) / len(score_list)
-
-        except Exception as e:
-            score_list = []
-            avg_score = 0
-
-        if score_list and avg_score:
-            update_query = f"""
-                update crawler_candidate_account_pool
-                set score_list = %s, avg_score = %s, status = %s
-                where id = %s and status = %s;
-            """
-            self.db_client.save(update_query, (json.dumps(score_list), avg_score, 2, task_id, 1))
-        else:
-            self.update_task_status(task_id, 1, 12)
-
     def deal(self):
         task_list = self.get_task_list()
-        for task in tqdm(task_list):
-            try:
-                self.recognize_each_account(task)
-            except Exception as e:
-                print(e)
-                self.update_task_status(task['id'], 1, 13)
-
-
-
-
 
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(recognize_task_thread, task) for task in task_list
+            ]
+            for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=len(task_list),
+                desc="处理进度",
+            ):
+                future.result()
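
The refactor in this file moves the per-account work out of AccountRecognizer into module-level functions so it can run on a thread pool: each worker lazily creates its own DatabaseConnector via threading.local, and rows are claimed optimistically by updating status with a `where id = %s and status = %s` guard (0 pending → 1 locked, then 2 on success, 11 for too few titles, 12 for an unparseable score list, 13 on exception, as read from the hunks above). The concurrency pattern can be exercised in isolation; a minimal self-contained sketch with sqlite3 standing in for DatabaseConnector and a dummy task list (all names below are illustrative, not from the repo):

    import sqlite3
    from threading import local
    from concurrent.futures import ThreadPoolExecutor, as_completed

    from tqdm import tqdm

    thread_local = local()


    def get_db_client() -> sqlite3.Connection:
        # each worker thread lazily opens and caches its own connection,
        # mirroring get_db_client() in tasks/account_recognize_by_llm.py
        if not hasattr(thread_local, "db_client"):
            conn = sqlite3.connect(":memory:")
            conn.execute("create table seen (id integer)")
            thread_local.db_client = conn
        return thread_local.db_client


    def process_task(task_id: int) -> int:
        # stand-in for recognize_task_thread(): per-task work on the
        # thread-local connection; exceptions surface via future.result()
        client = get_db_client()
        client.execute("insert into seen values (?)", (task_id,))
        client.commit()
        return task_id


    if __name__ == "__main__":
        tasks = list(range(100))
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(process_task, t) for t in tasks]
            for future in tqdm(as_completed(futures), total=len(tasks), desc="progress"):
                future.result()

Back in the real task, the `where id = %s and status = %s` condition in update_task_status doubles as an optimistic lock: a row that one worker has already moved from status 0 to 1 cannot be claimed or overwritten by another worker in the same batch.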