#!/usr/bin/env python3
"""
Text similarity module backed by a remote API.

Uses a remote, GPU-accelerated similarity service; the interface is
compatible with text_embedding.py.

Three computation modes are provided:
1. compare_phrases()           - single pair
2. compare_phrases_batch()     - batched pairs (pair[i].text1 vs pair[i].text2)
3. compare_phrases_cartesian() - Cartesian product (an M x N matrix)
"""

from typing import Dict, Any, Optional, List, Tuple

import requests
import numpy as np

# API configuration
DEFAULT_API_BASE_URL = "http://61.48.133.26:8187"
DEFAULT_TIMEOUT = 60  # seconds

# API client singleton
_api_client = None


class SimilarityAPIClient:
    """Client for the text similarity API."""

    def __init__(self, base_url: str = DEFAULT_API_BASE_URL,
                 timeout: int = DEFAULT_TIMEOUT):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self._session = requests.Session()  # reuse connections

    def health_check(self) -> Dict:
        """Health check."""
        response = self._session.get(f"{self.base_url}/health", timeout=10)
        response.raise_for_status()
        return response.json()

    def list_models(self) -> Dict:
        """List the supported models."""
        response = self._session.get(f"{self.base_url}/models", timeout=10)
        response.raise_for_status()
        return response.json()

    def similarity(
        self,
        text1: str,
        text2: str,
        model_name: Optional[str] = None
    ) -> Dict:
        """
        Compute the similarity of a single text pair.

        Args:
            text1: first text
            text2: second text
            model_name: optional model name

        Returns:
            {"text1": str, "text2": str, "score": float}
        """
        payload = {"text1": text1, "text2": text2}
        if model_name:
            payload["model_name"] = model_name
        response = self._session.post(
            f"{self.base_url}/similarity",
            json=payload,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def batch_similarity(
        self,
        pairs: List[Dict],
        model_name: Optional[str] = None
    ) -> Dict:
        """
        Compute pairwise similarities in one batch.

        Args:
            pairs: [{"text1": str, "text2": str}, ...]
            model_name: optional model name

        Returns:
            {"results": [{"text1": str, "text2": str, "score": float}, ...]}
        """
        payload = {"pairs": pairs}
        if model_name:
            payload["model_name"] = model_name
        response = self._session.post(
            f"{self.base_url}/batch_similarity",
            json=payload,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def cartesian_similarity(
        self,
        texts1: List[str],
        texts2: List[str],
        model_name: Optional[str] = None
    ) -> Dict:
        """
        Compute Cartesian-product similarities (M x N).

        Args:
            texts1: first list of texts (M items)
            texts2: second list of texts (N items)
            model_name: optional model name

        Returns:
            {
                "results": [{"text1": str, "text2": str, "score": float}, ...],
                "total": int  # M x N
            }
        """
        payload = {
            "texts1": texts1,
            "texts2": texts2
        }
        if model_name:
            payload["model_name"] = model_name
        response = self._session.post(
            f"{self.base_url}/cartesian_similarity",
            json=payload,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()


def _get_api_client() -> SimilarityAPIClient:
    """Return the API client singleton."""
    global _api_client
    if _api_client is None:
        _api_client = SimilarityAPIClient()
    return _api_client


def _format_result(score: float) -> Dict[str, Any]:
    """
    Format a similarity result. The Chinese keys are kept deliberately:
    they are part of the result format shared with text_embedding.py.

    Args:
        score: similarity score (0-1)

    Returns:
        {"说明": str, "相似度": float}
    """
    # Map the score to a verbal level
    if score >= 0.9:
        level = "very high"
    elif score >= 0.7:
        level = "high"
    elif score >= 0.5:
        level = "moderate"
    elif score >= 0.3:
        level = "fairly low"
    else:
        level = "low"

    return {
        "说明": f"Embedding-based semantic similarity is {level} ({score:.2f})",
        "相似度": score
    }


# ============================================================================
# Public interface - the 3 computation modes
# ============================================================================

def compare_phrases(
    phrase_a: str,
    phrase_b: str,
    model_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Compare the semantic similarity of two phrases (single pair).

    Args:
        phrase_a: first phrase
        phrase_b: second phrase
        model_name: model name (optional; defaults to the server's default)

    Returns:
        {
            "说明": str,     # human-readable description
            "相似度": float  # similarity score between 0 and 1
        }

    Examples:
        >>> result = compare_phrases("深度学习", "神经网络")
        >>> print(result['相似度'])  # 0.855
        >>> print(result['说明'])    # Embedding-based semantic similarity is high (0.86)
    """
    try:
        client = _get_api_client()
        api_result = client.similarity(phrase_a, phrase_b, model_name)
        score = float(api_result["score"])
        return _format_result(score)
    except Exception as e:
        raise RuntimeError(f"API call failed: {e}") from e
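
# ----------------------------------------------------------------------------
# Sketch: a retry-enabled client for flaky networks. This is illustrative and
# not part of the interface above. It assumes the similarity endpoints are
# idempotent and safe to retry on gateway errors, which the server does not
# document, and it reaches into the client's private `_session` attribute.
# ----------------------------------------------------------------------------

def _make_retrying_client(retries: int = 3,
                          backoff: float = 0.5) -> SimilarityAPIClient:
    """Build a standalone client whose session retries transient failures."""
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    client = SimilarityAPIClient()
    retry = Retry(
        total=retries,
        backoff_factor=backoff,              # 0.5s, 1s, 2s, ... between tries
        status_forcelist=(502, 503, 504),    # retry only on gateway errors
        allowed_methods=frozenset({"GET", "POST"}),
    )
    client._session.mount("http://", HTTPAdapter(max_retries=retry))
    return client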

def compare_phrases_batch(
    phrase_pairs: List[Tuple[str, str]],
    model_name: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Compare the semantic similarity of many phrase pairs (batched, pairwise).

    Semantics: pair[i].text1 vs pair[i].text2.
    Use case: N independent text pairs, each scored separately.

    Args:
        phrase_pairs: list of phrase pairs [(phrase_a, phrase_b), ...]
        model_name: model name (optional)

    Returns:
        A list of results, one per pair:
        {
            "说明": str,
            "相似度": float
        }

    Examples:
        >>> pairs = [
        ...     ("深度学习", "神经网络"),
        ...     ("机器学习", "人工智能"),
        ...     ("Python编程", "Python开发")
        ... ]
        >>> results = compare_phrases_batch(pairs)
        >>> for (a, b), result in zip(pairs, results):
        ...     print(f"{a} vs {b}: {result['相似度']:.4f}")

    Performance (approximate):
        - 3 pairs:   ~50ms  (vs ~150ms calling pair by pair)
        - 100 pairs: ~200ms (vs ~5s calling pair by pair)
    """
    if not phrase_pairs:
        return []

    try:
        # Convert to the API's wire format
        api_pairs = [{"text1": a, "text2": b} for a, b in phrase_pairs]

        # One batched API call
        client = _get_api_client()
        api_response = client.batch_similarity(api_pairs, model_name)
        api_results = api_response["results"]

        # Format the results
        results = []
        for api_result in api_results:
            score = float(api_result["score"])
            results.append(_format_result(score))
        return results
    except Exception as e:
        raise RuntimeError(f"Batched API call failed: {e}") from e


def compare_phrases_cartesian(
    phrases_a: List[str],
    phrases_b: List[str],
    max_concurrent: int = 50,
    return_matrix: bool = False
):
    """
    Compute Cartesian-product similarities (an M x N matrix).

    Semantics: every phrase in phrases_a is compared against every phrase
    in phrases_b.
    Use case: all combinations between two groups of texts.

    Args:
        phrases_a: first list of phrases (M items)
        phrases_b: second list of phrases (N items)
        max_concurrent: maximum concurrency (the API is called once, so this
            parameter is kept only for interface compatibility)
        return_matrix: if True, return an M x N numpy array of raw scores
            instead of the nested list of formatted results

    Returns:
        An M x N matrix of results (nested lists), where
        results[i][j] = {
            "相似度": float,  # phrases_a[i] vs phrases_b[j]
            "说明": str
        }
        With return_matrix=True, a numpy array of shape (M, N) instead.

    Examples:
        >>> phrases_a = ["深度学习", "机器学习"]
        >>> phrases_b = ["神经网络", "人工智能", "Python"]
        >>> results = compare_phrases_cartesian(phrases_a, phrases_b)
        >>> print(results[0][0]['相似度'])  # 深度学习 vs 神经网络
        >>> print(results[1][2]['说明'])    # description for 机器学习 vs Python

    Performance (approximate):
        - 2x3 = 6 combinations:       ~50ms
        - 10x100 = 1000 combinations: ~500ms
        - 50-200x faster than calling pair by pair
    """
    if not phrases_a or not phrases_b:
        # Empty input yields an empty result, matching the documented M x N shape
        return np.zeros((len(phrases_a), len(phrases_b))) if return_matrix else []

    try:
        # One API call for the full Cartesian product
        # (max_concurrent is intentionally ignored; see the docstring)
        client = _get_api_client()
        api_response = client.cartesian_similarity(phrases_a, phrases_b,
                                                   model_name=None)
        api_results = api_response["results"]

        M = len(phrases_a)
        N = len(phrases_b)

        if return_matrix:
            # Raw scores as an M x N numpy array
            scores = np.array([float(r["score"]) for r in api_results])
            return scores.reshape(M, N)

        # Nested lists with fully formatted results
        results = [[None for _ in range(N)] for _ in range(M)]
        for idx, api_result in enumerate(api_results):
            i = idx // N
            j = idx % N
            score = float(api_result["score"])
            results[i][j] = _format_result(score)
        return results
    except Exception as e:
        raise RuntimeError(f"Cartesian API call failed: {e}") from e


# ============================================================================
# Utility functions
# ============================================================================

def get_api_health() -> Dict:
    """
    Return the API health status.

    Returns:
        {
            "status": "ok",
            "gpu_available": bool,
            "gpu_name": str,
            "model_loaded": bool,
            "max_batch_pairs": int,
            "max_cartesian_texts": int,
            ...
        }
    """
    client = _get_api_client()
    return client.health_check()


def get_supported_models() -> Dict:
    """
    Return the list of models supported by the API.

    Returns:
        The model list with details.
    """
    client = _get_api_client()
    return client.list_models()
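
# ----------------------------------------------------------------------------
# Sketch: chunked batching for very large workloads. This is illustrative and
# not part of the public interface above. It assumes the "max_batch_pairs"
# value reported by /health is a hard server-side limit; the server's actual
# behaviour when the limit is exceeded is not documented in this module.
# ----------------------------------------------------------------------------

def compare_phrases_batch_chunked(
    phrase_pairs: List[Tuple[str, str]],
    chunk_size: Optional[int] = None,
    model_name: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Split a large pair list into server-sized chunks (illustrative sketch)."""
    if chunk_size is None:
        try:
            chunk_size = int(get_api_health().get("max_batch_pairs", 1000))
        except Exception:
            chunk_size = 1000  # fallback assumption, not a documented limit
    results: List[Dict[str, Any]] = []
    for start in range(0, len(phrase_pairs), chunk_size):
        chunk = phrase_pairs[start:start + chunk_size]
        results.extend(compare_phrases_batch(chunk, model_name))
    return results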
API健康检查") print("-" * 80) try: health = get_api_health() print(f"✅ API状态: {health['status']}") print(f" GPU可用: {health['gpu_available']}") if health.get('gpu_name'): print(f" GPU名称: {health['gpu_name']}") print(f" 模型已加载: {health['model_loaded']}") print(f" 最大批量对数: {health['max_batch_pairs']}") print(f" 最大笛卡尔积: {health['max_cartesian_texts']}") except Exception as e: print(f"❌ API连接失败: {e}") print(" 请确保API服务正常运行") exit(1) # 测试2: 单个相似度 print("\n2. 单个相似度计算") print("-" * 80) result = compare_phrases("深度学习", "神经网络") print(f"深度学习 vs 神经网络") print(f" 相似度: {result['相似度']:.4f}") print(f" 说明: {result['说明']}") # 测试3: 批量成对相似度 print("\n3. 批量成对相似度计算") print("-" * 80) pairs = [ ("深度学习", "神经网络"), ("机器学习", "人工智能"), ("Python编程", "Python开发") ] results = compare_phrases_batch(pairs) for (a, b), result in zip(pairs, results): print(f"{a} vs {b}: {result['相似度']:.4f}") # 测试4: 笛卡尔积(嵌套列表) print("\n4. 笛卡尔积计算(嵌套列表格式)") print("-" * 80) phrases_a = ["深度学习", "机器学习"] phrases_b = ["神经网络", "人工智能", "Python"] results = compare_phrases_cartesian(phrases_a, phrases_b) print(f"计算 {len(phrases_a)} × {len(phrases_b)} = {len(phrases_a) * len(phrases_b)} 个相似度") for i, phrase_a in enumerate(phrases_a): print(f"\n{phrase_a}:") for j, phrase_b in enumerate(phrases_b): score = results[i][j]['相似度'] print(f" vs {phrase_b:15}: {score:.4f}") # 测试5: 笛卡尔积(numpy矩阵) print("\n5. 笛卡尔积计算(numpy矩阵格式)") print("-" * 80) matrix = compare_phrases_cartesian(phrases_a, phrases_b, return_matrix=True) print(f"矩阵 shape: {matrix.shape}") print(f"\n相似度矩阵:") print(f"{'':15}", end="") for b in phrases_b: print(f"{b:15}", end="") print() for i, a in enumerate(phrases_a): print(f"{a:15}", end="") for j in range(len(phrases_b)): print(f"{matrix[i][j]:15.4f}", end="") print() # 测试6: 性能对比(可选) print("\n6. 性能测试(可选)") print("-" * 80) print("测试大规模笛卡尔积性能...") import time test_a = ["测试文本A" + str(i) for i in range(10)] test_b = ["测试文本B" + str(i) for i in range(50)] print(f"计算 {len(test_a)} × {len(test_b)} = {len(test_a) * len(test_b)} 个相似度") start = time.time() matrix = compare_phrases_cartesian(test_a, test_b, return_matrix=True) elapsed = time.time() - start print(f"耗时: {elapsed*1000:.2f}ms") print(f"QPS: {matrix.size / elapsed:.2f}") print("\n" + "=" * 80) print(" ✅ 所有测试通过!") print("=" * 80) print("\n📝 接口总结:") print(" 1. compare_phrases(a, b) - 单对计算") print(" 2. compare_phrases_batch([(a,b),...]) - 批量成对") print(" 3. compare_phrases_cartesian([a1,a2], [b1,b2,b3]) - 笛卡尔积") print("\n💡 提示:所有接口都不使用缓存,因为API已经足够快")