| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 构建人设图谱
- ================================================================================
- 输入文件:
- ================================================================================
- 1. pattern聚合结果.json - 分类节点、标签节点、属于/包含边
- 2. dimension_associations_analysis.json - 分类共现边(跨点)
- 3. intra_dimension_associations_analysis.json - 分类共现边(点内)
- 4. 历史帖子解构目录/*.json - 标签共现边
- ================================================================================
- 输出文件: 人设图谱.json
- ================================================================================
- {
- "meta": { # 元信息
- "description": "...",
- "account": "账号名",
- "createdAt": "时间戳",
- "stats": { ... } # 统计信息
- },
- "nodes": { # 节点字典 (nodeId -> nodeData)
- "{domain}:{dimension}:{type}:{name}": {
- "name": "显示名称",
- "type": "人设|灵感点|目的点|关键点|分类|标签",
- "domain": "人设",
- "dimension": "人设|灵感点|目的点|关键点",
- "detail": { ... }
- }
- },
- "edges": { # 边字典 (edgeId -> edgeData)
- "{source}|{type}|{target}": {
- "source": "源节点ID",
- "target": "目标节点ID",
- "type": "属于|包含|标签共现|分类共现|分类共现",
- "score": 0.5,
- "detail": { ... }
- }
- },
- "index": { # 游走索引
- "outEdges": { nodeId: { edgeType: [{ target, score }] } },
- "inEdges": { nodeId: { edgeType: [{ source, score }] } }
- },
- "tree": { ... } # 嵌套树结构(从根节点沿"包含"边构建)
- }
- ================================================================================
- 核心逻辑:
- ================================================================================
- 1. 提取节点
- - 从 pattern 提取分类节点(按维度分组的层级分类)
- - 从 pattern 提取标签节点(具体特征标签)
- - 添加根节点(人设)和维度节点(灵感点/目的点/关键点)
- 2. 提取边
- - 属于/包含边:根据节点的 parentPath 构建层级关系
- - 分类共现边(跨点):从关联分析结果提取
- - 分类共现边(点内):从点内关联分析提取
- - 标签共现边:遍历历史帖子,统计标签同现
- 3. 构建索引
- - outEdges: 从该节点出发能到达的节点
- - inEdges: 能到达该节点的源节点
- 4. 构建树
- - 从根节点开始,沿"包含"边递归构建嵌套树结构
- ================================================================================
- 节点ID格式: {domain}:{dimension}:{type}:{name}
- ================================================================================
- - 根节点: 人设:人设:人设:人设
- - 维度节点: 人设:灵感点:灵感点:灵感点
- - 分类节点: 人设:灵感点:分类:视觉呈现
- - 标签节点: 人设:灵感点:标签:手绘风格
- ================================================================================
- 边类型:
- ================================================================================
- - 属于: 子节点 -> 父节点(层级关系)
- - 包含: 父节点 -> 子节点(层级关系)
- - 标签共现: 标签 <-> 标签(同一帖子出现)
- - 分类共现: 分类 <-> 分类(跨维度共现)
- - 分类共现: 分类 <-> 分类(点内组合共现)
- ================================================================================
- 图游走函数:
- ================================================================================
- 1. walk_graph(index, start_node, edge_types, direction, min_score)
- - 从起始节点出发,按边类型序列游走N步
- - 示例: walk_graph(index, "人设:灵感点:标签:手绘风格", ["属于", "分类共现"])
- - 返回: 到达的节点ID集合
- 2. get_neighbors(index, node_id, edge_type, direction, min_score)
- - 获取节点的邻居
- - 示例: get_neighbors(index, "人设:灵感点:分类:视觉呈现", "包含")
- - 返回: 邻居列表 [{"target": "...", "score": 0.5}, ...]
- ================================================================================
- """
- import json
- from pathlib import Path
- from typing import Dict, List, Set, Any
- from datetime import datetime
- import sys
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from script.data_processing.path_config import PathConfig
- # ==================== 节点和边构建工具 ====================
- def build_node_id(domain: str, dimension: str, node_type: str, name: str) -> str:
- """构建节点ID"""
- return f"{domain}:{dimension}:{node_type}:{name}"
- def build_edge_id(source: str, edge_type: str, target: str) -> str:
- """构建边ID"""
- return f"{source}|{edge_type}|{target}"
- def create_node(
- domain: str,
- dimension: str,
- node_type: str,
- name: str,
- detail: Dict = None
- ) -> Dict:
- """创建节点"""
- return {
- "name": name,
- "type": node_type,
- "dimension": dimension,
- "domain": domain,
- "detail": detail or {}
- }
- def create_edge(
- source: str,
- target: str,
- edge_type: str,
- score: float = None,
- detail: Dict = None
- ) -> Dict:
- """创建边"""
- return {
- "source": source,
- "target": target,
- "type": edge_type,
- "score": score,
- "detail": detail or {}
- }
- # ==================== 从 pattern 提取分类节点 ====================
- def extract_category_nodes_from_pattern(
- pattern_data: Dict,
- dimension_key: str,
- dimension_name: str
- ) -> Dict[str, Dict]:
- """
- 从 pattern 聚合结果中提取分类节点
- Returns:
- { nodeId: nodeData }
- """
- nodes = {}
- if dimension_key not in pattern_data:
- return nodes
- def collect_sources_recursively(node: Dict) -> List[Dict]:
- """递归收集节点及其所有子节点的特征来源"""
- sources = []
- if "特征列表" in node:
- for feature in node["特征列表"]:
- source = {
- "pointName": feature.get("所属点", ""),
- "pointDesc": feature.get("点描述", ""),
- "postId": feature.get("帖子id", "")
- }
- sources.append(source)
- for key, value in node.items():
- if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
- continue
- if isinstance(value, dict):
- sources.extend(collect_sources_recursively(value))
- return sources
- def traverse_node(node: Dict, parent_path: List[str]):
- """递归遍历节点"""
- for key, value in node.items():
- if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
- continue
- if isinstance(value, dict):
- current_path = parent_path + [key]
- # 构建节点来源(只收集当前节点的特征)
- node_sources = []
- if "特征列表" in value:
- for feature in value["特征列表"]:
- source = {
- "pointName": feature.get("所属点", ""),
- "pointDesc": feature.get("点描述", ""),
- "postId": feature.get("帖子id", "")
- }
- node_sources.append(source)
- # 收集帖子ID列表(递归收集当前节点及所有子节点的帖子ID,去重)
- all_sources = collect_sources_recursively(value)
- unique_post_ids = list(set(s.get("postId", "") for s in all_sources if s.get("postId")))
- # 构建节点
- node_id = build_node_id("人设", dimension_name, "分类", key)
- nodes[node_id] = create_node(
- domain="人设",
- dimension=dimension_name,
- node_type="分类",
- name=key,
- detail={
- "parentPath": parent_path.copy(),
- "postIds": unique_post_ids,
- "postCount": len(unique_post_ids),
- "sources": node_sources
- }
- )
- # 递归处理子节点
- traverse_node(value, current_path)
- traverse_node(pattern_data[dimension_key], [])
- return nodes
- # ==================== 从 pattern 提取标签节点 ====================
- def extract_tag_nodes_from_pattern(
- pattern_data: Dict,
- dimension_key: str,
- dimension_name: str
- ) -> Dict[str, Dict]:
- """
- 从 pattern 聚合结果中提取标签节点
- Returns:
- { nodeId: nodeData }
- """
- nodes = {}
- tag_map = {} # 用于合并同名标签: tagId -> { sources, postIds, parentPath }
- if dimension_key not in pattern_data:
- return nodes
- def traverse_node(node: Dict, parent_path: List[str]):
- """递归遍历节点"""
- # 处理特征列表(标签)
- if "特征列表" in node:
- for feature in node["特征列表"]:
- tag_name = feature.get("特征名称", "")
- if not tag_name:
- continue
- source = {
- "pointName": feature.get("所属点", ""),
- "pointDesc": feature.get("点描述", ""),
- "postId": feature.get("帖子id", "")
- }
- tag_id = build_node_id("人设", dimension_name, "标签", tag_name)
- if tag_id not in tag_map:
- tag_map[tag_id] = {
- "name": tag_name,
- "sources": [],
- "postIds": set(),
- "parentPath": parent_path.copy()
- }
- tag_map[tag_id]["sources"].append(source)
- if source["postId"]:
- tag_map[tag_id]["postIds"].add(source["postId"])
- # 递归处理子节点
- for key, value in node.items():
- if key in ["特征列表", "_meta", "帖子数", "特征数", "帖子列表"]:
- continue
- if isinstance(value, dict):
- current_path = parent_path + [key]
- traverse_node(value, current_path)
- traverse_node(pattern_data[dimension_key], [])
- # 转换为节点
- for tag_id, tag_info in tag_map.items():
- nodes[tag_id] = create_node(
- domain="人设",
- dimension=dimension_name,
- node_type="标签",
- name=tag_info["name"],
- detail={
- "parentPath": tag_info["parentPath"],
- "postIds": list(tag_info["postIds"]),
- "postCount": len(tag_info["postIds"]),
- "sources": tag_info["sources"]
- }
- )
- return nodes
- # ==================== 从 pattern 提取属于/包含边 ====================
- def extract_belong_contain_edges(
- pattern_data: Dict,
- dimension_key: str,
- dimension_name: str,
- nodes: Dict[str, Dict]
- ) -> Dict[str, Dict]:
- """
- 从 pattern 聚合结果中提取属于/包含边
- Returns:
- { edgeId: edgeData }
- """
- edges = {}
- if dimension_key not in pattern_data:
- return edges
- # 构建分类名称到ID的映射
- category_name_to_id = {}
- for node_id, node_data in nodes.items():
- if node_data["type"] == "分类" and node_data["dimension"] == dimension_name:
- category_name_to_id[node_data["name"]] = node_id
- # 为每个节点创建属于边(子→父)
- for node_id, node_data in nodes.items():
- if node_data["dimension"] != dimension_name:
- continue
- parent_path = node_data["detail"].get("parentPath", [])
- if not parent_path:
- continue
- # 取最后一个作为直接父分类
- parent_name = parent_path[-1]
- parent_id = category_name_to_id.get(parent_name)
- if parent_id:
- # 获取 source 和 target 的 postIds
- child_post_ids = node_data["detail"].get("postIds", [])
- parent_post_ids = nodes.get(parent_id, {}).get("detail", {}).get("postIds", [])
- # 属于边:子 → 父
- edge_id = build_edge_id(node_id, "属于", parent_id)
- edges[edge_id] = create_edge(
- source=node_id,
- target=parent_id,
- edge_type="属于",
- score=1.0,
- detail={
- "sourcePostIds": child_post_ids,
- "targetPostIds": parent_post_ids
- }
- )
- # 包含边:父 → 子
- edge_id_contain = build_edge_id(parent_id, "包含", node_id)
- edges[edge_id_contain] = create_edge(
- source=parent_id,
- target=node_id,
- edge_type="包含",
- score=1.0,
- detail={
- "sourcePostIds": parent_post_ids,
- "targetPostIds": child_post_ids
- }
- )
- return edges
- # ==================== 从关联分析提取分类共现边(跨点)====================
- def extract_category_cooccur_edges(associations_data: Dict, nodes: Dict[str, Dict]) -> Dict[str, Dict]:
- """
- 从 dimension_associations_analysis.json 中提取分类共现边(跨点)
- Args:
- associations_data: 关联分析数据
- nodes: 已构建的节点数据(用于获取节点的 postIds)
- Returns:
- { edgeId: edgeData }
- """
- edges = {}
- if "单维度关联分析" not in associations_data:
- return edges
- single_dim = associations_data["单维度关联分析"]
- # 维度映射
- dimension_map = {
- "灵感点维度": "灵感点",
- "目的点维度": "目的点",
- "关键点维度": "关键点"
- }
- def get_last_segment(path: str) -> str:
- """获取路径的最后一段"""
- return path.split("/")[-1]
- for dim_key, dim_data in single_dim.items():
- if dim_key not in dimension_map:
- continue
- source_dimension = dimension_map[dim_key]
- for direction_key, direction_data in dim_data.items():
- if direction_key == "说明" or "→" not in direction_key:
- continue
- for source_path, source_info in direction_data.items():
- source_name = get_last_segment(source_path)
- source_node_id = build_node_id("人设", source_dimension, "分类", source_name)
- for field_name, associations in source_info.items():
- if not field_name.startswith("与") or not field_name.endswith("的关联"):
- continue
- target_dimension = field_name[1:-3]
- if not isinstance(associations, list):
- continue
- for assoc in associations:
- target_path = assoc.get("目标分类", "")
- if not target_path:
- continue
- target_name = get_last_segment(target_path)
- target_node_id = build_node_id("人设", target_dimension, "分类", target_name)
- # 使用 Jaccard 作为 score
- jaccard = assoc.get("Jaccard相似度", 0)
- # 获取 source 和 target 的 postIds
- source_post_ids = nodes.get(source_node_id, {}).get("detail", {}).get("postIds", [])
- target_post_ids = nodes.get(target_node_id, {}).get("detail", {}).get("postIds", [])
- edge_id = build_edge_id(source_node_id, "分类共现", target_node_id)
- edges[edge_id] = create_edge(
- source=source_node_id,
- target=target_node_id,
- edge_type="分类共现",
- score=jaccard,
- detail={
- "postIds": assoc.get("共同帖子ID", []),
- "postCount": assoc.get("共同帖子数", 0),
- "jaccard": jaccard,
- "overlapCoef": assoc.get("重叠系数", 0),
- "sourcePostIds": source_post_ids,
- "targetPostIds": target_post_ids
- }
- )
- return edges
- # ==================== 从关联分析提取分类共现边(点内)====================
- def extract_intra_category_cooccur_edges(intra_data: Dict, nodes: Dict[str, Dict]) -> Dict[str, Dict]:
- """
- 从 intra_dimension_associations_analysis.json 中提取点内分类共现边
- Args:
- intra_data: 点内关联分析数据
- nodes: 已构建的节点数据(用于获取节点的 postIds)
- Returns:
- { edgeId: edgeData }
- """
- edges = {}
- if "叶子分类组合聚类" not in intra_data:
- return edges
- clusters_by_dim = intra_data["叶子分类组合聚类"]
- for dimension, clusters in clusters_by_dim.items():
- if dimension not in ("灵感点", "目的点", "关键点"):
- continue
- for cluster_key, cluster_data in clusters.items():
- leaf_categories = cluster_data.get("叶子分类组合", [])
- point_count = cluster_data.get("点数", 0)
- point_details = cluster_data.get("点详情列表", [])
- # 提取点名称列表
- point_names = [p.get("点名称", "") for p in point_details if p.get("点名称")]
- # 两两组合生成共现边
- for i in range(len(leaf_categories)):
- for j in range(i + 1, len(leaf_categories)):
- cat1 = leaf_categories[i]
- cat2 = leaf_categories[j]
- cat1_id = build_node_id("人设", dimension, "分类", cat1)
- cat2_id = build_node_id("人设", dimension, "分类", cat2)
- # 确保顺序一致(按字典序)
- if cat1_id > cat2_id:
- cat1_id, cat2_id = cat2_id, cat1_id
- edge_id = build_edge_id(cat1_id, "分类共现", cat2_id)
- if edge_id in edges:
- # 累加
- edges[edge_id]["detail"]["pointCount"] += point_count
- edges[edge_id]["detail"]["pointNames"].extend(point_names)
- else:
- # 获取 source 和 target 的 postIds
- cat1_post_ids = nodes.get(cat1_id, {}).get("detail", {}).get("postIds", [])
- cat2_post_ids = nodes.get(cat2_id, {}).get("detail", {}).get("postIds", [])
- # 计算 Jaccard(基于帖子)
- cat1_set = set(cat1_post_ids)
- cat2_set = set(cat2_post_ids)
- intersection = cat1_set & cat2_set
- union = cat1_set | cat2_set
- jaccard = round(len(intersection) / len(union), 4) if union else 0
- edges[edge_id] = create_edge(
- source=cat1_id,
- target=cat2_id,
- edge_type="分类共现",
- score=jaccard,
- detail={
- "postIds": list(intersection),
- "postCount": len(intersection),
- "jaccard": jaccard,
- "pointCount": point_count,
- "pointNames": point_names.copy(),
- "sourcePostIds": cat1_post_ids,
- "targetPostIds": cat2_post_ids
- }
- )
- return edges
- # ==================== 从历史帖子提取标签共现边 ====================
- def extract_tag_cooccur_edges(historical_posts_dir: Path, nodes: Dict[str, Dict]) -> Dict[str, Dict]:
- """
- 从历史帖子解构结果中提取标签共现边
- Args:
- historical_posts_dir: 历史帖子目录
- nodes: 已构建的节点数据(用于获取标签的 postIds 计算 Jaccard)
- Returns:
- { edgeId: edgeData }
- """
- edges = {}
- cooccur_map = {} # (tag1_id, tag2_id) -> { postIds: set() }
- if not historical_posts_dir.exists():
- print(f" 警告: 历史帖子目录不存在: {historical_posts_dir}")
- return edges
- json_files = list(historical_posts_dir.glob("*.json"))
- print(f" 找到 {len(json_files)} 个历史帖子文件")
- def extract_post_id_from_filename(filename: str) -> str:
- """从文件名中提取帖子ID"""
- import re
- match = re.match(r'^([^_]+)_', filename)
- return match.group(1) if match else ""
- def extract_tags_from_post(post_data: Dict) -> Dict[str, List[str]]:
- """从帖子解构结果中提取所有标签"""
- tags_by_dimension = {
- "灵感点": [],
- "目的点": [],
- "关键点": []
- }
- if "三点解构" not in post_data:
- return tags_by_dimension
- three_points = post_data["三点解构"]
- # 灵感点
- if "灵感点" in three_points:
- inspiration = three_points["灵感点"]
- for section in ["全新内容", "共性差异", "共性内容"]:
- if section in inspiration and isinstance(inspiration[section], list):
- for item in inspiration[section]:
- if "提取的特征" in item and isinstance(item["提取的特征"], list):
- for feature in item["提取的特征"]:
- tag_name = feature.get("特征名称", "")
- if tag_name:
- tags_by_dimension["灵感点"].append(tag_name)
- # 目的点
- if "目的点" in three_points:
- purpose = three_points["目的点"]
- if "purposes" in purpose and isinstance(purpose["purposes"], list):
- for item in purpose["purposes"]:
- if "提取的特征" in item and isinstance(item["提取的特征"], list):
- for feature in item["提取的特征"]:
- tag_name = feature.get("特征名称", "")
- if tag_name:
- tags_by_dimension["目的点"].append(tag_name)
- # 关键点
- if "关键点" in three_points:
- key_points = three_points["关键点"]
- if "key_points" in key_points and isinstance(key_points["key_points"], list):
- for item in key_points["key_points"]:
- if "提取的特征" in item and isinstance(item["提取的特征"], list):
- for feature in item["提取的特征"]:
- tag_name = feature.get("特征名称", "")
- if tag_name:
- tags_by_dimension["关键点"].append(tag_name)
- return tags_by_dimension
- # 遍历所有帖子文件
- for file_path in json_files:
- post_id = extract_post_id_from_filename(file_path.name)
- if not post_id:
- continue
- try:
- with open(file_path, "r", encoding="utf-8") as f:
- post_data = json.load(f)
- tags_by_dimension = extract_tags_from_post(post_data)
- # 对每个维度内的标签两两组合
- for dimension, tags in tags_by_dimension.items():
- unique_tags = list(set(tags))
- for i in range(len(unique_tags)):
- for j in range(i + 1, len(unique_tags)):
- tag1 = unique_tags[i]
- tag2 = unique_tags[j]
- tag1_id = build_node_id("人设", dimension, "标签", tag1)
- tag2_id = build_node_id("人设", dimension, "标签", tag2)
- # 确保顺序一致
- if tag1_id > tag2_id:
- tag1_id, tag2_id = tag2_id, tag1_id
- key = (tag1_id, tag2_id)
- if key not in cooccur_map:
- cooccur_map[key] = {"postIds": set()}
- cooccur_map[key]["postIds"].add(post_id)
- except Exception as e:
- print(f" 警告: 处理文件 {file_path.name} 时出错: {e}")
- # 转换为边
- for (tag1_id, tag2_id), info in cooccur_map.items():
- cooccur_post_ids = list(info["postIds"])
- cooccur_count = len(cooccur_post_ids)
- # 获取两个标签的帖子集合,计算 Jaccard
- tag1_post_ids = nodes.get(tag1_id, {}).get("detail", {}).get("postIds", [])
- tag2_post_ids = nodes.get(tag2_id, {}).get("detail", {}).get("postIds", [])
- union_count = len(set(tag1_post_ids) | set(tag2_post_ids))
- jaccard = round(cooccur_count / union_count, 4) if union_count > 0 else 0
- edge_id = build_edge_id(tag1_id, "标签共现", tag2_id)
- edges[edge_id] = create_edge(
- source=tag1_id,
- target=tag2_id,
- edge_type="标签共现",
- score=jaccard,
- detail={
- "postIds": cooccur_post_ids,
- "postCount": cooccur_count,
- "jaccard": jaccard,
- "sourcePostIds": tag1_post_ids,
- "targetPostIds": tag2_post_ids
- }
- )
- return edges
- # ==================== 构建嵌套树结构 ====================
- def build_nested_tree(nodes: Dict[str, Dict], edges: Dict[str, Dict]) -> Dict:
- """
- 从根节点开始,沿"包含"边递归构建嵌套树结构
- 包含边:父节点 -> 子节点
- 从根节点开始,递归找所有包含的子节点
- Returns:
- 嵌套的树结构
- """
- # 从"包含"边构建 父节点 -> [子节点] 的映射
- parent_to_children = {} # parent_id -> [child_id, ...]
- for edge_id, edge_data in edges.items():
- if edge_data["type"] == "包含":
- parent_id = edge_data["source"]
- child_id = edge_data["target"]
- if parent_id not in parent_to_children:
- parent_to_children[parent_id] = []
- parent_to_children[parent_id].append(child_id)
- # 递归构建子树
- def build_subtree(node_id: str) -> Dict:
- node_data = nodes[node_id]
- subtree = {
- "id": node_id,
- "name": node_data["name"],
- "type": node_data["type"],
- "domain": node_data["domain"],
- "dimension": node_data["dimension"],
- "detail": node_data.get("detail", {}),
- "children": []
- }
- # 获取子节点
- child_ids = parent_to_children.get(node_id, [])
- for child_id in child_ids:
- if child_id in nodes:
- subtree["children"].append(build_subtree(child_id))
- return subtree
- # 从根节点开始构建
- root_id = "人设:人设:人设:人设"
- return build_subtree(root_id)
- # ==================== 图游走工具 ====================
- def walk_graph(
- index: Dict,
- start_node: str,
- edge_types: List[str],
- direction: str = "out",
- min_score: float = None
- ) -> Set[str]:
- """
- 从起始节点出发,按指定边类型序列游走N步
- Args:
- index: 游走索引 {"outEdges": {...}, "inEdges": {...}}
- start_node: 起始节点ID
- edge_types: 边类型序列,如 ["属于", "分类共现"]
- direction: 游走方向 "out"(沿出边) / "in"(沿入边)
- min_score: 最小分数过滤
- Returns:
- 到达的节点ID集合
- Example:
- # 从标签出发,沿"属于"边走1步,再沿"分类共现"边走1步
- result = walk_graph(
- index,
- "人设:灵感点:标签:手绘风格",
- ["属于", "分类共现"]
- )
- """
- edge_index = index["outEdges"] if direction == "out" else index["inEdges"]
- target_key = "target" if direction == "out" else "source"
- current_nodes = {start_node}
- for edge_type in edge_types:
- next_nodes = set()
- for node in current_nodes:
- neighbors = edge_index.get(node, {}).get(edge_type, [])
- for neighbor in neighbors:
- # 分数过滤
- if min_score is not None and neighbor.get("score", 0) < min_score:
- continue
- next_nodes.add(neighbor[target_key])
- current_nodes = next_nodes
- if not current_nodes:
- break
- return current_nodes
- def get_neighbors(
- index: Dict,
- node_id: str,
- edge_type: str = None,
- direction: str = "out",
- min_score: float = None
- ) -> List[Dict]:
- """
- 获取节点的邻居
- Args:
- index: 游走索引
- node_id: 节点ID
- edge_type: 边类型(可选,不指定则返回所有类型)
- direction: 方向 "out" / "in"
- min_score: 最小分数过滤
- Returns:
- 邻居列表 [{"target": "...", "score": 0.5}, ...]
- """
- edge_index = index["outEdges"] if direction == "out" else index["inEdges"]
- node_edges = edge_index.get(node_id, {})
- if edge_type:
- neighbors = node_edges.get(edge_type, [])
- else:
- neighbors = []
- for edges in node_edges.values():
- neighbors.extend(edges)
- if min_score is not None:
- neighbors = [n for n in neighbors if n.get("score", 0) >= min_score]
- return neighbors
- # ==================== 构建索引 ====================
- def build_index(edges: Dict[str, Dict]) -> Dict:
- """
- 构建游走索引
- Returns:
- {
- "outEdges": { nodeId: { edgeType: [{ target, score }] } },
- "inEdges": { nodeId: { edgeType: [{ source, score }] } }
- }
- """
- out_edges = {}
- in_edges = {}
- for edge_id, edge_data in edges.items():
- source = edge_data["source"]
- target = edge_data["target"]
- edge_type = edge_data["type"]
- score = edge_data["score"]
- # outEdges
- if source not in out_edges:
- out_edges[source] = {}
- if edge_type not in out_edges[source]:
- out_edges[source][edge_type] = []
- out_edges[source][edge_type].append({
- "target": target,
- "score": score
- })
- # inEdges
- if target not in in_edges:
- in_edges[target] = {}
- if edge_type not in in_edges[target]:
- in_edges[target][edge_type] = []
- in_edges[target][edge_type].append({
- "source": source,
- "score": score
- })
- return {
- "outEdges": out_edges,
- "inEdges": in_edges
- }
- # ==================== 主函数 ====================
- def main():
- config = PathConfig()
- config.ensure_dirs()
- print(f"账号: {config.account_name}")
- print(f"输出版本: {config.output_version}")
- print()
- # 输入文件路径
- pattern_file = config.pattern_cluster_file
- associations_file = config.account_dir / "pattern相关文件/optimization/dimension_associations_analysis.json"
- intra_associations_file = config.account_dir / "pattern相关文件/optimization/intra_dimension_associations_analysis.json"
- historical_posts_dir = config.historical_posts_dir
- # 输出文件路径
- output_file = config.intermediate_dir / "人设图谱.json"
- print("输入文件:")
- print(f" pattern聚合文件: {pattern_file}")
- print(f" 跨点关联分析文件: {associations_file}")
- print(f" 点内关联分析文件: {intra_associations_file}")
- print(f" 历史帖子目录: {historical_posts_dir}")
- print(f"\n输出文件: {output_file}")
- print()
- # ===== 读取数据 =====
- print("=" * 60)
- print("读取数据...")
- print(" 读取 pattern 聚合结果...")
- with open(pattern_file, "r", encoding="utf-8") as f:
- pattern_data = json.load(f)
- print(" 读取跨点关联分析结果...")
- with open(associations_file, "r", encoding="utf-8") as f:
- associations_data = json.load(f)
- print(" 读取点内关联分析结果...")
- with open(intra_associations_file, "r", encoding="utf-8") as f:
- intra_associations_data = json.load(f)
- # ===== 提取节点 =====
- print("\n" + "=" * 60)
- print("提取节点...")
- all_nodes = {}
- dimension_mapping = {
- "灵感点列表": "灵感点",
- "目的点": "目的点",
- "关键点列表": "关键点"
- }
- # 分类节点
- print("\n提取分类节点:")
- for dim_key, dim_name in dimension_mapping.items():
- category_nodes = extract_category_nodes_from_pattern(pattern_data, dim_key, dim_name)
- all_nodes.update(category_nodes)
- print(f" {dim_name}: {len(category_nodes)} 个")
- # 标签节点
- print("\n提取标签节点:")
- for dim_key, dim_name in dimension_mapping.items():
- tag_nodes = extract_tag_nodes_from_pattern(pattern_data, dim_key, dim_name)
- all_nodes.update(tag_nodes)
- print(f" {dim_name}: {len(tag_nodes)} 个")
- # 统计
- category_count = sum(1 for n in all_nodes.values() if n["type"] == "分类")
- tag_count = sum(1 for n in all_nodes.values() if n["type"] == "标签")
- print(f"\n节点总计: {len(all_nodes)} (分类: {category_count}, 标签: {tag_count})")
- # ===== 提取边 =====
- print("\n" + "=" * 60)
- print("提取边...")
- all_edges = {}
- # 属于/包含边
- print("\n提取属于/包含边:")
- for dim_key, dim_name in dimension_mapping.items():
- belong_contain_edges = extract_belong_contain_edges(pattern_data, dim_key, dim_name, all_nodes)
- all_edges.update(belong_contain_edges)
- belong_count = sum(1 for e in all_edges.values() if e["type"] == "属于")
- contain_count = sum(1 for e in all_edges.values() if e["type"] == "包含")
- print(f" 属于边: {belong_count}, 包含边: {contain_count}")
- # 分类共现边(跨点)
- print("\n提取分类共现边(跨点):")
- category_cooccur_edges = extract_category_cooccur_edges(associations_data, all_nodes)
- all_edges.update(category_cooccur_edges)
- print(f" 分类共现边: {len(category_cooccur_edges)}")
- # 分类共现边(点内)
- print("\n提取分类共现边(点内):")
- intra_category_edges = extract_intra_category_cooccur_edges(intra_associations_data, all_nodes)
- all_edges.update(intra_category_edges)
- print(f" 分类共现边: {len(intra_category_edges)}")
- # 标签共现边
- print("\n提取标签共现边:")
- tag_cooccur_edges = extract_tag_cooccur_edges(historical_posts_dir, all_nodes)
- all_edges.update(tag_cooccur_edges)
- print(f" 标签共现边: {len(tag_cooccur_edges)}")
- # ===== 添加根节点和维度节点 =====
- print("\n添加根节点和维度节点:")
- # 收集所有帖子ID(用于根节点)
- all_post_ids_for_root = set()
- for node in all_nodes.values():
- post_ids = node["detail"].get("postIds", [])
- all_post_ids_for_root.update(post_ids)
- # 根节点
- root_id = "人设:人设:人设:人设"
- root_post_ids = list(all_post_ids_for_root)
- all_nodes[root_id] = create_node(
- domain="人设",
- dimension="人设",
- node_type="人设",
- name="人设",
- detail={
- "postIds": root_post_ids,
- "postCount": len(root_post_ids)
- }
- )
- # 维度节点 + 边
- dimensions = ["灵感点", "目的点", "关键点"]
- for dim in dimensions:
- # 收集该维度下所有节点的帖子ID
- dim_post_ids = set()
- for node in all_nodes.values():
- if node["dimension"] == dim:
- post_ids = node["detail"].get("postIds", [])
- dim_post_ids.update(post_ids)
- dim_post_ids_list = list(dim_post_ids)
- dim_id = f"人设:{dim}:{dim}:{dim}"
- all_nodes[dim_id] = create_node(
- domain="人设",
- dimension=dim,
- node_type=dim,
- name=dim,
- detail={
- "postIds": dim_post_ids_list,
- "postCount": len(dim_post_ids_list)
- }
- )
- # 维度 -> 根 的属于边
- edge_id = build_edge_id(dim_id, "属于", root_id)
- all_edges[edge_id] = create_edge(
- source=dim_id,
- target=root_id,
- edge_type="属于",
- score=1.0,
- detail={
- "sourcePostIds": dim_post_ids_list,
- "targetPostIds": root_post_ids
- }
- )
- # 根 -> 维度 的包含边
- edge_id_contain = build_edge_id(root_id, "包含", dim_id)
- all_edges[edge_id_contain] = create_edge(
- source=root_id,
- target=dim_id,
- edge_type="包含",
- score=1.0,
- detail={
- "sourcePostIds": root_post_ids,
- "targetPostIds": dim_post_ids_list
- }
- )
- # 找该维度下的顶级分类(没有父节点的分类),添加边
- dim_categories = [
- (nid, ndata) for nid, ndata in all_nodes.items()
- if ndata["dimension"] == dim and ndata["type"] == "分类"
- and not ndata["detail"].get("parentPath")
- ]
- for cat_id, cat_data in dim_categories:
- cat_post_ids = cat_data["detail"].get("postIds", [])
- # 顶级分类 -> 维度 的属于边
- edge_id = build_edge_id(cat_id, "属于", dim_id)
- all_edges[edge_id] = create_edge(
- source=cat_id,
- target=dim_id,
- edge_type="属于",
- score=1.0,
- detail={
- "sourcePostIds": cat_post_ids,
- "targetPostIds": dim_post_ids_list
- }
- )
- # 维度 -> 顶级分类 的包含边
- edge_id_contain = build_edge_id(dim_id, "包含", cat_id)
- all_edges[edge_id_contain] = create_edge(
- source=dim_id,
- target=cat_id,
- edge_type="包含",
- score=1.0,
- detail={
- "sourcePostIds": dim_post_ids_list,
- "targetPostIds": cat_post_ids
- }
- )
- print(f" 添加节点: 1 根节点 + 3 维度节点 = 4")
- print(f" 添加边: 根↔维度 6条 + 维度↔顶级分类")
- # 边统计
- edge_type_counts = {}
- for edge in all_edges.values():
- t = edge["type"]
- edge_type_counts[t] = edge_type_counts.get(t, 0) + 1
- print(f"\n边总计: {len(all_edges)}")
- for t, count in sorted(edge_type_counts.items(), key=lambda x: -x[1]):
- print(f" {t}: {count}")
- # ===== 计算节点概率 =====
- print("\n" + "=" * 60)
- print("计算节点概率...")
- # 1. 计算总帖子数(所有帖子ID的并集)
- all_post_ids = set()
- for node in all_nodes.values():
- post_ids = node["detail"].get("postIds", [])
- all_post_ids.update(post_ids)
- total_post_count = len(all_post_ids)
- print(f" 总帖子数: {total_post_count}")
- # 2. 为每个节点计算概率
- for node_id, node in all_nodes.items():
- post_count = node["detail"].get("postCount", 0)
- # 全局概率
- if total_post_count > 0:
- node["detail"]["probGlobal"] = round(post_count / total_post_count, 4)
- else:
- node["detail"]["probGlobal"] = 0
- # 相对父节点的概率
- # 通过"属于"边找父节点
- parent_edge_id = None
- for edge_id, edge in all_edges.items():
- if edge["source"] == node_id and edge["type"] == "属于":
- parent_node_id = edge["target"]
- parent_node = all_nodes.get(parent_node_id)
- if parent_node:
- parent_post_count = parent_node["detail"].get("postCount", 0)
- if parent_post_count > 0:
- node["detail"]["probToParent"] = round(post_count / parent_post_count, 4)
- else:
- node["detail"]["probToParent"] = 0
- break
- else:
- # 没有父节点(根节点)
- node["detail"]["probToParent"] = 1.0
- print(f" 已为 {len(all_nodes)} 个节点计算概率")
- # 3. 更新"包含"边的分数(使用子节点的 probToParent)
- contain_edge_updated = 0
- for edge_id, edge in all_edges.items():
- if edge["type"] == "包含":
- target_node = all_nodes.get(edge["target"])
- if target_node:
- edge["score"] = target_node["detail"].get("probToParent", 1.0)
- contain_edge_updated += 1
- print(f" 已更新 {contain_edge_updated} 条包含边的分数")
- # ===== 构建索引 =====
- print("\n" + "=" * 60)
- print("构建索引...")
- index = build_index(all_edges)
- print(f" outEdges 节点数: {len(index['outEdges'])}")
- print(f" inEdges 节点数: {len(index['inEdges'])}")
- # ===== 构建嵌套树 =====
- print("\n" + "=" * 60)
- print("构建嵌套树...")
- tree = build_nested_tree(all_nodes, all_edges)
- # 统计树节点数
- def count_tree_nodes(node):
- count = 1
- for child in node.get("children", []):
- count += count_tree_nodes(child)
- return count
- tree_node_count = count_tree_nodes(tree)
- print(f" 树节点数: {tree_node_count}")
- # ===== 统计各维度 =====
- dimension_stats = {}
- for dim_name in ["灵感点", "目的点", "关键点"]:
- dim_categories = sum(1 for n in all_nodes.values() if n["type"] == "分类" and n["dimension"] == dim_name)
- dim_tags = sum(1 for n in all_nodes.values() if n["type"] == "标签" and n["dimension"] == dim_name)
- dimension_stats[dim_name] = {
- "categoryCount": dim_categories,
- "tagCount": dim_tags
- }
- # ===== 构建输出 =====
- print("\n" + "=" * 60)
- print("保存结果...")
- output_data = {
- "meta": {
- "description": "人设图谱数据",
- "account": config.account_name,
- "createdAt": datetime.now().isoformat(),
- "stats": {
- "nodeCount": len(all_nodes),
- "edgeCount": len(all_edges),
- "categoryCount": category_count,
- "tagCount": tag_count,
- "treeNodeCount": tree_node_count,
- "dimensions": dimension_stats,
- "edgeTypes": edge_type_counts
- }
- },
- "nodes": all_nodes,
- "edges": all_edges,
- "index": index,
- "tree": tree
- }
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(output_data, f, ensure_ascii=False, indent=2)
- print(f"\n输出文件: {output_file}")
- print("\n" + "=" * 60)
- print("完成!")
- if __name__ == "__main__":
- main()
|