@@ -0,0 +1,687 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Creation origin analysis.
+
+Combines the two-step flow of data preparation + AI analysis:
+1. Prepare the data to analyze from the post graph and the persona graph.
+2. Call the AI to identify the creative origins.
+
+Input: post graph + persona graph
+Output: origin analysis result
+"""
+
+import asyncio
+import json
+import sys
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+# Make the project root importable before loading project modules
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from agents import Agent, Runner, ModelSettings, trace
+from agents.tracing.create import custom_span
+from lib.client import get_model
+from lib.my_trace import set_trace_smith as set_trace
+from script.data_processing.path_config import PathConfig
+
+
+# ===== Configuration =====
+MODEL_NAME = "google/gemini-3-pro-preview"
+# MODEL_NAME = "anthropic/claude-sonnet-4"
+
+MATCH_SCORE_THRESHOLD = 0.8   # minimum match score for a persona "constant"
+GLOBAL_RATIO_THRESHOLD = 0.8  # minimum global ratio for a persona "constant"
+
+agent = Agent(
+    name="Creation Origin Analyzer",
+    model=get_model(MODEL_NAME),
+    model_settings=ModelSettings(
+        temperature=0.0,  # deterministic output for reproducible analysis
+        max_tokens=8192,
+    ),
+    tools=[],
+)
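+
+# The agent runs without tools and returns plain text; the JSON answer is
+# parsed out of the model output in process_single_post() below.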
+
+
+# ===== Data loading =====
+
+def load_json(file_path: Path) -> Dict:
+    """Load a JSON file."""
+    with open(file_path, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def get_post_graph_files(config: PathConfig) -> List[Path]:
+    """Collect all post-graph files."""
+    post_graph_dir = config.intermediate_dir / "post_graph"
+    return sorted(post_graph_dir.glob("*_帖子图谱.json"))
+
+
+def get_result_file(config: PathConfig, post_id: str) -> Path:
+    """Path of the origin-analysis result file for a post."""
+    return config.intermediate_dir / "origin_analysis_result" / f"{post_id}_起点分析.json"
+
+
+def is_already_processed(config: PathConfig, post_id: str) -> bool:
+    """Check whether a post already has a saved result."""
+    result_file = get_result_file(config, post_id)
+    return result_file.exists()
+
+
+# ===== Step 1: data preparation =====
+
+def extract_post_detail(post_graph: Dict) -> Dict:
+    """Extract the post details (keeping the original field names)."""
+    meta = post_graph.get("meta", {})
+    post_detail = meta.get("postDetail", {})
+
+    return {
+        "postId": meta.get("postId", ""),
+        "postTitle": meta.get("postTitle", ""),
+        "body_text": post_detail.get("body_text", ""),
+        "images": post_detail.get("images", []),
+        "video": post_detail.get("video"),
+        "publish_time": post_detail.get("publish_time", ""),
+        "like_count": post_detail.get("like_count", 0),
+        "collect_count": post_detail.get("collect_count", 0),
+    }
+
+
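+# extract_analysis_nodes() below assumes post-graph JSON of roughly this shape
+# (field names inferred from the accesses in this file; the full schema may
+# contain more):
+#   {
+#     "meta":  {"postId": ..., "postTitle": ..., "postDetail": {...}},
+#     "nodes": {node_id: {"type": "标签"|"分类", "domain": "帖子",
+#                         "dimension": "灵感点"|"目的点"|"关键点",
+#                         "name": ..., "category": ..., "detail": {...}}},
+#     "edges": {edge_id: {"type": "支撑"|"关联"|"匹配",
+#                         "source": node_id, "target": node_id, "score": float}}
+#   }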
+def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> Tuple[List[Dict], List[Dict]]:
+    """
+    Extract the list of nodes to analyze.
+
+    Nodes to analyze = inspiration points + purpose points + key points.
+    """
+    nodes = post_graph.get("nodes", {})
+    edges = post_graph.get("edges", {})
+    persona_nodes = persona_graph.get("nodes", {})
+    persona_index = persona_graph.get("index", {})
+
+    # 1. Collect key-point info (used later as supporting information)
+    keypoints = {}
+    for node_id, node in nodes.items():
+        if node.get("type") == "标签" and node.get("dimension") == "关键点":
+            keypoints[node_id] = {
+                "名称": node.get("name", ""),
+                "描述": node.get("detail", {}).get("description", ""),
+            }
+
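+    # NOTE: support_map and relation_map below are built but not part of the
+    # returned payload; only analysis_nodes and relation_list are returned.
+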
+    # 2. Support relations: key point → inspiration/purpose point
+    support_map = {}  # {target_node_id: [info of the supporting key points]}
+    for edge_id, edge in edges.items():
+        if edge.get("type") == "支撑":
+            source_id = edge.get("source", "")
+            target_id = edge.get("target", "")
+            if source_id in keypoints:
+                if target_id not in support_map:
+                    support_map[target_id] = []
+                support_map[target_id].append(keypoints[source_id])
+
+    # 3. Association relations
+    relation_map = {}  # {node_id: [names of associated nodes]}
+    for edge_id, edge in edges.items():
+        if edge.get("type") == "关联":
+            source_id = edge.get("source", "")
+            target_id = edge.get("target", "")
+            source_name = nodes.get(source_id, {}).get("name", "")
+            target_name = nodes.get(target_id, {}).get("name", "")
+
+            # Record the relation in both directions
+            if source_id not in relation_map:
+                relation_map[source_id] = []
+            relation_map[source_id].append(target_name)
+
+            if target_id not in relation_map:
+                relation_map[target_id] = []
+            relation_map[target_id].append(source_name)
+
+    # 4. Persona matching
+    match_map = {}  # {node_id: match info}
+    persona_out_edges = persona_index.get("outEdges", {})
+
+    def get_node_info(node_id: str) -> Optional[Dict]:
+        """Normalized info for a persona node."""
+        node = persona_nodes.get(node_id, {})
+        if not node:
+            return None
+        detail = node.get("detail", {})
+        parent_path = detail.get("parentPath", [])
+        return {
+            "节点ID": node_id,
+            "节点名称": node.get("name", ""),
+            "节点分类": "/".join(parent_path) if parent_path else "",
+            "节点维度": node.get("dimension", ""),
+            "节点类型": node.get("type", ""),
+            "人设全局占比": detail.get("probGlobal", 0),
+            "父类下占比": detail.get("probToParent", 0),
+        }
+
+    def get_parent_category_id(node_id: str) -> Optional[str]:
+        """Get the parent category node ID via the 属于 (belongs-to) edge."""
+        belong_edges = persona_out_edges.get(node_id, {}).get("属于", [])
+        for edge in belong_edges:
+            target_id = edge.get("target", "")
+            target_node = persona_nodes.get(target_id, {})
+            if target_node.get("type") == "分类":
+                return target_id
+        return None
+
+    for edge_id, edge in edges.items():
+        if edge.get("type") == "匹配":
+            source_id = edge.get("source", "")
+            target_id = edge.get("target", "")
+
+            # Only handle matches from post nodes ("帖子:") to persona nodes ("人设:")
+            if source_id.startswith("帖子:") and target_id.startswith("人设:"):
+                match_score = edge.get("score", 0)
+                persona_node = persona_nodes.get(target_id, {})
+
+                if persona_node:
+                    node_type = persona_node.get("type", "")
+
+                    # Info of the matched persona node
+                    match_node_info = get_node_info(target_id)
+                    if not match_node_info:
+                        continue
+
+                    # Determine the category node this match belongs to
+                    if node_type == "标签":
+                        # Tag: walk up to the parent category
+                        category_id = get_parent_category_id(target_id)
+                    else:
+                        # Category: it is its own category
+                        category_id = target_id
+
+                    # Category info plus its most common co-occurrences
+                    category_info = None
+                    if category_id:
+                        category_node = persona_nodes.get(category_id, {})
+                        if category_node:
+                            category_detail = category_node.get("detail", {})
+                            category_path = category_detail.get("parentPath", [])
+                            category_info = {
+                                "节点ID": category_id,
+                                "节点名称": category_node.get("name", ""),
+                                "节点分类": "/".join(category_path) if category_path else "",
+                                "节点维度": category_node.get("dimension", ""),
+                                "节点类型": "分类",
+                                "人设全局占比": category_detail.get("probGlobal", 0),
+                                "父类下占比": category_detail.get("probToParent", 0),
+                                "历史共现分类": [],
+                            }
+
+                            # Co-occurring categories, sorted by co-occurrence score (descending)
+                            co_occur_edges = persona_out_edges.get(category_id, {}).get("分类共现", [])
+                            co_occur_edges_sorted = sorted(co_occur_edges, key=lambda x: x.get("score", 0), reverse=True)
+                            for co_edge in co_occur_edges_sorted[:5]:  # keep the top 5
+                                co_target_id = co_edge.get("target", "")
+                                co_score = co_edge.get("score", 0)
+                                co_node = persona_nodes.get(co_target_id, {})
+                                if co_node:
+                                    co_detail = co_node.get("detail", {})
+                                    co_path = co_detail.get("parentPath", [])
+                                    category_info["历史共现分类"].append({
+                                        "节点ID": co_target_id,
+                                        "节点名称": co_node.get("name", ""),
+                                        "节点分类": "/".join(co_path) if co_path else "",
+                                        "节点维度": co_node.get("dimension", ""),
+                                        "节点类型": "分类",
+                                        "人设全局占比": co_detail.get("probGlobal", 0),
+                                        "父类下占比": co_detail.get("probToParent", 0),
+                                        "共现度": round(co_score, 4),
+                                    })
+
+                    match_map[source_id] = {
+                        "匹配节点": match_node_info,
+                        "匹配分数": round(match_score, 4),
+                        "所属分类": category_info,
+                    }
+
+    # 5. Build the list of nodes to analyze (inspiration, purpose, and key points)
+    analysis_nodes = []
+    for node_id, node in nodes.items():
+        if node.get("type") == "标签" and node.get("domain") == "帖子":
+            dimension = node.get("dimension", "")
+            if dimension in ["灵感点", "目的点", "关键点"]:
+                # Persona match info, if any
+                match_info = match_map.get(node_id)
+
+                analysis_nodes.append({
+                    "节点ID": node_id,
+                    "节点名称": node.get("name", ""),
+                    "节点分类": node.get("category", ""),  # root category: 意图/实质/形式 (intent/substance/form)
+                    "节点维度": dimension,
+                    "节点类型": node.get("type", ""),
+                    "节点描述": node.get("detail", {}).get("description", ""),
+                    "人设匹配": match_info,
+                })
+
+    # 6. Build the list of candidate relations
+    relation_list = []
+
+    # Support relations: key point → inspiration/purpose point
+    for edge_id, edge in edges.items():
+        if edge.get("type") == "支撑":
+            source_id = edge.get("source", "")
+            target_id = edge.get("target", "")
+            if source_id in keypoints:
+                relation_list.append({
+                    "来源节点": source_id,
+                    "目标节点": target_id,
+                    "关系类型": "支撑",
+                })
+
+    # Association relations between nodes (deduplicated, recorded once)
+    seen_relations = set()
+    for edge_id, edge in edges.items():
+        if edge.get("type") == "关联":
+            source_id = edge.get("source", "")
+            target_id = edge.get("target", "")
+            # Deduplicate with the sorted ID pair as the key
+            key = tuple(sorted([source_id, target_id]))
+            if key not in seen_relations:
+                seen_relations.add(key)
+                relation_list.append({
+                    "来源节点": source_id,
+                    "目标节点": target_id,
+                    "关系类型": "关联",
+                })
+
+    return analysis_nodes, relation_list
+
+
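+# Each entry of analysis_nodes produced above looks roughly like this
+# (illustrative values):
+#   {"节点ID": "帖子:...", "节点名称": "...", "节点分类": "实质",
+#    "节点维度": "灵感点", "节点类型": "标签", "节点描述": "...",
+#    "人设匹配": {"匹配节点": {...}, "匹配分数": 0.91, "所属分类": {...}}}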
+def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict:
+    """
+    Prepare the complete analysis payload.
+
+    Returns:
+        {
+            "帖子详情": {...},        # post details
+            "待分析节点列表": [...],  # nodes to analyze
+            "可能的关系列表": [...]   # candidate relations
+        }
+    """
+    analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph)
+    return {
+        "帖子详情": extract_post_detail(post_graph),
+        "待分析节点列表": analysis_nodes,
+        "可能的关系列表": relation_list,
+    }
+
+
+# ===== Step 2: AI analysis =====
+
+def build_context(data: Dict) -> Dict:
+    """
+    Build the context for the AI analysis.
+
+    Returns:
+        {
+            "all_points": [...],  # every creative point (with details)
+            "candidates": [...],  # origin candidate set (names only)
+            "constants": [...],   # persona constants (names only)
+        }
+    """
+    nodes = data.get("待分析节点列表", [])
+
+    # Every creative point, with details
+    all_points = []
+    for node in nodes:
+        match_info = node.get("人设匹配")
+        match_score = 0
+        category_global_ratio = 0
+        if match_info:
+            match_score = match_info.get("匹配分数", 0)
+            category_info = match_info.get("所属分类", {})
+            if category_info:
+                category_global_ratio = category_info.get("人设全局占比", 0)
+
+        all_points.append({
+            "名称": node["节点名称"],
+            "分类": node.get("节点分类", ""),
+            "维度": node.get("节点维度", ""),
+            "描述": node.get("节点描述", ""),
+            "人设匹配度": round(match_score, 2),
+            "所属分类全局占比": round(category_global_ratio, 2),
+        })
+
+    # Origin candidate set (inspiration + purpose points)
+    candidates = [
+        node["节点名称"]
+        for node in nodes
+        if node["节点维度"] in ["灵感点", "目的点"]
+    ]
+
+    # Persona constants (match score and global ratio both above their thresholds)
+    constants = []
+    for node in nodes:
+        match_info = node.get("人设匹配")
+        if match_info:
+            match_score = match_info.get("匹配分数", 0)
+            match_node = match_info.get("匹配节点", {})
+            global_ratio = match_node.get("人设全局占比", 0)
+
+            if match_score > MATCH_SCORE_THRESHOLD and global_ratio > GLOBAL_RATIO_THRESHOLD:
+                constants.append(node["节点名称"])
+
+    return {
+        "all_points": all_points,
+        "candidates": candidates,
+        "constants": constants,
+    }
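+
+# Sketch of a built context (values illustrative):
+#   {"all_points": [{"名称": "...", "维度": "灵感点", "人设匹配度": 0.91,
+#                    "所属分类全局占比": 0.35, ...}],
+#    "candidates": ["...", "..."],
+#    "constants": ["..."]}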
+
+
+def format_prompt(context: Dict) -> str:
+    """
+    Format the context into the AI prompt.
+
+    The prompt itself is kept in Chinese to match the language of the
+    analyzed content.
+    """
+    all_points = context["all_points"]
+    candidates = context["candidates"]
+    constants = context["constants"]
+
+    # Render every creative point as readable text
+    points_text = ""
+    for p in all_points:
+        points_text += f"- {p['名称']}\n"
+        points_text += f"  维度: {p['维度']} | 分类: {p['分类']}\n"
+        points_text += f"  描述: {p['描述']}\n"
+        points_text += f"  人设匹配度: {p['人设匹配度']} | 所属分类全局占比: {p['所属分类全局占比']}\n"
+        points_text += "\n"
+
+    # Render the candidate set
+    candidates_text = "、".join(candidates)
+
+    # Render the persona constants
+    constants_text = "、".join(constants) if constants else "无"
+
+    prompt = f"""# Role
+你是小红书爆款内容的"逆向工程"专家。你的核心能力是透过内容的表象(视觉/形式),还原创作者最初的脑回路(动机/实质)。
+
+# Task
+我提供一组笔记的【创意标签】和一个【起点候选集】。
+请推理出哪些选项是真正的**创意起点**。
+
+
+# Input Data
+
+## 全部创意点
+
+{points_text}
+
+## 起点候选集
+{candidates_text}
+
+## 来自人设的常量
+{constants_text}
+
+
+# 推理约束
+
+1. 实质推形式,而不是形式推实质,除非形式是一切创意的起点
+2. 因推果而不是果推因
+3. 无法被其他项或人设推理出的点,即为起点
+
+# Output Format
+
+请输出一个标准的 JSON 格式。
+- Key: 候选集中的词。
+- Value: 一个对象,包含:
+  - `score`: 0.0 到 1.0 的浮点数(代表是起点的可能性)。
+  - `analysis`: 一句话推理"""
+
+    return prompt
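+
+# The model is asked to reply with JSON keyed by candidate name, e.g.
+# (illustrative): {"某灵感点": {"score": 0.85, "analysis": "一句话推理"}}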
+
+
+# ===== Display helpers =====
+
+def display_context(context: Dict, post_id: str):
+    """Print the constructed context."""
+    print(f"\nPost: {post_id}")
+    print(f"\nAll creative points ({len(context['all_points'])}):")
+    for p in context['all_points']:
+        print(f"  - {p['名称']} ({p['维度']}/{p['分类']}) match={p['人设匹配度']}, category ratio={p['所属分类全局占比']}")
+    print(f"\nOrigin candidates ({len(context['candidates'])}):")
+    print(f"  {context['candidates']}")
+    print(f"\nPersona constants ({len(context['constants'])}):")
+    print(f"  {context['constants']}")
+
+
+def display_result(result: Dict):
+    """Print the analysis result."""
+    output = result.get("输出")
+    if output:
+        print("\nOrigin analysis result:")
+        # Sort by score, descending
+        sorted_items = sorted(output.items(), key=lambda x: x[1].get("score", 0), reverse=True)
+        for name, info in sorted_items:
+            score = info.get("score", 0)
+            analysis = info.get("analysis", "")
+            marker = "★" if score >= 0.7 else "○"
+            print(f"  {marker} {name}: {score:.2f}")
+            print(f"    {analysis}")
+    else:
+        print(f"  Analysis failed: {result.get('错误', 'N/A')}")
+
+
+# ===== Processing =====
+
+async def process_single_post(
+    post_file: Path,
+    persona_graph: Dict,
+    config: PathConfig,
+    current_time: Optional[str] = None,
+    log_url: Optional[str] = None,
+    force: bool = False,
+) -> Dict:
+    """
+    Process a single post (data preparation + AI analysis).
+    """
+    # Load the post graph
+    post_graph = load_json(post_file)
+    post_id = post_graph.get("meta", {}).get("postId", "unknown")
+
+    # Skip posts that already have a result
+    if not force and is_already_processed(config, post_id):
+        print(f"\nSkipping post {post_id} (already processed; use --force to re-analyze)")
+        # Return the existing result from disk
+        result_file = get_result_file(config, post_id)
+        return load_json(result_file)
+
+    print(f"\n{'=' * 60}")
+    print(f"Processing post: {post_id}")
+    print("-" * 60)
+
+    # Step 1: prepare the data
+    data = prepare_analysis_data(post_graph, persona_graph)
+
+    # Build the analysis context
+    context = build_context(data)
+    display_context(context, post_id)
+
+    # Format the prompt
+    prompt = format_prompt(context)
+
+    # Step 2: call the AI
+    print("\nCalling the AI for analysis...")
+    with custom_span(
+        name=f"创作起点分析 - {post_id}",
+        data={
+            "帖子id": post_id,
+            "候选数": len(context["candidates"]),
+            "模型": MODEL_NAME
+        }
+    ):
+        result = await Runner.run(agent, input=prompt)
+        output_text = result.final_output
+
+    # Parse the JSON from the model output
+    try:
+        if "```json" in output_text:
+            # Prefer a fenced ```json block
+            json_start = output_text.find("```json") + 7
+            json_end = output_text.find("```", json_start)
+            if json_end == -1:  # no closing fence: take the rest of the text
+                json_end = len(output_text)
+            json_str = output_text[json_start:json_end].strip()
+        elif "{" in output_text and "}" in output_text:
+            # Fall back to the outermost braces
+            json_start = output_text.find("{")
+            json_end = output_text.rfind("}") + 1
+            json_str = output_text[json_start:json_end]
+        else:
+            json_str = output_text
+
+        analysis_result = json.loads(json_str)
+
+        result_data = {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": context,
+            "输出": analysis_result
+        }
+    except Exception as e:
+        # Keep the raw output for debugging when parsing fails
+        result_data = {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": context,
+            "输出": None,
+            "错误": str(e),
+            "原始输出": output_text
+        }
+
+    # Show the result
+    display_result(result_data)
+
+    # Save the result
+    output_dir = config.intermediate_dir / "origin_analysis_result"
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    output_with_meta = {
+        "元数据": {
+            "current_time": current_time,
+            "log_url": log_url,
+            "model": MODEL_NAME
+        },
+        **result_data
+    }
+
+    output_file = output_dir / f"{post_id}_起点分析.json"
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(output_with_meta, f, ensure_ascii=False, indent=2)
+
+    print(f"\nSaved: {output_file.name}")
+
+    return result_data
+
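+# The saved file nests the run metadata above the analysis payload, e.g.:
+#   {"元数据": {"current_time": ..., "log_url": ..., "model": ...},
+#    "帖子id": ..., "模型": ..., "输入": {...}, "输出": {...}}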
+
+
+# ===== Main =====
+
+async def main(
+    post_id: Optional[str] = None,
+    all_posts: bool = False,
+    force: bool = False,
+):
+    """
+    Entry point.
+
+    Args:
+        post_id: optional ID of a single post to process
+        all_posts: process every post
+        force: re-analyze posts that already have results
+    """
+    # Set up tracing
+    current_time, log_url = set_trace()
+
+    config = PathConfig()
+
+    print(f"Account: {config.account_name}")
+    print(f"Model: {MODEL_NAME}")
+    print(f"Trace URL: {log_url}")
+
+    # Load the persona graph
+    persona_graph_file = config.intermediate_dir / "人设图谱.json"
+    if not persona_graph_file.exists():
+        print(f"Error: persona graph file not found: {persona_graph_file}")
+        return
+
+    persona_graph = load_json(persona_graph_file)
+    print(f"Persona graph nodes: {len(persona_graph.get('nodes', {}))}")
+
+    # Collect the post-graph files
+    post_graph_files = get_post_graph_files(config)
+    if not post_graph_files:
+        print("Error: no post graph files found")
+        return
+
+    # Decide which posts to process
+    if post_id:
+        target_file = next(
+            (f for f in post_graph_files if post_id in f.name),
+            None
+        )
+        if not target_file:
+            print(f"Error: post {post_id} not found")
+            return
+        files_to_process = [target_file]
+    elif all_posts:
+        files_to_process = post_graph_files
+    else:
+        # Default: only the first post
+        files_to_process = [post_graph_files[0]]
+
+    print(f"Posts to process: {len(files_to_process)}")
+
+    # Process
+    with trace("创作起点分析"):
+        results = []
+        skipped = 0
+        for i, post_file in enumerate(files_to_process, 1):
+            print(f"\n{'#' * 60}")
+            print(f"# Processing post {i}/{len(files_to_process)}")
+            print(f"{'#' * 60}")
+
+            result = await process_single_post(
+                post_file=post_file,
+                persona_graph=persona_graph,
+                config=config,
+                current_time=current_time,
+                log_url=log_url,
+                force=force,
+            )
+
+            # Results reloaded from disk carry the "元数据" key, so count them as skipped
+            if not force and "元数据" in result:
+                skipped += 1
+
+            results.append(result)
+
+    # Summary
+    print(f"\n{'#' * 60}")
+    print(f"# Done! Processed {len(results)} posts ({skipped} skipped as already processed)")
+    print(f"{'#' * 60}")
+    print(f"Trace: {log_url}")
+
+    print("\nSummary (origins with score >= 0.7):")
+    for result in results:
+        post_id = result.get("帖子id")
+        output = result.get("输出")
+        if output:
+            origins = [f"{k}({v['score']:.2f})" for k, v in output.items() if v.get("score", 0) >= 0.7]
+            print(f"  {post_id}: {', '.join(origins) if origins else 'no high-confidence origins'}")
+        else:
+            print(f"  {post_id}: analysis failed")
+
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Creation origin analysis")
+    parser.add_argument("--post-id", type=str, help="Post ID to analyze")
+    parser.add_argument("--all-posts", action="store_true", help="Process all posts")
+    parser.add_argument("--force", action="store_true", help="Force re-analysis of already-processed posts")
+    args = parser.parse_args()
+
+    asyncio.run(main(
+        post_id=args.post_id,
+        all_posts=args.all_posts,
+        force=args.force,
+    ))
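+
+# Usage (script filename assumed; run from the project root so the `lib` and
+# `script` imports resolve):
+#   python origin_analysis.py                 # analyze the first post only
+#   python origin_analysis.py --post-id <id>  # analyze one specific post
+#   python origin_analysis.py --all-posts --force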