import concurrent.futures
import json
import threading

from core.database import DBHelper
from data_models.content_data import ContentData
from data_models.keyword_clustering import KeywordClustering
from data_models.keyword_data import KeywordData
from data_models.keyword_with_content import KeywordWithContent
from utils.deepseek_utils import text_segment, text_question, create_keyword_summary_prompt, get_keyword_summary, \
    update_keyword_summary_prompt


def is_empty(value):
    """Helper: return True if the value is empty (None or an empty string)."""
    return value is None or value == ""


def parse_json(file_path):
    """Read a JSON file that holds an array of objects and return it as a list.

    Each element is expected to carry a "body_text" field. Returns an empty
    list on any error.
    """
    try:
        # Read the file content
        with open(file_path, 'r', encoding='utf-8') as file:
            try:
                # Parse the JSON content
                json_data = json.load(file)
                # The file must contain a JSON array
                if isinstance(json_data, list):
                    return json_data
                else:
                    print("Error: file content is not a JSON array")
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
    except FileNotFoundError:
        print(f"Error: file '{file_path}' not found")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return []


def generate_keywords(keyword, content_data):
    """Link a keyword to a content segment and create or update its clustering summary."""
    db_helper = DBHelper()
    keyword_data = db_helper.get(KeywordData, keyword=keyword)
    if keyword_data is None:
        # First occurrence of this keyword: create the keyword record
        new_keyword_data = KeywordData(keyword=keyword)
        keyword_data = db_helper.add(new_keyword_data)
    # Associate the keyword with the content segment
    keyword_with_content = KeywordWithContent(keyword_id=keyword_data.id,
                                              content_id=content_data.id)
    db_helper.add(keyword_with_content)
    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
    if keyword_clustering is None:
        # No clustering record yet: generate an initial keyword summary
        keyword_summary = get_keyword_summary(content_data.content, keyword_data.keyword)
        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
                                                   keyword_summary=keyword_summary['keyword_summary'])
        db_helper.add(new_keyword_clustering)
        print(new_keyword_clustering)
    else:
        # Merge the new content into the existing keyword summary
        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary,
                                                            keyword, content_data.content)
        db_helper.update(KeywordClustering,
                         filters={"id": keyword_clustering.id},
                         updates={"keyword_summary": new_keyword_summary})


# Split the text into segments (7-tuples) and persist them
def ai_dispose(text):
    db_helper = DBHelper()
    segments = text_segment(text)['segments']
    segment_pre_content_id = None
    for segment in segments:
        text = ''
        content = segment['content']
        summary = segment['summary']
        keywords = segment['keywords']
        if not is_empty(content) and not is_empty(summary):
            # Both present: join content and summary (separated by a Chinese full stop)
            text = f"{content}。{summary}"
        elif not is_empty(summary):
            # Only the summary is present: use the summary
            text = summary
        elif not is_empty(content):
            # Only the content is present: use the content
            text = content
        questions = text_question(text)['questions']
        content_data = ContentData(pre_content_id=segment_pre_content_id,
                                   content=content,
                                   summary=summary,
                                   keywords=json.dumps(keywords, ensure_ascii=False),
                                   entities=json.dumps(segment['entities'], ensure_ascii=False),
                                   questions=json.dumps(questions, ensure_ascii=False),
                                   keywords_status=0)
        db_helper.add(content_data)
        segment_pre_content_id = content_data.id
        for keyword in keywords:
            generate_keywords(keyword, content_data)
        db_helper.update(ContentData,
                         filters={"id": content_data.id},
                         updates={"keywords_status": 1})


print_lock = threading.Lock()


def process_text(text):
    """Process a single text."""
    try:
        # Use the lock so that prints from different threads do not interleave
        with print_lock:
            print(f"Processing text: {text[:50]}...")  # only print the first 50 characters
        # Call the actual processing function
        result = ai_dispose(text)
        return result
    except Exception as e:
        with print_lock:
            print(f"Error while processing text: {e}")
        return None
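

# --- Hedged usage sketch (assumption, not part of the original pipeline) ---
# parse_json() expects the input file to hold a JSON array, and
# process_texts_concurrently() below reads the "body_text" field of each element.
# The helper below only illustrates that assumed input shape; the function name
# and sample texts are made up for the example and nothing in this module calls it.
def write_sample_input(path='../data/test_data1.json'):
    """Write a minimal input file in the shape the pipeline consumes."""
    sample = [
        {"body_text": "First document to be segmented and indexed..."},
        {"body_text": "Second document to be segmented and indexed..."},
    ]
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(sample, f, ensure_ascii=False, indent=2)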


# Process the text list concurrently using a thread pool
def process_texts_concurrently(text_list, max_workers=20):
    """Process a list of texts concurrently with multiple threads."""
    results = []
    # Create the thread pool executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks to the pool
        future_to_text = {executor.submit(process_text, text['body_text']): text
                          for text in text_list}
        # Collect results as tasks complete
        for future in concurrent.futures.as_completed(future_to_text):
            text = future_to_text[future]
            try:
                result = future.result()
                results.append(result)
                with print_lock:
                    print(f"Successfully processed text: {text['body_text'][:30]}...")
            except Exception as e:
                with print_lock:
                    print(f"Exception while processing text: {e}")
    return results


if __name__ == '__main__':
    json_path = '../data/test_data1.json'
    text_list = parse_json(json_path)
    process_texts_concurrently(text_list)