import os import time import json import re from google import genai from google.genai import types import config # --- 正确的初始化流程 --- client = genai.Client(api_key=config.GEMINI_API_KEY) # 系统提示词和COT配置 DEFAULT_SYSTEM_PROMPT = """ """ SYSTEM_PROMPT_FILE="system_prompt/v37" def load_system_prompt(prompt_file_path: str) -> str: """ 从指定文件加载系统提示词 :param prompt_file_path: 系统提示词文件路径 :return: 系统提示词内容 """ try: with open(prompt_file_path, 'r', encoding='utf-8') as f: system_prompt = f.read().strip() print(f"成功从 {prompt_file_path} 加载系统提示词") return system_prompt except Exception as e: print(f"读取系统提示词文件 {prompt_file_path} 出错: {str(e)}") return DEFAULT_SYSTEM_PROMPT SYSTEM_PROMPT = load_system_prompt(SYSTEM_PROMPT_FILE) # 再次提醒:SYSTEM_PROMPT 的内容必须与您期望的输入/输出格式严格匹配。 # 它应该明确说明模型将收到的是 "目标对话\n[目标对话JSON字符串]\n上下文对话\n[上下文JSON数组字符串]" 这种格式, # 并期望输出为您提供的 { "对话整体解构": {...}, "详细解构": [...] } JSON 对象结构。 def process_files_sequentially(input_dir: str, output_dir: str, num_context_files: int = 4, delay_seconds: float = 2.0): """ 逐个处理文件夹中的文本文件,每个目标文件带上下文 :param input_dir: 输入文件夹路径 :param output_dir: 输出文件夹路径 :param num_context_files: 每个目标文件附带的上下文文件数量 :param delay_seconds: 每个文件处理之间的延迟(秒) """ # 确保输出目录存在 os.makedirs(output_dir, exist_ok=True) # 获取所有txt文件 # 注意: f.endswith('') 会匹配所有文件,如果只想处理txt,应改为 f.endswith('.txt') input_files_names = sorted([f for f in os.listdir(input_dir) if f.endswith('')]) total_files = len(input_files_names) print(f"找到 {total_files} 个文件。将逐个处理(每个目标文件附带 {num_context_files} 个上下文文件)") # 预先读取所有文件内容,以便高效构建上下文 all_file_contents = [] for filename in input_files_names: input_path = os.path.join(input_dir, filename) try: with open(input_path, 'r', encoding='utf-8') as f: all_file_contents.append(f.read().strip()) except Exception as e: print(f" ✕ 预读取文件 {filename} 出错: {str(e)}") all_file_contents.append(f"错误: 无法读取文件 '{filename}' - {str(e)}") # 逐个处理文件 # i 现在直接代表当前处理文件的索引 for i in range(1): # for i in range(total_files): target_filename = input_files_names[i] target_content = all_file_contents[i] # 收集上下文文件内容 context_contents = [] for k in range(1, num_context_files + 1): context_idx = i + k if context_idx < total_files: context_contents.append(all_file_contents[context_idx]) # 如果没有足够的上下文文件,就按实际数量提供,不会填充空字符串 print(f"\n处理文件 {i+1}/{total_files}: '{target_filename}' (目标 + {len(context_contents)} 个上下文文件)") output_path = os.path.join(output_dir, f"{os.path.splitext(target_filename)[0]}.json") target_content_json_str = json.dumps(target_content, ensure_ascii=False) context_contents_json_str = json.dumps(context_contents, ensure_ascii=False) # 构建符合 SYSTEM_PROMPT 期望的单个文本字符串,包含Markdown标题和JSON内容 combined_input_text = ( f"## 目标对话\n" f"{target_content_json_str}\n" # 使用json.dumps后的字符串 f"## 上下文对话\n" f"{context_contents_json_str}" # 使用json.dumps后的字符串 ) try: contents = [ {"text": combined_input_text} ] # 调用Gemini模型处理单个目标文件 response = client.models.generate_content( model="gemini-2.5-pro", # 或者您需要的其他模型 config=types.GenerateContentConfig( system_instruction=SYSTEM_PROMPT), contents=contents ) result = response.text # 移除Markdown代码块的围栏 result = re.sub(r'^\s*```json\s*|\s*```\s*$', '', result, flags=re.MULTILINE) result = result.strip() # 去除多余的空行 # 尝试解析JSON响应 try: # 此时 result 应该就是单个文件的 JSON 结果,即您提供的 { "对话整体解构": {...}, "详细解构": [...] } 结构 dialogue_report = json.loads(result) print(f" 成功获取并解析API响应 '{target_filename}'") # 保存处理结果 # dialogue_report 现在是一个字典,可以直接保存 try: with open(output_path, 'w', encoding='utf-8') as f: json.dump(dialogue_report, f, indent=2, ensure_ascii=False) print(f" ✓ 保存: {os.path.basename(output_path)}") except Exception as e: error_msg = f"保存错误: {str(e)}" with open(output_path, 'w', encoding='utf-8') as f: f.write(error_msg) print(f" ⚠ 保存失败: {error_msg}") except json.JSONDecodeError as e: print(f" ⚠ API返回非JSON格式,尝试提取有效部分... 错误: {e}") # ****** 重点修改:寻找 '{' 和 '}' 来提取JSON对象 ****** json_start = result.find('{') json_end = result.rfind('}') + 1 # +1 to include the closing brace if json_end > json_start > -1: # 检查是否找到了有效的括号对 try: extracted_report = json.loads(result[json_start:json_end]) print(f" 成功提取JSON数据 for '{target_filename}'") with open(output_path, 'w', encoding='utf-8') as f: json.dump(extracted_report, f, indent=2, ensure_ascii=False) print(f" ✓ 保存 (提取成功): {os.path.basename(output_path)}") except Exception as extract_e: error_msg = f"无法提取有效JSON数据,使用原始响应。提取错误: {extract_e}\n原始响应:\n{result}" with open(output_path, 'w', encoding='utf-8') as f: f.write(error_msg) print(f" ⚠ 保存失败 (提取错误): {error_msg}") else: error_msg = f"无法定位JSON内容,使用原始响应。\n原始响应:\n{result}" with open(output_path, 'w', encoding='utf-8') as f: f.write(error_msg) print(f" ⚠ 保存失败 (未找到JSON): {error_msg}") except Exception as e: error_msg = f"处理 '{target_filename}' 时API错误: {str(e)}" print(f" ✕ {error_msg}") # API调用失败,为当前文件生成错误文件 with open(output_path, 'w', encoding='utf-8') as f: f.write(error_msg) # 延迟 (在处理完当前文件后,如果不是最后一个文件) if i < total_files - 1: print(f"等待 {delay_seconds} 秒...") time.sleep(delay_seconds) print("\n所有文件处理完成")