# similarity_calc.py
  1. """
  2. 相似度计算工具:计算两组短语的 M×N 相似度矩阵。
  3. 使用综合相似度:embedding 50% + LLM 50%。
  4. """
from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import os
import re
import time
from typing import List, Tuple, TypedDict

import httpx

logger = logging.getLogger(__name__)
# Cache directory: ../.cache/similarity relative to this file; scores are stored
# atomically, one file per (phrase_a, phrase_b) pair.
_CACHE_DIR = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".cache", "similarity"))
def _atomic_pair_key(phrase_a: str, phrase_b: str) -> str:
    """Cache key for a single phrase pair (atomic granularity)."""
    raw = json.dumps([phrase_a, phrase_b], ensure_ascii=False)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
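# Note: the key hashes the JSON string '["<phrase_a>", "<phrase_b>"]', so it is
# order-sensitive: (a, b) and (b, a) are cached as separate entries.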
def _ensure_cache_dir() -> None:
    os.makedirs(_CACHE_DIR, exist_ok=True)
def _read_atomic_score(cache_type: str, phrase_a: str, phrase_b: str) -> float | None:
    """Read the cached score for a single phrase pair; return None if missing or unreadable."""
    key = _atomic_pair_key(phrase_a, phrase_b)
    path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json")
    if not os.path.isfile(path):
        return None
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Verify phrase_a / phrase_b match, to guard against collisions or stale files.
        if data.get("phrase_a") != phrase_a or data.get("phrase_b") != phrase_b:
            return None
        return float(data["score"])
    except Exception as e:
        logger.debug("[similarity_cache] failed to read %s: %s", path, e)
        return None
def _write_atomic_score(cache_type: str, phrase_a: str, phrase_b: str, score: float) -> None:
    """Write the cached score for a single phrase pair (atomic record: phrase_a, phrase_b, score)."""
    _ensure_cache_dir()
    key = _atomic_pair_key(phrase_a, phrase_b)
    path = os.path.join(_CACHE_DIR, f"{cache_type}_{key}.json")
    try:
        with open(path, "w", encoding="utf-8") as f:
            json.dump({"phrase_a": phrase_a, "phrase_b": phrase_b, "score": score}, f, ensure_ascii=False)
    except Exception as e:
        logger.warning("[similarity_cache] failed to write %s: %s", path, e)
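# Illustrative on-disk cache layout (the hash and scores below are made up, not real values):
#   .cache/similarity/embedding_3fa1...c9.json -> {"phrase_a": "犬", "phrase_b": "狗", "score": 0.91}
#   .cache/similarity/llm_3fa1...c9.json       -> {"phrase_a": "犬", "phrase_b": "狗", "score": 0.85}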
class SimilarityItem(TypedDict):
    """A single similarity result."""
    phrase_a: str
    phrase_b: str
    embedding_score: float
    llm_score: float
    combined_score: float
# Batch prompt template for LLM scoring. Placeholders: {count} and {pairs_list};
# the braces inside the JSON example are escaped for str.format.
DEFAULT_BATCH_PROMPT_TEMPLATE = """
# Role
You are a linguist and information scientist who can perceive and distinguish subtle differences in the semantic content expressed by different words and phrases.
# Task
Score the semantic similarity of the following {count} phrase pairs: the closer two phrases are in meaning, the higher the score.
# Core scoring rules:
1. Strictly distinguish "semantic similarity" from "semantic relatedness"; being semantically "similar" is not the same as being semantically "related", and the two must not be conflated:
- ✅ Semantically similar (high score): the core meanings of the two phrases overlap heavily and they can substitute for each other in similar contexts, e.g. "快乐" and "开心" (both mean "happy").
- ❌ Semantically related (low score): the two phrases have a logical, functional, or orthographic connection, but the things or concepts they refer to are not the same and they cannot substitute for each other.
2. Never give a high score merely because two phrases share characters or a common root. Similar form does not imply similar meaning.
3. When one phrase **contains the other** as a component, the compound's semantic scope has already shifted significantly; score by the actual semantic gap and do not inflate the score because of the containment relation.
4. Score each pair as a completely independent judgment: the scores of different pairs are unrelated, act as if you cannot see the other pairs, and it is **forbidden** to reference the scores of other pairs.
# Scoring scale (range 0.00 - 1.00, smallest step 0.01)
- **[0.90 - 1.00] Nearly identical / synonymous**: the core concepts are identical; only the wording differs.
- **[0.80 - 0.89] Highly similar / minor differences**: the core concepts match, with slight differences in scope, granularity, or emphasis.
- **[0.50 - 0.79] Moderately similar / modifier drift**: the phrases share part of the core concept, but one carries a strong modifier or qualifier that shifts the semantic focus.
- **[0.20 - 0.49] Weakly similar / category shift**: there is surface overlap or a closely related scenario, but the **core category (part of speech or entity type) has changed**.
- **[0.00 - 0.19] Unrelated**: no semantic relation at all.
# Output format (must be a JSON array):
```json
[
{{
"text_1": "",
"text_2": "",
"score": 0.00,
"reason": "a brief justification"
}},
...
]
```
# Phrase pairs:
{pairs_list}
""".strip()
# Embedding similarity API endpoint.
EMBEDDING_SIMILARITY_URL = "http://61.48.133.26:8187/cartesian_similarity"
# LLM model used for pairwise scoring.
LLM_MODEL = "google/gemini-3.1-flash-lite-preview"
def _phrase_pairs(phrases_a: List[str], phrases_b: List[str]) -> List[Tuple[str, str]]:
    """Expand M×N into a list of phrase pairs, ordered (a0,b0), (a0,b1), ..., (a0,b_{N-1}), (a1,b0), ..."""
    return [(a, b) for a in phrases_a for b in phrases_b]
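# e.g. _phrase_pairs(["a1", "a2"], ["b1", "b2"])
#      -> [("a1", "b1"), ("a1", "b2"), ("a2", "b1"), ("a2", "b2")]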
async def _embedding_similarity(phrases_a: List[str], phrases_b: List[str]) -> List[List[float]]:
    """
    Call the cartesian_similarity API and return an M×N matrix. The atomic cache
    (phrase_a, phrase_b) -> score is checked first; only uncached pairs hit the API.
    """
    if not phrases_a or not phrases_b:
        return []
    M, N = len(phrases_a), len(phrases_b)
    matrix = [[0.0] * N for _ in range(M)]
    missing_indices: List[Tuple[int, int]] = []
    for i in range(M):
        for j in range(N):
            score = _read_atomic_score("embedding", phrases_a[i], phrases_b[j])
            if score is not None:
                matrix[i][j] = score
            else:
                missing_indices.append((i, j))
    total = M * N
    hit_count = total - len(missing_indices)
    if hit_count > 0:
        logger.info("[similarity_matrix] embedding atomic cache hit %d/%d", hit_count, total)
    if not missing_indices:
        return matrix
    # Only call the API for uncached pairs: build the missing phrases_a / phrases_b
    # (deduplicated, order preserved).
    a_set: List[str] = list(dict.fromkeys(phrases_a[i] for i, _ in missing_indices))
    b_set: List[str] = list(dict.fromkeys(phrases_b[j] for _, j in missing_indices))
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            EMBEDDING_SIMILARITY_URL,
            json={"texts1": a_set, "texts2": b_set},
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
        data = resp.json()
    results = data.get("results", [])
    # The API is expected to return results for the full a_set × b_set cross product in
    # row-major order, so (i_m, j_m) maps to the flat index i_m * len(b_set) + j_m.
    len_b = len(b_set)
    for i, j in missing_indices:
        a, b = phrases_a[i], phrases_b[j]
        i_m, j_m = a_set.index(a), b_set.index(b)
        idx_flat = i_m * len_b + j_m
        if idx_flat < len(results):
            score = float(results[idx_flat]["score"])
            matrix[i][j] = score
            _write_atomic_score("embedding", a, b, score)
    return matrix
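# Assumed request/response shape of the cartesian_similarity endpoint, inferred from the
# parsing code above (the real service contract may differ):
#   request : {"texts1": ["犬"], "texts2": ["狗", "手机"]}
#   response: {"results": [{"score": 0.91}, {"score": 0.07}]}  # row-major over texts1 × texts2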
def _extract_json_array(content: str) -> List[dict]:
    """Parse a JSON array from an LLM reply (optionally wrapped in ```json ... ```)."""
    content = content.strip()
    # Prefer the content inside a ```json ... ``` fence if present.
    m = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content)
    if m:
        content = m.group(1).strip()
    return json.loads(content)
async def _llm_similarity(
    phrases_a: List[str], phrases_b: List[str], *, use_cache: bool = True
) -> List[List[float]]:
    """
    Score phrase pairs with the LLM and return an M×N matrix. With use_cache=True the
    atomic cache is checked first and only uncached pairs are sent to the API; with
    use_cache=False the cache is neither read nor written.
    """
    if not phrases_a or not phrases_b:
        return []
    M, N = len(phrases_a), len(phrases_b)
    matrix = [[0.0] * N for _ in range(M)]
    missing_indices: List[Tuple[int, int]] = []
    for i in range(M):
        for j in range(N):
            if use_cache:
                score = _read_atomic_score("llm", phrases_a[i], phrases_b[j])
                if score is not None:
                    matrix[i][j] = score
                    continue
            missing_indices.append((i, j))
    total = M * N
    if use_cache:
        hit_count = total - len(missing_indices)
        if hit_count > 0:
            logger.info("[similarity_matrix] LLM atomic cache hit %d/%d", hit_count, total)
    if not missing_indices:
        return matrix
    # Only send uncached pairs to the LLM: pairs_list is built in missing order and the
    # LLM is expected to return its scores in the same order.
    missing_pairs = [(phrases_a[i], phrases_b[j]) for (i, j) in missing_indices]
    pairs_list = "\n".join(
        f'{idx + 1}. "{a}" and "{b}"'
        for idx, (a, b) in enumerate(missing_pairs)
    )
    prompt = DEFAULT_BATCH_PROMPT_TEMPLATE.format(count=len(missing_pairs), pairs_list=pairs_list)
    # Deferred import: the agent package is only needed when uncached pairs must be scored.
    from agent.llm.openrouter import openrouter_llm_call

    messages = [{"role": "user", "content": prompt}]
    result = await openrouter_llm_call(messages, model=LLM_MODEL)
    content = result.get("content", "")
    if not content:
        raise ValueError("LLM returned no content")
    items = _extract_json_array(content)
    for idx, (i, j) in enumerate(missing_indices):
        if idx >= len(items):
            break
        score = float(items[idx].get("score", 0.0))
        score = max(0.0, min(1.0, score))  # clamp to [0, 1]
        matrix[i][j] = score
        if use_cache:
            a, b = phrases_a[i], phrases_b[j]
            _write_atomic_score("llm", a, b, score)
    return matrix
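# Illustrative shape of one LLM scoring round (not an actual transcript; scores made up):
#   pairs_list -> 1. "犬" and "狗"
#                 2. "犬" and "手机"
#   reply      -> [{"text_1": "犬", "text_2": "狗", "score": 0.85, "reason": "..."},
#                  {"text_1": "犬", "text_2": "手机", "score": 0.05, "reason": "..."}]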
async def similarity_matrix(
    phrases_a: List[str],
    phrases_b: List[str],
    *,
    embedding_weight: float = 0.5,
    llm_weight: float = 0.5,
    use_llm_cache: bool = True,
) -> List[SimilarityItem]:
    """
    Compute the similarity between two lists of phrases and return a list of records
    (each with phrase_a, phrase_b, embedding_score, llm_score, combined_score).

    combined_score = embedding_weight * embedding_score + llm_weight * llm_score,
    with both weights defaulting to 0.5.

    Args:
        phrases_a: first list of phrases (M items)
        phrases_b: second list of phrases (N items)
        embedding_weight: weight of the embedding score, default 0.5
        llm_weight: weight of the LLM score, default 0.5
        use_llm_cache: whether to use the LLM score cache, default True

    Returns:
        A list of M×N records, in the same order as the phrase pairs
        (a0,b0), (a0,b1), ..., (a_{M-1}, b_{N-1}).
    """
    if not phrases_a or not phrases_b:
        return []
    M, N = len(phrases_a), len(phrases_b)
    total_pairs = M * N
    logger.info("[similarity_matrix] start: phrases_a=%d, phrases_b=%d, pairs=%d", M, N, total_pairs)
    t_total = time.perf_counter()

    async def _run_embedding() -> List[List[float]]:
        t0 = time.perf_counter()
        out = await _embedding_similarity(phrases_a, phrases_b)
        logger.info("[similarity_matrix] embedding took %.3fs", time.perf_counter() - t0)
        return out

    async def _run_llm() -> List[List[float]]:
        t0 = time.perf_counter()
        out = await _llm_similarity(phrases_a, phrases_b, use_cache=use_llm_cache)
        logger.info("[similarity_matrix] LLM took %.3fs", time.perf_counter() - t0)
        return out

    # Run the embedding and LLM scoring concurrently.
    emb_matrix, llm_matrix = await asyncio.gather(_run_embedding(), _run_llm())
    elapsed = time.perf_counter() - t_total
    logger.info("[similarity_matrix] total time: %.3fs", elapsed)
    pairs = _phrase_pairs(phrases_a, phrases_b)
    result: List[SimilarityItem] = []
    for idx, (a, b) in enumerate(pairs):
        i, j = idx // N, idx % N
        emb_s = emb_matrix[i][j]
        llm_s = llm_matrix[i][j]
        combined_s = embedding_weight * emb_s + llm_weight * llm_s
        result.append({
            "phrase_a": a,
            "phrase_b": b,
            "embedding_score": emb_s,
            "llm_score": llm_s,
            "combined_score": combined_s,
        })
    return result
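# Worked example of the default weighting: embedding_score=0.80 and llm_score=0.60
# give combined_score = 0.5 * 0.80 + 0.5 * 0.60 = 0.70.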
def similarity_matrix_sync(
    phrases_a: List[str],
    phrases_b: List[str],
    **kwargs,
) -> List[SimilarityItem]:
    """Synchronous wrapper: runs similarity_matrix with asyncio.run so it can be called
    from synchronous code. Returns the same list of records as similarity_matrix."""
    return asyncio.run(similarity_matrix(phrases_a, phrases_b, **kwargs))
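# Example usage from synchronous code (illustrative; actual scores depend on the backends):
#
#     items = similarity_matrix_sync(["犬", "猫咪"], ["狗", "手机"], embedding_weight=0.3, llm_weight=0.7)
#     best = max(items, key=lambda r: r["combined_score"])
#     print(best["phrase_a"], best["phrase_b"], round(best["combined_score"], 2))
#
# Note: asyncio.run cannot be nested, so call `await similarity_matrix(...)` directly
# when already inside a running event loop.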
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_phrase_pairs() -> None:
    """Test the ordering of the M×N expansion into phrase pairs."""
    a = ["犬", "猫咪"]
    b = ["狗", "手机"]
    pairs = _phrase_pairs(a, b)
    assert len(pairs) == 4
    assert pairs[0] == ("犬", "狗")
    assert pairs[1] == ("犬", "手机")
    assert pairs[2] == ("猫咪", "狗")
    assert pairs[3] == ("猫咪", "手机")
    print("test_phrase_pairs: ok")
def test_extract_json_array() -> None:
    """Test parsing a JSON array out of an LLM reply."""
    # Wrapped in a ```json fence
    content = '''Some preamble
```json
[
{"text_1": "犬", "text_2": "狗", "score": 0.85, "reason": "synonyms"}
]
```
'''
    arr = _extract_json_array(content)
    assert len(arr) == 1
    assert arr[0]["score"] == 0.85
    # Bare JSON array
    arr2 = _extract_json_array('[{"score": 0.5}]')
    assert len(arr2) == 1 and arr2[0]["score"] == 0.5
    print("test_extract_json_array: ok")
async def test_similarity_matrix() -> None:
    """Integration test: call embedding + LLM and check the combined results.
    Set use_llm_cache to control whether the LLM score cache is used."""
    use_llm_cache = False
    phrases_a = ["柴犬形象", "鞋子", "夸张"]
    phrases_b = ["柴犬主角", "鞋架", "夸张堆叠"]
    items = await similarity_matrix(phrases_a, phrases_b, use_llm_cache=use_llm_cache)
    assert len(items) == len(phrases_a) * len(phrases_b)
    for row in items:
        assert "phrase_a" in row and "phrase_b" in row
        assert "embedding_score" in row and "llm_score" in row and "combined_score" in row
        assert 0 <= row["combined_score"] <= 1, f"combined_score should be in [0,1], got {row['combined_score']}"
    for r in items:
        print(f"  {r['phrase_a']}-{r['phrase_b']}: emb={r['embedding_score']:.4f} llm={r['llm_score']:.4f} combined={r['combined_score']:.4f}")
    print("test_similarity_matrix: ok")
if __name__ == "__main__":
    # When run directly as `python similarity_calc.py`, add the project root to sys.path
    # so that `import agent` works.
    import sys

    _root = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
    if _root not in sys.path:
        sys.path.insert(0, _root)
    test_phrase_pairs()
    test_extract_json_array()
    print("Running the integration test (requires the embedding API, OPEN_ROUTER_API_KEY, and the agent dependencies)...")
    try:
        asyncio.run(test_similarity_matrix())
        print("All tests passed.")
    except Exception as e:
        print(f"Skipping the integration test: {e}")
        print("Unit tests passed. For the integration test make sure to: 1) have the embedding service reachable, "
              "2) set OPEN_ROUTER_API_KEY, 3) run from the project root: "
              "python -m examples_how.overall_derivation.utils.similarity_calc")