|
@@ -0,0 +1,176 @@
|
|
|
+import os
|
|
|
+import time
|
|
|
+import json
|
|
|
+import re
|
|
|
+from google import genai
|
|
|
+from google.genai import types
|
|
|
+import config
|
|
|
+
|
|
|
+
|
|
|
# --- Client initialisation ---

# Gemini API client, authenticated with the key stored in the project's
# ``config`` module.
client = genai.Client(api_key=config.GEMINI_API_KEY)

# System prompt and chain-of-thought configuration.
# Fallback prompt returned by load_system_prompt() when the prompt file
# cannot be read.
DEFAULT_SYSTEM_PROMPT = """
"""

# Path of the file the system prompt is loaded from.
SYSTEM_PROMPT_FILE = "system_prompt/v37"
|
|
|
+
|
|
|
def load_system_prompt(prompt_file_path: str) -> str:
    """
    Read the system prompt from a UTF-8 text file.

    A status line is printed in both the success and the failure case.

    :param prompt_file_path: path of the system prompt file
    :return: the file content with leading/trailing whitespace stripped, or
        DEFAULT_SYSTEM_PROMPT if anything goes wrong while loading it
    """
    try:
        with open(prompt_file_path, mode='r', encoding='utf-8') as prompt_file:
            raw_text = prompt_file.read()
        loaded_prompt = raw_text.strip()
        print(f"成功从 {prompt_file_path} 加载系统提示词")
        return loaded_prompt
    except Exception as read_error:
        # Best-effort loader: report the problem and fall back to the default
        # prompt instead of aborting module import.
        print(f"读取系统提示词文件 {prompt_file_path} 出错: {str(read_error)}")
        return DEFAULT_SYSTEM_PROMPT
|
|
|
+
|
|
|
# Loaded once at import time; falls back to DEFAULT_SYSTEM_PROMPT on failure.
SYSTEM_PROMPT: str = load_system_prompt(SYSTEM_PROMPT_FILE)
# Reminder: the content of SYSTEM_PROMPT must match the expected input/output
# format exactly. It should state that the model receives input of the form
# "目标对话\n[target dialogue JSON string]\n上下文对话\n[context JSON array string]"
# and that the expected output is a JSON object with the structure
# { "对话整体解构": {...}, "详细解构": [...] }.
|
|
|
+
|
|
|
+
|
|
|
# Fence patterns are anchored to the very start / very end of the reply (no
# re.MULTILINE), so a ``` that happens to end a line *inside* the JSON payload
# is left untouched.
_FENCE_OPEN_RE = re.compile(r'^\s*```(?:json)?\s*', re.IGNORECASE)
_FENCE_CLOSE_RE = re.compile(r'\s*```\s*$')


def _strip_code_fence(text):
    """Remove one leading and one trailing Markdown code fence, if present."""
    text = _FENCE_OPEN_RE.sub('', text, count=1)
    text = _FENCE_CLOSE_RE.sub('', text, count=1)
    return text.strip()


def _read_text_or_none(path, filename):
    """Return the stripped UTF-8 content of *path*, or None if it cannot be read."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read().strip()
    except (OSError, UnicodeError) as e:
        print(f" ✕ 预读取文件 {filename} 出错: {str(e)}")
        return None


def _write_text_best_effort(path, text):
    """Write a diagnostic *text* to *path*; report instead of raising on failure."""
    try:
        with open(path, 'w', encoding='utf-8') as f:
            f.write(text)
    except OSError as e:
        # A failing diagnostic write must not abort the whole batch.
        print(f" ✕ 无法写入 {os.path.basename(path)}: {e}")


def _save_report(report, output_path, success_label):
    """Dump *report* as indented JSON to *output_path* and print the outcome."""
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
    except OSError as e:
        error_msg = f"保存错误: {str(e)}"
        _write_text_best_effort(output_path, error_msg)
        print(f" ⚠ 保存失败: {error_msg}")
    else:
        print(f" ✓ {success_label}: {os.path.basename(output_path)}")


def _store_model_reply(result, target_filename, output_path):
    """Parse the fence-stripped model reply and save it as JSON.

    If the reply is not valid JSON as a whole, the span from the first '{' to
    the last '}' is tried. When nothing parses, a diagnostic message that
    includes the raw reply is written to *output_path* instead.
    """
    try:
        report = json.loads(result)
    except json.JSONDecodeError as e:
        print(f" ⚠ API返回非JSON格式,尝试提取有效部分... 错误: {e}")
    else:
        print(f" 成功获取并解析API响应 '{target_filename}'")
        _save_report(report, output_path, "保存")
        return

    json_start = result.find('{')
    json_end = result.rfind('}') + 1  # +1 to include the closing brace
    if not json_end > json_start > -1:
        error_msg = f"无法定位JSON内容,使用原始响应。\n原始响应:\n{result}"
        _write_text_best_effort(output_path, error_msg)
        print(f" ⚠ 保存失败 (未找到JSON): {error_msg}")
        return

    try:
        extracted_report = json.loads(result[json_start:json_end])
    except json.JSONDecodeError as extract_e:
        error_msg = f"无法提取有效JSON数据,使用原始响应。提取错误: {extract_e}\n原始响应:\n{result}"
        _write_text_best_effort(output_path, error_msg)
        print(f" ⚠ 保存失败 (提取错误): {error_msg}")
        return

    print(f" 成功提取JSON数据 for '{target_filename}'")
    _save_report(extracted_report, output_path, "保存 (提取成功)")


def process_files_sequentially(input_dir: str, output_dir: str, num_context_files: int = 4, delay_seconds: float = 2.0, max_files=None):
    """
    Process the files of a folder one by one, each with following files as context.

    For every target file the model is called once with the target content and
    the contents of up to ``num_context_files`` files that follow it in sorted
    order. The parsed JSON reply is written to ``<output_dir>/<stem>.json``;
    on failure a plain-text diagnostic is written to the same path.

    :param input_dir: input folder; every regular file in it is treated as
        input (sub-directories are ignored)
    :param output_dir: output folder, created if missing
    :param num_context_files: number of following files attached as context
    :param delay_seconds: pause (seconds) between two model calls
    :param max_files: optional upper bound on the number of target files to
        process (e.g. ``1`` for a trial run); ``None`` processes all files
    """
    os.makedirs(output_dir, exist_ok=True)

    # Only regular files: a sub-directory cannot be read as a dialogue.
    input_files_names = sorted(
        name for name in os.listdir(input_dir)
        if os.path.isfile(os.path.join(input_dir, name))
    )
    total_files = len(input_files_names)

    print(f"找到 {total_files} 个文件。将逐个处理(每个目标文件附带 {num_context_files} 个上下文文件)")

    # Read everything up front so context windows can be built by slicing.
    # Unreadable files are stored as None so their error text is never sent
    # to the model as if it were dialogue content.
    all_file_contents = [
        _read_text_or_none(os.path.join(input_dir, name), name)
        for name in input_files_names
    ]

    if max_files is None:
        files_to_process = total_files
    else:
        files_to_process = max(0, min(max_files, total_files))

    context_width = max(num_context_files, 0)

    for i in range(files_to_process):
        target_filename = input_files_names[i]
        target_content = all_file_contents[i]

        # Context = the next files in sorted order; near the end of the list
        # fewer are available and no padding is added.
        context_contents = [
            text for text in all_file_contents[i + 1:i + 1 + context_width]
            if text is not None
        ]

        print(f"\n处理文件 {i+1}/{total_files}: '{target_filename}' (目标 + {len(context_contents)} 个上下文文件)")

        output_path = os.path.join(output_dir, f"{os.path.splitext(target_filename)[0]}.json")

        if target_content is None:
            error_msg = f"处理 '{target_filename}' 时出错: 无法读取输入文件"
            print(f" ✕ {error_msg}")
            _write_text_best_effort(output_path, error_msg)
            continue  # no model call was made, so no delay is needed

        target_content_json_str = json.dumps(target_content, ensure_ascii=False)
        context_contents_json_str = json.dumps(context_contents, ensure_ascii=False)

        # Single text payload: Markdown headings followed by JSON-encoded content.
        combined_input_text = (
            f"## 目标对话\n"
            f"{target_content_json_str}\n"
            f"## 上下文对话\n"
            f"{context_contents_json_str}"
        )

        try:
            response = client.models.generate_content(
                model="gemini-2.5-pro",
                config=types.GenerateContentConfig(
                    system_instruction=SYSTEM_PROMPT),
                contents=[{"text": combined_input_text}],
            )
            # response.text may be None when the reply carries no text part.
            result = _strip_code_fence(response.text or "")
        except Exception as e:
            # Batch boundary: whatever the SDK raises for this file is
            # recorded and the loop moves on to the next one.
            error_msg = f"处理 '{target_filename}' 时API错误: {str(e)}"
            print(f" ✕ {error_msg}")
            _write_text_best_effort(output_path, error_msg)
        else:
            _store_model_reply(result, target_filename, output_path)

        # Pause between calls, but not after the last processed file.
        if i < files_to_process - 1:
            print(f"等待 {delay_seconds} 秒...")
            time.sleep(delay_seconds)

    print("\n所有文件处理完成")
|