""" 文档服务 - 文档管理和处理服务 """ from typing import List, Dict, Any, Optional from pathlib import Path from loguru import logger from langchain.schema import Document from ..core.document_processor import document_processor from ..core.vector_store import vector_store_manager from ..utils.cache import cache_manager class DocumentService: """文档服务""" def __init__(self): self.default_store_type = "chroma" self.default_collection = "documents" def process_and_store_document(self, file_path: str, store_type: str = None, collection_name: str = None, chunk_size: int = 1000, chunk_overlap: int = 200, additional_metadata: Dict[str, Any] = None) -> Dict[str, Any]: """处理并存储文档""" try: # 处理文档 documents = document_processor.process_document_pipeline( file_path, chunk_size, chunk_overlap, additional_metadata ) # 存储到向量数据库 vector_store_manager.add_documents( documents, store_type or self.default_store_type, collection_name or self.default_collection ) # 获取统计信息 stats = document_processor.get_document_stats(documents) collection_stats = vector_store_manager.get_collection_stats( store_type or self.default_store_type, collection_name or self.default_collection ) result = { "file_path": file_path, "processed_documents": len(documents), "document_stats": stats, "collection_stats": collection_stats, "store_type": store_type or self.default_store_type, "collection_name": collection_name or self.default_collection } logger.info(f"文档处理完成: {file_path}") return result except Exception as e: logger.error(f"文档处理失败: {file_path}, 错误: {e}") raise def process_and_store_directory(self, directory_path: str, store_type: str = None, collection_name: str = None, recursive: bool = True, chunk_size: int = 1000, chunk_overlap: int = 200, additional_metadata: Dict[str, Any] = None) -> Dict[str, Any]: """处理并存储目录中的所有文档""" try: # 加载目录中的所有文档 documents = document_processor.load_directory(directory_path, recursive) if not documents: logger.warning(f"目录中没有找到可处理的文档: {directory_path}") return { "directory_path": directory_path, "processed_documents": 0, "message": "没有找到可处理的文档" } # 分割文档 split_docs = document_processor.split_documents( documents, chunk_size, chunk_overlap ) # 添加元数据 if additional_metadata: split_docs = document_processor.add_metadata(split_docs, additional_metadata) # 存储到向量数据库 vector_store_manager.add_documents( split_docs, store_type or self.default_store_type, collection_name or self.default_collection ) # 获取统计信息 stats = document_processor.get_document_stats(split_docs) collection_stats = vector_store_manager.get_collection_stats( store_type or self.default_store_type, collection_name or self.default_collection ) result = { "directory_path": directory_path, "processed_documents": len(split_docs), "original_documents": len(documents), "document_stats": stats, "collection_stats": collection_stats, "store_type": store_type or self.default_store_type, "collection_name": collection_name or self.default_collection } logger.info(f"目录处理完成: {directory_path}") return result except Exception as e: logger.error(f"目录处理失败: {directory_path}, 错误: {e}") raise def search_documents(self, query: str, k: int = 4, store_type: str = None, collection_name: str = None) -> List[Dict[str, Any]]: """搜索文档""" documents = vector_store_manager.similarity_search( query, k, store_type or self.default_store_type, collection_name or self.default_collection ) results = [] for doc in documents: result = { "content": doc.page_content, "metadata": doc.metadata, "source": doc.metadata.get("source", "unknown") } results.append(result) return results def search_documents_with_scores(self, query: str, k: int = 4, store_type: str = None, collection_name: str = None) -> List[Dict[str, Any]]: """带分数的文档搜索""" results_with_scores = vector_store_manager.similarity_search_with_score( query, k, store_type or self.default_store_type, collection_name or self.default_collection ) results = [] for doc, score in results_with_scores: result = { "content": doc.page_content, "metadata": doc.metadata, "source": doc.metadata.get("source", "unknown"), "similarity_score": score } results.append(result) return results def get_document_collections(self, store_type: str = None) -> List[str]: """获取文档集合列表""" return vector_store_manager.list_collections( store_type or self.default_store_type ) def get_collection_info(self, store_type: str = None, collection_name: str = None) -> Dict[str, Any]: """获取集合信息""" return vector_store_manager.get_collection_stats( store_type or self.default_store_type, collection_name or self.default_collection ) def delete_collection(self, store_type: str = None, collection_name: str = None) -> None: """删除文档集合""" vector_store_manager.delete_collection( store_type or self.default_store_type, collection_name or self.default_collection ) logger.info(f"已删除文档集合: {collection_name or self.default_collection}") def export_documents(self, store_type: str = None, collection_name: str = None, format: str = "json") -> str: """导出文档集合""" # 获取所有文档(这里简化处理,实际可能需要分批获取) documents = vector_store_manager.similarity_search( "", 1000, store_type or self.default_store_type, collection_name or self.default_collection ) if format == "json": import json export_data = { "collection_name": collection_name or self.default_collection, "store_type": store_type or self.default_store_type, "document_count": len(documents), "documents": [ { "content": doc.page_content, "metadata": doc.metadata } for doc in documents ] } return json.dumps(export_data, ensure_ascii=False, indent=2) else: raise ValueError(f"不支持的导出格式: {format}") def get_supported_formats(self) -> List[str]: """获取支持的文件格式""" return list(document_processor.supported_extensions.keys()) def validate_document(self, file_path: str) -> Dict[str, Any]: """验证文档""" file_path = Path(file_path) if not file_path.exists(): return { "valid": False, "error": "文件不存在" } extension = file_path.suffix.lower() if extension not in document_processor.supported_extensions: return { "valid": False, "error": f"不支持的文件格式: {extension}", "supported_formats": self.get_supported_formats() } try: # 尝试加载文档 documents = document_processor.load_document(str(file_path)) stats = document_processor.get_document_stats(documents) return { "valid": True, "file_path": str(file_path), "file_size": file_path.stat().st_size, "extension": extension, "document_count": len(documents), "stats": stats } except Exception as e: return { "valid": False, "error": str(e) } # 全局文档服务实例 document_service = DocumentService()