account_recognize_by_llm.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. """
  2. Use an LLM to recognize account information by scoring each account's article titles.
  3. """
  4. import json
  5. from pymysql.cursors import DictCursor
  6. from tqdm import tqdm
  7. from applications.api import fetch_deepseek_response
  8. from applications.db import DatabaseConnector
  9. from config import long_articles_config
  10. def generate_prompt(account_title_list):
  11. """
  12. 生成prompt
  13. :param account_title_list:
  14. """
  15. title_list = '\n'.join(account_title_list)
  16. g_prompt = f"""
  17. ** 任务指令 **
  18. 你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
  19. ** 评估维度及权重 **
  20. 1. 受众精准度(50%)
  21. 中老年群体:涉及存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
  22. 2. 标题技法(40%)
  23. 悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
  24. 情感强度:使用"痛心!""寒心!"等情绪词
  25. 数据冲击:具体数字增强可信度(例:"存款180万消失")
  26. 口语化表达:使用"涨知识了""别不当回事"等日常用语
  27. 3. 内容调性(10%)
  28. 煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
  29. 警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
  30. 历史揭秘:人物秘闻/老照片故事
  31. 爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
  32. ** 评分规则 **
  33. 90-100分:同时满足3个维度且要素齐全
  34. 70-89分:满足2个核心维度
  35. 50-69分:仅满足受众群体
  36. 30-49分:存在关联但要素缺失
  37. 0-29分:完全无关
  38. ** 待评估标题 **
  39. {title_list}
  40. ** 输出要求 **
  41. 仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
  42. """
  43. return g_prompt
  44. class AccountRecognizer:
  45. def __init__(self):
  46. self.db_client = DatabaseConnector(long_articles_config)
  47. self.db_client.connect()
  48. def get_task_list(self):
  49. """
  50. get account task from database
  51. """
  52. fetch_query = f"""
  53. select id, title_list from crawler_candidate_account_pool
  54. where avg_score is null and status = 0 and title_list is not null;
  55. """
  56. fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
  57. return fetch_response
  58. def update_task_status(self, task_id, ori_status, new_status):
  59. """
  60. update task status
  61. """
  62. update_query = f"""
  63. update crawler_candidate_account_pool
  64. set status = %s
  65. where id = %s and status = %s;
  66. """
  67. self.db_client.save(update_query, (new_status, task_id, ori_status))
  68. def recognize_each_account(self, account):
  69. """
  70. recognize each account
  71. """
  72. task_id = account['id']
  73. # lock task
  74. self.update_task_status(task_id, 0, 1)
  75. # process
  76. title_list = json.loads(account["title_list"])
  77. if len(title_list) < 15:
  78. # 账号数量不足,直接跳过
  79. print("bad account, skip")
  80. self.update_task_status(task_id, 1, 11)
  81. return
  82. prompt = generate_prompt(title_list)
  83. response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
  84. response_score_str = response.strip()
  85. try:
  86. score_list = json.loads(response_score_str)
  87. avg_score = sum(score_list) / len(score_list)
  88. except Exception as e:
  89. score_list = []
  90. avg_score = 0
  91. if score_list and avg_score:
  92. update_query = f"""
  93. update crawler_candidate_account_pool
  94. set score_list = %s, avg_score = %s, status = %s
  95. where id = %s and status = %s;
  96. """
  97. self.db_client.save(update_query, (json.dumps(score_list), avg_score, 2, task_id, 1))
  98. else:
  99. self.update_task_status(task_id, 1, 12)
  100. def deal(self):
  101. task_list = self.get_task_list()
  102. for task in tqdm(task_list):
  103. try:
  104. self.recognize_each_account(task)
  105. except Exception as e:
  106. print(e)
  107. self.update_task_status(task['id'], 1, 13)