#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Creation-origin analysis - data preparation script.

Step 1: using the post graph and the persona graph, condense the relevant
information into the nodes that are going to be analysed.

Input:  post graph + persona graph
Output: the prepared "to-be-analysed" data structure
"""

import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Put the directory three levels above this file (the project root, per the
# original author's comment) on sys.path so the `script.*` package resolves.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from script.data_processing.path_config import PathConfig

# Post-node dimensions that end up in the list of nodes to analyse.
_ANALYSIS_DIMENSIONS = ("灵感点", "目的点", "关键点")

# How many co-occurrence edges (highest score first) are considered per category.
_MAX_CO_OCCURRENCES = 5


# ===== Data loading =====

def load_json(file_path: Path) -> Dict:
    """Load a UTF-8 encoded JSON file and return the parsed content."""
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_post_graph_files(config: PathConfig) -> List[Path]:
    """Return all ``*_帖子图谱.json`` files in ``<intermediate_dir>/post_graph``, sorted."""
    post_graph_dir = config.intermediate_dir / "post_graph"
    return sorted(post_graph_dir.glob("*_帖子图谱.json"))


# ===== Data extraction =====

def extract_post_detail(post_graph: Dict) -> Dict:
    """
    Extract the post details, keeping the original field names.

    Reads ``post_graph["meta"]`` and ``post_graph["meta"]["postDetail"]``;
    a missing or null section is treated as empty so every field falls back
    to its default.
    """
    meta = post_graph.get("meta") or {}
    post_detail = meta.get("postDetail") or {}

    return {
        "postId": meta.get("postId", ""),
        "postTitle": meta.get("postTitle", ""),
        "body_text": post_detail.get("body_text", ""),
        "images": post_detail.get("images", []),
        "video": post_detail.get("video"),
        "publish_time": post_detail.get("publish_time", ""),
        "like_count": post_detail.get("like_count", 0),
        "collect_count": post_detail.get("collect_count", 0),
    }


def _persona_node_info(node_id: str, node: Dict, node_type: Optional[str] = None) -> Dict:
    """Build the standard info dict for a persona node; ``node_type`` overrides the node's own type."""
    detail = node.get("detail", {})
    parent_path = detail.get("parentPath", [])
    return {
        "节点ID": node_id,
        "节点名称": node.get("name", ""),
        "节点分类": "/".join(parent_path) if parent_path else "",
        "节点维度": node.get("dimension", ""),
        "节点类型": node.get("type", "") if node_type is None else node_type,
        "人设全局占比": detail.get("probGlobal", 0),
        "父类下占比": detail.get("probToParent", 0),
    }


def _find_parent_category_id(
    node_id: str, persona_nodes: Dict, persona_out_edges: Dict
) -> Optional[str]:
    """Follow the node's outgoing "属于" edges and return the first target whose type is "分类"."""
    belong_edges = persona_out_edges.get(node_id, {}).get("属于", [])
    for edge in belong_edges:
        target_id = edge.get("target", "")
        if persona_nodes.get(target_id, {}).get("type") == "分类":
            return target_id
    return None


def _build_category_info(
    category_id: Optional[str],
    persona_nodes: Dict,
    persona_out_edges: Dict,
    max_co_occurrences: int = _MAX_CO_OCCURRENCES,
) -> Optional[Dict]:
    """Describe a category node plus its top co-occurring categories; None if the node is unknown."""
    if not category_id:
        return None
    category_node = persona_nodes.get(category_id, {})
    if not category_node:
        return None

    # The node type is reported as "分类" regardless of what the node itself says.
    category_info = _persona_node_info(category_id, category_node, node_type="分类")
    co_occurring: List[Dict] = []
    category_info["历史共现分类"] = co_occurring

    # Rank the co-occurrence edges by score (descending) and keep the top N
    # edges; edges pointing at unknown nodes are dropped *after* the cut, so
    # fewer than N entries may be produced.
    co_edges = persona_out_edges.get(category_id, {}).get("分类共现", [])
    ranked = sorted(co_edges, key=lambda e: e.get("score", 0), reverse=True)
    for co_edge in ranked[:max_co_occurrences]:
        co_id = co_edge.get("target", "")
        co_node = persona_nodes.get(co_id, {})
        if not co_node:
            continue
        entry = _persona_node_info(co_id, co_node, node_type="分类")
        entry["共现度"] = round(co_edge.get("score", 0), 4)
        co_occurring.append(entry)

    return category_info


def _build_match_map(edges: Dict, persona_nodes: Dict, persona_out_edges: Dict) -> Dict[str, Dict]:
    """Map each post node id to the persona match info derived from "匹配" edges."""
    match_map: Dict[str, Dict] = {}
    for edge in edges.values():
        if edge.get("type") != "匹配":
            continue
        source_id = edge.get("source", "")
        target_id = edge.get("target", "")
        # Only post node -> persona node matches are of interest.
        if not (source_id.startswith("帖子:") and target_id.startswith("人设:")):
            continue
        persona_node = persona_nodes.get(target_id, {})
        if not persona_node:
            continue

        if persona_node.get("type", "") == "标签":
            # A tag belongs to its parent category.
            category_id = _find_parent_category_id(target_id, persona_nodes, persona_out_edges)
        else:
            # Anything else is treated as being its own category.
            category_id = target_id

        # NOTE(review): when one post node has several "匹配" edges, the last
        # edge in iteration order wins, irrespective of score. Confirm whether
        # the highest-scoring match should be kept instead.
        match_map[source_id] = {
            "匹配节点": _persona_node_info(target_id, persona_node),
            "匹配分数": round(edge.get("score", 0), 4),
            "所属分类": _build_category_info(category_id, persona_nodes, persona_out_edges),
        }
    return match_map


def _build_relation_list(edges: Dict, keypoint_ids: set) -> List[Dict]:
    """List "支撑" edges that start at a key point, then the de-duplicated "关联" edges."""
    relation_list: List[Dict] = []

    # Support relations: key point -> other node.
    for edge in edges.values():
        if edge.get("type") != "支撑":
            continue
        source_id = edge.get("source", "")
        if source_id in keypoint_ids:
            relation_list.append({
                "来源节点": source_id,
                "目标节点": edge.get("target", ""),
                "关系类型": "支撑",
            })

    # Association relations are undirected: record each unordered pair once,
    # keeping the direction of the first edge encountered.
    seen_pairs = set()
    for edge in edges.values():
        if edge.get("type") != "关联":
            continue
        source_id = edge.get("source", "")
        target_id = edge.get("target", "")
        pair = tuple(sorted([source_id, target_id]))
        if pair in seen_pairs:
            continue
        seen_pairs.add(pair)
        relation_list.append({
            "来源节点": source_id,
            "目标节点": target_id,
            "关系类型": "关联",
        })

    return relation_list


def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> Tuple[List[Dict], List[Dict]]:
    """
    Extract the nodes to analyse and the candidate relations between them.

    A node is included when its ``type`` is "标签", its ``domain`` is "帖子"
    and its ``dimension`` is one of 灵感点 / 目的点 / 关键点. Each node carries
    its persona match info (or None when no "匹配" edge targets a known
    persona node).

    Args:
        post_graph: post graph; ``nodes`` and ``edges`` are read as dicts
            keyed by id.
        persona_graph: persona graph; ``nodes`` and ``index.outEdges`` are read.

    Returns:
        ``(analysis_nodes, relation_list)`` - the node descriptions and the
        list of "支撑" / "关联" relations.
    """
    nodes = post_graph.get("nodes", {})
    edges = post_graph.get("edges", {})
    persona_nodes = persona_graph.get("nodes", {})
    persona_out_edges = persona_graph.get("index", {}).get("outEdges", {})

    # Key points are identified by type + dimension only (no domain check);
    # they are needed to filter the support relations.
    keypoint_ids = {
        node_id
        for node_id, node in nodes.items()
        if node.get("type") == "标签" and node.get("dimension") == "关键点"
    }

    match_map = _build_match_map(edges, persona_nodes, persona_out_edges)

    analysis_nodes: List[Dict] = []
    for node_id, node in nodes.items():
        if node.get("type") != "标签" or node.get("domain") != "帖子":
            continue
        dimension = node.get("dimension", "")
        if dimension not in _ANALYSIS_DIMENSIONS:
            continue
        analysis_nodes.append({
            "节点ID": node_id,
            "节点名称": node.get("name", ""),
            # Root category (intent / substance / form, per the original comment).
            "节点分类": node.get("category", ""),
            "节点维度": dimension,
            "节点类型": node.get("type", ""),
            "节点描述": node.get("detail", {}).get("description", ""),
            "人设匹配": match_map.get(node_id),
        })

    relation_list = _build_relation_list(edges, keypoint_ids)
    return analysis_nodes, relation_list


def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict:
    """
    Assemble the complete analysis payload for one post.

    Returns:
        {
            "帖子详情": {...},
            "待分析节点列表": [...],
            "可能的关系列表": [...]
        }
    """
    analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph)

    return {
        "帖子详情": extract_post_detail(post_graph),
        "待分析节点列表": analysis_nodes,
        "可能的关系列表": relation_list,
    }


# ===== Display =====

def display_prepared_data(data: Dict) -> None:
    """Print a human-readable summary of the payload built by ``prepare_analysis_data``."""
    post = data["帖子详情"]
    nodes = data["待分析节点列表"]
    relations = data["可能的关系列表"]

    # body_text may be null in the source JSON; show an empty preview then.
    body_preview = (post["body_text"] or "")[:100]

    print(f"\n帖子: {post['postId']}")
    print(f"标题: {post['postTitle']}")
    print(f"正文: {body_preview}...")

    print(f"\n待分析节点 ({len(nodes)} 个):")
    for node in nodes:
        match = node.get("人设匹配")
        category = node.get('节点分类', '')
        print(f" - [{node['节点ID']}] {node['节点名称']} ({node['节点维度']}/{category})")
        if match:
            match_node = match.get("匹配节点", {})
            category_node = match.get("所属分类", {})
            print(f" 匹配: {match_node.get('节点名称', '')} ({match_node.get('节点类型', '')}, 全局占比={match_node.get('人设全局占比', 0):.2%})")
            if category_node:
                co_count = len(category_node.get("历史共现分类", []))
                print(f" 所属分类: {category_node.get('节点名称', '')} (全局占比={category_node.get('人设全局占比', 0):.2%}, {co_count}个历史共现分类)")
        else:
            print(f" 人设: 无匹配")

    print(f"\n可能的关系 ({len(relations)} 条):")
    for rel in relations:
        rel_type = rel["关系类型"]
        if rel_type == "支撑":
            print(f" - {rel['来源节点']} → {rel['目标节点']} [支撑]")
        else:
            print(f" - {rel['来源节点']} ↔ {rel['目标节点']} [关联]")


# ===== Processing =====

def process_single_post(
    post_file: Path,
    persona_graph: Dict,
    config: PathConfig,
    save: bool = True,
) -> Dict:
    """
    Process a single post.

    Args:
        post_file: path of the post graph file
        persona_graph: persona graph data
        config: path configuration
        save: whether to write the result to
            ``<intermediate_dir>/origin_analysis_prepared/<postId>_待分析数据.json``

    Returns:
        The prepared analysis data.
    """
    # Load the post graph
    post_graph = load_json(post_file)
    post_id = (post_graph.get("meta") or {}).get("postId", "unknown")

    print(f"\n{'=' * 60}")
    print(f"处理帖子: {post_id}")
    print("-" * 60)

    # Prepare and display
    data = prepare_analysis_data(post_graph, persona_graph)
    display_prepared_data(data)

    # Save
    if save:
        output_dir = config.intermediate_dir / "origin_analysis_prepared"
        output_dir.mkdir(parents=True, exist_ok=True)

        output_file = output_dir / f"{post_id}_待分析数据.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"\n已保存: {output_file.name}")

    return data


# ===== Entry point =====

def main(
    post_id: Optional[str] = None,
    all_posts: bool = False,
    save: bool = True,
) -> Optional[List[Dict]]:
    """
    Prepare analysis data for one post or for all posts.

    Args:
        post_id: optional post id; the first post graph file whose name
            contains it is processed
        all_posts: process every post graph file (ignored when ``post_id`` is given)
        save: whether to save the results

    Returns:
        The list of prepared data dicts, or None when the persona graph, the
        post graph files or the requested post cannot be found. With neither
        ``post_id`` nor ``all_posts``, only the first post graph file is processed.
    """
    config = PathConfig()
    print(f"账号: {config.account_name}")

    # Load the persona graph
    persona_graph_file = config.intermediate_dir / "人设图谱.json"
    if not persona_graph_file.exists():
        print(f"错误: 人设图谱文件不存在: {persona_graph_file}")
        return

    persona_graph = load_json(persona_graph_file)
    print(f"人设图谱节点数: {len(persona_graph.get('nodes', {}))}")

    # Collect the post graph files
    post_graph_files = get_post_graph_files(config)
    if not post_graph_files:
        print("错误: 没有找到帖子图谱文件")
        return

    # Decide which posts to process
    if post_id:
        target_file = next(
            (f for f in post_graph_files if post_id in f.name),
            None
        )
        if not target_file:
            print(f"错误: 未找到帖子 {post_id}")
            return
        files_to_process = [target_file]
    elif all_posts:
        files_to_process = post_graph_files
    else:
        files_to_process = [post_graph_files[0]]

    print(f"待处理帖子数: {len(files_to_process)}")

    # Process
    results = []
    for i, post_file in enumerate(files_to_process, 1):
        print(f"\n{'#' * 60}")
        print(f"# 处理帖子 {i}/{len(files_to_process)}")
        print(f"{'#' * 60}")

        data = process_single_post(
            post_file=post_file,
            persona_graph=persona_graph,
            config=config,
            save=save,
        )
        results.append(data)

    print(f"\n{'#' * 60}")
    print(f"# 完成! 共处理 {len(results)} 个帖子")
    print(f"{'#' * 60}")

    return results


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="创作起点分析 - 数据准备")
    parser.add_argument("--post-id", type=str, help="帖子ID")
    parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
    parser.add_argument("--no-save", action="store_true", help="不保存结果")
    args = parser.parse_args()

    main(
        post_id=args.post_id,
        all_posts=args.all_posts,
        save=not args.no_save,
    )