document_service.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """
  2. 文档服务 - 文档管理和处理服务
  3. """
  4. from typing import List, Dict, Any, Optional
  5. from pathlib import Path
  6. from loguru import logger
  7. from langchain.schema import Document
  8. from ..core.document_processor import document_processor
  9. from ..core.vector_store import vector_store_manager
  10. from ..utils.cache import cache_manager
  11. class DocumentService:
  12. """文档服务"""
  13. def __init__(self):
  14. self.default_store_type = "chroma"
  15. self.default_collection = "documents"
  16. def process_and_store_document(self, file_path: str,
  17. store_type: str = None,
  18. collection_name: str = None,
  19. chunk_size: int = 1000,
  20. chunk_overlap: int = 200,
  21. additional_metadata: Dict[str, Any] = None) -> Dict[str, Any]:
  22. """处理并存储文档"""
  23. try:
  24. # 处理文档
  25. documents = document_processor.process_document_pipeline(
  26. file_path, chunk_size, chunk_overlap, additional_metadata
  27. )
  28. # 存储到向量数据库
  29. vector_store_manager.add_documents(
  30. documents,
  31. store_type or self.default_store_type,
  32. collection_name or self.default_collection
  33. )
  34. # 获取统计信息
  35. stats = document_processor.get_document_stats(documents)
  36. collection_stats = vector_store_manager.get_collection_stats(
  37. store_type or self.default_store_type,
  38. collection_name or self.default_collection
  39. )
  40. result = {
  41. "file_path": file_path,
  42. "processed_documents": len(documents),
  43. "document_stats": stats,
  44. "collection_stats": collection_stats,
  45. "store_type": store_type or self.default_store_type,
  46. "collection_name": collection_name or self.default_collection
  47. }
  48. logger.info(f"文档处理完成: {file_path}")
  49. return result
  50. except Exception as e:
  51. logger.error(f"文档处理失败: {file_path}, 错误: {e}")
  52. raise
  53. def process_and_store_directory(self, directory_path: str,
  54. store_type: str = None,
  55. collection_name: str = None,
  56. recursive: bool = True,
  57. chunk_size: int = 1000,
  58. chunk_overlap: int = 200,
  59. additional_metadata: Dict[str, Any] = None) -> Dict[str, Any]:
  60. """处理并存储目录中的所有文档"""
  61. try:
  62. # 加载目录中的所有文档
  63. documents = document_processor.load_directory(directory_path, recursive)
  64. if not documents:
  65. logger.warning(f"目录中没有找到可处理的文档: {directory_path}")
  66. return {
  67. "directory_path": directory_path,
  68. "processed_documents": 0,
  69. "message": "没有找到可处理的文档"
  70. }
  71. # 分割文档
  72. split_docs = document_processor.split_documents(
  73. documents, chunk_size, chunk_overlap
  74. )
  75. # 添加元数据
  76. if additional_metadata:
  77. split_docs = document_processor.add_metadata(split_docs, additional_metadata)
  78. # 存储到向量数据库
  79. vector_store_manager.add_documents(
  80. split_docs,
  81. store_type or self.default_store_type,
  82. collection_name or self.default_collection
  83. )
  84. # 获取统计信息
  85. stats = document_processor.get_document_stats(split_docs)
  86. collection_stats = vector_store_manager.get_collection_stats(
  87. store_type or self.default_store_type,
  88. collection_name or self.default_collection
  89. )
  90. result = {
  91. "directory_path": directory_path,
  92. "processed_documents": len(split_docs),
  93. "original_documents": len(documents),
  94. "document_stats": stats,
  95. "collection_stats": collection_stats,
  96. "store_type": store_type or self.default_store_type,
  97. "collection_name": collection_name or self.default_collection
  98. }
  99. logger.info(f"目录处理完成: {directory_path}")
  100. return result
  101. except Exception as e:
  102. logger.error(f"目录处理失败: {directory_path}, 错误: {e}")
  103. raise
  104. def search_documents(self, query: str, k: int = 4,
  105. store_type: str = None, collection_name: str = None) -> List[Dict[str, Any]]:
  106. """搜索文档"""
  107. documents = vector_store_manager.similarity_search(
  108. query, k, store_type or self.default_store_type,
  109. collection_name or self.default_collection
  110. )
  111. results = []
  112. for doc in documents:
  113. result = {
  114. "content": doc.page_content,
  115. "metadata": doc.metadata,
  116. "source": doc.metadata.get("source", "unknown")
  117. }
  118. results.append(result)
  119. return results
  120. def search_documents_with_scores(self, query: str, k: int = 4,
  121. store_type: str = None, collection_name: str = None) -> List[Dict[str, Any]]:
  122. """带分数的文档搜索"""
  123. results_with_scores = vector_store_manager.similarity_search_with_score(
  124. query, k, store_type or self.default_store_type,
  125. collection_name or self.default_collection
  126. )
  127. results = []
  128. for doc, score in results_with_scores:
  129. result = {
  130. "content": doc.page_content,
  131. "metadata": doc.metadata,
  132. "source": doc.metadata.get("source", "unknown"),
  133. "similarity_score": score
  134. }
  135. results.append(result)
  136. return results
  137. def get_document_collections(self, store_type: str = None) -> List[str]:
  138. """获取文档集合列表"""
  139. return vector_store_manager.list_collections(
  140. store_type or self.default_store_type
  141. )
  142. def get_collection_info(self, store_type: str = None, collection_name: str = None) -> Dict[str, Any]:
  143. """获取集合信息"""
  144. return vector_store_manager.get_collection_stats(
  145. store_type or self.default_store_type,
  146. collection_name or self.default_collection
  147. )
  148. def delete_collection(self, store_type: str = None, collection_name: str = None) -> None:
  149. """删除文档集合"""
  150. vector_store_manager.delete_collection(
  151. store_type or self.default_store_type,
  152. collection_name or self.default_collection
  153. )
  154. logger.info(f"已删除文档集合: {collection_name or self.default_collection}")
  155. def export_documents(self, store_type: str = None, collection_name: str = None,
  156. format: str = "json") -> str:
  157. """导出文档集合"""
  158. # 获取所有文档(这里简化处理,实际可能需要分批获取)
  159. documents = vector_store_manager.similarity_search(
  160. "", 1000, store_type or self.default_store_type,
  161. collection_name or self.default_collection
  162. )
  163. if format == "json":
  164. import json
  165. export_data = {
  166. "collection_name": collection_name or self.default_collection,
  167. "store_type": store_type or self.default_store_type,
  168. "document_count": len(documents),
  169. "documents": [
  170. {
  171. "content": doc.page_content,
  172. "metadata": doc.metadata
  173. }
  174. for doc in documents
  175. ]
  176. }
  177. return json.dumps(export_data, ensure_ascii=False, indent=2)
  178. else:
  179. raise ValueError(f"不支持的导出格式: {format}")
  180. def get_supported_formats(self) -> List[str]:
  181. """获取支持的文件格式"""
  182. return list(document_processor.supported_extensions.keys())
  183. def validate_document(self, file_path: str) -> Dict[str, Any]:
  184. """验证文档"""
  185. file_path = Path(file_path)
  186. if not file_path.exists():
  187. return {
  188. "valid": False,
  189. "error": "文件不存在"
  190. }
  191. extension = file_path.suffix.lower()
  192. if extension not in document_processor.supported_extensions:
  193. return {
  194. "valid": False,
  195. "error": f"不支持的文件格式: {extension}",
  196. "supported_formats": self.get_supported_formats()
  197. }
  198. try:
  199. # 尝试加载文档
  200. documents = document_processor.load_document(str(file_path))
  201. stats = document_processor.get_document_stats(documents)
  202. return {
  203. "valid": True,
  204. "file_path": str(file_path),
  205. "file_size": file_path.stat().st_size,
  206. "extension": extension,
  207. "document_count": len(documents),
  208. "stats": stats
  209. }
  210. except Exception as e:
  211. return {
  212. "valid": False,
  213. "error": str(e)
  214. }
# Global document service instance (module-level singleton).
document_service = DocumentService()