|
|
@@ -0,0 +1,969 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+创作模式分析(完整流程)
|
|
|
+
|
|
|
+整合三步流程:
|
|
|
+1. 数据准备:根据帖子图谱 + 人设图谱,提取待分析数据
|
|
|
+2. 起点分析:AI分析创意起点
|
|
|
+3. 模式推导:基于共现关系的迭代推导
|
|
|
+
|
|
|
+输入:帖子图谱 + 人设图谱
|
|
|
+输出:完整的创作模式分析结果
|
|
|
+"""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+from pathlib import Path
|
|
|
+from typing import Dict, List, Optional, Set
|
|
|
+import sys
|
|
|
+
|
|
|
+# 添加项目根目录到路径
|
|
|
+project_root = Path(__file__).parent.parent.parent
|
|
|
+sys.path.insert(0, str(project_root))
|
|
|
+
|
|
|
+from lib.llm_cached import analyze, LLMConfig, AnalyzeResult
|
|
|
+from lib.my_trace import set_trace_smith as set_trace
|
|
|
+from script.data_processing.path_config import PathConfig
|
|
|
+
|
|
|
+
|
|
|
+# ===== 配置 =====
|
|
|
+TASK_NAME = "creation_pattern" # 缓存任务名称
|
|
|
+
|
|
|
+MATCH_SCORE_THRESHOLD = 0.8 # 匹配分数阈值
|
|
|
+GLOBAL_RATIO_THRESHOLD = 0.8 # 全局占比阈值
|
|
|
+ORIGIN_SCORE_THRESHOLD = 0.8 # 起点分数阈值
|
|
|
+
|
|
|
+
|
|
|
+# ===== 数据加载 =====
|
|
|
+
|
|
|
+def load_json(file_path: Path) -> Dict:
|
|
|
+ """加载JSON文件"""
|
|
|
+ with open(file_path, "r", encoding="utf-8") as f:
|
|
|
+ return json.load(f)
|
|
|
+
|
|
|
+
|
|
|
+def get_post_graph_files(config: PathConfig) -> List[Path]:
|
|
|
+ """获取所有帖子图谱文件"""
|
|
|
+ post_graph_dir = config.intermediate_dir / "post_graph"
|
|
|
+ return sorted(post_graph_dir.glob("*_帖子图谱.json"))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# ===== 第一步:数据准备 =====
|
|
|
+
|
|
|
+def extract_post_detail(post_graph: Dict) -> Dict:
|
|
|
+ """提取帖子详情"""
|
|
|
+ meta = post_graph.get("meta", {})
|
|
|
+ post_detail = meta.get("postDetail", {})
|
|
|
+
|
|
|
+ return {
|
|
|
+ "postId": meta.get("postId", ""),
|
|
|
+ "postTitle": meta.get("postTitle", ""),
|
|
|
+ "body_text": post_detail.get("body_text", ""),
|
|
|
+ "images": post_detail.get("images", []),
|
|
|
+ "video": post_detail.get("video"),
|
|
|
+ "publish_time": post_detail.get("publish_time", ""),
|
|
|
+ "like_count": post_detail.get("like_count", 0),
|
|
|
+ "collect_count": post_detail.get("collect_count", 0),
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> tuple:
|
|
|
+ """
|
|
|
+ 提取待分析节点列表
|
|
|
+
|
|
|
+ 待分析节点 = 灵感点 + 目的点 + 关键点
|
|
|
+ """
|
|
|
+ nodes = post_graph.get("nodes", {})
|
|
|
+ edges = post_graph.get("edges", {})
|
|
|
+ persona_nodes = persona_graph.get("nodes", {})
|
|
|
+ persona_index = persona_graph.get("index", {})
|
|
|
+
|
|
|
+ # 1. 收集关键点信息
|
|
|
+ keypoints = {}
|
|
|
+ for node_id, node in nodes.items():
|
|
|
+ if node.get("type") == "标签" and node.get("dimension") == "关键点":
|
|
|
+ keypoints[node_id] = {
|
|
|
+ "名称": node.get("name", ""),
|
|
|
+ "描述": node.get("detail", {}).get("description", ""),
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2. 分析支撑关系
|
|
|
+ support_map = {}
|
|
|
+ for edge_id, edge in edges.items():
|
|
|
+ if edge.get("type") == "支撑":
|
|
|
+ source_id = edge.get("source", "")
|
|
|
+ target_id = edge.get("target", "")
|
|
|
+ if source_id in keypoints:
|
|
|
+ if target_id not in support_map:
|
|
|
+ support_map[target_id] = []
|
|
|
+ support_map[target_id].append(keypoints[source_id])
|
|
|
+
|
|
|
+ # 3. 分析关联关系
|
|
|
+ relation_map = {}
|
|
|
+ for edge_id, edge in edges.items():
|
|
|
+ if edge.get("type") == "关联":
|
|
|
+ source_id = edge.get("source", "")
|
|
|
+ target_id = edge.get("target", "")
|
|
|
+ source_name = nodes.get(source_id, {}).get("name", "")
|
|
|
+ target_name = nodes.get(target_id, {}).get("name", "")
|
|
|
+
|
|
|
+ if source_id not in relation_map:
|
|
|
+ relation_map[source_id] = []
|
|
|
+ relation_map[source_id].append(target_name)
|
|
|
+
|
|
|
+ if target_id not in relation_map:
|
|
|
+ relation_map[target_id] = []
|
|
|
+ relation_map[target_id].append(source_name)
|
|
|
+
|
|
|
+ # 4. 分析人设匹配
|
|
|
+ match_map = {}
|
|
|
+ persona_out_edges = persona_index.get("outEdges", {})
|
|
|
+
|
|
|
+ def get_node_info(node_id: str) -> Optional[Dict]:
|
|
|
+ """获取人设节点的标准信息"""
|
|
|
+ node = persona_nodes.get(node_id, {})
|
|
|
+ if not node:
|
|
|
+ return None
|
|
|
+ detail = node.get("detail", {})
|
|
|
+ parent_path = detail.get("parentPath", [])
|
|
|
+ return {
|
|
|
+ "节点ID": node_id,
|
|
|
+ "节点名称": node.get("name", ""),
|
|
|
+ "节点分类": "/".join(parent_path) if parent_path else "",
|
|
|
+ "节点维度": node.get("dimension", ""),
|
|
|
+ "节点类型": node.get("type", ""),
|
|
|
+ "人设全局占比": detail.get("probGlobal", 0),
|
|
|
+ "父类下占比": detail.get("probToParent", 0),
|
|
|
+ }
|
|
|
+
|
|
|
+ def get_parent_category_id(node_id: str) -> Optional[str]:
|
|
|
+ """通过属于边获取父分类节点ID"""
|
|
|
+ belong_edges = persona_out_edges.get(node_id, {}).get("属于", [])
|
|
|
+ for edge in belong_edges:
|
|
|
+ target_id = edge.get("target", "")
|
|
|
+ target_node = persona_nodes.get(target_id, {})
|
|
|
+ if target_node.get("type") == "分类":
|
|
|
+ return target_id
|
|
|
+ return None
|
|
|
+
|
|
|
+ for edge_id, edge in edges.items():
|
|
|
+ if edge.get("type") == "匹配":
|
|
|
+ source_id = edge.get("source", "")
|
|
|
+ target_id = edge.get("target", "")
|
|
|
+
|
|
|
+ if source_id.startswith("帖子:") and target_id.startswith("人设:"):
|
|
|
+ match_score = edge.get("score", 0)
|
|
|
+ persona_node = persona_nodes.get(target_id, {})
|
|
|
+
|
|
|
+ if persona_node:
|
|
|
+ node_type = persona_node.get("type", "")
|
|
|
+ match_node_info = get_node_info(target_id)
|
|
|
+ if not match_node_info:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if node_type == "标签":
|
|
|
+ category_id = get_parent_category_id(target_id)
|
|
|
+ else:
|
|
|
+ category_id = target_id
|
|
|
+
|
|
|
+ category_info = None
|
|
|
+ if category_id:
|
|
|
+ category_node = persona_nodes.get(category_id, {})
|
|
|
+ if category_node:
|
|
|
+ category_detail = category_node.get("detail", {})
|
|
|
+ category_path = category_detail.get("parentPath", [])
|
|
|
+ category_info = {
|
|
|
+ "节点ID": category_id,
|
|
|
+ "节点名称": category_node.get("name", ""),
|
|
|
+ "节点分类": "/".join(category_path) if category_path else "",
|
|
|
+ "节点维度": category_node.get("dimension", ""),
|
|
|
+ "节点类型": "分类",
|
|
|
+ "人设全局占比": category_detail.get("probGlobal", 0),
|
|
|
+ "父类下占比": category_detail.get("probToParent", 0),
|
|
|
+ "历史共现分类": [],
|
|
|
+ }
|
|
|
+
|
|
|
+ co_occur_edges = persona_out_edges.get(category_id, {}).get("分类共现", [])
|
|
|
+ co_occur_edges_sorted = sorted(co_occur_edges, key=lambda x: x.get("score", 0), reverse=True)
|
|
|
+ for co_edge in co_occur_edges_sorted[:5]:
|
|
|
+ co_target_id = co_edge.get("target", "")
|
|
|
+ co_score = co_edge.get("score", 0)
|
|
|
+ co_node = persona_nodes.get(co_target_id, {})
|
|
|
+ if co_node:
|
|
|
+ co_detail = co_node.get("detail", {})
|
|
|
+ co_path = co_detail.get("parentPath", [])
|
|
|
+ category_info["历史共现分类"].append({
|
|
|
+ "节点ID": co_target_id,
|
|
|
+ "节点名称": co_node.get("name", ""),
|
|
|
+ "节点分类": "/".join(co_path) if co_path else "",
|
|
|
+ "节点维度": co_node.get("dimension", ""),
|
|
|
+ "节点类型": "分类",
|
|
|
+ "人设全局占比": co_detail.get("probGlobal", 0),
|
|
|
+ "父类下占比": co_detail.get("probToParent", 0),
|
|
|
+ "共现度": round(co_score, 4),
|
|
|
+ })
|
|
|
+
|
|
|
+ if source_id not in match_map:
|
|
|
+ match_map[source_id] = []
|
|
|
+ match_map[source_id].append({
|
|
|
+ "匹配节点": match_node_info,
|
|
|
+ "匹配分数": round(match_score, 4),
|
|
|
+ "所属分类": category_info,
|
|
|
+ })
|
|
|
+
|
|
|
+ # 5. 构建待分析节点列表
|
|
|
+ analysis_nodes = []
|
|
|
+ for node_id, node in nodes.items():
|
|
|
+ if node.get("type") == "标签" and node.get("domain") == "帖子":
|
|
|
+ dimension = node.get("dimension", "")
|
|
|
+ if dimension in ["灵感点", "目的点", "关键点"]:
|
|
|
+ match_info = match_map.get(node_id)
|
|
|
+
|
|
|
+ analysis_nodes.append({
|
|
|
+ "节点ID": node_id,
|
|
|
+ "节点名称": node.get("name", ""),
|
|
|
+ "节点分类": node.get("category", ""),
|
|
|
+ "节点维度": dimension,
|
|
|
+ "节点类型": node.get("type", ""),
|
|
|
+ "节点描述": node.get("detail", {}).get("description", ""),
|
|
|
+ "人设匹配": match_info,
|
|
|
+ })
|
|
|
+
|
|
|
+ # 6. 构建关系列表
|
|
|
+ relation_list = []
|
|
|
+
|
|
|
+ for edge_id, edge in edges.items():
|
|
|
+ if edge.get("type") == "支撑":
|
|
|
+ source_id = edge.get("source", "")
|
|
|
+ target_id = edge.get("target", "")
|
|
|
+ if source_id in keypoints:
|
|
|
+ relation_list.append({
|
|
|
+ "来源节点": source_id,
|
|
|
+ "目标节点": target_id,
|
|
|
+ "关系类型": "支撑",
|
|
|
+ })
|
|
|
+
|
|
|
+ seen_relations = set()
|
|
|
+ for edge_id, edge in edges.items():
|
|
|
+ if edge.get("type") == "关联":
|
|
|
+ source_id = edge.get("source", "")
|
|
|
+ target_id = edge.get("target", "")
|
|
|
+ key = tuple(sorted([source_id, target_id]))
|
|
|
+ if key not in seen_relations:
|
|
|
+ seen_relations.add(key)
|
|
|
+ relation_list.append({
|
|
|
+ "来源节点": source_id,
|
|
|
+ "目标节点": target_id,
|
|
|
+ "关系类型": "关联",
|
|
|
+ })
|
|
|
+
|
|
|
+ return analysis_nodes, relation_list
|
|
|
+
|
|
|
+
|
|
|
+def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict:
|
|
|
+ """
|
|
|
+ 准备完整的分析数据
|
|
|
+
|
|
|
+ 输出扁平化的节点列表 + 独立的人设共现关系数据
|
|
|
+ """
|
|
|
+ analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph)
|
|
|
+
|
|
|
+ # 扁平化节点,提取人设共现关系数据
|
|
|
+ flat_nodes = []
|
|
|
+ persona_co_occur = {} # {分类ID: {名称, 共现分类列表}}
|
|
|
+
|
|
|
+ for node in analysis_nodes:
|
|
|
+ # 基础节点字段
|
|
|
+ flat_node = {
|
|
|
+ "节点ID": node["节点ID"],
|
|
|
+ "节点名称": node["节点名称"],
|
|
|
+ "节点分类": node.get("节点分类", ""),
|
|
|
+ "节点维度": node["节点维度"],
|
|
|
+ "节点描述": node.get("节点描述", ""),
|
|
|
+ "是否已知": False,
|
|
|
+ "发现编号": None,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 提取人设匹配信息(list格式,支持多个匹配)
|
|
|
+ match_list = node.get("人设匹配") or []
|
|
|
+ if match_list:
|
|
|
+ flat_node["人设匹配"] = []
|
|
|
+ for match_info in match_list:
|
|
|
+ category_info = match_info.get("所属分类")
|
|
|
+ category_id = category_info.get("节点ID") if category_info else None
|
|
|
+
|
|
|
+ # 保留完整的匹配信息,但去掉历史共现分类(拆到外面)
|
|
|
+ clean_match = {
|
|
|
+ "匹配节点": match_info.get("匹配节点"),
|
|
|
+ "匹配分数": match_info.get("匹配分数", 0),
|
|
|
+ }
|
|
|
+ if category_info:
|
|
|
+ # 复制所属分类,但不包含历史共现分类
|
|
|
+ clean_category = {k: v for k, v in category_info.items() if k != "历史共现分类"}
|
|
|
+ clean_match["所属分类"] = clean_category
|
|
|
+
|
|
|
+ flat_node["人设匹配"].append(clean_match)
|
|
|
+
|
|
|
+ # 收集人设共现关系(去重)- 从历史共现分类拆出来
|
|
|
+ if category_id and category_id not in persona_co_occur:
|
|
|
+ co_occur_list = category_info.get("历史共现分类", [])
|
|
|
+ if co_occur_list:
|
|
|
+ persona_co_occur[category_id] = [
|
|
|
+ {
|
|
|
+ "节点ID": c.get("节点ID"),
|
|
|
+ "节点名称": c.get("节点名称"),
|
|
|
+ "节点分类": c.get("节点分类", ""),
|
|
|
+ "节点维度": c.get("节点维度", ""),
|
|
|
+ "节点类型": c.get("节点类型", ""),
|
|
|
+ "人设全局占比": c.get("人设全局占比", 0),
|
|
|
+ "父类下占比": c.get("父类下占比", 0),
|
|
|
+ "共现度": c.get("共现度", 0),
|
|
|
+ }
|
|
|
+ for c in co_occur_list
|
|
|
+ if c.get("节点ID")
|
|
|
+ ]
|
|
|
+ else:
|
|
|
+ flat_node["人设匹配"] = []
|
|
|
+
|
|
|
+ flat_nodes.append(flat_node)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "帖子详情": extract_post_detail(post_graph),
|
|
|
+ "节点列表": flat_nodes,
|
|
|
+ "关系列表": relation_list,
|
|
|
+ "人设共现关系": persona_co_occur,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ===== 第二步:起点分析 =====
|
|
|
+
|
|
|
+def get_best_match(node: Dict) -> Optional[Dict]:
|
|
|
+ """获取节点的最佳人设匹配(分数最高的)"""
|
|
|
+ match_list = node.get("人设匹配") or []
|
|
|
+ if not match_list:
|
|
|
+ return None
|
|
|
+ return max(match_list, key=lambda m: m.get("匹配分数", 0))
|
|
|
+
|
|
|
+
|
|
|
+def get_match_score(node: Dict) -> float:
|
|
|
+ """获取节点的最高人设匹配分数"""
|
|
|
+ best_match = get_best_match(node)
|
|
|
+ if best_match:
|
|
|
+ return best_match.get("匹配分数", 0)
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+def get_category_id(node: Dict) -> Optional[str]:
|
|
|
+ """获取节点的所属分类ID(最佳匹配的)"""
|
|
|
+ best_match = get_best_match(node)
|
|
|
+ if best_match:
|
|
|
+ category = best_match.get("所属分类")
|
|
|
+ if category:
|
|
|
+ return category.get("节点ID")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def get_all_category_ids(node: Dict) -> List[str]:
|
|
|
+ """获取节点所有匹配的分类ID"""
|
|
|
+ match_list = node.get("人设匹配") or []
|
|
|
+ result = []
|
|
|
+ for m in match_list:
|
|
|
+ category = m.get("所属分类")
|
|
|
+ if category and category.get("节点ID"):
|
|
|
+ result.append(category.get("节点ID"))
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+def get_category_global_ratio(node: Dict) -> float:
|
|
|
+ """获取节点所属分类的人设全局占比(最佳匹配的)"""
|
|
|
+ best_match = get_best_match(node)
|
|
|
+ if best_match:
|
|
|
+ category = best_match.get("所属分类")
|
|
|
+ if category:
|
|
|
+ return category.get("人设全局占比", 0)
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+def is_persona_constant(node: Dict) -> bool:
|
|
|
+ """判断节点是否为人设常量(匹配分数 >= 0.8 且 分类全局占比 >= 0.8)"""
|
|
|
+ match_score = get_match_score(node)
|
|
|
+ global_ratio = get_category_global_ratio(node)
|
|
|
+ return match_score >= MATCH_SCORE_THRESHOLD and global_ratio >= GLOBAL_RATIO_THRESHOLD
|
|
|
+
|
|
|
+
|
|
|
+def build_origin_context(nodes: List[Dict]) -> Dict:
|
|
|
+ """构造AI分析的上下文"""
|
|
|
+
|
|
|
+ all_points = []
|
|
|
+ for node in nodes:
|
|
|
+ all_points.append({
|
|
|
+ "名称": node["节点名称"],
|
|
|
+ "分类": node.get("节点分类", ""),
|
|
|
+ "维度": node.get("节点维度", ""),
|
|
|
+ "描述": node.get("节点描述", ""),
|
|
|
+ "人设匹配度": round(get_match_score(node), 2),
|
|
|
+ })
|
|
|
+
|
|
|
+ # 起点候选集(灵感点 + 目的点)
|
|
|
+ candidates = [
|
|
|
+ node["节点名称"]
|
|
|
+ for node in nodes
|
|
|
+ if node["节点维度"] in ["灵感点", "目的点"]
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 人设常量(匹配分数 >= 0.8 且 分类全局占比 >= 0.8)
|
|
|
+ constants = [
|
|
|
+ node["节点名称"]
|
|
|
+ for node in nodes
|
|
|
+ if is_persona_constant(node)
|
|
|
+ ]
|
|
|
+
|
|
|
+ return {
|
|
|
+ "all_points": all_points,
|
|
|
+ "candidates": candidates,
|
|
|
+ "constants": constants,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def format_origin_prompt(context: Dict) -> str:
|
|
|
+ """格式化起点分析的prompt"""
|
|
|
+ all_points = context["all_points"]
|
|
|
+ candidates = context["candidates"]
|
|
|
+ constants = context["constants"]
|
|
|
+
|
|
|
+ points_text = ""
|
|
|
+ for p in all_points:
|
|
|
+ points_text += f"- {p['名称']}\n"
|
|
|
+ points_text += f" 维度: {p['维度']} | 分类: {p['分类']}\n"
|
|
|
+ points_text += f" 描述: {p['描述']}\n"
|
|
|
+ points_text += f" 人设匹配度: {p['人设匹配度']}\n"
|
|
|
+ points_text += "\n"
|
|
|
+
|
|
|
+ candidates_text = "、".join(candidates)
|
|
|
+ constants_text = "、".join(constants) if constants else "无"
|
|
|
+
|
|
|
+ prompt = f"""# Role
|
|
|
+你是小红书爆款内容的"逆向工程"专家。你的核心能力是透过内容的表象(视觉/形式),还原创作者最初的脑回路(动机/实质)。
|
|
|
+
|
|
|
+# Task
|
|
|
+我提供一组笔记的【创意标签】和一个【起点候选集】。
|
|
|
+请推理出哪些选项是真正的**创意起点**。
|
|
|
+
|
|
|
+
|
|
|
+# Input Data
|
|
|
+
|
|
|
+## 全部创意点
|
|
|
+
|
|
|
+{points_text}
|
|
|
+
|
|
|
+## 起点候选集
|
|
|
+{candidates_text}
|
|
|
+
|
|
|
+## 来自人设的常量
|
|
|
+{constants_text}
|
|
|
+
|
|
|
+
|
|
|
+# 推理约束
|
|
|
+
|
|
|
+1. 实质推形式,而不是形式推实质,除非形式是一切创意的起点
|
|
|
+2. 因推果而不是果推因
|
|
|
+3. 无法被其他项或人设推理出的点,即为起点
|
|
|
+
|
|
|
+# Output Format
|
|
|
+
|
|
|
+请输出一个标准的 JSON 格式。
|
|
|
+- Key: 候选集中的词。
|
|
|
+- Value: 一个对象,包含:
|
|
|
+ - `score`: 0.0 到 1.0 的浮点数(代表是起点的可能性)。
|
|
|
+ - `analysis`: 一句话推理"""
|
|
|
+
|
|
|
+ return prompt
|
|
|
+
|
|
|
+
|
|
|
+async def analyze_origin(nodes: List[Dict], force_llm: bool = False) -> Dict:
|
|
|
+ """
|
|
|
+ 执行起点分析
|
|
|
+
|
|
|
+ 输入: 节点列表
|
|
|
+ 输出: 节点列表(加了起点分析、是否已知、发现编号字段)+ 中间结果
|
|
|
+ """
|
|
|
+ context = build_origin_context(nodes)
|
|
|
+ prompt = format_origin_prompt(context)
|
|
|
+
|
|
|
+ print(f"\n 起点候选: {len(context['candidates'])} 个")
|
|
|
+ print(f" 人设常量: {len(context['constants'])} 个")
|
|
|
+
|
|
|
+ result = await analyze(
|
|
|
+ prompt=prompt,
|
|
|
+ task_name=f"{TASK_NAME}/origin",
|
|
|
+ force=force_llm,
|
|
|
+ parse_json=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # 把分析结果合并到节点
|
|
|
+ llm_result = result.data or {}
|
|
|
+ output_nodes = []
|
|
|
+ current_order = 1 # 已知节点的发现编号计数
|
|
|
+
|
|
|
+ for node in nodes:
|
|
|
+ new_node = dict(node) # 复制原节点
|
|
|
+ name = node["节点名称"]
|
|
|
+
|
|
|
+ if name in llm_result:
|
|
|
+ score = llm_result[name].get("score", 0)
|
|
|
+ analysis = llm_result[name].get("analysis", "")
|
|
|
+ # 加起点分析
|
|
|
+ new_node["起点分析"] = {
|
|
|
+ "分数": score,
|
|
|
+ "说明": analysis,
|
|
|
+ }
|
|
|
+ # 高分起点标记为已知
|
|
|
+ if score >= ORIGIN_SCORE_THRESHOLD:
|
|
|
+ new_node["是否已知"] = True
|
|
|
+ new_node["发现编号"] = current_order
|
|
|
+ current_order += 1
|
|
|
+ else:
|
|
|
+ new_node["起点分析"] = None
|
|
|
+
|
|
|
+ output_nodes.append(new_node)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "输入上下文": {
|
|
|
+ "起点候选": context["candidates"],
|
|
|
+ "人设常量": context["constants"],
|
|
|
+ },
|
|
|
+ "中间结果": llm_result,
|
|
|
+ "输出节点": output_nodes,
|
|
|
+ "cache_hit": result.cache_hit,
|
|
|
+ "model": result.model_name,
|
|
|
+ "log_url": result.log_url,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ===== 第三步:模式推导 =====
|
|
|
+
|
|
|
+def derive_patterns(
|
|
|
+ nodes: List[Dict],
|
|
|
+ persona_co_occur: Dict[str, Dict],
|
|
|
+) -> Dict:
|
|
|
+ """
|
|
|
+ 基于共现关系的迭代推导
|
|
|
+
|
|
|
+ 输入: 带起点分析的节点列表 + 人设共现关系数据
|
|
|
+ 输出: 节点列表(加了推导轮次、未知原因字段)+ 推导边列表
|
|
|
+ """
|
|
|
+ node_by_name: Dict[str, Dict] = {n["节点名称"]: n for n in nodes}
|
|
|
+
|
|
|
+ # 构建共现查找表 {节点ID: {共现节点ID: 共现度}}
|
|
|
+ co_occur_lookup = {}
|
|
|
+ for cat_id, co_occur_list in persona_co_occur.items():
|
|
|
+ co_occur_lookup[cat_id] = {
|
|
|
+ c["节点ID"]: c["共现度"]
|
|
|
+ for c in co_occur_list
|
|
|
+ }
|
|
|
+
|
|
|
+ # 1. 初始化已知点集合(已经是已知的节点)
|
|
|
+ known_names: Set[str] = set()
|
|
|
+ node_round: Dict[str, int] = {} # {节点名称: 加入轮次}
|
|
|
+
|
|
|
+ for node in nodes:
|
|
|
+ if node.get("是否已知"):
|
|
|
+ known_names.add(node["节点名称"])
|
|
|
+ node_round[node["节点名称"]] = 0
|
|
|
+
|
|
|
+ unknown_names: Set[str] = set(node_by_name.keys()) - known_names
|
|
|
+ edges: List[Dict] = []
|
|
|
+
|
|
|
+ # 2. 迭代推导
|
|
|
+ round_num = 0
|
|
|
+ new_known_this_round = known_names.copy()
|
|
|
+
|
|
|
+ while new_known_this_round:
|
|
|
+ round_num += 1
|
|
|
+ new_known_next_round: Set[str] = set()
|
|
|
+
|
|
|
+ for known_name in new_known_this_round:
|
|
|
+ known_node = node_by_name.get(known_name)
|
|
|
+ if not known_node:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if get_match_score(known_node) < MATCH_SCORE_THRESHOLD:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 获取该节点所属分类的共现列表
|
|
|
+ known_cat_id = get_category_id(known_node)
|
|
|
+ if not known_cat_id or known_cat_id not in co_occur_lookup:
|
|
|
+ continue
|
|
|
+
|
|
|
+ co_occur_map = co_occur_lookup[known_cat_id]
|
|
|
+
|
|
|
+ for unknown_name in list(unknown_names):
|
|
|
+ unknown_node = node_by_name.get(unknown_name)
|
|
|
+ if not unknown_node:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if get_match_score(unknown_node) < MATCH_SCORE_THRESHOLD:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 检查未知节点的分类是否在已知节点的共现列表中
|
|
|
+ unknown_cat_id = get_category_id(unknown_node)
|
|
|
+ if unknown_cat_id and unknown_cat_id in co_occur_map:
|
|
|
+ co_occur_score = co_occur_map[unknown_cat_id]
|
|
|
+ new_known_next_round.add(unknown_name)
|
|
|
+ node_round[unknown_name] = round_num
|
|
|
+
|
|
|
+ edges.append({
|
|
|
+ "来源": known_node["节点ID"],
|
|
|
+ "目标": unknown_node["节点ID"],
|
|
|
+ "关系类型": "共现推导",
|
|
|
+ "推导轮次": round_num,
|
|
|
+ "共现分类ID": unknown_cat_id,
|
|
|
+ "共现度": co_occur_score,
|
|
|
+ })
|
|
|
+
|
|
|
+ known_names.update(new_known_next_round)
|
|
|
+ unknown_names -= new_known_next_round
|
|
|
+ new_known_this_round = new_known_next_round
|
|
|
+
|
|
|
+ if not new_known_next_round:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 3. 构建输出节点(只更新是否已知、发现编号)
|
|
|
+ # 先找出当前最大发现编号
|
|
|
+ max_order = 0
|
|
|
+ for node in nodes:
|
|
|
+ if node.get("发现编号") and node["发现编号"] > max_order:
|
|
|
+ max_order = node["发现编号"]
|
|
|
+
|
|
|
+ # 按推导轮次排序新发现的节点,分配发现编号
|
|
|
+ new_known_by_round = {}
|
|
|
+ for name, r in node_round.items():
|
|
|
+ if r > 0: # 排除起点(轮次0)
|
|
|
+ if r not in new_known_by_round:
|
|
|
+ new_known_by_round[r] = []
|
|
|
+ new_known_by_round[r].append(name)
|
|
|
+
|
|
|
+ # 分配发现编号
|
|
|
+ order_map = {}
|
|
|
+ current_order = max_order + 1
|
|
|
+ for r in sorted(new_known_by_round.keys()):
|
|
|
+ for name in new_known_by_round[r]:
|
|
|
+ order_map[name] = current_order
|
|
|
+ current_order += 1
|
|
|
+
|
|
|
+ output_nodes = []
|
|
|
+ for node in nodes:
|
|
|
+ new_node = dict(node)
|
|
|
+ name = node["节点名称"]
|
|
|
+
|
|
|
+ # 如果是新推导出来的(非起点),更新已知状态和发现编号
|
|
|
+ if name in node_round and node_round[name] > 0:
|
|
|
+ new_node["是否已知"] = True
|
|
|
+ new_node["发现编号"] = order_map.get(name)
|
|
|
+
|
|
|
+ output_nodes.append(new_node)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "输出节点": output_nodes,
|
|
|
+ "推导边列表": edges,
|
|
|
+ "推导轮次": round_num,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ===== 完整流程 =====
|
|
|
+
|
|
|
+def save_result(post_id: str, post_detail: Dict, steps: List, config: PathConfig) -> Path:
|
|
|
+ """保存结果到文件"""
|
|
|
+ output_dir = config.intermediate_dir / "creation_pattern"
|
|
|
+ output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ output_file = output_dir / f"{post_id}_创作模式.json"
|
|
|
+
|
|
|
+ result = {
|
|
|
+ "帖子详情": post_detail,
|
|
|
+ "步骤列表": steps,
|
|
|
+ }
|
|
|
+ with open(output_file, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(result, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ print(f" [已保存] {output_file.name}")
|
|
|
+ return output_file
|
|
|
+
|
|
|
+
|
|
|
+async def process_single_post(
|
|
|
+ post_file: Path,
|
|
|
+ persona_graph: Dict,
|
|
|
+ config: PathConfig,
|
|
|
+ force_llm: bool = False,
|
|
|
+ max_step: int = 3,
|
|
|
+) -> Dict:
|
|
|
+ """
|
|
|
+ 处理单个帖子
|
|
|
+
|
|
|
+ Args:
|
|
|
+ force_llm: 强制重新调用LLM(跳过LLM缓存)
|
|
|
+ max_step: 最多运行到第几步 (1=数据准备, 2=起点分析, 3=模式推导)
|
|
|
+ """
|
|
|
+ post_graph = load_json(post_file)
|
|
|
+ post_id = post_graph.get("meta", {}).get("postId", "unknown")
|
|
|
+
|
|
|
+ print(f"\n{'=' * 60}")
|
|
|
+ print(f"处理帖子: {post_id}")
|
|
|
+ print("-" * 60)
|
|
|
+
|
|
|
+ steps = []
|
|
|
+
|
|
|
+ # ===== 步骤1:数据准备 =====
|
|
|
+ print("\n[步骤1] 数据准备...")
|
|
|
+ data = prepare_analysis_data(post_graph, persona_graph)
|
|
|
+ post_detail = data["帖子详情"]
|
|
|
+ nodes_step1 = data["节点列表"]
|
|
|
+ relations_step1 = data["关系列表"]
|
|
|
+ persona_co_occur = data["人设共现关系"]
|
|
|
+
|
|
|
+ # 步骤1所有节点都是新的
|
|
|
+ new_known_step1 = [n["节点名称"] for n in nodes_step1 if n.get("是否已知")]
|
|
|
+
|
|
|
+ step1 = {
|
|
|
+ "步骤": "数据准备",
|
|
|
+ "输入": {
|
|
|
+ "帖子图谱": str(post_file.name),
|
|
|
+ "人设图谱": "人设图谱.json",
|
|
|
+ },
|
|
|
+ "输出": {
|
|
|
+ "新的已知节点": new_known_step1,
|
|
|
+ "新的边": [],
|
|
|
+ "节点列表": nodes_step1,
|
|
|
+ "边列表": relations_step1,
|
|
|
+ },
|
|
|
+ "人设共现关系": persona_co_occur,
|
|
|
+ "摘要": {
|
|
|
+ "节点数": len(nodes_step1),
|
|
|
+ "边数": len(relations_step1),
|
|
|
+ "人设共现数": len(persona_co_occur),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ steps.append(step1)
|
|
|
+ print(f" 节点数: {len(nodes_step1)}")
|
|
|
+ print(f" 关系数: {len(relations_step1)}")
|
|
|
+ print(f" 人设共现数: {len(persona_co_occur)}")
|
|
|
+
|
|
|
+ # 步骤1完成,保存
|
|
|
+ save_result(post_id, post_detail, steps, config)
|
|
|
+
|
|
|
+ if max_step == 1:
|
|
|
+ return {"帖子详情": post_detail, "步骤列表": steps}
|
|
|
+
|
|
|
+ # ===== 步骤2:起点分析 =====
|
|
|
+ print("\n[步骤2] 起点分析...")
|
|
|
+ origin_result = await analyze_origin(nodes_step1, force_llm=force_llm)
|
|
|
+ nodes_step2 = origin_result["输出节点"]
|
|
|
+
|
|
|
+ # 统计高分起点
|
|
|
+ def get_origin_score(node):
|
|
|
+ analysis = node.get("起点分析")
|
|
|
+ if analysis:
|
|
|
+ return analysis.get("分数", 0)
|
|
|
+ return 0
|
|
|
+
|
|
|
+ high_score_origins = [
|
|
|
+ (n["节点名称"], get_origin_score(n))
|
|
|
+ for n in nodes_step2
|
|
|
+ if get_origin_score(n) >= 0.7
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 新发现的已知节点(起点)
|
|
|
+ new_known_nodes = [n["节点名称"] for n in nodes_step2 if n.get("是否已知")]
|
|
|
+
|
|
|
+ step2 = {
|
|
|
+ "步骤": "起点分析",
|
|
|
+ "输入": {
|
|
|
+ "节点列表": nodes_step1,
|
|
|
+ "起点候选": origin_result["输入上下文"]["起点候选"],
|
|
|
+ "人设常量": origin_result["输入上下文"]["人设常量"],
|
|
|
+ },
|
|
|
+ "中间结果": origin_result["中间结果"],
|
|
|
+ "输出": {
|
|
|
+ "新的已知节点": new_known_nodes,
|
|
|
+ "新的边": [],
|
|
|
+ "节点列表": nodes_step2,
|
|
|
+ "边列表": relations_step1, # 边没变化
|
|
|
+ },
|
|
|
+ "摘要": {
|
|
|
+ "新已知数": len(new_known_nodes),
|
|
|
+ "model": origin_result["model"],
|
|
|
+ "cache_hit": origin_result["cache_hit"],
|
|
|
+ "log_url": origin_result.get("log_url"),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ steps.append(step2)
|
|
|
+
|
|
|
+ print(f" 高分起点 (>=0.7): {len(high_score_origins)} 个")
|
|
|
+ for name, score in sorted(high_score_origins, key=lambda x: -x[1]):
|
|
|
+ print(f" ★ {name}: {score:.2f}")
|
|
|
+
|
|
|
+ # 步骤2完成,保存
|
|
|
+ save_result(post_id, post_detail, steps, config)
|
|
|
+
|
|
|
+ if max_step == 2:
|
|
|
+ return {"帖子详情": post_detail, "步骤列表": steps}
|
|
|
+
|
|
|
+ # ===== 步骤3:模式推导 =====
|
|
|
+ print("\n[步骤3] 模式推导...")
|
|
|
+ derivation_result = derive_patterns(nodes_step2, persona_co_occur)
|
|
|
+ nodes_step3 = derivation_result["输出节点"]
|
|
|
+ edges = derivation_result["推导边列表"]
|
|
|
+
|
|
|
+ # 统计
|
|
|
+ known_count = sum(1 for n in nodes_step3 if n.get("是否已知"))
|
|
|
+ unknown_count = len(nodes_step3) - known_count
|
|
|
+
|
|
|
+ # 新发现的已知节点(本步骤推导出来的,不包括之前的起点)
|
|
|
+ prev_known = {n["节点名称"] for n in nodes_step2 if n.get("是否已知")}
|
|
|
+ new_known_nodes = [n["节点名称"] for n in nodes_step3 if n.get("是否已知") and n["节点名称"] not in prev_known]
|
|
|
+
|
|
|
+ # 合并边列表(原有边 + 推导边)
|
|
|
+ all_edges = relations_step1 + edges
|
|
|
+
|
|
|
+ step3 = {
|
|
|
+ "步骤": "模式推导",
|
|
|
+ "输入": {
|
|
|
+ "节点列表": nodes_step2,
|
|
|
+ "人设共现关系": persona_co_occur,
|
|
|
+ },
|
|
|
+ "输出": {
|
|
|
+ "新的已知节点": new_known_nodes,
|
|
|
+ "新的边": edges,
|
|
|
+ "节点列表": nodes_step3,
|
|
|
+ "边列表": all_edges,
|
|
|
+ },
|
|
|
+ "摘要": {
|
|
|
+ "已知点数": known_count,
|
|
|
+ "新已知数": len(new_known_nodes),
|
|
|
+ "新边数": len(edges),
|
|
|
+ "未知点数": unknown_count,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ steps.append(step3)
|
|
|
+
|
|
|
+ print(f" 已知点: {known_count} 个")
|
|
|
+ print(f" 推导边: {len(edges)} 条")
|
|
|
+ print(f" 未知点: {unknown_count} 个")
|
|
|
+
|
|
|
+ # 步骤3完成,保存
|
|
|
+ save_result(post_id, post_detail, steps, config)
|
|
|
+
|
|
|
+ return {"帖子详情": post_detail, "步骤列表": steps}
|
|
|
+
|
|
|
+
|
|
|
+# ===== 主函数 =====
|
|
|
+
|
|
|
+async def main(
|
|
|
+ post_id: str = None,
|
|
|
+ all_posts: bool = False,
|
|
|
+ force_llm: bool = False,
|
|
|
+ max_step: int = 3,
|
|
|
+):
|
|
|
+ """主函数"""
|
|
|
+ _, log_url = set_trace()
|
|
|
+
|
|
|
+ config = PathConfig()
|
|
|
+
|
|
|
+ print(f"账号: {config.account_name}")
|
|
|
+ print(f"Trace URL: {log_url}")
|
|
|
+
|
|
|
+ # 加载人设图谱
|
|
|
+ persona_graph_file = config.intermediate_dir / "人设图谱.json"
|
|
|
+ if not persona_graph_file.exists():
|
|
|
+ print(f"错误: 人设图谱文件不存在: {persona_graph_file}")
|
|
|
+ return
|
|
|
+
|
|
|
+ persona_graph = load_json(persona_graph_file)
|
|
|
+ print(f"人设图谱节点数: {len(persona_graph.get('nodes', {}))}")
|
|
|
+
|
|
|
+ # 获取帖子图谱文件
|
|
|
+ post_graph_files = get_post_graph_files(config)
|
|
|
+ if not post_graph_files:
|
|
|
+ print("错误: 没有找到帖子图谱文件")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 确定要处理的帖子
|
|
|
+ if post_id:
|
|
|
+ target_file = next(
|
|
|
+ (f for f in post_graph_files if post_id in f.name),
|
|
|
+ None
|
|
|
+ )
|
|
|
+ if not target_file:
|
|
|
+ print(f"错误: 未找到帖子 {post_id}")
|
|
|
+ return
|
|
|
+ files_to_process = [target_file]
|
|
|
+ elif all_posts:
|
|
|
+ files_to_process = post_graph_files
|
|
|
+ else:
|
|
|
+ files_to_process = [post_graph_files[0]]
|
|
|
+
|
|
|
+ print(f"待处理帖子数: {len(files_to_process)}")
|
|
|
+
|
|
|
+ # 处理
|
|
|
+ results = []
|
|
|
+ for i, post_file in enumerate(files_to_process, 1):
|
|
|
+ print(f"\n{'#' * 60}")
|
|
|
+ print(f"# 处理帖子 {i}/{len(files_to_process)}")
|
|
|
+ print(f"{'#' * 60}")
|
|
|
+
|
|
|
+ result = await process_single_post(
|
|
|
+ post_file=post_file,
|
|
|
+ persona_graph=persona_graph,
|
|
|
+ config=config,
|
|
|
+ force_llm=force_llm,
|
|
|
+ max_step=max_step,
|
|
|
+ )
|
|
|
+ results.append(result)
|
|
|
+
|
|
|
+ # 汇总
|
|
|
+ print(f"\n{'#' * 60}")
|
|
|
+ print(f"# 完成! 共处理 {len(results)} 个帖子")
|
|
|
+ print(f"{'#' * 60}")
|
|
|
+ print(f"Trace: {log_url}")
|
|
|
+
|
|
|
+ print("\n汇总:")
|
|
|
+ for result in results:
|
|
|
+ post_id = result["帖子详情"]["postId"]
|
|
|
+ steps = result.get("步骤列表", [])
|
|
|
+ num_steps = len(steps)
|
|
|
+
|
|
|
+ if num_steps == 1:
|
|
|
+ step1_summary = steps[0].get("摘要", {})
|
|
|
+ print(f" {post_id}: 节点数={step1_summary.get('节点数', 0)} (仅数据准备)")
|
|
|
+ elif num_steps == 2:
|
|
|
+ step2_summary = steps[1].get("摘要", {})
|
|
|
+ print(f" {post_id}: 起点={step2_summary.get('高分起点数', 0)} (未推导)")
|
|
|
+ elif num_steps >= 3:
|
|
|
+ step2_summary = steps[1].get("摘要", {})
|
|
|
+ step3_summary = steps[2].get("摘要", {})
|
|
|
+ print(f" {post_id}: 起点={step2_summary.get('高分起点数', 0)}, "
|
|
|
+ f"已知={step3_summary.get('已知点数', 0)}, "
|
|
|
+ f"推导边={step3_summary.get('推导边数', 0)}")
|
|
|
+ else:
|
|
|
+ print(f" {post_id}: 无步骤数据")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ import argparse
|
|
|
+
|
|
|
+ parser = argparse.ArgumentParser(description="创作模式分析")
|
|
|
+ parser.add_argument("--post-id", type=str, help="帖子ID")
|
|
|
+ parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
|
|
|
+ parser.add_argument("--force-llm", action="store_true", help="强制重新调用LLM(跳过LLM缓存)")
|
|
|
+ parser.add_argument("--step", type=int, default=3, choices=[1, 2, 3],
|
|
|
+ help="运行到第几步 (1=数据准备, 2=起点分析, 3=模式推导)")
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ asyncio.run(main(
|
|
|
+ post_id=args.post_id,
|
|
|
+ all_posts=args.all_posts,
|
|
|
+ force_llm=args.force_llm,
|
|
|
+ max_step=args.step,
|
|
|
+ ))
|