#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 构建人设图谱 ================================================================================ 输入文件: ================================================================================ 1. pattern聚合结果.json - 分类节点、标签节点、属于/包含边 2. dimension_associations_analysis.json - 分类共现边(跨点) 3. intra_dimension_associations_analysis.json - 分类共现边(点内) 4. 历史帖子解构目录/*.json - 标签共现边 ================================================================================ 输出文件: 人设图谱.json ================================================================================ { "meta": { # 元信息 "description": "...", "account": "账号名", "createdAt": "时间戳", "stats": { ... } # 统计信息 }, "nodes": { # 节点字典 (nodeId -> nodeData) "{domain}:{dimension}:{type}:{name}": { "name": "显示名称", "type": "人设|灵感点|目的点|关键点|分类|标签", "domain": "人设", "dimension": "人设|灵感点|目的点|关键点", "detail": { ... } } }, "edges": { # 边字典 (edgeId -> edgeData) "{source}|{type}|{target}": { "source": "源节点ID", "target": "目标节点ID", "type": "属于|包含|标签共现|分类共现|分类共现_点内", "score": 0.5, "detail": { ... } } }, "index": { # 游走索引 "outEdges": { nodeId: { edgeType: [{ target, score }] } }, "inEdges": { nodeId: { edgeType: [{ source, score }] } } }, "tree": { ... } # 嵌套树结构(从根节点沿"包含"边构建) } ================================================================================ 核心逻辑: ================================================================================ 1. 提取节点 - 从 pattern 提取分类节点(按维度分组的层级分类) - 从 pattern 提取标签节点(具体特征标签) - 添加根节点(人设)和维度节点(灵感点/目的点/关键点) 2. 提取边 - 属于/包含边:根据节点的 parentPath 构建层级关系 - 分类共现边(跨点):从关联分析结果提取 - 分类共现边(点内):从点内关联分析提取 - 标签共现边:遍历历史帖子,统计标签同现 3. 构建索引 - outEdges: 从该节点出发能到达的节点 - inEdges: 能到达该节点的源节点 4. 构建树 - 从根节点开始,沿"包含"边递归构建嵌套树结构 ================================================================================ 节点ID格式: {domain}:{dimension}:{type}:{name} ================================================================================ - 根节点: 人设:人设:人设:人设 - 维度节点: 人设:灵感点:灵感点:灵感点 - 分类节点: 人设:灵感点:分类:视觉呈现 - 标签节点: 人设:灵感点:标签:手绘风格 ================================================================================ 边类型: ================================================================================ - 属于: 子节点 -> 父节点(层级关系) - 包含: 父节点 -> 子节点(层级关系) - 标签共现: 标签 <-> 标签(同一帖子出现) - 分类共现: 分类 <-> 分类(跨维度共现) - 分类共现_点内: 分类 <-> 分类(点内组合共现) ================================================================================ 图游走函数: ================================================================================ 1. walk_graph(index, start_node, edge_types, direction, min_score) - 从起始节点出发,按边类型序列游走N步 - 示例: walk_graph(index, "人设:灵感点:标签:手绘风格", ["属于", "分类共现"]) - 返回: 到达的节点ID集合 2. get_neighbors(index, node_id, edge_type, direction, min_score) - 获取节点的邻居 - 示例: get_neighbors(index, "人设:灵感点:分类:视觉呈现", "包含") - 返回: 邻居列表 [{"target": "...", "score": 0.5}, ...] ================================================================================ """ import json from pathlib import Path from typing import Dict, List, Set, Any from datetime import datetime import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from script.data_processing.path_config import PathConfig # ==================== 节点和边构建工具 ==================== def build_node_id(domain: str, dimension: str, node_type: str, name: str) -> str: """构建节点ID""" return f"{domain}:{dimension}:{node_type}:{name}" def build_edge_id(source: str, edge_type: str, target: str) -> str: """构建边ID""" return f"{source}|{edge_type}|{target}" def create_node( domain: str, dimension: str, node_type: str, name: str, detail: Dict = None ) -> Dict: """创建节点""" return { "name": name, "type": node_type, "dimension": dimension, "domain": domain, "detail": detail or {} } def create_edge( source: str, target: str, edge_type: str, score: float = None, detail: Dict = None ) -> Dict: """创建边""" return { "source": source, "target": target, "type": edge_type, "score": score, "detail": detail or {} } # ==================== 从 pattern 提取分类节点 ==================== def extract_category_nodes_from_pattern( pattern_data: Dict, dimension_key: str, dimension_name: str ) -> Dict[str, Dict]: """ 从 pattern 聚合结果中提取分类节点 Returns: { nodeId: nodeData } """ nodes = {} if dimension_key not in pattern_data: return nodes def collect_sources_recursively(node: Dict) -> List[Dict]: """递归收集节点及其所有子节点的特征来源""" sources = [] if "特征列表" in node: for feature in node["特征列表"]: source = { "pointName": feature.get("所属点", ""), "pointDesc": feature.get("点描述", ""), "postId": feature.get("帖子id", "") } sources.append(source) for key, value in node.items(): if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]: continue if isinstance(value, dict): sources.extend(collect_sources_recursively(value)) return sources def traverse_node(node: Dict, parent_path: List[str]): """递归遍历节点""" for key, value in node.items(): if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]: continue if isinstance(value, dict): current_path = parent_path + [key] # 获取帖子列表 post_ids = value.get("帖子列表", []) # 构建节点来源 node_sources = [] if "特征列表" in value: for feature in value["特征列表"]: source = { "pointName": feature.get("所属点", ""), "pointDesc": feature.get("点描述", ""), "postId": feature.get("帖子id", "") } node_sources.append(source) else: node_sources = collect_sources_recursively(value) # 计算帖子数 if post_ids: post_count = len(post_ids) else: post_count = len(set(s.get("postId", "") for s in node_sources if s.get("postId"))) # 构建节点 node_id = build_node_id("人设", dimension_name, "分类", key) nodes[node_id] = create_node( domain="人设", dimension=dimension_name, node_type="分类", name=key, detail={ "parentPath": parent_path.copy(), "postCount": post_count, "sources": node_sources } ) # 递归处理子节点 traverse_node(value, current_path) traverse_node(pattern_data[dimension_key], []) return nodes # ==================== 从 pattern 提取标签节点 ==================== def extract_tag_nodes_from_pattern( pattern_data: Dict, dimension_key: str, dimension_name: str ) -> Dict[str, Dict]: """ 从 pattern 聚合结果中提取标签节点 Returns: { nodeId: nodeData } """ nodes = {} tag_map = {} # 用于合并同名标签: tagId -> { sources, postIds, parentPath } if dimension_key not in pattern_data: return nodes def traverse_node(node: Dict, parent_path: List[str]): """递归遍历节点""" # 处理特征列表(标签) if "特征列表" in node: for feature in node["特征列表"]: tag_name = feature.get("特征名称", "") if not tag_name: continue source = { "pointName": feature.get("所属点", ""), "pointDesc": feature.get("点描述", ""), "postId": feature.get("帖子id", "") } tag_id = build_node_id("人设", dimension_name, "标签", tag_name) if tag_id not in tag_map: tag_map[tag_id] = { "name": tag_name, "sources": [], "postIds": set(), "parentPath": parent_path.copy() } tag_map[tag_id]["sources"].append(source) if source["postId"]: tag_map[tag_id]["postIds"].add(source["postId"]) # 递归处理子节点 for key, value in node.items(): if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]: continue if isinstance(value, dict): current_path = parent_path + [key] traverse_node(value, current_path) traverse_node(pattern_data[dimension_key], []) # 转换为节点 for tag_id, tag_info in tag_map.items(): nodes[tag_id] = create_node( domain="人设", dimension=dimension_name, node_type="标签", name=tag_info["name"], detail={ "parentPath": tag_info["parentPath"], "postCount": len(tag_info["postIds"]), "sources": tag_info["sources"] } ) return nodes # ==================== 从 pattern 提取属于/包含边 ==================== def extract_belong_contain_edges( pattern_data: Dict, dimension_key: str, dimension_name: str, nodes: Dict[str, Dict] ) -> Dict[str, Dict]: """ 从 pattern 聚合结果中提取属于/包含边 Returns: { edgeId: edgeData } """ edges = {} if dimension_key not in pattern_data: return edges # 构建分类名称到ID的映射 category_name_to_id = {} for node_id, node_data in nodes.items(): if node_data["type"] == "分类" and node_data["dimension"] == dimension_name: category_name_to_id[node_data["name"]] = node_id # 为每个节点创建属于边(子→父) for node_id, node_data in nodes.items(): if node_data["dimension"] != dimension_name: continue parent_path = node_data["detail"].get("parentPath", []) if not parent_path: continue # 取最后一个作为直接父分类 parent_name = parent_path[-1] parent_id = category_name_to_id.get(parent_name) if parent_id: # 属于边:子 → 父 edge_id = build_edge_id(node_id, "属于", parent_id) edges[edge_id] = create_edge( source=node_id, target=parent_id, edge_type="属于", score=1.0, detail={} ) # 包含边:父 → 子 edge_id_contain = build_edge_id(parent_id, "包含", node_id) edges[edge_id_contain] = create_edge( source=parent_id, target=node_id, edge_type="包含", score=1.0, detail={} ) return edges # ==================== 从关联分析提取分类共现边(跨点)==================== def extract_category_cooccur_edges(associations_data: Dict) -> Dict[str, Dict]: """ 从 dimension_associations_analysis.json 中提取分类共现边(跨点) Returns: { edgeId: edgeData } """ edges = {} if "单维度关联分析" not in associations_data: return edges single_dim = associations_data["单维度关联分析"] # 维度映射 dimension_map = { "灵感点维度": "灵感点", "目的点维度": "目的点", "关键点维度": "关键点" } def get_last_segment(path: str) -> str: """获取路径的最后一段""" return path.split("/")[-1] for dim_key, dim_data in single_dim.items(): if dim_key not in dimension_map: continue source_dimension = dimension_map[dim_key] for direction_key, direction_data in dim_data.items(): if direction_key == "说明" or "→" not in direction_key: continue for source_path, source_info in direction_data.items(): source_name = get_last_segment(source_path) source_node_id = build_node_id("人设", source_dimension, "分类", source_name) for field_name, associations in source_info.items(): if not field_name.startswith("与") or not field_name.endswith("的关联"): continue target_dimension = field_name[1:-3] if not isinstance(associations, list): continue for assoc in associations: target_path = assoc.get("目标分类", "") if not target_path: continue target_name = get_last_segment(target_path) target_node_id = build_node_id("人设", target_dimension, "分类", target_name) # 使用 Jaccard 作为 score jaccard = assoc.get("Jaccard相似度", 0) edge_id = build_edge_id(source_node_id, "分类共现", target_node_id) edges[edge_id] = create_edge( source=source_node_id, target=target_node_id, edge_type="分类共现", score=jaccard, detail={ "jaccard": jaccard, "overlapCoef": assoc.get("重叠系数", 0), "cooccurCount": assoc.get("共同帖子数", 0), "cooccurPosts": assoc.get("共同帖子ID", []) } ) return edges # ==================== 从关联分析提取分类共现边(点内)==================== def extract_intra_category_cooccur_edges(intra_data: Dict) -> Dict[str, Dict]: """ 从 intra_dimension_associations_analysis.json 中提取点内分类共现边 Returns: { edgeId: edgeData } """ edges = {} if "叶子分类组合聚类" not in intra_data: return edges clusters_by_dim = intra_data["叶子分类组合聚类"] for dimension, clusters in clusters_by_dim.items(): if dimension not in ("灵感点", "目的点", "关键点"): continue for cluster_key, cluster_data in clusters.items(): leaf_categories = cluster_data.get("叶子分类组合", []) point_count = cluster_data.get("点数", 0) point_details = cluster_data.get("点详情列表", []) # 提取点名称列表 point_names = [p.get("点名称", "") for p in point_details if p.get("点名称")] # 两两组合生成共现边 for i in range(len(leaf_categories)): for j in range(i + 1, len(leaf_categories)): cat1 = leaf_categories[i] cat2 = leaf_categories[j] cat1_id = build_node_id("人设", dimension, "分类", cat1) cat2_id = build_node_id("人设", dimension, "分类", cat2) # 确保顺序一致(按字典序) if cat1_id > cat2_id: cat1_id, cat2_id = cat2_id, cat1_id edge_id = build_edge_id(cat1_id, "分类共现_点内", cat2_id) if edge_id in edges: # 累加 edges[edge_id]["detail"]["pointCount"] += point_count edges[edge_id]["detail"]["pointNames"].extend(point_names) else: edges[edge_id] = create_edge( source=cat1_id, target=cat2_id, edge_type="分类共现_点内", score=point_count, # 先用点数作为 score,后续可归一化 detail={ "pointCount": point_count, "pointNames": point_names.copy() } ) return edges # ==================== 从历史帖子提取标签共现边 ==================== def extract_tag_cooccur_edges(historical_posts_dir: Path) -> Dict[str, Dict]: """ 从历史帖子解构结果中提取标签共现边 Returns: { edgeId: edgeData } """ edges = {} cooccur_map = {} # (tag1_id, tag2_id, dimension) -> { cooccurPosts: set() } if not historical_posts_dir.exists(): print(f" 警告: 历史帖子目录不存在: {historical_posts_dir}") return edges json_files = list(historical_posts_dir.glob("*.json")) print(f" 找到 {len(json_files)} 个历史帖子文件") def extract_post_id_from_filename(filename: str) -> str: """从文件名中提取帖子ID""" import re match = re.match(r'^([^_]+)_', filename) return match.group(1) if match else "" def extract_tags_from_post(post_data: Dict) -> Dict[str, List[str]]: """从帖子解构结果中提取所有标签""" tags_by_dimension = { "灵感点": [], "目的点": [], "关键点": [] } if "三点解构" not in post_data: return tags_by_dimension three_points = post_data["三点解构"] # 灵感点 if "灵感点" in three_points: inspiration = three_points["灵感点"] for section in ["全新内容", "共性差异", "共性内容"]: if section in inspiration and isinstance(inspiration[section], list): for item in inspiration[section]: if "提取的特征" in item and isinstance(item["提取的特征"], list): for feature in item["提取的特征"]: tag_name = feature.get("特征名称", "") if tag_name: tags_by_dimension["灵感点"].append(tag_name) # 目的点 if "目的点" in three_points: purpose = three_points["目的点"] if "purposes" in purpose and isinstance(purpose["purposes"], list): for item in purpose["purposes"]: if "提取的特征" in item and isinstance(item["提取的特征"], list): for feature in item["提取的特征"]: tag_name = feature.get("特征名称", "") if tag_name: tags_by_dimension["目的点"].append(tag_name) # 关键点 if "关键点" in three_points: key_points = three_points["关键点"] if "key_points" in key_points and isinstance(key_points["key_points"], list): for item in key_points["key_points"]: if "提取的特征" in item and isinstance(item["提取的特征"], list): for feature in item["提取的特征"]: tag_name = feature.get("特征名称", "") if tag_name: tags_by_dimension["关键点"].append(tag_name) return tags_by_dimension # 遍历所有帖子文件 for file_path in json_files: post_id = extract_post_id_from_filename(file_path.name) if not post_id: continue try: with open(file_path, "r", encoding="utf-8") as f: post_data = json.load(f) tags_by_dimension = extract_tags_from_post(post_data) # 对每个维度内的标签两两组合 for dimension, tags in tags_by_dimension.items(): unique_tags = list(set(tags)) for i in range(len(unique_tags)): for j in range(i + 1, len(unique_tags)): tag1 = unique_tags[i] tag2 = unique_tags[j] tag1_id = build_node_id("人设", dimension, "标签", tag1) tag2_id = build_node_id("人设", dimension, "标签", tag2) # 确保顺序一致 if tag1_id > tag2_id: tag1_id, tag2_id = tag2_id, tag1_id key = (tag1_id, tag2_id) if key not in cooccur_map: cooccur_map[key] = {"cooccurPosts": set()} cooccur_map[key]["cooccurPosts"].add(post_id) except Exception as e: print(f" 警告: 处理文件 {file_path.name} 时出错: {e}") # 转换为边 for (tag1_id, tag2_id), info in cooccur_map.items(): cooccur_posts = list(info["cooccurPosts"]) cooccur_count = len(cooccur_posts) edge_id = build_edge_id(tag1_id, "标签共现", tag2_id) edges[edge_id] = create_edge( source=tag1_id, target=tag2_id, edge_type="标签共现", score=cooccur_count, # 先用共现次数,后续可归一化 detail={ "cooccurCount": cooccur_count, "cooccurPosts": cooccur_posts } ) return edges # ==================== 构建嵌套树结构 ==================== def build_nested_tree(nodes: Dict[str, Dict], edges: Dict[str, Dict]) -> Dict: """ 从根节点开始,沿"包含"边递归构建嵌套树结构 包含边:父节点 -> 子节点 从根节点开始,递归找所有包含的子节点 Returns: 嵌套的树结构 """ # 从"包含"边构建 父节点 -> [子节点] 的映射 parent_to_children = {} # parent_id -> [child_id, ...] for edge_id, edge_data in edges.items(): if edge_data["type"] == "包含": parent_id = edge_data["source"] child_id = edge_data["target"] if parent_id not in parent_to_children: parent_to_children[parent_id] = [] parent_to_children[parent_id].append(child_id) # 递归构建子树 def build_subtree(node_id: str) -> Dict: node_data = nodes[node_id] subtree = { "id": node_id, "name": node_data["name"], "type": node_data["type"], "domain": node_data["domain"], "dimension": node_data["dimension"], "detail": node_data.get("detail", {}), "children": [] } # 获取子节点 child_ids = parent_to_children.get(node_id, []) for child_id in child_ids: if child_id in nodes: subtree["children"].append(build_subtree(child_id)) return subtree # 从根节点开始构建 root_id = "人设:人设:人设:人设" return build_subtree(root_id) # ==================== 图游走工具 ==================== def walk_graph( index: Dict, start_node: str, edge_types: List[str], direction: str = "out", min_score: float = None ) -> Set[str]: """ 从起始节点出发,按指定边类型序列游走N步 Args: index: 游走索引 {"outEdges": {...}, "inEdges": {...}} start_node: 起始节点ID edge_types: 边类型序列,如 ["属于", "分类共现"] direction: 游走方向 "out"(沿出边) / "in"(沿入边) min_score: 最小分数过滤 Returns: 到达的节点ID集合 Example: # 从标签出发,沿"属于"边走1步,再沿"分类共现"边走1步 result = walk_graph( index, "人设:灵感点:标签:手绘风格", ["属于", "分类共现"] ) """ edge_index = index["outEdges"] if direction == "out" else index["inEdges"] target_key = "target" if direction == "out" else "source" current_nodes = {start_node} for edge_type in edge_types: next_nodes = set() for node in current_nodes: neighbors = edge_index.get(node, {}).get(edge_type, []) for neighbor in neighbors: # 分数过滤 if min_score is not None and neighbor.get("score", 0) < min_score: continue next_nodes.add(neighbor[target_key]) current_nodes = next_nodes if not current_nodes: break return current_nodes def get_neighbors( index: Dict, node_id: str, edge_type: str = None, direction: str = "out", min_score: float = None ) -> List[Dict]: """ 获取节点的邻居 Args: index: 游走索引 node_id: 节点ID edge_type: 边类型(可选,不指定则返回所有类型) direction: 方向 "out" / "in" min_score: 最小分数过滤 Returns: 邻居列表 [{"target": "...", "score": 0.5}, ...] """ edge_index = index["outEdges"] if direction == "out" else index["inEdges"] node_edges = edge_index.get(node_id, {}) if edge_type: neighbors = node_edges.get(edge_type, []) else: neighbors = [] for edges in node_edges.values(): neighbors.extend(edges) if min_score is not None: neighbors = [n for n in neighbors if n.get("score", 0) >= min_score] return neighbors # ==================== 构建索引 ==================== def build_index(edges: Dict[str, Dict]) -> Dict: """ 构建游走索引 Returns: { "outEdges": { nodeId: { edgeType: [{ target, score }] } }, "inEdges": { nodeId: { edgeType: [{ source, score }] } } } """ out_edges = {} in_edges = {} for edge_id, edge_data in edges.items(): source = edge_data["source"] target = edge_data["target"] edge_type = edge_data["type"] score = edge_data["score"] # outEdges if source not in out_edges: out_edges[source] = {} if edge_type not in out_edges[source]: out_edges[source][edge_type] = [] out_edges[source][edge_type].append({ "target": target, "score": score }) # inEdges if target not in in_edges: in_edges[target] = {} if edge_type not in in_edges[target]: in_edges[target][edge_type] = [] in_edges[target][edge_type].append({ "source": source, "score": score }) return { "outEdges": out_edges, "inEdges": in_edges } # ==================== 主函数 ==================== def main(): config = PathConfig() config.ensure_dirs() print(f"账号: {config.account_name}") print(f"输出版本: {config.output_version}") print() # 输入文件路径 pattern_file = config.pattern_cluster_file associations_file = config.account_dir / "pattern相关文件/optimization/dimension_associations_analysis.json" intra_associations_file = config.account_dir / "pattern相关文件/optimization/intra_dimension_associations_analysis.json" historical_posts_dir = config.historical_posts_dir # 输出文件路径 output_file = config.intermediate_dir / "人设图谱.json" print("输入文件:") print(f" pattern聚合文件: {pattern_file}") print(f" 跨点关联分析文件: {associations_file}") print(f" 点内关联分析文件: {intra_associations_file}") print(f" 历史帖子目录: {historical_posts_dir}") print(f"\n输出文件: {output_file}") print() # ===== 读取数据 ===== print("=" * 60) print("读取数据...") print(" 读取 pattern 聚合结果...") with open(pattern_file, "r", encoding="utf-8") as f: pattern_data = json.load(f) print(" 读取跨点关联分析结果...") with open(associations_file, "r", encoding="utf-8") as f: associations_data = json.load(f) print(" 读取点内关联分析结果...") with open(intra_associations_file, "r", encoding="utf-8") as f: intra_associations_data = json.load(f) # ===== 提取节点 ===== print("\n" + "=" * 60) print("提取节点...") all_nodes = {} dimension_mapping = { "灵感点列表": "灵感点", "目的点": "目的点", "关键点列表": "关键点" } # 分类节点 print("\n提取分类节点:") for dim_key, dim_name in dimension_mapping.items(): category_nodes = extract_category_nodes_from_pattern(pattern_data, dim_key, dim_name) all_nodes.update(category_nodes) print(f" {dim_name}: {len(category_nodes)} 个") # 标签节点 print("\n提取标签节点:") for dim_key, dim_name in dimension_mapping.items(): tag_nodes = extract_tag_nodes_from_pattern(pattern_data, dim_key, dim_name) all_nodes.update(tag_nodes) print(f" {dim_name}: {len(tag_nodes)} 个") # 统计 category_count = sum(1 for n in all_nodes.values() if n["type"] == "分类") tag_count = sum(1 for n in all_nodes.values() if n["type"] == "标签") print(f"\n节点总计: {len(all_nodes)} (分类: {category_count}, 标签: {tag_count})") # ===== 提取边 ===== print("\n" + "=" * 60) print("提取边...") all_edges = {} # 属于/包含边 print("\n提取属于/包含边:") for dim_key, dim_name in dimension_mapping.items(): belong_contain_edges = extract_belong_contain_edges(pattern_data, dim_key, dim_name, all_nodes) all_edges.update(belong_contain_edges) belong_count = sum(1 for e in all_edges.values() if e["type"] == "属于") contain_count = sum(1 for e in all_edges.values() if e["type"] == "包含") print(f" 属于边: {belong_count}, 包含边: {contain_count}") # 分类共现边(跨点) print("\n提取分类共现边(跨点):") category_cooccur_edges = extract_category_cooccur_edges(associations_data) all_edges.update(category_cooccur_edges) print(f" 分类共现边: {len(category_cooccur_edges)}") # 分类共现边(点内) print("\n提取分类共现边(点内):") intra_category_edges = extract_intra_category_cooccur_edges(intra_associations_data) all_edges.update(intra_category_edges) print(f" 分类共现_点内边: {len(intra_category_edges)}") # 标签共现边 print("\n提取标签共现边:") tag_cooccur_edges = extract_tag_cooccur_edges(historical_posts_dir) all_edges.update(tag_cooccur_edges) print(f" 标签共现边: {len(tag_cooccur_edges)}") # ===== 添加根节点和维度节点 ===== print("\n添加根节点和维度节点:") # 根节点 root_id = "人设:人设:人设:人设" all_nodes[root_id] = create_node( domain="人设", dimension="人设", node_type="人设", name="人设", detail={} ) # 维度节点 + 边 dimensions = ["灵感点", "目的点", "关键点"] for dim in dimensions: dim_id = f"人设:{dim}:{dim}:{dim}" all_nodes[dim_id] = create_node( domain="人设", dimension=dim, node_type=dim, name=dim, detail={} ) # 维度 -> 根 的属于边 edge_id = build_edge_id(dim_id, "属于", root_id) all_edges[edge_id] = create_edge( source=dim_id, target=root_id, edge_type="属于", score=1.0, detail={} ) # 根 -> 维度 的包含边 edge_id_contain = build_edge_id(root_id, "包含", dim_id) all_edges[edge_id_contain] = create_edge( source=root_id, target=dim_id, edge_type="包含", score=1.0, detail={} ) # 找该维度下的顶级分类(没有父节点的分类),添加边 dim_categories = [ (nid, ndata) for nid, ndata in all_nodes.items() if ndata["dimension"] == dim and ndata["type"] == "分类" and not ndata["detail"].get("parentPath") ] for cat_id, cat_data in dim_categories: # 顶级分类 -> 维度 的属于边 edge_id = build_edge_id(cat_id, "属于", dim_id) all_edges[edge_id] = create_edge( source=cat_id, target=dim_id, edge_type="属于", score=1.0, detail={} ) # 维度 -> 顶级分类 的包含边 edge_id_contain = build_edge_id(dim_id, "包含", cat_id) all_edges[edge_id_contain] = create_edge( source=dim_id, target=cat_id, edge_type="包含", score=1.0, detail={} ) print(f" 添加节点: 1 根节点 + 3 维度节点 = 4") print(f" 添加边: 根↔维度 6条 + 维度↔顶级分类") # 边统计 edge_type_counts = {} for edge in all_edges.values(): t = edge["type"] edge_type_counts[t] = edge_type_counts.get(t, 0) + 1 print(f"\n边总计: {len(all_edges)}") for t, count in sorted(edge_type_counts.items(), key=lambda x: -x[1]): print(f" {t}: {count}") # ===== 构建索引 ===== print("\n" + "=" * 60) print("构建索引...") index = build_index(all_edges) print(f" outEdges 节点数: {len(index['outEdges'])}") print(f" inEdges 节点数: {len(index['inEdges'])}") # ===== 构建嵌套树 ===== print("\n" + "=" * 60) print("构建嵌套树...") tree = build_nested_tree(all_nodes, all_edges) # 统计树节点数 def count_tree_nodes(node): count = 1 for child in node.get("children", []): count += count_tree_nodes(child) return count tree_node_count = count_tree_nodes(tree) print(f" 树节点数: {tree_node_count}") # ===== 统计各维度 ===== dimension_stats = {} for dim_name in ["灵感点", "目的点", "关键点"]: dim_categories = sum(1 for n in all_nodes.values() if n["type"] == "分类" and n["dimension"] == dim_name) dim_tags = sum(1 for n in all_nodes.values() if n["type"] == "标签" and n["dimension"] == dim_name) dimension_stats[dim_name] = { "categoryCount": dim_categories, "tagCount": dim_tags } # ===== 构建输出 ===== print("\n" + "=" * 60) print("保存结果...") output_data = { "meta": { "description": "人设图谱数据", "account": config.account_name, "createdAt": datetime.now().isoformat(), "stats": { "nodeCount": len(all_nodes), "edgeCount": len(all_edges), "categoryCount": category_count, "tagCount": tag_count, "treeNodeCount": tree_node_count, "dimensions": dimension_stats, "edgeTypes": edge_type_counts } }, "nodes": all_nodes, "edges": all_edges, "index": index, "tree": tree } with open(output_file, "w", encoding="utf-8") as f: json.dump(output_data, f, ensure_ascii=False, indent=2) print(f"\n输出文件: {output_file}") print("\n" + "=" * 60) print("完成!") if __name__ == "__main__": main()