jihuaqiang há 2 meses atrás
pai
commit
aca6c2c5d0

BIN
.DS_Store


+ 335 - 0
agent.py

@@ -0,0 +1,335 @@
+import json
+import sys
+import os
+import argparse
+import signal
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from urllib.parse import urlparse
+from typing import Any, Dict, List, Optional, Tuple
+
+# 保证可以导入本项目模块
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+from utils.logging_config import get_logger
+from agent_tools import QueryDataTool, IdentifyTool, StructureTool
+from agent_process import start_daemon, stop_daemon, status_daemon
+
+# 可选引入 LangGraph(如未安装,将在运行时优雅回退到顺序执行)
+HAS_LANGGRAPH = False
+try:
+    from langgraph.graph import StateGraph, END
+    HAS_LANGGRAPH = True
+except Exception:
+    HAS_LANGGRAPH = False
+
+
+logger = get_logger('Agent')
+
+PID_FILE = os.path.join(os.path.dirname(__file__), 'agent_scheduler.pid')
+
+
+class ReactAgent:
+    def __init__(self) -> None:
+        self.identify_tool = IdentifyTool()
+
+    def handle_request(self, request_id: str) -> Dict[str, Any]:
+        items = QueryDataTool.fetch_crawl_data_list(request_id)
+        if not items:
+            return {"requestId": request_id, "processed": 0, "success": 0, "details": []}
+
+        success_count = 0
+        details: List[Dict[str, Any]] = []
+        for idx, item in enumerate(items, start=1):
+            crawl_data = item.get('crawl_data') or {}
+
+            # Step 1: 识别
+            identify_result = self.identify_tool.run(crawl_data if isinstance(crawl_data, dict) else {})
+
+            # Step 2: 结构化并入库
+            affected = StructureTool.store_parsing_result(request_id, item.get('raw') or {}, identify_result)
+            ok = affected is not None and affected > 0
+            if ok:
+                success_count += 1
+
+            details.append({
+                "index": idx,
+                "dbInserted": ok,
+                "identifyError": identify_result.get('error'),
+            })
+
+        return {
+            "requestId": request_id,
+            "processed": len(items),
+            "success": success_count,
+            "details": details,
+        }
+
+
+AGENT = ReactAgent()
+
+
+# =========================
+# LangGraph 风格实现(可选)
+# =========================
+def build_langgraph_app():
+    if not HAS_LANGGRAPH:
+        return None
+
+    # 状态:以 dict 形式承载
+    # 输入: {"request_id": str}
+    # 输出附加: items, details, processed, success
+
+    def node_fetch(state: Dict[str, Any]) -> Dict[str, Any]:
+        request_id = str(state.get("request_id", ""))
+        items = QueryDataTool.fetch_crawl_data_list(request_id)
+        return {
+            **state,
+            "items": items,
+            "details": [],
+            "processed": 0,
+            "success": 0,
+        }
+
+    identify_tool = IdentifyTool()
+
+    def node_process(state: Dict[str, Any]) -> Dict[str, Any]:
+        request_id = str(state.get("request_id", ""))
+        items: List[Dict[str, Any]] = state.get("items", []) or []
+        details: List[Dict[str, Any]] = []
+        success_count = 0
+
+        for idx, item in enumerate(items, start=1):
+            crawl_data = item.get('crawl_data') or {}
+            identify_result = identify_tool.run(crawl_data if isinstance(crawl_data, dict) else {})
+            affected = StructureTool.store_parsing_result(request_id, item.get('raw') or {}, identify_result)
+            ok = affected is not None and affected > 0
+            if ok:
+                success_count += 1
+            details.append({
+                "index": idx,
+                "dbInserted": ok,
+                "identifyError": identify_result.get('error'),
+            })
+
+        return {
+            **state,
+            "details": details,
+            "processed": len(items),
+            "success": success_count,
+        }
+
+    graph = StateGraph(dict)
+    graph.add_node("fetch", node_fetch)
+    graph.add_node("process", node_process)
+
+    graph.set_entry_point("fetch")
+    graph.add_edge("fetch", "process")
+    graph.add_edge("process", END)
+
+    return graph.compile()
+
+
+APP = build_langgraph_app()
+
+
+class AgentHttpHandler(BaseHTTPRequestHandler):
+    def _set_headers(self, status_code: int = 200):
+        self.send_response(status_code)
+        self.send_header('Content-Type', 'application/json; charset=utf-8')
+        self.end_headers()
+
+    def do_POST(self):
+        parsed = urlparse(self.path)
+        if parsed.path != '/trigger':
+            self._set_headers(404)
+            self.wfile.write(json.dumps({"error": "not found"}).encode('utf-8'))
+            return
+
+        length = int(self.headers.get('Content-Length', '0') or '0')
+        body = self.rfile.read(length) if length > 0 else b''
+        try:
+            payload = json.loads(body.decode('utf-8')) if body else {}
+        except Exception:
+            self._set_headers(400)
+            self.wfile.write(json.dumps({"error": "invalid json"}).encode('utf-8'))
+            return
+
+        request_id = (payload or {}).get('requestId')
+        if not request_id:
+            self._set_headers(400)
+            self.wfile.write(json.dumps({"error": "requestId is required"}).encode('utf-8'))
+            return
+
+        try:
+            logger.info(f"收到触发请求: requestId={request_id}")
+            if APP is not None:
+                result = APP.invoke({"request_id": str(request_id)})
+                # 标准化返回
+                result = {
+                    "requestId": str(request_id),
+                    "processed": result.get("processed", 0),
+                    "success": result.get("success", 0),
+                    "details": result.get("details", []),
+                }
+            else:
+                # 回退到顺序执行
+                result = AGENT.handle_request(str(request_id))
+            self._set_headers(200)
+            self.wfile.write(json.dumps(result, ensure_ascii=False).encode('utf-8'))
+        except Exception as e:
+            logger.error(f"处理失败: {e}")
+            self._set_headers(500)
+            self.wfile.write(json.dumps({"error": str(e)}).encode('utf-8'))
+
+    def log_message(self, format: str, *args) -> None:
+        # 重定向默认日志到我们统一的 logger
+        logger.info("HTTP " + (format % args))
+
+
+def run(host: str = '0.0.0.0', port: int = 8080):
+    server_address = (host, port)
+    httpd = HTTPServer(server_address, AgentHttpHandler)
+
+    def _graceful_shutdown(signum, frame):
+        try:
+            logger.info(f"收到信号 {signum},正在停止HTTP服务...")
+            # shutdown 会在其他线程优雅停止; 这里我们直接关闭,避免阻塞
+            httpd.shutdown()
+        except Exception:
+            pass
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        signal.signal(sig, _graceful_shutdown)
+
+    logger.info(f"Agent HTTP 服务已启动: http://{host}:{port}/trigger")
+    try:
+        httpd.serve_forever()
+    finally:
+        try:
+            httpd.server_close()
+        except Exception:
+            pass
+        logger.info("Agent HTTP 服务已停止")
+
+
+def _write_pid_file(pid: int) -> None:
+    with open(PID_FILE, 'w') as f:
+        f.write(str(pid))
+
+
+def _read_pid_file() -> Optional[int]:
+    if not os.path.exists(PID_FILE):
+        return None
+    try:
+        with open(PID_FILE, 'r') as f:
+            content = f.read().strip()
+            return int(content) if content else None
+    except Exception:
+        return None
+
+
+def _is_process_running(pid: int) -> bool:
+    try:
+        os.kill(pid, 0)
+        return True
+    except Exception:
+        return False
+
+
+def start_daemon(host: str, port: int) -> Dict[str, Any]:
+    old_pid = _read_pid_file()
+    if old_pid and _is_process_running(old_pid):
+        return {"status": "already_running", "pid": old_pid}
+
+    python_exec = sys.executable
+    script_path = os.path.abspath(__file__)
+    args = [python_exec, script_path, "--serve", "--host", host, "--port", str(port)]
+
+    with open(os.devnull, 'wb') as devnull:
+        proc = subprocess.Popen(
+            args,
+            stdout=devnull,
+            stderr=devnull,
+            stdin=devnull,
+            close_fds=True,
+            preexec_fn=os.setsid if hasattr(os, 'setsid') else None,
+        )
+
+    _write_pid_file(proc.pid)
+    # 简单等待,确认进程未立即退出
+    time.sleep(0.5)
+    running = _is_process_running(proc.pid)
+    return {"status": "started" if running else "failed", "pid": proc.pid}
+
+
+def stop_daemon(timeout: float = 5.0) -> Dict[str, Any]:
+    pid = _read_pid_file()
+    if not pid:
+        return {"status": "not_running"}
+
+    if not _is_process_running(pid):
+        try:
+            os.remove(PID_FILE)
+        except Exception:
+            pass
+        return {"status": "not_running"}
+
+    try:
+        os.kill(pid, signal.SIGTERM)
+    except Exception as e:
+        return {"status": "error", "error": str(e)}
+
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        if not _is_process_running(pid):
+            break
+        time.sleep(0.2)
+
+    if _is_process_running(pid):
+        try:
+            os.kill(pid, signal.SIGKILL)
+        except Exception as e:
+            return {"status": "error", "error": str(e)}
+
+    try:
+        os.remove(PID_FILE)
+    except Exception:
+        pass
+
+    return {"status": "stopped"}
+
+
+def status_daemon() -> Dict[str, Any]:
+    pid = _read_pid_file()
+    if pid and _is_process_running(pid):
+        return {"status": "running", "pid": pid}
+    return {"status": "not_running"}
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Agent 服务管理')
+    parser.add_argument('--serve', action='store_true', help='以前台模式启动HTTP服务')
+    parser.add_argument('--host', default='0.0.0.0', help='监听地址')
+    parser.add_argument('--port', type=int, default=8080, help='监听端口')
+    parser.add_argument('command', nargs='?', choices=['start', 'stop', 'status'], help='守护进程管理命令')
+    args = parser.parse_args()
+
+    if args.serve:
+        run(args.host, args.port)
+        sys.exit(0)
+
+    if args.command == 'start':
+        res = start_daemon(args.host, args.port)
+        print(json.dumps(res, ensure_ascii=False))
+        sys.exit(0 if res.get('status') == 'started' else 1)
+    elif args.command == 'stop':
+        res = stop_daemon()
+        print(json.dumps(res, ensure_ascii=False))
+        sys.exit(0 if res.get('status') in ('stopped', 'not_running') else 1)
+    elif args.command == 'status':
+        res = status_daemon()
+        print(json.dumps(res, ensure_ascii=False))
+        sys.exit(0 if res.get('status') in ('running', 'not_running') else 1)
+    else:
+        # 默认行为:以前台启动(兼容旧用法)
+        run(args.host, args.port)
+

+ 102 - 0
agent_process.py

@@ -0,0 +1,102 @@
+import os
+import sys
+import json
+import time
+import signal
+import subprocess
+from typing import Any, Dict, Optional
+
+PID_FILE = os.path.join(os.path.dirname(__file__), 'agent_scheduler.pid')
+
+
+def _write_pid_file(pid: int) -> None:
+    with open(PID_FILE, 'w') as f:
+        f.write(str(pid))
+
+
+def _read_pid_file() -> Optional[int]:
+    if not os.path.exists(PID_FILE):
+        return None
+    try:
+        with open(PID_FILE, 'r') as f:
+            content = f.read().strip()
+            return int(content) if content else None
+    except Exception:
+        return None
+
+
+def _is_process_running(pid: int) -> bool:
+    try:
+        os.kill(pid, 0)
+        return True
+    except Exception:
+        return False
+
+
+def start_daemon(host: str, port: int) -> Dict[str, Any]:
+    old_pid = _read_pid_file()
+    if old_pid and _is_process_running(old_pid):
+        return {"status": "already_running", "pid": old_pid}
+
+    python_exec = sys.executable
+    script_path = os.path.join(os.path.dirname(__file__), 'agent.py')
+    args = [python_exec, script_path, "--serve", "--host", host, "--port", str(port)]
+
+    with open(os.devnull, 'wb') as devnull:
+        proc = subprocess.Popen(
+            args,
+            stdout=devnull,
+            stderr=devnull,
+            stdin=devnull,
+            close_fds=True,
+            preexec_fn=os.setsid if hasattr(os, 'setsid') else None,
+        )
+
+    _write_pid_file(proc.pid)
+    time.sleep(0.5)
+    running = _is_process_running(proc.pid)
+    return {"status": "started" if running else "failed", "pid": proc.pid}
+
+
+def stop_daemon(timeout: float = 5.0) -> Dict[str, Any]:
+    pid = _read_pid_file()
+    if not pid:
+        return {"status": "not_running"}
+
+    if not _is_process_running(pid):
+        try:
+            os.remove(PID_FILE)
+        except Exception:
+            pass
+        return {"status": "not_running"}
+
+    try:
+        os.kill(pid, signal.SIGTERM)
+    except Exception as e:
+        return {"status": "error", "error": str(e)}
+
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        if not _is_process_running(pid):
+            break
+        time.sleep(0.2)
+
+    if _is_process_running(pid):
+        try:
+            os.kill(pid, signal.SIGKILL)
+        except Exception as e:
+            return {"status": "error", "error": str(e)}
+
+    try:
+        os.remove(PID_FILE)
+    except Exception:
+        pass
+
+    return {"status": "stopped"}
+
+
+def status_daemon() -> Dict[str, Any]:
+    pid = _read_pid_file()
+    if pid and _is_process_running(pid):
+        return {"status": "running", "pid": pid}
+    return {"status": "not_running"}

+ 129 - 0
agent_tools.py

@@ -0,0 +1,129 @@
+import os
+import sys
+import json
+from typing import Any, Dict, List, Optional
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+from utils.logging_config import get_logger
+from utils.mysql_db import MysqlHelper
+from indentify.indentify import ContentIdentifier
+
+logger = get_logger('AgentTools')
+
+
+class QueryDataTool:
+    """查询 knowledge_crawl_content 获取 data 列表中的 crawl_data 字段"""
+
+    @staticmethod
+    def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
+        sql = "SELECT data FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
+        rows = MysqlHelper.get_values(sql, (request_id,))
+        if not rows:
+            logger.info(f"request_id={request_id} 未查询到数据")
+            return []
+
+        results: List[Dict[str, Any]] = []
+        for row in rows:
+            data_cell = row[0]
+            if not data_cell:
+                continue
+            try:
+                parsed = json.loads(data_cell) if isinstance(data_cell, (str, bytes)) else data_cell
+                if isinstance(parsed, list):
+                    for item in parsed:
+                        if isinstance(item, dict):
+                            crawl_data = item.get('crawl_data')
+                            if isinstance(crawl_data, (dict, list)):
+                                results.append({"crawl_data": crawl_data, "raw": item})
+                            else:
+                                results.append({"crawl_data": item, "raw": item})
+                elif isinstance(parsed, dict):
+                    crawl_data = parsed.get('crawl_data')
+                    if isinstance(crawl_data, (dict, list)):
+                        results.append({"crawl_data": crawl_data, "raw": parsed})
+                    else:
+                        results.append({"crawl_data": parsed, "raw": parsed})
+                else:
+                    logger.warning("data 字段非期望的 JSON 结构,已跳过一行")
+            except Exception as e:
+                logger.error(f"解析 data JSON 失败: {e}")
+        logger.info(f"request_id={request_id} 提取 crawl_data 数量: {len(results)}")
+        return results
+
+
+class IdentifyTool:
+    """调用 indentify 内部能力,完成图像/视频识别"""
+
+    def __init__(self) -> None:
+        self.identifier = ContentIdentifier()
+
+    def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
+        try:
+            formatted_content = self.identifier.parse_formatted_content(crawl_data)
+            recognition_result = self.identifier.process_content_recognition(formatted_content)
+
+            title = formatted_content.get('title') or ''
+            content = formatted_content.get('body_text') or ''
+            channel = formatted_content.get('channel') or ''
+            author = formatted_content.get('channel_account_name') or ''
+            like_count = formatted_content.get('like_count') or 0
+            collect_count = formatted_content.get('collect_count') or 0
+            comment_count = formatted_content.get('comment_count') or 0
+            view_count = formatted_content.get('view_count') or 0
+            publish_time = formatted_content.get('publish_time') or ''
+            update_timestamp = formatted_content.get('update_timestamp') or ''
+            content_link = formatted_content.get('content_link') or ''
+            content_id = formatted_content.get('channel_content_id') or ''
+
+            complete_result = {
+                'channel': channel,
+                'title': title,
+                'content': content,
+                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
+                'videos': recognition_result.get('video_analysis', {}),
+                'meta': {
+                    'author': author,
+                    'like_count': like_count,
+                    'collect_count': collect_count,
+                    'comment_count': comment_count,
+                    'view_count': view_count,
+                    'publish_time': publish_time,
+                    'update_timestamp': update_timestamp,
+                    'content_link': content_link,
+                    'content_id': content_id,
+                }
+            }
+            return complete_result
+        except Exception as e:
+            logger.error(f"识别失败: {e}")
+            return {
+                'channel': '',
+                'title': '',
+                'content': '',
+                'images': [],
+                'videos': [],
+                'meta': {},
+                'error': str(e)
+            }
+
+
+class StructureTool:
+    """
+    结构化工具:按照既定的结构将识别结果与原始 crawl_data 组合,
+    并存入 knowledge_parsing_content 表。
+    """
+
+    @staticmethod
+    def store_parsing_result(request_id: str, crawl_raw: Dict[str, Any], identify_result: Dict[str, Any]) -> Optional[int]:
+        payload = {
+            'request_id': request_id,
+            'crawl_raw': crawl_raw,
+            'identify_result': identify_result,
+        }
+        sql = (
+            "INSERT INTO knowledge_parsing_content (request_id, parsing_result, created_at) "
+            "VALUES (%s, %s, NOW())"
+        )
+        params = (request_id, json.dumps(payload, ensure_ascii=False))
+        return MysqlHelper.update_values(sql, params)

+ 9 - 0
config.py

@@ -0,0 +1,9 @@
+# 飞书配置
+# 推荐通过环境变量设置这些敏感信息,以增强安全性,避免硬编码。
+# 如果环境变量未设置,则使用默认的 "YOUR_..." 占位符。
+FEISHU_APP_ID="cli_a76c35b8fa7d500c"
+FEISHU_APP_SECRET="xHpF7H9nBwXeCH2Z1YRDScWSXzyktq36"
+
+
+# Gemini 配置  (付费账号)
+GEMINI_API_KEY = "AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI"

+ 175 - 0
indentify/image_identifier.py

@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+图文识别脚本
+主要功能:使用 Gemini API 进行图片OCR识别
+"""
+
+import os
+import json
+import time
+import sys
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+import google.generativeai as genai
+from PIL import Image
+import requests
+from io import BytesIO
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+
+class ImageIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 初始化Gemini API
+        api_key = os.getenv('GEMINI_API_KEY')
+        if not api_key:
+            raise ValueError("请在环境变量中设置 GEMINI_API_KEY")
+        
+        genai.configure(api_key=api_key)
+        self.model = genai.GenerativeModel('gemini-2.5-flash')
+    
+    def download_image(self, image_url: str) -> Optional[Image.Image]:
+        """下载图片并转换为PIL Image对象"""
+        try:
+            response = requests.get(image_url, timeout=10)
+            response.raise_for_status()
+            image = Image.open(BytesIO(response.content))
+            return image
+        except Exception as e:
+            print(f"下载图片失败 {image_url}: {e}")
+            return None
+    
+    def extract_image_urls(self, formatted_content: Dict[str, Any]) -> List[str]:
+        """提取图片URL列表"""
+        image_urls = []
+        image_url_list = formatted_content.get('image_url_list', [])
+        
+        for img_data in image_url_list:
+            if isinstance(img_data, dict) and 'image_url' in img_data:
+                image_urls.append(img_data['image_url'])
+        
+        return image_urls
+    
+    def analyze_image_with_gemini(self, image: Image.Image) -> Dict[str, Any]:
+        """使用Gemini API分析单张图片内容"""
+        try:
+            # 构建OCR提示词
+            prompt = """
+            #### 人设
+            你是一名图像文字理解专家,请对输入的文章图片进行精准的文字提取和结构化整理。
+
+            #### 任务要求如下:
+            1. 仅提取图片中可见的文字内容,不需要改写、总结或推理隐藏信息。
+            2. 如果图片包含结构(如表格、图表、标题、段落等),请按结构输出。
+            3. 所有提取的内容需保持原始顺序和排版上下文的逻辑。
+            4. 不需要进行OCR校正,只需要原样提取图中文字。
+            5. 舍弃图片中和标题不相关的文字
+            6. 对于结构不明确或自由排列的文字,按照从上到下、从左到右的顺序依次提取。
+            """
+            
+            response = self.model.generate_content([prompt, image])
+            
+            return {
+                "text_content": response.text,
+                "success": True
+            }
+            
+        except Exception as e:
+            print(f"Gemini API调用失败: {e}")
+            return {
+                "text_content": "",
+                "success": False,
+                "error": str(e)
+            }
+    
+    def analyze_images_with_gemini(self, image_urls: List[str]) -> Dict[str, Any]:
+        """使用Gemini API分析多张图片内容"""
+        try:
+            if not image_urls:
+                return {"images_comprehension": [], "error": "没有图片需要分析"}
+            
+            print(f"正在使用Gemini API分析 {len(image_urls)} 张图片...")
+            results = []
+            
+            for i, image_url in enumerate(image_urls):
+                print(f"正在处理第 {i+1} 张图片: {image_url}")
+                
+                # 下载图片
+                image = self.download_image(image_url)
+                if image is None:
+                    results.append({
+                        "image_url": image_url,
+                        "text_content": "",
+                        "success": False,
+                        "error": "图片下载失败"
+                    })
+                    continue
+                
+                # 分析图片
+                result = self.analyze_image_with_gemini(image)
+                result["image_url"] = image_url
+                results.append(result)
+                
+                # 添加延迟避免API限制
+                time.sleep(1)
+            
+            return {
+                "images_comprehension": results
+            }
+                
+        except Exception as e:
+            print(f"Gemini API批量调用失败: {e}")
+            return {"images_comprehension": [], "error": f"Gemini API调用失败: {str(e)}"}
+    
+    def process_images(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
+        """处理图片识别的主函数"""
+        print("开始图片OCR识别处理...")
+        
+        # 提取图片URL
+        image_urls = self.extract_image_urls(formatted_content)
+        print(f"提取到 {len(image_urls)} 张图片")
+        
+        if not image_urls:
+            print("没有图片需要分析")
+            return {"images_comprehension": [], "error": "没有图片需要分析"}
+        
+        # 分析图片
+        result = self.analyze_images_with_gemini(image_urls)
+        
+        if result.get("images_comprehension"):
+            successful_count = sum(1 for img in result['images_comprehension'] if img.get('success', False))
+            print(f"图片OCR识别完成,成功分析 {successful_count}/{len(result['images_comprehension'])} 张图片")
+        else:
+            print("图片OCR识别失败")
+        
+        return result
+
+
+def main():
+    """测试函数"""
+    # 模拟数据
+    test_content = {
+        "image_url_list": [
+            {
+                "image_type": 2,
+                "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
+            }
+        ]
+    }
+    
+    try:
+        identifier = ImageIdentifier()
+        result = identifier.process_images(test_content)
+        
+        print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+    except Exception as e:
+        print(f"初始化失败: {e}")
+
+
+if __name__ == '__main__':
+    main() 

+ 392 - 0
indentify/indentify.py

@@ -0,0 +1,392 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+内容识别脚本
+主要功能:
+1. 从数据库中拉取一条 recognition_status = 0 的数据 : 
+2. 解析 formatted_content 中的图片和视频
+3. 调用独立的图文识别和视频识别模块
+4. 将识别结果更新到数据库
+"""
+
+import os
+import json
+import time
+import sys
+import argparse
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+from datetime import datetime
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+from content_indentify.image_identifier import ImageIdentifier
+from content_indentify.video_identifier import VideoIdentifier
+from utils.logging_config import get_logger
+
+
+class ContentIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 设置日志
+        self.logger = get_logger('ContentIdentifier')
+        
+        # 初始化数据库连接
+        self.db = MysqlHelper()
+        
+        # 初始化识别模块
+        self.image_identifier = ImageIdentifier()
+        self.video_identifier = VideoIdentifier()
+    
+
+    def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
+        """从数据库获取一条未处理的数据
+        先从 knowledge_content_query 表中选取 category_id = 0 的所有 query_word,
+        然后用这些 query_word 去 knowledge_search_content 表中匹配,
+        找出 recognition_status = 0 的一条开始处理
+        """
+        try:
+            # 第一步:获取 category_id = 0 的所有 query_word
+            query_sql = """
+            SELECT query_word 
+            FROM knowledge_content_query 
+            WHERE category_id = 0
+            """
+            
+            query_result = self.db.get_values(query_sql)
+            if not query_result:
+                self.logger.warning("未找到 category_id = 0 的 query_word")
+                return None
+            
+            query_words = [row[0] for row in query_result]
+            self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
+            
+            # 第二步:用这些 query_word 去匹配 knowledge_search_content 表
+            # 使用 IN 查询来匹配多个 query_word
+            if len(query_words) > 0:
+                # 构建带引号的查询条件,因为 query_word 是字符串
+                quoted_words = [f"'{word}'" for word in query_words]
+                placeholders = ','.join(quoted_words)
+                
+                content_sql = f"""
+                SELECT id, formatted_content
+                FROM knowledge_search_content 
+                WHERE recognition_status = 0
+                AND query_word IN ({placeholders})
+                LIMIT 1
+                """
+                
+                # 不需要传递参数,因为SQL已经包含了具体的值
+                result = self.db.get_values(content_sql)
+            else:
+                self.logger.warning("没有可用的 query_word 进行匹配")
+                return None
+            if result and len(result) > 0:
+                record = result[0]
+                # 检查返回的字段数量
+                return {
+                    'id': record[0],
+                    'formatted_content': record[1]
+                }
+            else:
+                self.logger.info("未找到匹配 query_word 且 recognition_status = 0 的记录")
+                return None
+                
+        except Exception as e:
+            self.logger.error(f"获取未处理记录失败: {e}")
+            return None
+    
+    def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
+        """解析 formatted_content JSON 字符串"""
+        try:
+            if isinstance(formatted_content, str):
+                return json.loads(formatted_content)
+            elif isinstance(formatted_content, dict):
+                return formatted_content
+            else:
+                raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
+        except json.JSONDecodeError as e:
+            self.logger.error(f"解析 formatted_content JSON 失败: {e}")
+            raise
+    
+    def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
+        """处理内容识别,调用独立的识别模块"""
+        self.logger.info("开始内容识别处理...")
+        
+        # 图片识别
+        image_result = self.image_identifier.process_images(formatted_content)
+        
+        # 视频识别
+        video_result = self.video_identifier.process_videos(formatted_content)
+        
+
+        # 整合结果
+        recognition_result = {
+            'image_analysis': image_result,
+            'video_analysis': video_result
+        }
+
+        self.logger.info(f"识别结果: {recognition_result}")
+        
+        return recognition_result
+    
+    def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:
+        """更新数据库中的 multimodal_recognition 字段"""
+        try:
+            # 将结果转换为JSON字符串,并处理换行符问题
+            result_json = json.dumps(recognition_result, ensure_ascii=False)
+            # 将换行符替换为 \n 字符串,确保JSON可以被正确解析
+            result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')
+            
+            # 构建更新SQL - 使用参数化查询避免换行符问题
+            sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
+            params = (result_json, record_id)
+            
+            # 执行更新
+            result = self.db.update_values(sql, params)
+            if result is not None:
+                self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
+                return True
+            else:
+                self.logger.error(f"更新记录 {record_id} 失败")
+                return False
+                
+        except Exception as e:
+            self.logger.error(f"更新数据库失败: {e}")
+            return False
+    
+    def process_single_record(self) -> bool:
+        """处理单条记录"""
+        try:
+            # 获取未处理的记录
+            record = self.get_unprocessed_record()
+            if not record:
+                self.logger.warning("没有找到未处理的记录")
+                return False
+            
+            self.logger.info(f"开始处理记录 ID: {record['id']}")
+            # self.logger.info(f"  多模态识别: {record['multimodal_recognition'][:300]}...")
+
+            # 先设置这条记录的 recognition_status = 1
+            self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 1 WHERE id = {record['id']}")
+             
+            # 解析 formatted_content
+            formatted_content = self.parse_formatted_content(record['formatted_content'])
+            
+            # 提取基本信息,处理 null 值
+            title = formatted_content.get('title') or ''
+            content = formatted_content.get('body_text') or ''
+            channel = formatted_content.get('channel') or ''
+            images = formatted_content.get('image_url_list') or []
+            videos = formatted_content.get('video_url_list') or []
+            author = formatted_content.get('channel_account_name') or ''
+            like_count = formatted_content.get('like_count') or 0
+            collect_count = formatted_content.get('collect_count') or 0
+            comment_count = formatted_content.get('comment_count') or 0
+            view_count = formatted_content.get('view_count') or 0
+            publish_time = formatted_content.get('publish_time') or ''
+            update_timestamp = formatted_content.get('update_timestamp') or ''
+            content_link = formatted_content.get('content_link') or ''
+            content_id = formatted_content.get('channel_content_id') or ''
+            
+            # 调用内容识别处理
+            recognition_result = self.process_content_recognition(formatted_content)
+
+            # 判断识别是否成功:如果视频任一项的 asr_content 包含“视频上传失败”或者包含“ASR分析失败”,则标记失败
+            video_analysis = recognition_result.get('video_analysis', {})
+            video_upload_failed = False
+            if isinstance(video_analysis, list):
+                for video_item in video_analysis:
+                    if isinstance(video_item, dict) and 'asr_content' in video_item:
+                        if '视频上传失败' in (video_item.get('asr_content') or '') or 'ASR分析失败' in (video_item.get('asr_content') or ''):
+                            video_upload_failed = True
+                            break
+            elif isinstance(video_analysis, dict):
+                if 'asr_content' in video_analysis and ('视频上传失败' in (video_analysis.get('asr_content') or '') or 'ASR分析失败' in (video_analysis.get('asr_content') or '')):
+                    video_upload_failed = True
+
+            if video_upload_failed:
+                self.logger.info(f"记录 {record['id']} 识别失败,将 recognition_status 设置为 3")
+                self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")
+                return False
+
+            
+            # 构建完整的识别结果
+            complete_result = {
+                'id': record['id'],
+                'channel': channel,
+                'title': title,
+                'content': content,
+                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
+                'videos': recognition_result.get('video_analysis', {}),
+                'meta': {
+                    'author': author,
+                    'like_count': like_count,
+                    'collect_count': collect_count,
+                    'comment_count': comment_count,
+                    'view_count': view_count,
+                    'publish_time': publish_time,
+                    'update_timestamp': update_timestamp,
+                    'content_link': content_link,
+                    'content_id': content_id,
+                }
+            }
+            
+            # 更新数据库
+            success = self.update_multimodal_recognition(record['id'], complete_result)
+            
+            if success:
+                self.logger.info(f"记录 {record['id']} 处理完成")
+                return True
+            else:
+                self.logger.error(f"记录 {record['id']} 处理失败")
+                return False
+                
+        except Exception as e:
+            self.logger.error(f"处理记录失败: {e}")
+            return False
+    
+    def process_all_records(self, max_records: int = 10):
+        """处理多条记录"""
+        self.logger.info(f"开始批量处理,最多处理 {max_records} 条记录")
+        
+        processed_count = 0
+        success_count = 0
+        
+        for i in range(max_records):
+            self.logger.info(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")
+            
+            if self.process_single_record():
+                success_count += 1
+            else:
+                self.logger.warning("没有更多记录需要处理,结束批量处理")
+                break
+            
+            processed_count += 1
+            
+            # 添加延迟避免API限制
+            time.sleep(2)
+        
+        self.logger.info(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")
+
+    def process_continuous(self, max_records: int = None, delay_seconds: int = 2):
+        """连续处理记录,直到没有更多记录或达到最大数量限制"""
+        self.logger.info("启动连续处理模式...")
+        self.logger.info("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
+        self.logger.info(f"处理间隔: {delay_seconds} 秒")
+        if max_records:
+            self.logger.info(f"最大处理数量: {max_records} 条")
+        else:
+            self.logger.info("无数量限制,将处理所有可用记录")
+        self.logger.info("按 Ctrl+C 可以随时停止处理")
+        self.logger.info("-" * 60)
+        
+        processed_count = 0
+        success_count = 0
+        consecutive_failures = 0
+        max_consecutive_failures = 3  # 连续失败3次后停止
+        
+        try:
+            while True:
+                # 检查是否达到最大数量限制
+                if max_records and processed_count >= max_records:
+                    self.logger.info(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
+                    break
+                
+                self.logger.info(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
+                self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+                
+                # 处理单条记录
+                if self.process_single_record():
+                    success_count += 1
+                    consecutive_failures = 0  # 重置连续失败计数
+                    self.logger.info(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
+                else:
+                    consecutive_failures += 1
+                    self.logger.warning(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})")
+                    
+                    # 检查连续失败次数
+                    if consecutive_failures >= max_consecutive_failures:
+                        self.logger.warning(f"\n⚠️  连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
+                        self.logger.info("停止连续处理")
+                        break
+                
+                processed_count += 1
+                
+                # 检查是否还有更多记录
+                remaining_records = self.get_remaining_records_count()
+                if remaining_records == 0:
+                    self.logger.info(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
+                    break
+                
+                self.logger.info(f"剩余待处理记录: {remaining_records} 条")
+                
+                # 添加延迟避免API限制
+                if delay_seconds > 0:
+                    self.logger.info(f"等待 {delay_seconds} 秒后处理下一条记录...")
+                    time.sleep(delay_seconds)
+                
+        except KeyboardInterrupt:
+            self.logger.info(f"\n\n⏹️  用户中断处理")
+            self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
+        except Exception as e:
+            self.logger.error(f"\n\n💥 处理过程中发生错误: {e}")
+            self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
+        
+        self.logger.info(f"\n📊 连续处理完成!")
+        self.logger.info(f"总处理数量: {processed_count}")
+        self.logger.info(f"成功数量: {success_count}")
+        self.logger.info(f"失败数量: {processed_count - success_count}")
+        if processed_count > 0:
+            success_rate = (success_count / processed_count) * 100
+            self.logger.info(f"成功率: {success_rate:.1f}%")
+
+    def get_remaining_records_count(self) -> int:
+        """获取剩余待处理记录数量"""
+        try:
+            sql = "SELECT COUNT(*) FROM knowledge_search_content WHERE recognition_status = 0"
+            result = self.db.get_values(sql)
+            if result and len(result) > 0:
+                return result[0][0]
+            return 0
+        except Exception as e:
+            self.logger.error(f"获取剩余记录数量失败: {e}")
+            return 0
+
+
+def main():
+    """主函数"""
+    parser = argparse.ArgumentParser(description='内容识别脚本 - 分析图片和视频内容')
+    parser.add_argument('--single', action='store_true', help='只处理一条记录')
+    parser.add_argument('--batch', type=int, default=10, help='批量处理记录数量,默认10条')
+    parser.add_argument('--continuous', action='store_true', help='连续处理模式,自动处理所有可用记录')
+    parser.add_argument('--max-records', type=int, help='连续处理模式下的最大处理数量限制')
+    parser.add_argument('--delay', type=int, default=2, help='处理间隔时间(秒),默认2秒')
+    
+    args = parser.parse_args()
+    
+    try:
+        # 创建ContentIdentifier实例
+        identifier = ContentIdentifier()
+        
+        if args.single:
+            # 处理单条记录
+            identifier.process_single_record()
+        elif args.continuous:
+            # 连续处理模式
+            identifier.process_continuous(args.max_records, args.delay)
+        else:
+            # 批量处理记录
+            identifier.process_all_records(args.batch)
+            
+    except Exception as e:
+        sys.stderr.write(f"程序执行失败: {e}\n")
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()

+ 586 - 0
indentify/video_identifier.py

@@ -0,0 +1,586 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+视频识别脚本
+主要功能:使用 Gemini API 从三个维度分析视频内容
+1. ASR (Automatic Speech Recognition) - 语音转文字
+2. OCR - 识别视频画面中的文字
+3. 关键帧提取与描述 - 提取视频关键帧并进行图像描述
+"""
+
+import os
+import json
+import time
+import sys
+import uuid
+import requests
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+# 导入Google Generative AI
+import google.generativeai as genai
+from google.generativeai.types import HarmCategory, HarmBlockThreshold
+
+# 缓存目录配置
+CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
+# 缓存文件最大保留时间(秒)
+CACHE_MAX_AGE = 3600  # 1小时
+
+
+class VideoIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+        
+        # 获取API密钥
+        self.api_key = os.getenv('GEMINI_API_KEY')
+        if not self.api_key:
+            raise ValueError("未找到GEMINI_API_KEY环境变量")
+        
+        # 配置Gemini
+        genai.configure(api_key=self.api_key)
+        
+        # 初始化缓存清理时间
+        self.last_cache_cleanup = time.time()
+        
+        # 统一的系统提示词 - 三个维度分析
+        self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
+
+1. ASR (Automatic Speech Recognition) - 语音转文字:
+   - 仅提取视频中的语音内容,转换为文字
+   - 保持原始语音的准确性和完整性
+   - 不要添加分析、解释或评论
+
+2. 关键帧提取与描述(包含OCR文字识别):
+   - 将视频按照画面场景变化分解为多个关键时间片段
+   - 对每个时间片段进行以下分析:
+     * 画面的主要视觉元素和内容, 20个字以内
+     * 画面中出现的所有文字内容(OCR识别),**注意忽略语音的字幕**
+   - 每个时间片段应包含:
+     * content: 画面内容的详细描述,15个字以内
+     * ocr_content: 该时间段画面中出现的文字内容,仅做画面内文字提取,不要提取字幕文字,不要做任何解释或总结
+
+请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
+{
+    "asr_content": "提取的语音文字内容",
+    "iframe_details": [
+        {
+            "time_start": "开始时间(秒)",
+            "time_end": "结束时间(秒)",
+            "content": "该时间段画面内容的详细描述",
+            "ocr_content": "该时间段画面中出现的文字内容"
+        }
+    ]
+}"""
+    
+    def download_video(self, video_url: str) -> Optional[str]:
+        """下载视频到本地缓存"""
+        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
+        try:
+            # 确保缓存目录存在
+            try:
+                os.makedirs(CACHE_DIR, exist_ok=True)
+            except Exception as e:
+                print(f'创建缓存目录失败: {e}')
+                return None
+            
+            # 尝试下载视频
+            for attempt in range(3):
+                try:
+                    response = requests.get(url=video_url, timeout=60)
+                    if response.status_code == 200:
+                        try:
+                            with open(file_path, 'wb') as f:
+                                f.write(response.content)
+                            print(f'视频下载成功: {video_url} -> {file_path}')
+                            return file_path
+                        except Exception as e:
+                            print(f'视频保存失败: {e}')
+                            # 保存失败时清理已创建的文件
+                            if os.path.exists(file_path):
+                                try:
+                                    os.remove(file_path)
+                                    print(f'已清理下载失败的文件: {file_path}')
+                                except:
+                                    pass
+                            return None
+                    else:
+                        print(f'视频下载失败,状态码: {response.status_code}')
+                        if attempt == 2:  # 最后一次尝试失败
+                            print(f'所有下载尝试都失败了')
+                            return None
+                except Exception as e:
+                    print(f'下载尝试 {attempt + 1} 失败: {e}')
+                    if attempt < 2:  # 不是最后一次尝试
+                        time.sleep(1)
+                        continue
+                    else:
+                        print(f'所有下载尝试都失败了')
+                        return None
+                        
+        except Exception as e:
+            print(f'下载过程异常: {e}')
+            return None
+        
+        return None
+    
+    def cleanup_cache(self):
+        """清理过期的缓存文件"""
+        try:
+            current_time = time.time()
+            # 每小时清理一次缓存
+            if current_time - self.last_cache_cleanup < 3600:
+                return
+            
+            if not os.path.exists(CACHE_DIR):
+                return
+            
+            cleaned_count = 0
+            for filename in os.listdir(CACHE_DIR):
+                file_path = os.path.join(CACHE_DIR, filename)
+                if os.path.isfile(file_path):
+                    file_age = current_time - os.path.getmtime(file_path)
+                    if file_age > CACHE_MAX_AGE:
+                        try:
+                            os.remove(file_path)
+                            cleaned_count += 1
+                        except Exception as e:
+                            print(f'清理缓存文件失败: {file_path}, 错误: {e}')
+            
+            if cleaned_count > 0:
+                print(f'已清理 {cleaned_count} 个过期缓存文件')
+            
+            self.last_cache_cleanup = current_time
+            
+        except Exception as e:
+            print(f'清理缓存失败: {e}')
+    
+    def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
+        """上传视频到Gemini进行分析"""
+        max_retries = 3
+        retry_delay = 5
+        
+        for attempt in range(max_retries):
+            try:
+                print(f"  开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
+                print(f"    文件路径: {video_path}")
+                
+                # 1. 文件检查
+                if not os.path.exists(video_path):
+                    print(f"    错误: 文件不存在")
+                    return None
+                
+                file_size = os.path.getsize(video_path)
+                print(f"    文件大小: {file_size / (1024*1024):.2f} MB")
+                
+                if file_size == 0:
+                    print(f"    错误: 文件大小为0")
+                    return None
+                
+                # 2. 文件权限检查
+                try:
+                    with open(video_path, 'rb') as f:
+                        # 尝试读取文件开头,检查是否可读
+                        f.read(1024)
+                    print(f"    文件权限: 可读")
+                except Exception as e:
+                    print(f"    错误: 文件无法读取 - {e}")
+                    return None
+                
+                # 4. 尝试上传文件
+                print(f"    开始上传文件...")
+                try:
+                    video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
+                    print(f"    文件上传请求已发送,文件ID: {video_file.name}")
+                except Exception as e:
+                    print(f"    错误: 文件上传请求失败 - {e}")
+                    print(f"    错误类型: {type(e).__name__}")
+                    print(f"    错误详情: {str(e)}")
+                    
+                    # 如果是网络相关错误,尝试重试
+                    if any(keyword in str(e).lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
+                        if attempt < max_retries - 1:
+                            print(f"    网络错误,等待 {retry_delay} 秒后重试...")
+                            time.sleep(retry_delay)
+                            retry_delay *= 2  # 指数退避
+                            continue
+                        else:
+                            print(f"    所有重试都失败了")
+                            return None
+                    else:
+                        # 非网络错误,直接返回
+                        return None
+                
+                # 5. 等待文件处理完成
+                print(f"    等待文件处理完成...")
+                max_wait_time = 120  # 最大等待2分钟
+                wait_count = 0
+                
+                while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
+                    time.sleep(2)  # 每2秒检查一次
+                    wait_count += 2
+                    
+                    try:
+                        # 获取最新状态
+                        video_file = genai.get_file(name=video_file.name)
+                        current_state = video_file.state.name
+                        print(f"      状态: {current_state} ({wait_count}秒)")
+                        
+                        # 检查是否有错误状态
+                        if current_state in ['FAILED', 'ERROR', 'INVALID']:
+                            print(f"    错误: 文件处理失败,状态: {current_state}")
+                            if hasattr(video_file, 'error'):
+                                print(f"    错误详情: {video_file.error}")
+                            
+                            # 如果是处理失败,尝试重试
+                            if attempt < max_retries - 1:
+                                print(f"    文件处理失败,等待 {retry_delay} 秒后重试...")
+                                time.sleep(retry_delay)
+                                retry_delay *= 2
+                                break  # 跳出等待循环,进行重试
+                            else:
+                                return None
+                                
+                    except Exception as e:
+                        print(f"      警告: 获取文件状态失败 - {e}")
+                        if wait_count > 60:  # 超过1分钟后,尝试继续
+                            print(f"      继续等待...")
+                            continue
+                        else:
+                            print(f"    错误: 无法获取文件状态")
+                            return None
+                
+                # 6. 检查最终状态
+                if video_file.state.name == 'ACTIVE':
+                    print(f'    视频上传成功: {video_file.name}')
+                    print(f"    最终状态: {video_file.state.name}")
+                    return video_file
+                else:
+                    print(f'    错误: 视频文件上传失败')
+                    print(f"    最终状态: {video_file.state.name}")
+                    print(f"    等待时间: {wait_count}秒")
+                    
+                    # 尝试获取更多错误信息
+                    try:
+                        file_info = genai.get_file(name=video_file.name)
+                        print(f"    文件信息: {file_info}")
+                    except Exception as e:
+                        print(f"    无法获取文件详细信息: {e}")
+                    
+                    # 如果不是最后一次尝试,进行重试
+                    if attempt < max_retries - 1:
+                        print(f"    上传失败,等待 {retry_delay} 秒后重试...")
+                        time.sleep(retry_delay)
+                        retry_delay *= 2
+                        continue
+                    else:
+                        return None
+                        
+            except Exception as e:
+                error_type = type(e).__name__
+                error_msg = str(e)
+                
+                print(f'    错误: 视频上传到Gemini失败')
+                print(f"    错误类型: {error_type}")
+                print(f"    错误信息: {error_msg}")
+                
+                # 针对特定错误的处理建议
+                if "Broken pipe" in error_msg:
+                    print(f"    诊断: Broken pipe 错误通常表示:")
+                    print(f"      - 网络连接不稳定")
+                    print(f"      - 服务器连接中断")
+                    print(f"      - 防火墙或代理问题")
+                    print(f"    建议:")
+                    print(f"      - 检查网络连接")
+                    print(f"      - 尝试使用VPN或更换网络")
+                    print(f"      - 检查防火墙设置")
+                elif "Connection" in error_msg:
+                    print(f"    诊断: 连接错误")
+                    print(f"    建议: 检查网络连接和API密钥")
+                elif "Timeout" in error_msg:
+                    print(f"    诊断: 超时错误")
+                    print(f"    建议: 网络较慢,可以增加超时时间")
+                elif "Permission" in error_msg:
+                    print(f"    诊断: 权限错误")
+                    print(f"    建议: 检查API密钥和权限设置")
+                
+                # 如果是网络相关错误,尝试重试
+                if any(keyword in error_msg.lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
+                    if attempt < max_retries - 1:
+                        print(f"    网络错误,等待 {retry_delay} 秒后重试...")
+                        time.sleep(retry_delay)
+                        retry_delay *= 2
+                        continue
+                    else:
+                        print(f"    所有重试都失败了")
+                        return None
+                else:
+                    # 非网络错误,直接返回
+                    print(f"    非网络错误,不进行重试")
+                    return None
+        
+        return None
+    
+    def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
+        """使用Gemini API分析视频内容"""
+        
+        try:
+            # 创建Gemini模型
+            model = genai.GenerativeModel(
+                model_name='gemini-2.5-flash',
+                generation_config=genai.GenerationConfig(
+                    response_mime_type='application/json',
+                    temperature=0.3,
+                    max_output_tokens=40960
+                ),
+                safety_settings={
+                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+                }
+            )
+            
+            # 生成内容
+            response = model.generate_content(
+                contents=[video_file, self.unified_system_prompt],
+                request_options={'timeout': 240}
+            )
+
+            print(f"response: {response.text}")
+            
+            # 检查错误
+            if hasattr(response, '_error') and response._error:
+                raise Exception(f"生成错误: {response._error}")
+            
+            # 解析JSON响应
+            try:
+                result = json.loads(response.text.strip())
+                print(f"[视频分析] 响应: {result}")
+                
+                if not isinstance(result, dict):
+                    raise ValueError("响应格式错误:非字典结构")
+                
+                # 确保包含所有必需字段
+                required_fields = ['asr_content', 'iframe_details']
+                for field in required_fields:
+                    if field not in result:
+                        if field == 'iframe_details':
+                            result[field] = [{
+                                'time_start': 0,
+                                'time_end': 0,
+                                'content': f'{field}分析失败',
+                                'ocr_content': f'{field}分析失败'
+                            }]
+                        else:
+                            result[field] = f"{field}分析失败"
+                
+                return result
+                
+            except json.JSONDecodeError as e:
+                print(f"JSON解析失败: {e}")
+                return {
+                    'asr_content': 'ASR分析失败:JSON解析错误',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '关键帧分析失败:JSON解析错误',
+                        'ocr_content': '关键帧分析失败:JSON解析错误'
+                    }]
+                }
+                
+            else:
+                return {
+                    'asr_content': 'ASR分析失败:API无响应',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '关键帧分析失败:API无响应',
+                        'ocr_content': '关键帧分析失败:API无响应'
+                    }]
+                }
+                
+        except Exception as e:
+            return {
+                'asr_content': f'ASR分析失败: {str(e)}',
+                'iframe_details': [{
+                    'time_start': 0,
+                    'time_end': 0,
+                    'content': f'关键帧分析失败: {str(e)}',
+                    'ocr_content': f'关键帧分析失败: {str(e)}'
+                }]
+            }
+    
+    def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """提取视频URL列表"""
+        video_data = []
+        video_url_list = formatted_content.get('video_url_list', [])
+        
+        for video_item in video_url_list:
+            if isinstance(video_item, dict) and 'video_url' in video_item:
+                video_data.append({
+                    'url': video_item['video_url'],
+                    'duration': video_item.get('video_duration', 0)
+                })
+        
+        return video_data
+    
+    def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
+        """处理单个视频的完整流程"""
+        print(f"开始处理视频: {video_info['url'][:50]}...")
+        
+        video_path = None
+        video_file = None
+        try:
+            # 1. 下载视频
+            print("  1. 下载视频...")
+            video_path = self.download_video(video_info['url'])
+            if not video_path:
+                print("  视频下载失败")
+                return {
+                    'url': video_info['url'],
+                    'duration': video_info['duration'],
+                    'asr_content': '视频下载失败',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '视频下载失败',
+                        'ocr_content': '视频下载失败'
+                    }]
+                }
+            
+            # 2. 上传到Gemini
+            print("  2. 上传视频到Gemini...")
+            video_file = self.upload_video_to_gemini(video_path)
+            if not video_file:
+                print("  视频上传到Gemini失败")
+                # 上传失败时也要清理缓存文件
+                if video_path and os.path.exists(video_path):
+                    try:
+                        os.remove(video_path)
+                        print(f"  上传失败,缓存文件已清理: {video_path}")
+                    except Exception as e:
+                        print(f"  清理缓存文件失败: {e}")
+                
+                return {
+                    'url': video_info['url'],
+                    'duration': video_info['duration'],
+                    'asr_content': '视频上传失败',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '视频上传失败',
+                        'ocr_content': '视频上传失败'
+                    }]
+                }
+            
+            # 3. 使用Gemini分析
+            print("  3. 使用Gemini分析视频内容...")
+            analysis_result = self.analyze_video_with_gemini(video_file, video_info)
+            
+            # 4. 组合结果
+            final_result = {
+                'url': video_info['url'],
+                'duration': video_info['duration'],
+                'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
+                'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
+            }
+            
+            print("  视频分析完成")
+            return final_result
+            
+        except Exception as e:
+            print(f"  视频处理异常: {e}")
+            # 异常情况下也要清理缓存文件
+            if video_path and os.path.exists(video_path):
+                try:
+                    os.remove(video_path)
+                    print(f"  异常处理,缓存文件已清理: {video_path}")
+                except Exception as e:
+                    print(f"  清理缓存文件失败: {e}")
+            
+            return {
+                'url': video_info['url'],
+                'duration': video_info['duration'],
+                'asr_content': f'处理异常: {str(e)}',
+                'iframe_details': [{
+                    'time_start': 0,
+                    'time_end': 0,
+                    'content': f'处理异常: {str(e)}',
+                    'ocr_content': f'处理异常: {str(e)}'
+                }],
+                'analysis_timestamp': int(time.time() * 1000)
+            }
+        finally:
+            # 清理临时文件
+            if video_path and os.path.exists(video_path):
+                try:
+                    os.remove(video_path)
+                    print(f"  临时文件已清理: {video_path}")
+                except Exception as e:
+                    print(f"  清理临时文件失败: {e}")
+            
+            # 清理Gemini文件
+            if video_file and hasattr(video_file, 'name'):
+                try:
+                    genai.delete_file(name=video_file.name)
+                    print(f"  Gemini文件已清理: {video_file.name}")
+                except Exception as e:
+                    print(f"  清理Gemini文件失败: {e}")
+    
+    def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """处理视频识别的主函数"""
+        print("开始视频识别处理...")
+        
+        # 定期清理缓存
+        self.cleanup_cache()
+        
+        # 提取视频URL
+        video_data = self.extract_video_urls(formatted_content)
+        print(f"提取到 {len(video_data)} 个视频")
+        
+        if not video_data:
+            print("没有视频需要分析")
+            return []
+        
+        # 逐个处理视频
+        results = []
+        for i, video_info in enumerate(video_data):
+            print(f"\n处理视频 {i+1}/{len(video_data)}")
+            result = self.process_video_single(video_info)
+            results.append(result)
+            
+            # 添加延迟避免API限制
+            if i < len(video_data) - 1:  # 不是最后一个视频
+                time.sleep(2)
+        
+        if results:
+            print(f"\n视频识别完成,共分析 {len(results)} 个视频")
+            print("分析维度:ASR、关键帧提取")
+        else:
+            print("视频识别失败")
+        
+        return results
+
+
+def main():
+    """测试函数"""
+    # 模拟数据
+    test_content = {
+        "video_url_list": [
+            {
+                "video_url": "http://rescdn.yishihui.com/pipeline/video/6ab92036-a166-491d-935e-eeeb7c0f2779.mp4",
+                "video_duration": 187
+            }
+        ]
+    }
+    
+    identifier = VideoIdentifier()
+    result = identifier.process_videos(test_content)
+    
+    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+
+
+if __name__ == '__main__':
+    main() 

+ 48 - 0
prompt/evaluate.md

@@ -0,0 +1,48 @@
+# Role: 内容策略分析师
+
+## Profile
+你是一位专业且细致的内容策略分析师。你的核心任务是模仿我的个人判断标准,评估一段内容(Content)在多大程度上满足了我的一个特定查询(Query)背后的真实意图。你的评估必须精准、客观,并严格遵循我为你设定的评分标准和工作流程。
+
+## Core Logic
+你评估的核心是“要素满足度”。你需要先将我的Query拆解成几个核心构成要素,然后判断Content是否同时满足了这些要素。
+
+## 评分标准 (Scoring Rubric): 0-100分制
+你必须严格按照以下标准进行打分:
+
+- **90-100分 (完美/精准命中):**
+  - 内容 **同时** 精准且深入地满足了Query中 **所有** 核心要素。
+  - 内容质量高,甚至提供了超出预期的价值。
+
+- **70-89分 (高度相关/基本命中):**
+  - 内容满足了Query中的 **所有** 核心要素,但在某一要素的 **深度或具体性** 上略有不足,或整体内容质量一般。
+
+- **40-69分 (中度相关/命中主干):**
+  - 内容只满足了Query中最主要的那个核心要素,但 **忽略或偏离** 了其他关键要素。
+
+- **10-39分 (轻度相关/仅擦边):**
+  - 内容只是与Query中的某个要素 **轻微相关**,但整体上没有抓住我的意图。
+
+- **0-9分 (完全无关):**
+  - 内容与Query中的任何核心要素都无关。
+
+## 工作流程 (Workflow)
+你必须严格遵循以下四个步骤来完成任务:
+
+1.  **分析Query:** 首先,在内心解析用户提供的`Query`,识别出其中包含的2-3个核心构成要素。
+2.  **评估Content:** 接着,仔细阅读`Content`,评估它对每一个核心要素的覆盖程度(是否提及、是否深入、是否准确)。
+3.  **打分与解释:** 根据`评分标准`,结合`Content`对各要素的满足情况,给出一个具体的分数,并撰写打分理由。理由必须清晰地指出`Content`满足了哪些要素、忽略了哪些要素。
+4.  **格式化输出:** 将你的分数和理由整理成一个严格的JSON格式输出。
+
+## 示例 (Example)
+这是一个你需要学习的范例:
+
+**输入:**
+- **Query:** "宠物类目的选题方法"
+- **Content:** "本文将介绍宠物狗的十大可爱瞬间,以及如何给它们拍摄有趣的照片,吸引社交媒体上的关注。我们将分享构图技巧和后期处理方法。"
+
+**理想输出:**
+```json
+{
+  "score": 45,
+  "reason": "内容命中了[宠物]这个核心要素,但完全忽略了[选题方法]这一关键要素,反而详细阐述了[图文创作],这与我的查询意图不符,因此给予中度相关评分。"
+}

+ 30 - 0
prompt/structure.md

@@ -0,0 +1,30 @@
+# 角色:专家级内容编辑
+
+## 个人档案
+你是一位专家级的内容编辑与结构分析师。你的核心能力是梳理和整合零散的信息片段(例如一段正文和多张图片中的笔记),将其合并成一篇单一、连贯、且结构清晰的文档。你一丝不苟、以事实为依据,并且非常擅长使用Markdown格式创建清晰易懂的指南或列表。
+
+## 核心目标
+将一个包含标题、正文、以及从多张图片中提取的文本数组的混乱JSON输入,转换成一篇结构完整、逻辑清晰、易于阅读的Markdown格式文章。
+
+## 约束条件
+- **严格忠于原文内容:** 你 **绝对不能** 添加任何未在输入JSON中提供的信息、观点或细节。你的任务是重组和格式化,而不是内容创作。
+- **禁止杜撰:** 不要虚构任何步骤、要求或功能。输出中的每一条信息都必须能追溯到原始输入。
+- **保留关键术语:** 对于专业术语、工具名称(如“美图秀秀”、“豆包”)和特定短语,必须保持原文用词。
+- **逻辑流程优先:** 最终的输出必须有一个清晰、合乎逻辑的进展。你需要识别内容的内在结构(无论是教程步骤还是对比列表),并以此为基础进行组织。
+- **智能合并与去噪:** 你需要将分散在不同图片中的同类信息(如“喵星人”的所有行为)合并到同一个标题下。同时,忽略无意义的文本(如“图中无文字内容”、“昨天18:32”等)。
+
+## 工作流程 (思考步骤)
+1.  **识别主标题:** 使用JSON中的 `title` 字段作为一级标题 (`#`)。
+2.  **处理引言:** 将 `body_text` 的内容作为文章的引言或开场白。如果内容不完整,忠实呈现原文即可。
+3.  **分析并整合核心内容(核心任务):**
+    -   通读 `images_comprehension` 数组中的所有文本,理解其整体内容结构。判断这是“步骤式教程”、“对比清单”还是其他类型。
+    -   识别出核心的类别或步骤标题(如“第一步”、“喵星人”、“汪星人”等)。
+    -   遍历所有输入,将所有相关的信息点(包括其详细描述)归类到相应的主标题之下。确保将分散在多处的内容合并到一起。
+    -   对于重复出现的主标题(如“屁股社交”),如果其描述性内容不同,则应作为独立条目保留,以确保信息的完整性。
+4.  **应用格式化:**
+    -   使用Markdown的二级标题 (`##`) 来组织主要类别或步骤。
+    -   使用项目符号(`-` 或 `*`)和粗体 (`**文字**`) 来构建清晰的列表和层次结构,突出重点。
+5.  **处理结尾和标签(如果存在):** 如果输入内容包含明确的结尾或 `#话题标签`,则将它们放在文档的末尾。
+
+## 输入
+用户将提供一个包含 `title`、`body_text` 和 `images_comprehension` 的JSON对象。

+ 99 - 0
requirements.txt

@@ -0,0 +1,99 @@
+# This file was autogenerated by uv via the following command:
+#    uv pip compile pyproject.toml -o requirements.txt
+annotated-types==0.7.0
+    # via pydantic
+anyio==4.10.0
+    # via
+    #   google-genai
+    #   httpx
+cachetools==5.5.2
+    # via google-auth
+certifi==2025.8.3
+    # via
+    #   httpcore
+    #   httpx
+    #   requests
+charset-normalizer==3.4.2
+    # via requests
+dotenv==0.9.9
+    # via knowledge (pyproject.toml)
+google-auth==2.40.3
+    # via google-genai
+google-genai==1.29.0
+    # via knowledge (pyproject.toml)
+grpcio==1.74.0
+    # via
+    #   knowledge (pyproject.toml)
+    #   grpcio-tools
+grpcio-tools==1.74.0
+    # via knowledge (pyproject.toml)
+h11==0.16.0
+    # via httpcore
+httpcore==1.0.9
+    # via httpx
+httpx==0.28.1
+    # via
+    #   google-genai
+    #   lark-oapi
+idna==3.10
+    # via
+    #   anyio
+    #   httpx
+    #   requests
+lark-oapi==1.4.20
+    # via knowledge (pyproject.toml)
+loguru==0.7.3
+    # via knowledge (pyproject.toml)
+protobuf==6.31.1
+    # via
+    #   knowledge (pyproject.toml)
+    #   grpcio-tools
+pyasn1==0.6.1
+    # via
+    #   pyasn1-modules
+    #   rsa
+pyasn1-modules==0.4.2
+    # via google-auth
+pycryptodome==3.23.0
+    # via lark-oapi
+pydantic==2.11.7
+    # via google-genai
+pydantic-core==2.33.2
+    # via pydantic
+python-dotenv==1.1.1
+    # via dotenv
+requests==2.32.4
+    # via
+    #   knowledge (pyproject.toml)
+    #   google-genai
+    #   lark-oapi
+    #   requests-toolbelt
+requests-toolbelt==1.0.0
+    # via lark-oapi
+rsa==4.9.1
+    # via google-auth
+setuptools==80.9.0
+    # via
+    #   knowledge (pyproject.toml)
+    #   grpcio-tools
+sniffio==1.3.1
+    # via anyio
+tenacity==9.1.2
+    # via google-genai
+typing-extensions==4.14.1
+    # via
+    #   anyio
+    #   google-genai
+    #   pydantic
+    #   pydantic-core
+    #   typing-inspection
+typing-inspection==0.4.1
+    # via pydantic
+urllib3==2.5.0
+    # via requests
+websockets==15.0.1
+    # via
+    #   google-genai
+    #   lark-oapi
+pymysql==1.0.2
+Pillow==10.4.0

+ 238 - 0
structure/structure_processor.py

@@ -0,0 +1,238 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+内容结构化处理模块
+主要功能:
+1. 从数据库中拉取需要结构化的数据
+2. 调用Gemini API进行内容结构化
+3. 将结构化结果更新到数据库
+"""
+
+import os
+import json
+import time
+import sys
+import threading
+from typing import Dict, Any, List, Optional, Tuple
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+from gemini import GeminiProcessor
+from utils.file import File
+from utils.logging_config import get_logger
+
+
+class StructureProcessor:
+    def __init__(self):
+        # 设置日志
+        self.logger = get_logger('StructureProcessor')
+        
+        # 初始化处理器
+        self.processor = GeminiProcessor()
+        self.system_prompt = File.read_file('../prompt/structure.md')
+        self.logger.info("系统提示词加载完成")
+        self.logger.debug(f"系统提示词: {self.system_prompt}")
+        
+        # 线程控制
+        self.lock = threading.Lock()
+        self.stop_event = threading.Event()
+        self.threads = []
+    
+    def get_query_words(self) -> List[str]:
+        """从 knowledge_content_query 表中获取 category_id = 0 的所有 query_word"""
+        try:
+            sql = """
+            SELECT query_word 
+            FROM knowledge_content_query 
+            WHERE category_id = 0
+            """
+            
+            result = MysqlHelper.get_values(sql)
+            if result:
+                query_words = [row[0] for row in result]
+                self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")
+                return query_words
+            else:
+                self.logger.warning("未找到 category_id = 0 的 query_word")
+                return []
+                
+        except Exception as e:
+            self.logger.error(f"获取 query_word 失败: {e}")
+            return []
+    
+    def process_single_record(self) -> bool:
+        """处理单条记录"""
+        try:
+            with self.lock:
+                # 第一步:获取 category_id = 0 的所有 query_word
+                query_words = self.get_query_words()
+                if not query_words:
+                    self.logger.warning("没有可用的 query_word")
+                    return False
+                
+                # 第二步:用这些 query_word 去匹配 knowledge_search_content 表
+                # 构建带引号的查询条件
+                quoted_words = [f"'{word}'" for word in query_words]
+                placeholders = ','.join(quoted_words)
+                
+                # 使用 FOR UPDATE 锁定记录,确保原子性操作
+                # 明确排除正在处理中和已处理的记录
+                select_sql = f"""
+                    SELECT id, multimodal_recognition 
+                    FROM knowledge_search_content 
+                    WHERE multimodal_recognition IS NOT NULL  
+                        AND structured_data IS NULL
+                        AND query_word IN ({placeholders})
+                    ORDER BY id ASC
+                    LIMIT 1
+                """
+                
+                self.logger.info(f"执行查询: {select_sql}")
+                
+                records = MysqlHelper.get_values(select_sql)
+                if not records:
+                    self.logger.warning("没有找到需要处理的记录")
+                    return False
+                
+                row = records[0]
+                self.logger.info(f"row: {row}")
+                record_id = row[0]
+                self.logger.info(f"record_id: {record_id}")
+                
+                # 立即标记为处理中,防止其他线程取到重复处理
+                mark_sql = """
+                    UPDATE knowledge_search_content 
+                    SET structured_data = 'PROCESSING' 
+                    WHERE id = %s
+                """
+                
+                mark_result = MysqlHelper.update_values(mark_sql, (record_id,))
+                if mark_result is None:
+                    self.logger.error(f"标记记录 {record_id} 为处理中失败")
+                    return False
+                
+                self.logger.info(f"记录 {record_id} 已标记为处理中")
+                
+                # 处理内容
+                result = self.processor.process(row[1], self.system_prompt)
+                self.logger.info(f"处理完成,结果长度: {len(str(result))}")
+                self.logger.debug(f"处理结果: {result}")
+                
+                # 更新数据库为实际结果
+                update_sql = """
+                    UPDATE knowledge_search_content 
+                    SET structured_data = %s 
+                    WHERE id = %s
+                """
+                
+                update_result = MysqlHelper.update_values(update_sql, (result, record_id))
+                if update_result is None:
+                    self.logger.error(f"更新记录 {record_id} 失败")
+                    return False
+                
+                self.logger.info(f"记录 {record_id} 处理完成并更新数据库")
+                return True
+                
+        except Exception as e:
+            self.logger.error(f"处理记录失败: {str(e)}", exc_info=True)
+            return False
+    
+    def worker_thread(self, thread_id: int):
+        """工作线程函数"""
+        thread_logger = get_logger(f'WorkerThread-{thread_id}')
+        thread_logger.info(f"线程 {thread_id} 启动")
+        
+        while not self.stop_event.is_set():
+            try:
+                # 尝试处理一条记录
+                success = self.process_single_record()
+                
+                if not success:
+                    thread_logger.info(f"没有找到需要处理的记录,等待5秒后重试")
+                    # 等待时也要检查停止信号
+                    if self.stop_event.wait(5):
+                        break
+                    continue
+                
+                # 处理成功后等待5秒再处理下一条
+                thread_logger.info(f"处理完成,等待5秒后处理下一条")
+                # 等待时也要检查停止信号
+                if self.stop_event.wait(5):
+                    break
+                
+            except Exception as e:
+                thread_logger.error(f"发生错误: {str(e)}", exc_info=True)
+                # 等待时也要检查停止信号
+                if self.stop_event.wait(5):
+                    break
+        
+        thread_logger.info(f"线程 {thread_id} 已停止")
+    
+    def start_multi_thread_processing(self):
+        """启动多线程处理"""
+        self.threads = []
+        
+        self.logger.info("启动多线程处理...")
+        self.logger.info("查询条件: multimodal_recognition is not null AND structured_data is null AND query_word IN (category_id = 0 的 query_word)")
+        
+        # 创建5个线程,间隔5秒启动
+        for i in range(5):
+            thread = threading.Thread(
+                target=self.worker_thread,
+                args=(i + 1,)
+            )
+            self.threads.append(thread)
+            
+            # 启动线程
+            thread.start()
+            self.logger.info(f"线程 {i + 1} 已启动")
+            
+            # 等待5秒后启动下一个线程
+            if i < 4:  # 最后一个线程不需要等待
+                self.logger.info("等待5秒后启动下一个线程...")
+                time.sleep(5)
+        
+        self.logger.info("所有线程已启动,使用 ./start_structure.sh stop 停止")
+        
+        try:
+            # 等待所有线程完成
+            for thread in self.threads:
+                thread.join()
+        except KeyboardInterrupt:
+            self.logger.info("收到停止信号,正在停止所有线程...")
+            self.stop_all_threads()
+    
+    def stop_all_threads(self):
+        """停止所有线程"""
+        self.logger.info("正在停止所有线程...")
+        self.stop_event.set()
+        
+        # 等待所有线程结束
+        for i, thread in enumerate(self.threads):
+            if thread.is_alive():
+                self.logger.info(f"等待线程 {i + 1} 结束...")
+                thread.join(timeout=10)  # 最多等待10秒
+                if thread.is_alive():
+                    self.logger.warning(f"线程 {i + 1} 未能正常结束")
+                else:
+                    self.logger.info(f"线程 {i + 1} 已正常结束")
+        
+        self.logger.info("所有线程已停止")
+
+
+def main():
+    """主函数"""
+    try:
+        processor = StructureProcessor()
+        processor.start_multi_thread_processing()
+    except Exception as e:
+        print(f"程序执行失败: {str(e)}")
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    # 测试单条记录处理
+    processor = StructureProcessor()
+    processor.process_single_record() 

+ 0 - 0
utils/__init__.py


+ 132 - 0
utils/container.py

@@ -0,0 +1,132 @@
+import logging
+import os
+from hashlib import sha256
+from typing import List, Tuple
+
+import grpc
+from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
+
+from protos import container_pb2, container_pb2_grpc
+
+
+class Container(object):
+    _channel: grpc.Channel = None
+    _stub: container_pb2_grpc.ContainerServiceStub = None
+
+    def __new__(cls, *args, **kwargs):
+        if cls._channel is None:
+            cls._channel = grpc.insecure_channel(
+                target=f'{os.getenv("CONTAINER_GRPC_HOST")}:{os.getenv("CONTAINER_GRPC_PORT")}',
+                options=[
+                    ('grpc.keepalive_time_ms', 10000),
+                    ('grpc.keepalive_timeout_ms', 5000),
+                    ('grpc.keepalive_permit_without_calls', True),
+                    ('grpc.http2.max_pings_without_data', 0),
+                ],
+            )
+        cls._stub = container_pb2_grpc.ContainerServiceStub(channel=cls._channel)
+        return super().__new__(cls, *args, **kwargs)
+
+    def __init__(self):
+        self.container_id = None
+
+    # def close_channel(self):
+    #     """关闭通道"""
+    #     if self._channel is not None:
+    #         self._channel.close()
+
+    def start_container(self):
+        """启动一个容器,其最大运行时间为1小时,到期自动停止销毁"""
+        response = self._stub.StartContainer(request=google_dot_protobuf_dot_empty__pb2.Empty())
+        container_id = response.container_id
+        if not container_id:
+            raise RuntimeError('创建容器失败')
+        self.container_id = container_id
+
+    def stop_container(self) -> bool:
+        """关闭一个容器"""
+        if self.container_id:
+            request = container_pb2.StopContainerRequest(container_id=self.container_id)
+            response = self._stub.StopContainer(request=request)
+            return response.success
+        return True
+
+    def run_command(self, command: List[str], show_log: bool = False) -> Tuple[int, str, List[str]]:
+        """在容器内执行一条命令,可用的命令为: ffprobe | ffmpeg"""
+        exit_code, msg = -999, ''
+        request = container_pb2.RunCommandRequest(container_id=self.container_id, command=command)
+        for response in self._stub.RunCommand(request=request):
+            if show_log:
+                logging.info(response.msg)
+            msg += response.msg
+            if response.exit_code != -999:
+                exit_code = response.exit_code
+        return exit_code, msg, command
+
+    def file_exists(self, file_path: str) -> bool:
+        """判断容器内指定路径的文件是否存在"""
+        request = container_pb2.FileExistsRequest(container_id=self.container_id, path=file_path)
+        response = self._stub.FileExists(request=request)
+        return response.exists
+
+    def get_file(self, container_file_path: str, host_file_path: str) -> bool:
+        """从容器内获取文件"""
+        hasher, tmp, sha256sum, length = sha256(), dict(), None, 0
+        request = container_pb2.GetFileRequest(container_id=self.container_id, path=container_file_path)
+        with open(host_file_path, 'wb') as f:
+            for response in self._stub.GetFile(request=request):
+                if response.sha256sum:
+                    sha256sum = response.sha256sum
+                    continue
+                if response.payload:
+                    hasher.update(response.payload)
+                    f.seek(response.offset)
+                    f.write(response.payload)
+                    length += len(response.payload)
+        return hasher.hexdigest() == sha256sum
+
+    def put_file(self, host_file_path: str, container_file_path: str) -> bool:
+        """将宿主机上的文件复制到容器内"""
+        total_size = os.path.getsize(host_file_path)
+        hasher, chunk_size, offset = sha256(), 1024 * 1024, 0
+        with open(host_file_path, 'rb') as f:
+            while offset < total_size:
+                f.seek(offset)
+                chunk = f.read(min(chunk_size, total_size - offset))
+                if not chunk:
+                    break
+                hasher.update(chunk)
+                offset += len(chunk)
+        sha256sum = hasher.hexdigest()
+
+        def chunk_generator():
+            yield container_pb2.ReusableChunk(container_id=self.container_id, path=container_file_path, sha256sum=sha256sum)
+
+            _offset = 0
+            with open(host_file_path, 'rb') as _f:
+                while _offset < total_size:
+                    _f.seek(_offset)
+                    _chunk = _f.read(min(chunk_size, total_size - _offset))
+                    if not _chunk:
+                        break
+                    yield container_pb2.ReusableChunk(container_id=self.container_id, offset=_offset, payload=_chunk)
+                    _offset += len(_chunk)
+
+        response = self._stub.PutFile(chunk_generator())
+        return response.success
+
+    def download_oss(self, bucket_name: str, object_key: str) -> str:
+        """将OSS文件下载到容器"""
+        request = container_pb2.DownloadOssRequest(container_id=self.container_id, bucket_name=bucket_name, object_key=object_key)
+        response = self._stub.DownloadOss(request=request)
+        return response.save_path
+
+    def upload_oss(self, bucket_name: str, object_key: str, container_file_path: str, media_type: str):
+        """将容器内文件上传到OSS"""
+        request = container_pb2.UploadOssRequest(container_id=self.container_id,
+                                                 bucket_name=bucket_name,
+                                                 object_key=object_key,
+                                                 file_path=container_file_path,
+                                                 media_type=media_type)
+        response = self._stub.UploadOss(request=request)
+        return response.object_key

+ 174 - 0
utils/fei_shu.py

@@ -0,0 +1,174 @@
+import os
+import uuid
+from typing import Any, Dict, Optional
+
+import lark_oapi as lark
+from lark_oapi.core.enum import LogLevel
+import config
+
+
+class FeiShu:
+
+    def __init__(self, file_token:str = None):
+        app_id = os.getenv('FEISHU_APP_ID', config.FEISHU_APP_ID)
+        app_secret = os.getenv('FEISHU_APP_SECRET', config.FEISHU_APP_SECRET)
+        self.file_token = file_token if file_token is not None else os.getenv('FEISHU_FILE_TOKEN')
+        self.client = (lark.client.Client().builder()
+                       .app_id(app_id)
+                       .app_secret(app_secret)
+                       .log_level(LogLevel.INFO)
+                       .build())
+
+    def create_table(self, name: str) -> str:
+        request = (lark.bitable.v1.CreateAppTableRequest.builder()
+                   .app_token(self.file_token)
+                   .request_body(lark.bitable.v1.CreateAppTableRequestBody.builder()
+                                 .table(lark.bitable.v1.ReqTable.builder()
+                                        .name(name)
+                                        .fields([
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('原文链接').type(15).build(),
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('抓取结果').type(1).build(),
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('标题').type(1).build(),
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('用户链接').type(15).build(),
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('识别结果').type(1).build(),
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('初步理解').type(1).build(),
+                                            lark.bitable.v1.AppTableCreateHeader.builder().field_name('物理聚合').type(1).build(),
+                                        ])
+                                        .build())
+                                 .build())
+                   .build())
+        response = self.client.bitable.v1.app_table.create(request)
+        if not response.success():
+            raise RuntimeError(f'多维表格创建新数据表失败: {lark.JSON.marshal(response.error)}')
+        return response.data.table_id
+
+    def get_all_records(self, table_id: str, page_token: Optional[str] = None):
+        request = (lark.bitable.v1.SearchAppTableRecordRequest.builder()
+                   .app_token(self.file_token)
+                   .table_id(table_id)
+                   .user_id_type('open_id')
+                   .page_size(500)
+                   .request_body(lark.bitable.v1.SearchAppTableRecordRequestBody.builder()
+                                 .automatic_fields(True)
+                                 .build()))
+        if page_token:
+            request = request.page_token(page_token)
+        request = request.build()
+        response = self.client.bitable.v1.app_table_record.search(request)
+        if not response.success():
+            raise RuntimeError(f'获取多维表格记录失败: {lark.JSON.marshal(response.error)}')
+        return response.data
+
+    def get_record(self, table_id: str, *record_ids: str):
+        request = (lark.bitable.v1.BatchGetAppTableRecordRequest.builder()
+                   .app_token(self.file_token)
+                   .table_id(table_id)
+                   .request_body(lark.bitable.v1.BatchGetAppTableRecordRequestBody.builder()
+                                 .record_ids(list(record_ids))
+                                 .user_id_type('open_id')
+                                 .with_shared_url(True)
+                                 .automatic_fields(True)
+                                 .build())
+                   .build())
+        response = self.client.bitable.v1.app_table_record.batch_get(request)
+        if not response.success():
+            raise RuntimeError(f'获取多维表格指定记录失败: {lark.JSON.marshal(response.error)}')
+        return response.data
+
+    def create_record(self, table_id: str, *fields: Dict[str, Any]):
+        request = (lark.bitable.v1.BatchCreateAppTableRecordRequest.builder()
+                   .app_token(self.file_token)
+                   .table_id(table_id)
+                   .user_id_type('open_id')
+                   .client_token(str(uuid.uuid4()))
+                   .ignore_consistency_check(False)
+                   .request_body(lark.bitable.v1.BatchCreateAppTableRecordRequestBody.builder()
+                                 .records([lark.bitable.v1.AppTableRecord.builder()
+                                          .fields(item)
+                                          .build() for item in fields])
+                                 .build())
+                   .build())
+        response = self.client.bitable.v1.app_table_record.batch_create(request)
+        if not response.success():
+            raise RuntimeError(f'向多维表格添加记录失败: {lark.JSON.marshal(response.error)}')
+        return response.data
+
+    def update_record(self, table_id: str, *records: lark.bitable.v1.AppTableRecord):
+        request = (lark.bitable.v1.BatchUpdateAppTableRecordRequest.builder()
+                   .app_token(self.file_token)
+                   .table_id(table_id)
+                   .user_id_type('open_id')
+                   .ignore_consistency_check(False)
+                   .request_body(lark.bitable.v1.BatchUpdateAppTableRecordRequestBody.builder()
+                                 .records(list(records))
+                                 .build())
+                   .build())
+        response = self.client.bitable.v1.app_table_record.batch_update(request)
+        if not response.success():
+            raise RuntimeError(f'更新多维表格指定记录失败: {lark.JSON.marshal(response.error)}')
+        return response.data
+
+    def delete_record(self, table_id: str, *record_ids: str):
+        request = (lark.bitable.v1.BatchDeleteAppTableRecordRequest.builder()
+                   .app_token(self.file_token)
+                   .table_id(table_id)
+                   .request_body(lark.bitable.v1.BatchDeleteAppTableRecordRequestBody.builder()
+                                 .records(list(record_ids))
+                                 .build())
+                   .build())
+        response = self.client.bitable.v1.app_table_record.batch_delete(request)
+        if not response.success():
+            raise RuntimeError(f'删除多维表格指定记录失败: {lark.JSON.marshal(response.error)}')
+        return response.data
+
+
+if __name__ == '__main__':
+    from dotenv import load_dotenv
+
+    try:
+        load_dotenv()
+    except ImportError:
+        raise RuntimeError('导入环境变量失败')
+
+    feishu = FeiShu()
+    # 创建数据表
+    new_table_id = feishu.create_table('测试数据表')
+    # 新增记录
+    new_fields = [
+        {
+            '原文链接': {
+                'link': 'https://www.baidu.com',
+                'text': 'https://www.baidu.com',
+            },
+            '抓取结果': '这是抓取结果1',
+        },
+        {
+            '原文链接': {
+                'link': 'https://www.qq.com',
+                'text': 'https://www.qq.com',
+            },
+            '抓取结果': '这是抓取结果2',
+        }
+    ]
+    feishu.create_record(new_table_id, *new_fields)
+    # 获取全部记录
+    get_result = feishu.get_all_records(new_table_id)
+    has_more = get_result.has_more  # 是否有下一页
+    next_page_token = get_result.page_token  # 下一页token
+    new_record_ids = []
+    for record in get_result.items:
+        new_record_ids.append(record.record_id)
+        print(record.fields)
+    # 更新记录
+    new_record = (lark.bitable.v1.AppTableRecord.builder()
+                  .record_id(new_record_ids[0])
+                  .fields({'识别结果': '这是识别结果'})
+                  .build())
+    feishu.update_record(new_table_id, new_record)
+    # 获取指定ID记录
+    get_result = feishu.get_record(new_table_id, *new_record_ids)
+    for record in get_result.records:
+        new_record_ids.append(record.record_id)
+        print(record.fields)
+    # 删除指定ID记录
+    feishu.delete_record(new_table_id, new_record_ids[1])

+ 72 - 0
utils/file.py

@@ -0,0 +1,72 @@
+import os
+from typing import Optional
+
+class File:
+
+    @staticmethod
+    def read_file(file_path: str) -> Optional[str]:
+        """
+        读取文件内容
+        
+        参数:
+            file_path: 文件路径
+            
+        返回:
+            文件内容字符串,如果出错返回None
+        """
+        try:
+            with open(file_path, 'r', encoding='utf-8') as f:
+                return f.read()
+        except Exception as e:
+            print(f"读取文件 {file_path} 出错: {str(e)}")
+            return None
+
+    @staticmethod
+    def write_file(file_path: str, content: str) -> bool:
+        """
+        写入内容到文件
+        
+        参数:
+            file_path: 文件路径
+            content: 要写入的内容
+            
+        返回:
+            是否成功写入
+        """
+        try:
+            with open(file_path, 'w', encoding='utf-8') as f:
+                f.write(content)
+            return True
+        except Exception as e:
+            print(f"写入文件 {file_path} 出错: {str(e)}")
+            return False
+
+    @staticmethod
+    def ensure_directory_exists(dir_path: str) -> bool:
+        """
+        确保目录存在,如果不存在则创建
+        
+        参数:
+            dir_path: 目录路径
+            
+        返回:
+            是否成功确保目录存在
+        """
+        try:
+            os.makedirs(dir_path, exist_ok=True)
+            return True
+        except Exception as e:
+            print(f"创建目录 {dir_path} 出错: {str(e)}")
+            return False
+
+
+# 使用示例
+if __name__ == "__main__":
+    # 测试load_system_prompt
+    test_content = "这是一个测试文件内容"
+    File.write_file("test.txt", test_content)
+    read_content = File.read_file("test.txt")
+    print(f"读取的文件内容: {read_content}")
+    
+    # 测试ensure_directory_exists
+    File.ensure_directory_exists("test_dir")

+ 72 - 0
utils/logging_config.py

@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+公共日志配置文件
+提供统一的日志配置,支持文件和控制台输出
+可以被多个模块共享使用
+"""
+
+import os
+import logging
+from datetime import datetime
+
+def setup_logging(log_name, log_dir="logs", level=logging.INFO, 
+                  console_output=True, file_output=True):
+    """
+    设置日志配置
+    
+    Args:
+        log_name: 日志器名称
+        log_dir: 日志文件目录
+        level: 日志级别
+        console_output: 是否输出到控制台
+        file_output: 是否输出到文件
+    """
+    # 创建logs目录
+    if file_output and not os.path.exists(log_dir):
+        os.makedirs(log_dir)
+    
+    # 创建logger
+    logger = logging.getLogger(log_name)
+    logger.setLevel(level)
+    
+    # 清除已有的handlers,避免重复
+    logger.handlers.clear()
+    
+    # 日志格式
+    formatter = logging.Formatter(
+        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    
+    # 文件输出
+    if file_output:
+        # 生成日志文件名(包含日期)
+        log_filename = f"{log_dir}/{log_name}_{datetime.now().strftime('%Y%m%d')}.log"
+        file_handler = logging.FileHandler(log_filename, encoding='utf-8')
+        file_handler.setLevel(level)
+        file_handler.setFormatter(formatter)
+        logger.addHandler(file_handler)
+    
+    # 控制台输出
+    if console_output:
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(level)
+        console_handler.setFormatter(formatter)
+        logger.addHandler(console_handler)
+    
+    return logger
+
+def get_logger(log_name, log_dir="logs", level=logging.INFO):
+    """
+    获取配置好的logger
+    
+    Args:
+        log_name: 日志器名称
+        log_dir: 日志文件目录
+        level: 日志级别
+    
+    Returns:
+        配置好的logger实例
+    """
+    return setup_logging(log_name, log_dir, level) 

+ 99 - 0
utils/mysql_db.py

@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+"""
+数据库连接及操作
+"""
+import pymysql
+from loguru import logger
+
+class MysqlHelper:
+    @classmethod
+    def connect_mysql(cls):
+        # 创建一个 Connection 对象,代表了一个数据库连接
+        connection = pymysql.connect(
+            host="rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
+            port=3306,  # 端口号
+            user="wqsd",  # mysql用户名
+            passwd="wqsd@2025",  # mysql用户登录密码
+            db="ai_knowledge",  # 数据库名
+            # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+            charset="utf8",
+            # 超时设置
+            connect_timeout=30,      # 连接超时时间(秒)
+            read_timeout=30,         # 读取超时时间(秒)
+            write_timeout=30        # 写入超时时间(秒)
+        )
+        return connection
+
+    @classmethod
+    def get_values(cls, sql, params=None):
+        try:
+            # 连接数据库
+            connect = cls.connect_mysql()
+            # 返回一个 Cursor对象
+            mysql = connect.cursor()
+
+            if params:
+                # 如果传递了 params 参数
+                mysql.execute(sql, params)
+            else:
+                # 如果没有传递 params 参数
+                mysql.execute(sql)
+            # fetchall方法返回的是一个元组,里面每个元素也是元组,代表一行记录
+            data = mysql.fetchall()
+
+            # 关闭数据库连接
+            connect.close()
+
+            # 返回查询结果,元组
+            return data
+        except Exception as e:
+            print(f"get_values异常:{e}\n")
+
+    @classmethod
+    def update_values(cls, sql, params=None):
+        """
+        执行更新操作(INSERT/UPDATE/DELETE)
+        
+        参数:
+            sql: 要执行的SQL语句
+            params: SQL参数(可选,元组或字典)
+        
+        返回:
+            成功时返回影响的行数,失败返回None
+        """
+        connect = None
+        cursor = None
+        
+        try:
+            connect = cls.connect_mysql()
+            cursor = connect.cursor()
+            
+            # 执行SQL语句
+            if params:
+                affected_rows = cursor.execute(sql, params)
+            else:
+                affected_rows = cursor.execute(sql)
+            
+            connect.commit()
+            return affected_rows
+            
+        except Exception as e:
+            logger.error(f"SQL执行失败: {e}")
+            logger.error(f"SQL语句: {sql}")
+            if params:
+                logger.error(f"参数: {params}")
+            
+            if connect:
+                connect.rollback()
+            return None
+            
+        finally:
+            # 确保资源关闭
+            if cursor:
+                cursor.close()
+            if connect:
+                connect.close()
+
+
+
+