123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- """
- Use an LLM to score the article titles of candidate accounts and record the recognition result.
- """
- import json
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from threading import local
- import concurrent
- from concurrent.futures import ThreadPoolExecutor
- from applications.api import fetch_deepseek_response
- from applications.db import DatabaseConnector
- from config import long_articles_config
# Thread-local storage: get_db_client() keeps one DatabaseConnector per thread
# on this object (attribute ``db_client``).
thread_local = local()
def generate_prompt(account_title_list):
    """
    Build the scoring prompt for one batch of article titles.

    :param account_title_list: iterable of title strings; they are written
        into the prompt one per line
    :return: the full prompt text, which asks the model to answer with a
        list of 0-100 scores and nothing else
    """
    titles_block = "\n".join(account_title_list)
    return f"""
** 任务指令 **
你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)

** 评估维度及权重 **
1. 受众精准度(50%)
正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养

2. 标题技法(40%)
悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
情感强度:使用"痛心!""寒心!"等情绪词
数据冲击:具体数字增强可信度(例:"存款180万消失")
口语化表达:使用"涨知识了""别不当回事"等日常用语

3. 内容调性(10%)
煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
历史揭秘:人物秘闻/老照片故事
爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
** 评分规则 **
90-100分:同时满足3个维度且要素齐全,无负向内容
70-89分:满足2个核心维度,无负向内容
50-69分:仅满足受众群体正向匹配,无负向内容
30-49分:存在轻微关联但要素缺失
0-29分:完全无关或包含任意负向品类内容

** 待评估标题 **
{titles_block}

** 输出要求 **
仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
"""
def get_db_client():
    """
    Return the calling thread's database client, creating it on first use.

    The client is cached on the module-level ``thread_local`` object, so each
    thread that calls this function gets its own ``DatabaseConnector``.

    :return: the ``DatabaseConnector`` cached for the current thread
    """
    db_client = getattr(thread_local, "db_client", None)
    if db_client is None:
        db_client = DatabaseConnector(long_articles_config)
        # Connect before caching: if connect() raises, nothing is stored and
        # the next call on this thread retries, instead of handing out a
        # client that never connected.
        db_client.connect()
        thread_local.db_client = db_client
    return db_client
def update_task_status(thread_db_client, task_id, ori_status, new_status):
    """
    Move one row of ``crawler_candidate_account_pool`` to a new status.

    The WHERE clause matches on the current status as well as the id, so the
    row is only changed while it is still in ``ori_status``.

    :param thread_db_client: database client exposing ``save(query, params)``
    :param task_id: ``id`` of the row to update
    :param ori_status: status the row is expected to have now
    :param new_status: status to write
    """
    sql = (
        "\n"
        "update crawler_candidate_account_pool\n"
        "set status = %s\n"
        "where id = %s and status = %s;\n"
    )
    params = (new_status, task_id, ori_status)
    thread_db_client.save(sql, params)
def _parse_score_list(raw_text):
    """
    Pull the list of numeric scores out of the model's reply.

    :param raw_text: reply text from the model
    :return: the parsed list of int/float scores, or ``[]`` when the reply
        contains no usable list
    """
    # The prompt asks for a bare array, but a reply may still wrap it in a
    # code fence or a sentence, so only the outermost [...] span is parsed.
    start = raw_text.find("[")
    end = raw_text.rfind("]")
    if start == -1 or end < start:
        return []
    try:
        parsed = json.loads(raw_text[start:end + 1])
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []
    if not all(isinstance(score, (int, float)) for score in parsed):
        return []
    return parsed


def recognize_each_account(thread_db_client, account):
    """
    Score one candidate account with the LLM and store the result.

    Status transitions written to ``crawler_candidate_account_pool``:
    0 -> 1 when processing starts, then 1 -> 11 when the account has fewer
    than 15 titles, 1 -> 12 when the model reply yields no usable score list,
    or 1 -> 2 together with ``score_list`` and ``avg_score`` on success.

    :param thread_db_client: database client exposing ``save(query, params)``
    :param account: row mapping with keys ``id`` and ``title_list`` (a JSON
        encoded list of titles)
    :return: None
    """
    task_id = account["id"]
    # lock task
    update_task_status(thread_db_client, task_id, 0, 1)

    title_list = json.loads(account["title_list"])
    if len(title_list) < 15:
        # too few titles to judge the account, skip it
        print("bad account, skip")
        update_task_status(thread_db_client, task_id, 1, 11)
        return

    prompt = generate_prompt(title_list)
    response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
    score_list = _parse_score_list(response.strip())
    if not score_list:
        update_task_status(thread_db_client, task_id, 1, 12)
        return

    # Success is decided by the parse alone: an average of 0 (every title
    # scored 0) is a valid result and must still be stored.
    avg_score = sum(score_list) / len(score_list)
    update_query = (
        "\n"
        "update crawler_candidate_account_pool\n"
        "set score_list = %s, avg_score = %s, status = %s\n"
        "where id = %s and status = %s;\n"
    )
    thread_db_client.save(
        update_query, (json.dumps(score_list), avg_score, 2, task_id, 1)
    )
def recognize_task_thread(task):
    """
    Worker entry point: process one account task on the current thread.

    Uses the thread's own database client from ``get_db_client()``. If
    ``recognize_each_account`` raises, the exception is printed and the task
    row is moved from status 1 to status 13.

    :param task: row mapping for one account; must contain the key ``id``
    :return: None
    """
    thread_db_client = get_db_client()
    try:
        recognize_each_account(thread_db_client, task)
    except Exception as e:
        print(e)
        update_task_status(
            thread_db_client=thread_db_client,
            # Use the failing task's own id; the literal ["id"] list that was
            # passed here before never matched a row.
            task_id=task["id"],
            ori_status=1,
            new_status=13,
        )
class AccountRecognizer:
    """Fetch unscored candidate accounts and process them on a thread pool."""

    def __init__(self):
        """Create and connect the database client used to fetch tasks."""
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_task_list(self):
        """
        Fetch the account tasks to process.

        Selects ``id`` and ``title_list`` for rows of
        ``crawler_candidate_account_pool`` whose ``avg_score`` is null, whose
        ``status`` is 0 and whose ``title_list`` is not null.

        :return: whatever ``db_client.fetch`` returns for that query when
            called with ``DictCursor``
        """
        sql = (
            "\n"
            "select id, title_list from crawler_candidate_account_pool\n"
            "where avg_score is null and status = 0 and title_list is not null;\n"
        )
        return self.db_client.fetch(sql, cursor_type=DictCursor)

    def deal(self):
        """
        Run ``recognize_task_thread`` for every fetched task.

        Tasks are submitted to a pool of 8 worker threads; a progress bar
        advances as each one completes, and ``result()`` is called on every
        future so an exception raised in a worker is re-raised here.
        """
        tasks = self.get_task_list()
        with ThreadPoolExecutor(max_workers=8) as pool:
            pending = [pool.submit(recognize_task_thread, item) for item in tasks]
            progress = tqdm(
                concurrent.futures.as_completed(pending),
                total=len(tasks),
                desc="处理进度",
            )
            for finished in progress:
                finished.result()
|