Przeglądaj źródła

增加书籍处理

xueyiming 19 godzin temu
rodzic
commit
f0505836e8

+ 37 - 0
applications/utils/mysql/books.py

@@ -23,3 +23,40 @@ class Books(BaseMySQLClient):
         return await self.pool.async_save(
             query=query, params=(new_status, book_id, ori_status)
         )
+
+    async def insert_book(self, book_id, book_name, book_oss_path):
+        query = """
+            INSERT INTO books (book_id, book_name, book_oss_path)
+             VALUES (%s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query, params=(book_id, book_name, book_oss_path)
+        )
+
+    async def select_init_books(self):
+        query = """
+            SELECT book_id, book_name, book_oss_path, extract_status
+            FROM books
+            WHERE extract_status = 0;
+        """
+        return await self.pool.async_fetch(query=query)
+
+    async def select_book_extract_status(self, book_id):
+        query = """
+            SELECT book_id, extract_status
+            FROM books
+            WHERE book_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(book_id,))
+
+    async def update_book_extract_status(self, book_id, status):
+        query = """
+            UPDATE books SET extract_status = %s WHERE book_id = %s;
+            """
+        return await self.pool.async_save(query=query, params=(status, book_id))
+
+    async def update_book_extract_result(self, book_id, extract_result):
+        query = """
+            UPDATE books SET extract_result = %s, extract_status = 2 WHERE book_id = %s;
+            """
+        return await self.pool.async_save(query=query, params=(extract_result, book_id))

+ 0 - 0
applications/utils/oss/__init__.py


+ 90 - 0
applications/utils/oss/oss_client.py

@@ -0,0 +1,90 @@
+import oss2
+import os
+
+
+class OSSClient:
+    # 配置默认的 endpoint 地址
+    DEFAULT_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com"  # 默认华东1区(杭州)
+
+    def __init__(self, access_key_id=None, access_key_secret=None, bucket_name=None):
+        """
+        初始化 OSS 客户端
+        :param bucket_name: Bucket 名称
+        """
+        # 从环境变量中获取 Access Key 和 Secret
+        if access_key_id is None or access_key_secret is None:
+            access_key_id = "LTAIP6x1l3DXfSxm"
+            access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+        if bucket_name is None:
+            bucket_name = "art-pubbucket"
+
+        # 检查是否有凭证
+        if not access_key_id or not access_key_secret:
+            raise ValueError(
+                "ACCESS_KEY_ID and ACCESS_KEY_SECRET must be set in the environment variables."
+            )
+
+        # 使用默认的 endpoint 地址
+        self.auth = oss2.Auth(access_key_id, access_key_secret)
+        self.bucket = oss2.Bucket(self.auth, self.DEFAULT_ENDPOINT, bucket_name)
+
+    def upload_file(self, local_file_path, oss_file_path):
+        """
+        上传文件到 OSS
+        :param local_file_path: 本地文件路径
+        :param oss_file_path: OSS 存储的文件路径(例如:pdfs/myfile.pdf)
+        :return: 上传结果,成功返回文件信息,失败抛出异常
+        """
+        if not os.path.exists(local_file_path):
+            raise FileNotFoundError(f"Local file {local_file_path} does not exist.")
+
+        try:
+            self.bucket.put_object_from_file(oss_file_path, local_file_path)
+            return {"status": "success", "message": f"File uploaded to {oss_file_path}"}
+        except Exception as e:
+            raise Exception(f"Error uploading file to OSS: {str(e)}")
+
+    def download_file(self, oss_file_path, local_file_path):
+        """
+        从 OSS 下载文件
+        :param oss_file_path: OSS 文件路径(例如:pdfs/myfile.pdf)
+        :param local_file_path: 本地保存路径
+        :return: 下载结果,成功返回下载文件的路径,失败抛出异常
+        """
+        try:
+            self.bucket.get_object_to_file(oss_file_path, local_file_path)
+            return {
+                "status": "success",
+                "message": f"File downloaded to {local_file_path}",
+            }
+        except Exception as e:
+            raise Exception(f"Error downloading file from OSS: {str(e)}")
+
+    def delete_file(self, oss_file_path):
+        """
+        从 OSS 删除文件
+        :param oss_file_path: OSS 文件路径(例如:pdfs/myfile.pdf)
+        :return: 删除结果,成功返回消息,失败抛出异常
+        """
+        try:
+            self.bucket.delete_object(oss_file_path)
+            return {
+                "status": "success",
+                "message": f"File {oss_file_path} deleted from OSS",
+            }
+        except Exception as e:
+            raise Exception(f"Error deleting file from OSS: {str(e)}")
+
+    def file_exists(self, oss_file_path):
+        """
+        检查文件是否存在于 OSS 中
+        :param oss_file_path: OSS 文件路径(例如:pdfs/myfile.pdf)
+        :return: 布尔值,文件存在返回 True,文件不存在返回 False
+        """
+        try:
+            self.bucket.get_object(oss_file_path)
+            return True
+        except oss2.exceptions.NoSuchKey:
+            return False
+        except Exception as e:
+            raise Exception(f"Error checking file existence on OSS: {str(e)}")

+ 0 - 0
applications/utils/pdf/__init__.py


+ 29 - 0
applications/utils/pdf/book_extract.py

@@ -0,0 +1,29 @@
+import requests
+# -*- coding: utf-8 -*-
+
+
+async def book_extract(book_path, book_id):
+    with open(book_path, "rb") as f:
+        files = {"files": (book_id, f, "application/pdf")}
+        response = requests.post(
+            "http://192.168.100.31:8003/file_parse",
+            headers={"accept": "application/json"},
+            data={
+                "return_model_output": "false",
+                "return_md": "false",
+                "return_images": "false",
+                "end_page_id": "99999",
+                "parse_method": "auto",
+                "start_page_id": "0",
+                "lang_list": "ch",
+                "output_dir": "./output",
+                "server_url": "string",
+                "return_content_list": "true",
+                "backend": "pipeline",
+                "table_enable": "true",
+                "response_format_zip": "false",
+                "formula_enable": "true",
+            },
+            files=files,
+        )
+    return response.json()

+ 1 - 0
applications/utils/spider/study.py

@@ -1,4 +1,5 @@
 import json
+import time
 
 import requests
 

+ 1 - 0
requirements.txt

@@ -26,4 +26,5 @@ langchain==0.3.27
 langchain-core==0.3.76
 langchain-text-splitters==0.3.11
 mcp==1.14.1
+oss2==2.19.1
 dashscope==1.24.6

+ 91 - 1
routes/blueprint.py

@@ -1,5 +1,6 @@
 import asyncio
 import json
+import os
 import traceback
 import uuid
 from typing import Dict, Any
@@ -19,8 +20,10 @@ from applications.config import (
 from applications.resource import get_resource_manager
 from applications.search import HybridSearch
 from applications.utils.chat import RAGChatAgent
-from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult
+from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult, Books
 from applications.api.qwen import QwenClient
+from applications.utils.oss.oss_client import OSSClient
+from applications.utils.pdf.book_extract import book_extract
 from applications.utils.spider.study import study
 
 server_bp = Blueprint("api", __name__, url_prefix="/api")
@@ -629,3 +632,90 @@ async def chat_history():
             },
         }
     )
+
+
+@server_bp.route("/upload/file", methods=["POST"])
+async def upload_pdf():
+    # 获取前端上传的文件
+    # 先等待 request.files 属性来确保文件已加载
+    files = await request.files
+
+    # 获取文件对象
+    file = files.get("file")
+
+    if file:
+        # 检查文件扩展名是否是 .pdf
+        if not file.filename.lower().endswith(".pdf"):
+            return jsonify(
+                {
+                    "status": "error",
+                    "message": "Invalid file format. Only PDF files are allowed.",
+                }
+            ), 400
+
+        # 获取文件名
+        filename = file.filename
+        print(filename)
+
+        book_id = f"book-{uuid.uuid4()}"
+        # 检查文件的 MIME 类型是否是 application/pdf
+        if file.content_type != "application/pdf":
+            return jsonify(
+                {
+                    "status": "error",
+                    "message": "Invalid MIME type. Only PDF files are allowed.",
+                }
+            ), 400
+
+        # 保存到本地(可选,视需要)
+        file_path = os.path.join("/tmp", book_id)  # 临时存储路径
+        await file.save(file_path)
+        resource = get_resource_manager()
+        books = Books(resource.mysql_client)
+        # 上传到 OSS
+        try:
+            oss_client = OSSClient()
+            # 上传文件到 OSS
+            oss_path = f"rag/pdfs/{book_id}"
+            oss_client.upload_file(file_path, oss_path)
+            await books.insert_book(book_id, filename, oss_path)
+            # os.remove(file_path)
+            return jsonify(
+                {
+                    "status": "success",
+                    "message": f"File {filename} uploaded successfully to OSS!",
+                }
+            ), 200
+        except Exception as e:
+            return jsonify({"status": "error", "message": str(e)}), 500
+    else:
+        return jsonify({"status": "error", "message": "No file uploaded."}), 400
+
+
+@server_bp.route("/process/book", methods=["GET"])
+async def process_book():
+    resource = get_resource_manager()
+    books_mapper = Books(resource.mysql_client)
+    oss_client = OSSClient()
+    books = await books_mapper.select_init_books()
+    for book in books:
+        extract_status = books_mapper.select_book_extract_status(book.get("book_id"))[
+            0
+        ]["extract_status"]
+        if extract_status == 0:
+            await books_mapper.update_book_extract_status(book.get("book_id"), 1)
+            book_id = book.get("book_id")
+            book_path = os.path.join("/tmp", book.get("book_id"))
+            if not os.path.exists(book_path):
+                oss_path = f"rag/pdfs/{book_id}"
+                oss_client.download_file(oss_path, book_path)
+            res = await book_extract(book_path, book_id)
+            if res:
+                await books_mapper.update_book_extract_result(
+                    book_id, res.get("results").get(book_id).get("content_list")
+                )
+                doc_id = f"doc-{uuid.uuid4()}"
+                chunk_task = ChunkBooksTask(doc_id=doc_id, resource=resource)
+                body = {"book_id": book_id}
+                await chunk_task.deal(body)
+    return jsonify({"status": "success"})