jihuaqiang пре 1 месец
родитељ
комит
dfad1ed4a1

+ 1 - 1
.env

@@ -8,7 +8,7 @@ FEISHU_FILE_TOKEN=VEBsbCfaWa3gF3slQILc6Rybnde
 
 # 扣子
 COZE_API_KEY=pat_pClXS15hyuqohC9TK58vU7130Hp6QmmHlnyW2TjFpKVWKsW2B1VniFwdXkY3eRNB
-COZE_BOT_ID=7535375645057351720
+COZE_BOT_ID=7537570163895812146
 
 # Gemini
 GEMINI_API_KEY=AIzaSyC0J8gtl5I6-nu6fgvQrfnWkw0QIzfXEWE

+ 165 - 0
content_indentify/CONTINUOUS_PROCESSING.md

@@ -0,0 +1,165 @@
+# 连续处理模式使用说明
+
+## 概述
+
+连续处理模式是一个自动化执行流程,能够连续处理数据库中的记录,一条完成后自动处理下一条,直到所有记录处理完成或达到限制条件。
+
+## 功能特点
+
+✅ **自动化处理**:无需人工干预,自动连续处理数据库记录  
+✅ **智能停止**:连续失败3次后自动停止,避免无限循环  
+✅ **进度监控**:实时显示处理进度、成功率和统计信息  
+✅ **灵活配置**:可设置最大处理数量和间隔时间  
+✅ **用户控制**:支持 Ctrl+C 随时中断处理  
+✅ **错误处理**:完善的异常处理和错误恢复机制  
+
+## 使用方法
+
+### 1. 命令行启动
+
+```bash
+# 基本连续处理(无数量限制)
+python3 indentify.py --continuous
+
+# 设置最大处理数量
+python3 indentify.py --continuous --max-records 100
+
+# 设置处理间隔时间(秒)
+python3 indentify.py --continuous --delay 5
+
+# 组合使用
+python3 indentify.py --continuous --max-records 50 --delay 3
+```
+
+### 2. 启动脚本
+
+```bash
+# 运行启动脚本
+./run.sh
+
+# 选择选项 3(连续处理模式)
+# 然后根据需要设置参数
+```
+
+### 3. 参数说明
+
+| 参数 | 说明 | 默认值 |
+|------|------|--------|
+| `--continuous` | 启用连续处理模式 | - |
+| `--max-records` | 最大处理记录数量 | 无限制 |
+| `--delay` | 处理间隔时间(秒) | 2秒 |
+
+## 工作流程
+
+1. **启动检查**:检查数据库连接和待处理记录
+2. **循环处理**:逐条处理数据库记录
+3. **状态更新**:更新记录的处理状态
+4. **进度监控**:显示处理进度和统计信息
+5. **智能停止**:达到条件后自动停止
+
+## 处理逻辑
+
+### 成功处理
+- 记录处理成功后,状态更新为完成
+- 重置连续失败计数
+- 继续处理下一条记录
+
+### 失败处理
+- 记录处理失败时,增加连续失败计数
+- 连续失败3次后自动停止
+- 显示详细的错误信息
+
+### 自动停止条件
+- 所有记录处理完成
+- 达到最大处理数量限制
+- 连续失败3次
+- 用户手动中断(Ctrl+C)
+
+## 监控信息
+
+### 实时显示
+- 当前处理的记录信息
+- 处理时间和状态
+- 成功/失败统计
+- 剩余待处理记录数量
+
+### 最终统计
+- 总处理数量
+- 成功数量
+- 失败数量
+- 成功率百分比
+
+## 配置建议
+
+### 处理间隔时间
+- **快速处理**:1-2秒(适合测试环境)
+- **正常处理**:2-5秒(适合生产环境)
+- **保守处理**:5-10秒(避免API限制)
+
+### 最大处理数量
+- **测试环境**:10-50条
+- **生产环境**:100-500条
+- **无限制**:适合批量处理任务
+
+## 注意事项
+
+⚠️ **API限制**:注意第三方API的调用频率限制  
+⚠️ **资源消耗**:长时间运行会消耗系统资源  
+⚠️ **数据库连接**:确保数据库连接稳定  
+⚠️ **错误处理**:关注失败记录的原因分析  
+
+## 故障排除
+
+### 常见问题
+
+1. **连续失败过多**
+   - 检查数据库连接
+   - 验证API配置
+   - 查看错误日志
+
+2. **处理速度过慢**
+   - 调整处理间隔时间
+   - 检查网络延迟
+   - 优化处理逻辑
+
+3. **内存占用过高**
+   - 减少最大处理数量
+   - 增加处理间隔时间
+   - 检查内存泄漏
+
+### 日志分析
+
+系统会输出详细的处理日志,包括:
+- 每条记录的处理状态
+- 错误信息和异常堆栈
+- 性能统计和资源使用情况
+
+## 示例输出
+
+```
+启动连续处理模式...
+系统将自动处理数据库中的记录,一条完成后自动处理下一条
+处理间隔: 2 秒
+无数量限制,将处理所有可用记录
+按 Ctrl+C 可以随时停止处理
+------------------------------------------------------------
+
+--- 处理第 1 条记录 ---
+时间: 2024-01-15 14:30:25
+开始处理记录 ID: 123, 内容ID: content_456
+  频道: xiaohongshu
+  标题: 小红书内容识别测试...
+  内容: 这是一个测试内容...
+开始内容识别处理...
+✅ 记录处理成功 (成功: 1, 失败: 0)
+剩余待处理记录: 45 条
+等待 2 秒后处理下一条记录...
+
+--- 处理第 2 条记录 ---
+时间: 2024-01-15 14:30:30
+...
+```
+
+## 总结
+
+连续处理模式是一个强大的自动化工具,能够高效地处理大量数据库记录。通过合理的配置和监控,可以实现稳定、高效的批量处理流程。 

+ 113 - 0
content_indentify/image_identifier.py

@@ -0,0 +1,113 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+图文识别脚本
+主要功能:使用 Coze API 分析图片内容
+"""
+
+import os
+import json
+import time
+import sys
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from coze.coze_hook import CozeHook
+
+
+class ImageIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 初始化Coze客户端
+        self.coze = CozeHook()
+    
+    def extract_image_urls(self, formatted_content: Dict[str, Any]) -> List[str]:
+        """提取图片URL列表"""
+        image_urls = []
+        image_url_list = formatted_content.get('image_url_list', [])
+        
+        for img_data in image_url_list:
+            if isinstance(img_data, dict) and 'image_url' in img_data:
+                image_urls.append(img_data['image_url'])
+        
+        return image_urls
+    
+    def analyze_images_with_coze(self, image_urls: List[str]) -> Dict[str, Any]:
+        """使用Coze API分析图片内容"""
+        try:
+            if not image_urls:
+                return {"images_comprehension": [], "error": "没有图片需要分析"}
+            
+            print(f"正在使用Coze API分析 {len(image_urls)} 张图片...")
+            response = self.coze.run(image_urls)
+            
+            # 解析Coze响应
+            if response and 'data' in response:
+                try:
+                    if isinstance(response['data'], str):
+                        data = json.loads(response['data'])
+                    else:
+                        data = response['data']
+                    
+                    return {
+                        "images_comprehension": data.get('images_comprehension', []),
+                    }
+                except json.JSONDecodeError:
+                    return {"images_comprehension": [], "error": "Coze响应解析失败", "raw_response": response}
+            else:
+                return {"images_comprehension": [], "error": "Coze API响应异常", "raw_response": response}
+                
+        except Exception as e:
+            print(f"Coze API调用失败: {e}")
+            return {"images_comprehension": [], "error": f"Coze API调用失败: {str(e)}"}
+    
+    def process_images(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
+        """处理图片识别的主函数"""
+        print("开始图片识别处理...")
+        
+        # 提取图片URL
+        image_urls = self.extract_image_urls(formatted_content)
+        print(f"提取到 {len(image_urls)} 张图片")
+        
+        if not image_urls:
+            print("没有图片需要分析")
+            return {"images_comprehension": [], "error": "没有图片需要分析"}
+        
+        # 分析图片
+        result = self.analyze_images_with_coze(image_urls)
+        
+        if result.get("images_comprehension"):
+            print(f"图片识别完成,共分析 {len(result['images_comprehension'])} 张图片")
+        else:
+            print("图片识别失败")
+        
+        return result
+
+
+def main():
+    """测试函数"""
+    # 模拟数据
+    test_content = {
+        "image_url_list": [
+            {
+                "image_type": 2,
+                "image_url": "http://example.com/image1.jpg"
+            }
+        ]
+    }
+    
+    identifier = ImageIdentifier()
+    result = identifier.process_images(
+        test_content
+    )
+    
+    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+
+
+if __name__ == '__main__':
+    main() 

+ 351 - 0
content_indentify/indentify.py

@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+内容识别脚本
+主要功能:
+1. 从数据库中拉取一条 recognition_status = 0 的数据 : 
+2. 解析 formatted_content 中的图片和视频
+3. 调用独立的图文识别和视频识别模块
+4. 将识别结果更新到数据库
+"""
+
+import os
+import json
+import time
+import sys
+import argparse
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+from content_indentify.image_identifier import ImageIdentifier
+from content_indentify.video_identifier import VideoIdentifier
+
+
+class ContentIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 初始化数据库连接
+        self.db = MysqlHelper()
+        
+        # 初始化识别模块
+        self.image_identifier = ImageIdentifier()
+        self.video_identifier = VideoIdentifier()
+    
+    def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
+        """从数据库获取一条未处理的数据"""
+        sql = """
+        SELECT id, formatted_content 
+        FROM knowledge_search_content 
+        WHERE recognition_status = 0
+        LIMIT 1
+        """
+        
+        try:
+            result = self.db.get_values(sql)
+            if result and len(result) > 0:
+                record = result[0]
+                # 检查返回的字段数量
+                if len(record) >= 3:
+                    return {
+                        'id': record[0],
+                        'formatted_content': record[1],
+                        'channel_content_id': record[2]
+                    }
+                elif len(record) == 2:
+                    # 如果没有channel_content_id字段,使用id作为默认值
+                    return {
+                        'id': record[0],
+                        'formatted_content': record[1],
+                        'channel_content_id': record[0]  # 使用id作为默认值
+                    }
+                else:
+                    print(f"数据库返回字段数量异常: {len(record)}, 期望至少2个字段")
+                    return None
+            return None
+        except Exception as e:
+            print(f"获取未处理记录失败: {e}")
+            return None
+    
+    def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
+        """解析 formatted_content JSON 字符串"""
+        try:
+            if isinstance(formatted_content, str):
+                return json.loads(formatted_content)
+            elif isinstance(formatted_content, dict):
+                return formatted_content
+            else:
+                raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
+        except json.JSONDecodeError as e:
+            print(f"解析 formatted_content JSON 失败: {e}")
+            raise
+    
+    def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
+        """处理内容识别,调用独立的识别模块"""
+        print("开始内容识别处理...")
+        
+        # 图片识别
+        image_result = self.image_identifier.process_images(formatted_content)
+        
+        # 视频识别
+        video_result = self.video_identifier.process_videos(formatted_content)
+        
+        # 整合结果
+        recognition_result = {
+            'image_analysis': image_result,
+            'video_analysis': video_result
+        }
+
+        print(f"识别结果: {recognition_result}")
+        
+        return recognition_result
+    
+    def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:
+        """更新数据库中的 multimodal_recognition 字段"""
+        try:
+            # 将结果转换为JSON字符串,并处理换行符问题
+            result_json = json.dumps(recognition_result, ensure_ascii=False)
+            # 将换行符替换为 \n 字符串,确保JSON可以被正确解析
+            result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')
+
+            print(f"result_json: {result_json}")
+            
+            # 构建更新SQL - 使用参数化查询避免换行符问题
+            sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
+            params = (result_json, record_id)
+            
+            # 执行更新
+            result = self.db.update_values(sql, params)
+            if result is not None:
+                print(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
+                return True
+            else:
+                print(f"更新记录 {record_id} 失败")
+                return False
+                
+        except Exception as e:
+            print(f"更新数据库失败: {e}")
+            return False
+    
+    def process_single_record(self) -> bool:
+        """处理单条记录"""
+        try:
+            # 获取未处理的记录
+            record = self.get_unprocessed_record()
+            if not record:
+                print("没有找到未处理的记录")
+                return False
+            
+            print(f"开始处理记录 ID: {record['id']}, 内容ID: {record['channel_content_id']}")
+            # print(f"  多模态识别: {record['multimodal_recognition'][:300]}...")
+
+            # 先设置这条记录的 recognition_status = 1
+            self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")
+            
+            # 解析 formatted_content
+            formatted_content = self.parse_formatted_content(record['formatted_content'])
+            
+            # 提取基本信息,处理 null 值
+            title = formatted_content.get('title') or ''
+            content = formatted_content.get('body_text') or ''
+            channel = formatted_content.get('channel') or ''
+            images = formatted_content.get('image_url_list') or []
+            videos = formatted_content.get('video_url_list') or []
+            author = formatted_content.get('channel_account_name') or ''
+            like_count = formatted_content.get('like_count') or 0
+            collect_count = formatted_content.get('collect_count') or 0
+            comment_count = formatted_content.get('comment_count') or 0
+            view_count = formatted_content.get('view_count') or 0
+            publish_time = formatted_content.get('publish_time') or ''
+            update_timestamp = formatted_content.get('update_timestamp') or ''
+            content_link = formatted_content.get('content_link') or ''
+            content_id = formatted_content.get('channel_content_id') or ''
+            
+            # 安全地显示信息,避免 null 值导致的错误
+            print(f"  频道: {channel}")
+            print(f"  标题: {title[:100] if title else '(无标题)'}...")
+            print(f"  内容: {content[:200] if content else '(无内容)'}...")
+            
+            # 调用内容识别处理
+            recognition_result = self.process_content_recognition(formatted_content)
+            
+            # 构建完整的识别结果
+            complete_result = {
+                'id': record['id'],
+                'channel': channel,
+                'title': title,
+                'content': content,
+                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
+                'videos': recognition_result.get('video_analysis', {}).get('videos_comprehension', []),
+                'meta': {
+                    'author': author,
+                    'like_count': like_count,
+                    'collect_count': collect_count,
+                    'comment_count': comment_count,
+                    'view_count': view_count,
+                    'publish_time': publish_time,
+                    'update_timestamp': update_timestamp,
+                    'content_link': content_link,
+                    'content_id': content_id,
+                }
+            }
+            
+            # 更新数据库
+            success = self.update_multimodal_recognition(record['id'], complete_result)
+            
+            if success:
+                print(f"记录 {record['id']} 处理完成")
+                return True
+            else:
+                print(f"记录 {record['id']} 处理失败")
+                return False
+                
+        except Exception as e:
+            print(f"处理记录失败: {e}")
+            return False
+    
+    def process_all_records(self, max_records: int = 10):
+        """处理多条记录"""
+        print(f"开始批量处理,最多处理 {max_records} 条记录")
+        
+        processed_count = 0
+        success_count = 0
+        
+        for i in range(max_records):
+            print(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")
+            
+            if self.process_single_record():
+                success_count += 1
+            else:
+                print("没有更多记录需要处理,结束批量处理")
+                break
+            
+            processed_count += 1
+            
+            # 添加延迟避免API限制
+            time.sleep(2)
+        
+        print(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")
+
+    def process_continuous(self, max_records: int = None, delay_seconds: int = 2):
+        """连续处理记录,直到没有更多记录或达到最大数量限制"""
+        print("启动连续处理模式...")
+        print("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
+        print(f"处理间隔: {delay_seconds} 秒")
+        if max_records:
+            print(f"最大处理数量: {max_records} 条")
+        else:
+            print("无数量限制,将处理所有可用记录")
+        print("按 Ctrl+C 可以随时停止处理")
+        print("-" * 60)
+        
+        processed_count = 0
+        success_count = 0
+        consecutive_failures = 0
+        max_consecutive_failures = 3  # 连续失败3次后停止
+        
+        try:
+            while True:
+                # 检查是否达到最大数量限制
+                if max_records and processed_count >= max_records:
+                    print(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
+                    break
+                
+                print(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
+                print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
+                
+                # 处理单条记录
+                if self.process_single_record():
+                    success_count += 1
+                    consecutive_failures = 0  # 重置连续失败计数
+                    print(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
+                else:
+                    consecutive_failures += 1
+                    print(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
+                    
+                    # 检查连续失败次数
+                    if consecutive_failures >= max_consecutive_failures:
+                        print(f"\n⚠️  连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
+                        print("停止连续处理")
+                        break
+                
+                processed_count += 1
+                
+                # 检查是否还有更多记录
+                remaining_records = self.get_remaining_records_count()
+                if remaining_records == 0:
+                    print(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
+                    break
+                
+                print(f"剩余待处理记录: {remaining_records} 条")
+                
+                # 添加延迟避免API限制
+                if delay_seconds > 0:
+                    print(f"等待 {delay_seconds} 秒后处理下一条记录...")
+                    time.sleep(delay_seconds)
+                
+        except KeyboardInterrupt:
+            print(f"\n\n⏹️  用户中断处理")
+            print(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
+        except Exception as e:
+            print(f"\n\n💥 处理过程中发生错误: {e}")
+            print(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
+        
+        print(f"\n📊 连续处理完成!")
+        print(f"总处理数量: {processed_count}")
+        print(f"成功数量: {success_count}")
+        print(f"失败数量: {processed_count - success_count}")
+        if processed_count > 0:
+            success_rate = (success_count / processed_count) * 100
+            print(f"成功率: {success_rate:.1f}%")
+
+    def get_remaining_records_count(self) -> int:
+        """获取剩余待处理记录数量"""
+        try:
+            sql = "SELECT COUNT(*) FROM knowledge_search_content WHERE recognition_status = 0"
+            result = self.db.get_values(sql)
+            if result and len(result) > 0:
+                return result[0][0]
+            return 0
+        except Exception as e:
+            print(f"获取剩余记录数量失败: {e}")
+            return 0
+
+
+def main():
+    """主函数"""
+    parser = argparse.ArgumentParser(description='内容识别脚本 - 分析图片和视频内容')
+    parser.add_argument('--single', action='store_true', help='只处理一条记录')
+    parser.add_argument('--batch', type=int, default=10, help='批量处理记录数量,默认10条')
+    parser.add_argument('--continuous', action='store_true', help='连续处理模式,自动处理所有可用记录')
+    parser.add_argument('--max-records', type=int, help='连续处理模式下的最大处理数量限制')
+    parser.add_argument('--delay', type=int, default=2, help='处理间隔时间(秒),默认2秒')
+    
+    args = parser.parse_args()
+    
+    try:
+        # 创建ContentIdentifier实例
+        identifier = ContentIdentifier()
+        
+        if args.single:
+            # 处理单条记录
+            identifier.process_single_record()
+        elif args.continuous:
+            # 连续处理模式
+            identifier.process_continuous(args.max_records, args.delay)
+        else:
+            # 批量处理记录
+            identifier.process_all_records(args.batch)
+            
+    except Exception as e:
+        print(f"程序执行失败: {e}")
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()

+ 51 - 0
content_indentify/install_deps.sh

@@ -0,0 +1,51 @@
+#!/bin/bash
+
+# 内容识别脚本依赖安装脚本
+
+echo "开始安装内容识别脚本的依赖包..."
+
+# 检查Python环境
+if ! command -v python3 &> /dev/null; then
+    echo "错误: 未找到 python3 命令"
+    exit 1
+fi
+
+# 检查pip
+if ! command -v pip3 &> /dev/null; then
+    echo "错误: 未找到 pip3 命令"
+    exit 1
+fi
+
+echo "Python版本: $(python3 --version)"
+echo "Pip版本: $(pip3 --version)"
+
+# 安装基础依赖
+echo "安装基础依赖..."
+pip3 install python-dotenv pymysql
+
+# 安装HTTP请求库(包含SOCKS支持)
+echo "安装HTTP请求库..."
+pip3 install "httpx[socks]" requests
+
+# 安装AI API客户端
+echo "安装AI API客户端..."
+pip3 install google-genai
+
+# 安装飞书API客户端
+echo "安装飞书API客户端..."
+pip3 install lark-oapi
+
+# 安装日志工具
+echo "安装日志工具..."
+pip3 install loguru
+
+echo ""
+echo "依赖安装完成!"
+echo ""
+echo "如果遇到权限问题,请使用:"
+echo "  sudo pip3 install -r requirements.txt"
+echo ""
+echo "或者使用虚拟环境:"
+echo "  python3 -m venv venv"
+echo "  source venv/bin/activate"
+echo "  pip install -r requirements.txt" 

+ 18 - 0
content_indentify/requirements.txt

@@ -0,0 +1,18 @@
+# 内容识别脚本依赖包
+
+# 基础依赖
+python-dotenv>=1.0.0
+pymysql>=1.0.0
+
+# HTTP请求库
+requests>=2.31.0
+httpx[socks]>=0.24.0
+
+# AI API客户端
+google-genai>=1.29.0
+
+# 飞书API客户端(如果需要)
+lark-oapi>=1.4.20
+
+# 日志和工具
+loguru>=0.7.0 

+ 94 - 0
content_indentify/run.sh

@@ -0,0 +1,94 @@
+#!/bin/bash
+
+# 内容识别脚本启动脚本
+
+echo "内容识别脚本启动中..."
+
+# 检查Python环境
+if ! command -v python3 &> /dev/null; then
+    echo "错误: 未找到 python3 命令"
+    exit 1
+fi
+
+# 检查依赖
+echo "检查依赖..."
+python3 -c "import pymysql, google.genai, requests, dotenv" 2>/dev/null
+if [ $? -ne 0 ]; then
+    echo "错误: 缺少必要的依赖包,请先安装: pip install -r requirements.txt"
+    exit 1
+fi
+
+# 检查环境变量文件
+if [ ! -f ".env" ]; then
+    echo "警告: 未找到 .env 文件,将使用默认配置"
+    echo "建议创建 .env 文件并配置必要的环境变量"
+fi
+
+# 显示帮助信息
+echo ""
+echo "使用方法:"
+echo "  python3 indentify.py --single                    # 处理单条记录"
+echo "  python3 indentify.py --batch 20                  # 批量处理20条记录"
+echo "  python3 indentify.py --continuous                # 连续处理模式(无限制)"
+echo "  python3 indentify.py --continuous --max-records 100  # 连续处理最多100条"
+echo "  python3 indentify.py --continuous --delay 5      # 连续处理,间隔5秒"
+echo "  python3 indentify.py --help                      # 显示帮助信息"
+echo ""
+
+# 运行测试(可选)
+read -p "是否先运行测试?(y/N): " -n 1 -r
+echo
+if [[ $REPLY =~ ^[Yy]$ ]]; then
+    echo "运行测试..."
+    python3 test_identify.py
+    echo ""
+fi
+
+# 询问运行模式
+echo "请选择运行模式:"
+echo "1) 处理单条记录"
+echo "2) 批量处理记录"
+echo "3) 连续处理模式(推荐)"
+echo "4) 退出"
+
+read -p "请输入选择 (1-4): " choice
+
+case $choice in
+    1)
+        echo "启动单条记录处理模式..."
+        python3 indentify.py --single
+        ;;
+    2)
+        read -p "请输入批量处理数量 (默认10): " batch_size
+        batch_size=${batch_size:-10}
+        echo "启动批量处理模式,处理 $batch_size 条记录..."
+        python3 indentify.py --batch $batch_size
+        ;;
+    3)
+        echo "启动连续处理模式..."
+        echo "此模式将自动处理数据库中的所有记录,一条完成后自动处理下一条"
+        echo ""
+        read -p "是否设置最大处理数量限制?(y/N): " -n 1 -r
+        echo
+        if [[ $REPLY =~ ^[Yy]$ ]]; then
+            read -p "请输入最大处理数量: " max_records
+            read -p "请输入处理间隔时间(秒,默认2): " delay
+            delay=${delay:-2}
+            echo "启动连续处理模式,最多处理 $max_records 条记录,间隔 $delay 秒..."
+            python3 indentify.py --continuous --max-records $max_records --delay $delay
+        else
+            read -p "请输入处理间隔时间(秒,默认2): " delay
+            delay=${delay:-2}
+            echo "启动连续处理模式,无数量限制,间隔 $delay 秒..."
+            python3 indentify.py --continuous --delay $delay
+        fi
+        ;;
+    4)
+        echo "退出"
+        exit 0
+        ;;
+    *)
+        echo "无效选择,退出"
+        exit 1
+        ;;
+esac 

+ 166 - 0
content_indentify/video_identifier.py

@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+视频识别脚本
+主要功能:使用 Gemini API 分析视频内容
+"""
+
+import os
+import json
+import time
+import sys
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gemini import GeminiProcessor
+
+
+class VideoIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 初始化Gemini客户端
+        self.gemini = GeminiProcessor()
+        
+        # 系统提示词
+        self.video_system_prompt = """你是一个专业的视频内容分析专家。请分析视频中的内容,包括:
+1. 视频的主要内容和主题
+2. 视频中的文字内容(如果有)
+3. 视频的风格和特点
+4. 视频可能表达的情感或意图
+5. 视频的背景音乐或语音内容(如果有)
+
+请用简洁、准确的语言描述视频内容,重点关注文字内容和主要视觉元素。"""
+    
+    def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """提取视频URL列表"""
+        video_data = []
+        video_url_list = formatted_content.get('video_url_list', [])
+        
+        for video_item in video_url_list:
+            if isinstance(video_item, dict) and 'video_url' in video_item:
+                video_data.append({
+                    'url': video_item['video_url'],
+                    'duration': video_item.get('video_duration', 0)
+                })
+        
+        return video_data
+    
+    def analyze_videos_with_gemini(self, video_data: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """使用Gemini API分析视频内容"""
+        try:
+            if not video_data:
+                return {"videos_comprehension": [], "error": "没有视频需要分析"}
+            
+            print(f"正在使用Gemini API分析 {len(video_data)} 个视频...")
+            
+            videos_comprehension = []
+            for i, video in enumerate(video_data):
+                print(f"  分析视频 {i+1}/{len(video_data)}: {video['url'][:50]}...")
+                
+                # 构建分析提示
+                prompt = f"""请分析以下视频内容:
+视频时长: {video['duration']}秒
+视频链接: {video['url']}
+
+请从以下角度分析视频内容:
+1. 视频的主要内容和主题
+2. 视频中的文字内容(如果有)
+3. 视频的风格和特点
+4. 视频可能表达的情感或意图
+5. 视频的背景音乐或语音内容(如果有)
+
+请用简洁、准确的语言描述视频内容。"""
+                
+                # 调用Gemini API
+                try:
+                    response = self.gemini.process(
+                        content=prompt,
+                        system_prompt=self.video_system_prompt,
+                        model_name="gemini-2.5-flash"
+                    )
+                    
+                    if response:
+                        videos_comprehension.append({
+                            'video_url': video['url'],
+                            'duration': video['duration'],
+                            'comprehension': response,
+                            'analysis_timestamp': int(time.time() * 1000)
+                        })
+                    else:
+                        videos_comprehension.append({
+                            'video_url': video['url'],
+                            'duration': video['duration'],
+                            'comprehension': 'Gemini API分析失败',
+                            'analysis_timestamp': int(time.time() * 1000)
+                        })
+                    
+                    # 添加延迟避免API限制
+                    time.sleep(1)
+                    
+                except Exception as e:
+                    print(f"  视频 {i+1} 分析失败: {e}")
+                    videos_comprehension.append({
+                        'video_url': video['url'],
+                        'duration': video['duration'],
+                        'comprehension': f'分析失败: {str(e)}',
+                        'analysis_timestamp': int(time.time() * 1000)
+                    })
+            
+            return {"videos_comprehension": videos_comprehension}
+            
+        except Exception as e:
+            print(f"Gemini API调用失败: {e}")
+            return {"videos_comprehension": [], "error": f"Gemini API调用失败: {str(e)}"}
+    
+    def process_videos(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
+        """处理视频识别的主函数"""
+        print("开始视频识别处理...")
+        
+        # 提取视频URL
+        video_data = self.extract_video_urls(formatted_content)
+        print(f"提取到 {len(video_data)} 个视频")
+        
+        if not video_data:
+            print("没有视频需要分析")
+            return {"videos_comprehension": [], "error": "没有视频需要分析"}
+        
+        # 分析视频
+        result = self.analyze_videos_with_gemini(video_data)
+        
+        if result.get("videos_comprehension"):
+            print(f"视频识别完成,共分析 {len(result['videos_comprehension'])} 个视频")
+        else:
+            print("视频识别失败")
+        
+        return result
+
+
+def main():
+    """测试函数"""
+    # 模拟数据
+    test_content = {
+        "video_url_list": [
+            {
+                "video_url": "http://example.com/video1.mp4",
+                "video_duration": 30
+            }
+        ]
+    }
+    
+    identifier = VideoIdentifier()
+    result = identifier.process_videos(
+        test_content["title"],
+        test_content["body_text"],
+        test_content
+    )
+    
+    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+
+
+if __name__ == '__main__':
+    main() 

+ 3 - 5
coze/coze_hook.py

@@ -10,14 +10,12 @@ class CozeHook(object):
             "Authorization": "Bearer " + os.getenv('COZE_API_KEY')
         }
         self.hook_id = os.getenv('COZE_BOT_ID')
-    def call_coze_api(self, title, body_text, image_url_list, hook_id=None):
+    def call_coze_api(self, image_url_list, hook_id=None):
         url = self.url
         headers = self.headers
         payload = {
             "workflow_id": hook_id or self.hook_id,
             "parameters": {
-                "title": title,
-                "body_text": body_text,
                 "image_url_list": image_url_list
             }
         }
@@ -25,6 +23,6 @@ class CozeHook(object):
         response.raise_for_status()
         return response.json()
     
-    def run(self, title, body_text, image_url_list): 
-        resp = self.call_coze_api(title, body_text, image_url_list)
+    def run(self, image_url_list): 
+        resp = self.call_coze_api(image_url_list)
         return resp

+ 11 - 3
utils/mysql_db.py

@@ -16,7 +16,12 @@ class MysqlHelper:
             passwd="wqsd@2025",  # mysql用户登录密码
             db="ai_knowledge",  # 数据库名
             # 如果数据库里面的文本是utf8编码的,charset指定是utf8
-            charset="utf8")
+            charset="utf8",
+            # 超时设置
+            connect_timeout=30,      # 连接超时时间(秒)
+            read_timeout=30,         # 读取超时时间(秒)
+            write_timeout=30        # 写入超时时间(秒)
+        )
         return connection
 
     @classmethod
@@ -45,14 +50,17 @@ class MysqlHelper:
             print(f"get_values异常:{e}\n")
 
     @classmethod
-    def update_values(cls, sql):
+    def update_values(cls, sql, params=None):
         # 连接数据库
         connect = cls.connect_mysql()
         # 返回一个 Cursor对象
         mysql = connect.cursor()
         try:
             # 执行 sql 语句
-            res = mysql.execute(sql)
+            if params:
+                res = mysql.execute(sql, params)
+            else:
+                res = mysql.execute(sql)
             # 注意 一定要commit,否则添加数据不生效
             connect.commit()
             return res