#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 构建人设图谱 ================================================================================ 输入文件: ================================================================================ 1. pattern聚合结果.json - 分类节点、标签节点、属于/包含边 2. dimension_associations_analysis.json - 分类共现边(跨点) 3. intra_dimension_associations_analysis.json - 分类共现边(点内) 4. 历史帖子解构目录/*.json - 标签共现边 ================================================================================ 输出文件: 人设图谱.json ================================================================================ { "meta": { # 元信息 "description": "...", "account": "账号名", "createdAt": "时间戳", "stats": { ... } # 统计信息 }, "nodes": { # 节点字典 (nodeId -> nodeData) "{domain}:{dimension}:{type}:{name}": { "name": "显示名称", "type": "人设|灵感点|目的点|关键点|分类|标签", "domain": "人设", "dimension": "人设|灵感点|目的点|关键点", "detail": { ... } } }, "edges": { # 边字典 (edgeId -> edgeData) "{source}|{type}|{target}": { "source": "源节点ID", "target": "目标节点ID", "type": "属于|包含|标签共现|分类共现|分类共现", "score": 0.5, "detail": { ... } } }, "index": { # 游走索引 "outEdges": { nodeId: { edgeType: [{ target, score }] } }, "inEdges": { nodeId: { edgeType: [{ source, score }] } } }, "tree": { ... } # 嵌套树结构(从根节点沿"包含"边构建) } ================================================================================ 核心逻辑: ================================================================================ 1. 提取节点 - 从 pattern 提取分类节点(按维度分组的层级分类) - 从 pattern 提取标签节点(具体特征标签) - 添加根节点(人设)和维度节点(灵感点/目的点/关键点) 2. 提取边 - 属于/包含边:根据节点的 parentPath 构建层级关系 - 分类共现边(跨点):从关联分析结果提取 - 分类共现边(点内):从点内关联分析提取 - 标签共现边:遍历历史帖子,统计标签同现 3. 构建索引 - outEdges: 从该节点出发能到达的节点 - inEdges: 能到达该节点的源节点 4. 构建树 - 从根节点开始,沿"包含"边递归构建嵌套树结构 ================================================================================ 节点ID格式: {domain}:{dimension}:{type}:{name} ================================================================================ - 根节点: 人设:人设:人设:人设 - 维度节点: 人设:灵感点:灵感点:灵感点 - 分类节点: 人设:灵感点:分类:视觉呈现 - 标签节点: 人设:灵感点:标签:手绘风格 ================================================================================ 边类型: ================================================================================ - 属于: 子节点 -> 父节点(层级关系) - 包含: 父节点 -> 子节点(层级关系) - 标签共现: 标签 <-> 标签(同一帖子出现) - 分类共现: 分类 <-> 分类(跨维度共现) - 分类共现: 分类 <-> 分类(点内组合共现) ================================================================================ 图游走函数: ================================================================================ 1. walk_graph(index, start_node, edge_types, direction, min_score) - 从起始节点出发,按边类型序列游走N步 - 示例: walk_graph(index, "人设:灵感点:标签:手绘风格", ["属于", "分类共现"]) - 返回: 到达的节点ID集合 2. get_neighbors(index, node_id, edge_type, direction, min_score) - 获取节点的邻居 - 示例: get_neighbors(index, "人设:灵感点:分类:视觉呈现", "包含") - 返回: 邻居列表 [{"target": "...", "score": 0.5}, ...] ================================================================================ """ import json from pathlib import Path from typing import Dict, List, Set, Any from datetime import datetime import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from script.data_processing.path_config import PathConfig # ==================== 节点和边构建工具 ==================== def build_node_id(domain: str, dimension: str, node_type: str, name: str) -> str: """构建节点ID""" return f"{domain}:{dimension}:{node_type}:{name}" def build_edge_id(source: str, edge_type: str, target: str) -> str: """构建边ID""" return f"{source}|{edge_type}|{target}" def create_node( domain: str, dimension: str, node_type: str, name: str, detail: Dict = None ) -> Dict: """创建节点""" return { "name": name, "type": node_type, "dimension": dimension, "domain": domain, "detail": detail or {} } def create_edge( source: str, target: str, edge_type: str, score: float = None, detail: Dict = None ) -> Dict: """创建边""" return { "source": source, "target": target, "type": edge_type, "score": score, "detail": detail or {} } # ==================== 从 pattern 提取分类节点 ==================== def extract_category_nodes_from_pattern( pattern_data: Dict, dimension_key: str, dimension_name: str ) -> Dict[str, Dict]: """ 从 pattern 聚合结果中提取分类节点 Returns: { nodeId: nodeData } """ nodes = {} if dimension_key not in pattern_data: return nodes def collect_sources_recursively(node: Dict) -> List[Dict]: """递归收集节点及其所有子节点的特征来源""" sources = [] if "特征列表" in node: for feature in node["特征列表"]: source = { "pointName": feature.get("所属点", ""), "pointDesc": feature.get("点描述", ""), "postId": feature.get("帖子id", "") } sources.append(source) for key, value in node.items(): if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]: continue if isinstance(value, dict): sources.extend(collect_sources_recursively(value)) return sources def traverse_node(node: Dict, parent_path: List[str]): """递归遍历节点""" for key, value in node.items(): if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]: continue if isinstance(value, dict): current_path = parent_path + [key] # 构建节点来源(只收集当前节点的特征) node_sources = [] if "特征列表" in value: for feature in value["特征列表"]: source = { "pointName": feature.get("所属点", ""), "pointDesc": feature.get("点描述", ""), "postId": feature.get("帖子id", "") } node_sources.append(source) # 收集帖子ID列表(递归收集当前节点及所有子节点的帖子ID,去重) all_sources = collect_sources_recursively(value) unique_post_ids = list(set(s.get("postId", "") for s in all_sources if s.get("postId"))) # 构建节点 node_id = build_node_id("人设", dimension_name, "分类", key) nodes[node_id] = create_node( domain="人设", dimension=dimension_name, node_type="分类", name=key, detail={ "parentPath": parent_path.copy(), "postIds": unique_post_ids, "postCount": len(unique_post_ids), "sources": node_sources } ) # 递归处理子节点 traverse_node(value, current_path) traverse_node(pattern_data[dimension_key], []) return nodes # ==================== 从 pattern 提取标签节点 ==================== def extract_tag_nodes_from_pattern( pattern_data: Dict, dimension_key: str, dimension_name: str ) -> Dict[str, Dict]: """ 从 pattern 聚合结果中提取标签节点 Returns: { nodeId: nodeData } """ nodes = {} tag_map = {} # 用于合并同名标签: tagId -> { sources, postIds, parentPath } if dimension_key not in pattern_data: return nodes def traverse_node(node: Dict, parent_path: List[str]): """递归遍历节点""" # 处理特征列表(标签) if "特征列表" in node: for feature in node["特征列表"]: tag_name = feature.get("特征名称", "") if not tag_name: continue source = { "pointName": feature.get("所属点", ""), "pointDesc": feature.get("点描述", ""), "postId": feature.get("帖子id", "") } tag_id = build_node_id("人设", dimension_name, "标签", tag_name) if tag_id not in tag_map: tag_map[tag_id] = { "name": tag_name, "sources": [], "postIds": set(), "parentPath": parent_path.copy() } tag_map[tag_id]["sources"].append(source) if source["postId"]: tag_map[tag_id]["postIds"].add(source["postId"]) # 递归处理子节点 for key, value in node.items(): if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]: continue if isinstance(value, dict): current_path = parent_path + [key] traverse_node(value, current_path) traverse_node(pattern_data[dimension_key], []) # 转换为节点 for tag_id, tag_info in tag_map.items(): nodes[tag_id] = create_node( domain="人设", dimension=dimension_name, node_type="标签", name=tag_info["name"], detail={ "parentPath": tag_info["parentPath"], "postIds": list(tag_info["postIds"]), "postCount": len(tag_info["postIds"]), "sources": tag_info["sources"] } ) return nodes # ==================== 从 pattern 提取属于/包含边 ==================== def extract_belong_contain_edges( pattern_data: Dict, dimension_key: str, dimension_name: str, nodes: Dict[str, Dict] ) -> Dict[str, Dict]: """ 从 pattern 聚合结果中提取属于/包含边 Returns: { edgeId: edgeData } """ edges = {} if dimension_key not in pattern_data: return edges # 构建分类名称到ID的映射 category_name_to_id = {} for node_id, node_data in nodes.items(): if node_data["type"] == "分类" and node_data["dimension"] == dimension_name: category_name_to_id[node_data["name"]] = node_id # 为每个节点创建属于边(子→父) for node_id, node_data in nodes.items(): if node_data["dimension"] != dimension_name: continue parent_path = node_data["detail"].get("parentPath", []) if not parent_path: continue # 取最后一个作为直接父分类 parent_name = parent_path[-1] parent_id = category_name_to_id.get(parent_name) if parent_id: # 获取 source 和 target 的 postIds child_post_ids = node_data["detail"].get("postIds", []) parent_post_ids = nodes.get(parent_id, {}).get("detail", {}).get("postIds", []) # 属于边:子 → 父 edge_id = build_edge_id(node_id, "属于", parent_id) edges[edge_id] = create_edge( source=node_id, target=parent_id, edge_type="属于", score=1.0, detail={ "sourcePostIds": child_post_ids, "targetPostIds": parent_post_ids } ) # 包含边:父 → 子 edge_id_contain = build_edge_id(parent_id, "包含", node_id) edges[edge_id_contain] = create_edge( source=parent_id, target=node_id, edge_type="包含", score=1.0, detail={ "sourcePostIds": parent_post_ids, "targetPostIds": child_post_ids } ) return edges # ==================== 从关联分析提取分类共现边(跨点)==================== def extract_category_cooccur_edges(associations_data: Dict, nodes: Dict[str, Dict]) -> Dict[str, Dict]: """ 从 dimension_associations_analysis.json 中提取分类共现边(跨点) Args: associations_data: 关联分析数据 nodes: 已构建的节点数据(用于获取节点的 postIds) Returns: { edgeId: edgeData } """ edges = {} if "单维度关联分析" not in associations_data: return edges single_dim = associations_data["单维度关联分析"] # 维度映射 dimension_map = { "灵感点维度": "灵感点", "目的点维度": "目的点", "关键点维度": "关键点" } def get_last_segment(path: str) -> str: """获取路径的最后一段""" return path.split("/")[-1] for dim_key, dim_data in single_dim.items(): if dim_key not in dimension_map: continue source_dimension = dimension_map[dim_key] for direction_key, direction_data in dim_data.items(): if direction_key == "说明" or "→" not in direction_key: continue for source_path, source_info in direction_data.items(): source_name = get_last_segment(source_path) source_node_id = build_node_id("人设", source_dimension, "分类", source_name) for field_name, associations in source_info.items(): if not field_name.startswith("与") or not field_name.endswith("的关联"): continue target_dimension = field_name[1:-3] if not isinstance(associations, list): continue for assoc in associations: target_path = assoc.get("目标分类", "") if not target_path: continue target_name = get_last_segment(target_path) target_node_id = build_node_id("人设", target_dimension, "分类", target_name) # 使用 Jaccard 作为 score jaccard = assoc.get("Jaccard相似度", 0) # 获取 source 和 target 的 postIds source_post_ids = nodes.get(source_node_id, {}).get("detail", {}).get("postIds", []) target_post_ids = nodes.get(target_node_id, {}).get("detail", {}).get("postIds", []) edge_id = build_edge_id(source_node_id, "分类共现", target_node_id) edges[edge_id] = create_edge( source=source_node_id, target=target_node_id, edge_type="分类共现", score=jaccard, detail={ "postIds": assoc.get("共同帖子ID", []), "postCount": assoc.get("共同帖子数", 0), "jaccard": jaccard, "overlapCoef": assoc.get("重叠系数", 0), "sourcePostIds": source_post_ids, "targetPostIds": target_post_ids } ) return edges # ==================== 从关联分析提取分类共现边(点内)==================== def extract_intra_category_cooccur_edges(intra_data: Dict, nodes: Dict[str, Dict]) -> Dict[str, Dict]: """ 从 intra_dimension_associations_analysis.json 中提取点内分类共现边 Args: intra_data: 点内关联分析数据 nodes: 已构建的节点数据(用于获取节点的 postIds) Returns: { edgeId: edgeData } """ edges = {} if "叶子分类组合聚类" not in intra_data: return edges clusters_by_dim = intra_data["叶子分类组合聚类"] for dimension, clusters in clusters_by_dim.items(): if dimension not in ("灵感点", "目的点", "关键点"): continue for cluster_key, cluster_data in clusters.items(): leaf_categories = cluster_data.get("叶子分类组合", []) point_count = cluster_data.get("点数", 0) point_details = cluster_data.get("点详情列表", []) # 提取点名称列表 point_names = [p.get("点名称", "") for p in point_details if p.get("点名称")] # 两两组合生成共现边 for i in range(len(leaf_categories)): for j in range(i + 1, len(leaf_categories)): cat1 = leaf_categories[i] cat2 = leaf_categories[j] cat1_id = build_node_id("人设", dimension, "分类", cat1) cat2_id = build_node_id("人设", dimension, "分类", cat2) # 确保顺序一致(按字典序) if cat1_id > cat2_id: cat1_id, cat2_id = cat2_id, cat1_id edge_id = build_edge_id(cat1_id, "分类共现", cat2_id) if edge_id in edges: # 累加 edges[edge_id]["detail"]["pointCount"] += point_count edges[edge_id]["detail"]["pointNames"].extend(point_names) else: # 获取 source 和 target 的 postIds cat1_post_ids = nodes.get(cat1_id, {}).get("detail", {}).get("postIds", []) cat2_post_ids = nodes.get(cat2_id, {}).get("detail", {}).get("postIds", []) # 计算 Jaccard(基于帖子) cat1_set = set(cat1_post_ids) cat2_set = set(cat2_post_ids) intersection = cat1_set & cat2_set union = cat1_set | cat2_set jaccard = round(len(intersection) / len(union), 4) if union else 0 edges[edge_id] = create_edge( source=cat1_id, target=cat2_id, edge_type="分类共现", score=jaccard, detail={ "postIds": list(intersection), "postCount": len(intersection), "jaccard": jaccard, "pointCount": point_count, "pointNames": point_names.copy(), "sourcePostIds": cat1_post_ids, "targetPostIds": cat2_post_ids } ) return edges # ==================== 从历史帖子提取标签共现边 ==================== def extract_tag_cooccur_edges(historical_posts_dir: Path, nodes: Dict[str, Dict]) -> Dict[str, Dict]: """ 从历史帖子解构结果中提取标签共现边 Args: historical_posts_dir: 历史帖子目录 nodes: 已构建的节点数据(用于获取标签的 postIds 计算 Jaccard) Returns: { edgeId: edgeData } """ edges = {} cooccur_map = {} # (tag1_id, tag2_id) -> { postIds: set() } if not historical_posts_dir.exists(): print(f" 警告: 历史帖子目录不存在: {historical_posts_dir}") return edges json_files = list(historical_posts_dir.glob("*.json")) print(f" 找到 {len(json_files)} 个历史帖子文件") def extract_post_id_from_filename(filename: str) -> str: """从文件名中提取帖子ID""" import re match = re.match(r'^([^_]+)_', filename) return match.group(1) if match else "" def extract_tags_from_post(post_data: Dict) -> Dict[str, List[str]]: """从帖子解构结果中提取所有标签""" tags_by_dimension = { "灵感点": [], "目的点": [], "关键点": [] } if "三点解构" not in post_data: return tags_by_dimension three_points = post_data["三点解构"] # 灵感点 if "灵感点" in three_points: inspiration = three_points["灵感点"] for section in ["全新内容", "共性差异", "共性内容"]: if section in inspiration and isinstance(inspiration[section], list): for item in inspiration[section]: if "提取的特征" in item and isinstance(item["提取的特征"], list): for feature in item["提取的特征"]: tag_name = feature.get("特征名称", "") if tag_name: tags_by_dimension["灵感点"].append(tag_name) # 目的点 if "目的点" in three_points: purpose = three_points["目的点"] if "purposes" in purpose and isinstance(purpose["purposes"], list): for item in purpose["purposes"]: if "提取的特征" in item and isinstance(item["提取的特征"], list): for feature in item["提取的特征"]: tag_name = feature.get("特征名称", "") if tag_name: tags_by_dimension["目的点"].append(tag_name) # 关键点 if "关键点" in three_points: key_points = three_points["关键点"] if "key_points" in key_points and isinstance(key_points["key_points"], list): for item in key_points["key_points"]: if "提取的特征" in item and isinstance(item["提取的特征"], list): for feature in item["提取的特征"]: tag_name = feature.get("特征名称", "") if tag_name: tags_by_dimension["关键点"].append(tag_name) return tags_by_dimension # 遍历所有帖子文件 for file_path in json_files: post_id = extract_post_id_from_filename(file_path.name) if not post_id: continue try: with open(file_path, "r", encoding="utf-8") as f: post_data = json.load(f) tags_by_dimension = extract_tags_from_post(post_data) # 对每个维度内的标签两两组合 for dimension, tags in tags_by_dimension.items(): unique_tags = list(set(tags)) for i in range(len(unique_tags)): for j in range(i + 1, len(unique_tags)): tag1 = unique_tags[i] tag2 = unique_tags[j] tag1_id = build_node_id("人设", dimension, "标签", tag1) tag2_id = build_node_id("人设", dimension, "标签", tag2) # 确保顺序一致 if tag1_id > tag2_id: tag1_id, tag2_id = tag2_id, tag1_id key = (tag1_id, tag2_id) if key not in cooccur_map: cooccur_map[key] = {"postIds": set()} cooccur_map[key]["postIds"].add(post_id) except Exception as e: print(f" 警告: 处理文件 {file_path.name} 时出错: {e}") # 转换为边 for (tag1_id, tag2_id), info in cooccur_map.items(): cooccur_post_ids = list(info["postIds"]) cooccur_count = len(cooccur_post_ids) # 获取两个标签的帖子集合,计算 Jaccard tag1_post_ids = nodes.get(tag1_id, {}).get("detail", {}).get("postIds", []) tag2_post_ids = nodes.get(tag2_id, {}).get("detail", {}).get("postIds", []) union_count = len(set(tag1_post_ids) | set(tag2_post_ids)) jaccard = round(cooccur_count / union_count, 4) if union_count > 0 else 0 edge_id = build_edge_id(tag1_id, "标签共现", tag2_id) edges[edge_id] = create_edge( source=tag1_id, target=tag2_id, edge_type="标签共现", score=jaccard, detail={ "postIds": cooccur_post_ids, "postCount": cooccur_count, "jaccard": jaccard, "sourcePostIds": tag1_post_ids, "targetPostIds": tag2_post_ids } ) return edges # ==================== 构建嵌套树结构 ==================== def build_nested_tree(nodes: Dict[str, Dict], edges: Dict[str, Dict]) -> Dict: """ 从根节点开始,沿"包含"边递归构建嵌套树结构 包含边:父节点 -> 子节点 从根节点开始,递归找所有包含的子节点 Returns: 嵌套的树结构 """ # 从"包含"边构建 父节点 -> [子节点] 的映射 parent_to_children = {} # parent_id -> [child_id, ...] for edge_id, edge_data in edges.items(): if edge_data["type"] == "包含": parent_id = edge_data["source"] child_id = edge_data["target"] if parent_id not in parent_to_children: parent_to_children[parent_id] = [] parent_to_children[parent_id].append(child_id) # 递归构建子树 def build_subtree(node_id: str) -> Dict: node_data = nodes[node_id] subtree = { "id": node_id, "name": node_data["name"], "type": node_data["type"], "domain": node_data["domain"], "dimension": node_data["dimension"], "detail": node_data.get("detail", {}), "children": [] } # 获取子节点 child_ids = parent_to_children.get(node_id, []) for child_id in child_ids: if child_id in nodes: subtree["children"].append(build_subtree(child_id)) return subtree # 从根节点开始构建 root_id = "人设:人设:人设:人设" return build_subtree(root_id) # ==================== 图游走工具 ==================== def walk_graph( index: Dict, start_node: str, edge_types: List[str], direction: str = "out", min_score: float = None ) -> Set[str]: """ 从起始节点出发,按指定边类型序列游走N步 Args: index: 游走索引 {"outEdges": {...}, "inEdges": {...}} start_node: 起始节点ID edge_types: 边类型序列,如 ["属于", "分类共现"] direction: 游走方向 "out"(沿出边) / "in"(沿入边) min_score: 最小分数过滤 Returns: 到达的节点ID集合 Example: # 从标签出发,沿"属于"边走1步,再沿"分类共现"边走1步 result = walk_graph( index, "人设:灵感点:标签:手绘风格", ["属于", "分类共现"] ) """ edge_index = index["outEdges"] if direction == "out" else index["inEdges"] target_key = "target" if direction == "out" else "source" current_nodes = {start_node} for edge_type in edge_types: next_nodes = set() for node in current_nodes: neighbors = edge_index.get(node, {}).get(edge_type, []) for neighbor in neighbors: # 分数过滤 if min_score is not None and neighbor.get("score", 0) < min_score: continue next_nodes.add(neighbor[target_key]) current_nodes = next_nodes if not current_nodes: break return current_nodes def get_neighbors( index: Dict, node_id: str, edge_type: str = None, direction: str = "out", min_score: float = None ) -> List[Dict]: """ 获取节点的邻居 Args: index: 游走索引 node_id: 节点ID edge_type: 边类型(可选,不指定则返回所有类型) direction: 方向 "out" / "in" min_score: 最小分数过滤 Returns: 邻居列表 [{"target": "...", "score": 0.5}, ...] """ edge_index = index["outEdges"] if direction == "out" else index["inEdges"] node_edges = edge_index.get(node_id, {}) if edge_type: neighbors = node_edges.get(edge_type, []) else: neighbors = [] for edges in node_edges.values(): neighbors.extend(edges) if min_score is not None: neighbors = [n for n in neighbors if n.get("score", 0) >= min_score] return neighbors # ==================== 构建索引 ==================== def build_index(edges: Dict[str, Dict]) -> Dict: """ 构建游走索引 Returns: { "outEdges": { nodeId: { edgeType: [{ target, score }] } }, "inEdges": { nodeId: { edgeType: [{ source, score }] } } } """ out_edges = {} in_edges = {} for edge_id, edge_data in edges.items(): source = edge_data["source"] target = edge_data["target"] edge_type = edge_data["type"] score = edge_data["score"] # outEdges if source not in out_edges: out_edges[source] = {} if edge_type not in out_edges[source]: out_edges[source][edge_type] = [] out_edges[source][edge_type].append({ "target": target, "score": score }) # inEdges if target not in in_edges: in_edges[target] = {} if edge_type not in in_edges[target]: in_edges[target][edge_type] = [] in_edges[target][edge_type].append({ "source": source, "score": score }) return { "outEdges": out_edges, "inEdges": in_edges } # ==================== 主函数 ==================== def main(): config = PathConfig() config.ensure_dirs() print(f"账号: {config.account_name}") print(f"输出版本: {config.output_version}") print() # 输入文件路径 pattern_file = config.pattern_cluster_file associations_file = config.account_dir / "pattern相关文件/optimization/dimension_associations_analysis.json" intra_associations_file = config.account_dir / "pattern相关文件/optimization/intra_dimension_associations_analysis.json" historical_posts_dir = config.historical_posts_dir # 输出文件路径 output_file = config.intermediate_dir / "人设图谱.json" print("输入文件:") print(f" pattern聚合文件: {pattern_file}") print(f" 跨点关联分析文件: {associations_file}") print(f" 点内关联分析文件: {intra_associations_file}") print(f" 历史帖子目录: {historical_posts_dir}") print(f"\n输出文件: {output_file}") print() # ===== 读取数据 ===== print("=" * 60) print("读取数据...") print(" 读取 pattern 聚合结果...") with open(pattern_file, "r", encoding="utf-8") as f: pattern_data = json.load(f) print(" 读取跨点关联分析结果...") with open(associations_file, "r", encoding="utf-8") as f: associations_data = json.load(f) print(" 读取点内关联分析结果...") with open(intra_associations_file, "r", encoding="utf-8") as f: intra_associations_data = json.load(f) # ===== 提取节点 ===== print("\n" + "=" * 60) print("提取节点...") all_nodes = {} dimension_mapping = { "灵感点列表": "灵感点", "目的点": "目的点", "关键点列表": "关键点" } # 分类节点 print("\n提取分类节点:") for dim_key, dim_name in dimension_mapping.items(): category_nodes = extract_category_nodes_from_pattern(pattern_data, dim_key, dim_name) all_nodes.update(category_nodes) print(f" {dim_name}: {len(category_nodes)} 个") # 标签节点 print("\n提取标签节点:") for dim_key, dim_name in dimension_mapping.items(): tag_nodes = extract_tag_nodes_from_pattern(pattern_data, dim_key, dim_name) all_nodes.update(tag_nodes) print(f" {dim_name}: {len(tag_nodes)} 个") # 统计 category_count = sum(1 for n in all_nodes.values() if n["type"] == "分类") tag_count = sum(1 for n in all_nodes.values() if n["type"] == "标签") print(f"\n节点总计: {len(all_nodes)} (分类: {category_count}, 标签: {tag_count})") # ===== 提取边 ===== print("\n" + "=" * 60) print("提取边...") all_edges = {} # 属于/包含边 print("\n提取属于/包含边:") for dim_key, dim_name in dimension_mapping.items(): belong_contain_edges = extract_belong_contain_edges(pattern_data, dim_key, dim_name, all_nodes) all_edges.update(belong_contain_edges) belong_count = sum(1 for e in all_edges.values() if e["type"] == "属于") contain_count = sum(1 for e in all_edges.values() if e["type"] == "包含") print(f" 属于边: {belong_count}, 包含边: {contain_count}") # 分类共现边(跨点) print("\n提取分类共现边(跨点):") category_cooccur_edges = extract_category_cooccur_edges(associations_data, all_nodes) all_edges.update(category_cooccur_edges) print(f" 分类共现边: {len(category_cooccur_edges)}") # 分类共现边(点内) print("\n提取分类共现边(点内):") intra_category_edges = extract_intra_category_cooccur_edges(intra_associations_data, all_nodes) all_edges.update(intra_category_edges) print(f" 分类共现边: {len(intra_category_edges)}") # 标签共现边 print("\n提取标签共现边:") tag_cooccur_edges = extract_tag_cooccur_edges(historical_posts_dir, all_nodes) all_edges.update(tag_cooccur_edges) print(f" 标签共现边: {len(tag_cooccur_edges)}") # ===== 添加根节点和维度节点 ===== print("\n添加根节点和维度节点:") # 收集所有帖子ID(用于根节点) all_post_ids_for_root = set() for node in all_nodes.values(): post_ids = node["detail"].get("postIds", []) all_post_ids_for_root.update(post_ids) # 根节点 root_id = "人设:人设:人设:人设" root_post_ids = list(all_post_ids_for_root) all_nodes[root_id] = create_node( domain="人设", dimension="人设", node_type="人设", name="人设", detail={ "postIds": root_post_ids, "postCount": len(root_post_ids) } ) # 维度节点 + 边 dimensions = ["灵感点", "目的点", "关键点"] for dim in dimensions: # 收集该维度下所有节点的帖子ID dim_post_ids = set() for node in all_nodes.values(): if node["dimension"] == dim: post_ids = node["detail"].get("postIds", []) dim_post_ids.update(post_ids) dim_post_ids_list = list(dim_post_ids) dim_id = f"人设:{dim}:{dim}:{dim}" all_nodes[dim_id] = create_node( domain="人设", dimension=dim, node_type=dim, name=dim, detail={ "postIds": dim_post_ids_list, "postCount": len(dim_post_ids_list) } ) # 维度 -> 根 的属于边 edge_id = build_edge_id(dim_id, "属于", root_id) all_edges[edge_id] = create_edge( source=dim_id, target=root_id, edge_type="属于", score=1.0, detail={ "sourcePostIds": dim_post_ids_list, "targetPostIds": root_post_ids } ) # 根 -> 维度 的包含边 edge_id_contain = build_edge_id(root_id, "包含", dim_id) all_edges[edge_id_contain] = create_edge( source=root_id, target=dim_id, edge_type="包含", score=1.0, detail={ "sourcePostIds": root_post_ids, "targetPostIds": dim_post_ids_list } ) # 找该维度下的顶级分类(没有父节点的分类),添加边 dim_categories = [ (nid, ndata) for nid, ndata in all_nodes.items() if ndata["dimension"] == dim and ndata["type"] == "分类" and not ndata["detail"].get("parentPath") ] for cat_id, cat_data in dim_categories: cat_post_ids = cat_data["detail"].get("postIds", []) # 顶级分类 -> 维度 的属于边 edge_id = build_edge_id(cat_id, "属于", dim_id) all_edges[edge_id] = create_edge( source=cat_id, target=dim_id, edge_type="属于", score=1.0, detail={ "sourcePostIds": cat_post_ids, "targetPostIds": dim_post_ids_list } ) # 维度 -> 顶级分类 的包含边 edge_id_contain = build_edge_id(dim_id, "包含", cat_id) all_edges[edge_id_contain] = create_edge( source=dim_id, target=cat_id, edge_type="包含", score=1.0, detail={ "sourcePostIds": dim_post_ids_list, "targetPostIds": cat_post_ids } ) print(f" 添加节点: 1 根节点 + 3 维度节点 = 4") print(f" 添加边: 根↔维度 6条 + 维度↔顶级分类") # 边统计 edge_type_counts = {} for edge in all_edges.values(): t = edge["type"] edge_type_counts[t] = edge_type_counts.get(t, 0) + 1 print(f"\n边总计: {len(all_edges)}") for t, count in sorted(edge_type_counts.items(), key=lambda x: -x[1]): print(f" {t}: {count}") # ===== 计算节点概率 ===== print("\n" + "=" * 60) print("计算节点概率...") # 1. 计算总帖子数(所有帖子ID的并集) all_post_ids = set() for node in all_nodes.values(): post_ids = node["detail"].get("postIds", []) all_post_ids.update(post_ids) total_post_count = len(all_post_ids) print(f" 总帖子数: {total_post_count}") # 2. 为每个节点计算概率 for node_id, node in all_nodes.items(): post_count = node["detail"].get("postCount", 0) # 全局概率 if total_post_count > 0: node["detail"]["probGlobal"] = round(post_count / total_post_count, 4) else: node["detail"]["probGlobal"] = 0 # 相对父节点的概率 # 通过"属于"边找父节点 parent_edge_id = None for edge_id, edge in all_edges.items(): if edge["source"] == node_id and edge["type"] == "属于": parent_node_id = edge["target"] parent_node = all_nodes.get(parent_node_id) if parent_node: parent_post_count = parent_node["detail"].get("postCount", 0) if parent_post_count > 0: node["detail"]["probToParent"] = round(post_count / parent_post_count, 4) else: node["detail"]["probToParent"] = 0 break else: # 没有父节点(根节点) node["detail"]["probToParent"] = 1.0 print(f" 已为 {len(all_nodes)} 个节点计算概率") # 3. 更新"包含"边的分数(使用子节点的 probToParent) contain_edge_updated = 0 for edge_id, edge in all_edges.items(): if edge["type"] == "包含": target_node = all_nodes.get(edge["target"]) if target_node: edge["score"] = target_node["detail"].get("probToParent", 1.0) contain_edge_updated += 1 print(f" 已更新 {contain_edge_updated} 条包含边的分数") # ===== 构建索引 ===== print("\n" + "=" * 60) print("构建索引...") index = build_index(all_edges) print(f" outEdges 节点数: {len(index['outEdges'])}") print(f" inEdges 节点数: {len(index['inEdges'])}") # ===== 构建嵌套树 ===== print("\n" + "=" * 60) print("构建嵌套树...") tree = build_nested_tree(all_nodes, all_edges) # 统计树节点数 def count_tree_nodes(node): count = 1 for child in node.get("children", []): count += count_tree_nodes(child) return count tree_node_count = count_tree_nodes(tree) print(f" 树节点数: {tree_node_count}") # ===== 统计各维度 ===== dimension_stats = {} for dim_name in ["灵感点", "目的点", "关键点"]: dim_categories = sum(1 for n in all_nodes.values() if n["type"] == "分类" and n["dimension"] == dim_name) dim_tags = sum(1 for n in all_nodes.values() if n["type"] == "标签" and n["dimension"] == dim_name) dimension_stats[dim_name] = { "categoryCount": dim_categories, "tagCount": dim_tags } # ===== 构建输出 ===== print("\n" + "=" * 60) print("保存结果...") output_data = { "meta": { "description": "人设图谱数据", "account": config.account_name, "createdAt": datetime.now().isoformat(), "stats": { "nodeCount": len(all_nodes), "edgeCount": len(all_edges), "categoryCount": category_count, "tagCount": tag_count, "treeNodeCount": tree_node_count, "dimensions": dimension_stats, "edgeTypes": edge_type_counts } }, "nodes": all_nodes, "edges": all_edges, "index": index, "tree": tree } with open(output_file, "w", encoding="utf-8") as f: json.dump(output_data, f, ensure_ascii=False, indent=2) print(f"\n输出文件: {output_file}") print("\n" + "=" * 60) print("完成!") if __name__ == "__main__": main()