MilvusComponent.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. from pymilvus import Collection, CollectionSchema, FieldSchema, DataType, connections, utility
  2. from typing import List, Optional
  3. # 配置您的Milvus服务器信息
  4. MILVUS_HOST = '192.168.205.14' # 或者Docker容器的IP地址
  5. MILVUS_PORT = '19530' # 默认端口
  6. # 连接到Milvus服务器
  7. connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
  8. # 定义集合参数
  9. collection_name = "two_year_all_video_titles"
  10. # 向量维度
  11. dim = 768
  12. # 创建字段模式
  13. fields = [FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
  14. FieldSchema(name="title_vector",
  15. dtype=DataType.FLOAT_VECTOR, dim=dim),
  16. FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256,
  17. description="视频标题"),
  18. FieldSchema(name="preview_times",
  19. dtype=DataType.INT64, description="预曝光次数"),
  20. FieldSchema(name="preview_users", dtype=DataType.INT64,
  21. description="预曝光用户数"),
  22. FieldSchema(name="view_times", dtype=DataType.INT64,
  23. description="曝光次数"),
  24. FieldSchema(name="view_users", dtype=DataType.INT64,
  25. description="曝光用户数"),
  26. FieldSchema(name="play_times", dtype=DataType.INT64,
  27. description="播放次数"),
  28. FieldSchema(name="play_users", dtype=DataType.INT64,
  29. description="播放用户数"),
  30. FieldSchema(name="share_times", dtype=DataType.INT64,
  31. description="分享次数"),
  32. FieldSchema(name="share_users", dtype=DataType.INT64,
  33. description="分享用户数"),
  34. FieldSchema(name="return_times",
  35. dtype=DataType.INT64, description="回看次数"),
  36. FieldSchema(name="return_users",
  37. dtype=DataType.INT64, description="回看用户数"),
  38. FieldSchema(name="create_time", dtype=DataType.INT64,
  39. description="创建时间戳")
  40. ]
  41. schema = CollectionSchema(fields, description="两年内的分发过的视频标题")
  42. # 创建集合(如果不存在)
  43. if collection_name not in utility.list_collections():
  44. collection = Collection(name=collection_name, schema=schema)
  45. else:
  46. collection = Collection(name=collection_name)
  47. # 索引参数
  48. index_params = {
  49. "index_type": "IVF_FLAT", # 选择合适的索引类型
  50. "metric_type": "IP", # 适合文本向量的度量类型
  51. "params": {"nlist": 128} # 根据需要调整参数
  52. }
  53. # 创建索引
  54. collection.create_index(field_name="title_vector", index_params=index_params)
  55. collection.load()
  56. def insert_vector(data):
  57. # 插入数据
  58. mr = collection.insert(data)
  59. print(f"insert result: {mr}")
  60. # 提交更改
  61. collection.load()
  62. def query(vector: List, top_k: int, expr: Optional[str], output_fields=['title', 'preview_times'], search_params={"metric_type": "IP", "offset": 0,
  63. "ignore_growing": False, "params": {"nprobe": 10}}):
  64. """
  65. 先根据字段过滤条件查询,然后在结果上执行向量相似度搜索。
  66. """
  67. collection = Collection(name=collection_name)
  68. # 执行组合查询
  69. results = collection.search(data=vector, anns_field="title_vector", param=search_params,
  70. limit=top_k, expr=expr, output_fields=output_fields,
  71. consistency_level="Strong")
  72. return results