123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- from pymilvus import Collection, CollectionSchema, FieldSchema, DataType, connections, utility
- from typing import List, Optional
# Milvus server connection settings.
MILVUS_HOST = '192.168.205.14'  # or the IP address of the Docker container
MILVUS_PORT = '19530'  # default port

# Open the "default" connection to the Milvus server.
connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

# Collection name and the dimensionality of the stored title vectors.
collection_name = "two_year_all_video_titles"
dim = 768

# Field layout: primary key, title embedding, title text, followed by a run of
# INT64 columns that differ only in their name and description.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title_vector", dtype=DataType.FLOAT_VECTOR, dim=dim),
    FieldSchema(
        name="title",
        dtype=DataType.VARCHAR,
        max_length=256,
        description="视频标题",
    ),
    *(
        FieldSchema(name=column, dtype=DataType.INT64, description=label)
        for column, label in (
            ("preview_times", "预曝光次数"),
            ("preview_users", "预曝光用户数"),
            ("view_times", "曝光次数"),
            ("view_users", "曝光用户数"),
            ("play_times", "播放次数"),
            ("play_users", "播放用户数"),
            ("share_times", "分享次数"),
            ("share_users", "分享用户数"),
            ("return_times", "回看次数"),
            ("return_users", "回看用户数"),
            ("create_time", "创建时间戳"),
        )
    ),
]
schema = CollectionSchema(fields, description="两年内的分发过的视频标题")

# Reuse the collection when the server already has it; otherwise create it
# from the schema above.
if collection_name in utility.list_collections():
    collection = Collection(name=collection_name)
else:
    collection = Collection(name=collection_name, schema=schema)

# Index settings for the vector field.
index_params = dict(
    index_type="IVF_FLAT",  # pick an index type that suits the workload
    metric_type="IP",  # metric chosen for text embeddings
    params=dict(nlist=128),  # tune as needed
)

# Build the index on the vector field, then load the collection.
collection.create_index(field_name="title_vector", index_params=index_params)
collection.load()
def insert_vector(data):
    """Insert ``data`` into the module-level collection.

    Args:
        data: Rows/columns to insert; passed unchanged to
            ``collection.insert``.

    Returns:
        The object returned by ``collection.insert``, so callers can inspect
        the outcome instead of relying on the printed message alone.
    """
    mr = collection.insert(data)
    print(f"insert result: {mr}")
    # NOTE(review): the original comment described this call as "commit
    # changes". load() is kept as-is to preserve behavior, but it is not a
    # flush/commit operation -- confirm whether collection.flush() was
    # intended here.
    collection.load()
    return mr
def query(vector: List, top_k: int, expr: Optional[str] = None,
          output_fields=None, search_params=None):
    """Run a vector similarity search on the ``title_vector`` field.

    Args:
        vector: Query data, passed unchanged as ``data`` to
            ``Collection.search``.
        top_k: Maximum number of hits to return (the search ``limit``).
        expr: Optional filter expression forwarded to the search; ``None``
            applies no filter.
        output_fields: Field names to return with each hit. Defaults to
            ``['title', 'preview_times']``.
        search_params: Search parameter dict. Defaults to inner-product
            metric, offset 0, growing segments included, ``nprobe`` 10.

    Returns:
        Whatever ``Collection.search`` returns for the request.
    """
    # Build the defaults per call: mutable default arguments would be shared
    # between calls and are handed to the client library, so any mutation
    # would leak into later searches.
    if output_fields is None:
        output_fields = ['title', 'preview_times']
    if search_params is None:
        search_params = {
            "metric_type": "IP",
            "offset": 0,
            "ignore_growing": False,
            "params": {"nprobe": 10},
        }

    collection = Collection(name=collection_name)
    # Filtered vector search; "Strong" consistency is requested explicitly.
    results = collection.search(
        data=vector,
        anns_field="title_vector",
        param=search_params,
        limit=top_k,
        expr=expr,
        output_fields=output_fields,
        consistency_level="Strong",
    )
    return results
|