| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- """
- 相似度计算工具:计算两组短语的 M×N 相似度矩阵。
- 使用综合相似度:embedding 50% + LLM 50%。
- """
- from __future__ import annotations
- import asyncio
- import hashlib
- import json
- import logging
- import os
- import re
- import time
- from typing import List, Tuple, TypedDict
- import httpx
- logger = logging.getLogger(__name__)
# Cache directory: ../.cache/similarity relative to this file. One JSON file is
# stored per (phrase_a, phrase_b) pair (atomic granularity).
_CACHE_DIR = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".cache", "similarity"))
- def _atomic_pair_key(phrase_a: str, phrase_b: str) -> str:
- """单对短语的缓存键(原子粒度)。"""
- raw = json.dumps([phrase_a, phrase_b], ensure_ascii=False, sort_keys=False)
- return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _ensure_cache_dir() -> None:
    """Create the similarity cache directory if it does not already exist."""
    os.makedirs(_CACHE_DIR, exist_ok=True)
def _read_atomic_score(cache_type: str, phrase_a: str, phrase_b: str) -> float | None:
    """Look up the cached score for one phrase pair; return None on miss or error."""
    cache_file = os.path.join(
        _CACHE_DIR, f"{cache_type}_{_atomic_pair_key(phrase_a, phrase_b)}.json"
    )
    if not os.path.isfile(cache_file):
        return None
    try:
        with open(cache_file, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
        # Reject entries whose stored phrases differ from the requested pair,
        # so a hash collision or stale file is never misread as a hit.
        if payload.get("phrase_a") != phrase_a or payload.get("phrase_b") != phrase_b:
            return None
        return float(payload["score"])
    except Exception as e:
        logger.debug("[similarity_cache] 读取 %s 失败: %s", cache_file, e)
        return None
def _write_atomic_score(cache_type: str, phrase_a: str, phrase_b: str, score: float) -> None:
    """Persist the score for a single phrase pair (atomic record: phrase_a, phrase_b, score).

    The record is written to a temporary sibling file and then moved into place
    with os.replace, which is atomic on both POSIX and Windows — a concurrent
    reader can never observe a partially written JSON file. Failures are logged
    and swallowed: the cache is best-effort and must not break scoring.
    """
    _ensure_cache_dir()
    key = _atomic_pair_key(phrase_a, phrase_b)
    path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json")
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump({"phrase_a": phrase_a, "phrase_b": phrase_b, "score": score}, f, ensure_ascii=False)
        os.replace(tmp_path, path)
    except Exception as e:
        logger.warning("[similarity_cache] 写入 %s 失败: %s", path, e)
class SimilarityItem(TypedDict):
    """A single similarity result for one (phrase_a, phrase_b) pair."""
    phrase_a: str  # first phrase of the pair
    phrase_b: str  # second phrase of the pair
    embedding_score: float  # embedding-based similarity from the cartesian_similarity API
    llm_score: float  # LLM-judged similarity, clamped to [0, 1]
    combined_score: float  # weighted mix: embedding_weight * embedding + llm_weight * llm
# Batch prompt template for LLM pair scoring. Placeholders: {count} and
# {pairs_list}; literal braces inside the JSON example are escaped as {{ }}.
# NOTE: this is a runtime string sent to the LLM — keep the Chinese wording intact.
DEFAULT_BATCH_PROMPT_TEMPLATE = """
# 角色
你是一个语言学家、信息学家,能够理解、区分、辨别不同词语、词汇之间所表达的语义信息量的细微差异。
# 任务
从语义相似程度的角度,对以下{count}对短语的语义相似度进行打分,两者的语义越相近得分越高。
# 核心打分规则:
1. 打分时必须严格区分“语义相似”与“语义关联”两种关系,语义"相似"≠语义"关联",不能混淆:
- ✅ 语义相似(高分):两个词语表达的核心含义高度重叠,可以在相近语境中互相替换。例如:"快乐"与"开心"。
- ❌ 语义关联(低分):两个词语有逻辑/功能/字形上的联系,但各自指代的事物或概念本身并不相同,不能互相替换。
2. 禁止因为两个短语共享汉字或词根就给高分。字形相近 ≠ 语义相近。
3. 当一个短语**包含另一个短语**作为组成部分时,复合后的语义范围已发生显著变化,应根据语义实际差距给分,不得因"包含"关系而虚高打分。
4. 给每对短语进行打分时,将每对短语的打分都当做是完全独立的事件来判断打分,不同对的短语之间打分没有关联,当做自己看不到别的短语,**禁止**参考其它对短语的打分结果。
# 打分量化参考(分值范围 0.00 - 1.00,最小颗粒度 0.01)
- **[0.90 - 1.00] 极度相似 / 同义替换**:核心概念完全一致,仅表述习惯不同。
- **[0.80 - 0.89] 高度相似 / 细微差异**:核心概念一致,但在范围、颗粒度、或侧重点上有轻微区别。
- **[0.50 - 0.79] 中度相似 / 修饰偏离**:共享部分核心概念,但其中一个多出了强烈的修饰语或限定条件,导致语义重心发生偏移。
- **[0.20 - 0.49] 低度相似 / 类别偏移**:字面有重合或场景高度相关,但**核心类别(词性或实体类型)已经改变**。
- **[0.00 - 0.19] 毫不相干**:语义完全无关。
# 输出格式(必须是一个JSON数组):
```json
[
{{
"text_1": "",
"text_2": "",
"score": 0.00,
"reason": "简明扼要说明理由"
}},
...
]
```
# 短语对列表:
{pairs_list}
""".strip()
# Embedding similarity API endpoint (cartesian similarity over two text lists).
EMBEDDING_SIMILARITY_URL = "http://61.48.133.26:8187/cartesian_similarity"
# Model identifier passed to the OpenRouter LLM call.
LLM_MODEL = "google/gemini-3.1-flash-lite-preview"
- def _phrase_pairs(phrases_a: List[str], phrases_b: List[str]) -> List[Tuple[str, str]]:
- """将 M×N 展开为短语对列表,顺序为 (a0,b0),(a0,b1),...,(a0,b_{N-1}),(a1,b0),..."""
- return [(a, b) for a in phrases_a for b in phrases_b]
async def _embedding_similarity(phrases_a: List[str], phrases_b: List[str]) -> List[List[float]]:
    """
    Call the cartesian_similarity API and return an M×N score matrix.

    Each (phrase_a, phrase_b) pair is first looked up in the atomic score
    cache; only cache misses are sent to the API, and their scores are
    written back to the cache afterwards.

    Raises:
        httpx.HTTPStatusError: if the API responds with an error status.
    """
    if not phrases_a or not phrases_b:
        return []
    M, N = len(phrases_a), len(phrases_b)
    matrix = [[0.0] * N for _ in range(M)]
    missing_indices: List[Tuple[int, int]] = []
    for i in range(M):
        for j in range(N):
            score = _read_atomic_score("embedding", phrases_a[i], phrases_b[j])
            if score is not None:
                matrix[i][j] = score
            else:
                missing_indices.append((i, j))
    total = M * N
    hit_count = total - len(missing_indices)
    if hit_count > 0:
        logger.info("[similarity_matrix] embedding 原子缓存命中 %d/%d", hit_count, total)
    if not missing_indices:
        return matrix
    # Only call the API for cache misses: deduplicated, order-preserving phrase lists.
    a_set: List[str] = list(dict.fromkeys(phrases_a[i] for i, _ in missing_indices))
    b_set: List[str] = list(dict.fromkeys(phrases_b[j] for _, j in missing_indices))
    # O(1) position maps (previously list.index — O(n) per pair in the loop below).
    a_pos = {phrase: idx for idx, phrase in enumerate(a_set)}
    b_pos = {phrase: idx for idx, phrase in enumerate(b_set)}
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            EMBEDDING_SIMILARITY_URL,
            json={"texts1": a_set, "texts2": b_set},
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
        data = resp.json()
    # Results are flattened row-major over a_set × b_set; map each back to (i, j).
    results = data.get("results", [])
    len_b = len(b_set)
    for i, j in missing_indices:
        a, b = phrases_a[i], phrases_b[j]
        idx_flat = a_pos[a] * len_b + b_pos[b]
        if idx_flat < len(results):
            score = float(results[idx_flat]["score"])
            matrix[i][j] = score
            _write_atomic_score("embedding", a, b, score)
        else:
            # API returned fewer results than expected; keep the 0.0 default but surface it.
            logger.warning("[similarity_matrix] embedding 结果缺失: %s / %s", a, b)
    return matrix
- def _extract_json_array(content: str) -> List[dict]:
- """从 LLM 回复中解析 JSON 数组(允许被 ```json ... ``` 包裹)。"""
- content = content.strip()
- # 尝试匹配 ```json ... ``` 中的内容
- m = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content)
- if m:
- content = m.group(1).strip()
- return json.loads(content)
async def _llm_similarity(
    phrases_a: List[str], phrases_b: List[str], *, use_cache: bool = True
) -> List[List[float]]:
    """
    Score phrase pairs with the LLM and return an M×N matrix.

    When use_cache is True, per-pair cached scores are reused and only cache
    misses are sent to the LLM (new scores are written back). When False, the
    cache is neither read nor written.

    Raises:
        ValueError: if the LLM returns empty content.
    """
    if not phrases_a or not phrases_b:
        return []
    rows, cols = len(phrases_a), len(phrases_b)
    matrix = [[0.0] * cols for _ in range(rows)]
    pending: List[Tuple[int, int]] = []
    for r in range(rows):
        for c in range(cols):
            cached = _read_atomic_score("llm", phrases_a[r], phrases_b[c]) if use_cache else None
            if cached is None:
                pending.append((r, c))
            else:
                matrix[r][c] = cached
    if use_cache:
        hits = rows * cols - len(pending)
        if hits > 0:
            logger.info("[similarity_matrix] LLM 原子缓存命中 %d/%d", hits, rows * cols)
    if not pending:
        return matrix
    # Build the numbered pair list in miss order; the LLM answers in the same order.
    numbered = []
    for idx, (r, c) in enumerate(pending):
        numbered.append(f'{idx + 1}. "{phrases_a[r]}" 和 "{phrases_b[c]}"')
    pairs_list = "\n".join(numbered)
    prompt = DEFAULT_BATCH_PROMPT_TEMPLATE.format(count=len(pending), pairs_list=pairs_list)
    from agent.llm.openrouter import openrouter_llm_call
    result = await openrouter_llm_call([{"role": "user", "content": prompt}], model=LLM_MODEL)
    content = result.get("content", "")
    if not content:
        raise ValueError("LLM 未返回内容")
    items = _extract_json_array(content)
    # Fill scores back by position; if the LLM returned fewer items, the rest stay 0.0.
    for idx, (r, c) in enumerate(pending):
        if idx >= len(items):
            break
        value = max(0.0, min(1.0, float(items[idx].get("score", 0.0))))
        matrix[r][c] = value
        if use_cache:
            _write_atomic_score("llm", phrases_a[r], phrases_b[c], value)
    return matrix
async def similarity_matrix(
    phrases_a: List[str],
    phrases_b: List[str],
    *,
    embedding_weight: float = 0.5,
    llm_weight: float = 0.5,
    use_llm_cache: bool = True,
) -> List[SimilarityItem]:
    """
    Compute pairwise similarity for two phrase lists.

    combined_score = embedding_weight * embedding_score + llm_weight * llm_score
    (both weights default to 0.5). Embedding and LLM scoring run concurrently
    via asyncio.gather.

    Args:
        phrases_a: first phrase list (M items)
        phrases_b: second phrase list (N items)
        embedding_weight: weight of the embedding score, default 0.5
        llm_weight: weight of the LLM score, default 0.5
        use_llm_cache: whether to use the per-pair LLM score cache, default True

    Returns:
        List of M×N SimilarityItem dicts, ordered (a0,b0),(a0,b1),...,(aM-1,bN-1).
    """
    if not phrases_a or not phrases_b:
        return []
    M, N = len(phrases_a), len(phrases_b)
    logger.info("[similarity_matrix] 开始计算: phrases_a=%d, phrases_b=%d, 短语对=%d", M, N, M * N)
    t_total = time.perf_counter()

    async def _run_embedding() -> List[List[float]]:
        # Timed wrapper so embedding latency is logged independently.
        t0 = time.perf_counter()
        out = await _embedding_similarity(phrases_a, phrases_b)
        logger.info("[similarity_matrix] embedding 耗时: %.3fs", time.perf_counter() - t0)
        return out

    async def _run_llm() -> List[List[float]]:
        # Timed wrapper so LLM latency is logged independently.
        t0 = time.perf_counter()
        out = await _llm_similarity(phrases_a, phrases_b, use_cache=use_llm_cache)
        logger.info("[similarity_matrix] LLM 耗时: %.3fs", time.perf_counter() - t0)
        return out

    emb_matrix, llm_matrix = await asyncio.gather(_run_embedding(), _run_llm())
    logger.info("[similarity_matrix] 总耗时: %.3fs", time.perf_counter() - t_total)

    # N was previously re-assigned here redundantly; it is unchanged since above.
    result: List[SimilarityItem] = []
    for idx, (a, b) in enumerate(_phrase_pairs(phrases_a, phrases_b)):
        i, j = divmod(idx, N)
        emb_s = emb_matrix[i][j]
        llm_s = llm_matrix[i][j]
        result.append({
            "phrase_a": a,
            "phrase_b": b,
            "embedding_score": emb_s,
            "llm_score": llm_s,
            "combined_score": embedding_weight * emb_s + llm_weight * llm_s,
        })
    return result
def similarity_matrix_sync(
    phrases_a: List[str],
    phrases_b: List[str],
    **kwargs,
) -> List[SimilarityItem]:
    """Synchronous wrapper: run similarity_matrix via asyncio.run.

    Accepts the same keyword arguments as similarity_matrix and returns the
    same list of SimilarityItem dicts. NOTE: asyncio.run raises RuntimeError
    if called from inside a running event loop, so only use this from plain
    synchronous code.
    """
    return asyncio.run(similarity_matrix(phrases_a, phrases_b, **kwargs))
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_phrase_pairs() -> None:
    """Verify the row-major ordering of the M×N pair expansion."""
    pairs = _phrase_pairs(["犬", "猫咪"], ["狗", "手机"])
    expected = [("犬", "狗"), ("犬", "手机"), ("猫咪", "狗"), ("猫咪", "手机")]
    assert len(pairs) == 4
    assert pairs == expected
    print("test_phrase_pairs: ok")
def test_extract_json_array() -> None:
    """Verify JSON-array extraction from fenced and bare LLM replies."""
    # Reply wrapped in a ```json fence, preceded by explanatory text.
    fenced = (
        "一些说明\n"
        "```json\n"
        "[\n"
        '{"text_1": "犬", "text_2": "狗", "score": 0.85, "reason": "同义"}\n'
        "]\n"
        "```\n"
    )
    parsed = _extract_json_array(fenced)
    assert len(parsed) == 1
    assert parsed[0]["score"] == 0.85
    # Bare JSON array with no fence.
    bare = _extract_json_array('[{"score": 0.5}]')
    assert len(bare) == 1 and bare[0]["score"] == 0.5
    print("test_extract_json_array: ok")
async def test_similarity_matrix() -> None:
    """Integration test: run embedding + LLM scoring and print the items.

    Requires the embedding service, OPEN_ROUTER_API_KEY and the agent package,
    so it is only run from __main__ where failures are caught and skipped.
    Flip use_llm_cache to True to exercise the cached path instead.
    """
    use_llm_cache = False
    phrases_a = ["柴犬形象", "鞋子", "夸张"]
    phrases_b = ["柴犬主角", "鞋架", "夸张堆叠"]
    items = await similarity_matrix(phrases_a, phrases_b, use_llm_cache=use_llm_cache)
    # Live assertions matching the actual inputs (the old commented-out block
    # referenced "犬"/"狗" pairs that are not in these lists, and a count of 4
    # for what is a 3×3 = 9-pair run).
    assert len(items) == len(phrases_a) * len(phrases_b)
    for item in items:
        assert "phrase_a" in item and "phrase_b" in item
        assert "embedding_score" in item and "llm_score" in item and "combined_score" in item
        print(item)
- if __name__ == "__main__":
- # 直接运行 python similarity_calc.py 时,将项目根加入 path,以便 import agent
- _root = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
- if _root not in __import__("sys").path:
- __import__("sys").path.insert(0, _root)
- test_phrase_pairs()
- test_extract_json_array()
- print("运行集成测试(需 embedding API、OPEN_ROUTER_API_KEY 及 agent 依赖)...")
- try:
- asyncio.run(test_similarity_matrix())
- print("全部通过。")
- except Exception as e:
- print(f"跳过集成测试: {e}")
- print("仅单元测试已通过。集成测试请确保:1) embedding 服务可访问 2) 设置 OPEN_ROUTER_API_KEY 3) 在项目根目录执行: python -m examples_how.overall_derivation.utils.similarity_calc")
|