# Milvus helpers for the "two_year_all_video_titles" collection.
#
# NOTE: importing this module has side effects: it connects to the Milvus
# server, creates the collection if it is missing, (re)issues the index
# creation request on ``title_vector`` and loads the collection.

from typing import List, Optional

from pymilvus import (
    Collection,
    CollectionSchema,
    DataType,
    FieldSchema,
    connections,
    utility,
)

# Milvus server connection settings.
MILVUS_HOST = '192.168.205.14'  # or the IP address of the Docker container
MILVUS_PORT = '19530'  # default port

# Connect to the Milvus server under the "default" connection alias.
connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

# Collection parameters.
collection_name = "two_year_all_video_titles"
# Dimension of the title embedding vectors.
dim = 768

# Field schemas of the collection.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title_vector", dtype=DataType.FLOAT_VECTOR, dim=dim),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256, description="视频标题"),
    FieldSchema(name="preview_times", dtype=DataType.INT64, description="预曝光次数"),
    FieldSchema(name="preview_users", dtype=DataType.INT64, description="预曝光用户数"),
    FieldSchema(name="view_times", dtype=DataType.INT64, description="曝光次数"),
    FieldSchema(name="view_users", dtype=DataType.INT64, description="曝光用户数"),
    FieldSchema(name="play_times", dtype=DataType.INT64, description="播放次数"),
    FieldSchema(name="play_users", dtype=DataType.INT64, description="播放用户数"),
    FieldSchema(name="share_times", dtype=DataType.INT64, description="分享次数"),
    FieldSchema(name="share_users", dtype=DataType.INT64, description="分享用户数"),
    FieldSchema(name="return_times", dtype=DataType.INT64, description="回看次数"),
    FieldSchema(name="return_users", dtype=DataType.INT64, description="回看用户数"),
    FieldSchema(name="create_time", dtype=DataType.INT64, description="创建时间戳"),
]
schema = CollectionSchema(fields, description="两年内的分发过的视频标题")

# Create the collection if it does not exist yet; otherwise open the
# existing one (its stored schema is used, not ``schema`` above).
if collection_name not in utility.list_collections():
    collection = Collection(name=collection_name, schema=schema)
else:
    collection = Collection(name=collection_name)

# Index parameters for the vector field.
index_params = {
    "index_type": "IVF_FLAT",  # pick an index type that fits the workload
    "metric_type": "IP",  # metric chosen for the text embedding vectors
    "params": {"nlist": 128},  # tune as needed
}

# Build the index on the vector field, then load the collection so that it
# can be searched.
collection.create_index(field_name="title_vector", index_params=index_params)
collection.load()


def insert_vector(data):
    """Insert ``data`` into the module-level collection and print the result.

    Args:
        data: Passed unchanged to ``Collection.insert``; presumably entities
            matching the collection schema -- confirm against callers.
    """
    mr = collection.insert(data)
    print(f"insert result: {mr}")
    # NOTE(review): the original comment described this call as "commit
    # changes", but load() loads the collection for searching rather than
    # flushing inserted data; confirm whether collection.flush() was intended.
    collection.load()


def query(vector: List,
          top_k: int,
          expr: Optional[str] = None,
          output_fields: Optional[List[str]] = None,
          search_params: Optional[dict] = None):
    """Run a vector similarity search on ``title_vector`` with an optional filter.

    Args:
        vector: Query data passed as ``data`` to ``Collection.search``;
            presumably a list of ``dim``-sized vectors -- confirm against
            callers.
        top_k: Maximum number of hits to return (passed as ``limit``).
        expr: Scalar filter expression restricting the candidates, or
            ``None`` for no filtering.
        output_fields: Names of the fields to return with each hit.
            Defaults to ``['title', 'preview_times']``.
        search_params: Search parameters passed as ``param``. Defaults to an
            inner-product search with ``nprobe=10``, ``offset=0`` and
            ``ignore_growing=False``.

    Returns:
        Whatever ``Collection.search`` returns for the request.
    """
    # Build the defaults per call instead of using mutable default arguments,
    # so that no list/dict instance is shared (and possibly mutated) across
    # calls.
    if output_fields is None:
        output_fields = ['title', 'preview_times']
    if search_params is None:
        search_params = {
            "metric_type": "IP",
            "offset": 0,
            "ignore_growing": False,
            "params": {"nprobe": 10},
        }

    # Use a fresh handle to the collection for this search.
    collection = Collection(name=collection_name)

    # Combined search: vector similarity plus the optional scalar filter.
    results = collection.search(
        data=vector,
        anns_field="title_vector",
        param=search_params,
        limit=top_k,
        expr=expr,
        output_fields=output_fields,
        consistency_level="Strong",
    )
    return results