
V1 version initialization

xueyiming, 1 day ago
Current commit
09a21d381b

+ 0 - 0
api/__init__.py


+ 19 - 0
api/health.py

@@ -0,0 +1,19 @@
+from fastapi import APIRouter
+from pydantic import BaseModel
+
+from schemas import ResponseWrapper
+
+router = APIRouter()
+
+
+@router.get("/health", response_model=ResponseWrapper)
+async def health_check():
+    """健康检查接口"""
+    return ResponseWrapper(
+        status_code=200,
+        detail="success",
+        data={"status": "healthy"}
+    )
+
+
+
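
A quick smoke test of the endpoint above, assuming the service is running locally on the host/port used in main.py (127.0.0.1:5000):

    import json
    from urllib.request import urlopen

    # GET /health and print the wrapped response body
    with urlopen("http://127.0.0.1:5000/health") as resp:
        print(json.load(resp))
        # expected shape: {"status_code": 200, "detail": "success", "data": {"status": "healthy"}}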

+ 66 - 0
api/search.py

@@ -0,0 +1,66 @@
+import json
+from concurrent.futures import ThreadPoolExecutor
+from typing import List
+
+from fastapi import APIRouter, BackgroundTasks
+
+from schemas import ResponseWrapper
+from schemas.schemas import Query, ContentData
+from tools_v1 import query_keyword_summary_results, query_keyword_content_results
+from utils.deepseek_utils import get_keywords
+from utils.json_parse_utils import process_texts_concurrently
+
+router = APIRouter()
+
+# 创建线程池执行器
+executor = ThreadPoolExecutor(max_workers=10)
+
+
+@router.post("/query", response_model=ResponseWrapper)
+async def query_keyword(query: Query):
+    keywords = get_keywords(query.text)['keywords']
+    print(keywords)
+    summary_res = query_keyword_summary_results(keywords)
+    content_res = query_keyword_content_results(keywords)
+    res = {'summary_results': summary_res, 'content_results': content_res}
+    return ResponseWrapper(
+        status_code=200,
+        detail="success",
+        data=res
+    )
+
+
+@router.post("/add/data", response_model=ResponseWrapper)
+async def add_data(content_list: List[ContentData]):
+    param = []
+    for content in content_list:
+        param.append({'body_text': content.body_text})
+    print(json.dumps(param, ensure_ascii=False))
+
+    # 将处理任务提交给后台线程池
+    executor.submit(process_texts_concurrently, param)
+
+    return ResponseWrapper(
+        status_code=200,
+        detail="success",
+        data="正在后台处理中"
+    )
+
+# @router.post("/query/keyword/content", response_model=ResponseWrapper)
+# async def query_keyword(query: Query):
+#     res = query_keyword_content_results(query.text)
+#     return ResponseWrapper(
+#         status_code=200,
+#         detail="success",
+#         data=res
+#     )
+
+# @router.post("/query/embedding", response_model=ResponseWrapper)
+# async def query_keyword(query: Query):
+#     res = query_embedding_results(query.text)
+#     return ResponseWrapper(
+#         status_code=200,
+#         detail="success",
+#         data=res
+#     )
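
For reference, a minimal client-side sketch of the two endpoints above. The request bodies follow the Query and ContentData schemas from schemas/schemas.py; the host/port are assumed from main.py, and the sample texts are illustrative only:

    import json
    from urllib.request import Request, urlopen

    BASE = "http://127.0.0.1:5000"

    def post(path, payload):
        # POST a JSON payload and return the parsed ResponseWrapper dict
        req = Request(BASE + path,
                      data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
                      headers={"Content-Type": "application/json"})
        with urlopen(req) as resp:
            return json.load(resp)

    # /query expects a single Query object: {"text": "..."}
    print(post("/query", {"text": "医疗AI的最新进展是什么"}))

    # /add/data expects a list of ContentData objects and returns immediately;
    # the actual processing runs in the module-level thread pool
    print(post("/add/data", [{"body_text": "这是一段待入库的示例文本"}]))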

+ 23 - 0
configs/__init__.py

@@ -0,0 +1,23 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+import json
+import os
+import yaml
+
+_config_cache = None
+
+def get():
+    global _config_cache
+    if _config_cache is None:
+        dir_name = os.path.dirname(os.path.abspath(__file__))
+        env = os.environ.get('RAG_ENV', 'dev')
+        if env not in ('dev', 'pre', 'prod'):
+            raise ValueError(f"Invalid environment: {env}. Expected one of ('dev', 'pre', 'prod').")
+        with open(f'{dir_name}/{env}.yaml', 'r') as f:
+            _config_cache = yaml.safe_load(f.read())
+    return _config_cache
+
+def get_env():
+    env = os.environ.get('RAG_ENV', 'dev')
+    return env
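
Usage sketch for the loader above: RAG_ENV selects which YAML file is read, and the parsed dict is cached after the first call, so the variable must be set before configs.get() runs for the first time:

    import os
    import configs

    os.environ["RAG_ENV"] = "dev"               # must happen before the first configs.get()
    db_conf = configs.get()["database"]["rag"]  # values come from configs/dev.yaml
    print(db_conf["host"], db_conf["port"])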

+ 8 - 0
configs/dev.yaml

@@ -0,0 +1,8 @@
+database:
+  rag:
+    host: rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com
+    port: 3306
+    user: wqsd
+    password: wqsd@2025
+    database: rag
+    charset: utf8mb4

+ 9 - 0
configs/prod.yaml

@@ -0,0 +1,9 @@
+database:
+  rag:
+    host: rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com
+    port: 3306
+    user: wqsd
+    password: wqsd@2025
+    database: rag
+    charset: utf8mb4
+

+ 1 - 0
core/__init__.py

@@ -0,0 +1 @@
+# 核心配置模块 

+ 20 - 0
core/config.py

@@ -0,0 +1,20 @@
+import os
+import sys
+from dotenv import load_dotenv
+from loguru import logger
+
+# 加载环境变量
+load_dotenv(override=True)
+
+# 设置日志配置
+log_level = os.getenv("LOG_LEVEL", "INFO")
+logger.remove()  # 移除默认处理器
+logger.add(sys.stderr, level=log_level)  # 添加新的处理器并设置日志级别
+logger.info(f"日志级别设置为: {log_level}")
+
+
+# 全局变量
+predictor_version = "1.0.0"
+
+# 全局优化任务跟踪
+optimization_tasks = {} 
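
Since core/config.py configures loguru at import time, LOG_LEVEL has to be present in the environment (or in a .env file picked up by load_dotenv) before the module is first imported. A small sketch, assuming no .env entry overrides the value (load_dotenv is called with override=True):

    import os

    os.environ["LOG_LEVEL"] = "DEBUG"   # alternatively: put LOG_LEVEL=DEBUG into .env

    from core.config import logger      # logging is configured during this import

    logger.debug("debug logging is now visible")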

+ 136 - 0
core/database.py

@@ -0,0 +1,136 @@
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker, declarative_base
+from sqlalchemy.exc import SQLAlchemyError
+from urllib.parse import quote_plus
+import configs
+
+
+
+# 配置日志
+from core.config import logger
+
+# 创建基础类
+Base = declarative_base()
+
+def create_sql_engine(config):
+    user = config['user']
+    passwd = quote_plus(config['password'])
+    host = config['host']
+    port = config.get('port', 3306)
+    db_name = config['database']
+    charset = config.get('charset', 'utf8mb4')
+    engine = create_engine(f'mysql+mysqlconnector://{user}:{passwd}@{host}:{port}/{db_name}?charset={charset}')
+    return engine
+
+def create_rag_db_engine():
+    config = configs.get()['database']['rag']
+    return create_sql_engine(config)
+
+# 创建数据库引擎
+engine = create_rag_db_engine()
+
+# 创建会话
+Session = sessionmaker(bind=engine)
+
+
+class DBHelper:
+    def __init__(self):
+        """初始化数据库连接"""
+        self.session = Session()
+
+    def add(self, entity):
+        """插入实体对象"""
+        try:
+            self.session.add(entity)
+            self.session.commit()
+            logger.info(f"添加成功: {entity}")
+            return entity
+        except SQLAlchemyError as e:
+            self.session.rollback()
+            logger.error(f"添加失败: {e}")
+            raise
+
+    def get(self, model, **filters):
+        """根据过滤条件获取实体对象"""
+        try:
+            entity = self.session.query(model).filter_by(**filters).first()
+            logger.info(f"查询成功: {entity}")
+            return entity
+        except SQLAlchemyError as e:
+            logger.error(f"查询失败: {e}")
+            raise
+
+    def update(self, model, filters, updates):
+        """更新实体对象"""
+        try:
+            entity = self.session.query(model).filter_by(**filters).first()
+            if entity:
+                for key, value in updates.items():
+                    setattr(entity, key, value)
+                self.session.commit()
+                logger.info(f"更新成功: {entity}")
+                return entity
+            else:
+                logger.warning(f"未找到符合条件的实体: {filters}")
+                return None
+        except SQLAlchemyError as e:
+            self.session.rollback()
+            logger.error(f"更新失败: {e}")
+            raise
+
+    def delete(self, model, **filters):
+        """删除实体对象"""
+        try:
+            entity = self.session.query(model).filter_by(**filters).first()
+            if entity:
+                self.session.delete(entity)
+                self.session.commit()
+                logger.info(f"删除成功: {entity}")
+                return entity
+            else:
+                logger.warning(f"未找到符合条件的实体: {filters}")
+                return None
+        except SQLAlchemyError as e:
+            self.session.rollback()
+            logger.error(f"删除失败: {e}")
+            raise
+
+    # def get_all(self, model, **filters):
+    #     """获取所有符合条件的实体对象"""
+    #     try:
+    #         entities = self.session.query(model).filter_by(**filters).all()
+    #         logger.info(f"查询成功: {entities}")
+    #         return entities
+    #     except SQLAlchemyError as e:
+    #         logger.error(f"查询失败: {e}")
+    #         raise
+
+    def get_all(self, model, **filters):
+        """获取所有符合条件的实体对象,支持更复杂的查询条件"""
+        try:
+            query = self.session.query(model)
+
+            # 处理特殊条件如 __in
+            actual_filters = {}
+            for key, value in filters.items():
+                if key.endswith('__in'):
+                    # 处理 IN 查询
+                    field_name = key[:-4]
+                    field = getattr(model, field_name)
+                    query = query.filter(field.in_(value))
+                else:
+                    actual_filters[key] = value
+
+            # 应用其他过滤条件
+            if actual_filters:
+                query = query.filter_by(**actual_filters)
+
+            entities = query.all()
+            logger.info(f"查询成功: {entities}")
+            return entities
+        except SQLAlchemyError as e:
+            logger.error(f"查询失败: {e}")
+            raise
+
+# 创建表
+Base.metadata.create_all(engine)
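
A short usage sketch of DBHelper with one of the models defined under data_models/ below; the keyword values are illustrative only:

    from core.database import DBHelper
    from data_models.keyword_data import KeywordData

    db = DBHelper()

    # insert a row, then look it up again by keyword
    db.add(KeywordData(keyword="医疗AI"))
    row = db.get(KeywordData, keyword="医疗AI")

    # the "__in" suffix in get_all() is translated into an IN (...) filter
    rows = db.get_all(KeywordData, keyword__in=["医疗AI", "Lora模型"])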

+ 0 - 0
data_models/__init__.py


+ 18 - 0
data_models/content_data.py

@@ -0,0 +1,18 @@
+from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, Integer
+from sqlalchemy.orm import declarative_base
+
+Base = declarative_base()
+
+
+class ContentData(Base):
+    __tablename__ = "content_data"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    pre_content_id = Column(BigInteger, nullable=False, comment="上一段内容id")
+    content = Column(Text, nullable=True, comment="内容")
+    summary = Column(Text, nullable=True, comment="总结")
+    keywords = Column(Text, nullable=True, comment="关键词")
+    entities = Column(Text, nullable=True, comment="实体")
+    questions = Column(Text, nullable=True, comment="问题")
+    keywords_status = Column(Integer, nullable=False, comment="关键词处理状态")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")

+ 15 - 0
data_models/keyword_clustering.py

@@ -0,0 +1,15 @@
+from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, VARCHAR, ForeignKey
+from sqlalchemy.orm import declarative_base
+
+Base = declarative_base()
+
+
+class KeywordClustering(Base):
+    __tablename__ = "keyword_clustering"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    keyword_id = Column(BigInteger, nullable=False, comment="关键词id")
+    keyword_summary = Column(Text, nullable=True, comment="关键词知识")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
+    update_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", onupdate="CURRENT_TIMESTAMP",
+                         comment="更新时间")

+ 12 - 0
data_models/keyword_data.py

@@ -0,0 +1,12 @@
+from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, VARCHAR
+from sqlalchemy.orm import declarative_base
+
+Base = declarative_base()
+
+
+class KeywordData(Base):
+    __tablename__ = "keyword_data"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    keyword = Column(VARCHAR(128), nullable=False, comment="关键词")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")

+ 13 - 0
data_models/keyword_with_content.py

@@ -0,0 +1,13 @@
+from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, VARCHAR
+from sqlalchemy.orm import declarative_base
+
+Base = declarative_base()
+
+
+class KeywordWithContent(Base):
+    __tablename__ = "keyword_with_content"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    keyword_id = Column(BigInteger, nullable=False, comment="关键词id")
+    content_id = Column(BigInteger, nullable=False, comment="内容id")
+    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")
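
Taken together, the four models above form the retrieval path used by tools_v1.py further down: keyword_data → keyword_with_content → content_data, with keyword_clustering holding one merged summary per keyword. The tables are linked by id columns only (no ForeignKey constraints are declared), so the relationship is easiest to see as an explicit join; a sketch using the shared Session from core/database.py, equivalent to the per-table queries in tools_v1.py:

    from core.database import Session
    from data_models.content_data import ContentData
    from data_models.keyword_data import KeywordData
    from data_models.keyword_with_content import KeywordWithContent

    session = Session()
    rows = (
        session.query(KeywordData.keyword, ContentData.content, ContentData.summary)
        .join(KeywordWithContent, KeywordWithContent.keyword_id == KeywordData.id)
        .join(ContentData, ContentData.id == KeywordWithContent.content_id)
        .filter(KeywordData.keyword.in_(["医疗AI", "Lora模型"]))
        .all()
    )
    for keyword, content, summary in rows:
        print(keyword, summary)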

+ 70 - 0
main.py

@@ -0,0 +1,70 @@
+import os
+from contextlib import asynccontextmanager
+
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+
+# 导入配置
+from core.config import logger
+
+
+
+# 导入API路由
+from api.search import router as search_router
+from api.health import router as health_router
+
+
+# 定义生命周期事件处理器
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Startup 逻辑
+    print("正在启动数据胶囊 API 服务...")
+    # 初始化一些资源,例如数据库连接
+    # app.state.db = await init_database()  # 假设是异步初始化
+
+    yield  # 应用在此处运行,处理请求
+
+    # Shutdown 逻辑
+    print("数据胶囊 API 服务正在关闭...")
+    # 关闭资源,例如数据库连接
+    # await close_database(app.state.db) # 假设是异步关闭
+
+
+# 创建 FastAPI 应用,注册 lifespan 处理器
+app = FastAPI(
+    title="RAG",
+    description="RAG数据检索",
+    version="1.0.0",
+    lifespan=lifespan
+)
+
+# 添加 CORS 中间件
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+# 注册路由
+app.include_router(health_router, tags=["健康检查"])
+app.include_router(search_router, tags=["数据检索"])
+
+
+# @app.on_event("startup")
+# async def startup_event():
+#     """应用启动事件"""
+#     logger.info("正在启动数据胶囊 API 服务...")
+#     # 创建数据库表
+#     # create_tables()
+#
+# @app.on_event("shutdown")
+# async def shutdown_event():
+#     """应用关闭事件"""
+#     logger.info("数据胶囊 API 服务正在关闭...")
+
+if __name__ == "__main__":
+    import uvicorn
+    os.environ['RAG_ENV'] = 'prod'
+    uvicorn.run(app, host="127.0.0.1", port=5000)
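
One ordering caveat in the __main__ block above: importing api.search at the top of this file pulls in core.database, which calls configs.get() at import time, so assigning RAG_ENV inside __main__ takes effect only after the engine has already been built against the default 'dev' config. A sketch of a hypothetical launcher that sets the environment before anything else is imported (alternatively, export RAG_ENV in the shell before starting uvicorn):

    # run_prod.py: hypothetical entry point, not part of this commit
    import os

    os.environ["RAG_ENV"] = "prod"      # set before main (and core.database) is imported

    if __name__ == "__main__":
        import uvicorn
        from main import app

        uvicorn.run(app, host="127.0.0.1", port=5000)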

+ 9 - 0
requirements.txt

@@ -0,0 +1,9 @@
+fastapi==0.115.8
+fastapi-sso==0.16.0
+pydantic==2.10.6
+pydantic_core==2.27.2
+openai==1.63.0
+python-dotenv==1.0.1
+loguru==0.7.3
+sqlalchemy==2.0.43
+uvicorn==0.29.0
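
Two imports in this commit are not covered by the list above: configs/__init__.py imports yaml, and core/database.py builds a mysql+mysqlconnector:// URL. Something along these lines would also need to be installed (version pins omitted here because the intended versions are not given in the commit):

    PyYAML
    mysql-connector-python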

+ 4 - 0
schemas/__init__.py

@@ -0,0 +1,4 @@
+# Pydantic 模式模块
+from .schemas import ResponseWrapper
+
+__all__ = ["ResponseWrapper"]

+ 17 - 0
schemas/schemas.py

@@ -0,0 +1,17 @@
+from pydantic import BaseModel, Field
+from typing import List, Dict, Any
+
+
+class ResponseWrapper(BaseModel):
+    """封装的响应模型"""
+    status_code: int
+    detail: str
+    data: Any
+
+
+class Query(BaseModel):
+    text: str
+
+
+class ContentData(BaseModel):
+    body_text: str

+ 144 - 0
tools_v1.py

@@ -0,0 +1,144 @@
+import json
+
+from core.database import DBHelper
+from data_models.content_data import ContentData
+from data_models.keyword_clustering import KeywordClustering
+from data_models.keyword_data import KeywordData
+from data_models.keyword_with_content import KeywordWithContent
+
+
+def query_keyword_data(keywords, db_helper):
+    """获取关键词数据,避免重复查询"""
+    if not keywords:
+        return {}
+
+    # 一次性查询所有关键词数据
+    keyword_datas = db_helper.get_all(KeywordData, keyword__in=keywords)
+    return {data.keyword: data for data in keyword_datas}
+
+
+def query_keyword_summary_results(keywords):
+    """通过关键词搜索,获取与问题相关的内容总结
+
+    Args:
+        keywords: 关键词列表["关键词1","关键词2",...]
+
+    Returns:
+        list: 搜索结果,包含关键词和对应的总结
+    """
+    if not keywords:
+        return []
+
+    res = []
+    db_helper = DBHelper()
+
+    try:
+        # 一次性获取所有关键词数据
+        keyword_dict = query_keyword_data(keywords, db_helper)
+
+        # 获取所有关键词ID
+        keyword_ids = [data.id for data in keyword_dict.values()]
+
+        if not keyword_ids:
+            return res
+
+        # 一次性查询所有关键词聚类数据
+        clustering_data = db_helper.get_all(
+            KeywordClustering,
+            keyword_id__in=keyword_ids
+        )
+
+        # 构建关键字ID到聚类数据的映射
+        clustering_map = {data.keyword_id: data for data in clustering_data}
+
+        # 构建结果
+        for keyword in keywords:
+            if keyword in keyword_dict:
+                keyword_id = keyword_dict[keyword].id
+                if keyword_id in clustering_map:
+                    res.append({
+                        'keyword': keyword,
+                        'keyword_summary': clustering_map[keyword_id].keyword_summary
+                    })
+
+    except Exception as e:
+        # 记录日志或处理异常
+        print(f"查询关键词总结时出错: {str(e)}")
+
+    return res
+
+
+def query_keyword_content_results(keywords):
+    """通过关键词搜索,获取与问题相关的内容
+
+    Args:
+        keywords: 关键词列表["关键词1","关键词2",...]
+
+    Returns:
+        list: 搜索结果,包含关键词、内容和内容总结
+    """
+    if not keywords:
+        return []
+
+    res = []
+    db_helper = DBHelper()
+
+    try:
+        # 一次性获取所有关键词数据
+        keyword_dict = query_keyword_data(keywords, db_helper)
+
+        # 获取所有关键词ID
+        keyword_ids = [data.id for data in keyword_dict.values()]
+
+        if not keyword_ids:
+            return res
+
+        # 一次性查询所有关键词与内容的关联
+        keyword_content_relations = db_helper.get_all(
+            KeywordWithContent,
+            keyword_id__in=keyword_ids
+        )
+
+        # 获取所有内容ID
+        content_ids = [relation.content_id for relation in keyword_content_relations]
+
+        if not content_ids:
+            return res
+
+        # 一次性查询所有内容数据
+        content_data_list = db_helper.get_all(
+            ContentData,
+            id__in=content_ids
+        )
+
+        # 构建内容ID到内容数据的映射
+        content_map = {data.id: data for data in content_data_list}
+
+        # 构建关键字ID到关键词的映射
+        keyword_id_to_word = {data.id: data.keyword for data in keyword_dict.values()}
+
+        # 构建结果
+        for relation in keyword_content_relations:
+            if relation.content_id in content_map:
+                content_data = content_map[relation.content_id]
+                res.append({
+                    'keyword': keyword_id_to_word.get(relation.keyword_id, '未知关键词'),
+                    'content': content_data.content,
+                    'content_summary': content_data.summary
+                })
+
+    except Exception as e:
+        # 记录日志或处理异常
+        print(f"查询关键词内容时出错: {str(e)}")
+
+    return res
+
+
+if __name__ == '__main__':
+    print(json.dumps(query_keyword_content_results(['医疗AI', 'Lora模型']), ensure_ascii=False))
+
+
+#
+# def query_embedding_results(query, top_k=5, better_than_threshold=0.65):
+#     graphvectorizer = GraphVectorizer()
+#     return graphvectorizer.embedding_search_entity(query, top_k=top_k, better_than_threshold=better_than_threshold)

+ 0 - 0
utils/__init__.py


+ 228 - 0
utils/deepseek_utils.py

@@ -0,0 +1,228 @@
+from typing import List
+import json
+from openai import OpenAI
+
+
+def create_segmentation_prompt(text):
+    prompt = f"""
+请对以下文本进行分段处理,要求如下:
+
+1. **内容相近原则**:将语义相近、主题一致的内容划分到同一段落
+2. **长度适中**:每段不要太长(一般不超过200字)也不要太短(一般不少于50字)
+3. **语意完整**:确保每段都能表达一个相对完整的意思或观点
+4. **结构清晰**:保持原文的逻辑顺序,不要改变原意
+5. **过渡自然**:段落之间应有合理的过渡或衔接
+6. **关键词提取**:提取5-8个最能代表当前分析范围(整体或段落)核心内容的关键词或短语。避免使用过于通用和宽泛的词汇。
+7. **实体提取**:识别并分类以下类型的实体(如果存在):
+    *   人物姓名、别名、称号。
+    *   组织、公司、机构、政府部门。
+    *   地点、国家、城市、区域。
+    *   绝对或相对的日期、时间、年份、时期。
+    *   产品、作品、物品的名称。
+    *   历史事件、会议、活动名称。
+    (*注:实体需要去重)
+
+请直接输出分段后的文本,不需要额外解释或标注
+
+请返回以下JSON格式:
+{{
+  "segments": [
+    {{
+      "id": 1,
+      "content": "第一段内容",
+      "summary": "本段摘要",
+      "keywords": ["关键词1", "关键词2", ...],
+      "entities": ["实体1", "实体2", ...]
+    }},
+    {{
+      "id": 2,
+      "content": "第二段内容", 
+      "summary": "本段摘要",
+      "keywords": ["关键词1", "关键词2", ...],
+      "entities": ["实体1", "实体2", ...]
+    }}
+  ],
+  "total_segments": 2
+}}
+
+需要分段的文本:
+"{text}"
+"""
+    return prompt
+
+
+def text_segment(text: str):
+    prompt = create_segmentation_prompt(text)
+    res = chat_with_deepseek(prompt)
+    return res
+
+
+def create_question_generation_prompt(text):
+    """
+    创建生成文本内容问题的 prompt
+
+    参数:
+        text (str): 需要生成问题的文本
+        num_questions (int): 需要生成的问题数量
+
+    返回:
+        str: 格式化后的 prompt
+    """
+    prompt = f"""
+请基于以下文本内容生成相关问题。要求:
+
+1. 问题应覆盖文本的主要内容和关键信息
+2. 问题类型多样(事实性、理解性、分析性等)
+3. 问题应清晰明确,易于理解
+4. 问题不应过于简单或复杂,适合文本内容的理解深度
+
+请以严格的 JSON 数组格式返回结果,每个元素是一个问题对象,包含以下字段:
+- id: 问题序号(从1开始)
+- question: 问题文本
+
+请返回以下JSON格式:
+{{
+  "questions": ["问题1", "问题2", ...]
+}}
+
+文本内容:
+"{text}"
+"""
+    return prompt
+
+
+def text_question(text_to_question: str):
+    prompt = create_question_generation_prompt(text_to_question)
+    return chat_with_deepseek(prompt)
+
+
+def create_keyword_summary_prompt(text, keyword):
+    prompt = f"""
+    
+    请基于以下关于关键词"{keyword}"的多条知识,生成一段全面、准确且连贯的知识。
+
+## 描述内容:
+{text}
+
+## 知识要求:
+1. 识别重叠与重复:找出不同文本中表述不同但含义相同的内容。
+2. 解决矛盾与冲突:当不同文本的信息不一致时,需要辨别或调和。
+3. 建立联系与结构:将分散的知识点连接起来,形成逻辑层次。
+4. 提炼与升华:从合并后的信息中总结出更高层次的洞察和结论。
+
+
+请返回以下JSON格式:
+{{
+  "keyword_summary": "关键词知识内容"
+}}
+
+"""
+    return prompt
+
+
+def get_keyword_summary(text, keyword):
+    prompt = create_keyword_summary_prompt(text, keyword)
+    return chat_with_deepseek(prompt)
+
+
+def update_keyword_summary_prompt(text, keyword, new_content):
+    prompt = f"""
+
+    请基于以下关于关键词"{keyword}"的相关知识,融合最新的知识到现有的知识中。
+
+## 知识要求:
+1. 识别重叠与重复:找出不同文本中表述不同但含义相同的内容。
+2. 解决矛盾与冲突:当不同文本的信息不一致时,需要辨别或调和。
+3. 建立联系与结构:将分散的知识点连接起来,形成逻辑层次。
+4. 提炼与升华:从合并后的信息中总结出更高层次的洞察和结论。
+
+## 现有知识:
+{text}
+
+## 新知识:
+{new_content}
+
+
+请返回以下JSON格式:
+{{
+  "keyword_summary": "关键词知识内容"
+}}
+
+"""
+    return prompt
+
+
+def update_keyword_summary(text, keyword, new_content):
+    prompt = update_keyword_summary_prompt(text, keyword, new_content)
+    return chat_with_deepseek(prompt)
+
+
+def create_keyword_prompt(text):
+    """
+    创建关键词提取的 prompt
+
+    参数:
+        text (str): 需要提取关键词的文本
+
+    返回:
+        str: 格式化后的 prompt
+    """
+    prompt = f"""
+提取最能代表当前分析范围(整体或段落)核心内容的关键词或短语。避免使用过于通用和宽泛的词汇
+## 描述内容:
+{text}
+
+请返回以下JSON格式:
+{{
+  "keywords": ["关键词1", "关键词2", ...]
+}}
+
+"""
+    return prompt
+
+
+def get_keywords(text):
+    prompt = create_keyword_prompt(text)
+    return chat_with_deepseek(prompt)
+
+
+def chat_with_deepseek(prompt, model="deepseek-chat", max_tokens=8192, temperature=0.7):
+    """
+    使用OpenAI兼容方式调用DeepSeek API
+
+    参数:
+        prompt (str): 用户输入的提示文本
+        model (str): 使用的模型,默认为deepseek-chat
+        max_tokens (int): 生成的最大token数
+        temperature (float): 控制生成随机性的参数(0-1)
+
+    返回:
+        str: API的回复内容
+    """
+    # 初始化客户端,指定DeepSeek的API端点
+    client = OpenAI(
+        api_key='sk-cfd2df92c8864ab999d66a615ee812c5',
+        base_url="https://api.deepseek.com/v1",  # DeepSeek的API端点
+    )
+
+    try:
+        # 创建聊天完成请求
+        response = client.chat.completions.create(
+            model=model,
+            messages=[{"role": "user", "content": prompt}],
+            max_tokens=max_tokens,
+            temperature=temperature,
+            stream=False,
+            response_format={"type": "json_object"}
+        )
+
+        # 返回回复内容
+        return json.loads(response.choices[0].message.content)
+
+    except Exception as e:
+        return f"发生异常: {str(e)}"
+
+
+if __name__ == '__main__':
+    print(chat_with_deepseek('你好啊'))
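
Note that chat_with_deepseek() ships with a hard-coded api_key. A minimal sketch of reading the key from the environment instead, keeping the rest of the client setup unchanged (DEEPSEEK_API_KEY is an assumed variable name, not something defined by this commit):

    import os
    from openai import OpenAI

    client = OpenAI(
        api_key=os.environ["DEEPSEEK_API_KEY"],  # assumed env var, set outside the code
        base_url="https://api.deepseek.com/v1",
    )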

+ 150 - 0
utils/json_parse_utils.py

@@ -0,0 +1,150 @@
+import concurrent.futures
+import json
+import threading
+
+from core.database import DBHelper
+from data_models.content_data import ContentData
+from data_models.keyword_clustering import KeywordClustering
+from data_models.keyword_data import KeywordData
+from data_models.keyword_with_content import KeywordWithContent
+from utils.deepseek_utils import text_segment, text_question, get_keyword_summary, update_keyword_summary
+
+
+def is_empty(value):
+    """辅助函数:判断值是否为空(None 或空字符串)"""
+    return value is None or value == ""
+
+
+def parse_json(file_path):
+    try:
+        # 读取文件内容
+        with open(file_path, 'r', encoding='utf-8') as file:
+            try:
+                # 解析JSON内容
+                json_data = json.load(file)
+                # 检查是否为JSON数组
+                if isinstance(json_data, list):
+                    return json_data
+                    # # 遍历每个JSON对象
+                    # for index, json_obj in enumerate(json_data, 1):
+                    #     body_text = json_obj.get("body_text", "")  # 字段不存在 → 空字符串
+                    #     if not is_empty(body_text):
+                    #         text_list.append(body_text)
+                else:
+                    print("错误: 文件内容不是一个JSON数组")
+
+            except json.JSONDecodeError as e:
+                print(f"JSON解析错误: {e}")
+    except FileNotFoundError:
+        print(f"错误: 找不到文件 '{file_path}'")
+    except Exception as e:
+        print(f"发生错误: {e}")
+    return []
+
+
+def generate_keywords(keyword, content_data):
+    db_helper = DBHelper()
+    keyword_data = db_helper.get(KeywordData, keyword=keyword)
+    if keyword_data is None:
+        new_keyword_data = KeywordData(keyword=keyword)
+        keyword_data = db_helper.add(new_keyword_data)
+    keyword_with_content = KeywordWithContent(keyword_id=keyword_data.id, content_id=content_data.id)
+    db_helper.add(keyword_with_content)
+    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
+    if keyword_clustering is None:
+        keyword_summary = get_keyword_summary(content_data.content, keyword_data.keyword)
+        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
+                                                   keyword_summary=keyword_summary['keyword_summary'])
+        db_helper.add(new_keyword_clustering)
+        print(new_keyword_clustering)
+    else:
+        new_keyword_summary = update_keyword_summary(keyword_clustering.keyword_summary, keyword,
+                                                     content_data.content)
+        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
+                         updates={"keyword_summary": new_keyword_summary['keyword_summary']})
+
+
+# 划分7元组
+def ai_dispose(text):
+    db_helper = DBHelper()
+    segments = text_segment(text)['segments']
+    segment_pre_content_id = None
+    for segment in segments:
+        text = ''
+        content = segment['content']
+        summary = segment['summary']
+        keywords = segment['keywords']
+        if not is_empty(content) and not is_empty(summary):
+            # 两个都不为空:拼接
+            text = f"{content}。{summary}"
+        elif not is_empty(summary):
+            # 仅 summary 不为空:使用 summary
+            text = summary
+        elif not is_empty(content):
+            # 仅 content 不为空:使用 content
+            text = content
+        questions = text_question(text)['questions']
+        content_data = ContentData(pre_content_id=segment_pre_content_id,
+                                   content=content,
+                                   summary=summary,
+                                   keywords=json.dumps(keywords, ensure_ascii=False),
+                                   entities=json.dumps(segment['entities'], ensure_ascii=False),
+                                   questions=json.dumps(questions, ensure_ascii=False),
+                                   keywords_status=0)
+        db_helper.add(content_data)
+        segment_pre_content_id = content_data.id
+        for keyword in keywords:
+            generate_keywords(keyword, content_data)
+        db_helper.update(ContentData, filters={"id": content_data.id}, updates={"keywords_status": 1})
+
+
+print_lock = threading.Lock()
+
+
+def process_text(text):
+    """处理单个文本的函数"""
+    try:
+        # 使用锁确保打印不会交叉
+        with print_lock:
+            print(f"处理文本: {text[:50]}...")  # 只打印前50个字符避免过长
+
+        # 调用原来的处理函数
+        result = ai_dispose(text)
+
+        return result
+    except Exception as e:
+        with print_lock:
+            print(f"处理文本时出错: {e}")
+        return None
+
+
+# 使用线程池处理文本列表
+def process_texts_concurrently(text_list, max_workers=20):
+    """使用多线程并发处理文本列表"""
+    results = []
+
+    # 创建线程池执行器
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        # 提交所有任务到线程池
+        future_to_text = {executor.submit(process_text, text['body_text']): text for text in text_list}
+
+        # 处理完成的任务
+        for future in concurrent.futures.as_completed(future_to_text):
+            text = future_to_text[future]
+            try:
+                result = future.result()
+                results.append(result)
+                with print_lock:
+                    print(f"成功处理文本: {text[:30]}...")
+            except Exception as e:
+                with print_lock:
+                    print(f"处理文本时发生异常: {e}")
+
+    return results
+
+
+if __name__ == '__main__':
+    json_path = '../data/test_data1.json'
+    text_list = parse_json(json_path)
+    process_texts_concurrently(text_list)