ソースを参照

删除无用的脚本

jihuaqiang 4 日 前
コミット
420b9bb807

+ 0 - 1
.gitignore

@@ -7,7 +7,6 @@ examples/html/
 examples/output_*.json
 examples/*.zip
 examples/videos/
-src/*/__pycache__/
 
 # C extensions
 *.so

+ 0 - 580
examples/fetch.py

@@ -1,580 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Xiaohongshu Blogger Historical Posts Fetcher
-
-Features:
-1. Fetch blogger's historical posts (posts command)
-   - Input Xiaohongshu author ID
-   - Call API to get blogger's homepage info and historical posts
-   - Support pagination
-   - Output to author directory's historical posts folder
-   - Filename is post ID with .json extension
-   - Auto fetch post details when body_text > threshold (default: 90 chars)
-   - Update original post file's body_text with complete version from detail API
-
-2. Fetch single post detail (detail command)
-   - Input Xiaohongshu note ID
-   - Call API to get post detail
-   - Save to specified output file
-
-Usage:
-  # Fetch historical posts (with auto body_text update)
-  python fetch.py posts <account_id> [-o output_dir] [-m max_pages] [-d delay]
-
-  # Fetch historical posts without auto detail fetching
-  python fetch.py posts <account_id> --no-auto-detail
-
-  # Fetch historical posts with custom threshold (e.g., 120 chars)
-  python fetch.py posts <account_id> --detail-threshold 120
-
-  # Fetch single post detail
-  python fetch.py detail <note_id> [-o output_file]
-"""
-
-import requests
-import json
-import os
-import time
-from typing import Optional, Dict, List
-
-
-class XHSBloggerFetcher:
-    """Xiaohongshu Blogger Historical Posts Fetcher"""
-
-    API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts"
-    DETAIL_API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_detail_by_note_id"
-
-    def __init__(self, account_id: str, output_dir: Optional[str] = None,
-                 auto_fetch_detail: bool = True, detail_threshold: int = 90):
-        """
-        Initialize fetcher
-
-        Args:
-            account_id: Xiaohongshu blogger's ID
-            output_dir: Output directory path, defaults to current dir + account_name
-            auto_fetch_detail: Whether to auto fetch detail for posts with body_text > threshold
-            detail_threshold: Minimum character count in body_text to trigger detail fetch (default: 90)
-        """
-        self.account_id = account_id
-        self.account_name = None  # Will be set from first post's channel_account_name
-        self.output_dir = output_dir
-        self.posts_dir = None
-        self.first_post_saved = False  # Track if first post has been copied
-        self.auto_fetch_detail = auto_fetch_detail
-        self.detail_threshold = detail_threshold
-        # Statistics
-        self.detail_fetch_count = 0  # Successfully fetched and updated details
-        self.detail_skip_count = 0   # Skipped due to low character count
-        self.detail_error_count = 0  # Failed to fetch details
-
-    def fetch_posts(self, cursor: Optional[str] = None) -> Dict:
-        """
-        Fetch blogger's historical posts
-
-        Args:
-            cursor: Pagination cursor, not passed for first request
-
-        Returns:
-            API response data
-        """
-        payload = {
-            "account_id": self.account_id
-        }
-
-        if cursor:
-            payload["cursor"] = cursor
-
-        try:
-            response = requests.post(
-                self.API_URL,
-                json=payload,
-                headers={"Content-Type": "application/json"},
-                timeout=30
-            )
-            response.raise_for_status()
-            return response.json()
-        except requests.exceptions.RequestException as e:
-            print(f"API request failed: {e}")
-            return {}
-
-    def fetch_post_detail(self, note_id: str) -> Dict:
-        """
-        Fetch single post detail by note ID
-
-        Args:
-            note_id: Xiaohongshu note ID
-
-        Returns:
-            API response data containing post details
-        """
-        payload = {
-            "note_id": note_id
-        }
-
-        try:
-            response = requests.post(
-                self.DETAIL_API_URL,
-                json=payload,
-                headers={"Content-Type": "application/json"},
-                timeout=30
-            )
-            response.raise_for_status()
-            return response.json()
-        except requests.exceptions.RequestException as e:
-            print(f"API request failed: {e}")
-            return {}
-
-    def _convert_post_format(self, post: Dict) -> Dict:
-        """
-        Convert API post format to standard format
-
-        Args:
-            post: Original post data from API
-
-        Returns:
-            Converted post data in standard format
-        """
-        from datetime import datetime
-
-        # Extract image URLs from image_url_list
-        images = []
-        for img in post.get("image_url_list", []):
-            if isinstance(img, dict):
-                images.append(img.get("image_url", ""))
-            elif isinstance(img, str):
-                images.append(img)
-
-        # Extract video URL from video_url_list
-        video_list = post.get("video_url_list", [])
-        video = ""
-        if video_list:
-            if isinstance(video_list[0], dict):
-                video = video_list[0].get("video_url", "")
-            elif isinstance(video_list[0], str):
-                video = video_list[0]
-
-        # Convert timestamp to datetime string
-        publish_timestamp = post.get("publish_timestamp", 0)
-        publish_time = ""
-        if publish_timestamp:
-            try:
-                dt = datetime.fromtimestamp(publish_timestamp / 1000)
-                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
-            except:
-                publish_time = ""
-
-        # Build standard format
-        converted = {
-            "channel_content_id": post.get("channel_content_id"),
-            "link": post.get("content_link", ""),
-            "comment_count": post.get("comment_count", 0),
-            "images": images,
-            "like_count": post.get("like_count", 0),
-            "body_text": post.get("body_text", ""),
-            "title": post.get("title", ""),
-            "collect_count": post.get("collect_count", 0),
-            "channel_account_id": post.get("channel_account_id", ""),
-            "channel_account_name": post.get("channel_account_name", ""),
-            "content_type": post.get("content_type", "note"),
-            "video": video,
-            "publish_timestamp": publish_timestamp,
-            "publish_time": publish_time
-        }
-
-        return converted
-
-    def _initialize_directories(self, account_name: str):
-        """
-        Initialize output directories using account name
-
-        Args:
-            account_name: Account name from first post
-        """
-        if self.posts_dir is not None:
-            return  # Already initialized
-
-        self.account_name = account_name
-
-        # Use provided output_dir or default to current dir + account_name
-        if self.output_dir is None:
-            self.output_dir = os.path.join(os.getcwd(), account_name)
-
-        self.posts_dir = os.path.join(self.output_dir, "作者历史帖子")
-
-        # Ensure output directory exists
-        os.makedirs(self.posts_dir, exist_ok=True)
-
-    def _count_body_text_chars(self, body_text: str) -> int:
-        """
-        Count characters in body_text
-
-        Args:
-            body_text: The post body text
-
-        Returns:
-            Total number of characters (including all whitespace, emoji, etc.)
-        """
-        if not body_text:
-            return 0
-        # Return the total length of the string
-        return len(body_text)
-
-    def save_post(self, post: Dict, is_first_post: bool = False) -> bool:
-        """
-        Save single post to JSON file
-
-        Args:
-            post: Post data
-            is_first_post: Whether this is the first post (will be copied to parent directory)
-
-        Returns:
-            Whether save was successful
-        """
-        # Initialize directories on first post using account name
-        if self.posts_dir is None:
-            account_name = post.get("channel_account_name")
-            if not account_name:
-                print(f"Warning: Post data missing channel_account_name field, using account_id instead")
-                account_name = self.account_id
-            self._initialize_directories(account_name)
-
-        # Get post ID as filename
-        post_id = post.get("channel_content_id")
-        if not post_id:
-            print(f"Warning: Post data missing channel_content_id field, skipping")
-            return False
-
-        file_path = os.path.join(self.posts_dir, f"{post_id}.json")
-
-        try:
-            # Convert to standard format before saving
-            converted_post = self._convert_post_format(post)
-
-            with open(file_path, 'w', encoding='utf-8') as f:
-                json.dump(converted_post, f, ensure_ascii=False, indent=2)
-            print(f"✓ Saved post: {post_id}.json")
-
-            # If this is the first post, copy it to parent directory as "待解构帖子.json"
-            if is_first_post and not self.first_post_saved:
-                target_path = os.path.join(self.output_dir, "待解构帖子.json")
-                with open(target_path, 'w', encoding='utf-8') as f:
-                    json.dump(converted_post, f, ensure_ascii=False, indent=2)
-                print(f"✓ Copied first post to: 待解构帖子.json")
-                self.first_post_saved = True
-
-            # Auto fetch post detail if body_text exceeds threshold
-            if self.auto_fetch_detail:
-                body_text = converted_post.get("body_text", "")
-                char_count = self._count_body_text_chars(body_text)
-
-                if char_count > self.detail_threshold:
-                    print(f"  → Body text has {char_count} chars (> {self.detail_threshold}), fetching detail...")
-
-                    # Fetch detail
-                    response_data = self.fetch_post_detail(post_id)
-
-                    if response_data and response_data.get('success'):
-                        try:
-                            # Parse the result field (it's a JSON string containing a list)
-                            result_str = response_data.get('result', '[]')
-                            result_list = json.loads(result_str)
-
-                            # The result is a list with one item, which contains 'data' field
-                            if isinstance(result_list, list) and len(result_list) > 0:
-                                detail_data = result_list[0].get('data', {})
-
-                                if detail_data and 'body_text' in detail_data:
-                                    # Update the original post file with detailed body_text
-                                    detail_body_text = detail_data.get('body_text', '')
-                                    if detail_body_text and detail_body_text != body_text:
-                                        converted_post['body_text'] = detail_body_text
-                                        # Re-save the post file with updated body_text
-                                        with open(file_path, 'w', encoding='utf-8') as f:
-                                            json.dump(converted_post, f, ensure_ascii=False, indent=2)
-                                        print(f"  ✓ Updated body_text with complete version from detail API")
-                                    else:
-                                        print(f"  → Body text already complete, no update needed")
-
-                                    self.detail_fetch_count += 1
-                                else:
-                                    print(f"  ✗ No valid data in detail response")
-                                    self.detail_error_count += 1
-                            else:
-                                print(f"  ✗ Unexpected detail response format")
-                                self.detail_error_count += 1
-                        except Exception as e:
-                            print(f"  ✗ Failed to parse/update detail: {e}")
-                            self.detail_error_count += 1
-                    else:
-                        print(f"  ✗ Failed to fetch detail")
-                        self.detail_error_count += 1
-                else:
-                    print(f"  → Body text has {char_count} chars (<= {self.detail_threshold}), skipping detail fetch")
-                    self.detail_skip_count += 1
-
-            return True
-        except Exception as e:
-            print(f"✗ Failed to save post {post_id}: {e}")
-            return False
-
-    def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int:
-        """
-        Fetch all historical posts (with pagination support)
-
-        Args:
-            max_pages: Maximum pages to fetch, None means fetch all
-            delay: Delay between requests in seconds
-
-        Returns:
-            Number of successfully saved posts
-        """
-        cursor = None
-        page = 0
-        total_saved = 0
-
-        print(f"Starting to fetch historical posts for blogger: {self.account_id}")
-        print("-" * 60)
-
-        while True:
-            page += 1
-            print(f"\nPage {page}:")
-
-            # Fetch data
-            response_data = self.fetch_posts(cursor)
-
-            if not response_data:
-                print("Failed to fetch data, stopping")
-                break
-
-            # Extract posts list (adjust based on actual API response structure)
-            posts = self._extract_posts(response_data)
-
-            if not posts:
-                print("No more posts, finished")
-                break
-
-            # Save posts
-            print(f"Got {len(posts)} posts on this page")
-            for idx, post in enumerate(posts):
-                # Mark the first post overall (page 1, first post)
-                is_first = (page == 1 and idx == 0)
-                if self.save_post(post, is_first_post=is_first):
-                    total_saved += 1
-                    # Print output directory info after first post is saved
-                    if is_first:
-                        print(f"Output directory: {self.posts_dir}")
-
-            # Check if there's a next page
-            cursor = self._get_next_cursor(response_data)
-            if not cursor:
-                print("\nAll posts fetched")
-                break
-
-            # Check if max pages limit reached
-            if max_pages and page >= max_pages:
-                print(f"\nReached max pages limit ({max_pages} pages)")
-                break
-
-            # Delay to avoid too frequent requests
-            if delay > 0:
-                time.sleep(delay)
-
-        print("-" * 60)
-        print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}")
-
-        # Print detail fetch statistics if auto_fetch_detail is enabled
-        if self.auto_fetch_detail:
-            print(f"\nBody Text Update Statistics:")
-            print(f"  ✓ Successfully updated: {self.detail_fetch_count}")
-            print(f"  → Skipped (text <= {self.detail_threshold} chars): {self.detail_skip_count}")
-            print(f"  ✗ Failed: {self.detail_error_count}")
-
-        return total_saved
-
-    def _extract_posts(self, response_data: Dict) -> List[Dict]:
-        """
-        Extract posts list from API response
-
-        Args:
-            response_data: API response data
-
-        Returns:
-            List of posts
-        """
-        try:
-            # Check if API call was successful
-            if not response_data.get('success'):
-                print(f"API call failed: {response_data}")
-                return []
-
-            # Parse the result field (it's a JSON string)
-            result_str = response_data.get('result', '{}')
-            result = json.loads(result_str)
-
-            # Check response code
-            if result.get('code') != 0:
-                print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}")
-                return []
-
-            # Extract posts from result.data.data
-            data = result.get('data', {})
-            posts = data.get('data', [])
-
-            return posts if isinstance(posts, list) else []
-
-        except json.JSONDecodeError as e:
-            print(f"Failed to parse result JSON: {e}")
-            return []
-        except Exception as e:
-            print(f"Error extracting posts: {e}")
-            return []
-
-    def _get_next_cursor(self, response_data: Dict) -> Optional[str]:
-        """
-        Extract next page cursor from API response
-
-        Args:
-            response_data: API response data
-
-        Returns:
-            Next page cursor, or None if no more pages
-        """
-        try:
-            # Check if API call was successful
-            if not response_data.get('success'):
-                return None
-
-            # Parse the result field (it's a JSON string)
-            result_str = response_data.get('result', '{}')
-            result = json.loads(result_str)
-
-            # Extract cursor from result.data.next_cursor and check has_more
-            data = result.get('data', {})
-            has_more = data.get('has_more', False)
-            next_cursor = data.get('next_cursor')
-
-            # Only return cursor if there are more pages
-            if has_more and next_cursor:
-                return str(next_cursor)
-
-            return None
-
-        except Exception as e:
-            print(f"Error extracting next cursor: {e}")
-            return None
-
-    def fetch_and_save_post_detail(self, note_id: str, output_path: Optional[str] = None) -> bool:
-        """
-        Fetch post detail by note_id and save to file
-
-        Args:
-            note_id: Xiaohongshu note ID
-            output_path: Optional output file path, defaults to {note_id}_detail.json
-
-        Returns:
-            Whether fetch and save was successful
-        """
-        print(f"Fetching post detail for note_id: {note_id}")
-        print("-" * 60)
-
-        # Fetch post detail
-        response_data = self.fetch_post_detail(note_id)
-
-        if not response_data:
-            print("✗ Failed to fetch post detail")
-            return False
-
-        try:
-            # Check if API call was successful
-            if not response_data.get('success'):
-                print(f"✗ API call failed: {response_data}")
-                return False
-
-            # Parse the result field (it's a JSON string)
-            result_str = response_data.get('result', '{}')
-            result = json.loads(result_str)
-
-            # Check response code
-            if result.get('code') != 0:
-                print(f"✗ API returned error code: {result.get('code')}, message: {result.get('msg')}")
-                return False
-
-            # Extract post data from result.data
-            post_data = result.get('data', {})
-
-            if not post_data:
-                print("✗ No post data in response")
-                return False
-
-            # Determine output path
-            if output_path is None:
-                output_path = f"{note_id}_detail.json"
-
-            # Save to file
-            with open(output_path, 'w', encoding='utf-8') as f:
-                json.dump(post_data, f, ensure_ascii=False, indent=2)
-
-            print(f"✓ Post detail saved to: {output_path}")
-            print("-" * 60)
-            return True
-
-        except json.JSONDecodeError as e:
-            print(f"✗ Failed to parse result JSON: {e}")
-            return False
-        except Exception as e:
-            print(f"✗ Error saving post detail: {e}")
-            return False
-
-
-def main():
-    """Main function"""
-    import argparse
-
-    parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts or single post detail')
-
-    # Create subparsers for different commands
-    subparsers = parser.add_subparsers(dest='command', help='Command to execute')
-
-    # Subparser for fetching historical posts
-    posts_parser = subparsers.add_parser('posts', help='Fetch blogger historical posts')
-    posts_parser.add_argument('account_id', help='Xiaohongshu blogger ID')
-    posts_parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account_name)')
-    posts_parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)')
-    posts_parser.add_argument('-d', '--delay', type=float, default=1.0, help='Request delay in seconds (default: 1.0)')
-    posts_parser.add_argument('--no-auto-detail', action='store_true', help='Disable auto fetching post details')
-    posts_parser.add_argument('--detail-threshold', type=int, default=90,
-                             help='Minimum character count in body_text to trigger detail fetch (default: 90)')
-
-    # Subparser for fetching single post detail
-    detail_parser = subparsers.add_parser('detail', help='Fetch single post detail by note_id')
-    detail_parser.add_argument('note_id', help='Xiaohongshu note ID')
-    detail_parser.add_argument('-o', '--output', help='Output file path (default: {note_id}_detail.json)')
-
-    args = parser.parse_args()
-
-    # If no command specified, show help
-    if not args.command:
-        parser.print_help()
-        return
-
-    # Execute corresponding command
-    if args.command == 'posts':
-        # Create fetcher and execute
-        auto_fetch_detail = not args.no_auto_detail  # Invert the flag
-        fetcher = XHSBloggerFetcher(
-            args.account_id,
-            args.output,
-            auto_fetch_detail=auto_fetch_detail,
-            detail_threshold=args.detail_threshold
-        )
-        fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay)
-
-    elif args.command == 'detail':
-        # Create a minimal fetcher instance (account_id not needed for detail fetching)
-        fetcher = XHSBloggerFetcher(account_id='')
-        fetcher.fetch_and_save_post_detail(args.note_id, args.output)
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 271
examples/run_batch.py

@@ -1,271 +0,0 @@
-"""
-批量处理脚本:读取demo.json,批量处理视频
-
-功能:
-1. 读取demo.json文件
-2. 使用run_single.py同样的方法处理每个视频
-3. 每处理完一个视频立即写入结果到output_demo.json文件(实时保存)
-"""
-
-import json
-import sys
-import os
-from pathlib import Path
-from datetime import datetime
-
-# 添加项目根目录到路径
-project_root = Path(__file__).parent.parent
-sys.path.insert(0, str(project_root))
-
-# 手动加载.env文件
-def load_env_file(env_path):
-    """手动加载.env文件"""
-    if not env_path.exists():
-        return False
-
-    with open(env_path, 'r') as f:
-        for line in f:
-            line = line.strip()
-            # 跳过注释和空行
-            if not line or line.startswith('#'):
-                continue
-            # 解析KEY=VALUE
-            if '=' in line:
-                key, value = line.split('=', 1)
-                os.environ[key.strip()] = value.strip()
-
-    return True
-
-env_path = project_root / ".env"
-if load_env_file(env_path):
-    print(f"✅ 已加载环境变量从: {env_path}")
-    # 验证API密钥
-    api_key = os.environ.get("GEMINI_API_KEY", "")
-    if api_key:
-        print(f"   GEMINI_API_KEY: {api_key[:10]}...")
-else:
-    print(f"⚠️  未找到.env文件: {env_path}")
-
-from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
-from src.utils.logger import get_logger
-
-logger = get_logger(__name__)
-
-
-def convert_to_workflow_input(raw_data):
-    """
-    将原始数据转换为工作流输入格式(视频分析版本)
-
-    Args:
-        raw_data: 原始帖子数据(视频格式)
-    """
-    # 视频分析版本:直接使用视频URL和文本信息
-    input_data = {
-        "video": raw_data.get("video", ""),
-        "channel_content_id": raw_data.get("channel_content_id", ""),
-        "title": raw_data.get("title", ""),
-        "body_text": raw_data.get("body_text", ""),
-    }
-
-    return input_data
-
-
-def load_existing_results(output_path):
-    """
-    加载已有的结果文件(如果存在)
-
-    Args:
-        output_path: 结果文件路径
-
-    Returns:
-        已有结果数据,如果文件不存在则返回None
-    """
-    if not output_path.exists():
-        return None
-
-    try:
-        with open(output_path, "r", encoding="utf-8") as f:
-            return json.load(f)
-    except Exception as e:
-        print(f"⚠️  读取已有结果文件失败(将重新创建): {e}")
-        return None
-
-
-def save_result(output_path, results, timestamp, total, success_count, fail_count):
-    """
-    保存结果到文件
-
-    Args:
-        output_path: 结果文件路径
-        results: 结果列表
-        timestamp: 时间戳
-        total: 总数
-        success_count: 成功数
-        fail_count: 失败数
-    """
-    output_data = {
-        "timestamp": timestamp,
-        "total": total,
-        "success_count": success_count,
-        "fail_count": fail_count,
-        "results": results
-    }
-
-    try:
-        with open(output_path, "w", encoding="utf-8") as f:
-            json.dump(output_data, f, ensure_ascii=False, indent=2)
-        return True
-    except Exception as e:
-        print(f"❌ 保存结果失败: {e}")
-        return False
-
-
-def process_single_video(workflow, video_data, index, total):
-    """
-    处理单个视频
-
-    Args:
-        workflow: WhatDeconstructionWorkflow实例
-        video_data: 视频数据字典
-        index: 当前索引(从1开始)
-        total: 总数
-
-    Returns:
-        处理结果字典,包含原始数据和结果
-    """
-    channel_content_id = video_data.get("channel_content_id", "unknown")
-    title = video_data.get("title", "")
-
-    print(f"\n{'=' * 80}")
-    print(f"[{index}/{total}] 处理视频: {channel_content_id}")
-    print(f"标题: {title}")
-    print(f"{'=' * 80}")
-
-    # 转换数据格式
-    try:
-        input_data = convert_to_workflow_input(video_data)
-        print(f"✅ 数据格式转换成功")
-    except Exception as e:
-        print(f"❌ 数据格式转换失败: {e}")
-        return {
-            "video_data": video_data,
-            "success": False,
-            "error": f"数据格式转换失败: {e}",
-            "result": None
-        }
-
-    # 执行工作流
-    print(f"   开始执行工作流(这可能需要几分钟时间)...")
-    try:
-        result = workflow.invoke(input_data)
-        print(f"✅ 工作流执行成功")
-        return {
-            "video_data": video_data,
-            "success": True,
-            "error": None,
-            "result": result
-        }
-    except Exception as e:
-        print(f"❌ 工作流执行失败: {e}")
-        import traceback
-        traceback.print_exc()
-        return {
-            "video_data": video_data,
-            "success": False,
-            "error": f"工作流执行失败: {e}",
-            "result": None
-        }
-
-
-def main():
-    """主函数"""
-    print("=" * 80)
-    print("批量处理视频 - What 解构工作流(视频分析版本)")
-    print("=" * 80)
-
-    # 1. 读取demo.json
-    print("\n[1] 读取demo.json...")
-    demo_json_path = Path(__file__).parent / "demo.json"
-    if not demo_json_path.exists():
-        print(f"❌ 未找到demo.json文件: {demo_json_path}")
-        return
-
-    try:
-        with open(demo_json_path, "r", encoding="utf-8") as f:
-            video_list = json.load(f)
-        print(f"✅ 成功读取demo.json,共 {len(video_list)} 个视频")
-    except Exception as e:
-        print(f"❌ 读取demo.json失败: {e}")
-        return
-
-    # 2. 初始化工作流
-    print("\n[2] 初始化工作流...")
-    try:
-        workflow = WhatDeconstructionWorkflow(
-            model_provider="google_genai",
-            max_depth=10
-        )
-        print(f"✅ 工作流初始化成功")
-    except Exception as e:
-        print(f"❌ 工作流初始化失败: {e}")
-        import traceback
-        traceback.print_exc()
-        return
-
-    # 3. 准备结果文件路径和时间戳
-    print("\n[3] 准备结果文件...")
-    output_path = Path(__file__).parent / "output_demo.json"
-    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
-    # 检查是否已有结果文件(用于提示)
-    existing_results = load_existing_results(output_path)
-    if existing_results:
-        print(f"⚠️  检测到已有结果文件: {output_path}")
-        print(f"   将覆盖已有结果")
-    else:
-        print(f"✅ 将创建新的结果文件: {output_path}")
-
-    # 4. 批量处理视频(每处理完一个立即保存)
-    print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")
-    results = []
-    total = len(video_list)
-    success_count = 0
-    fail_count = 0
-
-    for index, video_data in enumerate(video_list, 1):
-        # 处理单个视频
-        result = process_single_video(workflow, video_data, index, total)
-        results.append(result)
-
-        # 更新统计
-        if result["success"]:
-            success_count += 1
-        else:
-            fail_count += 1
-
-        # 立即保存结果到文件
-        print(f"   保存结果到文件... [{success_count}成功/{fail_count}失败/{total}总计]")
-        if save_result(output_path, results, timestamp, total, success_count, fail_count):
-            print(f"✅ 结果已实时保存到: {output_path}")
-        else:
-            print(f"❌ 保存结果失败,但将继续处理")
-
-    # 5. 显示最终处理摘要
-    print("\n" + "=" * 80)
-    print("最终处理摘要")
-    print("=" * 80)
-    print(f"总计: {total} 个视频")
-    print(f"成功: {success_count} 个")
-    print(f"失败: {fail_count} 个")
-    print(f"结果文件: {output_path}")
-
-    if fail_count > 0:
-        print("\n失败的视频:")
-        for i, result in enumerate(results, 1):
-            if not result["success"]:
-                video_data = result["video_data"]
-                print(f"  [{i}] {video_data.get('channel_content_id')}: {result.get('error')}")
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 160
examples/run_batch_script.py

@@ -1,160 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-批量运行脚本理解工作流 (ScriptWorkflow)。
-
-读取 examples/output_demo.json 中三点解构的结果,
-将每条里的 video_data 和 result 组合后传入 ScriptWorkflow,
-并将脚本理解结果增量写入 examples/output_demo_script.json。
-"""
-
-import json
-import os
-import sys
-from datetime import datetime
-from pathlib import Path
-from typing import Dict, Any, List
-# 添加项目根目录到路径
-project_root = Path(__file__).parent.parent
-sys.path.insert(0, str(project_root))
-
-from src.workflows.script_workflow import ScriptWorkflow
-from src.utils.logger import get_logger
-
-logger = get_logger(__name__)
-
-
-def load_json(path: Path) -> Dict[str, Any]:
-    if not path.exists():
-        return {}
-    with path.open("r", encoding="utf-8") as f:
-        return json.load(f)
-
-
-def save_json(path: Path, data: Dict[str, Any]) -> None:
-    tmp_path = path.with_suffix(".tmp")
-    with tmp_path.open("w", encoding="utf-8") as f:
-        json.dump(data, f, ensure_ascii=False, indent=2)
-    tmp_path.replace(path)
-
-
-def build_script_input(video_data: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
-    """根据 output_demo.json 单条结果构造 ScriptWorkflow 的输入结构。"""
-    video_info = result.get("视频信息", {}) or {}
-    three_points = result.get("三点解构", {}) or {}
-    topic_understanding = result.get("选题理解", {}) or {}
-
-    # 基本视频信息
-    video_url = video_data.get("video") or video_info.get("视频URL", "")
-    channel_content_id = video_data.get("channel_content_id", "")
-    title = video_data.get("title") or video_info.get("标题", "")
-    body_text = video_info.get("正文", "")
-
-    # 三点解构映射到脚本理解需要的字段
-    inspiration_points: List[Dict[str, Any]] = three_points.get("灵感点", []) or []
-    purpose_block = three_points.get("目的点", {}) or {}
-    purpose_points: List[Dict[str, Any]] = purpose_block.get("purposes", []) or []
-    key_points_block = three_points.get("关键点", {}) or {}
-    key_points: List[Dict[str, Any]] = key_points_block.get("key_points", []) or []
-
-    input_data: Dict[str, Any] = {
-        "video": video_url,
-        "channel_content_id": channel_content_id,
-        "text": {
-            "title": title,
-            "body": body_text,
-        },
-        "topic_selection_understanding": topic_understanding,
-        "content_weight": {},  # 目前没有对应数据,留空
-        "inspiration_points": inspiration_points,
-        "purpose_points": purpose_points,
-        "key_points": key_points,
-    }
-    return input_data
-
-
-def main() -> None:
-    base_dir = Path(__file__).parent
-    input_path = base_dir / "output_demo.json"
-    output_path = base_dir / "output_demo_script.json"
-
-    if not input_path.exists():
-        raise FileNotFoundError(f"找不到输入文件: {input_path}")
-
-    # 读取原始三点解构结果
-    raw = load_json(input_path)
-    raw_results: List[Dict[str, Any]] = raw.get("results", []) or []
-
-    # 读取已有的脚本理解输出,支持增量追加
-    output_data = load_json(output_path)
-    if not output_data:
-        output_data = {
-            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
-            "total": 0,
-            "success_count": 0,
-            "fail_count": 0,
-            "results": [],
-        }
-
-    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
-    # 用 channel_content_id + video URL 去重,避免重复处理
-    processed_keys = {
-        f"{item.get('video_data', {}).get('channel_content_id','')}|"
-        f"{item.get('video_data', {}).get('video','')}"
-        for item in existing_results
-    }
-
-    workflow = ScriptWorkflow()
-
-    for item in raw_results:
-        video_data = item.get("video_data", {}) or {}
-        result = item.get("result", {}) or {}
-
-        key = f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"
-        if key in processed_keys:
-            logger.info(f"已处理过该视频,跳过: {key}")
-            continue
-
-        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
-
-        try:
-            script_input = build_script_input(video_data, result)
-            script_result = workflow.invoke(script_input)
-
-            record = {
-                "video_data": video_data,
-                "what_deconstruction_result": result,
-                "script_result": script_result,
-                "success": True,
-                "error": None,
-            }
-
-            output_data["success_count"] = output_data.get("success_count", 0) + 1
-
-        except Exception as e:
-            logger.error(f"脚本理解处理失败: {e}", exc_info=True)
-            record = {
-                "video_data": video_data,
-                "what_deconstruction_result": result,
-                "script_result": None,
-                "success": False,
-                "error": str(e),
-            }
-            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
-
-        output_data["results"].append(record)
-        output_data["total"] = output_data.get("total", 0) + 1
-
-        # 处理完一条就保存一次,避免长任务中途失败导致全部丢失
-        save_json(output_path, output_data)
-
-    logger.info(
-        f"批量脚本理解完成: total={output_data.get('total')}, "
-        f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
-    )
-
-
-if __name__ == "__main__":
-    main()
-
-# 脚本解构

BIN
examples/static/visualize/__pycache__/tab5.cpython-313.pyc


+ 45 - 3
examples/static/visualize/tab5.py

@@ -43,9 +43,33 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
     script_data = data.get('脚本理解', {})
     form_list = script_data.get('形式列表', [])
     substance_list = script_data.get('实质列表', [])
-    inspiration_points = data.get('灵感点', [])
-    purpose_points = data.get('目的点', [])
-    key_points = data.get('关键点', [])
+    
+    # 处理灵感点:可能是列表,也可能在对象中
+    inspiration_data = data.get('灵感点', [])
+    if isinstance(inspiration_data, list):
+        inspiration_points = inspiration_data
+    elif isinstance(inspiration_data, dict):
+        inspiration_points = inspiration_data.get('inspiration_points', [])
+    else:
+        inspiration_points = []
+    
+    # 处理目的点:可能是列表,也可能在对象的purposes字段中
+    purpose_data = data.get('目的点', [])
+    if isinstance(purpose_data, list):
+        purpose_points = purpose_data
+    elif isinstance(purpose_data, dict):
+        purpose_points = purpose_data.get('purposes', [])
+    else:
+        purpose_points = []
+    
+    # 处理关键点:可能是列表,也可能在对象的key_points字段中
+    keypoint_data = data.get('关键点', [])
+    if isinstance(keypoint_data, list):
+        key_points = keypoint_data
+    elif isinstance(keypoint_data, dict):
+        key_points = keypoint_data.get('key_points', [])
+    else:
+        key_points = []
 
     if not substance_list and not form_list:
         html += '<div class="empty-state">暂无实质点和形式点数据</div>\n'
@@ -125,6 +149,9 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
         html += '<h4 class="group-title">灵感点</h4>\n'
         html += '<div class="target-items">\n'
         for idx, point in enumerate(inspiration_points, 1):
+            # 确保 point 是字典
+            if not isinstance(point, dict):
+                continue
             # 使用提取的特征中的特征名称,每个特征名称一个卡片
             features = point.get('提取的特征', [])
             feature_names = [f.get('特征名称', '') for f in features if f.get('特征名称')]
@@ -153,6 +180,9 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
         html += '<h4 class="group-title">关键点</h4>\n'
         html += '<div class="target-items">\n'
         for idx, point in enumerate(key_points, 1):
+            # 确保 point 是字典
+            if not isinstance(point, dict):
+                continue
             # 使用提取的特征中的特征名称,每个特征名称一个卡片
             features = point.get('提取的特征', [])
             feature_names = [f.get('特征名称', '') for f in features if f.get('特征名称')]
@@ -181,6 +211,9 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
         html += '<h4 class="group-title">目的点</h4>\n'
         html += '<div class="target-items">\n'
         for idx, point in enumerate(purpose_points, 1):
+            # 确保 point 是字典
+            if not isinstance(point, dict):
+                continue
             # 使用提取的特征中的特征名称,每个特征名称一个卡片
             features = point.get('提取的特征', [])
             feature_names = [f.get('特征名称', '') for f in features if f.get('特征名称')]
@@ -297,6 +330,9 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
         html += '<h4 class="group-title">灵感点</h4>\n'
         html += '<div class="target-items">\n'
         for idx, point in enumerate(inspiration_points, 1):
+            # 确保 point 是字典
+            if not isinstance(point, dict):
+                continue
             # 使用提取的特征中的特征名称,每个特征名称一个卡片
             features = point.get('提取的特征', [])
             feature_names = [f.get('特征名称', '') for f in features if f.get('特征名称')]
@@ -325,6 +361,9 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
         html += '<h4 class="group-title">关键点</h4>\n'
         html += '<div class="target-items">\n'
         for idx, point in enumerate(key_points, 1):
+            # 确保 point 是字典
+            if not isinstance(point, dict):
+                continue
             # 使用提取的特征中的特征名称,每个特征名称一个卡片
             features = point.get('提取的特征', [])
             feature_names = [f.get('特征名称', '') for f in features if f.get('特征名称')]
@@ -353,6 +392,9 @@ def generate_tab5_content(data: Dict[str, Any]) -> str:
         html += '<h4 class="group-title">目的点</h4>\n'
         html += '<div class="target-items">\n'
         for idx, point in enumerate(purpose_points, 1):
+            # 确保 point 是字典
+            if not isinstance(point, dict):
+                continue
             # 使用提取的特征中的特征名称,每个特征名称一个卡片
             features = point.get('提取的特征', [])
             feature_names = [f.get('特征名称', '') for f in features if f.get('特征名称')]

+ 14 - 2
examples/visualize_script_results.py

@@ -326,10 +326,22 @@ class ScriptResultVisualizer:
                 print(f"⚠️  跳过第 {idx} 条结果:缺少 script_result 字段或结构不正确")
                 continue
 
+            # 从 what_deconstruction_result 中获取三点解构数据并合并到 script_data
+            what_result = item.get("what_deconstruction_result", {})
+            if isinstance(what_result, dict) and "三点解构" in what_result:
+                deconstruction = what_result["三点解构"]
+                # 将三点解构数据合并到 script_data 顶层,供 tab1 使用
+                if "灵感点" in deconstruction:
+                    script_data["灵感点"] = deconstruction["灵感点"]
+                if "目的点" in deconstruction:
+                    script_data["目的点"] = deconstruction["目的点"]
+                if "关键点" in deconstruction:
+                    script_data["关键点"] = deconstruction["关键点"]
+
             video_data = item.get("video_data") or {}
             channel_content_id = video_data.get("channel_content_id")
 
-            # 用于 HTML 内部展示的“文件名”标签
+            # 用于 HTML 内部展示的"文件名"标签
             json_label = f"{self.json_file.name}#{idx}"
 
             # 生成输出文件名(优先使用 channel_content_id,回退到序号)
@@ -396,7 +408,7 @@ def main():
             json_path = Path.cwd() / json_path
     else:
         # 默认使用 examples/output_demo_script.json
-        json_path = Path(__file__).parent / "output_demo_script.json"
+        json_path = Path(__file__).parent / "output_decode_result.json"
 
     print("🚀 开始生成脚本结果可视化...")
     print(f"📁 JSON文件: {json_path}")

+ 0 - 841
examples/visualize_script_results_v2.py

@@ -1,841 +0,0 @@
-#!/usr/bin/env python3
-"""
-脚本结果可视化工具 V2
-功能:为 output_demo_script_v2.json 中的每个视频生成独立的HTML可视化页面,专门展示"整体结构理解"的结果
-"""
-
-import json
-import argparse
-import sys
-from pathlib import Path
-from datetime import datetime
-from typing import List, Dict, Any, Optional
-import html as html_module
-
-# 保证可以从项目根目录导入
-PROJECT_ROOT = Path(__file__).parent.parent
-if str(PROJECT_ROOT) not in sys.path:
-    sys.path.insert(0, str(PROJECT_ROOT))
-
-
-class ScriptResultVisualizerV2:
-    """脚本结果可视化器 V2 - 专门展示整体结构理解"""
-
-    def __init__(self, json_file: str = None):
-        """
-        初始化可视化器
-
-        Args:
-            json_file: JSON文件路径
-        """
-        if json_file is None:
-            self.json_file = None
-        else:
-            self.json_file = Path(json_file)
-            if not self.json_file.is_absolute():
-                self.json_file = Path.cwd() / json_file
-
-    def load_json_data(self, file_path: Path) -> Optional[Dict[str, Any]]:
-        """
-        加载JSON文件
-
-        Args:
-            file_path: JSON文件路径
-
-        Returns:
-            JSON数据字典,加载失败返回None
-        """
-        try:
-            with open(file_path, 'r', encoding='utf-8') as f:
-                return json.load(f)
-        except Exception as e:
-            print(f"加载文件失败 {file_path}: {e}")
-            return None
-
-    def generate_overall_structure_section(self, overall_data: Dict[str, Any], section_idx: int = 0) -> str:
-        """生成整体解构部分HTML"""
-        html = '<div class="section overall-structure">\n'
-        html += '    <h2 class="section-title collapsible" onclick="toggleCollapse(this)">整体解构 <span class="toggle-icon">▼</span></h2>\n'
-        html += f'    <div class="section-content collapsed" id="section-{section_idx}">\n'
-
-        # 节点基础信息
-        if "节点基础信息" in overall_data:
-            html += f'        <div class="subsection">\n'
-            html += f'            <h3 class="subsection-title collapsible" onclick="toggleCollapse(this)">节点基础信息 <span class="toggle-icon">▼</span></h3>\n'
-            html += f'            <div class="subsection-content collapsed" id="subsection-{section_idx}-0">\n'
-            html += f'                <div class="content-box">{html_module.escape(str(overall_data["节点基础信息"]))}</div>\n'
-            html += '            </div>\n'
-            html += '        </div>\n'
-
-        # 整体实质×形式
-        if "整体实质×形式" in overall_data:
-            html += f'        <div class="subsection">\n'
-            html += f'            <h3 class="subsection-title collapsible" onclick="toggleCollapse(this)">整体实质×形式 <span class="toggle-icon">▼</span></h3>\n'
-            html += f'            <div class="subsection-content collapsed" id="subsection-{section_idx}-1">\n'
-            html += f'                <div class="content-box">{html_module.escape(str(overall_data["整体实质×形式"]))}</div>\n'
-            html += '            </div>\n'
-            html += '        </div>\n'
-
-        # 纵向逻辑流
-        if "纵向逻辑流" in overall_data:
-            html += f'        <div class="subsection">\n'
-            html += f'            <h3 class="subsection-title collapsible" onclick="toggleCollapse(this)">纵向逻辑流 <span class="toggle-icon">▼</span></h3>\n'
-            html += f'            <div class="subsection-content collapsed" id="subsection-{section_idx}-2">\n'
-            logic_flow = overall_data["纵向逻辑流"]
-            if isinstance(logic_flow, list):
-                html += '                <div class="logic-flow">\n'
-                for idx, stage in enumerate(logic_flow):
-                    html += f'                    <div class="logic-stage collapsible-item" onclick="toggleCollapse(this)">\n'
-                    html += f'                        <div class="logic-stage-header">\n'
-                    if isinstance(stage, dict):
-                        stage_num = stage.get("阶段编号", "")
-                        stage_name = stage.get("阶段逻辑名称", "")
-                        stage_desc = stage.get("阶段逻辑描述", "")
-                        if stage_num:
-                            html += f'                            <div class="stage-number">阶段 {stage_num}</div>\n'
-                        if stage_name:
-                            html += f'                            <div class="stage-name">{html_module.escape(stage_name)}</div>\n'
-                        html += f'                            <span class="toggle-icon">▼</span>\n'
-                        html += f'                        </div>\n'
-                        html += f'                        <div class="logic-stage-content collapsed">\n'
-                        if stage_desc:
-                            html += f'                            <div class="stage-desc">{html_module.escape(stage_desc)}</div>\n'
-                        html += f'                        </div>\n'
-                    html += '                    </div>\n'
-                html += '                </div>\n'
-            html += '            </div>\n'
-            html += '        </div>\n'
-
-        html += '    </div>\n'
-        html += '</div>\n'
-        return html
-
-    def generate_paragraph_section(self, paragraphs: List[Dict[str, Any]], section_idx: int = 1) -> str:
-        """生成段落解构部分HTML"""
-        html = '<div class="section paragraph-structure">\n'
-        html += '    <h2 class="section-title collapsible" onclick="toggleCollapse(this)">段落解构 <span class="toggle-icon">▼</span></h2>\n'
-        html += f'    <div class="section-content collapsed" id="section-{section_idx}">\n'
-
-        if not isinstance(paragraphs, list):
-            html += '        <p>暂无段落数据</p>\n'
-            html += '    </div>\n'
-            html += '</div>\n'
-            return html
-
-        for para_idx, para in enumerate(paragraphs):
-            # 段落基本信息
-            para_num = para.get("段落序号", "")
-            time_range = para.get("时间范围", "")
-            units = para.get("包含单元", [])
-            full_text = para.get("段落完整文案", "")
-
-            html += f'        <div class="paragraph collapsible-item" onclick="toggleCollapse(this)">\n'
-            html += f'            <div class="paragraph-header">\n'
-            if para_num:
-                html += f'                <span class="para-number">段落 {para_num}</span>\n'
-            if time_range:
-                html += f'                <span class="time-range">{html_module.escape(time_range)}</span>\n'
-            if units:
-                units_str = ", ".join(str(u) for u in units) if isinstance(units, list) else str(units)
-                html += f'                <span class="units">包含单元: {html_module.escape(units_str)}</span>\n'
-            html += f'                <span class="toggle-icon">▼</span>\n'
-            html += '            </div>\n'
-            html += f'            <div class="paragraph-content collapsed">\n'
-
-            if full_text:
-                html += f'                <div class="paragraph-text">{html_module.escape(full_text)}</div>\n'
-
-            # 具体元素实质和形式
-            concrete_elements = para.get("具体元素实质和形式", [])
-            if concrete_elements:
-                html += f'                <div class="element-group collapsible-item" onclick="event.stopPropagation(); toggleCollapse(this);">\n'
-                html += f'                    <h4 class="element-group-title">具体元素实质和形式 <span class="toggle-icon">▼</span></h4>\n'
-                html += f'                    <div class="element-list collapsed">\n'
-                for elem_idx, elem in enumerate(concrete_elements):
-                    html += f'                        <div class="element-item collapsible-item" onclick="event.stopPropagation(); toggleCollapse(this);">\n'
-                    elem_name = elem.get("具体元素名称", "")
-                    if elem_name:
-                        html += f'                            <div class="element-name-header">\n'
-                        html += f'                                <span class="element-name">{html_module.escape(elem_name)}</span>\n'
-                        html += f'                                <span class="toggle-icon">▼</span>\n'
-                        html += f'                            </div>\n'
-                    html += f'                            <div class="element-forms collapsed">\n'
-                    for form_type in ["对应形式-文案", "对应形式-画面", "对应形式-声音"]:
-                        if form_type in elem:
-                            form_label = form_type.replace("对应形式-", "")
-                            html += f'                                <div class="form-item">\n'
-                            html += f'                                    <span class="form-label">{html_module.escape(form_label)}:</span>\n'
-                            html += f'                                    <span class="form-content">{html_module.escape(str(elem[form_type]))}</span>\n'
-                            html += f'                                </div>\n'
-                    html += '                            </div>\n'
-                    html += '                        </div>\n'
-                html += '                    </div>\n'
-                html += '                </div>\n'
-
-            # 具象概念实质和形式
-            concrete_concepts = para.get("具象概念实质和形式", [])
-            if concrete_concepts:
-                html += f'                <div class="element-group collapsible-item" onclick="event.stopPropagation(); toggleCollapse(this);">\n'
-                html += f'                    <h4 class="element-group-title">具象概念实质和形式 <span class="toggle-icon">▼</span></h4>\n'
-                html += f'                    <div class="element-list collapsed">\n'
-                for concept in concrete_concepts:
-                    html += f'                        <div class="element-item collapsible-item" onclick="event.stopPropagation(); toggleCollapse(this);">\n'
-                    concept_name = concept.get("具象概念名称", "")
-                    if concept_name:
-                        html += f'                            <div class="element-name-header">\n'
-                        html += f'                                <span class="element-name">{html_module.escape(concept_name)}</span>\n'
-                        html += f'                                <span class="toggle-icon">▼</span>\n'
-                        html += f'                            </div>\n'
-                    html += f'                            <div class="element-forms collapsed">\n'
-                    for form_type in ["对应形式-文案", "对应形式-画面", "对应形式-声音"]:
-                        if form_type in concept:
-                            form_label = form_type.replace("对应形式-", "")
-                            html += f'                                <div class="form-item">\n'
-                            html += f'                                    <span class="form-label">{html_module.escape(form_label)}:</span>\n'
-                            html += f'                                    <span class="form-content">{html_module.escape(str(concept[form_type]))}</span>\n'
-                            html += f'                                </div>\n'
-                    html += '                            </div>\n'
-                    html += '                        </div>\n'
-                html += '                    </div>\n'
-                html += '                </div>\n'
-
-            # 抽象概念实质和形式
-            abstract_concepts = para.get("抽象概念实质和形式", [])
-            if abstract_concepts:
-                html += f'                <div class="element-group collapsible-item" onclick="event.stopPropagation(); toggleCollapse(this);">\n'
-                html += f'                    <h4 class="element-group-title">抽象概念实质和形式 <span class="toggle-icon">▼</span></h4>\n'
-                html += f'                    <div class="element-list collapsed">\n'
-                for concept in abstract_concepts:
-                    html += f'                        <div class="element-item collapsible-item" onclick="event.stopPropagation(); toggleCollapse(this);">\n'
-                    concept_name = concept.get("抽象概念名称", "")
-                    if concept_name:
-                        html += f'                            <div class="element-name-header">\n'
-                        html += f'                                <span class="element-name">{html_module.escape(concept_name)}</span>\n'
-                        html += f'                                <span class="toggle-icon">▼</span>\n'
-                        html += f'                            </div>\n'
-                    html += f'                            <div class="element-forms collapsed">\n'
-                    for form_type in ["对应形式-文案", "对应形式-画面", "对应形式-声音"]:
-                        if form_type in concept:
-                            form_label = form_type.replace("对应形式-", "")
-                            html += f'                                <div class="form-item">\n'
-                            html += f'                                    <span class="form-label">{html_module.escape(form_label)}:</span>\n'
-                            html += f'                                    <span class="form-content">{html_module.escape(str(concept[form_type]))}</span>\n'
-                            html += f'                                </div>\n'
-                    html += '                            </div>\n'
-                    html += '                        </div>\n'
-                html += '                    </div>\n'
-                html += '                </div>\n'
-
-            html += '            </div>\n'
-            html += '        </div>\n'
-
-        html += '    </div>\n'
-        html += '</div>\n'
-        return html
-
-    def generate_html(self, understanding_data: Dict[str, Any], video_title: str, channel_content_id: str) -> str:
-        """生成完整的HTML页面"""
-        html = '<!DOCTYPE html>\n'
-        html += '<html lang="zh-CN">\n'
-        html += '<head>\n'
-        html += '    <meta charset="UTF-8">\n'
-        html += '    <meta name="viewport" content="width=device-width, initial-scale=1.0">\n'
-        html += f'    <title>整体结构理解 - {html_module.escape(video_title)}</title>\n'
-        html += '    <style>\n'
-        html += self.generate_css()
-        html += '    </style>\n'
-        html += '</head>\n'
-        html += '<body>\n'
-
-        html += '<div class="container">\n'
-
-        # 页眉
-        html += '<div class="header">\n'
-        html += '    <h1>整体结构理解</h1>\n'
-        html += f'    <div class="subtitle">{html_module.escape(video_title)}</div>\n'
-        if channel_content_id:
-            html += f'    <div class="subtitle">ID: {html_module.escape(channel_content_id)}</div>\n'
-        html += f'    <div class="subtitle">生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</div>\n'
-        html += '</div>\n'
-
-        # 主内容
-        html += '<div class="content">\n'
-
-        # 整体解构
-        if "整体解构" in understanding_data:
-            html += self.generate_overall_structure_section(understanding_data["整体解构"], section_idx=0)
-
-        # 段落解构
-        if "段落解构" in understanding_data:
-            html += self.generate_paragraph_section(understanding_data["段落解构"], section_idx=1)
-
-        html += '</div>\n'
-
-        # 页脚
-        html += '<div class="footer">\n'
-        html += f'    <p>生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>\n'
-        html += '</div>\n'
-
-        html += '</div>\n'
-        html += '<script>\n'
-        html += self.generate_javascript()
-        html += '</script>\n'
-        html += '</body>\n'
-        html += '</html>\n'
-
-        return html
-
-    def generate_javascript(self) -> str:
-        """生成JavaScript代码"""
-        return """
-        function toggleCollapse(element) {
-            // 阻止事件冒泡(如果是从子元素触发的)
-            if (event) {
-                event.stopPropagation();
-            }
-            
-            // 查找内容区域 - 优先查找下一个兄弟元素
-            let content = element.nextElementSibling;
-            
-            // 如果下一个兄弟元素不是内容区域,尝试在元素内部查找
-            if (!content || (!content.classList.contains('collapsed') && 
-                !content.classList.contains('expanded') && 
-                !content.classList.contains('section-content') &&
-                !content.classList.contains('subsection-content') &&
-                !content.classList.contains('paragraph-content') &&
-                !content.classList.contains('logic-stage-content') &&
-                !content.classList.contains('element-forms') &&
-                !content.classList.contains('element-list'))) {
-                
-                // 在元素内部查找内容区域
-                content = element.querySelector('.section-content, .subsection-content, .paragraph-content, .logic-stage-content, .element-forms, .element-list');
-            }
-            
-            // 如果还是找不到,尝试查找父元素的下一个兄弟
-            if (!content && element.parentElement) {
-                const siblings = Array.from(element.parentElement.children);
-                const currentIndex = siblings.indexOf(element);
-                if (currentIndex < siblings.length - 1) {
-                    content = siblings[currentIndex + 1];
-                }
-            }
-            
-            if (content) {
-                // 切换展开/收起状态
-                const isCollapsed = content.classList.contains('collapsed');
-                
-                if (isCollapsed) {
-                    content.classList.remove('collapsed');
-                    content.classList.add('expanded');
-                } else {
-                    content.classList.remove('expanded');
-                    content.classList.add('collapsed');
-                }
-                
-                // 更新图标
-                const icon = element.querySelector('.toggle-icon');
-                if (icon) {
-                    icon.textContent = isCollapsed ? '▲' : '▼';
-                }
-            }
-        }
-        """
-
-    def generate_css(self) -> str:
-        """生成CSS样式"""
-        return """
-        * {
-            margin: 0;
-            padding: 0;
-            box-sizing: border-box;
-        }
-
-        body {
-            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
-            line-height: 1.6;
-            color: #333;
-            background-color: #f5f5f5;
-            padding: 20px;
-        }
-
-        .container {
-            max-width: 1200px;
-            margin: 0 auto;
-            background: white;
-            border-radius: 8px;
-            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
-            overflow: hidden;
-        }
-
-        .header {
-            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
-            color: white;
-            padding: 30px;
-            text-align: center;
-        }
-
-        .header h1 {
-            font-size: 2em;
-            margin-bottom: 10px;
-        }
-
-        .header .subtitle {
-            font-size: 1.1em;
-            opacity: 0.9;
-            margin-top: 5px;
-        }
-
-        .content {
-            padding: 30px;
-        }
-
-        .section {
-            margin-bottom: 40px;
-        }
-
-        .section h2 {
-            font-size: 1.8em;
-            color: #667eea;
-            margin-bottom: 20px;
-            padding-bottom: 10px;
-            border-bottom: 2px solid #667eea;
-        }
-
-        .section-title, .subsection-title {
-            cursor: pointer;
-            user-select: none;
-            display: flex;
-            align-items: center;
-            justify-content: space-between;
-            transition: background-color 0.2s;
-        }
-
-        .section-title:hover, .subsection-title:hover {
-            background-color: rgba(102, 126, 234, 0.1);
-            border-radius: 4px;
-            padding: 5px;
-            margin: -5px;
-        }
-
-        .toggle-icon {
-            font-size: 0.8em;
-            transition: transform 0.3s;
-            margin-left: 10px;
-        }
-
-        .section-content, .subsection-content, .paragraph-content, .logic-stage-content, .element-forms, .element-list {
-            max-height: 0;
-            overflow: hidden;
-            transition: max-height 0.3s ease-out;
-        }
-
-        .section-content.expanded, .subsection-content.expanded, .paragraph-content.expanded, 
-        .logic-stage-content.expanded, .element-forms.expanded, .element-list.expanded {
-            max-height: 10000px;
-            transition: max-height 0.5s ease-in;
-        }
-
-        .section-content.collapsed, .subsection-content.collapsed, .paragraph-content.collapsed, 
-        .logic-stage-content.collapsed, .element-forms.collapsed, .element-list.collapsed {
-            max-height: 0;
-        }
-
-        .subsection {
-            margin-bottom: 25px;
-        }
-
-        .subsection h3 {
-            font-size: 1.4em;
-            color: #555;
-            margin-bottom: 15px;
-        }
-
-        .content-box {
-            background: #f9f9f9;
-            padding: 20px;
-            border-radius: 6px;
-            border-left: 4px solid #667eea;
-            line-height: 1.8;
-            white-space: pre-wrap;
-        }
-
-        .logic-flow {
-            display: flex;
-            flex-direction: column;
-            gap: 15px;
-        }
-
-        .logic-stage {
-            background: #f0f4ff;
-            padding: 20px;
-            border-radius: 6px;
-            border-left: 4px solid #764ba2;
-        }
-
-        .stage-number {
-            font-weight: bold;
-            color: #764ba2;
-            font-size: 1.1em;
-            margin-bottom: 8px;
-        }
-
-        .stage-name {
-            font-weight: bold;
-            color: #333;
-            font-size: 1.1em;
-            margin-bottom: 10px;
-        }
-
-        .stage-desc {
-            color: #666;
-            line-height: 1.7;
-        }
-
-        .paragraph {
-            background: #fafafa;
-            border: 1px solid #e0e0e0;
-            border-radius: 6px;
-            padding: 20px;
-            margin-bottom: 25px;
-            cursor: pointer;
-            transition: background-color 0.2s;
-        }
-
-        .paragraph:hover {
-            background-color: #f0f0f0;
-        }
-
-        .paragraph-header {
-            display: flex;
-            flex-wrap: wrap;
-            align-items: center;
-            gap: 15px;
-            margin-bottom: 15px;
-            padding-bottom: 10px;
-            border-bottom: 1px solid #e0e0e0;
-        }
-
-        .paragraph-header .toggle-icon {
-            margin-left: auto;
-        }
-
-        .para-number {
-            font-weight: bold;
-            color: #667eea;
-            font-size: 1.1em;
-        }
-
-        .time-range {
-            color: #666;
-            background: #e8e8e8;
-            padding: 4px 10px;
-            border-radius: 4px;
-        }
-
-        .units {
-            color: #666;
-            font-size: 0.9em;
-        }
-
-        .paragraph-text {
-            background: white;
-            padding: 15px;
-            border-radius: 4px;
-            margin-bottom: 20px;
-            line-height: 1.8;
-            border-left: 3px solid #667eea;
-        }
-
-        .element-group {
-            margin-bottom: 25px;
-            cursor: pointer;
-            transition: background-color 0.2s;
-            padding: 10px;
-            border-radius: 4px;
-        }
-
-        .element-group:hover {
-            background-color: rgba(0,0,0,0.02);
-        }
-
-        .element-group-title {
-            font-size: 1.2em;
-            color: #555;
-            margin-bottom: 15px;
-            padding-bottom: 8px;
-            border-bottom: 1px solid #ddd;
-            display: flex;
-            align-items: center;
-            justify-content: space-between;
-            cursor: pointer;
-            user-select: none;
-        }
-
-        .element-group-title:hover {
-            color: #667eea;
-        }
-
-        .element-list {
-            display: flex;
-            flex-direction: column;
-            gap: 15px;
-        }
-
-        .element-item {
-            background: white;
-            border: 1px solid #e0e0e0;
-            border-radius: 6px;
-            padding: 15px;
-            transition: box-shadow 0.2s;
-            cursor: pointer;
-        }
-
-        .element-item:hover {
-            box-shadow: 0 2px 8px rgba(0,0,0,0.1);
-        }
-
-        .element-name-header {
-            display: flex;
-            align-items: center;
-            justify-content: space-between;
-            margin-bottom: 12px;
-            padding-bottom: 8px;
-            border-bottom: 1px solid #f0f0f0;
-        }
-
-        .element-name {
-            font-weight: bold;
-            color: #667eea;
-            font-size: 1.1em;
-        }
-
-        .logic-stage {
-            cursor: pointer;
-            transition: background-color 0.2s;
-        }
-
-        .logic-stage:hover {
-            background-color: #e8f0ff;
-        }
-
-        .logic-stage-header {
-            display: flex;
-            align-items: center;
-            justify-content: space-between;
-        }
-
-        .logic-stage-header .toggle-icon {
-            margin-left: auto;
-        }
-
-        .element-forms {
-            display: flex;
-            flex-direction: column;
-            gap: 10px;
-        }
-
-        .form-item {
-            display: flex;
-            gap: 10px;
-        }
-
-        .form-label {
-            font-weight: 600;
-            color: #555;
-            min-width: 60px;
-        }
-
-        .form-content {
-            color: #666;
-            flex: 1;
-            line-height: 1.6;
-        }
-
-        .footer {
-            background: #f9f9f9;
-            padding: 20px;
-            text-align: center;
-            color: #666;
-            border-top: 1px solid #e0e0e0;
-        }
-
-        @media (max-width: 768px) {
-            .container {
-                margin: 10px;
-                border-radius: 4px;
-            }
-
-            .content {
-                padding: 20px;
-            }
-
-            .header {
-                padding: 20px;
-            }
-
-            .header h1 {
-                font-size: 1.5em;
-            }
-
-            .paragraph-header {
-                flex-direction: column;
-                gap: 8px;
-            }
-
-            .form-item {
-                flex-direction: column;
-                gap: 5px;
-            }
-
-            .form-label {
-                min-width: auto;
-            }
-        }
-        """
-
-    def save_all_html(self, output_dir: str | Path | None = None) -> List[str]:
-        """
-        Generate one standalone HTML page for each video in an output_demo_script_v2.json file.
-
-        Only this structure is supported:
-        {
-          "results": [
-            {
-              "video_data": {...},
-              "script_result": {
-                "整体结构理解": {...}
-              }
-            },
-            ...
-          ]
-        }
-
-        Args:
-            output_dir: Directory to write the pages into. When None, the
-                "html_v2" directory beside this script is used; a relative
-                path is resolved against the current working directory.
-
-        Returns:
-            Paths (as strings) of the HTML files that were written, in the
-            order of the "results" entries. An empty list is returned when
-            self.json_file is None, when load_json_data returns None, or when
-            "results" is missing, empty, or not a list. Entries whose
-            "script_result" or "整体结构理解" value is not a dict are skipped.
-        """
-        if self.json_file is None:
-            print("❌ 错误: 未指定JSON文件")
-            return []
-
-        # Load the JSON data
-        data = self.load_json_data(self.json_file)
-        if data is None:
-            return []
-
-        results = data.get("results") or []
-        if not isinstance(results, list) or not results:
-            print("⚠️  JSON 中未找到有效的 results 数组,期望为 output_demo_script_v2.json 结构")
-            return []
-
-        # Resolve the output directory
-        if output_dir is None:
-            # Default to the html_v2 directory beside this script (examples/html_v2)
-            output_dir = Path(__file__).parent / "html_v2"
-        else:
-            output_dir = Path(output_dir)
-            if not output_dir.is_absolute():
-                output_dir = Path.cwd() / output_dir
-
-        # Create the output directory (including any missing parents)
-        output_dir.mkdir(parents=True, exist_ok=True)
-
-        generated_paths: List[str] = []
-
-        print(f"📁 检测到 output_demo_script_v2 格式,包含 {len(results)} 条结果")
-
-        # idx is 1-based so that messages and fallback filenames count from 1
-        for idx, item in enumerate(results, start=1):
-            script_result = item.get("script_result")
-            if not isinstance(script_result, dict):
-                print(f"⚠️  跳过第 {idx} 条结果:缺少 script_result 字段或结构不正确")
-                continue
-
-            understanding_data = script_result.get("整体结构理解")
-            if not isinstance(understanding_data, dict):
-                print(f"⚠️  跳过第 {idx} 条结果:缺少 整体结构理解 字段或结构不正确")
-                continue
-
-            # A missing or null "video_data" is treated as an empty dict
-            video_data = item.get("video_data") or {}
-            channel_content_id = video_data.get("channel_content_id", "")
-            video_title = video_data.get("title", f"视频 {idx}")
-
-            # Build the output filename (prefer channel_content_id, fall back to the index)
-            if channel_content_id:
-                output_filename = f"understanding_{channel_content_id}.html"
-            else:
-                # NOTE(review): .stem needs self.json_file to be a Path, but main()
-                # passes a str - confirm that __init__ converts it.
-                output_filename = f"{self.json_file.stem}_understanding_{idx}.html"
-
-            output_path = output_dir / output_filename
-
-            html_content = self.generate_html(understanding_data, video_title, channel_content_id)
-
-            with open(output_path, "w", encoding="utf-8") as f:
-                f.write(html_content)
-
-            generated_paths.append(str(output_path))
-            print(f"✅ HTML文件已生成: {output_path}")
-
-        if not generated_paths:
-            print("⚠️  未能从 JSON 中生成任何 HTML 文件")
-
-        return generated_paths
-
-
-def main():
-    """Command-line entry point: parse the arguments, then generate one HTML page per video.
-
-    Takes an optional positional JSON path and an optional -o/--output-dir
-    option, builds a ScriptResultVisualizerV2 for the JSON file, calls
-    save_all_html, and prints a summary when at least one file was generated.
-    """
-    # Parse command-line arguments
-    parser = argparse.ArgumentParser(
-        description='脚本结果可视化工具 V2 - 基于 output_demo_script_v2.json 为每个视频生成独立的HTML页面(展示整体结构理解)',
-        formatter_class=argparse.RawDescriptionHelpFormatter,
-        epilog="""
-使用示例:
-  # 在当前 examples 目录下使用默认的 output_demo_script_v2.json 并输出到 examples/html_v2
-  python visualize_script_results_v2.py
-
-  # 指定 JSON 文件
-  python visualize_script_results_v2.py examples/output_demo_script_v2.json
-
-  # 指定 JSON 文件和输出目录
-  python visualize_script_results_v2.py examples/output_demo_script_v2.json --output-dir examples/html_v2_custom
-        """
-    )
-
-    # Optional positional argument (nargs='?'): omitted means "use the default file"
-    parser.add_argument(
-        'json_file',
-        type=str,
-        nargs='?',
-        help='JSON文件路径(默认为 examples/output_demo_script_v2.json)'
-    )
-
-    parser.add_argument(
-        '-o', '--output-dir',
-        type=str,
-        default=None,
-        help='输出目录路径(默认: examples/html_v2)'
-    )
-
-    args = parser.parse_args()
-
-    # Resolve the JSON file path; a relative path is taken from the current working directory
-    if args.json_file:
-        json_path = Path(args.json_file)
-        if not json_path.is_absolute():
-            json_path = Path.cwd() / json_path
-    else:
-        # Default to output_demo_script_v2.json beside this script
-        json_path = Path(__file__).parent / "output_demo_script_v2.json"
-
-    print("🚀 开始生成整体结构理解可视化...")
-    print(f"📁 JSON文件: {json_path}")
-    print(f"📄 输出目录: {args.output_dir or (Path(__file__).parent / 'html_v2')}")
-    print()
-
-    # The path is handed over as a plain string
-    visualizer = ScriptResultVisualizerV2(json_file=str(json_path))
-    generated_files = visualizer.save_all_html(output_dir=args.output_dir)
-
-    if generated_files:
-        print()
-        print(f"🎉 完成! 共生成 {len(generated_files)} 个HTML文件")
-        # Point the user at one of the generated files as an example
-        print(f"📄 示例: 请在浏览器中打开: {generated_files[0]}")
-
-
-# Run the command-line entry point only when this file is executed as a script
-if __name__ == "__main__":
-    main()
-

+ 0 - 333
src/workflows/script_workflow.py

@@ -1,333 +0,0 @@
-"""
-Script Workflow.
-
-脚本理解工作流:编排脚本段落划分和元素提取流程的执行顺序和流程逻辑。
-流程:段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总
-"""
-
-from typing import Dict, Any
-from langgraph.graph import StateGraph, END
-
-from src.components.agents.base import BaseGraphAgent
-from src.states.script_state import ScriptState
-from src.components.agents.script_section_division_agent import ScriptSectionDivisionAgent
-from src.components.agents.script_substance_extraction_agent import ScriptSubstanceExtractionAgent
-from src.components.agents.script_form_extraction_agent import ScriptFormExtractionAgent
-from src.components.functions.video_upload_function import VideoUploadFunction
-from src.utils.logger import get_logger
-
-logger = get_logger(__name__)
-
-
-class ScriptWorkflow(BaseGraphAgent):
-    """Script-understanding workflow.
-
-    Responsibilities:
-    - Orchestrates the script-understanding pipeline
-    - Flow: section division → substance extraction → form extraction → split results → result aggregation
-      (the graph built in _build_graph runs a video-upload node before these steps)
-    - Manages how state is passed from one step to the next
-
-    Implementation: BaseGraphAgent (LangGraph)
-    """
-
-    def __init__(
-        self,
-        name: str = "script_workflow",
-        description: str = "脚本理解工作流",
-        model_provider: str = "google_genai"
-    ):
-        """Create the workflow and construct the upload function and the three agents.
-
-        Args:
-            name: Workflow name forwarded to the base class.
-            description: Workflow description forwarded to the base class.
-            model_provider: Provider identifier stored on the instance and
-                passed to every agent built here.
-        """
-        super().__init__(
-            name=name,
-            description=description,
-            state_class=ScriptState
-        )
-
-        self.model_provider = model_provider
-
-        # Video-upload function
-        self.video_upload_func = VideoUploadFunction()
-
-        # Script section-division agent
-        self.section_agent = ScriptSectionDivisionAgent(
-            model_provider=model_provider
-        )
-
-        # Substance-extraction agent
-        self.substance_agent = ScriptSubstanceExtractionAgent(
-            model_provider=model_provider
-        )
-
-        # Form-extraction agent
-        self.form_agent = ScriptFormExtractionAgent(
-            model_provider=model_provider
-        )
-
-        logger.info(f"ScriptWorkflow 初始化完成,model_provider: {model_provider}")
-
-    def _build_graph(self) -> StateGraph:
-        """Build the workflow graph (video-analysis version).
-
-        Flow:
-        START → video upload → section division → substance extraction → form extraction → split results → result aggregation → END
-
-        Returns:
-            The StateGraph with all nodes and edges added; it is not compiled here.
-        """
-        workflow = StateGraph(dict)  # use a plain dict as the state type
-
-        # Register every node
-        workflow.add_node("video_upload", self._video_upload_node)
-        workflow.add_node("section_division", self._section_division_node)
-        workflow.add_node("substance_extraction", self._substance_extraction_node)
-        workflow.add_node("form_extraction", self._form_extraction_node)
-        workflow.add_node("merge_all_results", self._merge_all_results_node)
-        workflow.add_node("result_aggregation", self._result_aggregation_node)
-
-        # Wire the nodes into a single linear chain
-        workflow.set_entry_point("video_upload")
-        workflow.add_edge("video_upload", "section_division")
-        workflow.add_edge("section_division", "substance_extraction")
-        workflow.add_edge("substance_extraction", "form_extraction")
-        workflow.add_edge("form_extraction", "merge_all_results")
-        workflow.add_edge("merge_all_results", "result_aggregation")
-        workflow.add_edge("result_aggregation", END)
-
-        logger.info("工作流图构建完成 - 流程:视频上传 → 段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总")
-
-        return workflow
-
-    def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: video upload (first step).
-
-        Per the original note, this downloads the video and uploads it to
-        Gemini; that work happens inside VideoUploadFunction, not here.
-
-        Merges the function's result into state. On an exception the error is
-        logged and "video_uploaded_uri" is set to None with the message stored
-        in "video_upload_error", so the workflow continues.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:视频上传 ===")
-
-        try:
-            # Lazily initialise the function on first use
-            if not self.video_upload_func.is_initialized:
-                self.video_upload_func.initialize()
-
-            # Run the upload
-            result = self.video_upload_func.execute(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            video_uri = result.get("video_uploaded_uri")
-            if video_uri:
-                logger.info(f"视频上传完成 - URI: {video_uri}")
-            else:
-                # No URI came back: report the error field as a warning only
-                error = result.get("video_upload_error", "未知错误")
-                logger.warning(f"视频上传失败: {error}")
-
-        except Exception as e:
-            logger.error(f"视频上传失败: {e}", exc_info=True)
-            state.update({
-                "video_uploaded_uri": None,
-                "video_upload_error": str(e)
-            })
-
-        return state
-
-    def _section_division_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: script section division.
-
-        Runs the section-division agent and merges its result into state. On
-        an exception, "内容品类" is set to "未知品类" and "段落列表" to an
-        empty list.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:脚本段落划分 ===")
-
-        try:
-            # Lazily initialise the agent on first use
-            if not self.section_agent.is_initialized:
-                self.section_agent.initialize()
-
-            # Run the agent
-            result = self.section_agent.process(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            sections = result.get("段落列表", [])
-            content_category = result.get("内容品类", "未知")
-            logger.info(f"脚本段落划分完成 - 内容品类: {content_category}, 段落数: {len(sections)}")
-
-        except Exception as e:
-            logger.error(f"脚本段落划分失败: {e}", exc_info=True)
-            state.update({
-                "内容品类": "未知品类",
-                "段落列表": []
-            })
-
-        return state
-
-    def _substance_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: substance element extraction.
-
-        Wraps the current "段落列表" under state["section_division"], runs the
-        substance-extraction agent and merges its result into state. On an
-        exception every substance-related key is reset to an empty value.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:实质元素提取 ===")
-
-        try:
-            # Lazily initialise the agent on first use
-            if not self.substance_agent.is_initialized:
-                self.substance_agent.initialize()
-
-            # Prepare the state: wrap the section list in the section_division field
-            sections = state.get("段落列表", [])
-            state["section_division"] = {"段落列表": sections}
-
-            # Run the agent
-            result = self.substance_agent.process(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            final_elements = result.get("substance_final_elements", [])
-            logger.info(f"实质元素提取完成 - 最终元素数: {len(final_elements)}")
-
-        except Exception as e:
-            logger.error(f"实质元素提取失败: {e}", exc_info=True)
-            state.update({
-                "concrete_elements": [],
-                "concrete_concepts": [],
-                "abstract_concepts": [],
-                "substance_elements": [],
-                "substance_analyzed_result": [],
-                "substance_scored_result": {},
-                "substance_filtered_ids": [],
-                "substance_categorized_result": {},
-                "substance_final_elements": []
-            })
-
-        return state
-
-    def _form_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: form element extraction.
-
-        Runs the form-extraction agent and merges its result into state. On an
-        exception every form-related key is reset to an empty value.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:形式元素提取 ===")
-
-        try:
-            # Lazily initialise the agent on first use
-            if not self.form_agent.is_initialized:
-                self.form_agent.initialize()
-
-            # Run the agent (it depends on the substance elements)
-            result = self.form_agent.process(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            final_elements = result.get("form_final_elements", [])
-            logger.info(f"形式元素提取完成 - 最终元素数: {len(final_elements)}")
-
-        except Exception as e:
-            logger.error(f"形式元素提取失败: {e}", exc_info=True)
-            state.update({
-                "concrete_element_forms": [],
-                "concrete_concept_forms": [],
-                "overall_forms": [],
-                "form_elements": [],
-                "form_analyzed_result": [],
-                "form_scored_result": {},
-                "form_weighted_result": {},
-                "form_filtered_ids": [],
-                "form_categorized_result": {},
-                "form_final_elements": []
-            })
-
-        return state
-
-    def _merge_all_results_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: split substance and form results (Step 7).
-
-        Despite the method name, nothing is merged: the final substance and
-        form element lists are copied to the "实质列表" and "形式列表" keys.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:分离实质和形式结果 ===")
-
-        try:
-            # Fetch the final substance and form elements
-            substance_final_elements = state.get("substance_final_elements", [])
-            form_final_elements = state.get("form_final_elements", [])
-
-            # Store the substance list and the form list separately
-            state["实质列表"] = substance_final_elements
-            state["形式列表"] = form_final_elements
-
-            logger.info(f"分离完成 - 实质元素: {len(substance_final_elements)}, 形式元素: {len(form_final_elements)}")
-
-        except Exception as e:
-            logger.error(f"分离结果失败: {e}", exc_info=True)
-            state["实质列表"] = []
-            state["形式列表"] = []
-
-        return state
-
-    def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: result aggregation.
-
-        Assembles state["final_result"] from the topic description, the
-        script-understanding fields and the three point lists. On an exception
-        "final_result" becomes a dict holding only an "错误" message.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:结果汇总 ===")
-
-        try:
-            # Extract the topic description from topic_selection_understanding
-            topic_understanding = state.get("topic_selection_understanding", {})
-
-            # Accept both layouts: theme/description at the top level, or nested under the "选题" key
-            if "选题" in topic_understanding:
-                selected_topic = topic_understanding.get("选题", {})
-            else:
-                selected_topic = topic_understanding
-
-            # Assemble the final result - substance and form are emitted separately
-            final_result = {
-                "选题描述": {
-                    "主题": selected_topic.get("主题", ""),
-                    "描述": selected_topic.get("描述", "")
-                },
-                "脚本理解": {
-                    "内容品类": state.get("内容品类", "未知"),
-                    "段落列表": state.get("段落列表", []),
-                    "实质列表": state.get("实质列表", []),  # standalone substance list
-                    "形式列表": state.get("形式列表", []),  # standalone form list
-                    "图片列表": state.get("images", [])
-                },
-                "灵感点": state.get("inspiration_points", []),
-                "目的点": state.get("purpose_points", []),
-                "关键点": state.get("key_points", [])
-            }
-
-            # Store the result in the state
-            state["final_result"] = final_result
-
-            logger.info("结果汇总完成")
-
-        except Exception as e:
-            logger.error(f"结果汇总失败: {e}", exc_info=True)
-            state["final_result"] = {
-                "错误": f"汇总失败: {str(e)}"
-            }
-
-        return state
-
-    def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
-        """Run the workflow (public interface) - video-analysis version.
-
-        Args:
-            input_data: Input dict. "text" is used as-is when it is a dict;
-                otherwise a text dict is built from "title" and "body_text".
-                "video", "channel_content_id", "topic_selection_understanding",
-                "content_weight", "inspiration_points", "purpose_points" and
-                "key_points" are copied into the initial state with empty
-                defaults.
-
-        Returns:
-            The final script-understanding result, i.e. the "final_result"
-            entry of the graph output, or an empty dict when it is absent.
-        """
-        logger.info("=== 开始执行脚本理解工作流(视频分析) ===")
-
-        # Make sure the workflow has been initialised
-        if not self.is_initialized:
-            self.initialize()
-
-        # Build text (both input styles are accepted)
-        if "text" in input_data and isinstance(input_data.get("text"), dict):
-            text = input_data.get("text", {})
-        else:
-            text = {
-                "title": input_data.get("title", ""),
-                "body": input_data.get("body_text", ""),
-            }
-
-        # Initial state (carries the video info for the upload step and the later agents)
-        initial_state = {
-            "video": input_data.get("video", ""),
-            "channel_content_id": input_data.get("channel_content_id", ""),
-            "text": text,
-            "topic_selection_understanding": input_data.get("topic_selection_understanding", {}),
-            "content_weight": input_data.get("content_weight", {}),
-            "inspiration_points": input_data.get("inspiration_points", []),
-            "purpose_points": input_data.get("purpose_points", []),
-            "key_points": input_data.get("key_points", [])
-        }
-
-        # Run the workflow
-        result = self.compiled_graph.invoke(initial_state)
-
-        logger.info("=== 脚本理解工作流执行完成(视频分析) ===")
-
-        return result.get("final_result", {})

+ 0 - 501
src/workflows/what_deconstruction_workflow.py

@@ -1,501 +0,0 @@
-"""
-What Deconstruction Workflow.
-
-What解构主工作流:编排三点解构流程(灵感点、目的点、关键点)的执行顺序和流程逻辑。
-流程(视频分析):视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 结果汇总
-"""
-
-from typing import Dict, Any
-from langgraph.graph import StateGraph, END
-
-from src.components.agents.base import BaseGraphAgent
-from src.states.what_deconstruction_state import WhatDeconstructionState
-from src.components.agents.topic_selection_understanding_agent import TopicSelectionUnderstandingAgent
-from src.components.agents.search_keyword_agent import SearchKeywordAgent
-from src.components.functions.result_aggregation_function import ResultAggregationFunction
-from src.components.functions.video_upload_function import VideoUploadFunction
-# 新增三点解构Agent
-from src.components.agents.inspiration_points_agent import InspirationPointsAgent
-from src.components.agents.purpose_point_agent import PurposePointAgent
-from src.components.agents.key_points_agent import KeyPointsAgent
-# 新增选题结构Agent V2
-from src.components.agents.topic_agent_v2 import TopicAgentV2
-# 新增结构化内容库Agent
-from src.components.agents.structure_agent import StructureAgent
-from src.utils.logger import get_logger
-
-logger = get_logger(__name__)
-
-
-class WhatDeconstructionWorkflow(BaseGraphAgent):
-    """What解构主工作流(视频分析版本)
-
-    功能:
-    - 编排整个What解构流程(针对视频输入)
-    - 支持条件分支:
-      * 视频上传 → topic_selection_v2(选题结构分析V2,直接基于视频)→ 结束
-      * 视频上传 → structure_agent(结构化内容库解构,直接基于视频)→ 结束
-      * 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → 结束
-    - 通过 state 中的 use_topic_agent_v2 或 use_structure_agent 标志手动控制分支选择(二选一)
-    - 管理状态传递
-    - 仅支持单视频输入
-
-    实现方式:BaseGraphAgent (LangGraph)
-    """
-
-    def __init__(
-        self,
-        name: str = "what_deconstruction_workflow",
-        description: str = "What解构主工作流(视频分析)",
-        model_provider: str = "google_genai",
-        max_depth: int = 10
-    ):
-        """Create the workflow and construct its upload function, agents and aggregation function.
-
-        Args:
-            name: Workflow name forwarded to the base class.
-            description: Workflow description forwarded to the base class.
-            model_provider: Provider identifier passed to every agent built
-                here; it is not stored on the instance in this method.
-            max_depth: Stored as self.max_depth; not otherwise used in this method.
-        """
-        super().__init__(
-            name=name,
-            description=description,
-            state_class=WhatDeconstructionState
-        )
-
-        self.max_depth = max_depth
-
-        # Video-upload function
-        self.video_upload_func = VideoUploadFunction()
-
-        # Topic-selection-understanding agent
-        self.topic_selection_understanding_agent = TopicSelectionUnderstandingAgent(
-            model_provider=model_provider
-        )
-        
-        # Search-keyword agent
-        self.search_keyword_agent = SearchKeywordAgent(
-            model_provider=model_provider
-        )
-        
-        # Result-aggregation function
-        self.result_aggregation_func = ResultAggregationFunction()
-
-        # The three point-deconstruction agents (inspiration, purpose, key points)
-        self.inspiration_points_agent = InspirationPointsAgent(
-            model_provider=model_provider
-        )
-        self.purpose_point_agent = PurposePointAgent(
-            model_provider=model_provider
-        )
-        self.key_points_agent = KeyPointsAgent(
-            model_provider=model_provider
-        )
-
-        # Topic-structure agent V2
-        self.topic_agent_v2 = TopicAgentV2(
-            model_provider=model_provider
-        )
-
-        # Structured-content-library agent
-        self.structure_agent = StructureAgent(
-            model_provider=model_provider
-        )
-
-        logger.info(f"WhatDeconstructionWorkflow(视频分析)初始化完成")
-
-    def _build_graph(self) -> StateGraph:
-        """Build the workflow graph.
-
-        New flow (video analysis):
-        START → video upload → [conditional branch]
-          - if use_topic_agent_v2=True: → topic_selection_v2 → END
-          - if use_structure_agent=True: → structure_agent → END
-          - otherwise: → inspiration points → purpose point → key points → topic understanding → search keywords → result aggregation → END
-
-        Returns:
-            The StateGraph with all nodes and edges added; it is not compiled here.
-        """
-        workflow = StateGraph(dict)  # use a plain dict as the state type
-
-        # Register the nodes
-        workflow.add_node("video_upload", self._video_upload_node)
-        workflow.add_node("topic_selection_v2", self._topic_selection_v2_node)
-        workflow.add_node("structure_agent", self._structure_agent_node)
-        workflow.add_node("inspiration_points_extraction", self._inspiration_points_node)
-        workflow.add_node("purpose_point_extraction", self._purpose_point_node)
-        workflow.add_node("key_points_extraction", self._key_points_node)
-        workflow.add_node("topic_selection_understanding", self._topic_selection_understanding_node)
-        workflow.add_node("search_keyword_extraction", self._search_keyword_node)
-        workflow.add_node("result_aggregation", self._result_aggregation_node)
-
-        # Edges: video upload → conditional branch → topic_selection_v2/structure_agent (end) or the original flow
-        workflow.set_entry_point("video_upload")
-        # Conditional branch: the use_topic_agent_v2 / use_structure_agent flags decide which path is taken
-        workflow.add_conditional_edges(
-            "video_upload",
-            self._route_after_upload,
-            {
-                "topic_selection_v2": "topic_selection_v2",
-                "structure_agent": "structure_agent",
-                "normal_flow": "inspiration_points_extraction"
-            }
-        )
-        # The topic_selection_v2 branch ends immediately
-        workflow.add_edge("topic_selection_v2", END)
-        # The structure_agent branch ends immediately
-        workflow.add_edge("structure_agent", END)
-        # The original flow continues
-        workflow.add_edge("inspiration_points_extraction", "purpose_point_extraction")
-        workflow.add_edge("purpose_point_extraction", "key_points_extraction")
-        workflow.add_edge("key_points_extraction", "topic_selection_understanding")
-        workflow.add_edge("topic_selection_understanding", "search_keyword_extraction")
-        workflow.add_edge("search_keyword_extraction", "result_aggregation")
-        workflow.add_edge("result_aggregation", END)
-
-        logger.info("工作流图构建完成 - 视频分析流程:视频上传 → [条件分支: topic_selection_v2 / structure_agent / 三点解构流程]")
-
-        return workflow
-
-    def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: video upload (first step).
-
-        Per the original note, this downloads the video and uploads it to
-        Gemini; that work happens inside VideoUploadFunction, not here.
-
-        Merges the function's result into state. On an exception the error is
-        logged and "video_uploaded_uri" is set to None with the message stored
-        in "video_upload_error", so the workflow continues.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:视频上传 ===")
-
-        try:
-            # Lazily initialise the function on first use
-            if not self.video_upload_func.is_initialized:
-                self.video_upload_func.initialize()
-
-            # Run the upload
-            result = self.video_upload_func.execute(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            video_uri = result.get("video_uploaded_uri")
-            if video_uri:
-                logger.info(f"视频上传完成 - URI: {video_uri}")
-            else:
-                # No URI came back: report the error field as a warning only
-                error = result.get("video_upload_error", "未知错误")
-                logger.warning(f"视频上传失败: {error}")
-
-        except Exception as e:
-            logger.error(f"视频上传失败: {e}", exc_info=True)
-            state.update({
-                "video_uploaded_uri": None,
-                "video_upload_error": str(e)
-            })
-
-        return state
-
-    def _route_after_upload(self, state: Dict[str, Any]) -> str:
-        """Conditional-branch function: pick the processing branch to run after the upload.
-        
-        Controlled by flags in state:
-        - use_topic_agent_v2=True: take the topic_selection_v2 branch, which then ends
-        - use_structure_agent=True: take the structure_agent branch, which then ends
-        - otherwise: take the original normal flow
-
-        use_topic_agent_v2 is checked first, so it wins when both flags are set.
-        
-        Returns:
-            "topic_selection_v2" / "structure_agent" / "normal_flow"
-        """
-        use_v2 = state.get("use_topic_agent_v2", False)
-        use_structure = state.get("use_structure_agent", False)
-        
-        if use_v2:
-            logger.info("检测到 use_topic_agent_v2=True,将使用 TopicAgentV2 分支")
-            return "topic_selection_v2"
-        elif use_structure:
-            logger.info("检测到 use_structure_agent=True,将使用 StructureAgent 分支")
-            return "structure_agent"
-        else:
-            logger.info("使用正常流程(三点解构 → 选题理解 → 搜索关键词)")
-            return "normal_flow"
-
-    def _topic_selection_v2_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: topic-structure analysis V2 (works directly from the video).
-
-        Runs TopicAgentV2 and merges its result into state. On an exception
-        "video_script" is set to an empty string and "topic_selection_v2" to a
-        dict holding the error message.
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:选题结构分析 V2 ===")
-
-        try:
-            # Lazily initialise the agent on first use
-            if not self.topic_agent_v2.is_initialized:
-                self.topic_agent_v2.initialize()
-
-            # Run the agent
-            result = self.topic_agent_v2.process(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            topic_selection = result.get("topic_selection_v2", {})
-            # An "error" key in the agent's output marks a failure that the agent reported itself
-            if "error" not in topic_selection:
-                logger.info(f"选题结构分析 V2 完成 - topic_selection_v2: {topic_selection}")
-            else:
-                logger.warning(f"选题结构分析 V2 执行出错: {topic_selection.get('error')}")
-
-        except Exception as e:
-            logger.error(f"选题结构分析 V2 失败: {e}", exc_info=True)
-            state.update({
-                "video_script": "",
-                "topic_selection_v2": {
-                    "error": str(e),
-                }
-            })
-
-        return state
-
-    def _structure_agent_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: structured-content-library deconstruction (works directly from the video).
-
-        Runs StructureAgent and merges its result into state. On an exception
-        "structure_data" is set to a dict holding the error message under "错误".
-
-        Returns:
-            The same state dict, updated in place.
-        """
-        logger.info("=== 执行节点:结构化内容库解构 ===")
-
-        try:
-            # Lazily initialise the agent on first use
-            if not self.structure_agent.is_initialized:
-                self.structure_agent.initialize()
-
-            # Run the agent
-            result = self.structure_agent.process(state)
-
-            # Merge the result into the state
-            state.update(result)
-
-            structure_data = result.get("structure_data", {})
-            # An "错误" key in the agent's output marks a failure that the agent reported itself
-            if "错误" not in structure_data:
-                logger.info(f"结构化内容库解构完成 - structure_data: {structure_data}")
-            else:
-                logger.warning(f"结构化内容库解构执行出错: {structure_data.get('错误')}")
-
-        except Exception as e:
-            logger.error(f"结构化内容库解构失败: {e}", exc_info=True)
-            state.update({
-                "structure_data": {
-                    "错误": str(e),
-                }
-            })
-
-        return state
-
-    def _topic_selection_understanding_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: topic-selection understanding.
-
-        Runs ``self.topic_selection_understanding_agent`` against ``state``
-        (initializing it first if needed), merges the returned mapping into
-        ``state`` and returns ``state``.
-
-        Args:
-            state: Workflow state; it is mutated in place.
-
-        Returns:
-            The same ``state`` object. If an exception is raised it is logged
-            and ``state["topic_selection_understanding"]`` is set to an empty
-            dict (the error text itself is not stored in the state).
-        """
-        logger.info("=== 执行节点:选题理解 ===")
-
-        try:
-            # Initialize the agent on first use
-            if not self.topic_selection_understanding_agent.is_initialized:
-                self.topic_selection_understanding_agent.initialize()
-
-            # Run the agent
-            result = self.topic_selection_understanding_agent.process(state)
-
-            # Merge the agent output into the state
-            # (assumes process() returns a dict-like mapping — TODO confirm)
-            state.update(result)
-
-            logger.info(f"选题理解完成 - result: {result}")
-
-        except Exception as e:
-            logger.error(f"选题理解失败: {e}", exc_info=True)
-            # Fallback value written to the state when the node fails
-            state.update({
-                "topic_selection_understanding": {}
-            })
-
-        return state
-
-    def _search_keyword_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: search-keyword extraction.
-
-        Runs ``self.search_keyword_agent`` against ``state`` (initializing it
-        first if needed), merges the returned mapping into ``state`` and
-        returns ``state``.
-
-        Args:
-            state: Workflow state; it is mutated in place.
-
-        Returns:
-            The same ``state`` object. If an exception is raised it is logged
-            and ``state["search_keywords"]`` is set to a dict with an empty
-            ``"搜索词列表"`` list, ``"总数"`` of 0 and the ``"错误"`` message.
-        """
-        logger.info("=== 执行节点:搜索关键词提取 ===")
-
-        try:
-            # Initialize the agent on first use
-            if not self.search_keyword_agent.is_initialized:
-                self.search_keyword_agent.initialize()
-
-            # Run the agent
-            result = self.search_keyword_agent.process(state)
-
-            # Merge the agent output into the state
-            # (assumes process() returns a dict-like mapping — TODO confirm)
-            state.update(result)
-
-            # "总数" is read as the keyword count; 0 is logged when it is absent
-            search_keywords_count = result.get("search_keywords", {}).get("总数", 0)
-            logger.info(f"搜索关键词提取完成 - 共 {search_keywords_count} 个搜索词")
-
-        except Exception as e:
-            logger.error(f"搜索关键词提取失败: {e}", exc_info=True)
-            # Fallback value written to the state when the node fails
-            state.update({
-                "search_keywords": {
-                    "搜索词列表": [],
-                    "总数": 0,
-                    "错误": str(e)
-                }
-            })
-
-        return state
-
-    def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: inspiration-point extraction (new three-point deconstruction).
-
-        Runs ``self.inspiration_points_agent`` against ``state`` (initializing
-        it first if needed), merges the returned value into ``state``, logs
-        how many inspiration points were found and returns ``state``.
-
-        Args:
-            state: Workflow state; it is mutated in place.
-
-        Returns:
-            The same ``state`` object. If an exception is raised it is logged
-            and ``state["inspiration_points"]`` is set to a dict with the
-            ``error`` message, an empty ``points`` list and ``total_count`` 0.
-        """
-        logger.info("=== 执行节点:灵感点提取 ===")
-
-        try:
-            # Initialize the agent on first use
-            if not self.inspiration_points_agent.is_initialized:
-                self.inspiration_points_agent.initialize()
-
-            # Run the agent
-            result = self.inspiration_points_agent.process(state)
-
-            # Merge the agent output into the state
-            # NOTE(review): this runs before the isinstance check below, so a
-            # non-dict result only reaches that check if update() accepted it.
-            state.update(result)
-
-            # Safely derive the inspiration-point count: total_count lives under metadata
-            if isinstance(result, dict):
-                metadata = result.get("metadata", {})
-                inspiration_count = metadata.get("total_count", 0) if isinstance(metadata, dict) else 0
-                # If metadata gave no count, fall back to the length of the inspiration_points list
-                if inspiration_count == 0:
-                    inspiration_points = result.get("inspiration_points", [])
-                    if isinstance(inspiration_points, list):
-                        inspiration_count = len(inspiration_points)
-            else:
-                # If result is not a dict (e.g. it is a list), fall back to its length
-                inspiration_count = len(result) if isinstance(result, list) else 0
-            
-            logger.info(f"灵感点提取完成 - 共 {inspiration_count} 个灵感点")
-
-        except Exception as e:
-            logger.error(f"灵感点提取失败: {e}", exc_info=True)
-            # NOTE(review): the fallback stores a dict under "inspiration_points",
-            # while the success path above checks for a list — confirm which
-            # shape downstream consumers expect.
-            state.update({
-                "inspiration_points": {
-                    "error": str(e),
-                    "points": [],
-                    "total_count": 0
-                }
-            })
-
-        return state
-
-    def _purpose_point_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: purpose-point extraction (new three-point deconstruction).
-
-        Runs ``self.purpose_point_agent`` against ``state`` (initializing it
-        first if needed), merges the returned mapping into ``state`` and
-        returns ``state``.
-
-        Args:
-            state: Workflow state; it is mutated in place.
-
-        Returns:
-            The same ``state`` object. If an exception is raised it is logged
-            and ``state["purpose_point"]`` is set to a dict with the ``error``
-            message and ``main_purpose`` set to ``"未知"``.
-        """
-        logger.info("=== 执行节点:目的点提取 ===")
-
-        try:
-            # Initialize the agent on first use
-            if not self.purpose_point_agent.is_initialized:
-                self.purpose_point_agent.initialize()
-
-            # Run the agent
-            result = self.purpose_point_agent.process(state)
-
-            # Merge the agent output into the state
-            # (assumes process() returns a dict-like mapping — TODO confirm)
-            state.update(result)
-
-            # "未知" is the placeholder logged when no main purpose is present
-            main_purpose = result.get("purpose_point", {}).get("main_purpose", "未知")
-            logger.info(f"目的点提取完成 - 主要目的: {main_purpose}")
-
-        except Exception as e:
-            logger.error(f"目的点提取失败: {e}", exc_info=True)
-            # Fallback value written to the state when the node fails
-            state.update({
-                "purpose_point": {
-                    "error": str(e),
-                    "main_purpose": "未知"
-                }
-            })
-
-        return state
-
-    def _key_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: key-point extraction (new three-point deconstruction).
-
-        Runs ``self.key_points_agent`` against ``state`` (initializing it
-        first if needed), merges the returned mapping into ``state`` and
-        returns ``state``.
-
-        Args:
-            state: Workflow state; it is mutated in place.
-
-        Returns:
-            The same ``state`` object. If an exception is raised it is logged
-            and ``state["key_points"]`` is set to a dict with the ``error``
-            message plus empty ``creator_perspective`` and
-            ``consumer_perspective`` entries.
-        """
-        logger.info("=== 执行节点:关键点提取 ===")
-
-        try:
-            # Initialize the agent on first use
-            if not self.key_points_agent.is_initialized:
-                self.key_points_agent.initialize()
-
-            # Run the agent
-            result = self.key_points_agent.process(state)
-
-            # Merge the agent output into the state
-            # (assumes process() returns a dict-like mapping — TODO confirm)
-            state.update(result)
-
-            # total_count is read for logging only; 0 is logged when it is absent
-            total_key_points = result.get("key_points", {}).get("total_count", 0)
-            logger.info(f"关键点提取完成 - 共 {total_key_points} 个关键点")
-
-        except Exception as e:
-            logger.error(f"关键点提取失败: {e}", exc_info=True)
-            # Fallback value written to the state when the node fails.
-            # NOTE(review): unlike the success path, this dict has no
-            # "total_count" key — confirm consumers tolerate that.
-            state.update({
-                "key_points": {
-                    "error": str(e),
-                    "creator_perspective": {"key_points": [], "summary": ""},
-                    "consumer_perspective": {"key_points": [], "summary": ""}
-                }
-            })
-
-        return state
-
-    def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """Node: result aggregation.
-
-        Runs ``self.result_aggregation_func.execute`` on ``state``
-        (initializing the function first if needed) and stores its return
-        value under ``state["final_result"]``.
-
-        Args:
-            state: Workflow state; it is mutated in place.
-
-        Returns:
-            The same ``state`` object. If an exception is raised it is logged
-            and ``state["final_result"]`` is set to a placeholder containing
-            the error text under ``"帖子总结"`` and an empty ``"帖子包含元素"``
-            list.
-        """
-        logger.info("=== 执行节点:结果汇总 ===")
-
-        try:
-            # Initialize the function on first use
-            if not self.result_aggregation_func.is_initialized:
-                self.result_aggregation_func.initialize()
-
-            # Run the function
-            final_result = self.result_aggregation_func.execute(state)
-
-            # Store the aggregated output; unlike the agent nodes this assigns
-            # a single key instead of merging a mapping into the state.
-            state["final_result"] = final_result
-
-            logger.info("结果汇总完成")
-
-        except Exception as e:
-            logger.error(f"结果汇总失败: {e}", exc_info=True)
-            # Fallback value written to the state when aggregation fails
-            state["final_result"] = {
-                "帖子总结": {"错误": f"汇总失败: {str(e)}"},
-                "帖子包含元素": []
-            }
-
-        return state
-
-    def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
-        """Run the workflow (public entry point) - video analysis version.
-
-        Args:
-            input_data: Input data containing a ``video`` field (video URL).
-                Format reference: examples/56898272/视频详情.json
-                {
-                    "video": "http://...",
-                    "title": "...",
-                    "use_topic_agent_v2": False,  # optional, selects the topic_agent_v2 branch
-                    "use_structure_agent": True,  # optional, selects the structure_agent branch (pick one of this or use_topic_agent_v2), defaults to True
-                    ...
-                }
-                ``channel_content_id`` and ``body_text`` are also read;
-                every missing text field defaults to an empty string.
-
-        Returns:
-            The final deconstruction result: ``topic_selection_v2`` when that
-            flag is set and the key is present in the graph output, otherwise
-            ``structure_data`` under the same conditions for its flag,
-            otherwise ``final_result`` (an empty dict if that is absent too).
-        """
-        logger.info("=== 开始执行 What 解构工作流(视频分析) ===")
-
-        # Make sure the workflow has been initialized
-        if not self.is_initialized:
-            self.initialize()
-
-        # Build the initial state (video input only); hashtags always start empty
-        initial_state = {
-            "video": input_data.get("video", ""),
-            "channel_content_id": input_data.get("channel_content_id", ""),
-            "text": {
-                "title": input_data.get("title", ""),
-                "body": input_data.get("body_text", ""),
-                "hashtags": []
-            },
-            "current_depth": 0,
-            "max_depth": self.max_depth,
-            "use_topic_agent_v2": input_data.get("use_topic_agent_v2", False),
-            "use_structure_agent": input_data.get("use_structure_agent", True)
-        }
-
-        # Run the workflow
-        result = self.compiled_graph.invoke(initial_state)
-
-        logger.info("=== What 解构工作流执行完成(视频分析) ===")
-
-        # If the topic_agent_v2 branch was taken, return the topic_selection_v2 result.
-        # This check comes first, so it wins when both flags are set.
-        if result.get("use_topic_agent_v2") and "topic_selection_v2" in result:
-            return result.get("topic_selection_v2", {})
-        
-        # If the structure_agent branch was taken, return the structure_data result
-        if result.get("use_structure_agent") and "structure_data" in result:
-            return result.get("structure_data", {})
-
-        return result.get("final_result", {})