| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- """
- 构建 Item 共现关系图
- 以每个 item 为节点,基于原始帖子中的共现关系建图:
- - co_in_post: 共同出现在同一帖子中(两个分类出现在同一帖子即为一次共现)
- - hierarchy: 分类路径上的层级关系(祖先/后代)
- """
- import json
- import os
- from collections import defaultdict
- from itertools import combinations
- from typing import List, Dict, Optional
- from .apriori_analysis_post_level import (
- build_transactions_at_depth,
- )
- # ──────────────────────────────────────────────
- # 1. 解析 item meta 信息
- # ──────────────────────────────────────────────
def parse_item_meta(item: str, dimension_mode: str) -> Dict:
    """Split an item string into its meta fields.

    Expected layouts, selected by ``dimension_mode``:
      - ``'full'``: ``<point_type>_<dimension>_<path>``
      - ``'substance_form_only'``: ``<dimension>_<path>``
      - ``'point_type_only'``: ``<point_type>_<path>``
    In every layout the trailing ``<path>`` part may end with ``||<name>``.

    Args:
        item: The raw item string.
        dimension_mode: One of the three modes listed above.

    Returns:
        A dict with keys ``point_type``, ``dimension``, ``path``, ``name`` and
        ``layer`` (``'name'`` when a ``||`` suffix is present, otherwise
        ``'path'``). Fields that the layout does not carry are ``None``.
        When the mode is unknown or the item has too few ``_``-separated
        fields, ``{'raw': item}`` is returned instead.
    """
    # Number of '_'-separated fields each layout requires.
    if dimension_mode == 'full':
        expected_fields = 3
    elif dimension_mode in ('substance_form_only', 'point_type_only'):
        expected_fields = 2
    else:
        return {'raw': item}

    # Limit the split so that underscores inside the path are preserved.
    fields = item.split('_', expected_fields - 1)
    if len(fields) < expected_fields:
        return {'raw': item}

    if dimension_mode == 'full':
        point_type, dimension, tail = fields
    elif dimension_mode == 'substance_form_only':
        point_type = None
        dimension, tail = fields
    else:
        dimension = None
        point_type, tail = fields

    # Only the first '||' separates the path from the element name.
    path, separator, name = tail.partition('||')
    if separator:
        layer = 'name'
    else:
        name = None
        layer = 'path'

    return {
        'point_type': point_type,
        'dimension': dimension,
        'path': path,
        'name': name,
        'layer': layer,
    }
- # ──────────────────────────────────────────────
- # 2. 检测层级关系
- # ──────────────────────────────────────────────
def detect_hierarchy(item_a: str, meta_a: Dict, item_b: str, meta_b: Dict) -> Optional[Dict]:
    """Detect an ancestor/descendant relation between two items.

    Two items can only be related when their ``point_type`` and ``dimension``
    meta fields are equal. Each item's chain is its ``path`` joined with its
    ``name`` (when set) using ``>``; one item is an ancestor of the other when
    its chain followed by ``>`` is a prefix of the other chain.

    Args:
        item_a: Item string of the first node (not used by the comparison).
        meta_a: Meta dict of the first node.
        item_b: Item string of the second node (not used by the comparison).
        meta_b: Meta dict of the second node.

    Returns:
        ``{'relation': 'ancestor' | 'descendant', 'depth_diff': int}`` seen
        from ``item_a``'s point of view, or ``None`` when there is no relation.
    """
    for field in ('point_type', 'dimension'):
        if meta_a.get(field) != meta_b.get(field):
            return None

    def chain_of(meta):
        # Path plus the element name (if any) as one '>'-joined chain.
        base = meta.get('path', '')
        leaf = meta.get('name')
        if leaf:
            return f"{base}>{leaf}"
        return base

    chain_a = chain_of(meta_a)
    chain_b = chain_of(meta_b)
    if not (chain_a and chain_b) or chain_a == chain_b:
        return None

    # Try "a above b" first, then "b above a"; at most one can match.
    candidates = (
        (chain_a, chain_b, 'ancestor'),
        (chain_b, chain_a, 'descendant'),
    )
    for upper, lower, relation in candidates:
        if lower.startswith(upper + '>'):
            return {
                'relation': relation,
                'depth_diff': lower.count('>') - upper.count('>'),
            }
    return None
- # ──────────────────────────────────────────────
- # 3. 核心:构建 item graph
- # ──────────────────────────────────────────────
def collect_elements_for_items(original_data: Dict, dimension_mode: str) -> Dict[str, Dict[str, Dict]]:
    """Collect the concrete element names found under each classification path.

    Walks every post, every point type ('灵感点', '目的点', '关键点'), every
    point and every dimension ('实质', '形式', '意图'), and groups element
    names by an item key built from the point type, the dimension and the
    '>'-joined classification path (which parts are used depends on
    ``dimension_mode``). Entries without a path or without a name are skipped.

    Args:
        original_data: Mapping of post id to post data, as loaded from
            point_classification_results.json.
        dimension_mode: 'full', 'point_type_only' or 'substance_form_only';
            any other value yields an empty result.

    Returns:
        ``{item_key: {element_name: {'count': int, 'post_ids': [str, ...]}}}``
        where ``post_ids`` is sorted and free of duplicates.
    """
    point_types = ('灵感点', '目的点', '关键点')
    dimensions = ('实质', '形式', '意图')

    collected = {}
    for post_id, post_data in original_data.items():
        for point_type in point_types:
            for point in post_data.get(point_type, []):
                # Fetch all three dimension lists up front, then walk them.
                per_dimension = [(dim, point.get(dim, [])) for dim in dimensions]
                for dim_name, entries in per_dimension:
                    for entry in entries:
                        path = entry.get('分类路径', [])
                        name = entry.get('名称', '')
                        if not (path and name):
                            continue
                        path_label = '>'.join(path)

                        # Key layout mirrors the item strings of the chosen mode.
                        if dimension_mode == 'full':
                            item_key = f"{point_type}_{dim_name}_{path_label}"
                        elif dimension_mode == 'point_type_only':
                            item_key = f"{point_type}_{path_label}"
                        elif dimension_mode == 'substance_form_only':
                            item_key = f"{dim_name}_{path_label}"
                        else:
                            continue

                        stats = collected.setdefault(item_key, {}).setdefault(
                            name, {'count': 0, 'post_ids': set()}
                        )
                        stats['count'] += 1
                        stats['post_ids'].add(post_id)

    # Convert the post id sets into sorted lists for a stable, JSON-friendly result.
    result = {}
    for item_key, by_name in collected.items():
        result[item_key] = {}
        for name, stats in by_name.items():
            result[item_key][name] = {
                'count': stats['count'],
                'post_ids': sorted(stats['post_ids']),
            }
    return result
def build_item_graph(
    transactions: List[List[str]],
    post_ids: List[str],
    dimension_mode: str,
    post_account_map: Optional[Dict[str, str]] = None,
    elements_map: Optional[Dict[str, Dict[str, Dict]]] = None,
) -> Dict:
    """Build the item relation graph from per-post co-occurrence.

    Every distinct item becomes a node. Two kinds of directed edges are added,
    each in both directions:
      - ``co_in_post``: the two items appear in the same transaction (post).
      - ``hierarchy``: one item is an ancestor/descendant of the other
        according to ``detect_hierarchy``.

    Nodes are inserted in sorted item order so that the resulting dict (and
    any JSON dumped from it) has the same layout on every run.

    Args:
        transactions: One item list per post; duplicates inside a post are
            counted once.
        post_ids: Post ids aligned by index with ``transactions``; it is
            indexed for every transaction, so it must be at least as long.
        dimension_mode: Mode passed on to ``parse_item_meta``.
        post_account_map: Optional ``post_id -> account_name`` mapping used to
            fill each node's ``accounts`` list.
        elements_map: Optional
            ``item_key -> {element_name: {'count': int, 'post_ids': iterable}}``
            mapping; matching entries are copied into the node meta under
            ``elements``, ordered by descending count.

    Returns:
        ``{item: {'meta': {...}, 'edges': {other_item: {edge_type: {...}}}}}``.
    """
    # ── 3.1 Deduplicate each transaction once and gather all items ──
    txn_items = [sorted(set(txn)) for txn in transactions]
    # Sorted on purpose: iterating a set of str is not stable across
    # interpreter runs, which would shuffle the node order of the output.
    all_items = sorted({item for items in txn_items for item in items})
    item_metas = {item: parse_item_meta(item, dimension_mode) for item in all_items}

    # ── 3.2 Per-item post frequency and the accounts each item appears in ──
    freq_in_posts = defaultdict(int)
    item_accounts = defaultdict(set)
    for i, items in enumerate(txn_items):
        pid = post_ids[i]
        account = post_account_map.get(pid) if post_account_map else None
        for item in items:
            freq_in_posts[item] += 1
            if account:
                item_accounts[item].add(account)

    # ── 3.3 Co-occurrence: (a, b) with a < b  ->  ids of the shared posts ──
    co_post_ids = defaultdict(list)
    for i, items in enumerate(txn_items):
        for pair in combinations(items, 2):
            co_post_ids[pair].append(post_ids[i])

    # ── 3.4 Assemble the nodes ──
    graph = {}
    for item in all_items:
        meta = item_metas[item]
        meta['frequency_in_posts'] = freq_in_posts[item]
        accounts = item_accounts.get(item)
        if accounts:
            meta['accounts'] = sorted(accounts)
        if elements_map and item in elements_map:
            # Copy the element data (most frequent first) so that later
            # in-place edits of the graph do not touch the caller's elements_map.
            ranked = sorted(elements_map[item].items(), key=lambda kv: -kv[1]['count'])
            meta['elements'] = {
                name: {'count': info['count'], 'post_ids': list(info['post_ids'])}
                for name, info in ranked
            }
        graph[item] = {'meta': meta, 'edges': {}}

    # Fill co_in_post edges in both directions.
    for (a, b), shared_posts in co_post_ids.items():
        co_post_count = len(shared_posts)
        for source, target in ((a, b), (b, a)):
            _ensure_edge(graph, source, target)
            graph[source]['edges'][target]['co_in_post'] = {
                'co_post_count': co_post_count,
                # Each direction gets its own list so that mutating one edge
                # can never silently change the opposite edge.
                'post_ids': list(shared_posts),
                # Share of the source item's posts that also contain the target.
                'confidence': round(co_post_count / freq_in_posts[source], 4),
            }

    # Fill hierarchy edges in both directions.
    for a, b in combinations(all_items, 2):
        h = detect_hierarchy(a, item_metas[a], b, item_metas[b])
        if not h:
            continue
        _ensure_edge(graph, a, b)
        graph[a]['edges'][b]['hierarchy'] = h
        reverse_relation = 'descendant' if h['relation'] == 'ancestor' else 'ancestor'
        _ensure_edge(graph, b, a)
        graph[b]['edges'][a]['hierarchy'] = {
            'relation': reverse_relation,
            'depth_diff': h['depth_diff'],
        }

    return graph
- def _ensure_edge(graph: Dict, source: str, target: str):
- """确保 graph[source]['edges'][target] 存在"""
- if source not in graph:
- graph[source] = {'meta': {}, 'edges': {}}
- if target not in graph[source]['edges']:
- graph[source]['edges'][target] = {}
- # ──────────────────────────────────────────────
- # 4. post_ids 全局索引(压缩 JSON 体积)
- # ──────────────────────────────────────────────
def build_post_ids_index(graph: Dict) -> List[str]:
    """Replace every ``post_ids`` list in the graph with indices into one global list.

    Post ids are gathered from the ``co_in_post`` payload of every edge and
    from every element entry under a node's ``meta['elements']``. The distinct
    ids are sorted to form the index, and each ``post_ids`` list is then
    rewritten in place as positions in that index.

    Args:
        graph: Graph dict as produced by ``build_item_graph``; modified in place.

    Returns:
        The sorted list of distinct post ids (the index).
    """
    # Dicts that hold a non-empty 'post_ids' list: edge payloads first ...
    edge_holders = [
        payload
        for node in graph.values()
        for edge_types in node.get('edges', {}).values()
        for payload in (edge_types.get('co_in_post'),)
        if payload and payload.get('post_ids')
    ]
    # ... then element entries (non-dict element values are ignored).
    element_holders = [
        entry
        for node in graph.values()
        for entry in node.get('meta', {}).get('elements', {}).values()
        if isinstance(entry, dict) and entry.get('post_ids')
    ]
    holders = edge_holders + element_holders

    index = sorted({pid for holder in holders for pid in holder['post_ids']})
    position_of = {pid: position for position, pid in enumerate(index)}

    for holder in holders:
        holder['post_ids'] = [position_of[pid] for pid in holder['post_ids']]

    return index
- # ──────────────────────────────────────────────
- # 5. 主函数
- # ──────────────────────────────────────────────
# Supported depth modes, mapped to the label printed for each of them by main().
DEPTH_MODES = {'max': '叶子节点完整路径', 'all_levels': '展开所有层级'}

# Supported dimension modes.
DIMENSION_MODES = [
    'full',
    'point_type_only',
    'substance_form_only',
]
def main(account_name: str,
         depth_modes: List[str] = None,
         dimension_modes: List[str] = None):
    """Build and save the item co-occurrence graphs for one account.

    Reads ``result/<account_name>/topic_point_data/point_classification_results.json``
    (and ``post_account_map.json`` from the same folder when it exists), builds
    one graph per (dimension mode, depth mode) combination and writes each to
    ``result/<account_name>/item_graph/item_graph_<dimension_mode>_<depth>.json``.
    Progress and statistics are printed to stdout.

    Args:
        account_name: Account name; selects the ``result/<account_name>`` folder.
        depth_modes: Depth modes to build; defaults to ``['max', 'all_levels']``.
        dimension_modes: Dimension modes to build; defaults to ``['full']``.
    """
    depth_modes = ['max', 'all_levels'] if depth_modes is None else depth_modes
    dimension_modes = ['full'] if dimension_modes is None else dimension_modes

    base_dir = f"result/{account_name}"
    results_file = os.path.join(base_dir, "topic_point_data/point_classification_results.json")
    post_account_map_file = os.path.join(base_dir, "topic_point_data/post_account_map.json")
    output_dir = os.path.join(base_dir, "item_graph")
    os.makedirs(output_dir, exist_ok=True)

    # The raw classification data is loaded once and reused for every mode.
    with open(results_file, 'r', encoding='utf-8') as results_fh:
        original_data = json.load(results_fh)
    print(f"已加载原始数据: {len(original_data)} 个帖子")

    # The post -> account mapping is optional.
    post_account_map = None
    if os.path.exists(post_account_map_file):
        with open(post_account_map_file, 'r', encoding='utf-8') as map_fh:
            post_account_map = json.load(map_fh)
        print(f"已加载 post_account_map: {len(post_account_map)} 条")

    print(f"{'=' * 60}")
    print(f"构建 Item 共现关系图: {account_name}")
    print(f"{'=' * 60}")

    for dimension_mode in dimension_modes:
        # The element map does not depend on depth: compute it once per dimension mode.
        elements_map = collect_elements_for_items(original_data, dimension_mode)
        print(f"\n维度模式: {dimension_mode} | 含元素的分类数: {len(elements_map)}")

        for depth in depth_modes:
            print(f"\n{'─' * 60}")
            print(f"维度模式: {dimension_mode} | 深度: {depth} ({DEPTH_MODES.get(depth, depth)})")

            # Pass the already loaded data along with the file path.
            transactions, post_ids, _ = build_transactions_at_depth(
                results_file, depth, dimension_mode=dimension_mode, data=original_data
            )
            print(f"帖子数: {len(post_ids)}")

            distinct_items = {item for txn in transactions for item in txn}
            print(f"唯一 item 数: {len(distinct_items)}")

            # Base graph: co_in_post + hierarchy edges.
            graph = build_item_graph(transactions, post_ids, dimension_mode, post_account_map, elements_map)

            # Edge statistics: directed edge total and a count per edge type.
            total_edges = 0
            edge_type_counts = defaultdict(int)
            for node in graph.values():
                total_edges += len(node['edges'])
                for edge_types in node['edges'].values():
                    for edge_type in edge_types:
                        edge_type_counts[edge_type] += 1

            print(f"\n  图节点数: {len(graph)}")
            print(f"  图边数(有向): {total_edges}")
            for edge_type, count in sorted(edge_type_counts.items()):
                print(f"    {edge_type}: {count}")

            # Swap post id lists for indices into one global list to shrink the JSON.
            post_ids_index = build_post_ids_index(graph)

            # Save the graph together with its post id index.
            output = {'_post_ids_index': post_ids_index, **graph}
            output_file = os.path.join(output_dir, f"item_graph_{dimension_mode}_{depth}.json")
            with open(output_file, 'w', encoding='utf-8') as out_fh:
                json.dump(output, out_fh, ensure_ascii=False, indent=2)
            print(f"  已保存图: {output_file} (post_ids索引: {len(post_ids_index)})")

    print(f"\n{'=' * 60}")
    print("完成")
if __name__ == "__main__":
    # Script entry point: build every depth/dimension combination for one
    # account, then render the HTML visualization from the saved results.
    account_name = "小红书"
    main(
        account_name,
        depth_modes=['max', 'all_levels'],
        dimension_modes=['full', 'point_type_only', 'substance_form_only'],
    )

    # Imported inside the guard, so it is only loaded when run as a script.
    import visualization_item_graph.generate as generate
    generate.generate_html(account_name, f"result/{account_name}")
|