#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Creation origin analysis.

Combines two steps, data preparation + AI analysis:
1. Prepare the data to analyze from the post graph + persona graph
2. Call the AI to infer the creative origin

Input:  post graph + persona graph
Output: origin analysis results
"""

import asyncio
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional

# Add the project root to the import path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from agents import Agent, Runner, ModelSettings, trace
from agents.tracing.create import custom_span
from lib.client import get_model
from lib.my_trace import set_trace_smith as set_trace
from script.data_processing.path_config import PathConfig

# ===== Configuration =====
MODEL_NAME = "google/gemini-3-pro-preview"
# MODEL_NAME = "anthropic/claude-sonnet-4"

MATCH_SCORE_THRESHOLD = 0.8   # match score threshold
GLOBAL_RATIO_THRESHOLD = 0.8  # global ratio threshold

agent = Agent(
    name="Creation Origin Analyzer",
    model=get_model(MODEL_NAME),
    model_settings=ModelSettings(
        temperature=0.0,
        max_tokens=8192,
    ),
    tools=[],
)


# ===== Data loading =====

def load_json(file_path: Path) -> Dict:
    """Load a JSON file."""
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_post_graph_files(config: PathConfig) -> List[Path]:
    """Return all post graph files."""
    post_graph_dir = config.intermediate_dir / "post_graph"
    return sorted(post_graph_dir.glob("*_帖子图谱.json"))


def get_result_file(config: PathConfig, post_id: str) -> Path:
    """Return the path of the analysis result file for a post."""
    return config.intermediate_dir / "origin_analysis_result" / f"{post_id}_起点分析.json"


def is_already_processed(config: PathConfig, post_id: str) -> bool:
    """Check whether a post has already been processed."""
    result_file = get_result_file(config, post_id)
    return result_file.exists()


# ===== Step 1: data preparation =====

def extract_post_detail(post_graph: Dict) -> Dict:
    """Extract post details (keeping the original field names)."""
    meta = post_graph.get("meta", {})
    post_detail = meta.get("postDetail", {})
    return {
        "postId": meta.get("postId", ""),
        "postTitle": meta.get("postTitle", ""),
        "body_text": post_detail.get("body_text", ""),
        "images": post_detail.get("images", []),
        "video": post_detail.get("video"),
        "publish_time": post_detail.get("publish_time", ""),
        "like_count": post_detail.get("like_count", 0),
        "collect_count": post_detail.get("collect_count", 0),
    }


def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> tuple:
    """
    Extract the list of nodes to analyze.

    Nodes to analyze = inspiration points + purpose points + key points.
    """
    nodes = post_graph.get("nodes", {})
    edges = post_graph.get("edges", {})
    persona_nodes = persona_graph.get("nodes", {})
    persona_index = persona_graph.get("index", {})

    # 1. Collect key-point info (used as supporting evidence)
    keypoints = {}
    for node_id, node in nodes.items():
        if node.get("type") == "标签" and node.get("dimension") == "关键点":
            keypoints[node_id] = {
                "名称": node.get("name", ""),
                "描述": node.get("detail", {}).get("description", ""),
            }

    # 2. Support edges: key point → inspiration/purpose point
    support_map = {}  # {target_node_id: [info of supporting key points]}
    for edge_id, edge in edges.items():
        if edge.get("type") == "支撑":
            source_id = edge.get("source", "")
            target_id = edge.get("target", "")
            if source_id in keypoints:
                if target_id not in support_map:
                    support_map[target_id] = []
                support_map[target_id].append(keypoints[source_id])

    # 3. Association edges
    relation_map = {}  # {node_id: [names of associated nodes]}
    for edge_id, edge in edges.items():
        if edge.get("type") == "关联":
            source_id = edge.get("source", "")
            target_id = edge.get("target", "")
            source_name = nodes.get(source_id, {}).get("name", "")
            target_name = nodes.get(target_id, {}).get("name", "")
            # record both directions
            if source_id not in relation_map:
                relation_map[source_id] = []
            relation_map[source_id].append(target_name)
            if target_id not in relation_map:
                relation_map[target_id] = []
            relation_map[target_id].append(source_name)
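    # Note on the matching step below: it assumes node IDs carry a domain prefix,
    # e.g. "帖子:..." for post nodes and "人设:..." for persona nodes (these example
    # prefixes mirror the startswith() checks below; the exact ID format is defined
    # by the upstream graph builder).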

    # 4. Match post nodes against the persona graph
    match_map = {}  # {node_id: match info}
    persona_out_edges = persona_index.get("outEdges", {})

    def get_node_info(node_id: str) -> Optional[Dict]:
        """Return the standard info of a persona node."""
        node = persona_nodes.get(node_id, {})
        if not node:
            return None
        detail = node.get("detail", {})
        parent_path = detail.get("parentPath", [])
        return {
            "节点ID": node_id,
            "节点名称": node.get("name", ""),
            "节点分类": "/".join(parent_path) if parent_path else "",
            "节点维度": node.get("dimension", ""),
            "节点类型": node.get("type", ""),
            "人设全局占比": detail.get("probGlobal", 0),
            "父类下占比": detail.get("probToParent", 0),
        }

    def get_parent_category_id(node_id: str) -> Optional[str]:
        """Get the parent category node ID via the "属于" (belongs-to) edge."""
        belong_edges = persona_out_edges.get(node_id, {}).get("属于", [])
        for edge in belong_edges:
            target_id = edge.get("target", "")
            target_node = persona_nodes.get(target_id, {})
            if target_node.get("type") == "分类":
                return target_id
        return None

    for edge_id, edge in edges.items():
        if edge.get("type") == "匹配":
            source_id = edge.get("source", "")
            target_id = edge.get("target", "")
            # only handle matches from post nodes to persona nodes
            if source_id.startswith("帖子:") and target_id.startswith("人设:"):
                match_score = edge.get("score", 0)
                persona_node = persona_nodes.get(target_id, {})
                if persona_node:
                    node_type = persona_node.get("type", "")
                    # info of the matched persona node
                    match_node_info = get_node_info(target_id)
                    if not match_node_info:
                        continue
                    # determine the category node it belongs to
                    if node_type == "标签":
                        # tag: look up its parent category
                        category_id = get_parent_category_id(target_id)
                    else:
                        # category: it is its own category
                        category_id = target_id
                    # category info plus frequently co-occurring categories
                    category_info = None
                    if category_id:
                        category_node = persona_nodes.get(category_id, {})
                        if category_node:
                            category_detail = category_node.get("detail", {})
                            category_path = category_detail.get("parentPath", [])
                            category_info = {
                                "节点ID": category_id,
                                "节点名称": category_node.get("name", ""),
                                "节点分类": "/".join(category_path) if category_path else "",
                                "节点维度": category_node.get("dimension", ""),
                                "节点类型": "分类",
                                "人设全局占比": category_detail.get("probGlobal", 0),
                                "父类下占比": category_detail.get("probToParent", 0),
                                "历史共现分类": [],
                            }
                            # co-occurring category nodes, sorted by co-occurrence score (descending)
                            co_occur_edges = persona_out_edges.get(category_id, {}).get("分类共现", [])
                            co_occur_edges_sorted = sorted(co_occur_edges, key=lambda x: x.get("score", 0), reverse=True)
                            for co_edge in co_occur_edges_sorted[:5]:  # keep the top 5
                                co_target_id = co_edge.get("target", "")
                                co_score = co_edge.get("score", 0)
                                co_node = persona_nodes.get(co_target_id, {})
                                if co_node:
                                    co_detail = co_node.get("detail", {})
                                    co_path = co_detail.get("parentPath", [])
                                    category_info["历史共现分类"].append({
                                        "节点ID": co_target_id,
                                        "节点名称": co_node.get("name", ""),
                                        "节点分类": "/".join(co_path) if co_path else "",
                                        "节点维度": co_node.get("dimension", ""),
                                        "节点类型": "分类",
                                        "人设全局占比": co_detail.get("probGlobal", 0),
                                        "父类下占比": co_detail.get("probToParent", 0),
                                        "共现度": round(co_score, 4),
                                    })
                    match_map[source_id] = {
                        "匹配节点": match_node_info,
                        "匹配分数": round(match_score, 4),
                        "所属分类": category_info,
                    }

    # 5. Build the list of nodes to analyze (inspiration, purpose, key points)
    analysis_nodes = []
    for node_id, node in nodes.items():
        if node.get("type") == "标签" and node.get("domain") == "帖子":
            dimension = node.get("dimension", "")
            if dimension in ["灵感点", "目的点", "关键点"]:
                # persona match info (None if the node has no match)
                match_info = match_map.get(node_id)
                analysis_nodes.append({
                    "节点ID": node_id,
                    "节点名称": node.get("name", ""),
                    "节点分类": node.get("category", ""),  # root category: 意图/实质/形式
                    "节点维度": dimension,
                    "节点类型": node.get("type", ""),
                    "节点描述": node.get("detail", {}).get("description", ""),
                    "人设匹配": match_info,
                })
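    # Illustrative shape of one analysis_nodes entry (all values are hypothetical):
    # {
    #     "节点ID": "帖子:灵感点:001",
    #     "节点名称": "...",
    #     "节点分类": "实质",
    #     "节点维度": "灵感点",
    #     "节点类型": "标签",
    #     "节点描述": "...",
    #     "人设匹配": {"匹配节点": {...}, "匹配分数": 0.87, "所属分类": {...}},  # None if unmatched
    # }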

    # 6. Build the list of candidate relations
    relation_list = []

    # Support relations: key point → inspiration/purpose point
    for edge_id, edge in edges.items():
        if edge.get("type") == "支撑":
            source_id = edge.get("source", "")
            target_id = edge.get("target", "")
            if source_id in keypoints:
                relation_list.append({
                    "来源节点": source_id,
                    "目标节点": target_id,
                    "关系类型": "支撑",
                })

    # Association relations between nodes (deduplicated, recorded once per pair)
    seen_relations = set()
    for edge_id, edge in edges.items():
        if edge.get("type") == "关联":
            source_id = edge.get("source", "")
            target_id = edge.get("target", "")
            # use the sorted pair as the dedup key
            key = tuple(sorted([source_id, target_id]))
            if key not in seen_relations:
                seen_relations.add(key)
                relation_list.append({
                    "来源节点": source_id,
                    "目标节点": target_id,
                    "关系类型": "关联",
                })

    return analysis_nodes, relation_list


def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict:
    """
    Prepare the complete analysis data.

    Returns:
        {
            "帖子详情": {...},
            "待分析节点列表": [...],
            "可能的关系列表": [...]
        }
    """
    analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph)
    return {
        "帖子详情": extract_post_detail(post_graph),
        "待分析节点列表": analysis_nodes,
        "可能的关系列表": relation_list,
    }


# ===== Step 2: AI analysis =====

def build_context(data: Dict) -> Dict:
    """
    Build the context for the AI analysis.

    Returns:
        {
            "all_points": [...],   # all creative points (with details)
            "candidates": [...],   # origin candidates (list of names)
            "constants": [...],    # persona constants (list of names)
        }
    """
    nodes = data.get("待分析节点列表", [])

    # All creative points (with details)
    all_points = []
    for node in nodes:
        match_info = node.get("人设匹配")
        match_score = 0
        category_global_ratio = 0
        if match_info:
            match_score = match_info.get("匹配分数", 0)
            category_info = match_info.get("所属分类", {})
            if category_info:
                category_global_ratio = category_info.get("人设全局占比", 0)
        all_points.append({
            "名称": node["节点名称"],
            "分类": node.get("节点分类", ""),
            "维度": node.get("节点维度", ""),
            "描述": node.get("节点描述", ""),
            "人设匹配度": round(match_score, 2),
            "所属分类全局占比": round(category_global_ratio, 2),
        })

    # Origin candidates (inspiration + purpose points)
    candidates = [
        node["节点名称"] for node in nodes
        if node["节点维度"] in ["灵感点", "目的点"]
    ]

    # Persona constants (match score and global ratio both above their thresholds)
    constants = []
    for node in nodes:
        match_info = node.get("人设匹配")
        if match_info:
            match_score = match_info.get("匹配分数", 0)
            match_node = match_info.get("匹配节点", {})
            global_ratio = match_node.get("人设全局占比", 0)
            if match_score > MATCH_SCORE_THRESHOLD and global_ratio > GLOBAL_RATIO_THRESHOLD:
                constants.append(node["节点名称"])

    return {
        "all_points": all_points,
        "candidates": candidates,
        "constants": constants,
    }
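# Worked example of the threshold logic above (numbers are hypothetical): a node whose
# persona match scores 0.92 and whose matched persona node has a global ratio of 0.85
# exceeds both 0.8 thresholds and is therefore listed as a persona constant in the
# prompt, while a node matching at 0.75 stays in all_points but not in constants.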


def format_prompt(context: Dict) -> str:
    """Format the context into the AI prompt."""
    all_points = context["all_points"]
    candidates = context["candidates"]
    constants = context["constants"]

    # Format all creative points as readable text
    points_text = ""
    for p in all_points:
        points_text += f"- {p['名称']}\n"
        points_text += f"  维度: {p['维度']} | 分类: {p['分类']}\n"
        points_text += f"  描述: {p['描述']}\n"
        points_text += f"  人设匹配度: {p['人设匹配度']} | 所属分类全局占比: {p['所属分类全局占比']}\n"
        points_text += "\n"

    # Format the origin candidates
    candidates_text = "、".join(candidates)

    # Format the persona constants
    constants_text = "、".join(constants) if constants else "无"

    prompt = f"""# Role
你是小红书爆款内容的"逆向工程"专家。你的核心能力是透过内容的表象(视觉/形式),还原创作者最初的脑回路(动机/实质)。

# Task
我提供一组笔记的【创意标签】和一个【起点候选集】。
请推理出哪些选项是真正的**创意起点**。

# Input Data

## 全部创意点
{points_text}

## 起点候选集
{candidates_text}

## 来自人设的常量
{constants_text}

# 推理约束
1. 实质推形式,而不是形式推实质,除非形式是一切创意的起点
2. 因推果而不是果推因
3. 无法被其他项或人设推理出的点,即为起点

# Output Format
请输出一个标准的 JSON 格式。
- Key: 候选集中的词。
- Value: 一个对象,包含:
  - `score`: 0.0 到 1.0 的浮点数(代表是起点的可能性)。
  - `analysis`: 一句话推理"""

    return prompt


# ===== Display helpers =====

def display_context(context: Dict, post_id: str):
    """Print the constructed context."""
    print(f"\n帖子: {post_id}")
    print(f"\n全部创意点 ({len(context['all_points'])} 个):")
    for p in context['all_points']:
        print(f"  - {p['名称']} ({p['维度']}/{p['分类']}) 匹配度={p['人设匹配度']}, 分类占比={p['所属分类全局占比']}")
    print(f"\n起点候选集 ({len(context['candidates'])} 个):")
    print(f"  {context['candidates']}")
    print(f"\n人设常量 ({len(context['constants'])} 个):")
    print(f"  {context['constants']}")


def display_result(result: Dict):
    """Print the analysis result."""
    output = result.get("输出")
    if output:
        print("\n起点分析结果:")
        # sort by score, descending
        sorted_items = sorted(output.items(), key=lambda x: x[1].get("score", 0), reverse=True)
        for name, info in sorted_items:
            score = info.get("score", 0)
            analysis = info.get("analysis", "")
            marker = "★" if score >= 0.7 else "○"
            print(f"  {marker} {name}: {score:.2f}")
            print(f"    {analysis}")
    else:
        print(f"  分析失败: {result.get('错误', 'N/A')}")


# ===== Processing =====

async def process_single_post(
    post_file: Path,
    persona_graph: Dict,
    config: PathConfig,
    current_time: Optional[str] = None,
    log_url: Optional[str] = None,
    force: bool = False,
) -> Dict:
    """Process a single post (data preparation + AI analysis)."""
    # Load the post graph
    post_graph = load_json(post_file)
    post_id = post_graph.get("meta", {}).get("postId", "unknown")

    # Skip if already processed (unless --force)
    if not force and is_already_processed(config, post_id):
        print(f"\n跳过帖子 {post_id}(已处理,使用 --force 强制重新分析)")
        # return the existing result
        result_file = get_result_file(config, post_id)
        return load_json(result_file)

    print(f"\n{'=' * 60}")
    print(f"处理帖子: {post_id}")
    print("-" * 60)

    # Step 1: prepare the data
    data = prepare_analysis_data(post_graph, persona_graph)

    # Build the context
    context = build_context(data)
    display_context(context, post_id)

    # Format the prompt
    prompt = format_prompt(context)

    # Step 2: call the AI
    print("\n调用AI分析中...")
    with custom_span(
        name=f"创作起点分析 - {post_id}",
        data={
            "帖子id": post_id,
            "候选数": len(context["candidates"]),
            "模型": MODEL_NAME
        }
    ):
        result = await Runner.run(agent, input=prompt)
        output_text = result.final_output

    # Parse the JSON in the model output
    try:
        if "```json" in output_text:
            json_start = output_text.find("```json") + 7
            json_end = output_text.find("```", json_start)
            json_str = output_text[json_start:json_end].strip()
        elif "{" in output_text and "}" in output_text:
            json_start = output_text.find("{")
            json_end = output_text.rfind("}") + 1
            json_str = output_text[json_start:json_end]
        else:
            json_str = output_text
        analysis_result = json.loads(json_str)
        result_data = {
            "帖子id": post_id,
            "模型": MODEL_NAME,
            "输入": context,
            "输出": analysis_result
        }
    except Exception as e:
        result_data = {
            "帖子id": post_id,
            "模型": MODEL_NAME,
            "输入": context,
            "输出": None,
            "错误": str(e),
            "原始输出": output_text
        }

    # Display the result
    display_result(result_data)

    # Save the result
    output_dir = config.intermediate_dir / "origin_analysis_result"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_with_meta = {
        "元数据": {
            "current_time": current_time,
            "log_url": log_url,
            "model": MODEL_NAME
        },
        **result_data
    }
    output_file = output_dir / f"{post_id}_起点分析.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_with_meta, f, ensure_ascii=False, indent=2)
    print(f"\n已保存: {output_file.name}")

    return result_data
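# Sketch of the saved result file "<postId>_起点分析.json" (values are illustrative):
# {
#   "元数据": {"current_time": "...", "log_url": "...", "model": "google/gemini-3-pro-preview"},
#   "帖子id": "...",
#   "模型": "google/gemini-3-pro-preview",
#   "输入": {"all_points": [...], "candidates": [...], "constants": [...]},
#   "输出": {"<候选词>": {"score": 0.85, "analysis": "..."}}   # null plus "错误"/"原始输出" on failure
# }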


# ===== Main =====

async def main(
    post_id: Optional[str] = None,
    all_posts: bool = False,
    force: bool = False,
):
    """
    Main entry point.

    Args:
        post_id: post ID, optional
        all_posts: whether to process all posts
        force: re-analyze posts that have already been processed
    """
    # Set up tracing
    current_time, log_url = set_trace()

    config = PathConfig()
    print(f"账号: {config.account_name}")
    print(f"使用模型: {MODEL_NAME}")
    print(f"Trace URL: {log_url}")

    # Load the persona graph
    persona_graph_file = config.intermediate_dir / "人设图谱.json"
    if not persona_graph_file.exists():
        print(f"错误: 人设图谱文件不存在: {persona_graph_file}")
        return
    persona_graph = load_json(persona_graph_file)
    print(f"人设图谱节点数: {len(persona_graph.get('nodes', {}))}")

    # Collect the post graph files
    post_graph_files = get_post_graph_files(config)
    if not post_graph_files:
        print("错误: 没有找到帖子图谱文件")
        return

    # Decide which posts to process
    if post_id:
        target_file = next(
            (f for f in post_graph_files if post_id in f.name),
            None
        )
        if not target_file:
            print(f"错误: 未找到帖子 {post_id}")
            return
        files_to_process = [target_file]
    elif all_posts:
        files_to_process = post_graph_files
    else:
        files_to_process = [post_graph_files[0]]

    print(f"待处理帖子数: {len(files_to_process)}")

    # Process
    with trace("创作起点分析"):
        results = []
        skipped = 0
        for i, post_file in enumerate(files_to_process, 1):
            print(f"\n{'#' * 60}")
            print(f"# 处理帖子 {i}/{len(files_to_process)}")
            print(f"{'#' * 60}")
            result = await process_single_post(
                post_file=post_file,
                persona_graph=persona_graph,
                config=config,
                current_time=current_time,
                log_url=log_url,
                force=force,
            )
            # count posts whose result was loaded from disk (skipped)
            if not force and "元数据" in result:
                skipped += 1
            results.append(result)

    # Summary
    print(f"\n{'#' * 60}")
    print(f"# 完成! 共处理 {len(results)} 个帖子 (跳过 {skipped} 个已处理)")
    print(f"{'#' * 60}")
    print(f"Trace: {log_url}")

    print("\n汇总(score >= 0.7 的起点):")
    for result in results:
        post_id = result.get("帖子id")
        output = result.get("输出")
        if output:
            origins = [
                f"{k}({v['score']:.2f})"
                for k, v in output.items()
                if v.get("score", 0) >= 0.7
            ]
            print(f"  {post_id}: {', '.join(origins) if origins else '无高置信起点'}")
        else:
            print(f"  {post_id}: 分析失败")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="创作起点分析")
    parser.add_argument("--post-id", type=str, help="帖子ID")
    parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
    parser.add_argument("--force", action="store_true", help="强制重新分析已处理的帖子")
    args = parser.parse_args()

    asyncio.run(main(
        post_id=args.post_id,
        all_posts=args.all_posts,
        force=args.force,
    ))
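# Example invocations (a sketch; the script filename is hypothetical):
#   python creation_origin_analysis.py                        # analyze the first post graph found
#   python creation_origin_analysis.py --post-id <帖子ID>      # analyze a single post
#   python creation_origin_analysis.py --all-posts --force    # re-analyze every post, ignoring cached results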