Ver código fonte

feat: 为text_embedding添加缓存机制并创建模型对比分析脚本

主要更新:
1. lib/text_embedding.py:添加完整的文件缓存系统
   - 实现缓存键生成、文件路径管理、读写功能
   - 新增 use_cache 和 cache_dir 参数
   - 缓存目录:cache/text_embedding/
   - 缓存格式兼容 semantic_similarity.py

2. script/analysis/test_all_models.py:测试所有向量模型
   - 从现有缓存提取测试用例
   - 并发测试4个text2vec模型(chinese/multilingual/paraphrase/sentence)
   - 使用100并发,带进度跟踪
   - 生成完整的缓存数据

3. script/analysis/analyze_model_comparison.py:对比分析脚本
   - 同时读取text_embedding和semantic_similarity缓存
   - 对比向量模型vs LLM模型效果
   - 导出Excel报告(只包含相似度列,不含说明)
   - 修复短语A/B顺序问题(移除排序,保持原始顺序)

4. script/data_processing/match_inspiration_features.py:
   - 更新导入:使用text_embedding.compare_phrases
   - 添加模型预加载避免多线程冲突
   - 使用asyncio.to_thread实现异步调用

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
yangxiaohui 1 semana atrás
pai
commit
36d71fe5ae

+ 230 - 12
lib/text_embedding.py

@@ -4,7 +4,11 @@
 基于 similarities 库(真正的向量模型,不使用 LLM)
 """
 
-from typing import Dict, Any
+from typing import Dict, Any, Optional
+import hashlib
+import json
+from pathlib import Path
+from datetime import datetime
 
 # 支持的模型列表
 SUPPORTED_MODELS = {
@@ -17,6 +21,175 @@ SUPPORTED_MODELS = {
 # 延迟导入 similarities,避免初始化时就加载模型
 _similarity_models = {}  # 存储多个模型实例
 
+# 默认缓存目录
+DEFAULT_CACHE_DIR = "cache/text_embedding"
+
+
+def _generate_cache_key(phrase_a: str, phrase_b: str, model_name: str) -> str:
+    """
+    生成缓存键(哈希值)
+
+    Args:
+        phrase_a: 第一个短语
+        phrase_b: 第二个短语
+        model_name: 模型名称
+
+    Returns:
+        32位MD5哈希值
+    """
+    cache_string = f"{phrase_a}||{phrase_b}||{model_name}"
+    return hashlib.md5(cache_string.encode('utf-8')).hexdigest()
+
+
+def _sanitize_for_filename(text: str, max_length: int = 30) -> str:
+    """
+    将文本转换为安全的文件名部分
+
+    Args:
+        text: 原始文本
+        max_length: 最大长度
+
+    Returns:
+        安全的文件名字符串
+    """
+    import re
+    # 移除特殊字符,只保留中文、英文、数字、下划线
+    sanitized = re.sub(r'[^\w\u4e00-\u9fff]', '_', text)
+    # 移除连续的下划线
+    sanitized = re.sub(r'_+', '_', sanitized)
+    # 截断到最大长度
+    if len(sanitized) > max_length:
+        sanitized = sanitized[:max_length]
+    return sanitized.strip('_')
+
+
def _get_cache_filepath(
    cache_key: str,
    phrase_a: str,
    phrase_b: str,
    model_name: str,
    cache_dir: str = DEFAULT_CACHE_DIR
) -> Path:
    """Build a human-readable cache file path for one comparison.

    Args:
        cache_key: Hash produced by _generate_cache_key.
        phrase_a: First phrase.
        phrase_b: Second phrase.
        model_name: Full model name.
        cache_dir: Directory the cache file lives in.

    Returns:
        Path of the form {phrase_a}_vs_{phrase_b}_{model}_{hash[:8]}.json
        under ``cache_dir``.
    """
    # Sanitized, shortened pieces keep the file name readable, while the
    # 8-char hash prefix keeps it unique even after truncation.
    parts = (
        _sanitize_for_filename(phrase_a, max_length=20),
        "vs",
        _sanitize_for_filename(phrase_b, max_length=20),
        # Only the last path component of the model name is kept.
        _sanitize_for_filename(model_name.split("/")[-1], max_length=20),
        cache_key[:8],
    )
    return Path(cache_dir) / ("_".join(parts) + ".json")
+
+
def _load_from_cache(
    cache_key: str,
    phrase_a: str,
    phrase_b: str,
    model_name: str,
    cache_dir: str = DEFAULT_CACHE_DIR
) -> Optional[Dict[str, Any]]:
    """Look up a previously cached comparison result.

    Args:
        cache_key: Hash produced by _generate_cache_key.
        phrase_a: First phrase.
        phrase_b: Second phrase.
        model_name: Full model name.
        cache_dir: Directory to search.

    Returns:
        The cached "output" dict, or None when no usable entry exists.
    """
    cache_file = _get_cache_filepath(cache_key, phrase_a, phrase_b, model_name, cache_dir)

    if not cache_file.exists():
        # Fall back to matching on the hash suffix, so entries survive
        # changes to the readable part of the file name.
        directory = Path(cache_dir)
        if not directory.exists():
            return None
        candidates = list(directory.glob(f"*_{cache_key[:8]}.json"))
        if not candidates:
            return None
        cache_file = candidates[0]

    try:
        with open(cache_file, 'r', encoding='utf-8') as handle:
            return json.load(handle)["output"]
    except (json.JSONDecodeError, IOError, KeyError):
        # A corrupt or unreadable entry is treated as a cache miss.
        return None
+
+
def _save_to_cache(
    cache_key: str,
    phrase_a: str,
    phrase_b: str,
    model_name: str,
    result: Dict[str, Any],
    cache_dir: str = DEFAULT_CACHE_DIR
) -> None:
    """Persist one comparison result to the file cache.

    Args:
        cache_key: Hash produced by _generate_cache_key.
        phrase_a: First phrase.
        phrase_b: Second phrase.
        model_name: Full model name.
        result: Result dict to store under "output".
        cache_dir: Directory the cache file is written to.
    """
    cache_file = _get_cache_filepath(cache_key, phrase_a, phrase_b, model_name, cache_dir)
    cache_file.parent.mkdir(parents=True, exist_ok=True)

    # The inputs and some metadata are stored next to the output, so each
    # entry is self-describing and readable by the analysis scripts.
    payload = {
        "input": {
            "phrase_a": phrase_a,
            "phrase_b": phrase_b,
            "model_name": model_name,
        },
        "output": result,
        "metadata": {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "cache_key": cache_key,
            "cache_file": str(cache_file.name)
        }
    }

    try:
        cache_file.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding='utf-8'
        )
    except IOError:
        # Caching is best-effort; a write failure must not break the caller.
        pass
+
 
 def _get_similarity_model(model_name: str = "shibing624/text2vec-base-chinese"):
     """
@@ -54,7 +227,9 @@ def _get_similarity_model(model_name: str = "shibing624/text2vec-base-chinese"):
 def compare_phrases(
     phrase_a: str,
     phrase_b: str,
-    model_name: str = "chinese"
+    model_name: str = "chinese",
+    use_cache: bool = True,
+    cache_dir: str = DEFAULT_CACHE_DIR
 ) -> Dict[str, Any]:
     """
     比较两个短语的语义相似度(兼容 semantic_similarity.py 的接口)
@@ -80,6 +255,8 @@ def compare_phrases(
             - "shibing624/text2vec-base-multilingual"
             - "shibing624/text2vec-base-chinese-paraphrase"
             - "shibing624/text2vec-base-chinese-sentence"
+        use_cache: 是否使用缓存,默认 True
+        cache_dir: 缓存目录,默认 'cache/text_embedding'
 
     Returns:
         {
@@ -97,7 +274,23 @@ def compare_phrases(
 
         >>> # 使用长文本模型
         >>> result = compare_phrases("长文本1...", "长文本2...", model_name="paraphrase")
+
+        >>> # 禁用缓存
+        >>> result = compare_phrases("测试", "测试", use_cache=False)
     """
+    # 转换简称为完整名称(用于缓存键)
+    full_model_name = SUPPORTED_MODELS.get(model_name, model_name)
+
+    # 生成缓存键
+    cache_key = _generate_cache_key(phrase_a, phrase_b, full_model_name)
+
+    # 尝试从缓存加载
+    if use_cache:
+        cached_result = _load_from_cache(cache_key, phrase_a, phrase_b, full_model_name, cache_dir)
+        if cached_result is not None:
+            return cached_result
+
+    # 缓存未命中,计算相似度
     model = _get_similarity_model(model_name)
     score = float(model.similarity(phrase_a, phrase_b))
 
@@ -115,49 +308,74 @@ def compare_phrases(
 
     explanation = f"基于向量模型计算的语义相似度为 {level} ({score:.2f})"
 
-    return {
+    result = {
         "说明": explanation,
         "相似度": score
     }
 
+    # 保存到缓存
+    if use_cache:
+        _save_to_cache(cache_key, phrase_a, phrase_b, full_model_name, result, cache_dir)
+
+    return result
+
 
 if __name__ == "__main__":
     print("=" * 60)
-    print("text_embedding - 文本相似度计算")
+    print("text_embedding - 文本相似度计算(带缓存)")
     print("=" * 60)
     print()
 
-    # 示例 1: 默认模型
-    print("示例 1: 默认模型(chinese)")
+    # 示例 1: 默认模型(首次调用,会保存缓存)
+    print("示例 1: 默认模型(chinese)- 首次调用")
     result = compare_phrases("如何更换花呗绑定银行卡", "花呗更改绑定银行卡")
     print(f"相似度: {result['相似度']:.3f}")
     print(f"说明: {result['说明']}")
     print()
 
-    # 示例 2: 短句子
-    print("示例 2: 使用默认模型")
+    # 示例 2: 再次调用相同参数(从缓存读取)
+    print("示例 2: 测试缓存 - 再次调用相同参数")
+    result = compare_phrases("如何更换花呗绑定银行卡", "花呗更改绑定银行卡")
+    print(f"相似度: {result['相似度']:.3f}")
+    print(f"说明: {result['说明']}")
+    print("(应该从缓存读取,速度更快)")
+    print()
+
+    # 示例 3: 短句子
+    print("示例 3: 使用默认模型")
     result = compare_phrases("深度学习", "神经网络")
     print(f"相似度: {result['相似度']:.3f}")
     print(f"说明: {result['说明']}")
     print()
 
-    # 示例 3: 不相关
-    print("示例 3: 不相关的短语")
+    # 示例 4: 不相关
+    print("示例 4: 不相关的短语")
     result = compare_phrases("编程", "吃饭")
     print(f"相似度: {result['相似度']:.3f}")
     print(f"说明: {result['说明']}")
     print()
 
-    # 示例 4: 多语言模型
-    print("示例 4: 多语言模型(multilingual)")
+    # 示例 5: 多语言模型
+    print("示例 5: 多语言模型(multilingual)")
     result = compare_phrases("Hello", "Hi", model_name="multilingual")
     print(f"相似度: {result['相似度']:.3f}")
     print(f"说明: {result['说明']}")
     print()
 
+    # 示例 6: 禁用缓存
+    print("示例 6: 禁用缓存")
+    result = compare_phrases("测试", "测试", use_cache=False)
+    print(f"相似度: {result['相似度']:.3f}")
+    print(f"说明: {result['说明']}")
+    print()
+
     print("=" * 60)
     print("支持的模型:")
     print("-" * 60)
     for key, value in SUPPORTED_MODELS.items():
         print(f"  {key:15s} -> {value}")
     print("=" * 60)
+    print()
+    print("缓存目录: cache/text_embedding/")
+    print("缓存文件格式: {phrase_a}_vs_{phrase_b}_{model}_{hash[:8]}.json")
+    print("=" * 60)

+ 293 - 0
script/analysis/analyze_model_comparison.py

@@ -0,0 +1,293 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+分析模型对比结果并导出到Excel
+
+同时分析两种实现:
+1. cache/text_embedding - 向量模型实现(text2vec)
+2. cache/semantic_similarity - LLM实现(GPT/Claude等)
+
+生成Excel报告,对比不同实现的效果差异。
+"""
+
+import json
+import sys
+from pathlib import Path
+from typing import Dict, List, Tuple
+import pandas as pd
+from datetime import datetime
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+
def extract_results_from_cache(
    cache_dir: str,
    model_type: str
) -> Dict[Tuple[str, str], List[Dict]]:
    """Extract comparison results from a cache directory.

    Args:
        cache_dir: Path of the cache directory to scan.
        model_type: Cache layout, "text_embedding" or "semantic_similarity".

    Returns:
        Mapping from the (phrase_a, phrase_b) pair -- kept in its original
        order -- to the list of result dicts (one per model) found for that
        pair. Unknown model_type values and unreadable files are skipped.
        (Fixed: the annotation previously claimed Dict values, but the
        function has always built List[Dict] values.)
    """
    cache_path = Path(cache_dir)

    if not cache_path.exists():
        print(f"缓存目录不存在: {cache_dir}")
        return {}

    results: Dict[Tuple[str, str], List[Dict]] = {}
    cache_files = list(cache_path.glob("*.json"))

    print(f"扫描 {model_type} 缓存: {len(cache_files)} 个文件")

    for cache_file in cache_files:
        # Keep the try body minimal: only the file read can realistically
        # fail; everything below uses .get() and cannot raise KeyError.
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            print(f"  读取缓存文件失败: {cache_file.name} - {e}")
            continue

        input_data = data.get("input", {})
        output_data = data.get("output", {})

        phrase_a = input_data.get("phrase_a")
        phrase_b = input_data.get("phrase_b")
        if not (phrase_a and phrase_b):
            continue

        # The two cache layouts store the parsed result in different places.
        if model_type == "text_embedding":
            # text_embedding stores {"说明": ..., "相似度": ...} directly.
            parsed = output_data
            default_model = "unknown"
        elif model_type == "semantic_similarity":
            # semantic_similarity nests the result under output.parsed.
            parsed = output_data.get("parsed", output_data)
            default_model = "LLM"
        else:
            continue

        # Key on the original (phrase_a, phrase_b) order so A/B columns
        # stay aligned in the report; one list entry per model.
        results.setdefault((phrase_a, phrase_b), []).append({
            "phrase_a": phrase_a,
            "phrase_b": phrase_b,
            "相似度": parsed.get("相似度"),
            "说明": parsed.get("说明", ""),
            "模型": input_data.get("model_name", default_model),
        })

    return results
+
+
def merge_all_results(
    text_embedding_results: Dict[Tuple[str, str], List[Dict]],
    semantic_similarity_results: Dict[Tuple[str, str], List[Dict]]
) -> List[Dict]:
    """Merge vector-model and LLM results into one row per phrase pair.

    Args:
        text_embedding_results: Vector-model results keyed by
            (phrase_a, phrase_b).
        semantic_similarity_results: LLM results keyed the same way.

    Returns:
        One dict per phrase pair with 短语A/短语B plus one 相似度 and one
        说明 column per model, prefixed 向量_ or LLM_. Rows are sorted by
        phrase pair so the output order is deterministic across runs
        (iterating the raw set made row order depend on hash seeding).
    """
    # Sort the union of pairs for a reproducible report.
    all_pairs = sorted(set(text_embedding_results) | set(semantic_similarity_results))

    merged = []
    for phrase_a, phrase_b in all_pairs:
        row = {"短语A": phrase_a, "短语B": phrase_b}

        # Both sources contribute columns with their own prefix.
        for prefix, source in (
            ("向量", text_embedding_results),
            ("LLM", semantic_similarity_results),
        ):
            for result in source.get((phrase_a, phrase_b), ()):
                # Use the short model name (text after the last '/').
                model_name = result["模型"].split('/')[-1]
                row[f"{prefix}_{model_name}_相似度"] = result["相似度"]
                row[f"{prefix}_{model_name}_说明"] = result["说明"]

        merged.append(row)

    return merged
+
+
def create_comparison_dataframe(merged_results: List[Dict]) -> pd.DataFrame:
    """Build the model-comparison DataFrame.

    Args:
        merged_results: Rows produced by merge_all_results.

    Returns:
        DataFrame with 序号, 短语A, 短语B, every *_相似度 column, and a
        trailing 相似度_差异 column (max - min across the models that
        produced a numeric score for that row; NaN when none did, 0 when
        only one did). 说明 columns are dropped.
    """
    df = pd.DataFrame(merged_results)

    # Sequential row numbers for the report.
    df.insert(0, "序号", range(1, len(df) + 1))

    # Keep only the similarity columns; the free-text 说明 columns are
    # dropped from the Excel report.
    similarity_cols = [col for col in df.columns if col.endswith("_相似度")]
    keep = [col for col in ["序号", "短语A", "短语B"] if col in df.columns]
    df = df[keep + similarity_cols]

    if similarity_cols:
        # Vectorized spread across models, replacing the per-row iterrows
        # loop. to_numeric coerces any stray non-numeric value to NaN, and
        # max/min skip NaN by default, so a row with a single valid score
        # gets a spread of 0 and an all-missing row gets NaN — matching
        # the previous per-row logic.
        scores = df[similarity_cols].apply(pd.to_numeric, errors="coerce")
        df["相似度_差异"] = scores.max(axis=1) - scores.min(axis=1)

    return df
+
+
def export_to_excel(
    comparison_df: pd.DataFrame,
    output_file: str
) -> None:
    """Write the comparison DataFrame to a formatted Excel file.

    Args:
        comparison_df: Comparison table from create_comparison_dataframe.
        output_file: Destination .xlsx path; parent dirs are created.
    """
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        # Single sheet holding the full comparison table.
        comparison_df.to_excel(writer, sheet_name='模型对比', index=False)

        # Auto-size each column to its longest cell, capped at 50 chars.
        # str() renders None/NaN cells too, so the previous bare `except:`
        # (which also swallowed KeyboardInterrupt) is unnecessary.
        ws = writer.sheets['模型对比']
        for col in ws.columns:
            letter = col[0].column_letter
            longest = max((len(str(cell.value)) for cell in col), default=0)
            ws.column_dimensions[letter].width = min(longest + 2, 50)

    print(f"Excel报告已导出到: {output_file}")
+
+
def main():
    """Entry point: read both caches, merge the results, export to Excel."""
    # Configuration: cache locations and a timestamped output path.
    vector_cache = "cache/text_embedding"
    llm_cache = "cache/semantic_similarity"
    output_file = f"data/model_comparison_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"

    print("=" * 60)
    print("模型对比结果分析(向量模型 vs LLM模型)")
    print("=" * 60)

    # Step 1: pull results out of both cache directories.
    print("\n步骤 1: 从缓存提取结果...")
    print(f"  - 向量模型缓存: {vector_cache}")
    vector_results = extract_results_from_cache(vector_cache, "text_embedding")
    print(f"    提取到 {len(vector_results)} 个唯一短语对")

    print(f"  - LLM模型缓存: {llm_cache}")
    llm_results = extract_results_from_cache(llm_cache, "semantic_similarity")
    print(f"    提取到 {len(llm_results)} 个唯一短语对")

    # Bail out early with guidance when neither cache has any data.
    if not vector_results and not llm_results:
        print("\n错误: 未找到任何缓存数据")
        print("请先运行以下脚本生成缓存:")
        print("  - match_inspiration_features.py (生成 text_embedding 缓存)")
        print("  - test_all_models.py (生成多模型缓存)")
        return

    # Step 2: one merged row per phrase pair.
    print("\n步骤 2: 合并所有模型结果...")
    merged = merge_all_results(vector_results, llm_results)
    print(f"合并后共 {len(merged)} 个测试用例")

    # Step 3: tabulate into the comparison DataFrame.
    print("\n步骤 3: 创建对比数据框...")
    report_df = create_comparison_dataframe(merged)
    print(f"对比数据框创建完成,共 {len(report_df)} 行")

    score_columns = [col for col in report_df.columns if col.endswith("_相似度")]
    print(f"包含 {len(score_columns)} 个模型的相似度数据:")
    for col in score_columns:
        print(f"  - {col}")

    # Step 4: write the Excel report.
    print("\n步骤 4: 导出到Excel...")
    export_to_excel(report_df, output_file)

    print("\n" + "=" * 60)
    print("分析完成!")
    print(f"报告文件: {output_file}")
    print("=" * 60)

+ 345 - 0
script/analysis/test_all_models.py

@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试所有文本相似度模型
+
+从现有缓存(cache/text_embedding)中提取所有测试用例,
+使用所有支持的模型并发计算相似度,生成完整的缓存数据。
+"""
+
+import json
+import sys
+from pathlib import Path
+from typing import List, Dict, Tuple
+import time
+import asyncio
+from datetime import datetime
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from lib.text_embedding import compare_phrases, SUPPORTED_MODELS
+
+# 全局并发限制
+MAX_CONCURRENT_REQUESTS = 100
+semaphore = None
+
+# 进度跟踪
class ProgressTracker:
    """Console progress bar with rate-limited redraws and an ETA estimate."""

    def __init__(self, total: int, description: str = ""):
        # Public counters read by display(); callers drive them via update().
        self.total = total
        self.completed = 0
        self.start_time = datetime.now()
        self.last_update_time = datetime.now()
        self.description = description

    def update(self, count: int = 1):
        """Advance the counter and redraw at most twice per second."""
        self.completed += count
        now = datetime.now()
        # Throttle redraws, but always redraw on completion so the final
        # state is shown.
        throttled = (now - self.last_update_time).total_seconds() < 0.5
        if not throttled or self.completed >= self.total:
            self.display()
            self.last_update_time = now

    def display(self):
        """Render the bar, counts, percentage, and estimated time left."""
        if self.total == 0:
            return

        percentage = (self.completed / self.total) * 100
        elapsed = (datetime.now() - self.start_time).total_seconds()

        # ETA only once some progress has been made.
        eta_str = ""
        if elapsed > 0:
            speed = self.completed / elapsed
            if speed > 0:
                remaining = (self.total - self.completed) / speed
                eta_str = f", 预计剩余: {int(remaining)}秒"

        bar_length = 40
        filled = int(bar_length * self.completed / self.total)
        bar = '█' * filled + '░' * (bar_length - filled)

        desc = f"{self.description}: " if self.description else ""
        print(f"\r  {desc}[{bar}] {self.completed}/{self.total} ({percentage:.1f}%){eta_str}", end='', flush=True)

        # Move to a fresh line once done.
        if self.completed >= self.total:
            print()
+
+# 全局进度跟踪器
+progress_tracker = None
+
+
def get_semaphore():
    """Return the process-wide concurrency semaphore, creating it lazily."""
    global semaphore
    if semaphore is None:
        # Created on first use rather than at import time.
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    return semaphore
+
+
def extract_test_cases_from_cache(
    cache_dir: str = "cache/text_embedding"
) -> List[Tuple[str, str]]:
    """Collect every unique phrase pair recorded in the cache directory.

    Args:
        cache_dir: Directory containing *.json cache entries.

    Returns:
        List of (phrase_a, phrase_b) tuples in their original order; pairs
        that only differ in direction count as a single case.
    """
    cache_path = Path(cache_dir)
    if not cache_path.exists():
        print(f"缓存目录不存在: {cache_dir}")
        return []

    cache_files = list(cache_path.glob("*.json"))
    print(f"扫描缓存文件: {len(cache_files)} 个")

    test_cases: List[Tuple[str, str]] = []
    seen_pairs = set()  # direction-insensitive dedup keys

    for cache_file in cache_files:
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                entry = json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            print(f"  读取缓存文件失败: {cache_file.name} - {e}")
            continue

        phrase_a = entry.get("input", {}).get("phrase_a")
        phrase_b = entry.get("input", {}).get("phrase_b")
        if not (phrase_a and phrase_b):
            continue

        # Sort the pair so (A, B) and (B, A) dedupe to the same key.
        key = tuple(sorted((phrase_a, phrase_b)))
        if key not in seen_pairs:
            seen_pairs.add(key)
            test_cases.append((phrase_a, phrase_b))

    return test_cases
+
+
async def test_single_case(
    phrase_a: str,
    phrase_b: str,
    model_key: str,
    use_cache: bool = True
) -> Dict:
    """Run one comparison under the global concurrency limit.

    Args:
        phrase_a: First phrase.
        phrase_b: Second phrase.
        model_key: Key of the model to use.
        use_cache: Whether compare_phrases may read/write its cache.

    Returns:
        Result dict with the phrases, model, 相似度/说明 and a "status"
        of "success" or "error". Failures are reported, never raised.
    """
    global progress_tracker

    async with get_semaphore():
        try:
            # compare_phrases is synchronous; run it on a worker thread so
            # the event loop stays responsive.
            outcome = await asyncio.to_thread(
                compare_phrases,
                phrase_a=phrase_a,
                phrase_b=phrase_b,
                model_name=model_key,
                use_cache=use_cache
            )
            similarity = outcome["相似度"]
            note = outcome["说明"]
            status = "success"
        except Exception as e:
            similarity = None
            note = f"计算失败: {str(e)}"
            status = "error"

        # Progress is counted for successes and failures alike.
        if progress_tracker:
            progress_tracker.update(1)

        return {
            "phrase_a": phrase_a,
            "phrase_b": phrase_b,
            "model": model_key,
            "相似度": similarity,
            "说明": note,
            "status": status
        }
+
+
async def test_all_models(
    test_cases: List[Tuple[str, str]],
    models: Dict[str, str] = None,
    use_cache: bool = True
) -> Dict[str, List[Dict]]:
    """Concurrently run every test case against every model.

    Args:
        test_cases: (phrase_a, phrase_b) pairs to evaluate.
        models: Mapping of model key -> full name; defaults to
            SUPPORTED_MODELS.
        use_cache: Whether compare_phrases may read/write its cache.

    Returns:
        Mapping of model key to its list of per-case result dicts
        (phrase_a, phrase_b, 相似度, 说明, status).
    """
    global progress_tracker

    if models is None:
        models = SUPPORTED_MODELS

    total_tests = len(test_cases) * len(models)

    print(f"\n开始测试 {len(models)} 个模型,共 {len(test_cases)} 个测试用例")
    print(f"总测试数: {total_tests:,}\n")

    # Warm up one model on a worker thread first, so the concurrent tasks
    # below do not race to load it.
    print("预加载模型...")
    first_model = next(iter(models))
    await asyncio.to_thread(compare_phrases, "测试", "测试", model_name=first_model)
    print("预加载完成!\n")

    progress_tracker = ProgressTracker(total_tests, "测试进度")

    # One task per (model, case) combination; the semaphore inside
    # test_single_case limits how many run at once.
    tasks = [
        test_single_case(phrase_a, phrase_b, model_key, use_cache)
        for model_key in models
        for phrase_a, phrase_b in test_cases
    ]

    start_time = time.time()
    all_results = await asyncio.gather(*tasks)
    elapsed = time.time() - start_time

    print(f"\n所有测试完成! 总耗时: {elapsed:.2f}秒")
    print(f"平均速度: {total_tests/elapsed:.2f} 条/秒\n")

    # Regroup the flat result list by model.
    grouped: Dict[str, List[Dict]] = {model_key: [] for model_key in models}
    for item in all_results:
        grouped[item["model"]].append({
            "phrase_a": item["phrase_a"],
            "phrase_b": item["phrase_b"],
            "相似度": item["相似度"],
            "说明": item["说明"],
            "status": item["status"]
        })

    # Per-model success/failure summary.
    print("统计信息:")
    for model_key, model_results in grouped.items():
        successful = sum(1 for r in model_results if r["status"] == "success")
        failed = len(model_results) - successful
        print(f"  {model_key}: {successful} 成功, {failed} 失败")

    return grouped
+
+
def save_results(
    results: Dict[str, List[Dict]],
    output_file: str = "data/model_comparison_results.json"
) -> None:
    """Persist the grouped test results as pretty-printed JSON.

    Args:
        results: Output of test_all_models.
        output_file: Destination path; parent dirs are created as needed.
    """
    target = Path(output_file)
    target.parent.mkdir(parents=True, exist_ok=True)

    target.write_text(
        json.dumps(results, ensure_ascii=False, indent=2),
        encoding='utf-8'
    )

    print(f"测试结果已保存到: {output_file}")
+
+
async def main():
    """Entry point: gather cases from cache, test every model, save results."""
    # Configuration.
    cache_dir = "cache/text_embedding"
    output_file = "data/model_comparison_results.json"

    # Step 1: collect the phrase pairs already present in the cache.
    print("=" * 60)
    print("步骤 1: 从缓存提取所有测试用例")
    print("=" * 60)
    test_cases = extract_test_cases_from_cache(cache_dir)

    if not test_cases:
        print("未找到测试用例,请先运行主流程生成缓存数据")
        return

    print(f"提取到 {len(test_cases):,} 个唯一测试用例")

    # A small preview of what will be tested.
    print("\n前5个测试用例示例:")
    for i, (phrase_a, phrase_b) in enumerate(test_cases[:5], 1):
        print(f"  {i}. {phrase_a} vs {phrase_b}")

    # Step 2: run the full model matrix concurrently.
    print("\n" + "=" * 60)
    print("步骤 2: 使用所有模型并发测试")
    print("=" * 60)
    results = await test_all_models(test_cases, use_cache=True)

    # Step 3: persist everything for later analysis.
    print("\n" + "=" * 60)
    print("步骤 3: 保存结果")
    print("=" * 60)
    save_results(results, output_file)

    print("\n" + "=" * 60)
    print("全部完成!")
    print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 10 - 3
script/data_processing/match_inspiration_features.py

@@ -18,7 +18,7 @@ from datetime import datetime
 project_root = Path(__file__).parent.parent.parent
 sys.path.insert(0, str(project_root))
 
-from lib.semantic_similarity import compare_phrases
+from lib.text_embedding import compare_phrases
 
 # 全局并发限制
 MAX_CONCURRENT_REQUESTS = 100
@@ -119,7 +119,9 @@ async def match_single_pair(
     global progress_tracker
     sem = get_semaphore()
     async with sem:
-        similarity_result = await compare_phrases(
+        # 使用 asyncio.to_thread 将同步函数转为异步执行
+        similarity_result = await asyncio.to_thread(
+            compare_phrases,
             phrase_a=feature_name,
             phrase_b=persona_name,
         )
@@ -466,9 +468,14 @@ async def main():
     with open(category_mapping_file, "r", encoding="utf-8") as f:
         category_mapping = json.load(f)
 
+    # 预先加载模型(在主线程中,避免多线程冲突)
+    print("\n预加载文本相似度模型...")
+    await asyncio.to_thread(compare_phrases, "测试", "测试")
+    print("模型预加载完成!\n")
+
     # 获取任务列表
     task_list = task_list_data.get("解构任务列表", [])
-    print(f"\n总任务数: {len(task_list)}")
+    print(f"总任务数: {len(task_list)}")
 
     # 处理任务列表
     updated_task_list = await process_task_list(