""" 测试脚本:运行 待解构帖子.json(带历史帖子) 功能: 1. 加载最近3篇历史帖子(从早到晚排序) 2. 加载待解构帖子 3. 运行 WhatDeconstructionWorkflow """ import json import sys import os import argparse from pathlib import Path from datetime import datetime # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) # 手动加载.env文件 def load_env_file(env_path): """手动加载.env文件""" if not env_path.exists(): return False with open(env_path, 'r') as f: for line in f: line = line.strip() # 跳过注释和空行 if not line or line.startswith('#'): continue # 解析KEY=VALUE if '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() return True env_path = project_root / ".env" if load_env_file(env_path): print(f"✅ 已加载环境变量从: {env_path}") # 验证API密钥 api_key = os.environ.get("GEMINI_API_KEY", "") if api_key: print(f" GEMINI_API_KEY: {api_key[:10]}...") else: print(f"⚠️ 未找到.env文件: {env_path}") from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow from src.utils.logger import get_logger logger = get_logger(__name__) def load_historical_posts(history_dir, target_timestamp=None, target_post_id=None, max_count=10): """ 加载历史帖子(根据publish_timestamp从新到旧排序) 选择比目标帖子早发布,并且是最近发布的帖子,排除目标帖子本身 Args: history_dir: 历史帖子目录 target_timestamp: 目标帖子的发布时间戳(可选) target_post_id: 目标帖子的ID(用于过滤重复,可选) max_count: 最多加载的帖子数量 Returns: list: 历史帖子列表(从新到旧排序) """ history_path = Path(history_dir) if not history_path.exists(): print(f"⚠️ 历史帖子目录不存在: {history_path}") return [] # 获取所有JSON文件 json_files = list(history_path.glob("*.json")) if not json_files: print(f"⚠️ 未找到历史帖子文件") return [] print(f"\n📁 找到 {len(json_files)} 个历史帖子文件") # 读取所有帖子并提取publish_timestamp posts_with_timestamp = [] for file_path in json_files: try: with open(file_path, 'r', encoding='utf-8') as f: post_data = json.load(f) # 获取发布时间戳,如果不存在则使用0 timestamp = post_data.get("publish_timestamp", 0) post_id = post_data.get("channel_content_id", "") posts_with_timestamp.append({ "file_path": file_path, "post_data": post_data, "timestamp": timestamp, "post_id": post_id }) except Exception as e: print(f" ⚠️ 读取文件失败 {file_path.name}: {e}") continue if not posts_with_timestamp: print(f"⚠️ 没有成功读取到任何帖子") return [] # 过滤掉目标帖子本身 if target_post_id is not None: original_count = len(posts_with_timestamp) posts_with_timestamp = [ post for post in posts_with_timestamp if post["post_id"] != target_post_id ] filtered_count = original_count - len(posts_with_timestamp) if filtered_count > 0: print(f"🔍 过滤掉 {filtered_count} 个重复帖子(目标帖子本身)") # 如果提供了目标时间戳,只保留比目标帖子早的帖子 if target_timestamp is not None: posts_with_timestamp = [ post for post in posts_with_timestamp if post["timestamp"] < target_timestamp ] print(f"📊 筛选出 {len(posts_with_timestamp)} 个比目标帖子早的历史帖子") if not posts_with_timestamp: print(f"⚠️ 没有找到比目标帖子早的历史帖子") return [] # 按照publish_timestamp排序(从新到旧) posts_with_timestamp.sort(key=lambda x: x["timestamp"], reverse=True) # 选择最近的N篇(从新到旧) selected_posts = posts_with_timestamp[:max_count] if len(posts_with_timestamp) > max_count else posts_with_timestamp print(f"📋 选择最近 {len(selected_posts)} 篇历史帖子(按发布时间从新到旧):") historical_posts = [] for idx, post_info in enumerate(selected_posts, 1): post_data = post_info["post_data"] file_path = post_info["file_path"] timestamp = post_info["timestamp"] # 转换为需要的格式 historical_post = { "text": { "title": post_data.get("title", ""), "body": post_data.get("body_text", ""), "hashtags": "" }, "images": post_data.get("images", []) } historical_posts.append(historical_post) # 格式化时间显示 publish_time = post_data.get("publish_time", "未知时间") print(f" {idx}. {file_path.name}") print(f" 标题: {post_data.get('title', '无标题')}") print(f" 发布时间: {publish_time}") print(f" 图片数: {len(post_data.get('images', []))}") return historical_posts def load_test_data(directory): """ 加载测试数据 Args: directory: 帖子目录名(如"阿里多多酱"或"G88818") """ test_data_path = Path(__file__).parent / directory / "待解构帖子.json" with open(test_data_path, "r", encoding="utf-8") as f: data = json.load(f) return data def convert_to_workflow_input(raw_data, historical_posts=None): """ 将原始数据转换为工作流输入格式 Args: raw_data: 原始帖子数据 historical_posts: 历史帖子列表(可选) """ images = raw_data.get("images", []) input_data = { "multimedia_content": { "images": images, "video": raw_data.get("video", {}), "text": { "title": raw_data.get("title", ""), "body": raw_data.get("body_text", ""), "hashtags": "" } }, "comments": raw_data.get("comments", []), "creator_info": { "nickname": raw_data.get("channel_account_name", ""), "account_id": raw_data.get("channel_account_id", "") } } # 如果有历史帖子,添加到输入数据中 if historical_posts: input_data["historical_posts"] = historical_posts return input_data def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser(description='运行单个帖子的What解构工作流') parser.add_argument('directory', type=str, help='帖子目录名(如"阿里多多酱"或"G88818")') args = parser.parse_args() directory = args.directory print("=" * 80) print(f"开始测试 What 解构工作流(带历史帖子)- 目录: {directory}") print("=" * 80) # 1. 加载测试数据(目标帖子) print("\n[1] 加载测试数据(目标帖子)...") try: raw_data = load_test_data(directory) target_timestamp = raw_data.get('publish_timestamp') target_post_id = raw_data.get('channel_content_id') target_publish_time = raw_data.get('publish_time', '未知时间') print(f"✅ 成功加载测试数据") print(f" - 标题: {raw_data.get('title')}") print(f" - 帖子ID: {target_post_id}") print(f" - 发布时间: {target_publish_time}") print(f" - 图片数: {len(raw_data.get('images', []))}") print(f" - 点赞数: {raw_data.get('like_count')}") print(f" - 评论数: {raw_data.get('comment_count')}") except Exception as e: print(f"❌ 加载测试数据失败: {e}") return # 2. 加载历史帖子(比目标帖子早的帖子,排除目标帖子本身) print("\n[2] 加载历史帖子...") history_dir = Path(__file__).parent / directory / "作者历史帖子" historical_posts = load_historical_posts( history_dir, target_timestamp=target_timestamp, target_post_id=target_post_id, max_count=15 ) if historical_posts: print(f"✅ 成功加载 {len(historical_posts)} 篇历史帖子") else: print(f"⚠️ 未加载到历史帖子,将使用常规分析模式") # 3. 转换数据格式 print("\n[3] 转换数据格式...") try: input_data = convert_to_workflow_input(raw_data, historical_posts) print(f"✅ 数据格式转换成功") print(f" - 话题标签: {input_data['multimedia_content']['text']['hashtags']}") print(f" - 历史帖子数: {len(input_data.get('historical_posts', []))}") except Exception as e: print(f"❌ 数据格式转换失败: {e}") return # 4. 初始化工作流 print("\n[4] 初始化工作流...") try: workflow = WhatDeconstructionWorkflow( model_provider="google_genai", max_depth=10 ) print(f"✅ 工作流初始化成功") except Exception as e: print(f"❌ 工作流初始化失败: {e}") import traceback traceback.print_exc() return # 5. 执行工作流 print("\n[5] 执行工作流...") print(" 注意:这可能需要几分钟时间...") try: result = workflow.invoke(input_data) print(f"✅ 工作流执行成功") except Exception as e: print(f"❌ 工作流执行失败: {e}") import traceback traceback.print_exc() return # 6. 保存结果 print("\n[6] 保存结果...") try: # 生成带时间戳的文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"result_{timestamp}.json" output_path = Path(__file__).parent / directory / "output" / output_filename output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) print(f"✅ 结果已保存到: {output_path}") print(f" 文件名: {output_filename}") except Exception as e: print(f"❌ 保存结果失败: {e}") return # 7. 生成HTML可视化 # print("\n[7] 生成HTML可视化...") # try: # visualize_script = Path(__file__).parent / "visualize_result.py" # if visualize_script.exists(): # import subprocess # result_viz = subprocess.run( # [sys.executable, str(visualize_script), str(output_path)], # capture_output=True, # text=True # ) # if result_viz.returncode == 0: # print(f"✅ HTML可视化生成成功") # # 查找生成的HTML文件 # html_file = output_path.parent / f"{output_path.stem}_visualization.html" # if html_file.exists(): # print(f" 可视化文件: {html_file}") # else: # print(f"⚠️ HTML可视化生成失败: {result_viz.stderr}") # else: # print(f"⚠️ 未找到可视化脚本: {visualize_script}") # except Exception as e: # print(f"⚠️ 生成HTML可视化失败: {e}") # 8. 显示结果摘要 print("\n" + "=" * 80) print("结果摘要") print("=" * 80) if result: three_points = result.get("三点解构", {}) inspiration_data = three_points.get("灵感点", {}) keypoints_data = three_points.get("关键点", {}) comments = result.get("评论分析", {}).get("解构维度", []) print(f"\n三点解构:") print(f" - 灵感点数量: {inspiration_data.get('total_count', 0)}") print(f" - 灵感点分析模式: {inspiration_data.get('analysis_mode', '未知')}") print(f" - 目的点数量: 1") print(f" - 关键点数量: {keypoints_data.get('total_count', 0)}") # 显示灵感点详情 if inspiration_data.get('points'): print(f"\n灵感点列表:") for idx, point in enumerate(inspiration_data['points'], 1): print(f" {idx}. {point.get('灵感点', '')}") print(f"\n评论分析:") print(f" - 解构维度数: {len(comments)}") topic_understanding = result.get("选题理解", {}) if topic_understanding: topic_theme = topic_understanding.get("topic_theme", "") print(f"\n选题理解:") print(f" - 选题主题: {topic_theme}") print("\n" + "=" * 80) print("测试完成!") print("=" * 80) if __name__ == "__main__": main()