@@ -0,0 +1,358 @@
+"""
+Similarity utility: compute the M×N similarity matrix between two groups of phrases.
+Combined similarity: embedding 50% + LLM 50%.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import json
+import logging
+import os
+import re
+import time
+from typing import List, Tuple, TypedDict
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+# Cache directory: ../.cache/similarity relative to this file; scores are stored
+# atomically per (phrase_a, phrase_b) pair
+_CACHE_DIR = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".cache", "similarity"))
+
+
+def _atomic_pair_key(phrase_a: str, phrase_b: str) -> str:
+    """Cache key for a single phrase pair (atomic granularity)."""
+    raw = json.dumps([phrase_a, phrase_b], ensure_ascii=False, sort_keys=False)
+    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
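+# Note: the key is order-sensitive -- json.dumps serializes the list
+# [phrase_a, phrase_b] in order, so (a, b) and (b, a) map to distinct cache files.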
+
+
+def _ensure_cache_dir() -> None:
+    os.makedirs(_CACHE_DIR, exist_ok=True)
+
+
+def _read_atomic_score(cache_type: str, phrase_a: str, phrase_b: str) -> float | None:
+    """Read the cached score for a single phrase pair; return None if missing or unreadable."""
+    key = _atomic_pair_key(phrase_a, phrase_b)
+    path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json")
+    if not os.path.isfile(path):
+        return None
+    try:
+        with open(path, "r", encoding="utf-8") as f:
+            data = json.load(f)
+        # Verify phrase_a / phrase_b match, so a hash collision is never misused
+        if data.get("phrase_a") != phrase_a or data.get("phrase_b") != phrase_b:
+            return None
+        return float(data["score"])
+    except Exception as e:
+        logger.debug("[similarity_cache] failed to read %s: %s", path, e)
+        return None
+
+
+def _write_atomic_score(cache_type: str, phrase_a: str, phrase_b: str, score: float) -> None:
+    """Write the cached score for a single phrase pair (atomic record: phrase_a, phrase_b, score)."""
+    _ensure_cache_dir()
+    key = _atomic_pair_key(phrase_a, phrase_b)
+    path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json")
+    try:
+        with open(path, "w", encoding="utf-8") as f:
+            json.dump({"phrase_a": phrase_a, "phrase_b": phrase_b, "score": score}, f, ensure_ascii=False)
+    except Exception as e:
+        logger.warning("[similarity_cache] failed to write %s: %s", path, e)
+
+
+class SimilarityItem(TypedDict):
+    """A single similarity result."""
+    phrase_a: str
+    phrase_b: str
+    embedding_score: float
+    llm_score: float
+    combined_score: float
+
+# Batch prompt template for LLM scoring (kept in Chinese, matching the phrase data
+# it scores). Placeholders: {count} and {pairs_list}; literal braces inside the
+# JSON example are escaped as {{ }}
+DEFAULT_BATCH_PROMPT_TEMPLATE = """
+请从语意角度判断以下{count}对短语的相似度,每对从0-1打分,输出格式如下(必须是一个JSON数组):
+```json
+[
+    {{
+        "text_1": "",
+        "text_2": "",
+        "score": 0.0,
+        "reason": "简明扼要说明理由"
+    }},
+    {{
+        "text_1": "",
+        "text_2": "",
+        "score": 0.0,
+        "reason": "简明扼要说明理由"
+    }}
+]
+```
+
+短语对列表:
+{pairs_list}
+""".strip()
+
+# Embedding similarity API
+EMBEDDING_SIMILARITY_URL = "http://61.48.133.26:8187/cartesian_similarity"
+# LLM model
+LLM_MODEL = "openai/gpt-4.1-mini"
+
+
+def _phrase_pairs(phrases_a: List[str], phrases_b: List[str]) -> List[Tuple[str, str]]:
+    """Expand M×N into a list of phrase pairs, ordered (a0,b0),(a0,b1),...,(a0,b_{N-1}),(a1,b0),..."""
+    return [(a, b) for a in phrases_a for b in phrases_b]
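+# For example: _phrase_pairs(["a0", "a1"], ["b0"]) == [("a0", "b0"), ("a1", "b0")].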
+
+
+async def _embedding_similarity(phrases_a: List[str], phrases_b: List[str]) -> List[List[float]]:
+    """
+    Call the cartesian_similarity API and return an M×N matrix. The atomic cache
+    (phrase_a, phrase_b) -> score is consulted first; the API is only called for
+    pairs that miss.
+    """
+    if not phrases_a or not phrases_b:
+        return []
+
+    M, N = len(phrases_a), len(phrases_b)
+    matrix = [[0.0] * N for _ in range(M)]
+    missing_indices: List[Tuple[int, int]] = []
+    for i in range(M):
+        for j in range(N):
+            score = _read_atomic_score("embedding", phrases_a[i], phrases_b[j])
+            if score is not None:
+                matrix[i][j] = score
+            else:
+                missing_indices.append((i, j))
+
+    total = M * N
+    hit_count = total - len(missing_indices)
+    if hit_count > 0:
+        logger.info("[similarity_matrix] embedding atomic cache hit %d/%d", hit_count, total)
+    if not missing_indices:
+        return matrix
+
+    # Call the API only for the missing pairs: build the missing phrases_a / phrases_b
+    # (deduplicated, order preserved)
+    a_set: List[str] = list(dict.fromkeys(phrases_a[i] for i, _ in missing_indices))
+    b_set: List[str] = list(dict.fromkeys(phrases_b[j] for _, j in missing_indices))
+    async with httpx.AsyncClient(timeout=60.0) as client:
+        resp = await client.post(
+            EMBEDDING_SIMILARITY_URL,
+            json={"texts1": a_set, "texts2": b_set},
+            headers={"Content-Type": "application/json"},
+        )
+        resp.raise_for_status()
+        data = resp.json()
+
+    # "results" is assumed to be the a_set × b_set cartesian product flattened
+    # row-major, i.e. the score for (a_set[i_m], b_set[j_m]) sits at i_m * len(b_set) + j_m
+    results = data.get("results", [])
+    len_b = len(b_set)
+    for i, j in missing_indices:
+        a, b = phrases_a[i], phrases_b[j]
+        i_m, j_m = a_set.index(a), b_set.index(b)
+        idx_flat = i_m * len_b + j_m
+        if idx_flat < len(results):
+            score = float(results[idx_flat]["score"])
+            matrix[i][j] = score
+            _write_atomic_score("embedding", a, b, score)
+    return matrix
+
+
+def _extract_json_array(content: str) -> List[dict]:
+    """Parse a JSON array from an LLM reply (optionally wrapped in ```json ... ```)."""
+    content = content.strip()
+    # Try to match the content inside ```json ... ``` fences
+    m = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content)
+    if m:
+        content = m.group(1).strip()
+    return json.loads(content)
+
+
+async def _llm_similarity(phrases_a: List[str], phrases_b: List[str]) -> List[List[float]]:
+    """
+    Score phrase pairs with an LLM and return an M×N matrix. The atomic cache is
+    consulted first; the LLM is only called for pairs that miss.
+    """
+    if not phrases_a or not phrases_b:
+        return []
+
+    M, N = len(phrases_a), len(phrases_b)
+    matrix = [[0.0] * N for _ in range(M)]
+    missing_indices: List[Tuple[int, int]] = []
+    for i in range(M):
+        for j in range(N):
+            score = _read_atomic_score("llm", phrases_a[i], phrases_b[j])
+            if score is not None:
+                matrix[i][j] = score
+            else:
+                missing_indices.append((i, j))
+
+    total = M * N
+    hit_count = total - len(missing_indices)
+    if hit_count > 0:
+        logger.info("[similarity_matrix] LLM atomic cache hit %d/%d", hit_count, total)
+    if not missing_indices:
+        return matrix
+
+    # Call the LLM only for the missing pairs: pairs_list follows the miss order,
+    # and the LLM is expected to return items in that same order
+    missing_pairs = [(phrases_a[i], phrases_b[j]) for (i, j) in missing_indices]
+    pairs_list = "\n".join(
+        f'{idx + 1}. "{a}" 和 "{b}"'
+        for idx, (a, b) in enumerate(missing_pairs)
+    )
+    prompt = DEFAULT_BATCH_PROMPT_TEMPLATE.format(count=len(missing_pairs), pairs_list=pairs_list)
+
+    from agent.llm.openrouter import openrouter_llm_call
+
+    messages = [{"role": "user", "content": prompt}]
+    result = await openrouter_llm_call(messages, model=LLM_MODEL)
+    content = result.get("content", "")
+    if not content:
+        raise ValueError("LLM returned no content")
+
+    items = _extract_json_array(content)
+    for idx, (i, j) in enumerate(missing_indices):
+        if idx >= len(items):
+            break
+        score = float(items[idx].get("score", 0.0))
+        score = max(0.0, min(1.0, score))  # clamp to [0, 1]
+        matrix[i][j] = score
+        a, b = phrases_a[i], phrases_b[j]
+        _write_atomic_score("llm", a, b, score)
+    return matrix
+
+
+async def similarity_matrix(
+    phrases_a: List[str],
+    phrases_b: List[str],
+    *,
+    embedding_weight: float = 0.5,
+    llm_weight: float = 0.5,
+) -> List[SimilarityItem]:
+    """
+    Compute the similarity between two groups of phrases. Returns a list of objects,
+    each with phrase_a, phrase_b, embedding_score, llm_score and combined_score.
+
+    combined_score = embedding_weight * embedding_score + llm_weight * llm_score.
+    The default is 50% each.
+
+    Args:
+        phrases_a: first group of phrases (M items)
+        phrases_b: second group of phrases (N items)
+        embedding_weight: embedding weight, 0.5 by default
+        llm_weight: LLM weight, 0.5 by default
+
+    Returns:
+        A list of M×N items, in the same order as the phrase pairs
+        (a0,b0),(a0,b1),...,(a_{M-1},b_{N-1}).
+    """
+    if not phrases_a or not phrases_b:
+        return []
+
+    M, N = len(phrases_a), len(phrases_b)
+    total_pairs = M * N
+    logger.info("[similarity_matrix] start: phrases_a=%d, phrases_b=%d, pairs=%d", M, N, total_pairs)
+    t_total = time.perf_counter()
+
+    async def _run_embedding() -> List[List[float]]:
+        t0 = time.perf_counter()
+        out = await _embedding_similarity(phrases_a, phrases_b)
+        logger.info("[similarity_matrix] embedding took %.3fs", time.perf_counter() - t0)
+        return out
+
+    async def _run_llm() -> List[List[float]]:
+        t0 = time.perf_counter()
+        out = await _llm_similarity(phrases_a, phrases_b)
+        logger.info("[similarity_matrix] LLM took %.3fs", time.perf_counter() - t0)
+        return out
+
+    # Run both scorers concurrently
+    emb_matrix, llm_matrix = await asyncio.gather(_run_embedding(), _run_llm())
+    elapsed = time.perf_counter() - t_total
+    logger.info("[similarity_matrix] total: %.3fs", elapsed)
+
+    pairs = _phrase_pairs(phrases_a, phrases_b)
+    result: List[SimilarityItem] = []
+    for idx, (a, b) in enumerate(pairs):
+        i, j = idx // N, idx % N
+        emb_s = emb_matrix[i][j]
+        llm_s = llm_matrix[i][j]
+        combined_s = embedding_weight * emb_s + llm_weight * llm_s
+        result.append({
+            "phrase_a": a,
+            "phrase_b": b,
+            "embedding_score": emb_s,
+            "llm_score": llm_s,
+            "combined_score": combined_s,
+        })
+    return result
+
+
+def similarity_matrix_sync(
+    phrases_a: List[str],
+    phrases_b: List[str],
+    **kwargs,
+) -> List[SimilarityItem]:
+    """Synchronous wrapper: runs similarity_matrix via asyncio.run for use from sync code. Returns the same structure as similarity_matrix."""
+    return asyncio.run(similarity_matrix(phrases_a, phrases_b, **kwargs))
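+
+# Illustrative usage (a sketch, not executed here; assumes the embedding API is
+# reachable and OPEN_ROUTER_API_KEY is configured):
+#
+#     items = similarity_matrix_sync(["犬"], ["狗", "手机"])
+#     best = max(items, key=lambda r: r["combined_score"])
+#     print(best["phrase_a"], best["phrase_b"], best["combined_score"])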
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+def test_phrase_pairs() -> None:
+    """Test the order of the M×N expansion into phrase pairs."""
+    a = ["犬", "猫咪"]
+    b = ["狗", "手机"]
+    pairs = _phrase_pairs(a, b)
+    assert len(pairs) == 4
+    assert pairs[0] == ("犬", "狗")
+    assert pairs[1] == ("犬", "手机")
+    assert pairs[2] == ("猫咪", "狗")
+    assert pairs[3] == ("猫咪", "手机")
+    print("test_phrase_pairs: ok")
+
+
+def test_extract_json_array() -> None:
+    """Test parsing a JSON array out of an LLM reply."""
+    # Wrapped in ```json fences
+    content = '''Some explanation
+```json
+[
+    {"text_1": "犬", "text_2": "狗", "score": 0.85, "reason": "synonym"}
+]
+```
+'''
+    arr = _extract_json_array(content)
+    assert len(arr) == 1
+    assert arr[0]["score"] == 0.85
+    # A bare JSON array
+    arr2 = _extract_json_array('[{"score": 0.5}]')
+    assert len(arr2) == 1 and arr2[0]["score"] == 0.5
+    print("test_extract_json_array: ok")
+
+
+async def test_similarity_matrix() -> None:
+    """Integration test: call embedding + LLM and check the similarity item list."""
+    phrases_a = ["犬", "猫咪"]
+    phrases_b = ["狗", "手机"]
+    items = await similarity_matrix(phrases_a, phrases_b)
+    assert len(items) == 4
+    for row in items:
+        assert "phrase_a" in row and "phrase_b" in row
+        assert "embedding_score" in row and "llm_score" in row and "combined_score" in row
+        assert 0 <= row["combined_score"] <= 1, f"combined_score should be in [0,1], got {row['combined_score']}"
+    # Semantically, "犬"-"狗" should score higher than "犬"-"手机"
+    dog_dog = next(r for r in items if r["phrase_a"] == "犬" and r["phrase_b"] == "狗")
+    dog_phone = next(r for r in items if r["phrase_a"] == "犬" and r["phrase_b"] == "手机")
+    assert dog_dog["combined_score"] > dog_phone["combined_score"], "犬-狗 should rank above 犬-手机"
+    print("test_similarity_matrix: ok")
+    for r in items:
+        print(f"  {r['phrase_a']}-{r['phrase_b']}: emb={r['embedding_score']:.4f} llm={r['llm_score']:.4f} combined={r['combined_score']:.4f}")
+
+
+if __name__ == "__main__":
+    test_phrase_pairs()
+    test_extract_json_array()
+    print("Running the integration test (requires the embedding API, OPEN_ROUTER_API_KEY and the agent dependencies)...")
+    try:
+        asyncio.run(test_similarity_matrix())
+        print("All tests passed.")
+    except Exception as e:
+        print(f"Skipping the integration test: {e}")
+        print("Unit tests passed. For the integration test, make sure: 1) the embedding service is reachable 2) OPEN_ROUTER_API_KEY is set 3) run from the project root: python -m examples_how.overall_derivation.utils.similarity_calc")