"""
Use an LLM to score candidate accounts by how well their article titles
match the target audience, and store the result in the account pool table.
"""
import json
import re

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.api import fetch_deepseek_response
from applications.db import DatabaseConnector
from config import long_articles_config

# Accounts with fewer titles than this are skipped without calling the model.
MIN_TITLE_COUNT = 15

# Values this module reads from / writes to crawler_candidate_account_pool.status.
STATUS_INIT = 0                # not processed yet (the only status that is fetched)
STATUS_PROCESSING = 1          # claimed by recognize_each_account
STATUS_SUCCESS = 2             # score_list and avg_score were stored
STATUS_TOO_FEW_TITLES = 11     # fewer than MIN_TITLE_COUNT titles
STATUS_SCORE_FAILED = 12       # model reply could not be turned into usable scores
STATUS_UNEXPECTED_ERROR = 13   # any other exception raised while processing

# Matches a reply that consists of exactly one Markdown code fence, with an
# optional language tag (e.g. ```json), capturing the fenced payload.
_CODE_FENCE_RE = re.compile(r"```[A-Za-z]*\s*(.*?)\s*```", re.DOTALL)


def generate_prompt(account_title_list):
    """
    Build the scoring prompt for one batch of titles.

    :param account_title_list: iterable of title strings; they are joined
        with newlines and embedded in the prompt.
    :return: the complete prompt string.
    """
    title_list = '\n'.join(account_title_list)
    # Adjacent literals are concatenated into one string; the prompt text
    # (including its single-space separators) is kept verbatim.
    g_prompt = (
        ' ** 任务指令 **'
        ' 你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)'
        ' ** 评估维度及权重 **'
        ' 1. 受众精准度(50%)'
        ' 中老年群体:涉及存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事'
        ' 2. 标题技法(40%)'
        ' 悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")'
        ' 情感强度:使用"痛心!""寒心!"等情绪词'
        ' 数据冲击:具体数字增强可信度(例:"存款180万消失")'
        ' 口语化表达:使用"涨知识了""别不当回事"等日常用语'
        ' 3. 内容调性(10%)'
        ' 煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")'
        ' 警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")'
        ' 历史揭秘:人物秘闻/老照片故事'
        ' 爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")'
        ' ** 评分规则 **'
        ' 90-100分:同时满足3个维度且要素齐全'
        ' 70-89分:满足2个核心维度'
        ' 50-69分:仅满足受众群体'
        ' 30-49分:存在关联但要素缺失'
        ' 0-29分:完全无关'
        ' ** 待评估标题 **'
        f' {title_list}'
        ' ** 输出要求 **'
        ' 仅输出这一批标题的评分,用数组 List 返回'
        ' [score1, score2, score3,...]'
        ' 不要包含任何解释或说明。 '
    )
    return g_prompt


def _parse_scores(response_text):
    """
    Turn the model reply into ``(score_list, avg_score)``.

    The reply is expected to be a JSON array of numbers. A reply wrapped in a
    single Markdown code fence is unwrapped first. On any parsing or averaging
    failure the reason is printed and ``([], 0)`` is returned.

    :param response_text: raw text returned by the model.
    :return: tuple of the parsed list and its arithmetic mean.
    """
    # Deliberately outside the try block: a non-string reply raises here and
    # is handled by the caller's generic error path.
    text = response_text.strip()

    fenced = _CODE_FENCE_RE.fullmatch(text)
    if fenced:
        text = fenced.group(1)

    try:
        scores = json.loads(text)
        if not isinstance(scores, list):
            raise TypeError(f"expected a JSON array, got {type(scores).__name__}")
        # sum() raises TypeError for non-numeric items; an empty list raises
        # ZeroDivisionError (an ArithmeticError).
        average = sum(scores) / len(scores)
    except (ValueError, TypeError, ArithmeticError, RecursionError) as exc:
        print(f"failed to parse score list: {exc!r}; response: {text[:200]!r}")
        return [], 0
    return scores, average


class AccountRecognizer:
    """Scores candidate accounts from crawler_candidate_account_pool."""

    def __init__(self):
        """Create the database client from long_articles_config and connect."""
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_task_list(self):
        """
        Fetch the accounts that still need a score.

        Selects ``id`` and ``title_list`` for rows with no ``avg_score``,
        ``status = 0`` and a non-null ``title_list``.

        :return: whatever ``db_client.fetch`` returns for a DictCursor query;
            each row is later read by the keys ``id`` and ``title_list``.
        """
        fetch_query = (
            " select id, title_list"
            " from crawler_candidate_account_pool"
            " where avg_score is null and status = 0 and title_list is not null; "
        )
        fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
        return fetch_response

    def update_task_status(self, task_id, ori_status, new_status):
        """
        Move a task from ``ori_status`` to ``new_status``.

        The update only applies when the row currently has ``ori_status``.

        :param task_id: id of the row in crawler_candidate_account_pool.
        :param ori_status: status the row is expected to have now.
        :param new_status: status to write.
        """
        update_query = (
            " update crawler_candidate_account_pool"
            " set status = %s"
            " where id = %s and status = %s; "
        )
        self.db_client.save(update_query, (new_status, task_id, ori_status))

    def recognize_each_account(self, account):
        """
        Score a single account and persist the outcome.

        :param account: row with ``id`` and ``title_list`` (a JSON-encoded
            list of titles).
        """
        task_id = account['id']

        # Claim the task.
        # NOTE(review): the outcome of this update is not checked, so a task
        # whose status is no longer 0 is still processed here — confirm
        # whether DatabaseConnector.save exposes the affected-row count.
        self.update_task_status(task_id, STATUS_INIT, STATUS_PROCESSING)

        title_list = json.loads(account["title_list"])
        if len(title_list) < MIN_TITLE_COUNT:
            # Too few titles to score; skip without calling the model.
            print("bad account, skip")
            self.update_task_status(task_id, STATUS_PROCESSING, STATUS_TOO_FEW_TITLES)
            return

        prompt = generate_prompt(title_list)
        response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
        score_list, avg_score = _parse_scores(response)

        # A zero average is handled the same way as a parse failure.
        if score_list and avg_score:
            update_query = (
                " update crawler_candidate_account_pool"
                " set score_list = %s, avg_score = %s, status = %s"
                " where id = %s and status = %s; "
            )
            self.db_client.save(
                update_query,
                (
                    json.dumps(score_list),
                    avg_score,
                    STATUS_SUCCESS,
                    task_id,
                    STATUS_PROCESSING,
                ),
            )
        else:
            self.update_task_status(task_id, STATUS_PROCESSING, STATUS_SCORE_FAILED)

    def deal(self):
        """
        Process every pending task, one by one.

        An exception raised while handling a task is printed and the task is
        moved from the processing status to the unexpected-error status; the
        loop then continues with the next task.
        """
        task_list = self.get_task_list()
        for task in tqdm(task_list):
            try:
                self.recognize_each_account(task)
            except Exception as e:
                # Top-level boundary: keep the batch going after one bad task.
                print(e)
                self.update_task_status(
                    task['id'], STATUS_PROCESSING, STATUS_UNEXPECTED_ERROR
                )