123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- import json
- import traceback
- from typing import List, Dict, Optional
- from tqdm.asyncio import tqdm
- from applications.api import fetch_deepseek_completion
- from applications.api import feishu_robot
- from applications.utils import ci_lower
class CandidateAccountProcessConst:
    """Shared constants and the prompt builder for candidate-account scoring.

    The ``*_STATUS`` codes (other than ACCOUNT_GOOD_STATUS) are the values
    stored in ``crawler_candidate_account_pool.status``; the thresholds decide
    which accounts are sent to the LLM and which ones get promoted.
    """

    # --- candidate pool row status codes ---
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    LACK_ARTICLE_STATUS = 11
    TITLE_TOO_LONG_STATUS = 14
    FAILED_STATUS = 99

    # --- scoring thresholds ---
    # an account is promoted when the lower confidence bound of its scores exceeds this
    AVG_SCORE_THRESHOLD = 65
    # minimum number of titles (only enforced for the "toutiao" platform)
    ARTICLE_COUNT_THRESHOLD = 13
    # maximum allowed mean title length, measured with len()
    AVG_TITLE_LENGTH_THRESHOLD = 45

    # status value written to article_meta_accounts for promoted accounts
    ACCOUNT_GOOD_STATUS = 1

    @staticmethod
    def generate_title_match_score_prompt(title_list):
        """Build the LLM prompt that rates each title's topic fit from 0 to 100.

        Args:
            title_list: titles to rate; they are embedded one per line.

        Returns:
            The prompt text, which asks the model to answer with a bare JSON
            array of scores for the batch and nothing else.
        """
        joined_titles = "\n".join(title_list)
        return f"""
** 任务指令 **
你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
** 评估维度及权重 **
1. 受众精准度(50%)
正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养/音乐/棋牌
2. 标题技法(40%)
悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
情感强度:使用"痛心!""寒心!"等情绪词
数据冲击:具体数字增强可信度(例:"存款180万消失")
口语化表达:使用"涨知识了""别不当回事"等日常用语
3. 内容调性(10%)
煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
历史揭秘:人物秘闻/老照片故事
爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
** 评分规则 **
90-100分:同时满足3个维度且要素齐全,无负向内容
70-89分:满足2个核心维度,无负向内容
50-69分:仅满足受众群体正向匹配,无负向内容
30-49分:存在轻微关联但要素缺失
0-29分:完全无关或包含任意负向品类内容
** 待评估标题 **
{joined_titles}
** 输出要求 **
输出结果为JSON,仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
"""
class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
    """Analyze the quality of accounts in the candidate account pool.

    Each pending account's titles are scored by an LLM; accounts whose score
    confidence-interval lower bound exceeds ``AVG_SCORE_THRESHOLD`` are added
    to the crawler account table (``article_meta_accounts``).
    """

    def __init__(self, pool, log_client, trace_id):
        # pool: async DB helper used via async_fetch / async_save
        # log_client: async logger used via log(contents=...)
        # trace_id: tags log entries and the rows inserted into article_meta_accounts
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def get_task_list(self) -> List[Dict]:
        """Fetch unscored accounts in INIT_STATUS that have a non-null title list."""
        query = """
            select id, title_list, platform, account_id, account_name
            from crawler_candidate_account_pool
            where avg_score is null and status = %s and title_list is not null;
        """
        response = await self.pool.async_fetch(query=query, params=(self.INIT_STATUS,))
        await self.log_client.log(
            contents={
                "trace_id": self.trace_id,
                "message": f"获取账号数量: {len(response)}",
            }
        )
        return response

    async def update_account_status(
        self, account_id: int, ori_status: int, new_status: int
    ) -> int:
        """Move a pool row to ``new_status`` only if it is currently ``ori_status``.

        The status condition in the WHERE clause makes this a compare-and-set,
        which ``score_for_each_account_by_llm`` uses as a per-row lock.

        Returns:
            Whatever ``pool.async_save`` returns — presumably the affected row
            count (callers treat a falsy value as "row was not in ori_status").
        """
        query = """
            update crawler_candidate_account_pool
            set status = %s
            where id = %s and status = %s;
        """
        return await self.pool.async_save(query, (new_status, account_id, ori_status))

    async def insert_account_into_crawler_queue(
        self, score_list: List[int], account: dict
    ) -> None:
        """Insert the account into article_meta_accounts if its scores are good enough.

        The lower bound of the score confidence interval (``ci_lower``) must be
        strictly greater than ``AVG_SCORE_THRESHOLD``.
        """
        if ci_lower(score_list) > self.AVG_SCORE_THRESHOLD:
            query = """
                insert into article_meta_accounts (platform, account_id, account_name, account_source, status, trace_id)
                values (%s, %s, %s, %s, %s, %s);
            """
            await self.pool.async_save(
                query=query,
                params=(
                    account["platform"],
                    account["account_id"],
                    account["account_name"],
                    "ai_recognize",
                    self.ACCOUNT_GOOD_STATUS,
                    self.trace_id,
                ),
            )

    @staticmethod
    def _parse_title_list(raw_title_list) -> List[str]:
        """Decode the JSON ``title_list`` column; raise ValueError unless it is a JSON array."""
        title_list = json.loads(raw_title_list)
        if not isinstance(title_list, list):
            raise ValueError(f"title_list is not a JSON array: {raw_title_list!r}")
        return title_list

    def _check_before_scoring(self, title_list: List[str], account: Dict) -> Optional[int]:
        """Return a terminal status if the account can be rejected without the LLM, else None."""
        # nothing to score; also guards the mean computation below against division by zero
        if not title_list:
            return self.LACK_ARTICLE_STATUS
        if (
            len(title_list) < self.ARTICLE_COUNT_THRESHOLD
            and account["platform"] == "toutiao"
        ):
            return self.LACK_ARTICLE_STATUS
        # mean title length too long
        avg_title_length = sum(len(title) for title in title_list) / len(title_list)
        if avg_title_length > self.AVG_TITLE_LENGTH_THRESHOLD:
            return self.TITLE_TOO_LONG_STATUS
        return None

    def _fetch_title_scores(self, title_list: List[str]) -> List:
        """Ask the LLM to score the titles; raise ValueError on an empty/non-list reply."""
        prompt = self.generate_title_match_score_prompt(title_list)
        # NOTE(review): fetch_deepseek_completion is called without await, so it runs
        # synchronously and blocks the event loop for the duration of the call.
        completion = fetch_deepseek_completion(
            model="DeepSeek-V3", prompt=prompt, output_type="json"
        )
        if not isinstance(completion, (list, tuple)) or not completion:
            raise ValueError(f"unexpected LLM output: {completion!r}")
        return list(completion)

    async def score_for_each_account_by_llm(self, account: Dict) -> None:
        """Lock one pool row, score its titles with the LLM and store the result.

        Every failure after the lock is taken moves the row to FAILED_STATUS, so a
        row is never left behind in PROCESSING_STATUS.
        """
        account_id = account["id"]
        # lock: claim the row (INIT -> PROCESSING); skip it if someone else already did
        if not await self.update_account_status(
            account_id, self.INIT_STATUS, self.PROCESSING_STATUS
        ):
            return

        title_list = None  # stays None if the column cannot be decoded
        try:
            title_list = self._parse_title_list(account["title_list"])

            reject_status = self._check_before_scoring(title_list, account)
            if reject_status is not None:
                await self.update_account_status(
                    account_id, self.PROCESSING_STATUS, reject_status
                )
                return

            score_list = self._fetch_title_scores(title_list)
            avg_score = sum(score_list) / len(score_list)
            query = """
                update crawler_candidate_account_pool
                set score_list = %s, avg_score = %s, status = %s
                where id = %s and status = %s;
            """
            await self.pool.async_save(
                query=query,
                params=(
                    json.dumps(score_list),
                    avg_score,
                    self.SUCCESS_STATUS,
                    account_id,
                    self.PROCESSING_STATUS,
                ),
            )
            # check the confidence-interval lower bound and insert the account if good
            await self.insert_account_into_crawler_queue(
                score_list=score_list, account=account
            )
        except Exception as e:
            await self.log_client.log(
                contents={
                    "task": "candidate_account_analysis",
                    "trace_id": self.trace_id,
                    "function": "score_for_each_account_by_llm",
                    "message": "大模型识别账号失败",
                    "status": "fail",
                    "data": {
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "title_list": (
                            json.dumps(title_list)
                            if title_list is not None
                            else account.get("title_list")
                        ),
                    },
                }
            )
            # conditional on PROCESSING, so a row already marked SUCCESS is left untouched
            await self.update_account_status(
                account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
            )

    async def get_task_execute_detail(self) -> List[Dict]:
        """Count the accounts inserted into article_meta_accounts under this trace_id."""
        query = """
            select count(1) as new_mining_account from article_meta_accounts
            where trace_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(self.trace_id,))

    async def deal(self):
        """Score every pending account, then send a summary via the Feishu robot."""
        task_list = await self.get_task_list()
        for task in tqdm(task_list, desc="use llm to analysis each account"):
            try:
                await self.score_for_each_account_by_llm(task)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "candidate_account_analysis",
                        "trace_id": self.trace_id,
                        "function": "deal",
                        "status": "fail",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                            "task": task,
                        },
                    }
                )

        # analysis
        execute_response = await self.get_task_execute_detail()
        # count(1) without GROUP BY yields one row; guard anyway so an empty/failed
        # fetch still lets the summary go out (None = count unknown)
        new_mining_account = (
            execute_response[0]["new_mining_account"] if execute_response else None
        )
        detail = {
            # NOTE(review): "acounts" is misspelled; kept as-is because it is the report key
            "total_execute_acounts": len(task_list),
            "new_mining_account": new_mining_account,
        }
        await feishu_robot.bot(
            title="执行账号质量分析任务",
            detail=detail,
            mention=False,
        )
|