account_recognize_by_llm.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. """
  2. use llm function to recognize the account information
  3. """
  4. import json
  5. from pymysql.cursors import DictCursor
  6. from tqdm import tqdm
  7. from threading import local
  8. import concurrent
  9. from concurrent.futures import ThreadPoolExecutor
  10. from applications.api import fetch_deepseek_response
  11. from applications.db import DatabaseConnector
  12. from config import long_articles_config
  13. thread_local = local()
  14. def generate_prompt(account_title_list):
  15. """
  16. 生成prompt
  17. :param account_title_list:
  18. """
  19. title_list = "\n".join(account_title_list)
  20. g_prompt = f"""
  21. ** 任务指令 **
  22. 你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
  23. ** 评估维度及权重 **
  24. 1. 受众精准度(50%)
  25. 正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
  26. 负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养
  27. 2. 标题技法(40%)
  28. 悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
  29. 情感强度:使用"痛心!""寒心!"等情绪词
  30. 数据冲击:具体数字增强可信度(例:"存款180万消失")
  31. 口语化表达:使用"涨知识了""别不当回事"等日常用语
  32. 3. 内容调性(10%)
  33. 煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
  34. 警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
  35. 历史揭秘:人物秘闻/老照片故事
  36. 爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
  37. ** 评分规则 **
  38. 90-100分:同时满足3个维度且要素齐全,无负向内容
  39. 70-89分:满足2个核心维度,无负向内容
  40. 50-69分:仅满足受众群体正向匹配,无负向内容
  41. 30-49分:存在轻微关联但要素缺失
  42. 0-29分:完全无关或包含任意负向品类内容
  43. ** 待评估标题 **
  44. {title_list}
  45. ** 输出要求 **
  46. 仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
  47. """
  48. return g_prompt
  49. def get_db_client():
  50. """
  51. each thread get it's own db client
  52. """
  53. if not hasattr(thread_local, "db_client"):
  54. thread_local.db_client = DatabaseConnector(long_articles_config)
  55. thread_local.db_client.connect()
  56. return thread_local.db_client
  57. def update_task_status(thread_db_client, task_id, ori_status, new_status):
  58. """
  59. update task status
  60. """
  61. update_query = f"""
  62. update crawler_candidate_account_pool
  63. set status = %s
  64. where id = %s and status = %s;
  65. """
  66. thread_db_client.save(update_query, (new_status, task_id, ori_status))
  67. def recognize_each_account(thread_db_client, account):
  68. """
  69. recognize each account
  70. """
  71. task_id = account["id"]
  72. # lock task
  73. update_task_status(thread_db_client, task_id, 0, 1)
  74. # process
  75. title_list = json.loads(account["title_list"])
  76. if len(title_list) < 15:
  77. # 账号数量不足,直接跳过
  78. print("bad account, skip")
  79. update_task_status(thread_db_client, task_id, 1, 11)
  80. return
  81. prompt = generate_prompt(title_list)
  82. response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
  83. response_score_str = response.strip()
  84. try:
  85. score_list = json.loads(response_score_str)
  86. avg_score = sum(score_list) / len(score_list)
  87. except Exception as e:
  88. score_list = []
  89. avg_score = 0
  90. if score_list and avg_score:
  91. update_query = f"""
  92. update crawler_candidate_account_pool
  93. set score_list = %s, avg_score = %s, status = %s
  94. where id = %s and status = %s;
  95. """
  96. thread_db_client.save(
  97. update_query, (json.dumps(score_list), avg_score, 2, task_id, 1)
  98. )
  99. else:
  100. update_task_status(thread_db_client, task_id, 1, 12)
  101. def recognize_task_thread(task):
  102. """
  103. recognize thread
  104. """
  105. thread_db_client = get_db_client()
  106. try:
  107. recognize_each_account(thread_db_client, task)
  108. except Exception as e:
  109. print(e)
  110. update_task_status(
  111. thread_db_client=thread_db_client,
  112. task_id=["id"],
  113. ori_status=1,
  114. new_status=13,
  115. )
  116. class AccountRecognizer:
  117. def __init__(self):
  118. self.db_client = DatabaseConnector(long_articles_config)
  119. self.db_client.connect()
  120. def get_task_list(self):
  121. """
  122. get account task from database
  123. """
  124. fetch_query = f"""
  125. select id, title_list from crawler_candidate_account_pool
  126. where avg_score is null and status = 0 and title_list is not null;
  127. """
  128. fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
  129. return fetch_response
  130. def deal(self):
  131. task_list = self.get_task_list()
  132. with ThreadPoolExecutor(max_workers=8) as executor:
  133. futures = [
  134. executor.submit(recognize_task_thread, task) for task in task_list
  135. ]
  136. for future in tqdm(
  137. concurrent.futures.as_completed(futures),
  138. total=len(task_list),
  139. desc="处理进度",
  140. ):
  141. future.result()