
Merge branch 'dev-xym-update' of algorithm/RAG into master

xueyiming, 2 months ago
Parent
Commit 412c97fb46

+ 7 - 9
api/search.py

@@ -8,8 +8,8 @@ from fastapi import APIRouter, BackgroundTasks
 from schemas import ResponseWrapper
 from schemas.schemas import Query, ContentData
 from tools_v1 import query_keyword_summary_results, query_keyword_content_results
+from utils.data_utils import add_data
 from utils.deepseek_utils import get_keywords
-from utils.json_parse_utils import process_texts_concurrently
 
 router = APIRouter()
 
@@ -33,18 +33,16 @@ async def query_keyword(query: Query):
 
 @router.post("/add/data", response_model=ResponseWrapper)
 async def query_keyword(content_list: List[ContentData]):
-    param = []
+    res_list = []
     for content in content_list:
-        param.append({'body_text': content.body_text})
-    print(json.dumps(param, ensure_ascii=False))
-
-    # 将处理任务提交给后台线程池
-    executor.submit(process_texts_concurrently, param)
-
+        if content.body_text:
+            print(content.body_text)
+            res = add_data(content.body_text)
+            res_list.append(res)
     return ResponseWrapper(
         status_code=200,
         detail="success",
-        data="正在后台处理中"
+        data=res_list
     )
 
 # @router.post("/query/keyword/content", response_model=ResponseWrapper)

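For reference, a minimal client-side sketch of calling the reworked /add/data endpoint. The host and port follow main.py (0.0.0.0:5000); the route prefix and any additional ContentData fields are assumptions, since neither the router registration nor the schema appears in this diff.

# Hypothetical client call for the new synchronous /add/data behaviour.
# Base URL and payload shape are assumptions; ContentData may require more fields.
import requests

payload = [{"body_text": "first sample body text"}, {"body_text": "second sample body text"}]
resp = requests.post("http://localhost:5000/add/data", json=payload, timeout=30)
# Expected shape: {"status_code": 200, "detail": "success", "data": ["<doc_id>", ...]}
print(resp.json())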
+ 13 - 28
core/database.py

@@ -1,25 +1,27 @@
-from sqlalchemy import create_engine, Column, Integer, String, DateTime
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker, scoped_session
 from sqlalchemy.exc import SQLAlchemyError
 from urllib.parse import quote_plus
 import configs
-
-
-
-# 配置日志
 from core.config import logger
 
-# 创建基础类
-Base = declarative_base()
 
+# 配置数据库连接池
 def create_sql_engine(config):
     user = config['user']
     passwd = quote_plus(config['password'])
     host = config['host']
     db_name = config['database']
     charset = config.get('charset', 'utf8mb4')
-    engine = create_engine(f'mysql+mysqlconnector://{user}:{passwd}@{host}/{db_name}?charset={charset}')
+
+    # 配置连接池
+    engine = create_engine(
+        f'mysql+mysqlconnector://{user}:{passwd}@{host}/{db_name}?charset={charset}',
+        pool_size=30,  # 连接池大小
+        max_overflow=10,  # 超过连接池大小后可以创建的最大连接数
+        pool_timeout=30,  # 获取连接的超时时间,单位为秒
+        pool_recycle=3600,  # 连接最大复用时间,超过这个时间将被关闭并重新创建连接
+    )
     return engine
 
 def create_rag_db_engine():
@@ -43,7 +45,6 @@ class DBHelper:
         try:
             self.session.add(entity)
             self.session.commit()
-            logger.info(f"添加成功: {entity}")
             return entity
         except SQLAlchemyError as e:
             self.session.rollback()
@@ -54,7 +55,6 @@ class DBHelper:
         """根据过滤条件获取实体对象"""
         try:
             entity = self.session.query(model).filter_by(**filters).first()
-            logger.info(f"查询成功: {entity}")
             return entity
         except SQLAlchemyError as e:
             logger.error(f"查询失败: {e}")
@@ -68,8 +68,7 @@ class DBHelper:
                 for key, value in updates.items():
                     setattr(entity, key, value)
                 self.session.commit()
-                logger.info(f"更新成功: {entity}")
-                return entity
+                return
             else:
                 logger.warning(f"未找到符合条件的实体: {filters}")
                 return None
@@ -85,7 +84,6 @@ class DBHelper:
             if entity:
                 self.session.delete(entity)
                 self.session.commit()
-                logger.info(f"删除成功: {entity}")
                 return entity
             else:
                 logger.warning(f"未找到符合条件的实体: {filters}")
@@ -95,16 +93,6 @@ class DBHelper:
             logger.error(f"删除失败: {e}")
             raise
 
-    # def get_all(self, model, **filters):
-    #     """获取所有符合条件的实体对象"""
-    #     try:
-    #         entities = self.session.query(model).filter_by(**filters).all()
-    #         logger.info(f"查询成功: {entities}")
-    #         return entities
-    #     except SQLAlchemyError as e:
-    #         logger.error(f"查询失败: {e}")
-    #         raise
-
     def get_all(self, model, **filters):
         """获取所有符合条件的实体对象,支持更复杂的查询条件"""
         try:
@@ -126,11 +114,8 @@ class DBHelper:
                 query = query.filter_by(**actual_filters)
 
             entities = query.all()
-            logger.info(f"查询成功: {entities}")
             return entities
         except SQLAlchemyError as e:
             logger.error(f"查询失败: {e}")
             raise
 
-# 创建表
-Base.metadata.create_all(engine)

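A minimal sketch of pairing the pooled engine with a session factory, assuming the scoped_session import added above is meant for that purpose; the actual DBHelper constructor is not shown in this diff.

# Sketch only: bind a thread-local session factory to the pooled engine.
from sqlalchemy import text
from sqlalchemy.orm import scoped_session, sessionmaker

from core.database import create_rag_db_engine

engine = create_rag_db_engine()
SessionFactory = scoped_session(sessionmaker(bind=engine))

session = SessionFactory()
try:
    session.execute(text("SELECT 1"))  # connection is checked out of the pool here
    session.commit()
finally:
    SessionFactory.remove()  # closes the session and returns the connection to the pool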
+ 71 - 0
core/database_data.py

@@ -0,0 +1,71 @@
+import mysql.connector
+from mysql.connector import Error
+
+
+# 数据库配置
+db_config = {
+    'host': 'rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com',  # 数据库主机
+    'database': 'ai_knowledge',  # 数据库名称
+    'user': 'wqsd',  # 用户名
+    'password': 'wqsd@2025'  # 密码
+}
+
+
+class DatabaseHelper:
+    def __init__(self, config=None):
+        """ 初始化数据库连接配置 """
+        if config is None:
+            config = db_config
+        self.config = config
+        self.connection = None
+        self.cursor = None
+        self._connect()
+
+    def _connect(self):
+        """ 建立数据库连接 """
+        try:
+            self.connection = mysql.connector.connect(**self.config)
+            if self.connection.is_connected():
+                print("数据库连接成功")
+                self.cursor = self.connection.cursor(dictionary=True)
+        except Error as e:
+            print(f"数据库连接失败: {e}")
+
+    def execute_query(self, query, params=None):
+        """
+        执行查询SQL语句
+        :param query: SQL 查询语句
+        :param params: 查询参数 (可选)
+        :return: 查询结果
+        """
+        try:
+            self.cursor.execute(query, params)
+            return self.cursor.fetchall()
+        except Error as e:
+            print(f"执行查询失败: {e}")
+            return None
+
+    def execute_non_query(self, query, params=None):
+        """
+        执行没有返回结果的 SQL 语句 (例如 INSERT, UPDATE, DELETE)
+        :param query: SQL 执行语句
+        :param params: 参数 (可选)
+        :return: 受影响的行数
+        """
+        try:
+            self.cursor.execute(query, params)
+            self.connection.commit()  # 提交事务
+            return self.cursor.rowcount  # 返回受影响的行数
+        except Error as e:
+            print(f"执行非查询失败: {e}")
+            self.connection.rollback()
+            return 0
+
+    def close(self):
+        """ 关闭数据库连接 """
+        if self.cursor:
+            self.cursor.close()
+        if self.connection:
+            self.connection.close()
+            print("数据库连接已关闭")
+

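A usage sketch for DatabaseHelper with a parameterized query; the table and column names are borrowed from the select_data query added later in this commit (utils/data_utils.py) and may not match the full schema.

from core.database_data import DatabaseHelper

db = DatabaseHelper()
rows = db.execute_query(
    "SELECT content_id, crawl_data FROM knowledge_crawl_content WHERE request_id = %s LIMIT 10",
    ("20250905022700393495252",),
)
for row in rows or []:  # execute_query returns None on error
    print(row["content_id"])
db.close()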
+ 32 - 0
data_models/content_chunks.py

@@ -0,0 +1,32 @@
+from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, Integer, Float
+from sqlalchemy.dialects.mysql import VARCHAR
+from sqlalchemy.orm import declarative_base
+
+Base = declarative_base()
+
+
+class ContentChunks(Base):
+    __tablename__ = "content_chunks"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
+    chunk_id = Column(Integer)
+    doc_id = Column(VARCHAR(64))
+    text = Column(Text)
+    tokens = Column(Integer)
+    summary = Column(Text)
+    topic = Column(VARCHAR(255))
+    domain = Column(VARCHAR(100))
+    task_type = Column(VARCHAR(100))
+    topic_purity = Column(Float)
+    keywords = Column(Text)
+    concepts = Column(Text)
+    questions = Column(Text)
+    created_at = Column(TIMESTAMP)
+    updated_at = Column(TIMESTAMP)
+    chunk_status = Column(Integer)
+    keywords_status = Column(Integer)
+    embedding_status = Column(Integer)
+    entities = Column(Text)
+    version = Column(Integer)
+
+

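For context, this is the filter that KeywordSummaryTask (added later in this commit) uses to pick up pending chunks; a minimal query sketch with DBHelper, with the status semantics inferred from usage rather than documented in the diff:

from core.database import DBHelper
from data_models.content_chunks import ContentChunks

db_helper = DBHelper()
# chunk_status=2: chunking finished; keywords_status=0: keywords not yet extracted (inferred)
pending = db_helper.get_all(ContentChunks, chunk_status=2, keywords_status=0)
print(f"{len(pending)} chunks waiting for keyword processing")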
+ 0 - 18
data_models/content_data.py

@@ -1,18 +0,0 @@
-from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, Integer
-from sqlalchemy.orm import declarative_base
-
-Base = declarative_base()
-
-
-class ContentData(Base):
-    __tablename__ = "content_data"
-
-    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
-    pre_content_id = Column(BigInteger, nullable=False, comment="上一段内容id")
-    content = Column(Text, nullable=True, comment="内容")
-    summary = Column(Text, nullable=True, comment="总结")
-    keywords = Column(Text, nullable=True, comment="关键词")
-    entities = Column(Text, nullable=True, comment="实体")
-    questions = Column(Text, nullable=True, comment="问题")
-    keywords_status = Column(Integer, nullable=False, comment="关键词处理状态")
-    create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")

+ 5 - 4
data_models/keyword_with_content.py → data_models/keyword_with_content_chunk.py

@@ -1,13 +1,14 @@
-from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, VARCHAR
+from sqlalchemy import Column, Text, BigInteger, TIMESTAMP, VARCHAR, Integer
 from sqlalchemy.orm import declarative_base
 
 Base = declarative_base()
 
 
-class KeywordWithContent(Base):
-    __tablename__ = "keyword_with_content"
+class KeywordWithContentChunk(Base):
+    __tablename__ = "keyword_with_content_chunk"
 
     id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键id")
     keyword_id = Column(BigInteger, nullable=False, comment="关键词id")
-    content_id = Column(BigInteger, nullable=False, comment="内容id")
+    content_chunk_id = Column(BigInteger, nullable=False, comment="内容id")
+    keyword_clustering_status = Column(Integer, nullable=False, default=0, comment="总结状态")
     create_time = Column(TIMESTAMP, nullable=False, server_default="CURRENT_TIMESTAMP", comment="创建时间")

+ 9 - 20
main.py

@@ -1,4 +1,6 @@
+import asyncio
 import os
+import threading
 from contextlib import asynccontextmanager
 
 from fastapi import FastAPI
@@ -7,12 +9,10 @@ from fastapi.middleware.cors import CORSMiddleware
 # 导入配置
 from core.config import logger
 
-
-
 # 导入API路由
 from api.search import router as search_router
 from api.health import router as health_router
-
+from utils.keywords_utils import KeywordSummaryTask
 
 # 创建 FastAPI 应用
 app = FastAPI(
@@ -42,29 +42,18 @@ async def lifespan(app: FastAPI):
     print("正在启动数据胶囊 API 服务...")
     # 初始化一些资源,例如数据库连接
     # app.state.db = await init_database()  # 假设是异步初始化
-
     yield  # 应用在此处运行,处理请求
 
     # Shutdown 逻辑
     print("数据胶囊 API 服务正在关闭...")
-    # 关闭资源,例如数据库连接
-    # await close_database(app.state.db) # 假设是异步关闭
 
 
-
-# @app.on_event("startup")
-# async def startup_event():
-#     """应用启动事件"""
-#     logger.info("正在启动数据胶囊 API 服务...")
-#     # 创建数据库表
-#     # create_tables()
-#
-# @app.on_event("shutdown")
-# async def shutdown_event():
-#     """应用关闭事件"""
-#     logger.info("数据胶囊 API 服务正在关闭...")
-
 if __name__ == "__main__":
     import uvicorn
+
     os.environ['RAG_ENV'] = 'prod'
-    uvicorn.run(app, host="0.0.0.0", port=5000)
+    keyword_summary_task = KeywordSummaryTask()
+
+    # 启动单独的线程来执行 process_texts_concurrently 方法
+    threading.Thread(target=keyword_summary_task.process_texts_concurrently, daemon=True).start()
+    uvicorn.run(app, host="0.0.0.0", port=5000)

+ 2 - 1
requirements.txt

@@ -8,4 +8,5 @@ loguru==0.7.3
 sqlalchemy==2.0.43
 uvicorn==0.29.0
 pyyaml==6.0.2
-mysql-connector-python==9.4.0
+mysql-connector-python==9.4.0
+requests==2.32.5

+ 10 - 10
tools_v1.py

@@ -1,10 +1,10 @@
 import json
 
 from core.database import DBHelper
-from data_models.content_data import ContentData
+from data_models.content_chunks import ContentChunks
 from data_models.keyword_clustering import KeywordClustering
 from data_models.keyword_data import KeywordData
-from data_models.keyword_with_content import KeywordWithContent
+from data_models.keyword_with_content_chunk import KeywordWithContentChunk
 
 
 def query_keyword_data(keywords, db_helper):
@@ -95,20 +95,20 @@ def query_keyword_content_results(keywords):
 
         # 一次性查询所有关键词与内容的关联
         keyword_content_relations = db_helper.get_all(
-            KeywordWithContent,
+            KeywordWithContentChunk,
             keyword_id__in=keyword_ids
         )
 
         # 获取所有内容ID
-        content_ids = [relation.content_id for relation in keyword_content_relations]
+        content_chunk_ids = [relation.content_chunk_id for relation in keyword_content_relations]
 
-        if not content_ids:
+        if not content_chunk_ids:
             return res
 
         # 一次性查询所有内容数据
         content_data_list = db_helper.get_all(
-            ContentData,
-            id__in=content_ids
+            ContentChunks,
+            id__in=content_chunk_ids
         )
 
         # 构建内容ID到内容数据的映射
@@ -119,11 +119,11 @@ def query_keyword_content_results(keywords):
 
         # 构建结果
         for relation in keyword_content_relations:
-            if relation.content_id in content_map:
-                content_data = content_map[relation.content_id]
+            if relation.content_chunk_id in content_map:
+                content_data = content_map[relation.content_chunk_id]
                 res.append({
                     'keyword': keyword_id_to_word.get(relation.keyword_id, '未知关键词'),
-                    'content': content_data.content,
+                    'content': content_data.text,
                     'content_summary': content_data.summary
                 })
 

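The keyword_id__in / id__in calls above rely on DBHelper.get_all translating keys ending in "__in" into IN clauses (see the get_all hunk in core/database.py). A minimal call sketch, with illustrative ids:

from core.database import DBHelper
from data_models.keyword_with_content_chunk import KeywordWithContentChunk

db_helper = DBHelper()
# Hypothetical ids; in query_keyword_content_results they come from KeywordData lookups.
relations = db_helper.get_all(KeywordWithContentChunk, keyword_id__in=[1, 2, 3])
chunk_ids = [r.content_chunk_id for r in relations]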
+ 40 - 0
utils/data_utils.py

@@ -0,0 +1,40 @@
+import json
+
+import requests
+
+from core.config import logger
+from core.database_data import DatabaseHelper
+
+
+def add_data(text):
+    try:
+        response = requests.post(
+            url='http://192.168.100.31:8001/api/chunk',
+            json={
+                "text": text,
+                "text_type": 1},
+            headers={"Content-Type": "application/json"},
+        )
+        return response.json()['doc_id']
+    except Exception as e:
+        logger.error(e)
+        return e
+
+
+def select_data():
+    db_helper = DatabaseHelper()
+    # 执行查询
+    query = """
+        SELECT c.crawl_data as json_text
+        FROM knowledge_extraction_content a
+        LEFT JOIN knowledge_parsing_content b ON a.parsing_id = b.id AND b.request_id = a.request_id
+        LEFT JOIN knowledge_crawl_content c ON c.content_id = b.content_id AND c.request_id = a.request_id
+        LEFT JOIN knowledge_request d ON d.request_id = a.request_id
+        LEFT JOIN knowledge_query e ON e.id = d.query_id
+        WHERE a.request_id > '20250905022700393495252' AND e.knowledge_type = '整体' AND a.score >= 0 AND e.category_id = 0
+        ORDER BY a.id DESC
+        """
+    result = db_helper.execute_query(query)
+    for row in result:
+        add_data(json.loads(row['json_text'])['body_text'])
+

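Note that add_data returns the exception object instead of raising on failure; a small sketch of distinguishing the two outcomes at the call site:

from utils.data_utils import add_data

doc_id = add_data("sample body text")
if isinstance(doc_id, Exception):
    print(f"chunk service call failed: {doc_id}")
else:
    print(f"submitted, doc_id={doc_id}")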
+ 0 - 150
utils/json_parse_utils.py

@@ -1,150 +0,0 @@
-import concurrent
-import json
-import threading
-
-from core.database import DBHelper
-from data_models.content_data import ContentData
-from data_models.keyword_clustering import KeywordClustering
-from data_models.keyword_data import KeywordData
-from data_models.keyword_with_content import KeywordWithContent
-from utils.deepseek_utils import text_segment, text_question, create_keyword_summary_prompt, get_keyword_summary, \
-    update_keyword_summary_prompt
-
-
-def is_empty(value):
-    """辅助函数:判断值是否为空(None 或空字符串)"""
-    return value is None or value == ""
-
-
-def parse_json(file_path):
-    try:
-        # 读取文件内容
-        with open(file_path, 'r', encoding='utf-8') as file:
-            try:
-                # 解析JSON内容
-                json_data = json.load(file)
-                # 检查是否为JSON数组
-                if isinstance(json_data, list):
-                    return json_data
-                    # # 遍历每个JSON对象
-                    # for index, json_obj in enumerate(json_data, 1):
-                    #     body_text = json_obj.get("body_text", "")  # 字段不存在 → 空字符串
-                    #     if not is_empty(body_text):
-                    #         text_list.append(body_text)
-                else:
-                    print("错误: 文件内容不是一个JSON数组")
-
-            except json.JSONDecodeError as e:
-                print(f"JSON解析错误: {e}")
-    except FileNotFoundError:
-        print(f"错误: 找不到文件 '{file_path}'")
-    except Exception as e:
-        print(f"发生错误: {e}")
-    return []
-
-
-def generate_keywords(keyword, content_data):
-    db_helper = DBHelper()
-    keyword_data = db_helper.get(KeywordData, keyword=keyword)
-    if keyword_data is None:
-        new_keyword_data = KeywordData(keyword=keyword)
-        keyword_data = db_helper.add(new_keyword_data)
-    keyword_with_content = KeywordWithContent(keyword_id=keyword_data.id, content_id=content_data.id)
-    db_helper.add(keyword_with_content)
-    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
-    if keyword_clustering is None:
-        keyword_summary = get_keyword_summary(content_data.content, keyword_data.keyword)
-        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
-                                                   keyword_summary=keyword_summary['keyword_summary'])
-        db_helper.add(new_keyword_clustering)
-        print(new_keyword_clustering)
-    else:
-        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
-                                                            content_data.content)
-        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
-                         updates={"keyword_summary": new_keyword_summary})
-
-
-# 划分7元组
-def ai_dispose(text):
-    db_helper = DBHelper()
-    segments = text_segment(text)['segments']
-    segment_pre_content_id = None
-    for segment in segments:
-        text = ''
-        content = segment['content']
-        summary = segment['summary']
-        keywords = segment['keywords']
-        if not is_empty(content) and not is_empty(summary):
-            # 两个都不为空:拼接(中间加空格,可按需改为空字符串 "")
-            text = f"{content}。{summary}"
-        elif not is_empty(summary):
-            # 仅 title 不为空:返回 title
-            text = content
-        elif not is_empty(content):
-            # 仅 body_text 不为空:返回 body_text
-            text = summary
-        questions = text_question(text)['questions']
-        content_data = ContentData(pre_content_id=segment_pre_content_id,
-                                   content=content,
-                                   summary=summary,
-                                   keywords=json.dumps(keywords, ensure_ascii=False),
-                                   entities=json.dumps(segment['entities'], ensure_ascii=False),
-                                   questions=json.dumps(questions, ensure_ascii=False),
-                                   keywords_status=0)
-        db_helper.add(content_data)
-        segment_pre_content_id = content_data.id
-        for keyword in keywords:
-            generate_keywords(keyword, content_data)
-        db_helper.update(ContentData, filters={"id": content_data.id}, updates={"keywords_status": 1})
-
-
-print_lock = threading.Lock()
-
-
-def process_text(text):
-    """处理单个文本的函数"""
-    try:
-        # 使用锁确保打印不会交叉
-        with print_lock:
-            print(f"处理文本: {text[:50]}...")  # 只打印前50个字符避免过长
-
-        # 调用原来的处理函数
-        result = ai_dispose(text)
-
-        return result
-    except Exception as e:
-        with print_lock:
-            print(f"处理文本时出错: {e}")
-        return None
-
-
-# 使用线程池处理文本列表
-def process_texts_concurrently(text_list, max_workers=20):
-    """使用多线程并发处理文本列表"""
-    results = []
-
-    # 创建线程池执行器
-    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
-        # 提交所有任务到线程池
-        future_to_text = {executor.submit(process_text, text['body_text']): text for text in text_list}
-
-        # 处理完成的任务
-        for future in concurrent.futures.as_completed(future_to_text):
-            text = future_to_text[future]
-            try:
-                result = future.result()
-                results.append(result)
-                with print_lock:
-                    print(f"成功处理文本: {text[:30]}...")
-            except Exception as e:
-                with print_lock:
-                    print(f"处理文本时发生异常: {e}")
-
-    return results
-
-
-if __name__ == '__main__':
-    json_path = '../data/test_data1.json'
-    text_list = parse_json(json_path)
-    process_texts_concurrently(text_list)

+ 86 - 0
utils/keywords_utils.py

@@ -0,0 +1,86 @@
+import concurrent
+import json
+from concurrent.futures import ThreadPoolExecutor
+from time import sleep
+from core.config import logger
+
+from core.database import DBHelper
+from data_models.content_chunks import ContentChunks
+from data_models.keyword_clustering import KeywordClustering
+from data_models.keyword_data import KeywordData
+from data_models.keyword_with_content_chunk import KeywordWithContentChunk
+from utils.deepseek_utils import get_keyword_summary, update_keyword_summary_prompt
+
+
+class KeywordSummaryTask:
+    def __init__(self):
+        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix='KeywordSummaryTask')
+
+    def _generate_keywords(self, content_chunk):
+        db_helper = DBHelper()
+        keywords = json.loads(content_chunk.keywords)
+        for keyword in keywords:
+            keyword_data = db_helper.get(KeywordData, keyword=keyword)
+            if keyword_data is None:
+                new_keyword_data = KeywordData(keyword=keyword)
+                keyword_data = db_helper.add(new_keyword_data)
+            keyword_with_content_chunk = db_helper.get(KeywordWithContentChunk, keyword_id=keyword_data.id,
+                                                            content_chunk_id=content_chunk.id)
+            if keyword_with_content_chunk is None:
+                keyword_with_content_chunk = KeywordWithContentChunk(keyword_id=keyword_data.id,
+                                                                     content_chunk_id=content_chunk.id)
+                db_helper.add(keyword_with_content_chunk)
+            if keyword_with_content_chunk.keyword_clustering_status == 0:
+                try:
+                    keyword_clustering = db_helper.get(KeywordClustering, keyword_id=keyword_data.id)
+                    if keyword_clustering is None:
+                        keyword_summary = get_keyword_summary(content_chunk.text, keyword_data.keyword)
+                        new_keyword_clustering = KeywordClustering(keyword_id=keyword_data.id,
+                                                                   keyword_summary=keyword_summary['keyword_summary'])
+                        db_helper.add(new_keyword_clustering)
+                    else:
+                        new_keyword_summary = update_keyword_summary_prompt(keyword_clustering.keyword_summary, keyword,
+                                                                            content_chunk.text)
+                        db_helper.update(KeywordClustering, filters={"id": keyword_clustering.id},
+                                              updates={"keyword_summary": new_keyword_summary})
+                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
+                                          updates={"keyword_clustering_status": 1})
+                except Exception as e:
+                    print(e)
+                    db_helper.update(KeywordWithContentChunk, filters={"id": keyword_with_content_chunk.id},
+                                          updates={"keyword_clustering_status": 2})
+        db_helper.update(ContentChunks, filters={"id": content_chunk.id},
+                              updates={"keywords_status": 1})
+
+    # 使用线程池处理文本列表
+    def process_texts_concurrently(self):
+        print('process_texts_concurrently start')
+        db_helper = DBHelper()
+        while True:
+            content_chunks = db_helper.get_all(ContentChunks, chunk_status=2, keywords_status=0)
+            if len(content_chunks) == 0:
+                logger.info('sleep')
+                print('sleep')
+                sleep(1800)
+            else:
+                future_to_chunk = {self.executor.submit(self._generate_keywords, content_chunk): content_chunk for
+                                   content_chunk
+                                   in
+                                   content_chunks}
+
+                # 等待所有任务完成
+                concurrent.futures.wait(future_to_chunk.keys())
+
+                # 创建一个字典,内容块到结果的映射(注意:这里假设任务没有异常,如果有异常,result()会抛出)
+                results = {}
+                for future, chunk in future_to_chunk.items():
+                    try:
+                        results[chunk] = future.result()
+                    except Exception as exc:
+                        results[chunk] = exc  # 或者你可以选择其他异常处理方式
+                print("success")
+
+
+if __name__ == '__main__':
+    keyword_summary_task = KeywordSummaryTask()
+    keyword_summary_task.process_texts_concurrently()
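In production the polling loop is not run in the foreground like this; main.py (earlier in this commit) starts it in a daemon thread alongside uvicorn, roughly:

import threading

from utils.keywords_utils import KeywordSummaryTask

task = KeywordSummaryTask()
# Daemon thread so the polling loop does not block interpreter shutdown.
threading.Thread(target=task.process_texts_concurrently, daemon=True).start()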