""" 构建 Item 共现关系图 以每个 item 为节点,基于原始帖子中的共现关系建图: - co_in_post: 共同出现在同一帖子中(两个分类出现在同一帖子即为一次共现) - hierarchy: 分类路径上的层级关系(祖先/后代) """ import json import os from collections import defaultdict from itertools import combinations from typing import List, Dict, Optional from .apriori_analysis_post_level import ( build_transactions_at_depth, ) # ────────────────────────────────────────────── # 1. 解析 item meta 信息 # ────────────────────────────────────────────── def parse_item_meta(item: str, dimension_mode: str) -> Dict: """解析 item 字符串,提取 meta 信息""" if dimension_mode == 'full': # 格式: 点类型_维度_路径 或 点类型_维度_路径||名称 parts = item.split('_', 2) if len(parts) < 3: return {'raw': item} point_type, dimension, path_with_name = parts elif dimension_mode == 'substance_form_only': # 格式: 维度_路径 parts = item.split('_', 1) if len(parts) < 2: return {'raw': item} point_type, dimension, path_with_name = None, parts[0], parts[1] elif dimension_mode == 'point_type_only': # 格式: 点类型_路径 parts = item.split('_', 1) if len(parts) < 2: return {'raw': item} point_type, dimension, path_with_name = parts[0], None, parts[1] else: return {'raw': item} if '||' in path_with_name: path, name = path_with_name.split('||', 1) layer = 'name' else: path, name = path_with_name, None layer = 'path' return { 'point_type': point_type, 'dimension': dimension, 'path': path, 'name': name, 'layer': layer, } # ────────────────────────────────────────────── # 2. 
# Detect hierarchy relations
# ──────────────────────────────────────────────
def detect_hierarchy(item_a: str, meta_a: Dict, item_b: str, meta_b: Dict) -> Optional[Dict]:
    """Detect whether two items are hierarchically related.

    Items are comparable only when they share the same point_type and
    dimension; the relation is then derived from the full classification
    paths ('>'-joined, with the element name appended when present).

    Args:
        item_a: First item string (not inspected; kept for signature symmetry).
        meta_a: Parsed meta of the first item (from parse_item_meta).
        item_b: Second item string (not inspected; kept for signature symmetry).
        meta_b: Parsed meta of the second item.

    Returns:
        {'relation': 'ancestor'|'descendant', 'depth_diff': int} seen from
        item_a's point of view, or None when the items are unrelated.
    """
    # Must share point_type and dimension to be comparable at all.
    if meta_a.get('point_type') != meta_b.get('point_type'):
        return None
    if meta_a.get('dimension') != meta_b.get('dimension'):
        return None

    # Build the full path (classification path plus element name, if any).
    def full_path(meta):
        p = meta.get('path', '')
        n = meta.get('name')
        return f"{p}>{n}" if n else p

    fp_a = full_path(meta_a)
    fp_b = full_path(meta_b)
    if not fp_a or not fp_b or fp_a == fp_b:
        return None
    # A strict '>'-boundary prefix match establishes ancestry; depth_diff
    # counts how many levels separate the two paths.
    if fp_b.startswith(fp_a + '>'):
        depth_diff = fp_b.count('>') - fp_a.count('>')
        return {'relation': 'ancestor', 'depth_diff': depth_diff}
    if fp_a.startswith(fp_b + '>'):
        depth_diff = fp_a.count('>') - fp_b.count('>')
        return {'relation': 'descendant', 'depth_diff': depth_diff}
    return None


# ──────────────────────────────────────────────
# 3. Core: build the item graph
# ──────────────────────────────────────────────
def collect_elements_for_items(original_data: Dict, dimension_mode: str) -> Dict[str, Dict[str, Dict]]:
    """Collect the concrete element names (count + source post ids) under
    every classification path found in the raw data.

    Args:
        original_data: Full contents of point_classification_results.json.
        dimension_mode: One of 'full', 'point_type_only', 'substance_form_only'.

    Returns:
        {item_key: {element_name: {'count': int, 'post_ids': [str, ...]}, ...}, ...}
    """
    elements_map = defaultdict(lambda: defaultdict(lambda: {'count': 0, 'post_ids': set()}))
    for post_id, post_data in original_data.items():
        for point_type in ['灵感点', '目的点', '关键点']:
            points = post_data.get(point_type, [])
            for point in points:
                # Walk each dimension of the point.
                dim_configs = [
                    ('实质', point.get('实质', [])),
                    ('形式', point.get('形式', [])),
                    ('意图', point.get('意图', [])),
                ]
                for dim_name, items in dim_configs:
                    for item in items:
                        path = item.get('分类路径', [])
                        name = item.get('名称', '')
                        if not path or not name:
                            continue
                        path_label = '>'.join(path)
                        # Build item_key consistently with
                        # build_transactions_at_depth at depth='max'.
                        if dimension_mode == 'full':
                            item_key = f"{point_type}_{dim_name}_{path_label}"
                        elif dimension_mode == 'point_type_only':
                            item_key = f"{point_type}_{path_label}"
                        elif dimension_mode == 'substance_form_only':
                            item_key = f"{dim_name}_{path_label}"
                        else:
                            continue
                        elements_map[item_key][name]['count'] += 1
                        elements_map[item_key][name]['post_ids'].add(post_id)
    # Convert the post_ids sets to sorted lists (JSON-serializable, stable).
    return {
        k: {
            name: {'count': v['count'], 'post_ids': sorted(v['post_ids'])}
            for name, v in elements.items()
        }
        for k, elements in elements_map.items()
    }


def build_item_graph(
    transactions: List[List[str]],
    post_ids: List[str],
    dimension_mode: str,
    post_account_map: Optional[Dict[str, str]] = None,
    elements_map: Optional[Dict[str, Dict[str, Dict]]] = None,
) -> Dict:
    """Build the item relation graph from per-post co-occurrence.

    Args:
        transactions: One item list per post.
        post_ids: Post ids, aligned with ``transactions`` by index.
        dimension_mode: One of 'full', 'point_type_only', 'substance_form_only'.
        post_account_map: Optional post_id -> account_name mapping.
        elements_map: Optional item_key -> {element_name: {'count': int,
            'post_ids': [...]}} mapping, as produced by
            collect_elements_for_items.

    Returns:
        Graph dict: {item_str: {'meta': {...}, 'edges': {other_item: {edge_type: {...}}}}}
    """
    # ── 3.1 Collect all items and parse their meta ──
    all_items = set()
    for txn in transactions:
        all_items.update(txn)

    item_metas = {}
    for item in all_items:
        item_metas[item] = parse_item_meta(item, dimension_mode)

    # ── 3.2 Count in how many posts each item appears; map item -> accounts ──
    freq_in_posts = defaultdict(int)
    item_accounts = defaultdict(set)
    for i, txn in enumerate(transactions):
        pid = post_ids[i]
        account = post_account_map.get(pid) if post_account_map else None
        # set(txn): count each item at most once per post.
        for item in set(txn):
            freq_in_posts[item] += 1
            if account:
                item_accounts[item].add(account)

    # ── 3.3 Build co_in_post edges ──
    co_post_edges = defaultdict(lambda: {
        'co_post_count': 0,
        'post_ids': [],
    })
    for i, txn in enumerate(transactions):
        # Sorting gives a canonical (a, b) key with a < b.
        items = sorted(set(txn))
        for a, b in combinations(items, 2):
            co_post_edges[(a, b)]['co_post_count'] += 1
            co_post_edges[(a, b)]['post_ids'].append(post_ids[i])

    # ── 3.4 Assemble the graph ──
    graph = {}
    for item in all_items:
        meta = item_metas.get(item, {'raw': item})
        meta['frequency_in_posts'] = freq_in_posts[item]
        if item_accounts[item]:
            meta['accounts'] = sorted(item_accounts[item])
        # Attach element info (copied, so build_post_ids_index's in-place
        # rewrites do not affect the original elements_map).
        if elements_map and item in elements_map:
            # Sort elements by descending occurrence count.
            sorted_elements = {
                name: {'count': v['count'], 'post_ids': list(v['post_ids'])}
                for name, v in sorted(elements_map[item].items(), key=lambda x: -x[1]['count'])
            }
            meta['elements'] = sorted_elements
        graph[item] = {
            'meta': meta,
            'edges': {},
        }

    # Fill in co_in_post edges (both directions).
    for (a, b), data in co_post_edges.items():
        edge_payload = {
            'co_post_count': data['co_post_count'],
            'post_ids': data['post_ids'],
        }
        # A → B; confidence = P(B's post | A's post)
        _ensure_edge(graph, a, b)
        graph[a]['edges'][b]['co_in_post'] = {
            **edge_payload,
            'confidence': round(data['co_post_count'] / freq_in_posts[a], 4),
        }
        # B → A
        _ensure_edge(graph, b, a)
        graph[b]['edges'][a]['co_in_post'] = {
            **edge_payload,
            'confidence': round(data['co_post_count'] / freq_in_posts[b], 4),
        }

    # Fill in hierarchy edges (both directions).
    items_list = sorted(all_items)
    for i, a in enumerate(items_list):
        for b in items_list[i + 1:]:
            h = detect_hierarchy(a, item_metas[a], b, item_metas[b])
            if h:
                _ensure_edge(graph, a, b)
                graph[a]['edges'][b]['hierarchy'] = h
                # Mirror the relation for the reverse direction.
                reverse_relation = 'descendant' if h['relation'] == 'ancestor' else 'ancestor'
                _ensure_edge(graph, b, a)
                graph[b]['edges'][a]['hierarchy'] = {
                    'relation': reverse_relation,
                    'depth_diff': h['depth_diff'],
                }

    return graph


def _ensure_edge(graph: Dict, source: str, target: str):
    """Ensure graph[source]['edges'][target] exists (as an empty dict)."""
    if source not in graph:
        graph[source] = {'meta': {}, 'edges': {}}
    if target not in graph[source]['edges']:
        graph[source]['edges'][target] = {}


# ──────────────────────────────────────────────
# 4.
# post_ids global index (shrinks the JSON output)
# ──────────────────────────────────────────────
def build_post_ids_index(graph: Dict) -> List[str]:
    """Build a global post_ids index and compress the graph in place.

    Every 'post_ids' list found in the graph (on co_in_post edges and on
    meta elements) is replaced by a list of integer positions into the
    returned index.

    Returns:
        The sorted list of all post ids referenced by the graph.
    """

    def _pid_holders():
        # Yield every dict in the graph carrying a non-empty 'post_ids' list.
        for node in graph.values():
            for edge_types in node.get('edges', {}).values():
                co = edge_types.get('co_in_post')
                if co and co.get('post_ids'):
                    yield co
            for element in node.get('meta', {}).get('elements', {}).values():
                if isinstance(element, dict) and element.get('post_ids'):
                    yield element

    # Pass 1: gather every referenced post id.
    pid_set = set()
    for holder in _pid_holders():
        pid_set.update(holder['post_ids'])

    index = sorted(pid_set)
    position = {pid: i for i, pid in enumerate(index)}

    # Pass 2: rewrite each post_ids list as index positions, in place.
    for holder in _pid_holders():
        holder['post_ids'] = [position[pid] for pid in holder['post_ids']]

    return index


# ──────────────────────────────────────────────
# 5.
# Main entry point
# ──────────────────────────────────────────────


# Supported depth modes and their (Chinese) descriptions
DEPTH_MODES = {
    'max': '叶子节点完整路径',
    'all_levels': '展开所有层级',
}

# Supported dimension modes
DIMENSION_MODES = ['full', 'point_type_only', 'substance_form_only']


def main(account_name: str, depth_modes: Optional[List[str]] = None, dimension_modes: Optional[List[str]] = None):
    """Build the item co-occurrence graphs for one account.

    Args:
        account_name: Account name (determines the result/<name> directory).
        depth_modes: Depth modes to build; defaults to ['max', 'all_levels'].
        dimension_modes: Dimension modes to build; defaults to ['full'].

    Raises:
        ValueError: If an unsupported depth or dimension mode is requested.
    """
    if depth_modes is None:
        depth_modes = ['max', 'all_levels']
    if dimension_modes is None:
        dimension_modes = ['full']

    # Fail fast on unsupported modes: an unknown dimension_mode would
    # otherwise silently produce empty element maps and empty graphs
    # (every branch in collect_elements_for_items falls through to continue).
    bad_depths = [d for d in depth_modes if d not in DEPTH_MODES]
    if bad_depths:
        raise ValueError(f"Unsupported depth modes: {bad_depths}; expected one of {list(DEPTH_MODES)}")
    bad_dims = [d for d in dimension_modes if d not in DIMENSION_MODES]
    if bad_dims:
        raise ValueError(f"Unsupported dimension modes: {bad_dims}; expected one of {DIMENSION_MODES}")

    base_dir = f"result/{account_name}"
    results_file = os.path.join(base_dir, "topic_point_data/point_classification_results.json")
    post_account_map_file = os.path.join(base_dir, "topic_point_data/post_account_map.json")
    output_dir = os.path.join(base_dir, "item_graph")
    os.makedirs(output_dir, exist_ok=True)

    # Load the raw data once and reuse it for every mode combination.
    with open(results_file, 'r', encoding='utf-8') as f:
        original_data = json.load(f)
    print(f"已加载原始数据: {len(original_data)} 个帖子")

    # Load the optional post -> account mapping.
    post_account_map = None
    if os.path.exists(post_account_map_file):
        with open(post_account_map_file, 'r', encoding='utf-8') as f:
            post_account_map = json.load(f)
        print(f"已加载 post_account_map: {len(post_account_map)} 条")

    print(f"{'=' * 60}")
    print(f"构建 Item 共现关系图: {account_name}")
    print(f"{'=' * 60}")

    for dimension_mode in dimension_modes:
        # elements_map depends only on dimension_mode, not on depth:
        # compute it once per dimension mode.
        elements_map = collect_elements_for_items(original_data, dimension_mode)
        print(f"\n维度模式: {dimension_mode} | 含元素的分类数: {len(elements_map)}")

        for depth in depth_modes:
            print(f"\n{'─' * 60}")
            print(f"维度模式: {dimension_mode} | 深度: {depth} ({DEPTH_MODES.get(depth, depth)})")

            # Reuse the already-loaded original_data.
            transactions, post_ids, _ = build_transactions_at_depth(
                results_file, depth, dimension_mode=dimension_mode, data=original_data
            )
            print(f"帖子数: {len(post_ids)}")

            # Count unique items for the log output.
            unique_items = set()
            for txn in transactions:
                unique_items.update(txn)
            print(f"唯一 item 数: {len(unique_items)}")

            # Build the base graph (co_in_post + hierarchy edges).
            graph = build_item_graph(transactions, post_ids, dimension_mode, post_account_map, elements_map)

            # Summary statistics.
            total_edges = sum(len(node['edges']) for node in graph.values())
            edge_type_counts = defaultdict(int)
            for node in graph.values():
                for target_node, edge_types in node['edges'].items():
                    for et in edge_types:
                        edge_type_counts[et] += 1
            print(f"\n 图节点数: {len(graph)}")
            print(f" 图边数(有向): {total_edges}")
            for et, cnt in sorted(edge_type_counts.items()):
                print(f" {et}: {cnt}")

            # Build the global post_ids index and compress the graph in place.
            post_ids_index = build_post_ids_index(graph)

            # Save the item graph (index included under a reserved key).
            output = {'_post_ids_index': post_ids_index, **graph}
            output_file = os.path.join(output_dir, f"item_graph_{dimension_mode}_{depth}.json")
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(output, f, ensure_ascii=False, indent=2)
            print(f" 已保存图: {output_file} (post_ids索引: {len(post_ids_index)})")

    print(f"\n{'=' * 60}")
    print("完成")


if __name__ == "__main__":
    account_name = "小红书"
    main(account_name, depth_modes=['max', 'all_levels'], dimension_modes=['full', 'point_type_only', 'substance_form_only'])

    # Render the HTML visualization for the freshly built graphs.
    import visualization_item_graph.generate as generate
    generate.generate_html(account_name, f"result/{account_name}")