json_parse_utils.py

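"""Parse a JSON array of documents and feed each document's body_text through
the DeepSeek-based segmentation/question pipeline, persisting the resulting
segments and keyword clusters via DBHelper.
"""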
import concurrent.futures
import json
import threading

from core.database import DBHelper
from data_models.content_data import ContentData
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content import KeywordWithContent
from utils.deepseek_utils import text_segment, text_question, create_keyword_summary_prompt, get_keyword_summary, \
    update_keyword_summary_prompt


def is_empty(value):
    """Helper: return True when a value is empty (None or the empty string)."""
    return value is None or value == ""


def parse_json(file_path):
    try:
        # Read the file contents
        with open(file_path, 'r', encoding='utf-8') as file:
            try:
                # Parse the JSON content
                json_data = json.load(file)
                # The top-level value must be a JSON array
                if isinstance(json_data, list):
                    return json_data
                # # Iterate over each JSON object
                # for index, json_obj in enumerate(json_data, 1):
                #     body_text = json_obj.get("body_text", "")  # missing field -> empty string
                #     if not is_empty(body_text):
                #         text_list.append(body_text)
                else:
                    print("Error: file content is not a JSON array")
            except json.JSONDecodeError as e:
                print(f"JSON parse error: {e}")
    except FileNotFoundError:
        print(f"Error: file '{file_path}' not found")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return []
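
# Note: the caller below (process_texts_concurrently) expects every element of
# the returned list to carry a "body_text" field. A minimal sketch of the
# expected input file, with hypothetical values:
#
#   [
#     {"body_text": "First document text ..."},
#     {"body_text": "Second document text ..."}
#   ]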


def generate_keywords(keyword, content_data):
    db_helper = DBHelper()
    # Reuse the keyword row if it already exists, otherwise create it
    keyword_data = db_helper.get(KeywordData, keyword=keyword)
    if keyword_data is None:
        new_keyword_data = KeywordData(keyword=keyword)
        keyword_data = db_helper.add(new_keyword_data)
    # Link this keyword to the content segment
    keyword_with_content = KeywordWithContent(keyword_id=keyword_data.id, content_id=content_data.id)
    db_helper.add(keyword_with_content)
    # Create the clustering summary on first sight of the keyword,
    # otherwise fold the new content into the existing summary
    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
    if keyword_clustering is None:
        keyword_summary = get_keyword_summary(content_data.content, keyword_data.keyword)
        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
                                                   keyword_summary=keyword_summary['keyword_summary'])
        db_helper.add(new_keyword_clustering)
        print(new_keyword_clustering)
    else:
        # update_keyword_summary_prompt is assumed to return the updated summary text
        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
                                                            content_data.content)
        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
                         updates={"keyword_summary": new_keyword_summary})
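
# The DBHelper interface assumed above (inferred from usage, not from its
# definition in core.database): get(Model, **filters) returns one row or None,
# add(instance) persists and returns the instance with its id populated, and
# update(Model, filters=..., updates=...) applies a dict of changes.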


# Segment the text (the "7-tuple" breakdown) and persist each segment
def ai_dispose(text):
    db_helper = DBHelper()
    segments = text_segment(text)['segments']
    segment_pre_content_id = None
    for segment in segments:
        content = segment['content']
        summary = segment['summary']
        keywords = segment['keywords']
        if not is_empty(content) and not is_empty(summary):
            # Both non-empty: concatenate (joined with "。"; change the separator as needed)
            segment_text = f"{content}。{summary}"
        elif not is_empty(summary):
            # Only summary is non-empty: use the summary
            segment_text = summary
        elif not is_empty(content):
            # Only content is non-empty: use the content
            segment_text = content
        else:
            # Both empty: nothing to build questions from
            segment_text = ''
        questions = text_question(segment_text)['questions']
        content_data = ContentData(pre_content_id=segment_pre_content_id,
                                   content=content,
                                   summary=summary,
                                   keywords=json.dumps(keywords, ensure_ascii=False),
                                   entities=json.dumps(segment['entities'], ensure_ascii=False),
                                   questions=json.dumps(questions, ensure_ascii=False),
                                   keywords_status=0)
        db_helper.add(content_data)
        # Chain segments so each row points at its predecessor
        segment_pre_content_id = content_data.id
        for keyword in keywords:
            generate_keywords(keyword, content_data)
        db_helper.update(ContentData, filters={"id": content_data.id}, updates={"keywords_status": 1})
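
# The deepseek_utils helpers are assumed (from the usage above; the concrete
# values here are hypothetical) to return dicts of roughly this shape:
#
#   text_segment(text)  -> {"segments": [{"content": "...", "summary": "...",
#                                         "keywords": ["..."], "entities": ["..."]}, ...]}
#   text_question(text) -> {"questions": ["...", ...]}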

# Lock so prints from different threads don't interleave
print_lock = threading.Lock()


def process_text(text):
    """Process a single text."""
    try:
        with print_lock:
            print(f"Processing text: {text[:50]}...")  # only the first 50 chars, to keep output short
        # Delegate to the main processing pipeline
        result = ai_dispose(text)
        return result
    except Exception as e:
        with print_lock:
            print(f"Error while processing text: {e}")
        return None


def process_texts_concurrently(text_list, max_workers=20):
    """Process a list of texts concurrently with a thread pool."""
    results = []
    # Create the thread pool executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every task to the pool
        future_to_text = {executor.submit(process_text, text['body_text']): text for text in text_list}
        # Collect results as tasks complete
        for future in concurrent.futures.as_completed(future_to_text):
            text = future_to_text[future]
            try:
                result = future.result()
                results.append(result)
                with print_lock:
                    print(f"Successfully processed text: {text['body_text'][:30]}...")
            except Exception as e:
                with print_lock:
                    print(f"Exception while processing text: {e}")
    return results
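
# Design note: results are appended in completion order, not input order, and
# each worker constructs its own DBHelper, which suggests the helper is not
# meant to be shared across threads. max_workers=20 is the default used here;
# tune it to whatever the DeepSeek API and the database can absorb.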


if __name__ == '__main__':
    json_path = '../data/test_data1.json'
    text_list = parse_json(json_path)
    process_texts_concurrently(text_list)