# account_recognize_by_llm.py
  1. """
  2. use llm function to recognize the account information
  3. """
  4. import json
  5. from pymysql.cursors import DictCursor
  6. from tqdm import tqdm
  7. from threading import local
  8. import concurrent
  9. from concurrent.futures import ThreadPoolExecutor
  10. from applications.api import fetch_deepseek_completion
  11. from applications.db import DatabaseConnector
  12. from config import long_articles_config
  13. from tasks.ai_tasks.prompts import category_generation_for_each_account
  14. from tasks.ai_tasks.prompts import get_title_match_score_list
  15. thread_local = local()
  16. def get_db_client():
  17. """
  18. each thread get it's own db client
  19. """
  20. if not hasattr(thread_local, "db_client"):
  21. thread_local.db_client = DatabaseConnector(long_articles_config)
  22. thread_local.db_client.connect()
  23. return thread_local.db_client
  24. def update_task_status(thread_db_client, task_id, ori_status, new_status):
  25. """
  26. update task status
  27. """
  28. update_query = f"""
  29. update crawler_candidate_account_pool
  30. set status = %s
  31. where id = %s and status = %s;
  32. """
  33. thread_db_client.save(update_query, (new_status, task_id, ori_status))
  34. def update_task_category_status(thread_db_client, task_id, ori_status, new_status):
  35. """
  36. update task status
  37. """
  38. update_query = f"""
  39. update crawler_candidate_account_pool
  40. set category_status = %s
  41. where id = %s and category_status = %s;
  42. """
  43. thread_db_client.save(update_query, (new_status, task_id, ori_status))
  44. def get_account_score(thread_db_client, account):
  45. """
  46. recognize each account
  47. """
  48. task_id = account["id"]
  49. # lock task
  50. update_task_status(thread_db_client, task_id, 0, 1)
  51. # process
  52. title_list = json.loads(account["title_list"])
  53. if len(title_list) < 15 and account["platform"] == "toutiao":
  54. # 账号数量不足,直接跳过
  55. print("bad account, skip")
  56. update_task_status(thread_db_client, task_id, 1, 11)
  57. return
  58. # 标题长度过长,需要过滤
  59. title_total_length = sum(len(title) for title in title_list)
  60. avg_title_length = title_total_length / len(title_list)
  61. if avg_title_length > 45:
  62. print("title too long, skip")
  63. update_task_status(thread_db_client, task_id, 1, 14)
  64. return
  65. prompt = get_title_match_score_list(title_list)
  66. response = fetch_deepseek_completion(model="DeepSeek-V3", prompt=prompt)
  67. response_score_str = response.strip()
  68. try:
  69. score_list = json.loads(response_score_str)
  70. avg_score = sum(score_list) / len(score_list)
  71. except Exception as e:
  72. score_list = []
  73. avg_score = 0
  74. if score_list and avg_score:
  75. update_query = f"""
  76. update crawler_candidate_account_pool
  77. set score_list = %s, avg_score = %s, status = %s
  78. where id = %s and status = %s;
  79. """
  80. thread_db_client.save(
  81. update_query, (json.dumps(score_list), avg_score, 2, task_id, 1)
  82. )
  83. else:
  84. update_task_status(thread_db_client, task_id, 1, 12)
  85. def get_account_category(thread_db_client, account):
  86. """
  87. recognize each account
  88. """
  89. task_id = account["id"]
  90. title_list = json.loads(account["title_list"])
  91. # lock task
  92. update_task_category_status(thread_db_client, task_id, 0, 1)
  93. prompt = category_generation_for_each_account(title_list)
  94. response = fetch_deepseek_completion(model="DeepSeek-V3", prompt=prompt)
  95. print(response)
  96. response_category = response.strip()
  97. if response_category:
  98. update_query = f"""
  99. update crawler_candidate_account_pool
  100. set category = %s, category_status = %s
  101. where id = %s and category_status = %s;
  102. """
  103. thread_db_client.save(update_query, (response_category, 2, task_id, 1))
  104. else:
  105. update_task_category_status(thread_db_client, task_id, 1, 99)
  106. def recognize_account_thread(account, task):
  107. """
  108. recognize thread
  109. """
  110. match task:
  111. case "score":
  112. thread_db_client = get_db_client()
  113. try:
  114. get_account_score(thread_db_client, account)
  115. except Exception as e:
  116. update_task_status(
  117. thread_db_client=thread_db_client,
  118. task_id=account["id"],
  119. ori_status=1,
  120. new_status=13,
  121. )
  122. case "category":
  123. thread_db_client = get_db_client()
  124. try:
  125. get_account_category(thread_db_client, account)
  126. except Exception as e:
  127. update_task_category_status(
  128. thread_db_client=thread_db_client,
  129. task_id=account["id"],
  130. ori_status=1,
  131. new_status=99,
  132. )
  133. case "_":
  134. return
  135. class CandidateAccountRecognizer:
  136. INIT_STATUS = 0
  137. PROCESSING_STATUS = 1
  138. SUCCESS_STATUS = 2
  139. FAILED_STATUS = 99
  140. AVG_SCORE_THRESHOLD = 65
  141. def __init__(self):
  142. self.db_client = DatabaseConnector(long_articles_config)
  143. self.db_client.connect()
  144. class CandidateAccountQualityScoreRecognizer(CandidateAccountRecognizer):
  145. def get_task_list(self):
  146. """
  147. get account tasks from the database
  148. """
  149. fetch_query = f"""
  150. select id, title_list, platform
  151. from crawler_candidate_account_pool
  152. where avg_score is null and status = {self.INIT_STATUS} and title_list is not null;
  153. """
  154. fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
  155. return fetch_response
  156. def deal(self):
  157. task_list = self.get_task_list()
  158. with ThreadPoolExecutor(max_workers=8) as executor:
  159. futures = [
  160. executor.submit(recognize_account_thread, task, "score")
  161. for task in task_list
  162. ]
  163. for future in tqdm(
  164. concurrent.futures.as_completed(futures),
  165. total=len(task_list),
  166. desc="处理进度",
  167. ):
  168. future.result()
  169. class CandidateAccountCategoryRecognizer(CandidateAccountRecognizer):
  170. def get_task_list(self):
  171. fetch_query = f"""
  172. select id, title_list
  173. from crawler_candidate_account_pool
  174. where category_status = %s and avg_score >= %s;
  175. """
  176. fetch_response = self.db_client.fetch(
  177. fetch_query,
  178. cursor_type=DictCursor,
  179. params=(self.INIT_STATUS, self.AVG_SCORE_THRESHOLD),
  180. )
  181. return fetch_response
  182. def deal(self):
  183. task_list = self.get_task_list()
  184. with ThreadPoolExecutor(max_workers=8) as executor:
  185. futures = [
  186. executor.submit(recognize_account_thread, task, "category")
  187. for task in task_list
  188. ]
  189. for future in tqdm(
  190. concurrent.futures.as_completed(futures),
  191. total=len(task_list),
  192. desc="处理进度",
  193. ):
  194. future.result()