- """
- 文档服务 - 文档管理和处理服务
- """
- from typing import List, Dict, Any, Optional
- from pathlib import Path
- from loguru import logger
- from langchain.schema import Document
- from ..core.document_processor import document_processor
- from ..core.vector_store import vector_store_manager
- from ..utils.cache import cache_manager

class DocumentService:
    """High-level service for document ingestion, search, and export."""

    def __init__(self):
        self.default_store_type = "chroma"
        self.default_collection = "documents"

    def process_and_store_document(self, file_path: str,
                                   store_type: Optional[str] = None,
                                   collection_name: Optional[str] = None,
                                   chunk_size: int = 1000,
                                   chunk_overlap: int = 200,
                                   additional_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Process a single document and store its chunks in the vector store."""
        try:
            # Load, split, and enrich the document
            documents = document_processor.process_document_pipeline(
                file_path, chunk_size, chunk_overlap, additional_metadata
            )

            # Store the chunks in the vector store
            vector_store_manager.add_documents(
                documents,
                store_type or self.default_store_type,
                collection_name or self.default_collection
            )

            # Collect statistics
            stats = document_processor.get_document_stats(documents)
            collection_stats = vector_store_manager.get_collection_stats(
                store_type or self.default_store_type,
                collection_name or self.default_collection
            )

            result = {
                "file_path": file_path,
                "processed_documents": len(documents),
                "document_stats": stats,
                "collection_stats": collection_stats,
                "store_type": store_type or self.default_store_type,
                "collection_name": collection_name or self.default_collection
            }

            logger.info(f"Document processed: {file_path}")
            return result

        except Exception as e:
            logger.error(f"Document processing failed: {file_path}, error: {e}")
            raise

    def process_and_store_directory(self, directory_path: str,
                                    store_type: Optional[str] = None,
                                    collection_name: Optional[str] = None,
                                    recursive: bool = True,
                                    chunk_size: int = 1000,
                                    chunk_overlap: int = 200,
                                    additional_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Process every supported document in a directory and store the chunks."""
        try:
            # Load all documents from the directory
            documents = document_processor.load_directory(directory_path, recursive)

            if not documents:
                logger.warning(f"No processable documents found in directory: {directory_path}")
                return {
                    "directory_path": directory_path,
                    "processed_documents": 0,
                    "message": "No processable documents found"
                }

            # Split the documents into chunks
            split_docs = document_processor.split_documents(
                documents, chunk_size, chunk_overlap
            )

            # Attach any additional metadata
            if additional_metadata:
                split_docs = document_processor.add_metadata(split_docs, additional_metadata)

            # Store the chunks in the vector store
            vector_store_manager.add_documents(
                split_docs,
                store_type or self.default_store_type,
                collection_name or self.default_collection
            )

            # Collect statistics
            stats = document_processor.get_document_stats(split_docs)
            collection_stats = vector_store_manager.get_collection_stats(
                store_type or self.default_store_type,
                collection_name or self.default_collection
            )

            result = {
                "directory_path": directory_path,
                "processed_documents": len(split_docs),
                "original_documents": len(documents),
                "document_stats": stats,
                "collection_stats": collection_stats,
                "store_type": store_type or self.default_store_type,
                "collection_name": collection_name or self.default_collection
            }

            logger.info(f"Directory processed: {directory_path}")
            return result

        except Exception as e:
            logger.error(f"Directory processing failed: {directory_path}, error: {e}")
            raise

    def search_documents(self, query: str, k: int = 4,
                         store_type: Optional[str] = None,
                         collection_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Run a similarity search and return the matching chunks."""
        documents = vector_store_manager.similarity_search(
            query, k, store_type or self.default_store_type,
            collection_name or self.default_collection
        )

        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "source": doc.metadata.get("source", "unknown")
            }
            for doc in documents
        ]

    def search_documents_with_scores(self, query: str, k: int = 4,
                                     store_type: Optional[str] = None,
                                     collection_name: Optional[str] = None) -> List[Dict[str, Any]]:
        """Run a similarity search and return the matching chunks with scores."""
        results_with_scores = vector_store_manager.similarity_search_with_score(
            query, k, store_type or self.default_store_type,
            collection_name or self.default_collection
        )

        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "source": doc.metadata.get("source", "unknown"),
                "similarity_score": score
            }
            for doc, score in results_with_scores
        ]

    def get_document_collections(self, store_type: Optional[str] = None) -> List[str]:
        """List the document collections in the vector store."""
        return vector_store_manager.list_collections(
            store_type or self.default_store_type
        )

    def get_collection_info(self, store_type: Optional[str] = None,
                            collection_name: Optional[str] = None) -> Dict[str, Any]:
        """Return statistics for a collection."""
        return vector_store_manager.get_collection_stats(
            store_type or self.default_store_type,
            collection_name or self.default_collection
        )

    def delete_collection(self, store_type: Optional[str] = None,
                          collection_name: Optional[str] = None) -> None:
        """Delete a document collection."""
        vector_store_manager.delete_collection(
            store_type or self.default_store_type,
            collection_name or self.default_collection
        )
        logger.info(f"Deleted document collection: {collection_name or self.default_collection}")

    def export_documents(self, store_type: Optional[str] = None,
                         collection_name: Optional[str] = None,
                         format: str = "json") -> str:
        """Export a document collection in the given format."""
        # Fetch documents via an empty-query similarity search. This is a
        # simplification: large collections may need to be fetched in batches.
        documents = vector_store_manager.similarity_search(
            "", 1000, store_type or self.default_store_type,
            collection_name or self.default_collection
        )

        if format == "json":
            export_data = {
                "collection_name": collection_name or self.default_collection,
                "store_type": store_type or self.default_store_type,
                "document_count": len(documents),
                "documents": [
                    {
                        "content": doc.page_content,
                        "metadata": doc.metadata
                    }
                    for doc in documents
                ]
            }
            return json.dumps(export_data, ensure_ascii=False, indent=2)
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def get_supported_formats(self) -> List[str]:
        """Return the supported file extensions."""
        return list(document_processor.supported_extensions.keys())

    def validate_document(self, file_path: str) -> Dict[str, Any]:
        """Validate that a file exists, has a supported format, and can be loaded."""
        path = Path(file_path)

        if not path.exists():
            return {
                "valid": False,
                "error": "File does not exist"
            }

        extension = path.suffix.lower()
        if extension not in document_processor.supported_extensions:
            return {
                "valid": False,
                "error": f"Unsupported file format: {extension}",
                "supported_formats": self.get_supported_formats()
            }

        try:
            # Attempt to load the document
            documents = document_processor.load_document(str(path))
            stats = document_processor.get_document_stats(documents)

            return {
                "valid": True,
                "file_path": str(path),
                "file_size": path.stat().st_size,
                "extension": extension,
                "document_count": len(documents),
                "stats": stats
            }
        except Exception as e:
            return {
                "valid": False,
                "error": str(e)
            }

# Global document service instance
document_service = DocumentService()
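
# Minimal usage sketch (assumes the document_processor and vector_store_manager
# singletons are configured for your environment; "./docs/example.pdf" and the
# query string below are hypothetical placeholders, not part of this module):
if __name__ == "__main__":
    check = document_service.validate_document("./docs/example.pdf")
    if check["valid"]:
        document_service.process_and_store_document("./docs/example.pdf")
        for hit in document_service.search_documents_with_scores("example query", k=3):
            print(hit["source"], round(hit["similarity_score"], 3))
    else:
        print(f"Invalid document: {check['error']}")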