import json
import traceback
from typing import List, Dict

from tqdm import tqdm

from applications.api import fetch_deepseek_completion
from applications.utils import ci_lower


class CandidateAccountProcessConst:
    """Constants and prompt builder shared by the candidate-account scoring task."""

    # Values read from / written to the `status` column of
    # crawler_candidate_account_pool by the recognizer below.
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99
    LACK_ARTICLE_STATUS = 11
    TITLE_TOO_LONG_STATUS = 14

    # An account is queued for crawling when ci_lower(scores) exceeds this.
    AVG_SCORE_THRESHOLD = 65
    # Minimum number of titles required for "toutiao" accounts.
    ARTICLE_COUNT_THRESHOLD = 13
    # Accounts whose mean title length (in characters) exceeds this are rejected.
    AVG_TITLE_LENGTH_THRESHOLD = 45

    # `status` value used when inserting into article_meta_accounts.
    ACCOUNT_GOOD_STATUS = 1

    @staticmethod
    def generate_title_match_score_prompt(title_list):
        """Build the LLM prompt that asks for a 0-100 match score per title.

        Args:
            title_list: iterable of title strings; they are joined with
                newlines and embedded in the prompt.

        Returns:
            The full prompt string. The prompt asks the model to answer with
            a JSON array of scores only.
        """
        title_list_string = "\n".join(title_list)
        # The prompt text (including its spacing) is kept exactly as it was;
        # it is only split across adjacent literals for readability.
        prompt = (
            ' ** 任务指令 ** '
            '你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分) '
            '** 评估维度及权重 ** '
            '1. 受众精准度(50%) '
            '正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事 '
            '负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养/音乐/棋牌 '
            '2. 标题技法(40%) '
            '悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...") '
            '情感强度:使用"痛心!""寒心!"等情绪词 '
            '数据冲击:具体数字增强可信度(例:"存款180万消失") '
            '口语化表达:使用"涨知识了""别不当回事"等日常用语 '
            '3. 内容调性(10%) '
            '煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条") '
            '警示价值:健康建议/法律案例(例:"三种食物禁止二次加热") '
            '历史揭秘:人物秘闻/老照片故事 '
            '爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓") '
            '** 评分规则 ** '
            '90-100分:同时满足3个维度且要素齐全,无负向内容 '
            '70-89分:满足2个核心维度,无负向内容 '
            '50-69分:仅满足受众群体正向匹配,无负向内容 '
            '30-49分:存在轻微关联但要素缺失 '
            '0-29分:完全无关或包含任意负向品类内容 '
            '** 待评估标题 ** '
            f'{title_list_string} '
            '** 输出要求 ** '
            '输出结果为JSON,仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] \n'
            '不要包含任何解释或说明。 '
        )
        return prompt


class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
    """Score candidate accounts' titles with an LLM and queue the good ones."""

    def __init__(self, pool, log_client, trace_id):
        """Store the collaborators used by the task.

        Args:
            pool: database client exposing awaitable `async_fetch` / `async_save`.
            log_client: logger exposing an awaitable `log(contents=...)`.
            trace_id: identifier attached to every log record of this run.
        """
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def get_task_list(self) -> List[Dict]:
        """Fetch accounts that are still in INIT status and have no score yet."""
        fetch_query = (
            " select id, title_list, platform, account_id, account_name"
            " from crawler_candidate_account_pool"
            f" where avg_score is null and status = {self.INIT_STATUS}"
            " and title_list is not null; "
        )
        fetch_response = await self.pool.async_fetch(
            fetch_query,
        )
        return fetch_response

    async def update_account_status(
        self, account_id: int, ori_status: int, new_status: int
    ) -> int:
        """Move a pool row from `ori_status` to `new_status`.

        The update only applies when the row is currently in `ori_status`,
        so it doubles as a compare-and-set.

        Args:
            account_id: value of the row's `id` column.
            ori_status: status the row is expected to have right now.
            new_status: status to write.

        Returns:
            Whatever `pool.async_save` returns (presumably the affected row
            count - TODO confirm against the pool implementation).
        """
        update_query = (
            " update crawler_candidate_account_pool"
            " set status = %s where id = %s and status = %s; "
        )
        return await self.pool.async_save(
            update_query, (new_status, account_id, ori_status)
        )

    async def insert_account_into_crawler_queue(
        self, score_list: List[int], account: dict
    ) -> None:
        """Insert the account into article_meta_accounts if it scores well.

        `ci_lower(score_list)` is compared with AVG_SCORE_THRESHOLD; per the
        original note it is the lower bound of the scores' confidence
        interval. Accounts at or below the threshold are ignored.

        Args:
            score_list: per-title scores returned by the LLM.
            account: pool row; must provide "platform", "account_id" and
                "account_name".
        """
        if ci_lower(score_list) > self.AVG_SCORE_THRESHOLD:
            query = (
                " insert into article_meta_accounts"
                " (platform, account_id, account_name, account_source, status)"
                " values (%s, %s, %s, %s, %s); "
            )
            await self.pool.async_save(
                query=query,
                params=(
                    account["platform"],
                    account["account_id"],
                    account["account_name"],
                    'ai_recognize',
                    self.ACCOUNT_GOOD_STATUS,
                ),
            )

    async def score_for_each_account_by_llm(self, account):
        """Score one account's titles and persist the outcome.

        Flow: claim the row (INIT -> PROCESSING), validate the title list,
        ask the LLM for scores, store them with SUCCESS status, then queue
        the account if it qualifies. Any failure in the LLM/persist step is
        logged and the row is marked FAILED.

        Args:
            account: pool row with "id", "title_list" (JSON text),
                "platform", "account_id" and "account_name".

        Raises:
            TypeError, ValueError: if `title_list` cannot be JSON-decoded
                (the row is marked FAILED first).
        """
        account_id = account["id"]

        # Claim the row; a falsy result means the row was not in INIT status,
        # so there is nothing for us to do.
        if not await self.update_account_status(
            account_id, self.INIT_STATUS, self.PROCESSING_STATUS
        ):
            return

        try:
            title_list = json.loads(account["title_list"])
        except (TypeError, ValueError):
            # Do not leave the row stuck in PROCESSING on malformed input.
            await self.update_account_status(
                account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
            )
            raise

        # Not enough articles. An empty list is rejected for every platform,
        # otherwise the average below would divide by zero.
        too_few_for_toutiao = (
            len(title_list) < self.ARTICLE_COUNT_THRESHOLD
            and account["platform"] == "toutiao"
        )
        if not title_list or too_few_for_toutiao:
            await self.update_account_status(
                account_id, self.PROCESSING_STATUS, self.LACK_ARTICLE_STATUS
            )
            return

        # Average title too long.
        avg_title_length = sum(len(title) for title in title_list) / len(title_list)
        if avg_title_length > self.AVG_TITLE_LENGTH_THRESHOLD:
            await self.update_account_status(
                account_id, self.PROCESSING_STATUS, self.TITLE_TOO_LONG_STATUS
            )
            return

        prompt = self.generate_title_match_score_prompt(title_list)
        try:
            completion = fetch_deepseek_completion(
                model="DeepSeek-V3", prompt=prompt, output_type="json"
            )
            avg_score = sum(completion) / len(completion)
            query = (
                " update crawler_candidate_account_pool"
                " set score_list = %s, avg_score = %s, status = %s"
                " where id = %s and status = %s; "
            )
            # New status is SUCCESS, guarded by the current PROCESSING status.
            # (These two were previously swapped, so the update never matched.)
            await self.pool.async_save(
                query=query,
                params=(
                    json.dumps(completion),
                    avg_score,
                    self.SUCCESS_STATUS,
                    account_id,
                    self.PROCESSING_STATUS,
                ),
            )
            # Check the confidence-interval lower bound and queue the account.
            await self.insert_account_into_crawler_queue(
                score_list=completion, account=account
            )

        except Exception as e:
            await self.log_client.log(
                contents={
                    "task": "candidate_account_analysis",
                    "trace_id": self.trace_id,
                    "function": "score_for_each_account_by_llm",
                    "message": "大模型识别账号失败",
                    "status": "fail",
                    "data": {
                        "error": str(e),
                        "title_list": json.dumps(title_list),
                    },
                }
            )
            await self.update_account_status(
                account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
            )

    async def deal(self):
        """Process every pending account sequentially, logging per-task failures."""
        task_list = await self.get_task_list()

        for task in tqdm(task_list, desc="use llm to analysis each account"):
            try:
                await self.score_for_each_account_by_llm(task)
            except Exception as e:
                # One bad account must not abort the whole batch.
                await self.log_client.log(
                    contents={
                        "task": "candidate_account_analysis",
                        "trace_id": self.trace_id,
                        "function": "deal",
                        "status": "fail",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                            "task": task,
                        },
                    }
                )