#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Build the post/persona node-edge graph from the match results.

Inputs:
1. Match result files under the filtered_results directory
2. 节点列表.json (node list)
3. 边关系.json (edge relations)

Outputs:
1. Node-edge graph files under the match_graph directory
"""

import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Set

# Add the project root to sys.path so the script package can be imported
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from script.data_processing.path_config import PathConfig


def build_post_node_id(dimension: str, node_type: str, name: str) -> str:
    """Build the ID of a post node."""
    return f"帖子_{dimension}_{node_type}_{name}"


def build_persona_node_id(dimension: str, node_type: str, name: str) -> str:
    """Build the ID of a persona node."""
    return f"{dimension}_{node_type}_{name}"


def extract_matched_nodes_and_edges(filtered_data: Dict) -> tuple:
    """
    Extract post nodes, persona node IDs, and match edges from a match result.

    Args:
        filtered_data: Match result data.

    Returns:
        (post node list, persona node ID set, match edge list)
    """
    post_nodes = []
    persona_node_ids = set()
    match_edges = []

    how_result = filtered_data.get("how解构结果", {})

    # Map list keys to dimension names
    dimension_mapping = {
        "灵感点列表": "灵感点",
        "目的点列表": "目的点",
        "关键点列表": "关键点"
    }

    for list_key, dimension in dimension_mapping.items():
        points = how_result.get(list_key, [])

        for point in points:
            # Walk the "how" step list of each point
            how_steps = point.get("how步骤列表", [])

            for step in how_steps:
                features = step.get("特征列表", [])

                for feature in features:
                    feature_name = feature.get("特征名称", "")
                    weight = feature.get("权重", 0)
                    match_results = feature.get("匹配结果", [])

                    if not feature_name:
                        continue

                    # Only features with match results produce a post node and match edges
                    if match_results:
                        # Create the post node (tag type)
                        post_node_id = build_post_node_id(dimension, "标签", feature_name)
                        post_node = {
                            "节点ID": post_node_id,
                            "节点名称": feature_name,
                            "节点类型": "标签",
                            "节点层级": dimension,
                            "权重": weight
                        }
                        # Avoid adding the same post node twice
                        if not any(n["节点ID"] == post_node_id for n in post_nodes):
                            post_nodes.append(post_node)

                        # Process each match result
                        for match in match_results:
                            persona_name = match.get("人设特征名称", "")
                            persona_dimension = match.get("人设特征层级", "")
                            persona_type = match.get("特征类型", "标签")
                            match_detail = match.get("匹配结果", {})

                            if not persona_name or not persona_dimension:
                                continue

                            # Build the persona node ID
                            persona_node_id = build_persona_node_id(
                                persona_dimension, persona_type, persona_name
                            )
                            persona_node_ids.add(persona_node_id)

                            # Create the match edge (post node -> persona node)
                            match_edge = {
                                "源节点ID": post_node_id,
                                "目标节点ID": persona_node_id,
                                "边类型": "匹配",
                                "边详情": {
                                    "相似度": match_detail.get("相似度", 0),
                                    "说明": match_detail.get("说明", "")
                                }
                            }
                            match_edges.append(match_edge)

    return post_nodes, persona_node_ids, match_edges


def get_persona_nodes_details(
    persona_node_ids: Set[str],
    nodes_data: Dict
) -> List[Dict]:
    """
    Look up the full details of persona nodes in the node list.

    Args:
        persona_node_ids: Set of persona node IDs.
        nodes_data: Node list data.

    Returns:
        List of persona node detail dicts.
    """
    persona_nodes = []
    all_nodes = nodes_data.get("节点列表", [])

    for node in all_nodes:
        if node["节点ID"] in persona_node_ids:
            persona_nodes.append(node)

    return persona_nodes


def get_edges_between_nodes(
    node_ids: Set[str],
    edges_data: Dict
) -> List[Dict]:
    """
    Get the edges that connect the given nodes to each other.

    Args:
        node_ids: Set of node IDs.
        edges_data: Edge relation data.

    Returns:
        List of edges between the given nodes.
    """
    edges_between = []
    all_edges = edges_data.get("边列表", [])

    for edge in all_edges:
        source_id = edge["源节点ID"]
        target_id = edge["目标节点ID"]

        # Keep the edge only if both endpoints are in the set
        if source_id in node_ids and target_id in node_ids:
            edges_between.append(edge)

    return edges_between


def create_mirrored_post_edges(
    match_edges: List[Dict],
    persona_edges: List[Dict]
) -> List[Dict]:
    """
    Create mirrored edges between post nodes based on the edges between persona nodes.

    Logic: if persona nodes A and B are connected, post node X matches A, and post
    node Y matches B, create a mirrored edge between post nodes X and Y.

    Args:
        match_edges: Match edges (post node -> persona node).
        persona_edges: Edges between persona nodes.

    Returns:
        List of mirrored edges between post nodes.
    """
    # Build the reverse mapping from persona nodes to post nodes
    # persona_id -> [post_id1, post_id2, ...]
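    # Illustration (hypothetical names and edge type, not taken from real data):
    # if the persona edge 人设A --"包含"--> 人设B exists, post node 帖子X matches 人设A,
    # and post node 帖子Y matches 人设B, this function emits the mirrored edge
    # 帖子X --"镜像_包含"--> 帖子Y and records "包含" as the original edge type.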
    persona_to_posts = {}
    for edge in match_edges:
        post_id = edge["源节点ID"]
        persona_id = edge["目标节点ID"]

        if persona_id not in persona_to_posts:
            persona_to_posts[persona_id] = []
        if post_id not in persona_to_posts[persona_id]:
            persona_to_posts[persona_id].append(post_id)

    # Create mirrored post edges from the persona edges
    post_edges = []
    seen_edges = set()

    for persona_edge in persona_edges:
        source_persona = persona_edge["源节点ID"]
        target_persona = persona_edge["目标节点ID"]
        edge_type = persona_edge["边类型"]

        # Post nodes matched to the two persona endpoints
        source_posts = persona_to_posts.get(source_persona, [])
        target_posts = persona_to_posts.get(target_persona, [])

        # Create a mirrored edge for every pair of post nodes
        for src_post in source_posts:
            for tgt_post in target_posts:
                if src_post == tgt_post:
                    continue

                # Use a sorted key to avoid duplicates (A-B and B-A count as the same edge)
                edge_key = tuple(sorted([src_post, tgt_post])) + (edge_type,)
                if edge_key in seen_edges:
                    continue
                seen_edges.add(edge_key)

                post_edge = {
                    "源节点ID": src_post,
                    "目标节点ID": tgt_post,
                    "边类型": f"镜像_{edge_type}",  # mark as a mirrored edge
                    "边详情": {
                        "原始边类型": edge_type,
                        "源人设节点": source_persona,
                        "目标人设节点": target_persona
                    }
                }
                post_edges.append(post_edge)

    return post_edges


def expand_one_layer(
    node_ids: Set[str],
    edges_data: Dict,
    nodes_data: Dict,
    edge_types: Optional[List[str]] = None,
    direction: str = "both"
) -> tuple:
    """
    Expand one hop from the given nodes, collecting adjacent nodes and connecting edges.

    Args:
        node_ids: Set of starting node IDs.
        edges_data: Edge relation data.
        nodes_data: Node list data.
        edge_types: Edge types to follow; None means all types.
        direction: Expansion direction
            - "outgoing": follow outgoing edges only (source in the set, expand to targets)
            - "incoming": follow incoming edges only (target in the set, expand to sources)
            - "both": expand in both directions

    Returns:
        (expanded node list, expanded edge list, expanded node ID set)
    """
    expanded_node_ids = set()
    expanded_edges = []
    all_edges = edges_data.get("边列表", [])

    # Find every edge and node adjacent to the starting set
    for edge in all_edges:
        # Filter by edge type
        if edge_types and edge["边类型"] not in edge_types:
            continue

        source_id = edge["源节点ID"]
        target_id = edge["目标节点ID"]

        # Outgoing: source is in the set, expand to the target
        if direction in ["outgoing", "both"]:
            if source_id in node_ids and target_id not in node_ids:
                expanded_node_ids.add(target_id)
                expanded_edges.append(edge)

        # Incoming: target is in the set, expand to the source
        if direction in ["incoming", "both"]:
            if target_id in node_ids and source_id not in node_ids:
                expanded_node_ids.add(source_id)
                expanded_edges.append(edge)

    # Look up the details of the expanded nodes
    expanded_nodes = []
    all_nodes = nodes_data.get("节点列表", [])

    for node in all_nodes:
        if node["节点ID"] in expanded_node_ids:
            # Copy before marking so the shared node list is not mutated
            node_copy = node.copy()
            node_copy["是否扩展"] = True
            expanded_nodes.append(node_copy)

    return expanded_nodes, expanded_edges, expanded_node_ids


def process_filtered_result(
    filtered_file: Path,
    nodes_data: Dict,
    edges_data: Dict,
    output_dir: Path
) -> Dict:
    """
    Process a single match result file.

    Args:
        filtered_file: Path of the match result file.
        nodes_data: Node list data.
        edges_data: Edge relation data.
        output_dir: Output directory.

    Returns:
        Processing statistics.
    """
    # Read the match result
    with open(filtered_file, "r", encoding="utf-8") as f:
        filtered_data = json.load(f)

    post_id = filtered_data.get("帖子id", "")
    post_detail = filtered_data.get("帖子详情", {})
    post_title = post_detail.get("title", "")

    # Extract nodes and edges
    post_nodes, persona_node_ids, match_edges = extract_matched_nodes_and_edges(filtered_data)

    # Get persona node details (directly matched, marked as non-expanded);
    # copy each node before marking so the shared nodes_data is not mutated
    persona_nodes = get_persona_nodes_details(persona_node_ids, nodes_data)
    persona_nodes = [node.copy() for node in persona_nodes]
    for node in persona_nodes:
        node["是否扩展"] = False

    # Get the edges between persona nodes
    persona_edges = get_edges_between_nodes(persona_node_ids, edges_data)

    # Create mirrored edges between post nodes (projection of the persona edges)
    post_edges = create_mirrored_post_edges(match_edges, persona_edges)

    # Merge the node lists (no expansion, keep only directly matched nodes)
    all_nodes = post_nodes + persona_nodes

    # Merge the edge lists
    all_edges = match_edges + persona_edges + post_edges

    # Deduplicate edges
    seen_edges = set()
    unique_edges = []
    for edge in all_edges:
        edge_key = (edge["源节点ID"], edge["目标节点ID"], edge["边类型"])
        if edge_key not in seen_edges:
            seen_edges.add(edge_key)
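            # Keep only the first occurrence of each (source, target, edge type) triple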
            unique_edges.append(edge)
    all_edges = unique_edges

    # Build the per-node edge index: source ID -> edge type -> target ID -> edge
    edges_by_node = {}
    for edge in all_edges:
        source_id = edge["源节点ID"]
        target_id = edge["目标节点ID"]
        edge_type = edge["边类型"]

        if source_id not in edges_by_node:
            edges_by_node[source_id] = {}
        if edge_type not in edges_by_node[source_id]:
            edges_by_node[source_id][edge_type] = {}
        edges_by_node[source_id][edge_type][target_id] = edge

    # Assemble the output data
    output_data = {
        "说明": {
            "帖子ID": post_id,
            "帖子标题": post_title,
            "描述": "帖子与人设的节点匹配关系",
            "统计": {
                "帖子节点数": len(post_nodes),
                "人设节点数": len(persona_nodes),
                "匹配边数": len(match_edges),
                "人设节点间边数": len(persona_edges),
                "帖子节点间边数": len(post_edges),
                "总节点数": len(all_nodes),
                "总边数": len(all_edges)
            }
        },
        "帖子节点列表": post_nodes,
        "人设节点列表": persona_nodes,
        "匹配边列表": match_edges,
        "人设节点间边列表": persona_edges,
        "帖子节点间边列表": post_edges,
        "节点列表": all_nodes,
        "边列表": all_edges,
        "节点边索引": edges_by_node
    }

    # Write the output file
    output_file = output_dir / f"{post_id}_match_graph.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    return {
        "帖子ID": post_id,
        "帖子节点数": len(post_nodes),
        "人设节点数": len(persona_nodes),
        "匹配边数": len(match_edges),
        "人设节点间边数": len(persona_edges),
        "帖子节点间边数": len(post_edges),
        "总节点数": len(all_nodes),
        "总边数": len(all_edges),
        "输出文件": str(output_file)
    }


def main():
    # Resolve paths from the shared path configuration
    config = PathConfig()
    config.ensure_dirs()

    print(f"Account: {config.account_name}")
    print(f"Output version: {config.output_version}")
    print()

    # Input files/directories
    filtered_results_dir = config.intermediate_dir / "filtered_results"
    nodes_file = config.intermediate_dir / "节点列表.json"
    edges_file = config.intermediate_dir / "边关系.json"

    # Output directory
    output_dir = config.intermediate_dir / "match_graph"
    output_dir.mkdir(parents=True, exist_ok=True)

    print("Inputs:")
    print(f"  Match results dir: {filtered_results_dir}")
    print(f"  Node list: {nodes_file}")
    print(f"  Edge relations: {edges_file}")
    print(f"\nOutput dir: {output_dir}")
    print()

    # Read the node and edge data
    print("Reading node list...")
    with open(nodes_file, "r", encoding="utf-8") as f:
        nodes_data = json.load(f)
    print(f"  {len(nodes_data.get('节点列表', []))} nodes")

    print("Reading edge relations...")
    with open(edges_file, "r", encoding="utf-8") as f:
        edges_data = json.load(f)
    print(f"  {len(edges_data.get('边列表', []))} edges")

    # Process every match result file
    print("\n" + "=" * 60)
    print("Processing match result files...")

    filtered_files = list(filtered_results_dir.glob("*_filtered.json"))
    print(f"Found {len(filtered_files)} match result files")

    results = []
    for i, filtered_file in enumerate(filtered_files, 1):
        print(f"\n[{i}/{len(filtered_files)}] Processing: {filtered_file.name}")
        result = process_filtered_result(filtered_file, nodes_data, edges_data, output_dir)
        results.append(result)
        print(f"  Post nodes: {result['帖子节点数']}, persona nodes: {result['人设节点数']}")
        print(f"  Match edges: {result['匹配边数']}, persona edges: {result['人设节点间边数']}, post edges: {result['帖子节点间边数']}")

    # Summary statistics
    print("\n" + "=" * 60)
    print("Done!")
    print("\nSummary:")
    print(f"  Files processed: {len(results)}")

    total_post = sum(r['帖子节点数'] for r in results)
    total_persona = sum(r['人设节点数'] for r in results)
    total_match = sum(r['匹配边数'] for r in results)
    total_persona_edges = sum(r['人设节点间边数'] for r in results)
    total_post_edges = sum(r['帖子节点间边数'] for r in results)

    print(f"  Total post nodes: {total_post}")
    print(f"  Total persona nodes: {total_persona}")
    print(f"  Total match edges: {total_match}")
    print(f"  Total persona edges: {total_persona_edges}")
    print(f"  Total post edges: {total_post_edges}")

    print(f"\nOutput dir: {output_dir}")


if __name__ == "__main__":
    main()
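
# Usage sketch (the directory layout is inferred from the paths built above; how
# PathConfig resolves the account and output version is defined in
# script/data_processing/path_config.py, not here):
#
#   python <this_script>.py
#
# Expected inputs under config.intermediate_dir:
#   filtered_results/<帖子id>_filtered.json   one match result per post
#   节点列表.json                              persona node list
#   边关系.json                                persona edge relations
#
# Output, one file per post:
#   match_graph/<帖子id>_match_graph.json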