#!/usr/bin/env python3 """ 语义相似度分析模块 使用 AI Agent 判断两个短语之间的语义相似度 """ from agents import Agent, Runner, ModelSettings from lib.client import get_model from lib.utils import parse_json_from_text from lib.config import get_cache_dir from typing import Dict, Any, Optional, List, Tuple import hashlib import json import os from datetime import datetime from pathlib import Path import asyncio import numpy as np # 默认提示词模板 DEFAULT_PROMPT_TEMPLATE = """ 从语意角度,判断"{phrase_a}"和"{phrase_b}"这两个短语的相似度,从0-1打分,输出格式如下: ```json {{ "说明": "简明扼要说明理由", "相似度": 0.0, }} ``` """.strip() def _get_default_cache_dir() -> str: """获取默认缓存目录(从配置中读取)""" return get_cache_dir("semantic_similarity") def _generate_cache_key( phrase_a: str, phrase_b: str, model_name: str, temperature: float, max_tokens: int, prompt_template: str, instructions: str = None, tools: str = "[]" ) -> str: """ 生成缓存键(哈希值) Args: phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 模型名称 temperature: 温度参数 max_tokens: 最大token数 prompt_template: 提示词模板 instructions: Agent 系统指令 tools: 工具列表的 JSON 字符串 Returns: 32位MD5哈希值 """ # 创建包含所有参数的字符串 cache_string = f"{phrase_a}||{phrase_b}||{model_name}||{temperature}||{max_tokens}||{prompt_template}||{instructions}||{tools}" # 生成MD5哈希 return hashlib.md5(cache_string.encode('utf-8')).hexdigest() def _sanitize_for_filename(text: str, max_length: int = 30) -> str: """ 将文本转换为安全的文件名部分 Args: text: 原始文本 max_length: 最大长度 Returns: 安全的文件名字符串 """ import re # 移除特殊字符,只保留中文、英文、数字、下划线 sanitized = re.sub(r'[^\w\u4e00-\u9fff]', '_', text) # 移除连续的下划线 sanitized = re.sub(r'_+', '_', sanitized) # 截断到最大长度 if len(sanitized) > max_length: sanitized = sanitized[:max_length] return sanitized.strip('_') def _get_cache_filepath( cache_key: str, phrase_a: str, phrase_b: str, model_name: str, temperature: float, cache_dir: Optional[str] = None ) -> Path: """ 获取缓存文件路径(可读文件名) Args: cache_key: 缓存键(哈希值) phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 模型名称 temperature: 温度参数 cache_dir: 缓存目录 Returns: 缓存文件的完整路径 文件名格式: {phrase_a}_vs_{phrase_b}_{model}_t{temp}_{hash[:8]}.json 示例: 宿命感_vs_余华的小说_gpt-4.1-mini_t0.0_a7f3e2d9.json """ if cache_dir is None: cache_dir = _get_default_cache_dir() # 清理短语和模型名 clean_a = _sanitize_for_filename(phrase_a, max_length=20) clean_b = _sanitize_for_filename(phrase_b, max_length=20) # 简化模型名(提取关键部分) model_short = model_name.split('/')[-1] # 例如: openai/gpt-4.1-mini -> gpt-4.1-mini model_short = _sanitize_for_filename(model_short, max_length=20) # 格式化温度参数 temp_str = f"t{temperature:.1f}" # 使用哈希的前8位 hash_short = cache_key[:8] # 组合文件名 filename = f"{clean_a}_vs_{clean_b}_{model_short}_{temp_str}_{hash_short}.json" return Path(cache_dir) / filename def _load_from_cache( cache_key: str, phrase_a: str, phrase_b: str, model_name: str, temperature: float, cache_dir: Optional[str] = None ) -> Optional[str]: """ 从缓存加载数据 Args: cache_key: 缓存键 phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 模型名称 temperature: 温度参数 cache_dir: 缓存目录 Returns: 缓存的结果字符串,如果不存在则返回 None """ if cache_dir is None: cache_dir = _get_default_cache_dir() cache_file = _get_cache_filepath(cache_key, phrase_a, phrase_b, model_name, temperature, cache_dir) # 如果文件不存在,尝试通过哈希匹配查找 if not cache_file.exists(): # 查找所有以该哈希结尾的文件 cache_path = Path(cache_dir) if cache_path.exists(): hash_short = cache_key[:8] matching_files = list(cache_path.glob(f"*_{hash_short}.json")) if matching_files: cache_file = matching_files[0] else: return None else: return None try: with open(cache_file, 'r', encoding='utf-8') as f: cached_data = json.load(f) return cached_data['output']['raw'] except (json.JSONDecodeError, IOError, KeyError): return None def _save_to_cache( cache_key: str, phrase_a: str, phrase_b: str, model_name: str, temperature: float, max_tokens: int, prompt_template: str, instructions: str, tools: str, result: str, cache_dir: Optional[str] = None ) -> None: """ 保存数据到缓存 Args: cache_key: 缓存键 phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 模型名称 temperature: 温度参数 max_tokens: 最大token数 prompt_template: 提示词模板 instructions: Agent 系统指令 tools: 工具列表的 JSON 字符串 result: 结果数据(原始字符串) cache_dir: 缓存目录 """ if cache_dir is None: cache_dir = _get_default_cache_dir() cache_file = _get_cache_filepath(cache_key, phrase_a, phrase_b, model_name, temperature, cache_dir) # 确保缓存目录存在 cache_file.parent.mkdir(parents=True, exist_ok=True) # 尝试解析 result 为 JSON parsed_result = parse_json_from_text(result) # 准备缓存数据(包含完整的输入输出信息) cache_data = { "input": { "phrase_a": phrase_a, "phrase_b": phrase_b, "model_name": model_name, "temperature": temperature, "max_tokens": max_tokens, "prompt_template": prompt_template, "instructions": instructions, "tools": tools }, "output": { "raw": result, # 保留原始响应 "parsed": parsed_result # 解析后的JSON对象 }, "metadata": { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "cache_key": cache_key, "cache_file": str(cache_file.name) } } try: with open(cache_file, 'w', encoding='utf-8') as f: json.dump(cache_data, f, ensure_ascii=False, indent=2) except IOError: pass # 静默失败,不影响主流程 async def _difference_between_phrases( phrase_a: str, phrase_b: str, model_name: str = 'openai/gpt-4.1-mini', temperature: float = 0.0, max_tokens: int = 65536, prompt_template: str = None, instructions: str = None, tools: list = None, name: str = "Semantic Similarity Analyzer", use_cache: bool = True, cache_dir: Optional[str] = None ) -> str: """ 从语义角度判断两个短语的相似度 Args: phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 使用的模型名称,可选值: - 'google/gemini-2.5-pro' - 'anthropic/claude-sonnet-4.5' - 'google/gemini-2.0-flash-001' - 'openai/gpt-5-mini' - 'anthropic/claude-haiku-4.5' - 'openai/gpt-4.1-mini' (默认) temperature: 模型温度参数,控制输出随机性,默认 0.0(确定性输出) max_tokens: 最大生成token数,默认 65536 prompt_template: 自定义提示词模板,使用 {phrase_a} 和 {phrase_b} 作为占位符 如果为 None,使用默认模板 instructions: Agent 的系统指令,默认为 None tools: Agent 可用的工具列表,默认为 [] name: Agent 的名称,默认为 "Semantic Similarity Analyzer"(不参与缓存key构建) use_cache: 是否使用缓存,默认 True cache_dir: 缓存目录,默认从配置读取(可通过 lib.config 设置) Returns: JSON 格式的相似度分析结果字符串 Examples: >>> # 使用默认模板和缓存 >>> result = await difference_between_phrases("宿命感", "余华的小说") >>> print(result) { "说明": "简明扼要说明理由", "相似度": 0.0 } >>> # 禁用缓存 >>> result = await difference_between_phrases( ... "宿命感", "余华的小说", ... use_cache=False ... ) >>> # 使用自定义模板 >>> custom_template = ''' ... 请分析【{phrase_a}】和【{phrase_b}】的语义关联度 ... 输出格式:{{"score": 0.0, "reason": "..."}} ... ''' >>> result = await difference_between_phrases( ... "宿命感", "余华的小说", ... prompt_template=custom_template ... ) """ # 使用自定义模板或默认模板 if prompt_template is None: prompt_template = DEFAULT_PROMPT_TEMPLATE # 默认tools为空列表 if tools is None: tools = [] # 生成缓存键(tools转为JSON字符串以便哈希) tools_str = json.dumps(tools, sort_keys=True) if tools else "[]" cache_key = _generate_cache_key( phrase_a, phrase_b, model_name, temperature, max_tokens, prompt_template, instructions, tools_str ) # 尝试从缓存加载 if use_cache: cached_result = _load_from_cache(cache_key, phrase_a, phrase_b, model_name, temperature, cache_dir) if cached_result is not None: return cached_result # 缓存未命中,调用 API agent = Agent( name=name, model=get_model(model_name), model_settings=ModelSettings( temperature=temperature, max_tokens=max_tokens, ), instructions=instructions, tools=tools, ) # 格式化提示词 prompt = prompt_template.format(phrase_a=phrase_a, phrase_b=phrase_b) result = await Runner.run(agent, input=prompt) final_output = result.final_output # 注意:不在这里缓存,而是在解析成功后缓存 # 这样可以避免缓存解析失败的响应 return final_output async def _difference_between_phrases_parsed( phrase_a: str, phrase_b: str, model_name: str = 'openai/gpt-4.1-mini', temperature: float = 0.0, max_tokens: int = 65536, prompt_template: str = None, instructions: str = None, tools: list = None, name: str = "Semantic Similarity Analyzer", use_cache: bool = True, cache_dir: Optional[str] = None ) -> Dict[str, Any]: """ 从语义角度判断两个短语的相似度,并解析返回结果为字典 Args: phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 使用的模型名称 temperature: 模型温度参数,控制输出随机性,默认 0.0(确定性输出) max_tokens: 最大生成token数,默认 65536 prompt_template: 自定义提示词模板,使用 {phrase_a} 和 {phrase_b} 作为占位符 instructions: Agent 的系统指令,默认为 None tools: Agent 可用的工具列表,默认为 [] name: Agent 的名称,默认为 "Semantic Similarity Analyzer" use_cache: 是否使用缓存,默认 True cache_dir: 缓存目录,默认从配置读取(可通过 lib.config 设置) Returns: 解析后的字典,包含: - 说明: 相似度判断的理由 - 相似度: 0-1之间的浮点数 Raises: ValueError: 当无法解析AI响应为有效JSON时抛出 Examples: >>> result = await difference_between_phrases_parsed("宿命感", "余华的小说") >>> print(result['相似度']) 0.3 >>> print(result['说明']) "两个概念有一定关联..." """ # 使用默认模板或自定义模板 if prompt_template is None: prompt_template = DEFAULT_PROMPT_TEMPLATE # 默认tools为空列表 if tools is None: tools = [] # 生成缓存键 tools_str = json.dumps(tools, sort_keys=True) if tools else "[]" cache_key = _generate_cache_key( phrase_a, phrase_b, model_name, temperature, max_tokens, prompt_template, instructions, tools_str ) # 尝试从缓存加载 if use_cache: cached_result = _load_from_cache(cache_key, phrase_a, phrase_b, model_name, temperature, cache_dir) if cached_result is not None: # 缓存命中,直接解析并返回 parsed_result = parse_json_from_text(cached_result) if parsed_result: return parsed_result # 如果缓存的内容也无法解析,继续执行API调用(可能之前缓存了错误响应) # 重试机制:最多重试3次 max_retries = 3 last_error = None for attempt in range(max_retries): try: # 调用AI获取原始响应(不传use_cache,因为我们在这里手动处理缓存) raw_result = await _difference_between_phrases( phrase_a, phrase_b, model_name, temperature, max_tokens, prompt_template, instructions, tools, name, use_cache=False, cache_dir=cache_dir ) # 使用 utils.parse_json_from_text 解析结果 parsed_result = parse_json_from_text(raw_result) # 如果解析成功,缓存并返回 if parsed_result: # 只有解析成功后才缓存 if use_cache: _save_to_cache( cache_key, phrase_a, phrase_b, model_name, temperature, max_tokens, prompt_template, instructions, tools_str, raw_result, cache_dir ) return parsed_result # 解析失败,记录错误信息,准备重试 formatted_prompt = prompt_template.format(phrase_a=phrase_a, phrase_b=phrase_b) error_msg = f""" JSON解析失败 (尝试 {attempt + 1}/{max_retries}) ================================================================================ 短语A: {phrase_a} 短语B: {phrase_b} 模型: {model_name} 温度: {temperature} ================================================================================ Prompt: {formatted_prompt} ================================================================================ AI响应 (长度: {len(raw_result)}): {raw_result} ================================================================================ """ last_error = error_msg print(error_msg) if attempt < max_retries - 1: print(f"⚠️ 将在 1 秒后重试... (剩余重试次数: {max_retries - attempt - 1})") import asyncio await asyncio.sleep(1) except Exception as e: # 捕获其他异常(如网络错误) error_msg = f"API调用失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}" last_error = error_msg print(error_msg) if attempt < max_retries - 1: print(f"⚠️ 将在 1 秒后重试... (剩余重试次数: {max_retries - attempt - 1})") import asyncio await asyncio.sleep(1) # 所有重试都失败了,抛出异常 final_error = f""" 所有重试均失败!已尝试 {max_retries} 次 ================================================================================ 最后一次错误: {last_error} ================================================================================ """ raise ValueError(final_error) # ========== V1 版本(默认版本) ========== # 对外接口 - V1 async def compare_phrases( phrase_a: str, phrase_b: str, model_name: str = 'openai/gpt-4.1-mini', temperature: float = 0.0, max_tokens: int = 65536, prompt_template: str = None, instructions: str = None, tools: list = None, name: str = "Semantic Similarity Analyzer", use_cache: bool = True, cache_dir: Optional[str] = None ) -> Dict[str, Any]: """ 比较两个短语的语义相似度(对外唯一接口) Args: phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 使用的模型名称 temperature: 模型温度参数,控制输出随机性,默认 0.0(确定性输出) max_tokens: 最大生成token数,默认 65536 prompt_template: 自定义提示词模板,使用 {phrase_a} 和 {phrase_b} 作为占位符 instructions: Agent 的系统指令,默认为 None tools: Agent 可用的工具列表,默认为 [] name: Agent 的名称,默认为 "Semantic Similarity Analyzer" use_cache: 是否使用缓存,默认 True cache_dir: 缓存目录,默认从配置读取(可通过 lib.config 设置) Returns: 解析后的字典 """ return await _difference_between_phrases_parsed( phrase_a, phrase_b, model_name, temperature, max_tokens, prompt_template, instructions, tools, name, use_cache, cache_dir ) async def compare_phrases_cartesian( phrases_a: List[str], phrases_b: List[str], max_concurrent: int = 50 ) -> List[List[Dict[str, Any]]]: """ 笛卡尔积批量计算:M×N并发LLM调用(带并发控制) 用于架构统一性,内部通过并发实现(LLM无法真正批处理) Args: phrases_a: 第一组短语列表(M个) phrases_b: 第二组短语列表(N个) max_concurrent: 最大并发数,默认50 Returns: 嵌套列表 List[List[Dict]],每个Dict包含完整的比较结果 results[i][j] = { "相似度": float, "说明": str } Examples: >>> results = await compare_phrases_cartesian( ... ["深度学习"], ... ["神经网络", "Python"] ... ) >>> print(results[0][0]['相似度']) # 深度学习 vs 神经网络 >>> print(results[0][1]['说明']) # 深度学习 vs Python """ # 参数验证 if not phrases_a or not phrases_b: return [[]] M, N = len(phrases_a), len(phrases_b) # 创建信号量控制并发 semaphore = asyncio.Semaphore(max_concurrent) async def limited_compare(phrase_a: str, phrase_b: str): async with semaphore: return await compare_phrases(phrase_a, phrase_b) # 创建M×N个受控的并发任务 tasks = [] for phrase_a in phrases_a: for phrase_b in phrases_b: tasks.append(limited_compare(phrase_a, phrase_b)) # 并发执行所有任务 results = await asyncio.gather(*tasks) # 返回嵌套列表结构 nested_results = [] for i in range(M): row_results = results[i * N : (i + 1) * N] nested_results.append(row_results) return nested_results if __name__ == "__main__": import asyncio async def main(): """示例使用""" # 示例 1: 基本使用(使用缓存) print("示例 1: 基本使用") result = await compare_phrases("宿命感", "余华的小说") print(f"相似度: {result.get('相似度')}") print(f"说明: {result.get('说明')}") print() # 示例 2: 再次调用相同参数(应该从缓存读取) print("示例 2: 测试缓存") result = await compare_phrases("宿命感", "余华的小说") print(f"相似度: {result.get('相似度')}") print() # 示例 3: 自定义温度 print("示例 3: 自定义温度(创意性输出)") result = await compare_phrases( "创意写作", "AI生成", temperature=0.7 ) print(f"相似度: {result.get('相似度')}") print(f"说明: {result.get('说明')}") print() # 示例 4: 自定义 Agent 名称 print("示例 4: 自定义 Agent 名称") result = await compare_phrases( "人工智能", "机器学习", name="AI语义分析专家" ) print(f"相似度: {result.get('相似度')}") print(f"说明: {result.get('说明')}") print() # 示例 5: 使用不同的模型 print("示例 5: 使用 Claude 模型") result = await compare_phrases( "深度学习", "神经网络", model_name='anthropic/claude-haiku-4.5' ) print(f"相似度: {result.get('相似度')}") print(f"说明: {result.get('说明')}") asyncio.run(main()) # ========== V2 版本(示例:详细分析版本) ========== # V2 默认提示词模板(更详细的分析) DEFAULT_PROMPT_TEMPLATE_V2 = """ 请深入分析【{phrase_a}】和【{phrase_b}】的语义关系,包括: 1. 语义相似度(0-1) 2. 关系类型(如:包含、相关、对立、无关等) 3. 详细说明 输出格式: ```json {{ "相似度": 0.0, "关系类型": "相关/包含/对立/无关", "详细说明": "详细分析两者的语义关系...", "应用场景": "该关系在实际应用中的意义..." }} ``` """.strip() # 对外接口 - V2 async def compare_phrases_v2( phrase_a: str, phrase_b: str, model_name: str = 'anthropic/claude-sonnet-4.5', # V2 默认使用更强的模型 temperature: float = 0.0, max_tokens: int = 65536, prompt_template: str = None, instructions: str = None, tools: list = None, name: str = "Advanced Semantic Analyzer", use_cache: bool = True, cache_dir: Optional[str] = None ) -> Dict[str, Any]: """ 比较两个短语的语义相似度 - V2 版本(详细分析) V2 特点: - 默认使用更强的模型(Claude Sonnet 4.5) - 更详细的分析输出(包含关系类型和应用场景) - 适合需要深入分析的场景 Args: phrase_a: 第一个短语 phrase_b: 第二个短语 model_name: 使用的模型名称,默认 'anthropic/claude-sonnet-4.5' temperature: 模型温度参数,默认 0.0 max_tokens: 最大生成token数,默认 65536 prompt_template: 自定义提示词模板,默认使用 V2 详细模板 instructions: Agent 的系统指令,默认为 None tools: Agent 可用的工具列表,默认为 [] name: Agent 的名称,默认 "Advanced Semantic Analyzer" use_cache: 是否使用缓存,默认 True cache_dir: 缓存目录,默认从配置读取(可通过 lib.config 设置) Returns: 解析后的字典,包含: - 相似度: 0-1之间的浮点数 - 关系类型: 关系分类 - 详细说明: 详细分析 - 应用场景: 应用建议 Examples: >>> result = await compare_phrases_v2("深度学习", "神经网络") >>> print(result['相似度']) 0.9 >>> print(result['关系类型']) "包含" >>> print(result['详细说明']) "深度学习是基于人工神经网络的机器学习方法..." """ # 使用 V2 默认模板(如果未指定) if prompt_template is None: prompt_template = DEFAULT_PROMPT_TEMPLATE_V2 return await _difference_between_phrases_parsed( phrase_a, phrase_b, model_name, temperature, max_tokens, prompt_template, instructions, tools, name, use_cache, cache_dir )