| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
- """
- Milvus Lite 存储封装
- 单一存储架构,存储完整知识数据 + 向量。
- """
- from milvus import default_server
- from pymilvus import (
- connections, Collection, FieldSchema,
- CollectionSchema, DataType, utility
- )
- from typing import List, Dict, Optional
- import json
- import time
class MilvusStore:
    """Milvus Lite storage wrapper.

    Single-store architecture: full knowledge records (scalar fields) and
    their embedding vectors live together in one collection.
    """

    def __init__(self, data_dir: str = "./milvus_data"):
        """
        Initialize the embedded Milvus Lite store.

        Args:
            data_dir: Directory where Milvus Lite persists its data.
        """
        # Point the embedded server at its data directory before starting it.
        default_server.set_base_dir(data_dir)
        # First, try to connect to an instance that may already be running
        # (e.g. started earlier by this or another process on the same port).
        try:
            connections.connect(
                alias="default",
                host='127.0.0.1',
                port=default_server.listen_port,
                timeout=5
            )
            print(f"[Milvus] 连接到已存在的 Milvus 实例 (端口 {default_server.listen_port})")
        except Exception:
            # No reachable instance — start a fresh embedded server.
            print(f"[Milvus] 启动新的 Milvus Lite 实例...")
            try:
                default_server.start()
                print(f"[Milvus] Milvus Lite 启动成功 (端口 {default_server.listen_port})")
                # Connect once the freshly started server is listening.
                connections.connect(
                    alias="default",
                    host='127.0.0.1',
                    port=default_server.listen_port,
                    timeout=5
                )
                print(f"[Milvus] 已连接到新启动的实例")
            except Exception as e:
                print(f"[Milvus] 启动失败: {e}")
                # start() can fail if another process won the race and already
                # holds the port — try once more to connect to that instance.
                try:
                    connections.connect(
                        alias="default",
                        host='127.0.0.1',
                        port=default_server.listen_port,
                        timeout=5
                    )
                    print(f"[Milvus] 连接到已存在的实例")
                except Exception as e2:
                    raise RuntimeError(f"无法启动或连接到 Milvus: {e}, {e2}")
        self._init_collection()
    def _init_collection(self):
        """Open the "knowledge" collection, creating it (plus indexes) on first run."""
        collection_name = "knowledge"
        if utility.has_collection(collection_name):
            # Collection already exists — just attach to it.
            self.collection = Collection(collection_name)
        else:
            # Define the schema: VARCHAR primary key, 1536-dim float vector,
            # and the scalar fields that make up a full knowledge record.
            fields = [
                FieldSchema(name="id", dtype=DataType.VARCHAR,
                            max_length=100, is_primary=True),
                FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR,
                            dim=1536),
                FieldSchema(name="message_id", dtype=DataType.VARCHAR,
                            max_length=100),
                FieldSchema(name="task", dtype=DataType.VARCHAR,
                            max_length=2000),
                FieldSchema(name="content", dtype=DataType.VARCHAR,
                            max_length=50000),
                FieldSchema(name="types", dtype=DataType.ARRAY,
                            element_type=DataType.VARCHAR, max_capacity=20, max_length=50),
                FieldSchema(name="tags", dtype=DataType.JSON),
                FieldSchema(name="tag_keys", dtype=DataType.ARRAY,
                            element_type=DataType.VARCHAR, max_capacity=50, max_length=100),
                FieldSchema(name="scopes", dtype=DataType.ARRAY,
                            element_type=DataType.VARCHAR, max_capacity=20, max_length=100),
                FieldSchema(name="owner", dtype=DataType.VARCHAR,
                            max_length=200),
                FieldSchema(name="resource_ids", dtype=DataType.ARRAY,
                            element_type=DataType.VARCHAR, max_capacity=50, max_length=200),
                FieldSchema(name="source", dtype=DataType.JSON),
                FieldSchema(name="eval", dtype=DataType.JSON),
                FieldSchema(name="created_at", dtype=DataType.INT64),
                FieldSchema(name="updated_at", dtype=DataType.INT64),
                FieldSchema(name="status", dtype=DataType.VARCHAR,
                            max_length=20, default_value="approved"),
                # relationships is stored as a JSON-encoded string, not a JSON column.
                FieldSchema(name="relationships", dtype=DataType.VARCHAR,
                            max_length=10000, default_value="[]"),
            ]
            schema = CollectionSchema(fields, description="KnowHub Knowledge")
            self.collection = Collection(collection_name, schema)
            # Vector index: HNSW over cosine similarity.
            index_params = {
                "metric_type": "COSINE",
                "index_type": "HNSW",
                "params": {"M": 16, "efConstruction": 200}
            }
            self.collection.create_index("embedding", index_params)
            # Trie scalar index on status to speed up filtering; best-effort —
            # tolerated to fail on Milvus builds that don't support it.
            try:
                self.collection.create_index("status", {"index_type": "Trie"})
            except Exception:
                pass
        # Load into memory so search/query can be served.
        self.collection.load()
- def insert(self, knowledge: Dict):
- """
- 插入单条知识
- Args:
- knowledge: 知识数据(包含 embedding)
- """
- self.collection.insert([knowledge])
- self.collection.flush()
- def insert_batch(self, knowledge_list: List[Dict]):
- """
- 批量插入知识
- Args:
- knowledge_list: 知识列表
- """
- if not knowledge_list:
- return
- self.collection.insert(knowledge_list)
- self.collection.flush()
- def search(self,
- query_embedding: List[float],
- filters: Optional[str] = None,
- limit: int = 10) -> List[Dict]:
- """
- 向量检索 + 标量过滤
- Args:
- query_embedding: 查询向量
- filters: 过滤表达式(如: 'owner == "agent"')
- limit: 返回数量
- Returns:
- 知识列表
- """
- search_params = {"metric_type": "COSINE", "params": {"ef": 100}}
- results = self.collection.search(
- data=[query_embedding],
- anns_field="embedding",
- param=search_params,
- limit=limit,
- expr=filters,
- output_fields=["id", "message_id", "task", "content", "types",
- "tags", "tag_keys", "scopes", "owner", "resource_ids",
- "source", "eval", "created_at", "updated_at",
- "status", "relationships"]
- )
- if not results or not results[0]:
- return []
- # 返回实体字典,包含所有字段
- # 注意:时间戳需要转换为毫秒(JavaScript Date 需要)
- return [
- {
- "id": hit.entity.get("id"),
- "message_id": hit.entity.get("message_id"),
- "task": hit.entity.get("task"),
- "content": hit.entity.get("content"),
- "types": list(hit.entity.get("types")) if hit.entity.get("types") else [],
- "tags": hit.entity.get("tags"),
- "tag_keys": list(hit.entity.get("tag_keys")) if hit.entity.get("tag_keys") else [],
- "scopes": list(hit.entity.get("scopes")) if hit.entity.get("scopes") else [],
- "owner": hit.entity.get("owner"),
- "resource_ids": list(hit.entity.get("resource_ids")) if hit.entity.get("resource_ids") else [],
- "source": hit.entity.get("source"),
- "eval": hit.entity.get("eval"),
- "created_at": hit.entity.get("created_at") * 1000 if hit.entity.get("created_at") else None,
- "updated_at": hit.entity.get("updated_at") * 1000 if hit.entity.get("updated_at") else None,
- "status": hit.entity.get("status", "approved"),
- "relationships": json.loads(hit.entity.get("relationships") or "[]"),
- "score": hit.score,
- }
- for hit in results[0]
- ]
- def query(self, filters: str, limit: int = 100) -> List[Dict]:
- """
- 纯标量查询(不使用向量)
- Args:
- filters: 过滤表达式
- limit: 返回数量
- Returns:
- 知识列表
- """
- results = self.collection.query(
- expr=filters,
- output_fields=["id", "message_id", "task", "content", "types",
- "tags", "tag_keys", "scopes", "owner", "resource_ids",
- "source", "eval", "created_at", "updated_at",
- "status", "relationships"],
- limit=limit
- )
- # 转换时间戳为毫秒,确保数组字段格式正确
- for r in results:
- if r.get("created_at"):
- r["created_at"] = r["created_at"] * 1000
- if r.get("updated_at"):
- r["updated_at"] = r["updated_at"] * 1000
- # 确保数组字段是列表格式
- if r.get("types") and not isinstance(r["types"], list):
- r["types"] = list(r["types"])
- if r.get("tag_keys") and not isinstance(r["tag_keys"], list):
- r["tag_keys"] = list(r["tag_keys"])
- if r.get("scopes") and not isinstance(r["scopes"], list):
- r["scopes"] = list(r["scopes"])
- if r.get("resource_ids") and not isinstance(r["resource_ids"], list):
- r["resource_ids"] = list(r["resource_ids"])
- # 兼容旧数据(无 status/relationships 字段)
- if "status" not in r:
- r["status"] = "approved"
- if "relationships" not in r or r["relationships"] is None:
- r["relationships"] = []
- else:
- r["relationships"] = json.loads(r["relationships"]) if isinstance(r["relationships"], str) else r["relationships"]
- return results
- def get_by_id(self, knowledge_id: str) -> Optional[Dict]:
- """
- 根据 ID 获取知识
- Args:
- knowledge_id: 知识 ID
- Returns:
- 知识数据,不存在返回 None
- """
- results = self.collection.query(
- expr=f'id == "{knowledge_id}"',
- output_fields=["id", "embedding", "message_id", "task", "content", "types",
- "tags", "tag_keys", "scopes", "owner", "resource_ids",
- "source", "eval", "created_at", "updated_at",
- "status", "relationships"]
- )
- if not results:
- return None
- # 转换时间戳和数组字段
- r = results[0]
- if r.get("created_at"):
- r["created_at"] = r["created_at"] * 1000
- if r.get("updated_at"):
- r["updated_at"] = r["updated_at"] * 1000
- if r.get("types") and not isinstance(r["types"], list):
- r["types"] = list(r["types"])
- if r.get("tag_keys") and not isinstance(r["tag_keys"], list):
- r["tag_keys"] = list(r["tag_keys"])
- if r.get("scopes") and not isinstance(r["scopes"], list):
- r["scopes"] = list(r["scopes"])
- if r.get("resource_ids") and not isinstance(r["resource_ids"], list):
- r["resource_ids"] = list(r["resource_ids"])
- # 兼容旧数据
- if "status" not in r:
- r["status"] = "approved"
- if "relationships" not in r or r["relationships"] is None:
- r["relationships"] = []
- else:
- r["relationships"] = json.loads(r["relationships"]) if isinstance(r["relationships"], str) else r["relationships"]
- return r
    def update(self, knowledge_id: str, updates: Dict):
        """
        Update a knowledge record, implemented as delete + re-insert.

        NOTE(review): this sequence is not atomic — if insert() fails after
        delete(), the record is lost. Confirm this is acceptable here.

        Args:
            knowledge_id: ID of the record to update.
            updates: Fields to merge into the existing record.

        Raises:
            ValueError: If no record with the given ID exists.
        """
        # 1. Load the current record (comes back in get_by_id's display format).
        existing = self.get_by_id(knowledge_id)
        if not existing:
            raise ValueError(f"Knowledge not found: {knowledge_id}")
        # 2. Merge updates and refresh the modification time (epoch seconds).
        existing.update(updates)
        existing["updated_at"] = int(time.time())
        # 3. Undo get_by_id's display-layer conversions before re-storing.
        # created_at was multiplied by 1000 (milliseconds); restore seconds.
        # The magnitude check leaves already-second values untouched.
        if existing.get("created_at") and existing["created_at"] > 1_000_000_000_000:
            existing["created_at"] = existing["created_at"] // 1000
        # relationships was deserialized to a list; re-serialize to a JSON
        # string, since the column is VARCHAR.
        if isinstance(existing.get("relationships"), list):
            existing["relationships"] = json.dumps(existing["relationships"])
        # 4. Remove the old row.
        self.delete(knowledge_id)
        # 5. Insert the merged record.
        self.insert(existing)
- def delete(self, knowledge_id: str):
- """
- 删除知识
- Args:
- knowledge_id: 知识 ID
- """
- self.collection.delete(f'id == "{knowledge_id}"')
- self.collection.flush()
    def count(self) -> int:
        """Return the total number of entities in the collection.

        NOTE(review): num_entities reflects flushed inserts and, depending on
        the Milvus version, may still include deleted rows until compaction —
        confirm against the pymilvus version in use.
        """
        return self.collection.num_entities
    def drop_collection(self):
        """Drop the entire "knowledge" collection (destructive, irreversible)."""
        utility.drop_collection("knowledge")
- def migrate_schema(self) -> int:
- """
- 将旧 collection(无 status/relationships 字段)迁移到新 schema。
- 采用中转 collection 模式,Step 3 之前数据始终有两份副本。
- 返回迁移的知识条数。
- """
- MIGRATION_NAME = "knowledge_migration"
- MAIN_NAME = "knowledge"
- # 如果中转 collection 已存在(上次迁移中途失败),先清理
- if utility.has_collection(MIGRATION_NAME):
- print(f"[Migrate] 检测到残留中转 collection,清理...")
- utility.drop_collection(MIGRATION_NAME)
- # Step 1: 创建中转 collection(新 schema)
- print(f"[Migrate] Step 1: 创建中转 collection {MIGRATION_NAME}...")
- fields = [
- FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
- FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
- FieldSchema(name="message_id", dtype=DataType.VARCHAR, max_length=100),
- FieldSchema(name="task", dtype=DataType.VARCHAR, max_length=2000),
- FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=50000),
- FieldSchema(name="types", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=20, max_length=50),
- FieldSchema(name="tags", dtype=DataType.JSON),
- FieldSchema(name="tag_keys", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=50, max_length=100),
- FieldSchema(name="scopes", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=20, max_length=100),
- FieldSchema(name="owner", dtype=DataType.VARCHAR, max_length=200),
- FieldSchema(name="resource_ids", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=50, max_length=200),
- FieldSchema(name="source", dtype=DataType.JSON),
- FieldSchema(name="eval", dtype=DataType.JSON),
- FieldSchema(name="created_at", dtype=DataType.INT64),
- FieldSchema(name="updated_at", dtype=DataType.INT64),
- FieldSchema(name="status", dtype=DataType.VARCHAR, max_length=20, default_value="approved"),
- FieldSchema(name="relationships", dtype=DataType.VARCHAR, max_length=10000, default_value="[]"),
- ]
- schema = CollectionSchema(fields, description="KnowHub Knowledge")
- migration_col = Collection(MIGRATION_NAME, schema)
- migration_col.create_index("embedding", {"metric_type": "COSINE", "index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}})
- try:
- migration_col.create_index("status", {"index_type": "Trie"})
- except Exception:
- pass
- migration_col.load()
- # Step 2: 从旧 collection 逐批读取,补字段,插入中转
- print(f"[Migrate] Step 2: 读取旧数据并插入中转 collection...")
- batch_size = 200
- offset = 0
- total = 0
- while True:
- batch = self.collection.query(
- expr='id != ""',
- output_fields=["id", "embedding", "message_id", "task", "content", "types",
- "tags", "tag_keys", "scopes", "owner", "resource_ids",
- "source", "eval", "created_at", "updated_at"],
- limit=batch_size,
- offset=offset
- )
- if not batch:
- break
- for item in batch:
- item["status"] = item.get("status", "approved")
- item["relationships"] = item.get("relationships") or []
- # 时间戳已是秒级(query 返回原始值,未乘 1000)
- migration_col.insert(batch)
- migration_col.flush()
- total += len(batch)
- offset += len(batch)
- print(f"[Migrate] 已迁移 {total} 条...")
- if len(batch) < batch_size:
- break
- # Step 3: drop 旧 collection
- print(f"[Migrate] Step 3: drop 旧 collection {MAIN_NAME}...")
- self.collection.release()
- utility.drop_collection(MAIN_NAME)
- # Step 4: 创建新 collection(同名,新 schema)
- print(f"[Migrate] Step 4: 创建新 collection {MAIN_NAME}...")
- new_col = Collection(MAIN_NAME, schema)
- new_col.create_index("embedding", {"metric_type": "COSINE", "index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}})
- try:
- new_col.create_index("status", {"index_type": "Trie"})
- except Exception:
- pass
- new_col.load()
- # Step 5: 从中转 collection 读取,插入新 collection
- print(f"[Migrate] Step 5: 从中转 collection 回写到新 collection...")
- offset = 0
- while True:
- batch = migration_col.query(
- expr='id != ""',
- output_fields=["id", "embedding", "message_id", "task", "content", "types",
- "tags", "tag_keys", "scopes", "owner", "resource_ids",
- "source", "eval", "created_at", "updated_at",
- "status", "relationships"],
- limit=batch_size,
- offset=offset
- )
- if not batch:
- break
- new_col.insert(batch)
- new_col.flush()
- offset += len(batch)
- if len(batch) < batch_size:
- break
- # Step 6: drop 中转 collection
- print(f"[Migrate] Step 6: drop 中转 collection {MIGRATION_NAME}...")
- migration_col.release()
- utility.drop_collection(MIGRATION_NAME)
- # Step 7: 更新 self.collection 引用
- print(f"[Migrate] Step 7: 更新 collection 引用...")
- self.collection = new_col
- print(f"[Migrate] 迁移完成,共迁移 {total} 条知识。")
- return total
|