candidate_account_process.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import json
  2. import traceback
  3. from typing import List, Dict
  4. from tqdm import tqdm
  5. from applications.api import fetch_deepseek_completion
  6. class CandidateAccountProcessConst:
  7. INIT_STATUS = 0
  8. PROCESSING_STATUS = 1
  9. SUCCESS_STATUS = 2
  10. FAILED_STATUS = 99
  11. LACK_ARTICLE_STATUS = 11
  12. TITLE_TOO_LONG_STATUS = 14
  13. AVG_SCORE_THRESHOLD = 65
  14. ARTICLE_COUNT_THRESHOLD = 13
  15. AVG_TITLE_LENGTH_THRESHOLD = 45
  16. @staticmethod
  17. def generate_title_match_score_prompt(title_list):
  18. title_list_string = "\n".join(title_list)
  19. prompt = f"""
  20. ** 任务指令 **
  21. 你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
  22. ** 评估维度及权重 **
  23. 1. 受众精准度(50%)
  24. 正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
  25. 负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养/音乐/棋牌
  26. 2. 标题技法(40%)
  27. 悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
  28. 情感强度:使用"痛心!""寒心!"等情绪词
  29. 数据冲击:具体数字增强可信度(例:"存款180万消失")
  30. 口语化表达:使用"涨知识了""别不当回事"等日常用语
  31. 3. 内容调性(10%)
  32. 煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
  33. 警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
  34. 历史揭秘:人物秘闻/老照片故事
  35. 爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
  36. ** 评分规则 **
  37. 90-100分:同时满足3个维度且要素齐全,无负向内容
  38. 70-89分:满足2个核心维度,无负向内容
  39. 50-69分:仅满足受众群体正向匹配,无负向内容
  40. 30-49分:存在轻微关联但要素缺失
  41. 0-29分:完全无关或包含任意负向品类内容
  42. ** 待评估标题 **
  43. {title_list_string}
  44. ** 输出要求 **
  45. 输出结果为JSON,仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
  46. """
  47. return prompt
  48. class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
  49. def __init__(self, pool, log_client, trace_id):
  50. self.pool = pool
  51. self.log_client = log_client
  52. self.trace_id = trace_id
  53. async def get_task_list(self) -> List[Dict]:
  54. """
  55. get account tasks from the database
  56. """
  57. fetch_query = f"""
  58. select id, title_list, platform
  59. from crawler_candidate_account_pool
  60. where avg_score is null and status = {self.INIT_STATUS} and title_list is not null;
  61. """
  62. fetch_response = await self.pool.async_fetch(
  63. fetch_query,
  64. )
  65. return fetch_response
  66. async def update_account_status(
  67. self, account_id: int, ori_status: int, new_status: int
  68. ) -> int:
  69. """update account status"""
  70. update_query = f"""
  71. update crawler_candidate_account_pool
  72. set status = %s
  73. where id = %s and status = %s;
  74. """
  75. return await self.pool.async_save(
  76. update_query, (new_status, account_id, ori_status)
  77. )
  78. async def score_for_each_account_by_llm(self, account):
  79. account_id = account["id"]
  80. # lock
  81. if not await self.update_account_status(
  82. account_id, self.INIT_STATUS, self.PROCESSING_STATUS
  83. ):
  84. return
  85. # start processing
  86. title_list = json.loads(account["title_list"])
  87. if (
  88. len(title_list) < self.ARTICLE_COUNT_THRESHOLD
  89. and account["platform"] == "toutiao"
  90. ):
  91. await self.update_account_status(
  92. account_id, self.PROCESSING_STATUS, self.LACK_ARTICLE_STATUS
  93. )
  94. return
  95. # 平均标题过长
  96. avg_title_length = sum([len(title) for title in title_list]) / len(title_list)
  97. if avg_title_length > self.AVG_TITLE_LENGTH_THRESHOLD:
  98. await self.update_account_status(
  99. account_id, self.PROCESSING_STATUS, self.TITLE_TOO_LONG_STATUS
  100. )
  101. return
  102. prompt = self.generate_title_match_score_prompt(title_list)
  103. try:
  104. completion = fetch_deepseek_completion(
  105. model="DeepSeek-V3", prompt=prompt, output_type="json"
  106. )
  107. avg_score = sum(completion) / len(completion)
  108. query = f"""
  109. update crawler_candidate_account_pool
  110. set score_list = %s, avg_score = %s, status = %s
  111. where id = %s and status = %s;
  112. """
  113. await self.pool.async_save(
  114. query=query,
  115. params=(
  116. json.dumps(completion),
  117. avg_score,
  118. self.PROCESSING_STATUS,
  119. account_id,
  120. self.SUCCESS_STATUS,
  121. ),
  122. )
  123. except Exception as e:
  124. await self.log_client.log(
  125. contents={
  126. "task": "candidate_account_analysis",
  127. "trace_id": self.trace_id,
  128. "function": "score_for_each_account_by_llm",
  129. "message": "大模型识别账号失败",
  130. "status": "fail",
  131. "data": {
  132. "error": str(e),
  133. "title_list": json.dumps(title_list),
  134. },
  135. }
  136. )
  137. await self.update_account_status(
  138. account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  139. )
  140. async def deal(self):
  141. task_list = await self.get_task_list()
  142. for task in tqdm(task_list, desc="use llm to analysis each account"):
  143. try:
  144. await self.score_for_each_account_by_llm(task)
  145. except Exception as e:
  146. await self.log_client.log(
  147. contents={
  148. "task": "candidate_account_analysis",
  149. "trace_id": self.trace_id,
  150. "function": "deal",
  151. "status": "fail",
  152. "data": {
  153. "error": str(e),
  154. "traceback": traceback.format_exc(),
  155. "task": task,
  156. },
  157. }
  158. )