Selaa lähdekoodia

多线程版本改造

jihuaqiang 1 kuukausi sitten
vanhempi
commit
4bd4ded140

+ 0 - 117
3_handle.py

@@ -1,117 +0,0 @@
-import os
-import json
-import time
-import sys
-import argparse
-from typing import Dict, Any, List, Optional, Tuple
-
-# 导入自定义模块
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from utils.mysql_db import MysqlHelper
-from gemini import GeminiProcessor
-from utils.file import File
-
-
-class Handler:
-    def __init__(self):
-        # 初始化处理器
-        self.processor = GeminiProcessor()
-        self.system_prompt = File.read_file('prompt/handle.md')
-    
-    def build_query_conditions(self, query_word: Optional[str], 
-                             source_type: Optional[str], 
-                             source_channel: Optional[str]) -> Tuple[str, Tuple]:
-        """构建查询条件和参数"""
-        conditions = ["multimodal_recognition is not null", "structured_data is null"]
-        params = []
-        
-        if query_word is not None:
-            conditions.append("query_word = %s")
-            params.append(query_word)
-        if source_type is not None:
-            conditions.append("source_type = %s")
-            params.append(source_type)
-        if source_channel is not None:
-            conditions.append("source_channel = %s")
-            params.append(source_channel)
-            
-        where_clause = " AND ".join(conditions)
-        return where_clause, tuple(params)
-    
-    def process_all_records(self, query_word: Optional[str], 
-                           source_type: Optional[str], 
-                           source_channel: Optional[str]):
-        """处理所有记录"""
-        total_processed = 0
-        total_success = 0
-        
-        try:
-            # 构建查询条件和参数
-            where_clause, params = self.build_query_conditions(query_word, source_type, source_channel)
-            sql = f"""
-                SELECT id, multimodal_recognition 
-                FROM knowledge_search_content 
-                WHERE {where_clause}
-            """
-            
-            # 查询记录
-            records = MysqlHelper.get_values(sql, params)
-            print(f"获取到 {len(records)} 条记录")
-            
-            # 处理每条记录
-            for row in records:
-                total_processed += 1
-                try:
-                    # 处理内容
-                    result = self.processor.process(row[1], self.system_prompt)
-
-                    print(result)
-
-                    # 更新数据库
-                    update_sql = """
-                        UPDATE knowledge_search_content 
-                        SET structured_data = %s 
-                        WHERE id = %s
-                    """
-
-                    affected_rows = MysqlHelper.update_values(update_sql, (result, row[0]))
-
-
-                    total_success += 1
-
-                    # 添加延迟避免API限制
-                    time.sleep(5)
-                    
-                except Exception as e:
-                    print(f"处理记录 {row[0]} 失败: {str(e)}")
-                finally:
-                    print(f"处理完成!总数据量 {len(records)},已处理 {total_processed} ,成功 {total_success} ")
-                    
-        except Exception as e:
-            print(f"处理过程中发生错误: {str(e)}")
-
-
-def main():
-    """主函数"""
-    parser = argparse.ArgumentParser(description='内容识别脚本')
-    parser.add_argument('--query_word', default=None, help='query词')
-    parser.add_argument('--source_type', default=None, help='数据源类型')
-    parser.add_argument('--source_channel', default=None, help='数据源渠道')
-    
-    args = parser.parse_args()
-    
-    try:
-        handler = Handler()
-        handler.process_all_records(
-            query_word=args.query_word, 
-            source_type=args.source_type, 
-            source_channel=args.source_channel
-        )
-    except Exception as e:
-        print(f"程序执行失败: {str(e)}")
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 0
4_aggregate.py


+ 267 - 8
README.md

@@ -1,26 +1,285 @@
-python3 -m venv myenv
+# 知识处理系统项目架构
 
-source myenv/bin/activate
+这是一个多模块的知识处理系统,包含内容识别、结构化处理等功能。项目采用模块化设计,每个功能模块都有独立的日志系统和进程管理。
 
-pip install -r requirements.txt
+## 🏗️ 项目结构
 
+```
+knowledge/
+├── content_indentify/          # 内容识别模块
+│   ├── multi_thread_scheduler.py    # 多线程调度器
+│   ├── indentify.py                 # 内容识别处理器
+│   ├── image_identifier.py          # 图片识别模块
+│   ├── video_identifier.py          # 视频识别模块
+│   ├── start_scheduler.sh           # 启动脚本
+│   ├── logging_config.py            # 日志配置(向后兼容)
+│   └── README.md                    # 模块说明文档
+│
+├── structure/                   # 结构化处理模块
+│   ├── multi_thread_scheduler.py    # 多线程调度器
+│   ├── structure_processor.py       # 结构化处理器
+│   ├── start_structure.sh           # 启动脚本
+│   ├── logging_config.py            # 日志配置(向后兼容)
+│   └── README.md                    # 模块说明文档
+│
+├── utils/                       # 公共工具模块
+│   ├── logging_config.py            # 公共日志配置
+│   ├── mysql_db.py                 # 数据库工具
+│   ├── file.py                     # 文件工具
+│   └── container.py                # 容器工具
+│
+├── prompt/                      # 提示词配置
+│   ├── handle.md                   # 处理提示词
+│   └── structure.md                # 结构化提示词
+│
+├── gemini.py                     # Gemini API处理器
+├── config.py                      # 配置文件
+└── README.md                      # 项目说明文档
+```
+
+## 🚀 核心功能模块
+
+### 1. 内容识别模块 (`content_indentify/`)
+
+**功能**: 自动识别和处理内容中的图片和视频
+- 多线程并发处理
+- 支持图片和视频的AI识别
+- 自动更新数据库
+- 完整的日志记录
+
+**使用方法**:
+```bash
+cd content_indentify
+./start_scheduler.sh start    # 启动
+./start_scheduler.sh status   # 查看状态
+./start_scheduler.sh stop     # 停止
+```
+
+### 2. 结构化处理模块 (`structure/`)
+
+**功能**: 对已识别的内容进行结构化处理
+- 支持按条件筛选处理
+- 调用Gemini API进行内容结构化
+- 多线程并发处理
+- 完整的日志记录
+
+**使用方法**:
+```bash
+cd structure
+./start_structure.sh start    # 启动
+./start_structure.sh status   # 查看状态
+./start_structure.sh stop     # 停止
+```
+
+## 🔧 公共基础设施
+
+### 日志系统
+
+所有模块都使用统一的日志配置 (`utils/logging_config.py`):
+
+```python
+from utils.logging_config import get_logger
+
+logger = get_logger('ModuleName')
+logger.info("信息日志")
+logger.error("错误日志")
+```
+
+**日志特点**:
+- 自动按日期分文件
+- 同时输出到文件和控制台
+- 支持不同日志级别
+- 统一的日志格式
+
+### 进程管理
+
+每个模块都有完整的进程管理功能:
+
+```bash
+# 启动
+./start_module.sh start
+
+# 查看状态
+./start_module.sh status
+
+# 停止
+./start_module.sh stop
+
+# 重启
+./start_module.sh restart
+```
+
+**进程管理特点**:
+- PID文件管理
+- 优雅退出机制
+- 信号处理
+- 自动清理资源
+
+## 📊 监控和日志
+
+### 日志文件位置
+
+```
+logs/
+├── scheduler_YYYYMMDD.log           # 调度器日志
+├── content_identifier_YYYYMMDD.log  # 内容识别日志
+├── structure_scheduler_YYYYMMDD.log # 结构化调度器日志
+├── StructureProcessor_YYYYMMDD.log  # 结构化处理器日志
+└── *_stdout.log                     # 标准输出日志
+```
+
+### 实时监控
 
+```bash
+# 监控所有日志
+tail -f logs/*.log
 
+# 监控特定模块
+tail -f logs/scheduler_*.log
+tail -f logs/structure_*.log
 
-# 执行命令
+# 监控错误
+tail -f logs/*.log | grep ERROR
+```
+
+## 🚀 部署建议
+
+### 1. 开发环境
+
+```bash
+# 直接运行Python脚本
+python3 content_indentify/multi_thread_scheduler.py
+python3 structure/multi_thread_scheduler.py
+```
+
+### 2. 生产环境
 
-## 1 抓取
+#### 使用启动脚本(推荐)
+```bash
+# 内容识别
+cd content_indentify
+./start_scheduler.sh start
+
+# 结构化处理
+cd structure
+./start_structure.sh start
 ```
 
+#### 使用systemd服务
+```bash
+# 创建服务文件
+sudo cp content_indentify/content-scheduler.service /etc/systemd/system/
+sudo cp structure/structure-scheduler.service /etc/systemd/system/
+
+# 启用服务
+sudo systemctl enable content-scheduler
+sudo systemctl enable structure-scheduler
+
+# 启动服务
+sudo systemctl start content-scheduler
+sudo systemctl start structure-scheduler
 ```
 
-## 2 图文、视频识别
+#### 使用screen/tmux
+```bash
+# 创建会话
+screen -S content
+screen -S structure
+
+# 在会话中启动
+./start_scheduler.sh start
+./start_structure.sh start
+
+# 分离会话
+# Ctrl+A, D
+
+# 重新连接
+screen -r content
+screen -r structure
 ```
 
+## 🔍 故障排除
+
+### 常见问题
+
+1. **权限问题**
+   ```bash
+   chmod +x */start_*.sh
+   ```
+
+2. **Python依赖**
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+3. **数据库连接**
+   - 检查环境变量配置
+   - 确认数据库服务状态
+
+4. **日志查看**
+   ```bash
+   # 查看错误
+   grep ERROR logs/*.log
+   
+   # 查看进程状态
+   ps aux | grep python
+   ```
+
+### 调试模式
+
+```bash
+# 前台运行,查看详细输出
+python3 -u module_name/multi_thread_scheduler.py
+
+# 查看实时日志
+tail -f logs/*.log
 ```
 
-## 3 文档化
+## 📈 性能优化
+
+### 线程配置
+
+- **内容识别**: 默认5线程,每2分钟处理一条
+- **结构化处理**: 默认5线程,每2分钟处理一条
+
+可根据服务器性能调整:
+
+```python
+scheduler = MultiThreadScheduler(
+    thread_count=10,        # 增加线程数
+    interval_minutes=1       # 减少处理间隔
+)
 ```
-python 3_handle.py
+
+### 资源监控
+
+```bash
+# 监控CPU和内存
+htop
+top
+
+# 监控磁盘空间
+df -h
+du -sh logs/
+
+# 监控进程
+ps aux | grep python
 ```
 
+## 🤝 贡献指南
+
+### 添加新模块
+
+1. 创建模块目录
+2. 复制基础文件结构
+3. 实现核心功能
+4. 添加日志记录
+5. 创建启动脚本
+6. 编写文档
+
+### 代码规范
+
+- 使用统一的日志配置
+- 遵循现有的命名规范
+- 添加完整的文档注释
+- 实现优雅退出机制

+ 220 - 0
content_indentify/README.md

@@ -0,0 +1,220 @@
+# 多线程内容识别调度器
+
+这是一个多线程定时任务调度器,用于自动处理内容识别任务。在服务器上运行时,所有日志都会保存到文件中,方便查看和调试。
+
+## 功能特点
+
+- 🚀 多线程并发处理,提高处理效率
+- 📝 完整的日志记录,支持文件和控制台输出
+- 🕐 定时任务调度,可配置处理间隔
+- 🛑 优雅退出,支持信号处理
+- 🔧 进程管理,支持启动、停止、重启、状态查看
+
+## 文件说明
+
+- `multi_thread_scheduler.py` - 多线程调度器主程序
+- `indentify.py` - 内容识别处理模块
+- `logging_config.py` - 日志配置模块
+- `start_scheduler.sh` - 启动脚本(支持多种操作)
+- `logs/` - 日志文件目录
+
+## 使用方法
+
+### 1. 启动调度器
+
+```bash
+# 启动调度器(后台运行)
+./start_scheduler.sh start
+
+# 或者直接运行Python脚本(前台运行)
+python3 multi_thread_scheduler.py
+```
+
+### 2. 查看运行状态
+
+```bash
+# 查看调度器状态
+./start_scheduler.sh status
+
+# 查看实时日志
+tail -f logs/scheduler_*.log
+
+# 查看内容识别日志
+tail -f logs/content_identifier_*.log
+```
+
+### 3. 停止调度器
+
+```bash
+# 优雅停止调度器
+./start_scheduler.sh stop
+
+# 强制停止(如果优雅停止失败)
+kill -KILL $(cat scheduler.pid)
+```
+
+### 4. 重启调度器
+
+```bash
+# 重启调度器
+./start_scheduler.sh restart
+```
+
+## 配置说明
+
+### 线程数量和处理间隔
+
+在 `multi_thread_scheduler.py` 中修改:
+
+```python
+scheduler = MultiThreadScheduler(thread_count=5, interval_minutes=2)
+```
+
+- `thread_count`: 工作线程数量(默认5个)
+- `interval_minutes`: 每个线程处理数据的间隔(默认2分钟)
+
+### 日志配置
+
+日志文件会自动按日期命名:
+- 调度器日志:`logs/scheduler_YYYYMMDD.log`
+- 内容识别日志:`logs/content_identifier_YYYYMMDD.log`
+- 标准输出日志:`logs/scheduler_stdout.log`
+
+## 服务器部署建议
+
+### 1. 使用screen或tmux
+
+```bash
+# 使用screen
+screen -S scheduler
+./start_scheduler.sh start
+# 按 Ctrl+A 然后按 D 分离会话
+
+# 重新连接会话
+screen -r scheduler
+```
+
+### 2. 使用systemd服务(推荐)
+
+创建服务文件 `/etc/systemd/system/content-scheduler.service`:
+
+```ini
+[Unit]
+Description=Content Recognition Scheduler
+After=network.target
+
+[Service]
+Type=forking
+User=your_username
+WorkingDirectory=/path/to/your/project
+ExecStart=/path/to/your/project/content_indentify/start_scheduler.sh start
+ExecStop=/path/to/your/project/content_indentify/start_scheduler.sh stop
+PIDFile=/path/to/your/project/content_indentify/scheduler.pid
+Restart=always
+RestartSec=10
+
+[Install]
+WantedBy=multi-user.target
+```
+
+启用服务:
+```bash
+sudo systemctl daemon-reload
+sudo systemctl enable content-scheduler
+sudo systemctl start content-scheduler
+sudo systemctl status content-scheduler
+```
+
+### 3. 使用crontab监控
+
+```bash
+# 编辑crontab
+crontab -e
+
+# 添加监控任务(每5分钟检查一次)
+*/5 * * * * /path/to/your/project/content_indentify/start_scheduler.sh status > /dev/null 2>&1 || /path/to/your/project/content_indentify/start_scheduler.sh restart
+```
+
+## 日志查看技巧
+
+### 1. 实时监控日志
+
+```bash
+# 监控所有日志
+tail -f logs/*.log
+
+# 监控特定日志
+tail -f logs/scheduler_$(date +%Y%m%d).log
+
+# 监控错误日志
+tail -f logs/*.log | grep ERROR
+```
+
+### 2. 日志搜索
+
+```bash
+# 搜索特定关键词
+grep "ERROR" logs/*.log
+
+# 搜索特定时间段的日志
+grep "2024-01-15" logs/*.log
+
+# 搜索特定线程的日志
+grep "WorkerThread-1" logs/*.log
+```
+
+### 3. 日志分析
+
+```bash
+# 统计错误数量
+grep -c "ERROR" logs/*.log
+
+# 查看处理成功的记录
+grep -c "数据处理成功" logs/*.log
+
+# 查看处理失败的记录
+grep -c "处理失败" logs/*.log
+```
+
+## 故障排除
+
+### 1. 调度器无法启动
+
+- 检查Python环境和依赖包
+- 检查数据库连接配置
+- 查看错误日志
+
+### 2. 调度器意外停止
+
+- 检查系统资源(内存、CPU)
+- 查看错误日志
+- 检查数据库连接状态
+
+### 3. 日志文件过大
+
+- 定期清理旧日志文件
+- 调整日志级别
+- 使用logrotate进行日志轮转
+
+## 注意事项
+
+1. **权限设置**: 确保脚本有执行权限
+   ```bash
+   chmod +x start_scheduler.sh
+   ```
+
+2. **路径配置**: 确保所有路径都是绝对路径或正确的相对路径
+
+3. **环境变量**: 确保数据库连接等环境变量已正确配置
+
+4. **资源监控**: 定期监控服务器资源使用情况
+
+5. **备份策略**: 定期备份重要的配置和日志文件
+
+## 联系支持
+
+如果遇到问题,请:
+1. 查看相关日志文件
+2. 检查系统资源使用情况
+3. 确认配置是否正确
+4. 联系技术支持团队 

+ 52 - 53
content_indentify/indentify.py

@@ -16,6 +16,7 @@ import sys
 import argparse
 from typing import Dict, Any, List, Optional
 from dotenv import load_dotenv
+from datetime import datetime
 
 # 导入自定义模块
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -23,6 +24,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 from utils.mysql_db import MysqlHelper
 from content_indentify.image_identifier import ImageIdentifier
 from content_indentify.video_identifier import VideoIdentifier
+from utils.logging_config import get_logger
 
 
 class ContentIdentifier:
@@ -30,6 +32,9 @@ class ContentIdentifier:
         # 加载环境变量
         load_dotenv()
         
+        # 设置日志
+        self.logger = get_logger('ContentIdentifier')
+        
         # 初始化数据库连接
         self.db = MysqlHelper()
         
@@ -37,6 +42,7 @@ class ContentIdentifier:
         self.image_identifier = ImageIdentifier()
         self.video_identifier = VideoIdentifier()
     
+
     def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
         """从数据库获取一条未处理的数据"""
         sql = """
@@ -65,11 +71,11 @@ class ContentIdentifier:
                         'channel_content_id': record[0]  # 使用id作为默认值
                     }
                 else:
-                    print(f"数据库返回字段数量异常: {len(record)}, 期望至少2个字段")
+                    self.logger.error(f"数据库返回字段数量异常: {len(record)}, 期望至少2个字段")
                     return None
             return None
         except Exception as e:
-            print(f"获取未处理记录失败: {e}")
+            self.logger.error(f"获取未处理记录失败: {e}")
             return None
     
     def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
@@ -82,12 +88,12 @@ class ContentIdentifier:
             else:
                 raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
         except json.JSONDecodeError as e:
-            print(f"解析 formatted_content JSON 失败: {e}")
+            self.logger.error(f"解析 formatted_content JSON 失败: {e}")
             raise
     
     def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
         """处理内容识别,调用独立的识别模块"""
-        print("开始内容识别处理...")
+        self.logger.info("开始内容识别处理...")
         
         # 图片识别
         image_result = self.image_identifier.process_images(formatted_content)
@@ -101,7 +107,7 @@ class ContentIdentifier:
             'video_analysis': video_result
         }
 
-        print(f"识别结果: {recognition_result}")
+        self.logger.info(f"识别结果: {recognition_result}")
         
         return recognition_result
     
@@ -112,8 +118,6 @@ class ContentIdentifier:
             result_json = json.dumps(recognition_result, ensure_ascii=False)
             # 将换行符替换为 \n 字符串,确保JSON可以被正确解析
             result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')
-
-            print(f"result_json: {result_json}")
             
             # 构建更新SQL - 使用参数化查询避免换行符问题
             sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
@@ -122,14 +126,14 @@ class ContentIdentifier:
             # 执行更新
             result = self.db.update_values(sql, params)
             if result is not None:
-                print(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
+                self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
                 return True
             else:
-                print(f"更新记录 {record_id} 失败")
+                self.logger.error(f"更新记录 {record_id} 失败")
                 return False
                 
         except Exception as e:
-            print(f"更新数据库失败: {e}")
+            self.logger.error(f"更新数据库失败: {e}")
             return False
     
     def process_single_record(self) -> bool:
@@ -138,11 +142,11 @@ class ContentIdentifier:
             # 获取未处理的记录
             record = self.get_unprocessed_record()
             if not record:
-                print("没有找到未处理的记录")
+                self.logger.warning("没有找到未处理的记录")
                 return False
             
-            print(f"开始处理记录 ID: {record['id']}, 内容ID: {record['channel_content_id']}")
-            # print(f"  多模态识别: {record['multimodal_recognition'][:300]}...")
+            self.logger.info(f"开始处理记录 ID: {record['id']}, 内容ID: {record['channel_content_id']}")
+            # self.logger.info(f"  多模态识别: {record['multimodal_recognition'][:300]}...")
 
             # 先设置这条记录的 recognition_status = 1
             self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")
@@ -166,11 +170,6 @@ class ContentIdentifier:
             content_link = formatted_content.get('content_link') or ''
             content_id = formatted_content.get('channel_content_id') or ''
             
-            # 安全地显示信息,避免 null 值导致的错误
-            print(f"  频道: {channel}")
-            print(f"  标题: {title[:100] if title else '(无标题)'}...")
-            print(f"  内容: {content[:200] if content else '(无内容)'}...")
-            
             # 调用内容识别处理
             recognition_result = self.process_content_recognition(formatted_content)
             
@@ -199,30 +198,30 @@ class ContentIdentifier:
             success = self.update_multimodal_recognition(record['id'], complete_result)
             
             if success:
-                print(f"记录 {record['id']} 处理完成")
+                self.logger.info(f"记录 {record['id']} 处理完成")
                 return True
             else:
-                print(f"记录 {record['id']} 处理失败")
+                self.logger.error(f"记录 {record['id']} 处理失败")
                 return False
                 
         except Exception as e:
-            print(f"处理记录失败: {e}")
+            self.logger.error(f"处理记录失败: {e}")
             return False
     
     def process_all_records(self, max_records: int = 10):
         """处理多条记录"""
-        print(f"开始批量处理,最多处理 {max_records} 条记录")
+        self.logger.info(f"开始批量处理,最多处理 {max_records} 条记录")
         
         processed_count = 0
         success_count = 0
         
         for i in range(max_records):
-            print(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")
+            self.logger.info(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")
             
             if self.process_single_record():
                 success_count += 1
             else:
-                print("没有更多记录需要处理,结束批量处理")
+                self.logger.warning("没有更多记录需要处理,结束批量处理")
                 break
             
             processed_count += 1
@@ -230,19 +229,19 @@ class ContentIdentifier:
             # 添加延迟避免API限制
             time.sleep(2)
         
-        print(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")
+        self.logger.info(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")
 
     def process_continuous(self, max_records: int = None, delay_seconds: int = 2):
         """连续处理记录,直到没有更多记录或达到最大数量限制"""
-        print("启动连续处理模式...")
-        print("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
-        print(f"处理间隔: {delay_seconds} 秒")
+        self.logger.info("启动连续处理模式...")
+        self.logger.info("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
+        self.logger.info(f"处理间隔: {delay_seconds} 秒")
         if max_records:
-            print(f"最大处理数量: {max_records} 条")
+            self.logger.info(f"最大处理数量: {max_records} 条")
         else:
-            print("无数量限制,将处理所有可用记录")
-        print("按 Ctrl+C 可以随时停止处理")
-        print("-" * 60)
+            self.logger.info("无数量限制,将处理所有可用记录")
+        self.logger.info("按 Ctrl+C 可以随时停止处理")
+        self.logger.info("-" * 60)
         
         processed_count = 0
         success_count = 0
@@ -253,25 +252,25 @@ class ContentIdentifier:
             while True:
                 # 检查是否达到最大数量限制
                 if max_records and processed_count >= max_records:
-                    print(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
+                    self.logger.info(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
                     break
                 
-                print(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
-                print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
+                self.logger.info(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
+                self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
                 
                 # 处理单条记录
                 if self.process_single_record():
                     success_count += 1
                     consecutive_failures = 0  # 重置连续失败计数
-                    print(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
+                    self.logger.info(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
                 else:
                     consecutive_failures += 1
-                    print(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
+                    self.logger.warning(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
                     
                     # 检查连续失败次数
                     if consecutive_failures >= max_consecutive_failures:
-                        print(f"\n⚠️  连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
-                        print("停止连续处理")
+                        self.logger.warning(f"\n⚠️  连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
+                        self.logger.info("停止连续处理")
                         break
                 
                 processed_count += 1
@@ -279,30 +278,30 @@ class ContentIdentifier:
                 # 检查是否还有更多记录
                 remaining_records = self.get_remaining_records_count()
                 if remaining_records == 0:
-                    print(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
+                    self.logger.info(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
                     break
                 
-                print(f"剩余待处理记录: {remaining_records} 条")
+                self.logger.info(f"剩余待处理记录: {remaining_records} 条")
                 
                 # 添加延迟避免API限制
                 if delay_seconds > 0:
-                    print(f"等待 {delay_seconds} 秒后处理下一条记录...")
+                    self.logger.info(f"等待 {delay_seconds} 秒后处理下一条记录...")
                     time.sleep(delay_seconds)
                 
         except KeyboardInterrupt:
-            print(f"\n\n⏹️  用户中断处理")
-            print(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
+            self.logger.info(f"\n\n⏹️  用户中断处理")
+            self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
         except Exception as e:
-            print(f"\n\n💥 处理过程中发生错误: {e}")
-            print(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
+            self.logger.error(f"\n\n💥 处理过程中发生错误: {e}")
+            self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
         
-        print(f"\n📊 连续处理完成!")
-        print(f"总处理数量: {processed_count}")
-        print(f"成功数量: {success_count}")
-        print(f"失败数量: {processed_count - success_count}")
+        self.logger.info(f"\n📊 连续处理完成!")
+        self.logger.info(f"总处理数量: {processed_count}")
+        self.logger.info(f"成功数量: {success_count}")
+        self.logger.info(f"失败数量: {processed_count - success_count}")
         if processed_count > 0:
             success_rate = (success_count / processed_count) * 100
-            print(f"成功率: {success_rate:.1f}%")
+            self.logger.info(f"成功率: {success_rate:.1f}%")
 
     def get_remaining_records_count(self) -> int:
         """获取剩余待处理记录数量"""
@@ -313,7 +312,7 @@ class ContentIdentifier:
                 return result[0][0]
             return 0
         except Exception as e:
-            print(f"获取剩余记录数量失败: {e}")
+            self.logger.error(f"获取剩余记录数量失败: {e}")
             return 0
 
 
@@ -343,7 +342,7 @@ def main():
             identifier.process_all_records(args.batch)
             
     except Exception as e:
-        print(f"程序执行失败: {e}")
+        sys.stderr.write(f"程序执行失败: {e}\n")
         sys.exit(1)
 
 

+ 180 - 0
content_indentify/multi_thread_scheduler.py

@@ -0,0 +1,180 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+多线程定时任务调度器
+开启5个线程,每2分钟调用一次identifier.process_single_record()处理一条数据
+"""
+
+import threading
+import time
+import signal
+import sys
+import os
+import atexit
+from datetime import datetime
+from indentify import ContentIdentifier
+from utils.logging_config import get_logger
+
+class MultiThreadScheduler:
+    def __init__(self, thread_count=5, interval_minutes=2):
+        self.thread_count = thread_count
+        self.interval_seconds = interval_minutes * 60
+        self.running = True
+        self.threads = []
+        self.identifier = ContentIdentifier()
+        self.pid_file = "scheduler.pid"
+        
+        # 设置日志
+        self.logger = get_logger('MultiThreadScheduler')
+        
+        # 设置信号处理,优雅退出
+        signal.signal(signal.SIGINT, self.signal_handler)
+        signal.signal(signal.SIGTERM, self.signal_handler)
+        
+        # 注册退出时的清理函数
+        atexit.register(self.cleanup)
+        
+        # 创建PID文件
+        self.create_pid_file()
+    
+    def create_pid_file(self):
+        """创建PID文件"""
+        try:
+            with open(self.pid_file, 'w') as f:
+                f.write(str(os.getpid()))
+            self.logger.info(f"PID文件已创建: {self.pid_file}")
+        except Exception as e:
+            self.logger.error(f"创建PID文件失败: {e}")
+    
+    def cleanup(self):
+        """清理资源"""
+        try:
+            if os.path.exists(self.pid_file):
+                os.remove(self.pid_file)
+                self.logger.info("PID文件已清理")
+        except Exception as e:
+            self.logger.error(f"清理PID文件失败: {e}")
+    
+    def signal_handler(self, signum, frame):
+        """信号处理函数,优雅退出"""
+        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
+        self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
+        self.running = False
+        self.stop_all_threads()
+        self.cleanup()
+        sys.exit(0)
+    
+    def worker_thread(self, thread_id):
+        """工作线程函数"""
+        thread_logger = get_logger(f'WorkerThread-{thread_id}')
+        thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
+        
+        while self.running:
+            try:
+                start_time = time.time()
+                
+                # 处理一条数据
+                thread_logger.info(f"开始处理数据...")
+                success = self.identifier.process_single_record()
+                
+                if success:
+                    thread_logger.info("数据处理成功")
+                else:
+                    thread_logger.info("没有数据需要处理或处理失败")
+                
+                # 计算剩余等待时间
+                elapsed_time = time.time() - start_time
+                wait_time = max(0, self.interval_seconds - elapsed_time)
+                
+                if wait_time > 0:
+                    thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
+                    # 分段等待,每10秒检查一次running状态
+                    for _ in range(int(wait_time / 10) + 1):
+                        if not self.running:
+                            break
+                        time.sleep(min(10, wait_time))
+                        wait_time -= 10
+                        if wait_time <= 0:
+                            break
+                
+            except Exception as e:
+                thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
+                # 发生错误时等待一段时间再继续
+                for _ in range(10):
+                    if not self.running:
+                        break
+                    time.sleep(1)
+        
+        thread_logger.info(f"线程 {thread_id} 已停止")
+    
+    def start_all_threads(self):
+        """启动所有工作线程"""
+        self.logger.info(f"启动 {self.thread_count} 个工作线程...")
+        
+        for i in range(self.thread_count):
+            thread = threading.Thread(
+                target=self.worker_thread, 
+                args=(i+1,),
+                daemon=True,
+                name=f"WorkerThread-{i+1}"
+            )
+            thread.start()
+            self.threads.append(thread)
+            self.logger.info(f"线程 {i+1} 已启动")
+            
+            # 如果不是最后一个线程,等待5秒再启动下一个
+            if i < self.thread_count - 1:
+                self.logger.info("等待5秒后启动下一个线程...")
+                time.sleep(5)
+        
+        self.logger.info(f"所有 {self.thread_count} 个线程已启动")
+        self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
+        self.logger.info("使用以下命令停止: ./start_scheduler.sh stop")
+    
+    def stop_all_threads(self):
+        """停止所有线程"""
+        self.logger.info("正在停止所有线程...")
+        self.running = False
+        
+        # 等待所有线程结束
+        for i, thread in enumerate(self.threads):
+            if thread.is_alive():
+                thread.join(timeout=5)
+                if thread.is_alive():
+                    self.logger.warning(f"线程 {i+1} 未能正常停止")
+                else:
+                    self.logger.info(f"线程 {i+1} 已停止")
+        
+        self.logger.info("所有线程已停止")
+    
+    def run(self):
+        """运行调度器"""
+        try:
+            self.start_all_threads()
+            
+            # 主线程保持运行,等待信号
+            while self.running:
+                time.sleep(1)
+                
+        except KeyboardInterrupt:
+            self.logger.info("收到键盘中断信号")
+        finally:
+            self.stop_all_threads()
+            self.cleanup()
+
+def main():
+    """主函数"""
+    print("=" * 60)
+    print("多线程定时任务调度器")
+    print("=" * 60)
+    print(f"线程数量: 5")
+    print(f"处理间隔: 2分钟")
+    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    print("=" * 60)
+    
+    # 创建并运行调度器
+    scheduler = MultiThreadScheduler(thread_count=5, interval_minutes=2)
+    scheduler.run()
+
+if __name__ == "__main__":
+    main() 

+ 0 - 94
content_indentify/run.sh

@@ -1,94 +0,0 @@
-#!/bin/bash
-
-# 内容识别脚本启动脚本
-
-echo "内容识别脚本启动中..."
-
-# 检查Python环境
-if ! command -v python3 &> /dev/null; then
-    echo "错误: 未找到 python3 命令"
-    exit 1
-fi
-
-# 检查依赖
-echo "检查依赖..."
-python3 -c "import pymysql, google.genai, requests, dotenv" 2>/dev/null
-if [ $? -ne 0 ]; then
-    echo "错误: 缺少必要的依赖包,请先安装: pip install -r requirements.txt"
-    exit 1
-fi
-
-# 检查环境变量文件
-if [ ! -f ".env" ]; then
-    echo "警告: 未找到 .env 文件,将使用默认配置"
-    echo "建议创建 .env 文件并配置必要的环境变量"
-fi
-
-# 显示帮助信息
-echo ""
-echo "使用方法:"
-echo "  python3 indentify.py --single                    # 处理单条记录"
-echo "  python3 indentify.py --batch 20                  # 批量处理20条记录"
-echo "  python3 indentify.py --continuous                # 连续处理模式(无限制)"
-echo "  python3 indentify.py --continuous --max-records 100  # 连续处理最多100条"
-echo "  python3 indentify.py --continuous --delay 5      # 连续处理,间隔5秒"
-echo "  python3 indentify.py --help                      # 显示帮助信息"
-echo ""
-
-# 运行测试(可选)
-read -p "是否先运行测试?(y/N): " -n 1 -r
-echo
-if [[ $REPLY =~ ^[Yy]$ ]]; then
-    echo "运行测试..."
-    python3 test_identify.py
-    echo ""
-fi
-
-# 询问运行模式
-echo "请选择运行模式:"
-echo "1) 处理单条记录"
-echo "2) 批量处理记录"
-echo "3) 连续处理模式(推荐)"
-echo "4) 退出"
-
-read -p "请输入选择 (1-4): " choice
-
-case $choice in
-    1)
-        echo "启动单条记录处理模式..."
-        python3 indentify.py --single
-        ;;
-    2)
-        read -p "请输入批量处理数量 (默认10): " batch_size
-        batch_size=${batch_size:-10}
-        echo "启动批量处理模式,处理 $batch_size 条记录..."
-        python3 indentify.py --batch $batch_size
-        ;;
-    3)
-        echo "启动连续处理模式..."
-        echo "此模式将自动处理数据库中的所有记录,一条完成后自动处理下一条"
-        echo ""
-        read -p "是否设置最大处理数量限制?(y/N): " -n 1 -r
-        echo
-        if [[ $REPLY =~ ^[Yy]$ ]]; then
-            read -p "请输入最大处理数量: " max_records
-            read -p "请输入处理间隔时间(秒,默认2): " delay
-            delay=${delay:-2}
-            echo "启动连续处理模式,最多处理 $max_records 条记录,间隔 $delay 秒..."
-            python3 indentify.py --continuous --max-records $max_records --delay $delay
-        else
-            read -p "请输入处理间隔时间(秒,默认2): " delay
-            delay=${delay:-2}
-            echo "启动连续处理模式,无数量限制,间隔 $delay 秒..."
-            python3 indentify.py --continuous --delay $delay
-        fi
-        ;;
-    4)
-        echo "退出"
-        exit 0
-        ;;
-    *)
-        echo "无效选择,退出"
-        exit 1
-        ;;
-esac 

+ 172 - 0
content_indentify/start_scheduler.sh

@@ -0,0 +1,172 @@
+#!/bin/bash
+
+# 多线程调度器启动脚本
+# 使用方法: 
+#   ./start_scheduler.sh start    # 启动调度器
+#   ./start_scheduler.sh stop     # 停止调度器
+#   ./start_scheduler.sh status   # 查看状态
+#   ./start_scheduler.sh restart  # 重启调度器
+
+SCRIPT_NAME="multi_thread_scheduler.py"
+PID_FILE="scheduler.pid"
+LOG_DIR="logs"
+
+# 检查Python环境
+if ! command -v python3 &> /dev/null; then
+    echo "错误: 未找到Python3,请先安装Python3"
+    exit 1
+fi
+
+# 检查依赖文件
+if [ ! -f "indentify.py" ]; then
+    echo "错误: 未找到indentify.py文件"
+    exit 1
+fi
+
+if [ ! -f "multi_thread_scheduler.py" ]; then
+    echo "错误: 未找到multi_thread_scheduler.py文件"
+    exit 1
+fi
+
+# 创建logs目录
+mkdir -p logs
+
+# 获取进程ID
+get_pid() {
+    if [ -f "$PID_FILE" ]; then
+        cat "$PID_FILE"
+    else
+        echo ""
+    fi
+}
+
+# 检查进程是否运行
+is_running() {
+    local pid=$(get_pid)
+    if [ -n "$pid" ]; then
+        if ps -p "$pid" > /dev/null 2>&1; then
+            return 0
+        else
+            # 进程不存在,删除PID文件
+            rm -f "$PID_FILE"
+            return 1
+        fi
+    fi
+    return 1
+}
+
+# 启动调度器
+start_scheduler() {
+    if is_running; then
+        echo "调度器已经在运行中 (PID: $(get_pid))"
+        return 1
+    fi
+    
+    echo "正在启动多线程调度器..."
+    echo "日志文件将保存在 $LOG_DIR/ 目录中"
+    
+    # 后台运行Python脚本
+    nohup python3 "$SCRIPT_NAME" > "$LOG_DIR/scheduler_stdout.log" 2>&1 &
+    local pid=$!
+    
+    # 保存PID到文件
+    echo "$pid" > "$PID_FILE"
+    
+    echo "调度器已启动 (PID: $pid)"
+    echo "使用以下命令查看状态:"
+    echo "  ./start_scheduler.sh status"
+    echo "  tail -f $LOG_DIR/scheduler_*.log"
+    echo ""
+    echo "使用以下命令停止:"
+    echo "  ./start_scheduler.sh stop"
+}
+
+# 停止调度器
+stop_scheduler() {
+    local pid=$(get_pid)
+    if [ -z "$pid" ]; then
+        echo "调度器未运行"
+        return 1
+    fi
+    
+    echo "正在停止调度器 (PID: $pid)..."
+    
+    # 发送SIGTERM信号
+    kill -TERM "$pid" 2>/dev/null
+    
+    # 等待进程结束
+    local count=0
+    while [ $count -lt 10 ] && ps -p "$pid" > /dev/null 2>&1; do
+        sleep 1
+        count=$((count + 1))
+        echo "等待进程结束... ($count/10)"
+    done
+    
+    # 如果进程仍在运行,强制杀死
+    if ps -p "$pid" > /dev/null 2>&1; then
+        echo "强制停止进程..."
+        kill -KILL "$pid" 2>/dev/null
+    fi
+    
+    # 删除PID文件
+    rm -f "$PID_FILE"
+    echo "调度器已停止"
+}
+
+# 查看状态
+show_status() {
+    if is_running; then
+        local pid=$(get_pid)
+        echo "调度器正在运行 (PID: $pid)"
+        echo "进程信息:"
+        ps -p "$pid" -o pid,ppid,cmd,etime
+        echo ""
+        echo "最近的日志:"
+        if [ -f "$LOG_DIR/scheduler_$(date +%Y%m%d).log" ]; then
+            tail -5 "$LOG_DIR/scheduler_$(date +%Y%m%d).log"
+        else
+            echo "未找到今日日志文件"
+        fi
+    else
+        echo "调度器未运行"
+    fi
+}
+
+# 重启调度器
+restart_scheduler() {
+    echo "重启调度器..."
+    stop_scheduler
+    sleep 2
+    start_scheduler
+}
+
+# 主逻辑
+case "${1:-start}" in
+    start)
+        start_scheduler
+        ;;
+    stop)
+        stop_scheduler
+        ;;
+    status)
+        show_status
+        ;;
+    restart)
+        restart_scheduler
+        ;;
+    *)
+        echo "用法: $0 {start|stop|status|restart}"
+        echo ""
+        echo "命令说明:"
+        echo "  start   - 启动调度器"
+        echo "  stop    - 停止调度器"
+        echo "  status  - 查看运行状态"
+        echo "  restart - 重启调度器"
+        echo ""
+        echo "示例:"
+        echo "  $0 start    # 启动"
+        echo "  $0 status   # 查看状态"
+        echo "  $0 stop     # 停止"
+        exit 1
+        ;;
+esac 

+ 0 - 0
prompt/handle.md → prompt/structure.md


+ 237 - 0
structure/README.md

@@ -0,0 +1,237 @@
+# 多线程结构化处理调度器
+
+这是一个多线程定时任务调度器,用于自动处理内容结构化任务。在服务器上运行时,所有日志都会保存到文件中,方便查看和调试。
+
+## 功能特点
+
+- 🚀 多线程并发处理,提高处理效率
+- 📝 完整的日志记录,支持文件和控制台输出
+- 🕐 定时任务调度,可配置处理间隔
+- 🛑 优雅退出,支持信号处理
+- 🔧 进程管理,支持启动、停止、重启、状态查看
+- 🎯 支持按条件筛选处理(query_word、source_type、source_channel)
+
+## 文件说明
+
+- `structure_processor.py` - 结构化处理核心模块
+- `multi_thread_scheduler.py` - 多线程调度器主程序
+- `logging_config.py` - 日志配置模块
+- `start_structure.sh` - 启动脚本(支持多种操作)
+- `logs/` - 日志文件目录
+
+## 使用方法
+
+### 1. 启动调度器
+
+```bash
+# 启动调度器(后台运行)
+./start_structure.sh start
+
+# 或者直接运行Python脚本(前台运行)
+python3 multi_thread_scheduler.py
+
+# 带参数启动
+python3 multi_thread_scheduler.py --query_word "关键词" --source_type "类型" --source_channel "渠道"
+```
+
+### 2. 查看运行状态
+
+```bash
+# 查看调度器状态
+./start_structure.sh status
+
+# 查看实时日志
+tail -f logs/structure_*.log
+
+# 查看结构化处理日志
+tail -f logs/StructureProcessor_*.log
+```
+
+### 3. 停止调度器
+
+```bash
+# 优雅停止调度器
+./start_structure.sh stop
+
+# 强制停止(如果优雅停止失败)
+kill -KILL $(cat structure_scheduler.pid)
+```
+
+### 4. 重启调度器
+
+```bash
+# 重启调度器
+./start_structure.sh restart
+```
+
+## 配置说明
+
+### 线程数量和处理间隔
+
+在 `multi_thread_scheduler.py` 中修改:
+
+```python
+scheduler = MultiThreadScheduler(
+    thread_count=5,           # 工作线程数量(默认5个)
+    interval_minutes=2,        # 每个线程处理数据的间隔(默认2分钟)
+    query_word=None,           # 查询关键词
+    source_type=None,          # 数据源类型
+    source_channel=None        # 数据源渠道
+)
+```
+
+### 查询条件
+
+支持按以下条件筛选需要处理的数据:
+
+- `query_word`: 查询关键词
+- `source_type`: 数据源类型
+- `source_channel`: 数据源渠道
+
+如果不指定条件,则处理所有符合条件的数据。
+
+### 日志配置
+
+日志文件会自动按日期命名:
+- 调度器日志:`logs/structure_scheduler_YYYYMMDD.log`
+- 结构化处理日志:`logs/StructureProcessor_YYYYMMDD.log`
+- 标准输出日志:`logs/structure_scheduler_stdout.log`
+
+## 服务器部署建议
+
+### 1. 使用screen或tmux
+
+```bash
+# 使用screen
+screen -S structure
+./start_structure.sh start
+# 按 Ctrl+A 然后按 D 分离会话
+
+# 重新连接会话
+screen -r structure
+```
+
+### 2. 使用systemd服务(推荐)
+
+创建服务文件 `/etc/systemd/system/structure-scheduler.service`:
+
+```ini
+[Unit]
+Description=Structure Processing Scheduler
+After=network.target
+
+[Service]
+Type=forking
+User=your_username
+WorkingDirectory=/path/to/your/project/structure
+ExecStart=/path/to/your/project/structure/start_structure.sh start
+ExecStop=/path/to/your/project/structure/start_structure.sh stop
+PIDFile=/path/to/your/project/structure/structure_scheduler.pid
+Restart=always
+RestartSec=10
+
+[Install]
+WantedBy=multi-user.target
+```
+
+启用服务:
+```bash
+sudo systemctl daemon-reload
+sudo systemctl enable structure-scheduler
+sudo systemctl start structure-scheduler
+sudo systemctl status structure-scheduler
+```
+
+### 3. 使用crontab监控
+
+```bash
+# 编辑crontab
+crontab -e
+
+# 添加监控任务(每5分钟检查一次)
+*/5 * * * * /path/to/your/project/structure/start_structure.sh status > /dev/null 2>&1 || /path/to/your/project/structure/start_structure.sh restart
+```
+
+## 日志查看技巧
+
+### 1. 实时监控日志
+
+```bash
+# 监控所有日志
+tail -f logs/*.log
+
+# 监控特定日志
+tail -f logs/structure_scheduler_$(date +%Y%m%d).log
+
+# 监控错误日志
+tail -f logs/*.log | grep ERROR
+```
+
+### 2. 日志搜索
+
+```bash
+# 搜索特定关键词
+grep "ERROR" logs/*.log
+
+# 搜索特定时间段的日志
+grep "2024-01-15" logs/*.log
+
+# 搜索特定线程的日志
+grep "StructureWorkerThread-1" logs/*.log
+```
+
+### 3. 日志分析
+
+```bash
+# 统计错误数量
+grep -c "ERROR" logs/*.log
+
+# 查看处理成功的记录
+grep -c "数据处理成功" logs/*.log
+
+# 查看处理失败的记录
+grep -c "处理失败" logs/*.log
+```
+
+## 故障排除
+
+### 1. 调度器无法启动
+
+- 检查Python环境和依赖包
+- 检查数据库连接配置
+- 查看错误日志
+
+### 2. 调度器意外停止
+
+- 检查系统资源(内存、CPU)
+- 查看错误日志
+- 检查数据库连接状态
+
+### 3. 日志文件过大
+
+- 定期清理旧日志文件
+- 调整日志级别
+- 使用logrotate进行日志轮转
+
+## 注意事项
+
+1. **权限设置**: 确保脚本有执行权限
+   ```bash
+   chmod +x start_structure.sh
+   ```
+
+2. **路径配置**: 确保所有路径都是绝对路径或正确的相对路径
+
+3. **环境变量**: 确保数据库连接等环境变量已正确配置
+
+4. **资源监控**: 定期监控服务器资源使用情况
+
+5. **备份策略**: 定期备份重要的配置和日志文件
+
+## 联系支持
+
+如果遇到问题,请:
+1. 查看相关日志文件
+2. 检查系统资源使用情况
+3. 确认配置是否正确
+4. 联系技术支持团队 

+ 205 - 0
structure/multi_thread_scheduler.py

@@ -0,0 +1,205 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+多线程定时任务调度器 - 结构化处理版本
+开启5个线程,每2分钟调用一次processor.process_single_record()处理一条数据
+"""
+
+import threading
+import time
+import signal
+import sys
+import os
+import atexit
+from datetime import datetime
+from structure_processor import StructureProcessor
+from utils.logging_config import get_logger
+
+class MultiThreadScheduler:
+    def __init__(self, thread_count=5, interval_minutes=2, 
+                 query_word=None, source_type=None, source_channel=None):
+        self.thread_count = thread_count
+        self.interval_seconds = interval_minutes * 60
+        self.running = True
+        self.threads = []
+        self.processor = StructureProcessor()
+        self.query_word = query_word
+        self.source_type = source_type
+        self.source_channel = source_channel
+        self.pid_file = "structure_scheduler.pid"
+        
+        # 设置日志
+        self.logger = get_logger('StructureMultiThreadScheduler')
+        
+        # 设置信号处理,优雅退出
+        signal.signal(signal.SIGINT, self.signal_handler)
+        signal.signal(signal.SIGTERM, self.signal_handler)
+        
+        # 注册退出时的清理函数
+        atexit.register(self.cleanup)
+        
+        # 创建PID文件
+        self.create_pid_file()
+    
+    def create_pid_file(self):
+        """创建PID文件"""
+        try:
+            with open(self.pid_file, 'w') as f:
+                f.write(str(os.getpid()))
+            self.logger.info(f"PID文件已创建: {self.pid_file}")
+        except Exception as e:
+            self.logger.error(f"创建PID文件失败: {e}")
+    
+    def cleanup(self):
+        """清理资源"""
+        try:
+            if os.path.exists(self.pid_file):
+                os.remove(self.pid_file)
+                self.logger.info("PID文件已清理")
+        except Exception as e:
+            self.logger.error(f"清理PID文件失败: {e}")
+    
+    def signal_handler(self, signum, frame):
+        """信号处理函数,优雅退出"""
+        signal_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
+        self.logger.info(f"收到信号 {signal_name},正在优雅退出...")
+        self.running = False
+        self.stop_all_threads()
+        self.cleanup()
+        sys.exit(0)
+    
+    def worker_thread(self, thread_id):
+        """工作线程函数"""
+        thread_logger = get_logger(f'StructureWorkerThread-{thread_id}')
+        thread_logger.info(f"线程 {thread_id} 启动,每 {self.interval_seconds//60} 分钟处理一条数据")
+        
+        while self.running:
+            try:
+                start_time = time.time()
+                
+                # 处理一条数据
+                thread_logger.info(f"开始处理数据...")
+                success = self.processor.process_single_record(
+                    self.query_word, self.source_type, self.source_channel
+                )
+                
+                if success:
+                    thread_logger.info("数据处理成功")
+                else:
+                    thread_logger.info("没有数据需要处理或处理失败")
+                
+                # 计算剩余等待时间
+                elapsed_time = time.time() - start_time
+                wait_time = max(0, self.interval_seconds - elapsed_time)
+                
+                if wait_time > 0:
+                    thread_logger.info(f"等待 {wait_time:.1f} 秒后继续...")
+                    # 分段等待,每10秒检查一次running状态
+                    for _ in range(int(wait_time / 10) + 1):
+                        if not self.running:
+                            break
+                        time.sleep(min(10, wait_time))
+                        wait_time -= 10
+                        if wait_time <= 0:
+                            break
+                
+            except Exception as e:
+                thread_logger.error(f"处理过程中发生错误: {e}", exc_info=True)
+                # 发生错误时等待一段时间再继续
+                for _ in range(10):
+                    if not self.running:
+                        break
+                    time.sleep(1)
+        
+        thread_logger.info(f"线程 {thread_id} 已停止")
+    
+    def start_all_threads(self):
+        """启动所有工作线程"""
+        self.logger.info(f"启动 {self.thread_count} 个工作线程...")
+        self.logger.info(f"查询条件: query_word={self.query_word}, source_type={self.source_type}, source_channel={self.source_channel}")
+        
+        for i in range(self.thread_count):
+            thread = threading.Thread(
+                target=self.worker_thread, 
+                args=(i+1,),
+                daemon=True,
+                name=f"StructureWorkerThread-{i+1}"
+            )
+            thread.start()
+            self.threads.append(thread)
+            self.logger.info(f"线程 {i+1} 已启动")
+            
+            # 如果不是最后一个线程,等待5秒再启动下一个
+            if i < self.thread_count - 1:
+                self.logger.info("等待5秒后启动下一个线程...")
+                time.sleep(5)
+        
+        self.logger.info(f"所有 {self.thread_count} 个线程已启动")
+        self.logger.info(f"每个线程每 {self.interval_seconds//60} 分钟处理一条数据")
+        self.logger.info("使用以下命令停止: ./start_structure.sh stop")
+    
+    def stop_all_threads(self):
+        """停止所有线程"""
+        self.logger.info("正在停止所有线程...")
+        self.running = False
+        
+        # 等待所有线程结束
+        for i, thread in enumerate(self.threads):
+            if thread.is_alive():
+                thread.join(timeout=5)
+                if thread.is_alive():
+                    self.logger.warning(f"线程 {i+1} 未能正常停止")
+                else:
+                    self.logger.info(f"线程 {i+1} 已停止")
+        
+        self.logger.info("所有线程已停止")
+    
+    def run(self):
+        """运行调度器"""
+        try:
+            self.start_all_threads()
+            
+            # 主线程保持运行,等待信号
+            while self.running:
+                time.sleep(1)
+                
+        except KeyboardInterrupt:
+            self.logger.info("收到键盘中断信号")
+        finally:
+            self.stop_all_threads()
+            self.cleanup()
+
+def main():
+    """主函数"""
+    import argparse
+    
+    parser = argparse.ArgumentParser(description='多线程结构化处理调度器')
+    parser.add_argument('--query_word', default=None, help='query词')
+    parser.add_argument('--source_type', default=None, help='数据源类型')
+    parser.add_argument('--source_channel', default=None, help='数据源渠道')
+    parser.add_argument('--thread_count', type=int, default=5, help='线程数量')
+    parser.add_argument('--interval_minutes', type=int, default=2, help='处理间隔(分钟)')
+    
+    args = parser.parse_args()
+    
+    print("=" * 60)
+    print("多线程结构化处理调度器")
+    print("=" * 60)
+    print(f"线程数量: {args.thread_count}")
+    print(f"处理间隔: {args.interval_minutes}分钟")
+    print(f"查询条件: query_word={args.query_word}, source_type={args.source_type}, source_channel={args.source_channel}")
+    print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    print("=" * 60)
+    
+    # 创建并运行调度器
+    scheduler = MultiThreadScheduler(
+        thread_count=args.thread_count, 
+        interval_minutes=args.interval_minutes,
+        query_word=args.query_word,
+        source_type=args.source_type,
+        source_channel=args.source_channel
+    )
+    scheduler.run()
+
+if __name__ == "__main__":
+    main() 

+ 172 - 0
structure/start_structure.sh

@@ -0,0 +1,172 @@
+#!/bin/bash
+
+# 多线程结构化处理调度器启动脚本
+# 使用方法: 
+#   ./start_structure.sh start    # 启动调度器
+#   ./start_structure.sh stop     # 停止调度器
+#   ./start_structure.sh status   # 查看状态
+#   ./start_structure.sh restart  # 重启调度器
+
+SCRIPT_NAME="multi_thread_scheduler.py"
+PID_FILE="structure_scheduler.pid"
+LOG_DIR="logs"
+
+# 检查Python环境
+if ! command -v python3 &> /dev/null; then
+    echo "错误: 未找到Python3,请先安装Python3"
+    exit 1
+fi
+
+# 检查依赖文件
+if [ ! -f "structure_processor.py" ]; then
+    echo "错误: 未找到structure_processor.py文件"
+    exit 1
+fi
+
+if [ ! -f "multi_thread_scheduler.py" ]; then
+    echo "错误: 未找到multi_thread_scheduler.py文件"
+    exit 1
+fi
+
+# 创建logs目录
+mkdir -p logs
+
+# 获取进程ID
+get_pid() {
+    if [ -f "$PID_FILE" ]; then
+        cat "$PID_FILE"
+    else
+        echo ""
+    fi
+}
+
+# 检查进程是否运行
+is_running() {
+    local pid=$(get_pid)
+    if [ -n "$pid" ]; then
+        if ps -p "$pid" > /dev/null 2>&1; then
+            return 0
+        else
+            # 进程不存在,删除PID文件
+            rm -f "$PID_FILE"
+            return 1
+        fi
+    fi
+    return 1
+}
+
+# 启动调度器
+start_scheduler() {
+    if is_running; then
+        echo "调度器已经在运行中 (PID: $(get_pid))"
+        return 1
+    fi
+    
+    echo "正在启动多线程结构化处理调度器..."
+    echo "日志文件将保存在 $LOG_DIR/ 目录中"
+    
+    # 后台运行Python脚本
+    nohup python3 "$SCRIPT_NAME" > "$LOG_DIR/structure_scheduler_stdout.log" 2>&1 &
+    local pid=$!
+    
+    # 保存PID到文件
+    echo "$pid" > "$PID_FILE"
+    
+    echo "调度器已启动 (PID: $pid)"
+    echo "使用以下命令查看状态:"
+    echo "  ./start_structure.sh status"
+    echo "  tail -f $LOG_DIR/structure_*.log"
+    echo ""
+    echo "使用以下命令停止:"
+    echo "  ./start_structure.sh stop"
+}
+
+# 停止调度器
+stop_scheduler() {
+    local pid=$(get_pid)
+    if [ -z "$pid" ]; then
+        echo "调度器未运行"
+        return 1
+    fi
+    
+    echo "正在停止调度器 (PID: $pid)..."
+    
+    # 发送SIGTERM信号
+    kill -TERM "$pid" 2>/dev/null
+    
+    # 等待进程结束
+    local count=0
+    while [ $count -lt 10 ] && ps -p "$pid" > /dev/null 2>&1; do
+        sleep 1
+        count=$((count + 1))
+        echo "等待进程结束... ($count/10)"
+    done
+    
+    # 如果进程仍在运行,强制杀死
+    if ps -p "$pid" > /dev/null 2>&1; then
+        echo "强制停止进程..."
+        kill -KILL "$pid" 2>/dev/null
+    fi
+    
+    # 删除PID文件
+    rm -f "$PID_FILE"
+    echo "调度器已停止"
+}
+
+# 查看状态
+show_status() {
+    if is_running; then
+        local pid=$(get_pid)
+        echo "调度器正在运行 (PID: $pid)"
+        echo "进程信息:"
+        ps -p "$pid" -o pid,ppid,cmd,etime
+        echo ""
+        echo "最近的日志:"
+        if [ -f "$LOG_DIR/structure_scheduler_$(date +%Y%m%d).log" ]; then
+            tail -5 "$LOG_DIR/structure_scheduler_$(date +%Y%m%d).log"
+        else
+            echo "未找到今日日志文件"
+        fi
+    else
+        echo "调度器未运行"
+    fi
+}
+
+# 重启调度器
+restart_scheduler() {
+    echo "重启调度器..."
+    stop_scheduler
+    sleep 2
+    start_scheduler
+}
+
+# 主逻辑
+case "${1:-start}" in
+    start)
+        start_scheduler
+        ;;
+    stop)
+        stop_scheduler
+        ;;
+    status)
+        show_status
+        ;;
+    restart)
+        restart_scheduler
+        ;;
+    *)
+        echo "用法: $0 {start|stop|status|restart}"
+        echo ""
+        echo "命令说明:"
+        echo "  start   - 启动调度器"
+        echo "  stop    - 停止调度器"
+        echo "  status  - 查看运行状态"
+        echo "  restart - 重启调度器"
+        echo ""
+        echo "示例:"
+        echo "  $0 start    # 启动"
+        echo "  $0 status   # 查看状态"
+        echo "  $0 stop     # 停止"
+        exit 1
+        ;;
+esac 

+ 232 - 0
structure/structure_processor.py

@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+内容结构化处理模块
+主要功能:
+1. 从数据库中拉取需要结构化的数据
+2. 调用Gemini API进行内容结构化
+3. 将结构化结果更新到数据库
+"""
+
+import os
+import json
+import time
+import sys
+import threading
+from typing import Dict, Any, List, Optional, Tuple
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+from gemini import GeminiProcessor
+from utils.file import File
+from utils.logging_config import get_logger
+
+
+class StructureProcessor:
+    def __init__(self):
+        # 设置日志
+        self.logger = get_logger('StructureProcessor')
+        
+        # 初始化处理器
+        self.processor = GeminiProcessor()
+        self.system_prompt = File.read_file('../prompt/structure.md')
+        self.logger.info("系统提示词加载完成")
+        self.logger.debug(f"系统提示词: {self.system_prompt}")
+        
+        # 线程控制
+        self.lock = threading.Lock()
+        self.stop_event = threading.Event()
+        self.threads = []
+    
+    def build_query_conditions(self, query_word: Optional[str], 
+                             source_type: Optional[str], 
+                             source_channel: Optional[str]) -> Tuple[str, Tuple]:
+        """构建查询条件和参数"""
+        conditions = ["multimodal_recognition is not null", "structured_data is null"]
+        params = []
+        
+        if query_word is not None:
+            conditions.append("query_word = %s")
+            params.append(query_word)
+        if source_type is not None:
+            conditions.append("source_type = %s")
+            params.append(source_type)
+        if source_channel is not None:
+            conditions.append("source_channel = %s")
+            params.append(source_channel)
+            
+        where_clause = " AND ".join(conditions)
+        return where_clause, tuple(params)
+    
+    def process_single_record(self, query_word: Optional[str], 
+                            source_type: Optional[str], 
+                            source_channel: Optional[str]) -> bool:
+        """处理单条记录"""
+        try:
+            with self.lock:
+                # 构建查询条件和参数
+                where_clause, params = self.build_query_conditions(query_word, source_type, source_channel)
+                
+                # 先查询一条需要处理的记录
+                select_sql = f"""
+                    SELECT id, multimodal_recognition 
+                    FROM knowledge_search_content 
+                    WHERE {where_clause}
+                    LIMIT 1
+                """
+                
+                records = MysqlHelper.get_values(select_sql, params)
+                if not records:
+                    self.logger.warning("没有找到需要处理的记录")
+                    return False
+                
+                row = records[0]
+                record_id = row[0]
+                
+                # 标记为处理中,防止其他线程取到重复处理
+                mark_sql = """
+                    UPDATE knowledge_search_content 
+                    SET structured_data = '{}' 
+                    WHERE id = %s
+                """
+                
+                MysqlHelper.update_values(mark_sql, (record_id,))
+                
+                self.logger.info(f"开始处理记录 ID: {record_id}")
+                
+                # 处理内容
+                result = self.processor.process(row[1], self.system_prompt)
+                self.logger.info(f"处理完成,结果长度: {len(str(result))}")
+                self.logger.debug(f"处理结果: {result}")
+                
+                # 更新数据库为实际结果
+                update_sql = """
+                    UPDATE knowledge_search_content 
+                    SET structured_data = %s 
+                    WHERE id = %s
+                """
+                
+                MysqlHelper.update_values(update_sql, (result, record_id))
+                self.logger.info(f"记录 {record_id} 处理完成并更新数据库")
+                return True
+                
+        except Exception as e:
+            self.logger.error(f"处理记录失败: {str(e)}", exc_info=True)
+            return False
+    
+    def worker_thread(self, thread_id: int, query_word: Optional[str], 
+                     source_type: Optional[str], source_channel: Optional[str]):
+        """工作线程函数"""
+        thread_logger = get_logger(f'WorkerThread-{thread_id}')
+        thread_logger.info(f"线程 {thread_id} 启动")
+        
+        while not self.stop_event.is_set():
+            try:
+                # 尝试处理一条记录
+                success = self.process_single_record(query_word, source_type, source_channel)
+                
+                if not success:
+                    thread_logger.info(f"没有找到需要处理的记录,等待5秒后重试")
+                    # 等待时也要检查停止信号
+                    if self.stop_event.wait(5):
+                        break
+                    continue
+                
+                # 处理成功后等待5秒再处理下一条
+                thread_logger.info(f"处理完成,等待5秒后处理下一条")
+                # 等待时也要检查停止信号
+                if self.stop_event.wait(5):
+                    break
+                
+            except Exception as e:
+                thread_logger.error(f"发生错误: {str(e)}", exc_info=True)
+                # 等待时也要检查停止信号
+                if self.stop_event.wait(5):
+                    break
+        
+        thread_logger.info(f"线程 {thread_id} 已停止")
+    
+    def start_multi_thread_processing(self, query_word: Optional[str], 
+                                    source_type: Optional[str], 
+                                    source_channel: Optional[str]):
+        """启动多线程处理"""
+        self.threads = []
+        
+        self.logger.info("启动多线程处理...")
+        self.logger.info(f"查询条件: query_word={query_word}, source_type={source_type}, source_channel={source_channel}")
+        
+        # 创建5个线程,间隔5秒启动
+        for i in range(5):
+            thread = threading.Thread(
+                target=self.worker_thread,
+                args=(i + 1, query_word, source_type, source_channel)
+            )
+            self.threads.append(thread)
+            
+            # 启动线程
+            thread.start()
+            self.logger.info(f"线程 {i + 1} 已启动")
+            
+            # 等待5秒后启动下一个线程
+            if i < 4:  # 最后一个线程不需要等待
+                self.logger.info("等待5秒后启动下一个线程...")
+                time.sleep(5)
+        
+        self.logger.info("所有线程已启动,使用 ./start_structure.sh stop 停止")
+        
+        try:
+            # 等待所有线程完成
+            for thread in self.threads:
+                thread.join()
+        except KeyboardInterrupt:
+            self.logger.info("收到停止信号,正在停止所有线程...")
+            self.stop_all_threads()
+    
+    def stop_all_threads(self):
+        """停止所有线程"""
+        self.logger.info("正在停止所有线程...")
+        self.stop_event.set()
+        
+        # 等待所有线程结束
+        for i, thread in enumerate(self.threads):
+            if thread.is_alive():
+                self.logger.info(f"等待线程 {i + 1} 结束...")
+                thread.join(timeout=10)  # 最多等待10秒
+                if thread.is_alive():
+                    self.logger.warning(f"线程 {i + 1} 未能正常结束")
+                else:
+                    self.logger.info(f"线程 {i + 1} 已正常结束")
+        
+        self.logger.info("所有线程已停止")
+
+
+def main():
+    """主函数"""
+    import argparse
+    
+    parser = argparse.ArgumentParser(description='内容结构化处理脚本')
+    parser.add_argument('--query_word', default=None, help='query词')
+    parser.add_argument('--source_type', default=None, help='数据源类型')
+    parser.add_argument('--source_channel', default=None, help='数据源渠道')
+    
+    args = parser.parse_args()
+    
+    try:
+        processor = StructureProcessor()
+        
+        processor.start_multi_thread_processing(
+            query_word=args.query_word, 
+            source_type=args.source_type, 
+            source_channel=args.source_channel
+        )
+    except Exception as e:
+        print(f"程序执行失败: {str(e)}")
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    # 测试单条记录处理
+    processor = StructureProcessor()
+    processor.process_single_record(query_word=None, source_type=None, source_channel=None) 

+ 72 - 0
utils/logging_config.py

@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+公共日志配置文件
+提供统一的日志配置,支持文件和控制台输出
+可以被多个模块共享使用
+"""
+
+import os
+import logging
+from datetime import datetime
+
+def setup_logging(log_name, log_dir="logs", level=logging.INFO, 
+                  console_output=True, file_output=True):
+    """
+    设置日志配置
+    
+    Args:
+        log_name: 日志器名称
+        log_dir: 日志文件目录
+        level: 日志级别
+        console_output: 是否输出到控制台
+        file_output: 是否输出到文件
+    """
+    # 创建logs目录
+    if file_output and not os.path.exists(log_dir):
+        os.makedirs(log_dir)
+    
+    # 创建logger
+    logger = logging.getLogger(log_name)
+    logger.setLevel(level)
+    
+    # 清除已有的handlers,避免重复
+    logger.handlers.clear()
+    
+    # 日志格式
+    formatter = logging.Formatter(
+        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    
+    # 文件输出
+    if file_output:
+        # 生成日志文件名(包含日期)
+        log_filename = f"{log_dir}/{log_name}_{datetime.now().strftime('%Y%m%d')}.log"
+        file_handler = logging.FileHandler(log_filename, encoding='utf-8')
+        file_handler.setLevel(level)
+        file_handler.setFormatter(formatter)
+        logger.addHandler(file_handler)
+    
+    # 控制台输出
+    if console_output:
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(level)
+        console_handler.setFormatter(formatter)
+        logger.addHandler(console_handler)
+    
+    return logger
+
+def get_logger(log_name, log_dir="logs", level=logging.INFO):
+    """
+    获取配置好的logger
+    
+    Args:
+        log_name: 日志器名称
+        log_dir: 日志文件目录
+        level: 日志级别
+    
+    Returns:
+        配置好的logger实例
+    """
+    return setup_logging(log_name, log_dir, level) 

+ 0 - 154
xhs/README.md

@@ -1,154 +0,0 @@
-# 内容识别脚本使用说明
-
-## 功能描述
-
-`content_identify.py` 脚本实现了以下功能:
-
-1. **读取飞书表格数据**:从指定的飞书多维表格中读取数据
-2. **调用Coze工作流**:对每条记录调用Coze工作流进行内容识别
-3. **结果写回**:将Coze返回的识别结果写回飞书表格
-
-## 环境配置
-
-### 环境变量配置
-
-在 `.env` 文件中设置以下环境变量:
-
-```bash
-# 飞书配置
-FEISHU_APP_ID=your_app_id
-FEISHU_APP_SECRET=your_app_secret
-FEISHU_FILE_TOKEN=your_file_token
-FEISHU_TABLE_ID=your_table_id    # 可选,也可在运行时传入
-
-# 可选:自定义字段名称
-FEISHU_INPUT_FIELD=抓取结果      # 输入内容字段名
-FEISHU_OUTPUT_FIELD=识别结果     # 输出结果字段名
-```
-
-**注意**:`FEISHU_TABLE_ID` 可以通过以下方式设置(优先级从高到低):
-1. 命令行参数:`python content_identify.py tblNdje7z6Cf3hax`
-2. 环境变量:`FEISHU_TABLE_ID=your_table_id`
-
-### 飞书表格结构
-
-飞书表格应包含以下字段:
-- **标题**:内容的标题
-- **抓取结果**:需要识别的内容文本
-- **图片链接**:相关的图片URL列表
-- **识别结果**:Coze工作流的识别结果(会自动填充)
-
-## 使用方法
-
-### 1. 直接运行脚本
-
-#### 使用环境变量中的表格ID
-```bash
-cd xhs
-python content_identify.py
-```
-
-#### 运行时动态传入表格ID
-```bash
-cd xhs
-python content_identify.py tblNdje7z6Cf3hax
-```
-
-#### 其他命令行选项
-```bash
-# 试运行模式(只显示会处理的记录,不实际调用API)
-python content_identify.py tblNdje7z6Cf3hax --dry-run
-
-# 从指定分页token开始处理
-python content_identify.py tblNdje7z6Cf3hax --page-token VEBsbCfaWa3gF3slQILc6Rybnde
-
-# 查看帮助信息
-python content_identify.py --help
-```
-
-### 2. 在代码中调用
-
-```python
-from xhs.content_identify import ContentIdentifier
-
-# 使用环境变量中的表格ID
-identifier = ContentIdentifier()
-identifier.process_all_records()
-
-# 或者动态传入表格ID
-identifier = ContentIdentifier(table_id='tblNdje7z6Cf3hax')
-identifier.process_all_records()
-```
-
-### 3. 运行测试脚本
-
-在运行主脚本之前,建议先运行测试脚本验证配置:
-
-```bash
-cd xhs
-python test_content_identify.py
-```
-
-测试脚本会检查:
-- 环境变量配置
-- 飞书API连接
-- Coze API连接
-- ContentIdentifier类初始化
-
-## 脚本特性
-
-### 智能处理
-- **跳过已处理记录**:如果某条记录已有识别结果,会自动跳过
-- **空内容检查**:没有输入内容的记录会被跳过
-- **分页处理**:支持大量数据的分页处理
-
-### 错误处理
-- **API调用失败**:Coze API调用失败时会记录错误信息
-- **数据提取失败**:无法提取数据时会记录详细错误
-- **网络异常**:网络问题时会自动重试
-
-### 性能优化
-- **API限制**:每次调用后添加1秒延迟,避免触发API限制
-- **批量处理**:支持批量获取和更新记录
-- **内存优化**:分页处理避免内存溢出
-
-## 输出日志
-
-脚本运行时会输出详细的处理日志:
-
-```
-开始处理飞书表格 tblxxxxxxxxx 中的所有记录
-获取到 10 条记录
-处理记录 recxxxxxxxxx
-标题: 这是一个测试标题...
-内容长度: 1234 字符
-图片数量: 3
-正在调用Coze工作流,标题: 这是一个测试标题...
-Coze工作流调用成功
-已更新记录 recxxxxxxxxx
-处理完成!总共处理 10 条记录,成功 10 条
-```
-
-## 故障排除
-
-### 常见问题
-
-1. **环境变量未设置**
-   - 确保所有必需的环境变量都已正确设置
-   - 检查 `.env` 文件是否存在且格式正确
-
-2. **飞书API权限问题**
-   - 检查 `FEISHU_APP_ID` 和 `FEISHU_APP_SECRET` 是否正确
-   - 确认应用有访问多维表格的权限
-
-3. **Coze API调用失败**
-   - 检查Coze工作流ID是否正确
-   - 确认API密钥有效且有调用权限
-
-4. **字段名称不匹配**
-   - 检查飞书表格中的字段名称是否与配置一致
-   - 可以通过环境变量自定义字段名称
-
-### 调试模式
-
-如需更详细的调试信息,可以修改脚本中的日志级别或添加更多打印语句。 

+ 0 - 286
xhs/content_identify.py

@@ -1,286 +0,0 @@
-import os
-import json
-import time
-import sys
-import argparse
-from typing import Dict, Any, List, Optional
-from dotenv import load_dotenv
-
-# 导入自定义模块
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from utils.fei_shu import FeiShu
-from coze.coze_hook import CozeHook
-
-
-class ContentIdentifier:
-    def __init__(self, table_id: Optional[str] = None):
-        # 加载环境变量
-        load_dotenv()
-        
-        # 初始化飞书客户端
-        self.feishu = FeiShu()
-        
-        # 初始化Coze客户端
-        self.coze = CozeHook()
-        
-        # 获取表格ID:优先使用传入的参数,其次使用环境变量
-        self.table_id = table_id or os.getenv('FEISHU_TABLE_ID')
-        if not self.table_id:
-            raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数")
-        
-        # 字段名称配置
-        self.input_field = os.getenv('FEISHU_INPUT_FIELD', '抓取结果')
-        self.output_field = os.getenv('FEISHU_OUTPUT_FIELD', '识别结果')
-        
-    def extract_content_from_record(self, record) -> Dict[str, Any]:
-        """从飞书记录中提取内容"""
-        fields = record.fields
-        
-        # 提取抓取结果
-        crawl_result = fields.get(self.input_field, '')
-        title = ''
-        body_text = ''
-        image_url_list = []
-        
-        # 解析抓取结果
-        if crawl_result:
-            if isinstance(crawl_result, list) and len(crawl_result) > 0:
-                # 如果是数组格式,取第一个元素
-                crawl_data = crawl_result[0]
-                if isinstance(crawl_data, dict) and 'text' in crawl_data:
-                    try:
-                        # 解析JSON字符串
-                        json_data = json.loads(crawl_data['text'])
-                        
-                        # 提取标题
-                        title = json_data.get('title', '')
-                        
-                        # 提取正文内容
-                        body_text = json_data.get('body_text', '')
-                        
-                        # 提取图片链接
-                        image_data_list = json_data.get('image_url_list', [])
-                        for img_data in image_data_list:
-                            if isinstance(img_data, dict) and 'image_url' in img_data:
-                                image_url_list.append(img_data['image_url'])
-                        
-                    except json.JSONDecodeError as e:
-                        print(f"解析抓取结果JSON失败: {e}")
-                        # 如果解析失败,尝试直接使用文本内容
-                        if isinstance(crawl_data, dict) and 'text' in crawl_data:
-                            body_text = crawl_data['text']
-            elif isinstance(crawl_result, str):
-                # 如果是字符串格式,尝试直接解析
-                try:
-                    json_data = json.loads(crawl_result)
-                    title = json_data.get('title', '')
-                    body_text = json_data.get('body_text', '')
-                    image_data_list = json_data.get('image_url_list', [])
-                    for img_data in image_data_list:
-                        if isinstance(img_data, dict) and 'image_url' in img_data:
-                            image_url_list.append(img_data['image_url'])
-                except json.JSONDecodeError:
-                    body_text = crawl_result
-        
-        return {
-            'title': title,
-            'body_text': body_text,
-            'image_url_list': image_url_list,
-            'record_id': record.record_id
-        }
-    
-    def call_coze_workflow(self, title: str, body_text: str, image_url_list: List[str]) -> Dict[str, Any]:
-        """调用Coze工作流"""
-        try:
-            print(f"正在调用Coze工作流,标题: {title[:50]}...")
-            response = self.coze.run(title, body_text, image_url_list)
-            print("Coze工作流调用成功")
-            return response
-        except Exception as e:
-            print(f"调用Coze工作流失败: {e}")
-            return {"data": "{}"}
-    
-    def extract_coze_result(self, coze_response: Dict[str, Any]) -> Dict[str, str]:
-        """
-        从API响应中提取images_comprehension、title、body_text字段
-        """
-        try:
-            # 获取data字段
-            data = coze_response.get("data")
-            if not data:
-                print("响应中没有data字段")
-                return {"images_comprehension": "", "title": "", "body_text": ""}
-            
-            # 解析data字段(它是JSON字符串)
-            if isinstance(data, str):
-                try:
-                    data = json.loads(data)
-                except json.JSONDecodeError as e:
-                    print(f"data字段JSON解析失败: {e}")
-                    return {"images_comprehension": "", "title": "", "body_text": ""}
-            
-            # 从解析后的data中提取字段
-            extracted_fields = {
-                "images_comprehension": data.get("images_comprehension", ""),
-                "title": data.get("title", ""),
-                "body_text": data.get("body_text", "")
-            }
-            
-            return extracted_fields
-            
-        except Exception as e:
-            print(f"提取Coze结果失败: {e}")
-            return {"images_comprehension": "", "title": "", "body_text": ""}
-    
-    def update_feishu_record(self, record_id: str, result_dict: Dict[str, Any]):
-        """更新飞书表格中的记录"""
-        try:
-            import lark_oapi as lark
-                        # 创建更新记录
-            update_record = (lark.bitable.v1.AppTableRecord.builder()
-                           .record_id(record_id)
-                           .fields({
-                               self.output_field: json.dumps({
-                                    'images_comprehension': result_dict.get('images_comprehension', ''),
-                                    'title': result_dict.get('title', ''),
-                                    'body_text': result_dict.get('body_text', '')
-                                }, ensure_ascii=False)
-                           })
-                           .build())
-            
-            # 执行更新
-            self.feishu.update_record(self.table_id, update_record)
-            print(f"已更新记录 {record_id}")
-            
-        except Exception as e:
-            print(f"更新飞书记录失败: {e}")
-    
-    def process_single_record(self, record) -> bool:
-        """处理单条记录"""
-        try:
-            # 提取内容
-            content = self.extract_content_from_record(record)
-            
-            # 检查是否已经有识别结果
-            fields = record.fields
-            existing_result = fields.get(self.output_field, '')
-            
-            # 如果已有识别结果,则跳过
-            if existing_result and existing_result.strip():
-                try:
-                    # 尝试解析JSON,如果成功说明已有有效结果
-                    json.loads(existing_result)
-                    print(f"记录 {record.record_id} 已有识别结果,跳过")
-                    return True
-                except json.JSONDecodeError:
-                    # 如果JSON解析失败,说明可能是旧格式,继续处理
-                    pass
-            
-            # 检查是否有输入内容
-            if not content['body_text'] or not content['body_text'].strip():
-                print(f"记录 {record.record_id} 没有输入内容,跳过")
-                return True
-            
-            print(f"处理记录 {record.record_id}")
-            print(f"标题: {content['title'][:50]}...")
-            print(f"内容长度: {len(content['body_text'])} 字符")
-            print(f"图片数量: {len(content['image_url_list'])}")
-            
-            # 调用Coze工作流
-            coze_response = self.call_coze_workflow(
-                content['title'],
-                content['body_text'],
-                content['image_url_list']
-            )
-            
-            # 提取结果
-            result_dict = self.extract_coze_result(coze_response)
-            
-            # 更新飞书表格
-            self.update_feishu_record(record.record_id, result_dict)
-            
-            # 添加延迟避免API限制
-            time.sleep(1)
-            
-            return True
-            
-        except Exception as e:
-            print(f"处理记录 {record.record_id} 失败: {e}")
-            return False
-    
-    def process_all_records(self):
-        """处理所有记录"""
-        print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
-        
-        page_token = None
-        total_processed = 0
-        total_success = 0
-        
-        while True:
-            try:
-                # 获取记录
-                result = self.feishu.get_all_records(self.table_id, page_token)
-                
-                if not result.items:
-                    print("没有找到记录")
-                    break
-                
-                print(f"获取到 {len(result.items)} 条记录")
-                
-                # 处理每条记录
-                for record in result.items:
-                    total_processed += 1
-                    if self.process_single_record(record):
-                        total_success += 1
-                
-                # 检查是否有下一页
-                if not result.has_more:
-                    break
-                
-                page_token = result.page_token
-                print(f"继续获取下一页,token: {page_token}")
-                
-            except Exception as e:
-                print(f"获取记录失败: {e}")
-                break
-        
-        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
-
-
-def main():
-    """主函数"""
-    # 创建命令行参数解析器
-    parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
-    parser.add_argument('table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
-    parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
-    parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API')
-    
-    args = parser.parse_args()
-    
-    try:
-        # 创建内容识别器实例
-        identifier = ContentIdentifier(table_id=args.table_id)
-        
-        print(f"使用表格ID: {identifier.table_id}")
-        
-        if args.dry_run:
-            print("试运行模式:只显示会处理的记录,不实际调用API")
-            # TODO: 实现试运行模式
-            identifier.process_all_records()
-        else:
-            # 正常处理模式
-            if args.page_token:
-                print(f"从分页token开始处理: {args.page_token}")
-                # TODO: 支持从指定分页token开始处理
-                identifier.process_all_records()
-            else:
-                identifier.process_all_records()
-                
-    except Exception as e:
-        print(f"程序执行失败: {e}")
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 503
xhs/fit_content.py

@@ -1,503 +0,0 @@
-# 读取飞书表格的抓取结果字段,取出body_text和title,替换识别结果中的body_text和title
-#
-# 功能说明:
-# 1. 读取飞书表格中的"抓取结果"字段
-# 2. 从抓取结果中提取body_text和title
-# 3. 读取"识别结果"字段中的现有内容
-# 4. 用抓取结果中的body_text和title替换识别结果中的相应字段
-# 5. 保持识别结果中的images_comprehension字段不变
-# 6. 更新飞书表格中的识别结果字段
-#
-# 使用方法:
-# 1. 设置环境变量:
-#    - FEISHU_APP_ID: 飞书应用ID
-#    - FEISHU_APP_SECRET: 飞书应用密钥
-#    - FEISHU_FILE_TOKEN: 飞书文件Token
-#    - FEISHU_TABLE_ID: 飞书表格ID (可选,也可在运行时传入)
-#    - FEISHU_CRAWL_FIELD: 抓取结果字段名 (默认: '抓取结果')
-#    - FEISHU_IDENTIFY_FIELD: 识别结果字段名 (默认: '识别结果')
-#
-# 2. 运行脚本:
-#    python fit_content.py [table_id] [--dry-run]
-#
-# 示例:
-#    python fit_content.py tblNdje7z6Cf3hax          # 正常模式
-#    python fit_content.py tblNdje7z6Cf3hax --dry-run # 试运行模式
-#    python fit_content.py --dry-run                  # 使用环境变量中的表格ID,试运行模式
-#
-# 注意事项:
-# - 试运行模式会显示将要处理的内容,但不会实际更新飞书表格
-# - 脚本会自动处理分页,支持大量数据
-# - 如果抓取结果或识别结果解析失败,会跳过该记录并继续处理其他记录
-
-import json
-import os
-import sys
-from typing import Dict, Any, List, Optional
-from dotenv import load_dotenv
-
-# 导入自定义模块
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from utils.fei_shu import FeiShu
-
-
-class ContentFitter:
-    def __init__(self, table_id: Optional[str] = None):
-        # 加载环境变量
-        load_dotenv()
-        
-        # 初始化飞书客户端
-        self.feishu = FeiShu()
-        
-        # 获取表格ID:优先使用传入的参数,其次使用环境变量
-        self.table_id = table_id or os.getenv('FEISHU_TABLE_ID')
-        if not self.table_id:
-            raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数")
-        
-        # 字段名称配置
-        self.crawl_field = os.getenv('FEISHU_CRAWL_FIELD', '抓取结果')
-        self.identify_field = os.getenv('FEISHU_IDENTIFY_FIELD', '识别结果')
-    
-    def extract_crawl_content(self, crawl_result) -> Dict[str, str]:
-        """从抓取结果中提取body_text和title"""
-        title = ''
-        body_text = ''
-        
-        if not crawl_result:
-            return {'title': title, 'body_text': body_text}
-        
-        try:
-            # 如果是字符串格式,尝试直接解析
-            if isinstance(crawl_result, str):
-                json_data = json.loads(crawl_result)
-            elif isinstance(crawl_result, list) and len(crawl_result) > 0:
-                # 如果是数组格式,取第一个元素
-                crawl_data = crawl_result[0]
-                if isinstance(crawl_data, dict):
-                    if 'text' in crawl_data:
-                        # 如果crawl_data是包含text字段的字典
-                        json_data = json.loads(crawl_data['text'])
-                    else:
-                        # 如果crawl_data是直接的字典数据
-                        json_data = crawl_data
-                else:
-                    # 如果crawl_data不是字典,尝试直接解析
-                    json_data = crawl_data
-            elif isinstance(crawl_result, dict):
-                # 如果crawl_result本身就是字典
-                json_data = crawl_result
-            else:
-                # 其他情况,尝试直接使用
-                json_data = crawl_result
-            
-            # 确保json_data是字典类型
-            if not isinstance(json_data, dict):
-                print(f"抓取结果格式不正确,期望字典类型,实际类型: {type(json_data)}")
-                return {'title': title, 'body_text': body_text}
-            
-            # 提取标题和正文内容
-            title = json_data.get('title', '')
-            body_text = json_data.get('body_text', '')
-            
-        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
-            print(f"解析抓取结果失败: {e}")
-            # 如果解析失败,尝试直接使用文本内容
-            if isinstance(crawl_result, str):
-                body_text = crawl_result
-            elif isinstance(crawl_result, list) and len(crawl_result) > 0:
-                # 如果是列表,尝试将第一个元素转为字符串
-                body_text = str(crawl_result[0])
-        
-        return {'title': title, 'body_text': body_text}
-    
-    def extract_identify_content(self, identify_result) -> Dict[str, Any]:
-        """从识别结果中提取现有内容"""
-        images_comprehension = []
-        title = ''
-        body_text = ''
-        
-        if not identify_result:
-            print(f"  调试: identify_result为空")
-            return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text}
-        
-        print(f"  调试: identify_result类型: {type(identify_result)}")
-        print(f"  调试: identify_result内容前100字符: {identify_result[:100]}...")
-        
-        try:
-            # 如果是字符串格式,尝试解析JSON
-            if isinstance(identify_result, str):
-                print(f"  调试: 尝试解析字符串格式的identify_result")
-                json_data = self.safe_json_loads(identify_result)
-                if json_data is None:
-                    print(f"  调试: JSON解析失败,返回空结果")
-                    return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text}
-            elif isinstance(identify_result, dict):
-                print(f"  调试: identify_result本身就是字典")
-                json_data = identify_result
-            elif isinstance(identify_result, list) and len(identify_result) > 0:
-                print(f"  调试: identify_result是列表,合并所有元素的内容")
-                # 合并列表中所有元素的text字段
-                combined_text = ""
-                for i, item in enumerate(identify_result):
-                    if isinstance(item, dict) and 'text' in item:
-                        combined_text += item['text']
-                        print(f"  调试: 合并第{i+1}个元素的text,当前长度: {len(combined_text)}")
-                
-                print(f"  调试: 合并后的文本长度: {len(combined_text)}")
-                if combined_text:
-                    json_data = self.safe_json_loads(combined_text)
-                    if json_data is None:
-                        print(f"  调试: 合并后JSON解析失败")
-                        return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text}
-                else:
-                    print(f"  调试: 没有找到text字段")
-                    return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text}
-            else:
-                print(f"  调试: identify_result是其他类型: {type(identify_result)}")
-                json_data = identify_result
-            
-            # 确保json_data是字典类型
-            if not isinstance(json_data, dict):
-                print(f"识别结果格式不正确,期望字典类型,实际类型: {type(json_data)}")
-                return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text}
-            
-            print(f"  调试: json_data键: {list(json_data.keys())}")
-            
-            # 检查是否有text字段,如果有,尝试解析其中的JSON
-            if 'text' in json_data and isinstance(json_data['text'], str):
-                print(f"  调试: 发现text字段,尝试解析其中的JSON")
-                text_content = self.safe_json_loads(json_data['text'])
-                if text_content and isinstance(text_content, dict):
-                    print(f"  调试: text字段解析成功,键: {list(text_content.keys())}")
-                    # 从text_content中提取字段
-                    images_comprehension = text_content.get('images_comprehension', [])
-                    title = text_content.get('title', '')
-                    body_text = text_content.get('body_text', '')
-                else:
-                    print(f"  调试: text字段解析失败或不是字典")
-                    # 如果text字段解析失败,尝试直接提取images_comprehension数组
-                    print(f"  调试: 尝试直接从text字段中提取images_comprehension数组")
-                    images_comprehension = self.extract_images_comprehension_from_text(json_data['text'])
-                    title = json_data.get('title', '')
-                    body_text = json_data.get('body_text', '')
-            else:
-                # 直接从json_data中提取字段
-                images_comprehension = json_data.get('images_comprehension', [])
-                title = json_data.get('title', '')
-                body_text = json_data.get('body_text', '')
-            
-            print(f"  调试: 提取的images_comprehension类型: {type(images_comprehension)}, 值: {images_comprehension}")
-            
-            # 确保images_comprehension是列表格式
-            if not isinstance(images_comprehension, list):
-                if isinstance(images_comprehension, str):
-                    # 如果是字符串,尝试解析为列表
-                    try:
-                        print(f"  调试: images_comprehension是字符串,尝试解析为列表")
-                        images_comprehension = json.loads(images_comprehension)
-                        if not isinstance(images_comprehension, list):
-                            images_comprehension = []
-                    except (json.JSONDecodeError, TypeError):
-                        print(f"  调试: 解析images_comprehension字符串失败")
-                        images_comprehension = []
-                else:
-                    print(f"  调试: images_comprehension不是列表也不是字符串,设置为空列表")
-                    images_comprehension = []
-            
-            # 调试信息:打印images_comprehension的结构
-            if images_comprehension:
-                print(f"  调试: images_comprehension类型: {type(images_comprehension)}, 长度: {len(images_comprehension)}")
-                if len(images_comprehension) > 0:
-                    print(f"  调试: 第一个元素类型: {type(images_comprehension[0])}")
-                    if isinstance(images_comprehension[0], dict):
-                        print(f"  调试: 第一个元素键: {list(images_comprehension[0].keys())}")
-            else:
-                print(f"  调试: images_comprehension为空")
-            
-        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
-            print(f"解析识别结果失败: {e}")
-            print(f"  调试: identify_result类型: {type(identify_result)}")
-            if isinstance(identify_result, str):
-                print(f"  调试: identify_result内容前100字符: {identify_result[:100]}...")
-        
-        return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text}
-    
-    def merge_content(self, crawl_content: Dict[str, str], identify_content: Dict[str, Any]) -> Dict[str, Any]:
-        """合并抓取内容和识别内容,用抓取内容替换识别内容中的title和body_text"""
-        return {
-            'images_comprehension': identify_content.get('images_comprehension', []),  # 保持数组格式
-            'title': crawl_content.get('title', ''),  # 使用抓取结果的title
-            'body_text': crawl_content.get('body_text', '')  # 使用抓取结果的body_text
-        }
-    
-    def update_feishu_record(self, record_id: str, merged_content: Dict[str, Any]):
-        """更新飞书表格中的记录"""
-        try:
-            import lark_oapi as lark
-            
-            # 创建更新记录
-            update_record = (lark.bitable.v1.AppTableRecord.builder()
-                           .record_id(record_id)
-                           .fields({
-                               self.identify_field: json.dumps(merged_content, ensure_ascii=False)
-                           })
-                           .build())
-            
-            # 执行更新
-            self.feishu.update_record(self.table_id, update_record)
-            print(f"已更新记录 {record_id}")
-            
-        except Exception as e:
-            print(f"更新飞书记录失败: {e}")
-    
-    def process_single_record(self, record, dry_run: bool = False) -> bool:
-        """处理单条记录"""
-        try:
-            fields = record.fields
-            
-            # 提取抓取结果
-            crawl_result = fields.get(self.crawl_field, '')
-            if not crawl_result:
-                print(f"记录 {record.record_id} 没有抓取结果,跳过")
-                return True
-            
-            # 提取识别结果
-            identify_result = fields.get(self.identify_field, '')
-            
-            print(f"  调试: 原始identify_result类型: {type(identify_result)}")
-            if isinstance(identify_result, str):
-                print(f"  调试: 原始identify_result内容前200字符: {identify_result[:200]}...")
-            
-            # 从抓取结果中提取title和body_text
-            crawl_content = self.extract_crawl_content(crawl_result)
-            
-            # 从识别结果中提取现有内容
-            identify_content = self.extract_identify_content(identify_result)
-            
-            # 合并内容,用抓取结果替换识别结果中的title和body_text
-            merged_content = self.merge_content(crawl_content, identify_content)
-            
-            print(f"处理记录 {record.record_id}")
-            print(f"  抓取结果 - 标题: {crawl_content['title'][:50] if crawl_content['title'] else '无标题'}...")
-            print(f"  抓取结果 - 内容长度: {len(crawl_content['body_text'])} 字符")
-            
-            # 处理images_comprehension的打印
-            images_comp = identify_content['images_comprehension']
-            if isinstance(images_comp, list) and len(images_comp) > 0:
-                # 显示第一个元素的内容预览
-                first_item = images_comp[0]
-                if isinstance(first_item, dict):
-                    content_preview = first_item.get('content', '')[:50] if first_item.get('content') else '无内容'
-                    print(f"  识别结果 - 图片理解: [{len(images_comp)}项] 第一项内容: {content_preview}...")
-                else:
-                    images_comp_text = str(first_item)[:50] + "..." if len(str(first_item)) > 50 else str(first_item)
-                    print(f"  识别结果 - 图片理解: [{len(images_comp)}项] {images_comp_text}")
-            else:
-                print(f"  识别结果 - 图片理解: 无图片理解")
-            
-            if not dry_run:
-                # 更新飞书表格
-                self.update_feishu_record(record.record_id, merged_content)
-            else:
-                print(f"  [试运行] 将更新识别结果字段,新内容: {json.dumps(merged_content, ensure_ascii=False)[:100]}...")
-            
-            return True
-            
-        except Exception as e:
-            print(f"处理记录 {record.record_id} 失败: {e}")
-            return False
-    
-    def process_all_records(self, dry_run: bool = False):
-        """处理所有记录"""
-        mode_text = "试运行模式" if dry_run else "正常模式"
-        print(f"开始处理飞书表格 {self.table_id} 中的所有记录 ({mode_text})")
-        
-        page_token = None
-        total_processed = 0
-        total_success = 0
-        
-        while True:
-            try:
-                # 获取记录
-                result = self.feishu.get_all_records(self.table_id, page_token)
-                
-                if not result.items:
-                    print("没有找到记录")
-                    break
-                
-                print(f"获取到 {len(result.items)} 条记录")
-                
-                # 处理每条记录
-                for record in result.items:
-                    total_processed += 1
-                    if self.process_single_record(record, dry_run):
-                        total_success += 1
-                
-                # 检查是否有下一页
-                if not result.has_more:
-                    break
-                
-                page_token = result.page_token
-                print(f"继续获取下一页,token: {page_token}")
-                
-            except Exception as e:
-                print(f"获取记录失败: {e}")
-                break
-        
-        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
-
-    def safe_json_loads(self, json_str: str) -> Any:
-        """安全地解析JSON字符串,处理可能的语法错误"""
-        if not isinstance(json_str, str):
-            return json_str
-        
-        try:
-            return json.loads(json_str)
-        except json.JSONDecodeError as e:
-            print(f"  调试: JSON解析失败: {e}")
-            # 尝试修复常见的JSON语法错误
-            try:
-                # 移除多余的逗号
-                fixed_json = json_str.replace(',,', ',')
-                # 移除末尾的逗号
-                fixed_json = fixed_json.rstrip(',')
-                # 移除末尾的多个逗号
-                while fixed_json.endswith(',}'):
-                    fixed_json = fixed_json[:-2] + '}'
-                while fixed_json.endswith(',]'):
-                    fixed_json = fixed_json[:-2] + ']'
-                
-                # 尝试修复未终止的字符串
-                if 'Unterminated string' in str(e):
-                    print(f"  调试: 检测到未终止的字符串,尝试修复")
-                    # 查找最后一个完整的JSON对象
-                    import re
-                    # 查找匹配的大括号
-                    brace_count = 0
-                    end_pos = -1
-                    for i, char in enumerate(fixed_json):
-                        if char == '{':
-                            brace_count += 1
-                        elif char == '}':
-                            brace_count -= 1
-                            if brace_count == 0:
-                                end_pos = i
-                                break
-                    
-                    if end_pos > 0:
-                        fixed_json = fixed_json[:end_pos + 1]
-                        print(f"  调试: 截取到位置 {end_pos + 1}")
-                
-                return json.loads(fixed_json)
-            except json.JSONDecodeError:
-                print(f"  调试: 修复JSON后仍然解析失败")
-                # 尝试更激进的修复
-                try:
-                    # 如果还是失败,尝试找到最后一个有效的JSON对象
-                    import re
-                    # 查找最后一个完整的JSON对象
-                    pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
-                    matches = re.findall(pattern, json_str)
-                    if matches:
-                        last_match = matches[-1]
-                        return json.loads(last_match)
-                except:
-                    pass
-                
-                # 最后的尝试:手动构建JSON对象
-                try:
-                    print(f"  调试: 尝试手动提取关键字段")
-                    # 尝试提取images_comprehension字段
-                    import re
-                    # 查找images_comprehension数组的开始
-                    pattern = r'"images_comprehension":\s*\[(.*?)\]'
-                    match = re.search(pattern, json_str, re.DOTALL)
-                    if match:
-                        array_content = match.group(1)
-                        # 尝试解析数组内容
-                        try:
-                            # 构建一个简单的JSON对象
-                            simple_json = f'{{"images_comprehension": [{array_content}]}}'
-                            return json.loads(simple_json)
-                        except:
-                            pass
-                except:
-                    pass
-                
-                return None
-
-    def extract_images_comprehension_from_text(self, text: str) -> list:
-        """直接从文本中提取images_comprehension数组"""
-        try:
-            import re
-            # 查找images_comprehension数组的开始和结束
-            pattern = r'"images_comprehension":\s*\[(.*?)\]'
-            match = re.search(pattern, text, re.DOTALL)
-            if match:
-                array_content = match.group(1)
-                print(f"  调试: 找到images_comprehension数组内容,长度: {len(array_content)}")
-                
-                # 尝试解析数组内容
-                try:
-                    # 构建一个简单的JSON对象
-                    simple_json = f'{{"images_comprehension": [{array_content}]}}'
-                    result = json.loads(simple_json)
-                    return result.get('images_comprehension', [])
-                except json.JSONDecodeError as e:
-                    print(f"  调试: 解析数组内容失败: {e}")
-                    
-                    # 尝试手动解析数组中的对象
-                    try:
-                        # 查找数组中的每个对象
-                        object_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
-                        objects = re.findall(object_pattern, array_content)
-                        print(f"  调试: 找到 {len(objects)} 个对象")
-                        
-                        parsed_objects = []
-                        for obj_str in objects:
-                            try:
-                                obj = json.loads(obj_str)
-                                parsed_objects.append(obj)
-                            except:
-                                # 如果单个对象解析失败,跳过
-                                continue
-                        
-                        return parsed_objects
-                    except Exception as e2:
-                        print(f"  调试: 手动解析对象失败: {e2}")
-                        return []
-            else:
-                print(f"  调试: 未找到images_comprehension数组")
-                return []
-        except Exception as e:
-            print(f"  调试: 提取images_comprehension失败: {e}")
-            return []
-
-
-def main():
-    """主函数"""
-    import argparse
-    
-    parser = argparse.ArgumentParser(description='读取飞书表格抓取结果,替换识别结果中的body_text和title')
-    parser.add_argument('table_id', nargs='?', help='飞书表格ID')
-    parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理的记录,不实际更新')
-    
-    args = parser.parse_args()
-    
-    try:
-        # 创建ContentFitter实例
-        fitter = ContentFitter(args.table_id)
-        
-        # 处理所有记录
-        fitter.process_all_records(dry_run=args.dry_run)
-            
-    except Exception as e:
-        print(f"程序执行失败: {e}")
-        sys.exit(1)
-
-
-if __name__ == '__main__':
-    main()
-
-