#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Extract the node list and edge relations from the source data files.

Inputs:
    1. 过去帖子_pattern聚合结果.json  - category nodes, tag-category edges
    2. 过去帖子_what解构结果 directory - source of the tag nodes
    3. dimension_associations_analysis.json - category-category edges (co-occurrence)

Outputs:
    1. 节点列表.json
    2. 边关系.json
"""
import json
from pathlib import Path
from typing import Dict, List, Any, Set, Optional
import sys
import re

# Add the project root to sys.path so the `script.*` imports below resolve.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from script.data_processing.path_config import PathConfig
from script.detail import get_xiaohongshu_detail


def get_post_detail(post_id: str) -> Optional[Dict]:
    """Fetch the detail of one post; return None (and log) on any failure."""
    try:
        detail = get_xiaohongshu_detail(post_id)
        return detail
    except Exception as e:
        print(f" 警告: 获取帖子 {post_id} 详情失败: {e}")
        return None


def get_last_segment(path: str) -> str:
    """Return the last segment of a slash-separated path."""
    return path.split("/")[-1]


def build_node_id(dimension: str, node_type: str, name: str) -> str:
    """
    Build a node ID.

    Args:
        dimension: node level (灵感点 / 目的点 / 关键点)
        node_type: node type (分类 / 标签)
        name: node name

    Returns:
        Node ID in the form: {level}_{type}_{name}
    """
    return f"{dimension}_{node_type}_{name}"


def extract_post_id_from_filename(filename: str) -> str:
    """Extract the post ID (text before the first underscore) from a filename."""
    match = re.match(r'^([^_]+)_', filename)
    if match:
        return match.group(1)
    return ""


def get_current_post_ids(current_posts_dir: Path) -> Set[str]:
    """
    Collect all post IDs found in the current-posts directory.

    Args:
        current_posts_dir: path of the current-posts directory

    Returns:
        Set of current post IDs (empty if the directory is missing or empty).
    """
    if not current_posts_dir.exists():
        print(f"警告: 当前帖子目录不存在: {current_posts_dir}")
        return set()

    json_files = list(current_posts_dir.glob("*.json"))
    if not json_files:
        print(f"警告: 当前帖子目录为空: {current_posts_dir}")
        return set()

    print(f"找到 {len(json_files)} 个当前帖子")

    post_ids = set()
    for file_path in json_files:
        post_id = extract_post_id_from_filename(file_path.name)
        if post_id:
            post_ids.add(post_id)

    print(f"提取到 {len(post_ids)} 个帖子ID")
    return post_ids


def collect_all_post_ids_from_nodes(nodes: List[Dict]) -> Set[str]:
    """Collect every post ID referenced by the node sources."""
    post_ids = set()
    for node in nodes:
        for source in node.get("节点来源", []):
            post_id = source.get("帖子ID", "")
            if post_id:
                post_ids.add(post_id)
    return post_ids


def collect_all_post_ids_from_edges(edges: List[Dict]) -> Set[str]:
    """Collect every post ID referenced by the edge list."""
    post_ids = set()
    for edge in edges:
        if edge.get("边类型") in ("分类共现(跨点)", "标签共现"):
            edge_details = edge.get("边详情", {})
            common_post_ids = edge_details.get("共同帖子ID", [])
            post_ids.update(common_post_ids)
        # Intra-point co-occurrence edges carry no post IDs.
    return post_ids


def fetch_post_details(post_ids: Set[str]) -> Dict[str, Dict]:
    """
    Fetch post details in bulk.

    Args:
        post_ids: set of post IDs

    Returns:
        Mapping of post ID -> post detail (failed fetches are omitted).
    """
    print(f"\n正在获取 {len(post_ids)} 个帖子的详情...")
    post_details = {}
    for i, post_id in enumerate(sorted(post_ids), 1):
        print(f" [{i}/{len(post_ids)}] 获取帖子 {post_id} 的详情...")
        detail = get_post_detail(post_id)
        if detail:
            post_details[post_id] = detail
    print(f"成功获取 {len(post_details)} 个帖子详情")
    return post_details


def filter_nodes_by_post_ids(nodes: List[Dict], exclude_post_ids: Set[str]) -> List[Dict]:
    """
    Filter nodes, dropping sources that belong to the excluded post IDs.

    Args:
        nodes: node list
        exclude_post_ids: post IDs to exclude

    Returns:
        Filtered node list; nodes left with no sources are removed entirely.
    """
    filtered_nodes = []
    for node in nodes:
        # Drop sources coming from excluded posts.
        filtered_sources = [
            source for source in node.get("节点来源", [])
            if source.get("帖子ID", "") not in exclude_post_ids
        ]
        # Keep only nodes that still have at least one source.
        if filtered_sources:
            node_copy = node.copy()
            node_copy["节点来源"] = filtered_sources
            # Recompute the post count from the remaining sources.
            unique_post_ids = set(
                s.get("帖子ID", "") for s in filtered_sources if s.get("帖子ID")
            )
            node_copy["帖子数"] = len(unique_post_ids)
            filtered_nodes.append(node_copy)
    return filtered_nodes


def filter_edges_by_post_ids(edges: List[Dict], exclude_post_ids: Set[str]) -> List[Dict]:
    """
    Filter edges, removing co-occurrences contributed by excluded post IDs.

    Args:
        edges: edge list
        exclude_post_ids: post IDs to exclude

    Returns:
        Filtered edge list; co-occurrence edges with no remaining posts are dropped.
    """
    filtered_edges = []
    for edge in edges:
        edge_type = edge["边类型"]
        if edge_type in ("分类共现(跨点)", "标签共现"):
            # Filter the shared post-ID list.
            edge_details = edge.get("边详情", {})
            common_post_ids = edge_details.get("共同帖子ID", [])
            filtered_post_ids = [pid for pid in common_post_ids if pid not in exclude_post_ids]
            if filtered_post_ids:
                edge_copy = edge.copy()
                edge_copy["边详情"] = edge_details.copy()
                edge_copy["边详情"]["共同帖子ID"] = filtered_post_ids
                edge_copy["边详情"]["共同帖子数"] = len(filtered_post_ids)
                filtered_edges.append(edge_copy)
        elif edge_type == "分类共现(点内)":
            # Intra-point co-occurrence edges carry no post IDs; keep as-is.
            filtered_edges.append(edge)
        else:
            # 属于/包含 edges need no filtering.
            filtered_edges.append(edge)
    return filtered_edges


# ========== Category-node extraction ==========

def extract_category_nodes_from_pattern(
    pattern_data: Dict,
    dimension_key: str,
    dimension_name: str
) -> List[Dict]:
    """
    Extract category nodes from the pattern aggregation result.

    Args:
        pattern_data: pattern aggregation data
        dimension_key: dimension key name (灵感点列表 / 目的点 / 关键点列表)
        dimension_name: dimension name (灵感点 / 目的点 / 关键点)

    Returns:
        List of category nodes.
    """
    nodes = []
    if dimension_key not in pattern_data:
        return nodes

    def collect_sources_recursively(node: Dict) -> List[Dict]:
        """Recursively collect the feature sources of a node and all its children."""
        sources = []
        # Features attached directly to this node.
        if "特征列表" in node:
            for feature in node["特征列表"]:
                source = {
                    "点的名称": feature.get("所属点", ""),
                    "点的描述": feature.get("点描述", ""),
                    "帖子ID": feature.get("帖子id", "")
                }
                sources.append(source)
        # Recurse into child categories (skip metadata keys).
        for key, value in node.items():
            if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
                continue
            if isinstance(value, dict):
                sources.extend(collect_sources_recursively(value))
        return sources

    def traverse_node(node: Dict, parent_categories: List[str]):
        """Recursively walk the category tree, emitting one node per category."""
        for key, value in node.items():
            if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
                continue
            if isinstance(value, dict):
                # This child dict represents a category.
                current_path = parent_categories + [key]
                # Post list attached to the category, if any.
                post_ids = value.get("帖子列表", [])
                # Build node sources from the direct feature list, or fall back
                # to recursively collecting the sub-categories' features.
                node_sources = []
                if "特征列表" in value:
                    for feature in value["特征列表"]:
                        source = {
                            "点的名称": feature.get("所属点", ""),
                            "点的描述": feature.get("点描述", ""),
                            "帖子ID": feature.get("帖子id", "")
                        }
                        node_sources.append(source)
                else:
                    node_sources = collect_sources_recursively(value)
                node_info = {
                    "节点ID": build_node_id(dimension_name, "分类", key),
                    "节点名称": key,
                    "节点类型": "分类",
                    "节点层级": dimension_name,
                    "所属分类": parent_categories.copy(),
                    "帖子数": len(post_ids) if post_ids else len(
                        set(s.get("帖子ID", "") for s in node_sources if s.get("帖子ID"))
                    ),
                    "节点来源": node_sources
                }
                nodes.append(node_info)
                # Recurse into the child category.
                traverse_node(value, current_path)

    traverse_node(pattern_data[dimension_key], [])
    return nodes


# ========== Tag-node extraction ==========

def extract_tag_nodes_from_pattern(
    pattern_data: Dict,
    dimension_key: str,
    dimension_name: str
) -> List[Dict]:
    """
    Extract tag nodes from the pattern aggregation result.

    Args:
        pattern_data: pattern aggregation data
        dimension_key: dimension key name
        dimension_name: dimension name

    Returns:
        List of tag nodes (same-named tags within a dimension are merged).
    """
    nodes = []
    tag_map = {}  # Merges same-named tags by tag ID.
    if dimension_key not in pattern_data:
        return nodes

    def traverse_node(node: Dict, parent_categories: List[str]):
        """Recursively walk the category tree, accumulating tags."""
        # Features (tags) attached to this node.
        if "特征列表" in node:
            for feature in node["特征列表"]:
                tag_name = feature.get("特征名称", "")
                if not tag_name:
                    continue
                source = {
                    "点的名称": feature.get("所属点", ""),
                    "点的描述": feature.get("点描述", ""),
                    "帖子ID": feature.get("帖子id", "")
                }
                tag_id = build_node_id(dimension_name, "标签", tag_name)
                if tag_id not in tag_map:
                    tag_map[tag_id] = {
                        "节点ID": tag_id,
                        "节点名称": tag_name,
                        "节点类型": "标签",
                        "节点层级": dimension_name,
                        "所属分类": parent_categories.copy(),
                        "帖子数": 0,
                        "节点来源": [],
                        "_post_ids": set()  # Temporary; stripped before output.
                    }
                tag_map[tag_id]["节点来源"].append(source)
                if source["帖子ID"]:
                    tag_map[tag_id]["_post_ids"].add(source["帖子ID"])
        # Recurse into child categories.
        for key, value in node.items():
            if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
                continue
            if isinstance(value, dict):
                current_path = parent_categories + [key]
                traverse_node(value, current_path)

    traverse_node(pattern_data[dimension_key], [])

    # Materialize the list and finalize post counts.
    for tag_id, tag_info in tag_map.items():
        tag_info["帖子数"] = len(tag_info["_post_ids"])
        del tag_info["_post_ids"]
        nodes.append(tag_info)
    return nodes


# ========== Tag-category edge extraction ==========

def extract_tag_category_edges_from_pattern(
    pattern_data: Dict,
    dimension_key: str,
    dimension_name: str
) -> List[Dict]:
    """
    Extract tag-category edges (属于 / 包含) from the pattern aggregation result.

    Args:
        pattern_data: pattern aggregation data
        dimension_key: dimension key name
        dimension_name: dimension name

    Returns:
        Edge list (deduplicated).
    """
    edges = []
    seen_edges = set()  # Avoids duplicate edges.
    if dimension_key not in pattern_data:
        return edges

    def traverse_node(node: Dict, parent_categories: List[str]):
        """Recursively walk the category tree, emitting tag<->category edges."""
        current_category = parent_categories[-1] if parent_categories else None
        # Features (tags) attached to this node.
        if "特征列表" in node and current_category:
            for feature in node["特征列表"]:
                tag_name = feature.get("特征名称", "")
                if not tag_name:
                    continue
                tag_id = build_node_id(dimension_name, "标签", tag_name)
                category_id = build_node_id(dimension_name, "分类", current_category)
                # 属于 edge: tag -> category
                edge_key_belong = (tag_id, category_id, "属于")
                if edge_key_belong not in seen_edges:
                    seen_edges.add(edge_key_belong)
                    edges.append({
                        "源节点ID": tag_id,
                        "目标节点ID": category_id,
                        "边类型": "属于",
                        "边详情": {}
                    })
                # 包含 edge: category -> tag
                edge_key_contain = (category_id, tag_id, "包含")
                if edge_key_contain not in seen_edges:
                    seen_edges.add(edge_key_contain)
                    edges.append({
                        "源节点ID": category_id,
                        "目标节点ID": tag_id,
                        "边类型": "包含",
                        "边详情": {}
                    })
        # Recurse into child categories.
        for key, value in node.items():
            if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
                continue
            if isinstance(value, dict):
                current_path = parent_categories + [key]
                traverse_node(value, current_path)

    traverse_node(pattern_data[dimension_key], [])
    return edges


# ========== Tag-tag co-occurrence edge extraction ==========

def extract_tags_from_post(post_data: Dict) -> Dict[str, List[str]]:
    """
    Extract all tags (feature names) from one post's deconstruction result.

    Args:
        post_data: post deconstruction data

    Returns:
        Tags grouped by dimension: {"灵感点": [...], "目的点": [...], "关键点": [...]}
    """
    tags_by_dimension = {
        "灵感点": [],
        "目的点": [],
        "关键点": []
    }
    if "三点解构" not in post_data:
        return tags_by_dimension

    three_points = post_data["三点解构"]

    # Features of the 灵感点 dimension.
    if "灵感点" in three_points:
        inspiration = three_points["灵感点"]
        for section in ["全新内容", "共性差异", "共性内容"]:
            if section in inspiration and isinstance(inspiration[section], list):
                for item in inspiration[section]:
                    if "提取的特征" in item and isinstance(item["提取的特征"], list):
                        for feature in item["提取的特征"]:
                            tag_name = feature.get("特征名称", "")
                            if tag_name:
                                tags_by_dimension["灵感点"].append(tag_name)

    # Features of the 目的点 dimension.
    if "目的点" in three_points:
        purpose = three_points["目的点"]
        if "purposes" in purpose and isinstance(purpose["purposes"], list):
            for item in purpose["purposes"]:
                if "提取的特征" in item and isinstance(item["提取的特征"], list):
                    for feature in item["提取的特征"]:
                        tag_name = feature.get("特征名称", "")
                        if tag_name:
                            tags_by_dimension["目的点"].append(tag_name)

    # Features of the 关键点 dimension.
    if "关键点" in three_points:
        key_points = three_points["关键点"]
        if "key_points" in key_points and isinstance(key_points["key_points"], list):
            for item in key_points["key_points"]:
                if "提取的特征" in item and isinstance(item["提取的特征"], list):
                    for feature in item["提取的特征"]:
                        tag_name = feature.get("特征名称", "")
                        if tag_name:
                            tags_by_dimension["关键点"].append(tag_name)

    return tags_by_dimension


def extract_tag_cooccurrence_edges(
    historical_posts_dir: Path,
    exclude_post_ids: Optional[Set[str]] = None
) -> List[Dict]:
    """
    Extract tag-tag co-occurrence edges from the historical post deconstruction results.

    Args:
        historical_posts_dir: directory of historical post deconstruction results
        exclude_post_ids: post IDs to skip (optional)

    Returns:
        List of tag co-occurrence edges.
    """
    if exclude_post_ids is None:
        exclude_post_ids = set()

    # Co-occurrence accumulator.
    # key: (tag1_id, tag2_id, dimension), value: {"共同帖子ID": set()}
    cooccurrence_map = {}

    if not historical_posts_dir.exists():
        print(f"警告: 历史帖子目录不存在: {historical_posts_dir}")
        return []

    json_files = list(historical_posts_dir.glob("*.json"))
    print(f"找到 {len(json_files)} 个历史帖子文件")

    for file_path in json_files:
        # Post ID comes from the filename.
        post_id = extract_post_id_from_filename(file_path.name)
        if not post_id:
            continue
        # Skip excluded posts.
        if post_id in exclude_post_ids:
            continue
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                post_data = json.load(f)
            # All tags of this post, per dimension.
            tags_by_dimension = extract_tags_from_post(post_data)
            # Pairwise combinations within each dimension define co-occurrence.
            for dimension, tags in tags_by_dimension.items():
                unique_tags = list(set(tags))  # Deduplicate.
                for i in range(len(unique_tags)):
                    for j in range(i + 1, len(unique_tags)):
                        tag1 = unique_tags[i]
                        tag2 = unique_tags[j]
                        tag1_id = build_node_id(dimension, "标签", tag1)
                        tag2_id = build_node_id(dimension, "标签", tag2)
                        # Canonical ordering (lexicographic) for the pair key.
                        if tag1_id > tag2_id:
                            tag1_id, tag2_id = tag2_id, tag1_id
                        key = (tag1_id, tag2_id, dimension)
                        if key not in cooccurrence_map:
                            cooccurrence_map[key] = {"共同帖子ID": set()}
                        cooccurrence_map[key]["共同帖子ID"].add(post_id)
        except Exception as e:
            print(f" 警告: 处理文件 {file_path.name} 时出错: {e}")

    # Materialize the edge list.
    edges = []
    for (tag1_id, tag2_id, dimension), info in cooccurrence_map.items():
        common_post_ids = list(info["共同帖子ID"])
        edge = {
            "源节点ID": tag1_id,
            "目标节点ID": tag2_id,
            "边类型": "标签共现",
            "边详情": {
                "共同帖子数": len(common_post_ids),
                "共同帖子ID": common_post_ids
            }
        }
        edges.append(edge)
    return edges


# ========== Category-category edge extraction ==========

def extract_category_edges_from_associations(associations_data: Dict) -> List[Dict]:
    """
    Extract category-category (co-occurrence) edges from
    dimension_associations_analysis.json.

    Args:
        associations_data: association analysis data

    Returns:
        Edge list.
    """
    edges = []
    if "单维度关联分析" not in associations_data:
        return edges

    single_dim = associations_data["单维度关联分析"]

    # Dimension-key mapping.
    dimension_map = {
        "灵感点维度": "灵感点",
        "目的点维度": "目的点",
        "关键点维度": "关键点"
    }

    for dim_key, dim_data in single_dim.items():
        if dim_key not in dimension_map:
            continue
        source_dimension = dimension_map[dim_key]
        # Each association direction under this dimension.
        for direction_key, direction_data in dim_data.items():
            if direction_key == "说明":
                continue
            if "→" not in direction_key:
                continue
            # Each source category.
            for source_path, source_info in direction_data.items():
                source_name = get_last_segment(source_path)
                source_node_id = build_node_id(source_dimension, "分类", source_name)
                # Determine the target dimension from the field name "与X的关联".
                for field_name, associations in source_info.items():
                    if not field_name.startswith("与") or not field_name.endswith("的关联"):
                        continue
                    target_dimension = field_name[1:-3]
                    if not isinstance(associations, list):
                        continue
                    for assoc in associations:
                        target_path = assoc.get("目标分类", "")
                        if not target_path:
                            continue
                        target_name = get_last_segment(target_path)
                        target_node_id = build_node_id(target_dimension, "分类", target_name)
                        edge = {
                            "源节点ID": source_node_id,
                            "目标节点ID": target_node_id,
                            "边类型": "分类共现(跨点)",
                            "边详情": {
                                "Jaccard相似度": assoc.get("Jaccard相似度", 0),
                                "重叠系数": assoc.get("重叠系数", 0),
                                "共同帖子数": assoc.get("共同帖子数", 0),
                                "共同帖子ID": assoc.get("共同帖子ID", [])
                            }
                        }
                        edges.append(edge)
    return edges


# ========== Intra-point category co-occurrence edge extraction ==========

def extract_intra_category_edges(intra_associations_data: Dict) -> List[Dict]:
    """
    Extract intra-point category co-occurrence edges from
    intra_dimension_associations_analysis.json.

    Args:
        intra_associations_data: intra-point association analysis data

    Returns:
        Edge list.
    """
    edges = []
    # edge_key -> the edge dict already appended, so repeated keys accumulate
    # in O(1) instead of re-scanning the edge list.
    edge_index = {}

    if "叶子分类组合聚类" not in intra_associations_data:
        return edges

    clusters_by_dim = intra_associations_data["叶子分类组合聚类"]

    for dimension, clusters in clusters_by_dim.items():
        if dimension not in ("灵感点", "目的点", "关键点"):
            continue
        for cluster_key, cluster_data in clusters.items():
            leaf_categories = cluster_data.get("叶子分类组合", [])
            point_count = cluster_data.get("点数", 0)
            point_details = cluster_data.get("点详情列表", [])
            # Names of the points backing this cluster.
            point_names = [p.get("点名称", "") for p in point_details if p.get("点名称")]
            # Pairwise combinations of leaf categories become co-occurrence edges.
            for i in range(len(leaf_categories)):
                for j in range(i + 1, len(leaf_categories)):
                    cat1 = leaf_categories[i]
                    cat2 = leaf_categories[j]
                    cat1_id = build_node_id(dimension, "分类", cat1)
                    cat2_id = build_node_id(dimension, "分类", cat2)
                    # Canonical ordering (lexicographic) for the pair key.
                    if cat1_id > cat2_id:
                        cat1_id, cat2_id = cat2_id, cat1_id
                    edge_key = (cat1_id, cat2_id, dimension)
                    if edge_key in edge_index:
                        # Existing edge: accumulate point count and names.
                        existing = edge_index[edge_key]
                        existing["边详情"]["点数"] += point_count
                        existing["边详情"]["关联点名称"].extend(point_names)
                    else:
                        edge = {
                            "源节点ID": cat1_id,
                            "目标节点ID": cat2_id,
                            "边类型": "分类共现(点内)",
                            "边详情": {
                                "点数": point_count,
                                "关联点名称": point_names.copy()
                            }
                        }
                        edge_index[edge_key] = edge
                        edges.append(edge)
    return edges


# ========== Main ==========

def main():
    # Path configuration for the current account/version.
    config = PathConfig()
    config.ensure_dirs()
    print(f"账号: {config.account_name}")
    print(f"输出版本: {config.output_version}")
    print(f"过滤模式: {config.filter_mode}")
    print()

    # Input file paths.
    pattern_file = config.pattern_cluster_file
    associations_file = config.account_dir / "pattern相关文件/optimization/dimension_associations_analysis.json"
    intra_associations_file = config.account_dir / "pattern相关文件/optimization/intra_dimension_associations_analysis.json"
    current_posts_dir = config.current_posts_dir

    # Output file paths.
    nodes_output_file = config.intermediate_dir / "节点列表.json"
    edges_output_file = config.intermediate_dir / "边关系.json"

    print(f"输入文件:")
    print(f" pattern聚合文件: {pattern_file}")
    print(f" 跨点关联分析文件: {associations_file}")
    print(f" 点内关联分析文件: {intra_associations_file}")
    print(f" 当前帖子目录: {current_posts_dir}")
    print(f"\n输出文件:")
    print(f" 节点列表: {nodes_output_file}")
    print(f" 边关系: {edges_output_file}")
    print()

    # Load the pattern aggregation result.
    print("正在读取pattern聚合结果...")
    with open(pattern_file, "r", encoding="utf-8") as f:
        pattern_data = json.load(f)

    # Load the cross-point association analysis result.
    print("正在读取跨点关联分析结果...")
    with open(associations_file, "r", encoding="utf-8") as f:
        associations_data = json.load(f)

    # Load the intra-point association analysis result.
    print("正在读取点内关联分析结果...")
    with open(intra_associations_file, "r", encoding="utf-8") as f:
        intra_associations_data = json.load(f)

    # ===== Extract nodes =====
    print("\n" + "="*60)
    print("正在提取节点...")
    all_nodes = []

    # Dimension key -> dimension name.
    dimension_mapping = {
        "灵感点列表": "灵感点",
        "目的点": "目的点",
        "关键点列表": "关键点"
    }

    # Category nodes.
    print("\n提取分类节点:")
    for dim_key, dim_name in dimension_mapping.items():
        category_nodes = extract_category_nodes_from_pattern(pattern_data, dim_key, dim_name)
        all_nodes.extend(category_nodes)
        print(f" {dim_name}: {len(category_nodes)} 个分类节点")

    # Tag nodes.
    print("\n提取标签节点:")
    for dim_key, dim_name in dimension_mapping.items():
        tag_nodes = extract_tag_nodes_from_pattern(pattern_data, dim_key, dim_name)
        all_nodes.extend(tag_nodes)
        print(f" {dim_name}: {len(tag_nodes)} 个标签节点")

    print(f"\n总计: {len(all_nodes)} 个节点")

    # Node-type statistics.
    category_count = sum(1 for n in all_nodes if n["节点类型"] == "分类")
    tag_count = sum(1 for n in all_nodes if n["节点类型"] == "标签")
    print(f" 分类节点: {category_count}")
    print(f" 标签节点: {tag_count}")

    # ===== Extract edges =====
    print("\n" + "="*60)
    print("正在提取边...")
    all_edges = []

    # Category-category edges (cross-point co-occurrence).
    print("\n提取分类-分类边(跨点共现):")
    category_edges = extract_category_edges_from_associations(associations_data)
    all_edges.extend(category_edges)
    print(f" 分类共现(跨点)边: {len(category_edges)} 条")

    # Category-category edges (intra-point co-occurrence).
    print("\n提取分类-分类边(点内共现):")
    intra_category_edges = extract_intra_category_edges(intra_associations_data)
    all_edges.extend(intra_category_edges)
    print(f" 分类共现(点内)边: {len(intra_category_edges)} 条")

    # Tag-category edges (属于 / 包含).
    print("\n提取标签-分类边(属于/包含):")
    belong_count = 0
    contain_count = 0
    for dim_key, dim_name in dimension_mapping.items():
        tag_category_edges = extract_tag_category_edges_from_pattern(pattern_data, dim_key, dim_name)
        all_edges.extend(tag_category_edges)
        dim_belong = sum(1 for e in tag_category_edges if e["边类型"] == "属于")
        dim_contain = sum(1 for e in tag_category_edges if e["边类型"] == "包含")
        belong_count += dim_belong
        contain_count += dim_contain
        print(f" {dim_name}: {dim_belong} 条属于边, {dim_contain} 条包含边")

    # Tag-tag co-occurrence edges are extracted AFTER filtering, because the
    # excluded post IDs must be known first (see below).
    print(f"\n边统计(标签共现待提取):")
    print(f" 分类共现(跨点)边: {len(category_edges)}")
    print(f" 分类共现(点内)边: {len(intra_category_edges)}")
    print(f" 属于边: {belong_count}")
    print(f" 包含边: {contain_count}")

    # ===== Apply filtering =====
    exclude_post_ids = set()
    filter_mode = config.filter_mode
    if filter_mode == "exclude_current_posts":
        print("\n" + "="*60)
        print("应用过滤规则: 排除当前帖子ID")
        exclude_post_ids = get_current_post_ids(current_posts_dir)
        if exclude_post_ids:
            # Filter nodes.
            nodes_before = len(all_nodes)
            all_nodes = filter_nodes_by_post_ids(all_nodes, exclude_post_ids)
            nodes_after = len(all_nodes)
            print(f"\n节点过滤: {nodes_before} -> {nodes_after} (移除 {nodes_before - nodes_after} 个)")
            # Filter edges.
            edges_before = len(all_edges)
            all_edges = filter_edges_by_post_ids(all_edges, exclude_post_ids)
            edges_after = len(all_edges)
            print(f"边过滤: {edges_before} -> {edges_after} (移除 {edges_before - edges_after} 条)")
    elif filter_mode == "none":
        print("\n过滤模式: none,不应用任何过滤")
    else:
        print(f"\n警告: 未知的过滤模式 '{filter_mode}',不应用过滤")

    # ===== Extract tag-tag co-occurrence edges =====
    print("\n" + "="*60)
    print("提取标签-标签共现边...")
    historical_posts_dir = config.historical_posts_dir
    print(f"历史帖子目录: {historical_posts_dir}")
    tag_cooccurrence_edges = extract_tag_cooccurrence_edges(historical_posts_dir, exclude_post_ids)
    all_edges.extend(tag_cooccurrence_edges)
    print(f" 标签-标签共现边: {len(tag_cooccurrence_edges)} 条")

    # Final edge totals.
    print(f"\n总计: {len(all_edges)} 条边")
    print(f" 分类共现(跨点)边: {len(category_edges)}")
    print(f" 分类共现(点内)边: {len(intra_category_edges)}")
    print(f" 标签共现边: {len(tag_cooccurrence_edges)}")
    print(f" 属于边: {belong_count}")
    print(f" 包含边: {contain_count}")

    # ===== Fetch post details =====
    print("\n" + "="*60)
    print("获取帖子详情...")
    # Collect every post ID referenced by nodes and edges.
    post_ids_from_nodes = collect_all_post_ids_from_nodes(all_nodes)
    post_ids_from_edges = collect_all_post_ids_from_edges(all_edges)
    all_post_ids = post_ids_from_nodes | post_ids_from_edges
    print(f"节点中的帖子: {len(post_ids_from_nodes)} 个")
    print(f"边中的帖子: {len(post_ids_from_edges)} 个")
    print(f"合计(去重): {len(all_post_ids)} 个")

    # Fetch details in bulk.
    post_details = fetch_post_details(all_post_ids)

    # ===== Save results =====
    print("\n" + "="*60)

    # Output path for the post-detail mapping.
    post_details_output_file = config.intermediate_dir / "帖子详情映射.json"

    # Save the node list.
    nodes_output = {
        "说明": {
            "描述": "分类和标签节点列表",
            "数据来源": ["过去帖子_pattern聚合结果.json"],
            "过滤模式": filter_mode,
            "过滤帖子数": len(exclude_post_ids) if exclude_post_ids else 0
        },
        "节点列表": all_nodes
    }
    print(f"正在保存节点列表到: {nodes_output_file}")
    with open(nodes_output_file, "w", encoding="utf-8") as f:
        json.dump(nodes_output, f, ensure_ascii=False, indent=2)

    # Per-node edge index: node -> edge type -> {target node: full edge info}.
    edges_by_node = {}
    for edge in all_edges:
        source_id = edge["源节点ID"]
        target_id = edge["目标节点ID"]
        edge_type = edge["边类型"]
        # Source node -> target node.
        if source_id not in edges_by_node:
            edges_by_node[source_id] = {}
        if edge_type not in edges_by_node[source_id]:
            edges_by_node[source_id][edge_type] = {}
        edges_by_node[source_id][edge_type][target_id] = edge

    # Save the edge relations.
    edges_output = {
        "说明": {
            "描述": "分类和标签之间的边关系",
            "数据来源": ["过去帖子_pattern聚合结果.json", "dimension_associations_analysis.json", "过去帖子_what解构结果目录"],
            "过滤模式": filter_mode,
            "过滤帖子数": len(exclude_post_ids) if exclude_post_ids else 0
        },
        "边列表": all_edges,
        "节点边索引": edges_by_node
    }
    print(f"正在保存边关系到: {edges_output_file}")
    with open(edges_output_file, "w", encoding="utf-8") as f:
        json.dump(edges_output, f, ensure_ascii=False, indent=2)

    # Save the post-detail mapping.
    post_details_output = {
        "说明": {
            "描述": "帖子ID到帖子详情的映射",
            "帖子数": len(post_details)
        },
        "帖子详情": post_details
    }
    print(f"正在保存帖子详情映射到: {post_details_output_file}")
    with open(post_details_output_file, "w", encoding="utf-8") as f:
        json.dump(post_details_output, f, ensure_ascii=False, indent=2)

    print("\n完成!")
    print(f"\n输出文件:")
    print(f" 节点列表: {len(all_nodes)} 个节点")
    print(f" 边关系: {len(all_edges)} 条边")
    print(f" 帖子详情映射: {len(post_details)} 个帖子")


if __name__ == "__main__":
    main()