""" 相似度计算工具:计算两组短语的 M×N 相似度矩阵。 使用综合相似度:embedding 50% + LLM 50%。 """ from __future__ import annotations import asyncio import hashlib import json import logging import os import re import time from typing import List, Tuple, TypedDict import httpx logger = logging.getLogger(__name__) # 缓存目录:相对本文件所在目录的 ../.cache/similarity,按 (phrase_a, phrase_b) 原子化存储 _CACHE_DIR = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".cache", "similarity")) def _atomic_pair_key(phrase_a: str, phrase_b: str) -> str: """单对短语的缓存键(原子粒度)。""" raw = json.dumps([phrase_a, phrase_b], ensure_ascii=False, sort_keys=False) return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _ensure_cache_dir() -> None: os.makedirs(_CACHE_DIR, exist_ok=True) def _read_atomic_score(cache_type: str, phrase_a: str, phrase_b: str) -> float | None: """读取单对短语的分数缓存,不存在或失败返回 None。""" key = _atomic_pair_key(phrase_a, phrase_b) path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json") if not os.path.isfile(path): return None try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) # 校验 phrase_a / phrase_b 一致,避免碰撞误用 if data.get("phrase_a") != phrase_a or data.get("phrase_b") != phrase_b: return None return float(data["score"]) except Exception as e: logger.debug("[similarity_cache] 读取 %s 失败: %s", path, e) return None def _write_atomic_score(cache_type: str, phrase_a: str, phrase_b: str, score: float) -> None: """写入单对短语的分数缓存(原子结果:phrase_a, phrase_b, score)。""" _ensure_cache_dir() key = _atomic_pair_key(phrase_a, phrase_b) path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json") try: with open(path, "w", encoding="utf-8") as f: json.dump({"phrase_a": phrase_a, "phrase_b": phrase_b, "score": score}, f, ensure_ascii=False) except Exception as e: logger.warning("[similarity_cache] 写入 %s 失败: %s", path, e) class SimilarityItem(TypedDict): """单条相似度结果。""" phrase_a: str phrase_b: str embedding_score: float llm_score: float combined_score: float # 批量提示词模板(LLM 打分用)。占位符:{count}、{pairs_list};JSON 内大括号已转义 DEFAULT_BATCH_PROMPT_TEMPLATE = """ # 角色 你是一个语言学家、信息学家,能够理解、区分、辨别不同词语、词汇之间所表达的语义信息量的细微差异。 # 任务 从语义相似程度的角度,对以下{count}对短语的语义相似度进行打分,两者的语义越相近得分越高。 # 核心打分规则: 1. 打分时必须严格区分“语义相似”与“语义关联”两种关系,语义"相似"≠语义"关联",不能混淆: - ✅ 语义相似(高分):两个词语表达的核心含义高度重叠,可以在相近语境中互相替换。例如:"快乐"与"开心"。 - ❌ 语义关联(低分):两个词语有逻辑/功能/字形上的联系,但各自指代的事物或概念本身并不相同,不能互相替换。 2. 禁止因为两个短语共享汉字或词根就给高分。字形相近 ≠ 语义相近。 3. 当一个短语**包含另一个短语**作为组成部分时,复合后的语义范围已发生显著变化,应根据语义实际差距给分,不得因"包含"关系而虚高打分。 4. 给每对短语进行打分时,将每对短语的打分都当做是完全独立的事件来判断打分,不同对的短语之间打分没有关联,当做自己看不到别的短语,**禁止**参考其它对短语的打分结果。 # 打分量化参考(分值范围 0.00 - 1.00,最小颗粒度 0.01) - **[0.90 - 1.00] 极度相似 / 同义替换**:核心概念完全一致,仅表述习惯不同。 - **[0.80 - 0.89] 高度相似 / 细微差异**:核心概念一致,但在范围、颗粒度、或侧重点上有轻微区别。 - **[0.50 - 0.79] 中度相似 / 修饰偏离**:共享部分核心概念,但其中一个多出了强烈的修饰语或限定条件,导致语义重心发生偏移。 - **[0.20 - 0.49] 低度相似 / 类别偏移**:字面有重合或场景高度相关,但**核心类别(词性或实体类型)已经改变**。 - **[0.00 - 0.19] 毫不相干**:语义完全无关。 # 输出格式(必须是一个JSON数组): ```json [ {{ "text_1": "", "text_2": "", "score": 0.00, "reason": "简明扼要说明理由" }}, ... ] ``` # 短语对列表: {pairs_list} """.strip() # Embedding 相似度 API EMBEDDING_SIMILARITY_URL = "http://61.48.133.26:8187/cartesian_similarity" # LLM 模型 LLM_MODEL = "google/gemini-3.1-flash-lite-preview" def _phrase_pairs(phrases_a: List[str], phrases_b: List[str]) -> List[Tuple[str, str]]: """将 M×N 展开为短语对列表,顺序为 (a0,b0),(a0,b1),...,(a0,b_{N-1}),(a1,b0),...""" return [(a, b) for a in phrases_a for b in phrases_b] async def _embedding_similarity(phrases_a: List[str], phrases_b: List[str]) -> List[List[float]]: """ 调用 cartesian_similarity API,返回 M×N 矩阵。先查原子缓存 (phrase_a, phrase_b) -> score,仅对未命中的短语对调用 API。 """ if not phrases_a or not phrases_b: return [] M, N = len(phrases_a), len(phrases_b) matrix = [[0.0] * N for _ in range(M)] missing_indices: List[Tuple[int, int]] = [] for i in range(M): for j in range(N): score = _read_atomic_score("embedding", phrases_a[i], phrases_b[j]) if score is not None: matrix[i][j] = score else: missing_indices.append((i, j)) total = M * N hit_count = total - len(missing_indices) if hit_count > 0: logger.info("[similarity_matrix] embedding 原子缓存命中 %d/%d", hit_count, total) if not missing_indices: return matrix # 仅对未命中的短语对调用 API:构造缺失的 phrases_a / phrases_b(去重且保持顺序) a_set: List[str] = list(dict.fromkeys(phrases_a[i] for i, _ in missing_indices)) b_set: List[str] = list(dict.fromkeys(phrases_b[j] for _, j in missing_indices)) async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post( EMBEDDING_SIMILARITY_URL, json={"texts1": a_set, "texts2": b_set}, headers={"Content-Type": "application/json"}, ) resp.raise_for_status() data = resp.json() results = data.get("results", []) len_b = len(b_set) for i, j in missing_indices: a, b = phrases_a[i], phrases_b[j] i_m, j_m = a_set.index(a), b_set.index(b) idx_flat = i_m * len_b + j_m if idx_flat < len(results): score = float(results[idx_flat]["score"]) matrix[i][j] = score _write_atomic_score("embedding", a, b, score) return matrix def _extract_json_array(content: str) -> List[dict]: """从 LLM 回复中解析 JSON 数组(允许被 ```json ... ``` 包裹)。""" content = content.strip() # 尝试匹配 ```json ... ``` 中的内容 m = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content) if m: content = m.group(1).strip() return json.loads(content) async def _llm_similarity( phrases_a: List[str], phrases_b: List[str], *, use_cache: bool = True ) -> List[List[float]]: """ 用 LLM 对短语对打分,返回 M×N 矩阵。use_cache=True 时先查原子缓存,仅对未命中的短语对调用 API; use_cache=False 时不读不写缓存。 """ if not phrases_a or not phrases_b: return [] M, N = len(phrases_a), len(phrases_b) matrix = [[0.0] * N for _ in range(M)] missing_indices: List[Tuple[int, int]] = [] for i in range(M): for j in range(N): if use_cache: score = _read_atomic_score("llm", phrases_a[i], phrases_b[j]) if score is not None: matrix[i][j] = score continue missing_indices.append((i, j)) total = M * N if use_cache: hit_count = total - len(missing_indices) if hit_count > 0: logger.info("[similarity_matrix] LLM 原子缓存命中 %d/%d", hit_count, total) if not missing_indices: return matrix # 仅对未命中的短语对调用 LLM:按缺失顺序构造 pairs_list,LLM 按同序返回 missing_pairs = [(phrases_a[i], phrases_b[j]) for (i, j) in missing_indices] pairs_list = "\n".join( f'{idx + 1}. "{a}" 和 "{b}"' for idx, (a, b) in enumerate(missing_pairs) ) prompt = DEFAULT_BATCH_PROMPT_TEMPLATE.format(count=len(missing_pairs), pairs_list=pairs_list) from agent.llm.openrouter import openrouter_llm_call messages = [{"role": "user", "content": prompt}] result = await openrouter_llm_call(messages, model=LLM_MODEL) content = result.get("content", "") if not content: raise ValueError("LLM 未返回内容") items = _extract_json_array(content) for idx, (i, j) in enumerate(missing_indices): if idx >= len(items): break score = float(items[idx].get("score", 0.0)) score = max(0.0, min(1.0, score)) matrix[i][j] = score if use_cache: a, b = phrases_a[i], phrases_b[j] _write_atomic_score("llm", a, b, score) return matrix async def similarity_matrix( phrases_a: List[str], phrases_b: List[str], *, embedding_weight: float = 0.5, llm_weight: float = 0.5, use_llm_cache: bool = True, ) -> List[SimilarityItem]: """ 计算两组短语的相似度,返回对象列表(每条含 phrase_a, phrase_b, embedding_score, llm_score, combined_score)。 综合相似度 = embedding_weight * embedding_score + llm_weight * llm_score。 默认各 50%。 Args: phrases_a: 第一组短语列表(M 个) phrases_b: 第二组短语列表(N 个) embedding_weight: embedding 权重,默认 0.5 llm_weight: LLM 权重,默认 0.5 use_llm_cache: 是否使用 LLM 相似度缓存,默认 True Returns: 对象列表,长度 M×N,顺序与短语对 (a0,b0),(a0,b1),...,(aM-1,bN-1) 一致。 """ if not phrases_a or not phrases_b: return [] M, N = len(phrases_a), len(phrases_b) total_pairs = M * N logger.info("[similarity_matrix] 开始计算: phrases_a=%d, phrases_b=%d, 短语对=%d", M, N, total_pairs) t_total = time.perf_counter() async def _run_embedding() -> List[List[float]]: t0 = time.perf_counter() out = await _embedding_similarity(phrases_a, phrases_b) logger.info("[similarity_matrix] embedding 耗时: %.3fs", time.perf_counter() - t0) return out async def _run_llm() -> List[List[float]]: t0 = time.perf_counter() out = await _llm_similarity(phrases_a, phrases_b, use_cache=use_llm_cache) logger.info("[similarity_matrix] LLM 耗时: %.3fs", time.perf_counter() - t0) return out emb_matrix, llm_matrix = await asyncio.gather(_run_embedding(), _run_llm()) elapsed = time.perf_counter() - t_total logger.info("[similarity_matrix] 总耗时: %.3fs", elapsed) N = len(phrases_b) pairs = _phrase_pairs(phrases_a, phrases_b) result: List[SimilarityItem] = [] for idx, (a, b) in enumerate(pairs): i, j = idx // N, idx % N emb_s = emb_matrix[i][j] llm_s = llm_matrix[i][j] combined_s = embedding_weight * emb_s + llm_weight * llm_s result.append({ "phrase_a": a, "phrase_b": b, "embedding_score": emb_s, "llm_score": llm_s, "combined_score": combined_s, }) return result def similarity_matrix_sync( phrases_a: List[str], phrases_b: List[str], **kwargs, ) -> List[SimilarityItem]: """同步封装:在同步代码中调用时使用 asyncio.run 执行。返回与 similarity_matrix 相同结构的对象列表。""" return asyncio.run(similarity_matrix(phrases_a, phrases_b, **kwargs)) # --------------------------------------------------------------------------- # 测试 # --------------------------------------------------------------------------- def test_phrase_pairs() -> None: """测试 M×N 展开为短语对列表的顺序。""" a = ["犬", "猫咪"] b = ["狗", "手机"] pairs = _phrase_pairs(a, b) assert len(pairs) == 4 assert pairs[0] == ("犬", "狗") assert pairs[1] == ("犬", "手机") assert pairs[2] == ("猫咪", "狗") assert pairs[3] == ("猫咪", "手机") print("test_phrase_pairs: ok") def test_extract_json_array() -> None: """测试从 LLM 回复中解析 JSON 数组。""" # 带 ```json 包裹 content = '''一些说明 ```json [ {"text_1": "犬", "text_2": "狗", "score": 0.85, "reason": "同义"} ] ``` ''' arr = _extract_json_array(content) assert len(arr) == 1 assert arr[0]["score"] == 0.85 # 纯 JSON 数组 arr2 = _extract_json_array('[{"score": 0.5}]') assert len(arr2) == 1 and arr2[0]["score"] == 0.5 print("test_extract_json_array: ok") async def test_similarity_matrix() -> None: """集成测试:调用 embedding + LLM 得到相似度对象列表。use_llm_cache 可控制是否使用 LLM 缓存。""" # use_llm_cache = True use_llm_cache = False phrases_a = ["柴犬形象", "鞋子", "夸张"] phrases_b = ["柴犬主角", "鞋架", "夸张堆叠"] items = await similarity_matrix(phrases_a, phrases_b, use_llm_cache=use_llm_cache) for item in items: print(item) # assert len(items) == 4 # for row in items: # assert "phrase_a" in row and "phrase_b" in row # assert "embedding_score" in row and "llm_score" in row and "combined_score" in row # assert 0 <= row["combined_score"] <= 1, f"combined_score 应在 [0,1],得到 {row['combined_score']}" # # 语义上 "犬"-"狗" 应高于 "犬"-"手机" # dog_dog = next(r for r in items if r["phrase_a"] == "犬" and r["phrase_b"] == "狗") # dog_phone = next(r for r in items if r["phrase_a"] == "犬" and r["phrase_b"] == "手机") # assert dog_dog["combined_score"] > dog_phone["combined_score"], "犬-狗 应高于 犬-手机" # print("test_similarity_matrix: ok") # for r in items: # print(f" {r['phrase_a']}-{r['phrase_b']}: emb={r['embedding_score']:.4f} llm={r['llm_score']:.4f} combined={r['combined_score']:.4f}") if __name__ == "__main__": # 直接运行 python similarity_calc.py 时,将项目根加入 path,以便 import agent _root = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", "..")) if _root not in __import__("sys").path: __import__("sys").path.insert(0, _root) test_phrase_pairs() test_extract_json_array() print("运行集成测试(需 embedding API、OPEN_ROUTER_API_KEY 及 agent 依赖)...") try: asyncio.run(test_similarity_matrix()) print("全部通过。") except Exception as e: print(f"跳过集成测试: {e}") print("仅单元测试已通过。集成测试请确保:1) embedding 服务可访问 2) 设置 OPEN_ROUTER_API_KEY 3) 在项目根目录执行: python -m examples_how.overall_derivation.utils.similarity_calc")