"""
Milvus Lite storage wrapper.

Single-store architecture: the complete knowledge record and its embedding
vector are kept together in one Milvus collection.
"""

from milvus import default_server
from pymilvus import (
    connections, Collection, FieldSchema,
    CollectionSchema, DataType, utility
)
from typing import List, Dict, Optional
import json
import time


class MilvusStore:
    """Thin CRUD / search layer over the ``knowledge`` Milvus collection.

    Storage format vs. returned format (as implemented by the methods below):

    * ``created_at`` / ``updated_at`` are multiplied by 1000 on every read
      path (``search``, ``query``, ``get_by_id``); ``update`` writes
      ``int(time.time())``, i.e. seconds, so the stored unit is seconds and
      the returned unit is milliseconds.
    * ``relationships`` is stored as a JSON string (VARCHAR field) and is
      returned as the decoded Python value.
    """

    COLLECTION_NAME = "knowledge"
    MIGRATION_COLLECTION_NAME = "knowledge_migration"

    # Every field of the current schema, in schema order.
    _ALL_FIELDS = [
        "id", "embedding", "message_id", "task", "content", "types", "tags",
        "tag_keys", "scopes", "owner", "resource_ids", "source", "eval",
        "created_at", "updated_at", "status", "relationships",
    ]
    # Same list without the vector; used where the embedding is not returned.
    _SCALAR_FIELDS = [
        "id", "message_id", "task", "content", "types", "tags",
        "tag_keys", "scopes", "owner", "resource_ids", "source", "eval",
        "created_at", "updated_at", "status", "relationships",
    ]
    # ARRAY-typed fields that are normalised to plain ``list`` on read.
    _ARRAY_FIELDS = ("types", "tag_keys", "scopes", "resource_ids")
    # Fields added by the newer schema; absent from pre-migration collections.
    _NEW_SCHEMA_FIELDS = ("status", "relationships")

    def __init__(self, data_dir: str = "./milvus_data"):
        """Connect to (or start) the embedded Milvus Lite server.

        The connection is attempted first; only if that fails is a new
        embedded server started. If starting also fails, one last connection
        attempt is made before giving up.

        Args:
            data_dir: Base directory handed to ``default_server``.

        Raises:
            RuntimeError: Neither starting nor connecting succeeded.
        """
        default_server.set_base_dir(data_dir)

        try:
            # An instance may already be listening on the configured port.
            self._connect()
            print(f"[Milvus] 连接到已存在的 Milvus 实例 (端口 {default_server.listen_port})")
        except Exception:
            # Nothing reachable: start a fresh embedded server.
            print("[Milvus] 启动新的 Milvus Lite 实例...")
            try:
                default_server.start()
                print(f"[Milvus] Milvus Lite 启动成功 (端口 {default_server.listen_port})")
                self._connect()
                print("[Milvus] 已连接到新启动的实例")
            except Exception as e:
                print(f"[Milvus] 启动失败: {e}")
                # Start failed; retry the connection once in case an instance
                # is running after all.
                try:
                    self._connect()
                    print("[Milvus] 连接到已存在的实例")
                except Exception as e2:
                    raise RuntimeError(f"无法启动或连接到 Milvus: {e}, {e2}") from e2

        self._init_collection()

    @staticmethod
    def _connect():
        """Open the ``default`` connection to the local server port."""
        connections.connect(
            alias="default",
            host='127.0.0.1',
            port=default_server.listen_port,
            timeout=5
        )

    @staticmethod
    def _build_schema() -> CollectionSchema:
        """Build the current collection schema (including status/relationships)."""
        varchar = DataType.VARCHAR
        fields = [
            FieldSchema(name="id", dtype=varchar, max_length=100, is_primary=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
            FieldSchema(name="message_id", dtype=varchar, max_length=100),
            FieldSchema(name="task", dtype=varchar, max_length=2000),
            FieldSchema(name="content", dtype=varchar, max_length=50000),
            FieldSchema(name="types", dtype=DataType.ARRAY, element_type=varchar,
                        max_capacity=20, max_length=50),
            FieldSchema(name="tags", dtype=DataType.JSON),
            FieldSchema(name="tag_keys", dtype=DataType.ARRAY, element_type=varchar,
                        max_capacity=50, max_length=100),
            FieldSchema(name="scopes", dtype=DataType.ARRAY, element_type=varchar,
                        max_capacity=20, max_length=100),
            FieldSchema(name="owner", dtype=varchar, max_length=200),
            FieldSchema(name="resource_ids", dtype=DataType.ARRAY, element_type=varchar,
                        max_capacity=50, max_length=200),
            FieldSchema(name="source", dtype=DataType.JSON),
            FieldSchema(name="eval", dtype=DataType.JSON),
            FieldSchema(name="created_at", dtype=DataType.INT64),
            FieldSchema(name="updated_at", dtype=DataType.INT64),
            FieldSchema(name="status", dtype=varchar, max_length=20,
                        default_value="approved"),
            FieldSchema(name="relationships", dtype=varchar, max_length=10000,
                        default_value="[]"),
        ]
        return CollectionSchema(fields, description="KnowHub Knowledge")

    @classmethod
    def _create_collection(cls, name: str) -> Collection:
        """Create collection *name* with the current schema and its indexes.

        The returned collection is not loaded; callers call ``load()``.
        """
        collection = Collection(name, cls._build_schema())

        # Vector index on the embedding field.
        collection.create_index("embedding", {
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {"M": 16, "efConstruction": 200}
        })

        # Trie scalar index on status to speed up filtering. Best effort:
        # a failure here is reported but does not abort collection creation.
        try:
            collection.create_index("status", {"index_type": "Trie"})
        except Exception as exc:
            print(f"[Milvus] status index not created (ignored): {exc}")

        return collection

    def _init_collection(self):
        """Open the main collection, creating it if missing, then load it."""
        name = self.COLLECTION_NAME
        if utility.has_collection(name):
            self.collection = Collection(name)
        else:
            self.collection = self._create_collection(name)
        self.collection.load()

    @staticmethod
    def _id_expr(knowledge_id: str) -> str:
        """Return the filter expression matching exactly *knowledge_id*.

        Backslashes and double quotes are escaped so an id containing them
        cannot terminate the string literal early. Ids without those
        characters yield the same expression as plain interpolation.
        """
        escaped = str(knowledge_id).replace("\\", "\\\\").replace('"', '\\"')
        return f'id == "{escaped}"'

    @classmethod
    def _normalize_record(cls, record: Dict) -> Dict:
        """Convert a raw ``query`` row, in place, to the returned format.

        * timestamps are multiplied by 1000;
        * array fields become plain lists;
        * missing ``status`` defaults to ``"approved"``;
        * ``relationships`` is JSON-decoded; missing/None/empty becomes ``[]``.
        """
        for key in ("created_at", "updated_at"):
            if record.get(key):
                record[key] = record[key] * 1000

        for key in cls._ARRAY_FIELDS:
            value = record.get(key)
            if value and not isinstance(value, list):
                record[key] = list(value)

        # Rows written before status/relationships existed lack these keys.
        if "status" not in record:
            record["status"] = "approved"

        relationships = record.get("relationships")
        if not relationships:
            # Covers missing, None and "" (the latter would otherwise make
            # json.loads raise); matches the `or "[]"` fallback in search().
            record["relationships"] = []
        elif isinstance(relationships, str):
            record["relationships"] = json.loads(relationships)
        return record

    @staticmethod
    def _hit_to_dict(hit) -> Dict:
        """Convert one search hit into the returned dictionary format."""
        entity = hit.entity

        def as_list(field: str) -> List:
            value = entity.get(field)
            return list(value) if value else []

        def as_millis(field: str) -> Optional[int]:
            value = entity.get(field)
            return value * 1000 if value else None

        return {
            "id": entity.get("id"),
            "message_id": entity.get("message_id"),
            "task": entity.get("task"),
            "content": entity.get("content"),
            "types": as_list("types"),
            "tags": entity.get("tags"),
            "tag_keys": as_list("tag_keys"),
            "scopes": as_list("scopes"),
            "owner": entity.get("owner"),
            "resource_ids": as_list("resource_ids"),
            "source": entity.get("source"),
            "eval": entity.get("eval"),
            "created_at": as_millis("created_at"),
            "updated_at": as_millis("updated_at"),
            "status": entity.get("status", "approved"),
            "relationships": json.loads(entity.get("relationships") or "[]"),
        }

    @staticmethod
    def _iter_batches(collection, output_fields: List[str], batch_size: int):
        """Yield successive ``query`` pages of *collection*.

        Stops on the first empty or short page. The next page is requested
        only after the consumer has finished with the current one.

        NOTE(review): paging uses limit/offset; Milvus is believed to cap
        offset + limit per query, which would bound how many rows can be
        walked this way -- confirm against the deployed version.
        """
        offset = 0
        while True:
            batch = collection.query(
                expr='id != ""',
                output_fields=output_fields,
                limit=batch_size,
                offset=offset
            )
            if not batch:
                return
            yield batch
            offset += len(batch)
            if len(batch) < batch_size:
                return

    def insert(self, knowledge: Dict):
        """Insert a single knowledge record and flush.

        Args:
            knowledge: Record in storage format, including ``embedding``.
        """
        self.collection.insert([knowledge])
        self.collection.flush()

    def insert_batch(self, knowledge_list: List[Dict]):
        """Insert several knowledge records and flush once.

        Args:
            knowledge_list: Records in storage format; an empty list is a no-op.
        """
        if not knowledge_list:
            return
        self.collection.insert(knowledge_list)
        self.collection.flush()

    def search(self,
               query_embedding: List[float],
               filters: Optional[str] = None,
               limit: int = 10) -> List[Dict]:
        """Vector search with an optional scalar filter.

        Args:
            query_embedding: Query vector.
            filters: Filter expression (e.g. ``'owner == "agent"'``).
            limit: Maximum number of hits.

        Returns:
            Records without the embedding, timestamps multiplied by 1000 and
            ``relationships`` decoded from JSON. Empty list when no hits.
        """
        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"ef": 100}},
            limit=limit,
            expr=filters,
            output_fields=list(self._SCALAR_FIELDS)
        )

        if not results or not results[0]:
            return []

        return [self._hit_to_dict(hit) for hit in results[0]]

    def query(self, filters: str, limit: int = 100) -> List[Dict]:
        """Scalar-only query (no vector involved).

        Args:
            filters: Filter expression.
            limit: Maximum number of rows.

        Returns:
            Matching records without the embedding, in the returned format.
        """
        results = self.collection.query(
            expr=filters,
            output_fields=list(self._SCALAR_FIELDS),
            limit=limit
        )
        for record in results:
            self._normalize_record(record)
        return results

    def get_by_id(self, knowledge_id: str) -> Optional[Dict]:
        """Fetch one record, including its embedding, by primary key.

        Args:
            knowledge_id: Knowledge id.

        Returns:
            The record in the returned format, or ``None`` if not found.
        """
        results = self.collection.query(
            expr=self._id_expr(knowledge_id),
            output_fields=list(self._ALL_FIELDS)
        )
        if not results:
            return None
        return self._normalize_record(results[0])

    def update(self, knowledge_id: str, updates: Dict):
        """Update a record by deleting it and inserting the merged version.

        The delete and the insert are two separate operations; if the insert
        fails the record has already been removed.

        Args:
            knowledge_id: Knowledge id.
            updates: Fields to overwrite.

        Raises:
            ValueError: No record with *knowledge_id* exists.
        """
        existing = self.get_by_id(knowledge_id)
        if not existing:
            raise ValueError(f"Knowledge not found: {knowledge_id}")

        # get_by_id multiplied created_at by 1000; reverse that exactly so the
        # stored value is written back unchanged regardless of its magnitude.
        if existing.get("created_at"):
            existing["created_at"] = existing["created_at"] // 1000

        existing.update(updates)
        existing["updated_at"] = int(time.time())

        # A caller-supplied created_at may be in milliseconds; values above
        # 1e12 are treated as such and reduced to seconds.
        if "created_at" in updates:
            created_at = existing.get("created_at")
            if created_at and created_at > 1_000_000_000_000:
                existing["created_at"] = created_at // 1000

        # get_by_id decoded relationships to a list; the field stores JSON text.
        if isinstance(existing.get("relationships"), list):
            existing["relationships"] = json.dumps(existing["relationships"])

        self.delete(knowledge_id)
        self.insert(existing)

    def delete(self, knowledge_id: str):
        """Delete the record with the given id and flush.

        Args:
            knowledge_id: Knowledge id.
        """
        self.collection.delete(self._id_expr(knowledge_id))
        self.collection.flush()

    def count(self) -> int:
        """Return the collection's ``num_entities``.

        NOTE(review): ``num_entities`` may still count deleted rows until
        Milvus compacts them -- confirm if an exact count is required.
        """
        return self.collection.num_entities

    def drop_collection(self):
        """Drop the main collection (destructive)."""
        utility.drop_collection(self.COLLECTION_NAME)

    def migrate_schema(self) -> int:
        """Migrate the main collection to the schema with status/relationships.

        Uses a staging collection: rows are copied main -> staging, the main
        collection is dropped and recreated with the new schema, then rows are
        copied staging -> main and the staging collection is dropped. Until
        the main collection is dropped (step 3) two copies of the data exist.

        Returns:
            Number of rows copied into the staging collection.

        Raises:
            RuntimeError: A leftover staging collection holds more rows than
                the main collection, i.e. it may be the only complete copy.
        """
        migration_name = self.MIGRATION_COLLECTION_NAME
        main_name = self.COLLECTION_NAME
        batch_size = 200

        # A staging collection left by an interrupted run is normally stale
        # and can be dropped. If that run died after dropping the main
        # collection, however, staging is the only full copy: refuse to
        # destroy it.
        if utility.has_collection(migration_name):
            leftover = Collection(migration_name)
            if leftover.num_entities > self.collection.num_entities:
                raise RuntimeError(
                    f"Leftover collection '{migration_name}' holds more entities "
                    f"than '{main_name}'; it may be the only complete copy of the "
                    f"data. Recover it manually before migrating again."
                )
            print("[Migrate] 检测到残留中转 collection,清理...")
            utility.drop_collection(migration_name)

        # Step 1: create the staging collection with the new schema.
        print(f"[Migrate] Step 1: 创建中转 collection {migration_name}...")
        migration_col = self._create_collection(migration_name)
        migration_col.load()

        # Step 2: copy old rows into staging, filling in the new fields.
        # The raw query is used, so timestamps keep their stored unit.
        print("[Migrate] Step 2: 读取旧数据并插入中转 collection...")
        legacy_fields = [
            name for name in self._ALL_FIELDS
            if name not in self._NEW_SCHEMA_FIELDS
        ]
        total = 0
        for batch in self._iter_batches(self.collection, legacy_fields, batch_size):
            for item in batch:
                item["status"] = item.get("status", "approved")
                # relationships is a VARCHAR field holding JSON text, so it
                # must be written as a string, never as a Python list.
                relationships = item.get("relationships")
                if isinstance(relationships, list):
                    relationships = json.dumps(relationships)
                item["relationships"] = relationships or "[]"
            migration_col.insert(batch)
            migration_col.flush()
            total += len(batch)
            print(f"[Migrate] 已迁移 {total} 条...")

        # Step 3: drop the old main collection.
        print(f"[Migrate] Step 3: drop 旧 collection {main_name}...")
        self.collection.release()
        utility.drop_collection(main_name)

        # Step 4: recreate the main collection under the same name.
        print(f"[Migrate] Step 4: 创建新 collection {main_name}...")
        new_col = self._create_collection(main_name)
        new_col.load()

        # Step 5: copy everything back from staging.
        print("[Migrate] Step 5: 从中转 collection 回写到新 collection...")
        for batch in self._iter_batches(migration_col, list(self._ALL_FIELDS), batch_size):
            new_col.insert(batch)
            new_col.flush()

        # Step 6: drop the staging collection.
        print(f"[Migrate] Step 6: drop 中转 collection {migration_name}...")
        migration_col.release()
        utility.drop_collection(migration_name)

        # Step 7: point this store at the new collection.
        print("[Migrate] Step 7: 更新 collection 引用...")
        self.collection = new_col

        print(f"[Migrate] 迁移完成,共迁移 {total} 条知识。")
        return total