import json
import os
from typing import List, Dict, Tuple

import pandas as pd

# pyfim is a native, optional dependency that only run_fpgrowth_with_absolute_support()
# needs; defer the hard failure to the point of use so the pure-Python helpers
# in this module stay importable without it.
try:
    from fim import fpgrowth as pyfim_fpgrowth
except ImportError:
    pyfim_fpgrowth = None


def build_classification_path_label(classification_path: List[str]) -> str:
    """Build the full classification-path label.

    Args:
        classification_path: path segments, e.g. ['食品', '水果', '猕猴桃']

    Returns:
        The joined label, e.g. '食品>水果>猕猴桃'; '' for an empty/None path.
    """
    if classification_path:
        return '>'.join(classification_path)
    return ""


def get_path_at_depth(classification_path: List[str], target_depth: int) -> str:
    """Return the classification path truncated to a given depth.

    Args:
        classification_path: path segments, e.g. ['食品', '水果', '猕猴桃']
        target_depth: desired depth (1-based); if the path is shallower than
            this, the whole path is used.

    Returns:
        The label for the truncated path, or '' for an empty path.
    """
    if not classification_path:
        return ""
    # Clamp the requested depth to the actual path length.
    actual_depth = min(target_depth, len(classification_path))
    return '>'.join(classification_path[:actual_depth])


def _expand_all_levels(path_list: List[str], prefix: str, start_depth: int = 2) -> set:
    """Expand a path into prefixed items, one per depth level.

    start_depth=1 starts at the root node; start_depth=2 skips the root.
    (Hoisted out of the transaction-building loops, where it used to be
    redefined for every point.)
    """
    return {
        f"{prefix}{'>'.join(path_list[:depth])}"
        for depth in range(start_depth, len(path_list) + 1)
    }


def _add_dimension_items(dest: set, entries: List[Dict], prefix: str,
                         target_depth, trim_from_leaf: int,
                         all_levels_start_depth: int = 2) -> None:
    """Add transaction items for one dimension list (实质/形式/意图) of a point.

    Factored out of build_transactions_at_depth(), where this logic was
    copy-pasted three times (once per dimension).

    Args:
        dest: transaction item set being built for the current post (mutated).
        entries: list of dicts with '分类路径' (path) and '名称' (name) keys.
        prefix: item prefix already encoding point type and/or dimension.
        target_depth: same semantics as build_transactions_at_depth().
        trim_from_leaf: N for the 'max-N' mode, 0 otherwise.
        all_levels_start_depth: starting depth for the 'all_levels' expansion.
    """
    for entry in entries:
        path = entry.get('分类路径', [])
        name = entry.get('名称', '')
        if not path:
            continue
        if target_depth == 'all_levels':
            dest.update(_expand_all_levels(path, prefix, all_levels_start_depth))
        elif target_depth == 'mixed':
            # Mixed depth: emit both the path layer (the category) and the
            # name layer (the concrete point). They are distinct semantic
            # levels and coexist so cross-level relations can be discovered.
            path_label = '>'.join(path)
            dest.add(f"{prefix}{path_label}")
            if name:
                dest.add(f"{prefix}{path_label}||{name}")
        elif target_depth == 'max':
            # Use the full path (leaf node).
            dest.add(f"{prefix}{'>'.join(path)}")
        elif target_depth == 'max_with_name':
            path_label = '>'.join(path)
            if name:
                dest.add(f"{prefix}{path_label}||{name}")
            else:
                # No name available: degrade to the bare path.
                dest.add(f"{prefix}{path_label}")
        elif trim_from_leaf > 0:
            # 'max-N': drop N levels from the leaf end, keeping at least the root.
            trimmed = path[:-trim_from_leaf] if len(path) > trim_from_leaf else path[:1]
            dest.add(f"{prefix}{'>'.join(trimmed)}")
        else:
            # Numeric depth.
            path_label = get_path_at_depth(path, target_depth)
            if path_label:
                dest.add(f"{prefix}{path_label}")


def build_transactions_at_depth(results_file: str, target_depth,
                                dimension_mode: str = 'full',
                                data: Dict = None) -> Tuple[List[List[str]], List[str], Dict]:
    """Build transactions at the requested classification depth.

    Args:
        results_file: path to the JSON data file (ignored when data is given).
        target_depth: depth selector —
            - 1, 2, 3...: a concrete depth level
            - 'max': most specific path (leaf node)
            - 'max_with_name': most specific path plus name
            - 'mixed': both the 'max' path layer and the name layer
            - 'all_levels': expand every level (A, A>B, A>B>C all included)
            - 'max-N': trim N levels from the leaf end (e.g. 'max-1')
        dimension_mode: item prefix scheme —
            - 'full': 点类型_维度_路径 (dimensions: 实质/形式/意图)
            - 'point_type_only': 点类型_路径
            - 'substance_form_only': 维度_路径
        data: pre-loaded data dict; if None, results_file is read.

    Returns:
        (transactions, post_ids, original_data). Name-layer items have the
        form '<prefix><path>||<name>'; posts with no items are skipped.

    Raises:
        ValueError: for an unknown dimension_mode.
    """
    if data is None:
        with open(results_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

    # Parse the 'max-N' mode.
    trim_from_leaf = 0
    if isinstance(target_depth, str) and target_depth.startswith('max-'):
        trim_from_leaf = int(target_depth.split('-')[1])

    transactions = []
    post_ids = []
    for post_id, post_data in data.items():
        txn_items = set()
        for point_type in ['灵感点', '目的点', '关键点']:
            for point in post_data.get(point_type, []):
                # Build the per-dimension prefixes for this mode.
                if dimension_mode == 'full':
                    prefixes = (f"{point_type}_实质_", f"{point_type}_形式_", f"{point_type}_意图_")
                elif dimension_mode == 'point_type_only':
                    prefixes = (f"{point_type}_",) * 3
                elif dimension_mode == 'substance_form_only':
                    prefixes = ("实质_", "形式_", "意图_")
                else:
                    raise ValueError(f"Unknown dimension_mode: {dimension_mode}")

                _add_dimension_items(txn_items, point.get('实质', []), prefixes[0],
                                     target_depth, trim_from_leaf)
                _add_dimension_items(txn_items, point.get('形式', []), prefixes[1],
                                     target_depth, trim_from_leaf)
                # NOTE(review): the original code expanded '意图' all_levels
                # items starting at depth 1 (including the root level) while
                # '实质'/'形式' start at depth 2. Preserved as-is — confirm
                # whether this asymmetry is intentional.
                _add_dimension_items(txn_items, point.get('意图', []), prefixes[2],
                                     target_depth, trim_from_leaf,
                                     all_levels_start_depth=1)
        if txn_items:
            transactions.append(list(txn_items))
            post_ids.append(post_id)
    return transactions, post_ids, data


def run_fpgrowth_with_absolute_support(transactions: List[List[str]],
                                       min_absolute_support: int = 2,
                                       max_len: int = None) -> pd.DataFrame:
    """Mine closed frequent itemsets with pyfim's FP-Growth.

    Args:
        transactions: transaction list (each a list of item strings).
        min_absolute_support: minimum absolute support (itemset must appear
            in at least N transactions).
        max_len: maximum itemset length (None = unlimited).

    Returns:
        DataFrame with columns itemsets/support/absolute_support/length,
        sorted by length then support (both descending); an empty DataFrame
        when there are no transactions or no results.

    Raises:
        ImportError: if the optional 'fim' package is not installed.
    """
    if not transactions:
        return pd.DataFrame()
    if pyfim_fpgrowth is None:
        raise ImportError("the 'fim' package (pyfim) is required to run FP-Growth")

    total_transactions = len(transactions)
    print(f" 使用算法: pyfim FP-Growth (闭频繁项集)")
    print(f" 总 transactions 数: {total_transactions}")
    print(f" 最小绝对支持度: {min_absolute_support}")

    import time
    start_time = time.time()

    # pyfim fpgrowth: a negative supp means absolute support, target='c'
    # requests closed itemsets, report='a' returns absolute support counts.
    pyfim_kwargs = {
        'supp': -min_absolute_support,
        'target': 'c',
        'report': 'a',
    }
    if max_len is not None:
        pyfim_kwargs['zmax'] = max_len
        # BUG FIX: previously printed a hard-coded "35,568" instead of max_len.
        print(f" 最大项集长度限制: {max_len}")

    result = pyfim_fpgrowth(transactions, **pyfim_kwargs)
    elapsed_time = time.time() - start_time
    print(f" 算法运行时间: {elapsed_time:.2f} 秒")

    if not result:
        return pd.DataFrame()

    # Convert pyfim's (itemset_tuple, abs_support) pairs to a DataFrame.
    rows = []
    for itemset_tuple, abs_support in result:
        itemset = frozenset(itemset_tuple)
        rows.append({
            'itemsets': itemset,
            'support': abs_support / total_transactions,
            'absolute_support': int(abs_support),
            'length': len(itemset),
        })
    frequent_itemsets = pd.DataFrame(rows)
    print(f" 找到 {len(frequent_itemsets)} 个闭频繁项集")

    # Sort: longest itemsets first, then by support.
    if not frequent_itemsets.empty:
        frequent_itemsets = frequent_itemsets.sort_values(['length', 'support'],
                                                          ascending=[False, False])
    return frequent_itemsets


def find_post_ids_for_itemset(itemset: frozenset, transactions: List[List[str]],
                              post_ids: List[str]) -> List[str]:
    """Return the IDs of all posts whose transaction contains the itemset.

    Args:
        itemset: the frequent itemset to match.
        transactions: transaction list.
        post_ids: post ID per transaction (parallel to transactions).

    Returns:
        List of matching post IDs, in transaction order.
    """
    itemset_set = set(itemset)
    return [
        post_ids[idx]
        for idx, transaction in enumerate(transactions)
        if itemset_set.issubset(transaction)
    ]


def batch_find_post_ids(all_itemsets: List[frozenset], transactions: List[List[str]],
                        post_ids: List[str]) -> Dict[frozenset, List[str]]:
    """Match every itemset against every transaction in one pass.

    Compared to calling find_post_ids_for_itemset() per itemset, this walks
    the transactions once and tests all itemsets against each of them.

    Args:
        all_itemsets: all frequent itemsets to match.
        transactions: transaction list.
        post_ids: post ID per transaction (parallel to transactions).

    Returns:
        {itemset: [matched_post_id, ...], ...}
    """
    result = {itemset: [] for itemset in all_itemsets}
    for idx, transaction in enumerate(transactions):
        txn_set = set(transaction)
        pid = post_ids[idx]
        for itemset in all_itemsets:
            if itemset.issubset(txn_set):
                result[itemset].append(pid)
    return result


def classify_itemset_by_point_type(itemset: frozenset, dimension_mode: str = 'full') -> Dict:
    """Classify the point types and dimensions an itemset involves.

    Args:
        itemset: the frequent itemset.
        dimension_mode:
            - 'full': items are 点类型_维度_路径 (dimensions: 实质/形式/意图)
            - 'point_type_only': items are 点类型_路径
            - 'substance_form_only': items are 维度_路径

    Returns:
        {
            'point_types': point types involved,
            'dimensions': dimensions involved,
            'is_single_point_type': True when at most one point type,
            'is_cross_point': True when more than one point type,
            'combination_type': human-readable combination label,
        }

    Raises:
        ValueError: for an unknown dimension_mode (previously this surfaced
        as an UnboundLocalError).
    """
    point_types = set()
    dimensions = set()
    for item in itemset:
        if dimension_mode == 'full':
            # Format: 点类型_维度_路径 or 点类型_维度_路径||名称
            parts = item.split('_', 2)
            if len(parts) >= 3:
                point_types.add(parts[0])
                dimensions.add(parts[1])
        elif dimension_mode == 'point_type_only':
            # Format: 点类型_路径 — only the point type is encoded.
            point_types.add(item.split('_', 1)[0])
        elif dimension_mode == 'substance_form_only':
            # Format: 维度_路径 — only the dimension is encoded.
            parts = item.split('_', 1)
            if len(parts) >= 2:
                dimensions.add(parts[0])
        else:
            raise ValueError(f"Unknown dimension_mode: {dimension_mode}")

    is_single_point_type = len(point_types) <= 1
    is_cross_point = len(point_types) > 1

    # Build the human-readable combination label.
    if dimension_mode == 'full':
        # full mode: consider both point types and dimensions.
        if is_single_point_type:
            point_type = next(iter(point_types)) if point_types else '未知'
            if len(dimensions) == 1:
                combination_type = f"{point_type}_{next(iter(dimensions))}"
            else:
                combination_type = f"{point_type}_{'+'.join(sorted(dimensions))}"
        else:
            joined_types = '×'.join(sorted(point_types))
            if len(dimensions) == 1:
                combination_type = f"{joined_types}_{next(iter(dimensions))}"
            else:
                combination_type = f"{joined_types}_混合"
    elif dimension_mode == 'point_type_only':
        # point_type_only mode: only point types matter.
        if is_single_point_type:
            combination_type = next(iter(point_types)) if point_types else '未知'
        else:
            combination_type = '×'.join(sorted(point_types))
    else:
        # substance_form_only mode (unknown modes already raised above).
        if len(dimensions) == 1:
            combination_type = next(iter(dimensions))
        else:
            combination_type = '+'.join(sorted(dimensions))

    return {
        'point_types': list(point_types),
        'dimensions': list(dimensions),
        'is_single_point_type': is_single_point_type,
        'is_cross_point': is_cross_point,
        'combination_type': combination_type,
    }


def categorize_frequent_itemsets(frequent_itemsets: pd.DataFrame,
                                 dimension_mode: str = 'full') -> Dict[str, pd.DataFrame]:
    """Group frequent itemsets by their combination type.

    Args:
        frequent_itemsets: mining-result DataFrame (needs an 'itemsets' column).
        dimension_mode: see classify_itemset_by_point_type().

    Returns:
        {combination_type: DataFrame of the matching rows, ...}
    """
    if frequent_itemsets.empty:
        return {}

    categorized = {}
    for _, row in frequent_itemsets.iterrows():
        combo_type = classify_itemset_by_point_type(
            row['itemsets'], dimension_mode)['combination_type']
        categorized.setdefault(combo_type, []).append(row)

    return {combo: pd.DataFrame(rows) for combo, rows in categorized.items()}


def parse_mixed_item(item: str) -> Dict:
    """Parse an item of the unified '点类型_维度_路径[||名称]' format.

    Args:
        item: e.g. "灵感点_实质_食品>水果" or "灵感点_实质_食品>水果||猕猴桃"

    Returns:
        {
            'layer': 'path' or 'name' (decided by the presence of '||'),
            'point_type': point type,
            'dimension': dimension,
            'path': classification path,
            'name': name, or None for a path-layer item,
            'full_path': path extended with the name when present,
        }

    Raises:
        ValueError: when the item lacks the three '_'-separated parts.
    """
    parts = item.split('_', 2)
    if len(parts) < 3:
        raise ValueError(f"Invalid item format: {item}")
    point_type, dimension, path_with_name = parts

    if '||' in path_with_name:
        path, name = path_with_name.split('||', 1)
        layer = 'name'
        full_path = f"{path}>{name}"
    else:
        path, name, layer, full_path = path_with_name, None, 'path', path_with_name

    return {
        'layer': layer,
        'point_type': point_type,
        'dimension': dimension,
        'path': path,
        'name': name,
        'full_path': full_path,
    }


def is_ancestor_descendant_pair(item1: str, item2: str) -> bool:
    """Check whether two items stand in an ancestor/descendant relation.

    Both items must share the same point type and dimension; the relation is
    then decided by prefix containment of their full paths (in either
    direction). Unparseable items never match.

    Args:
        item1, item2: mixed-depth formatted items.

    Returns:
        True when one item's full path is a strict prefix of the other's.
    """
    try:
        parsed1 = parse_mixed_item(item1)
        parsed2 = parse_mixed_item(item2)
    except ValueError:
        return False

    if (parsed1['point_type'] != parsed2['point_type']
            or parsed1['dimension'] != parsed2['dimension']):
        return False

    path1, path2 = parsed1['full_path'], parsed2['full_path']
    return path2.startswith(path1 + '>') or path1.startswith(path2 + '>')


def is_cross_level_combination(itemset) -> bool:
    """Check whether an itemset is a meaningful cross-level combination.

    Meaningful: contains at least one PATH-layer and one NAME-layer item,
    with no ancestor/descendant pair among its items.

    Filtered out (returns False): any ancestor/descendant pair, all-PATH
    itemsets, all-NAME itemsets, and itemsets with an unparseable item.

    Args:
        itemset: frozenset or list of items.

    Returns:
        True only for meaningful cross-level combinations.
    """
    items = list(itemset)

    # Rule 1: any ancestor/descendant pair disqualifies the itemset.
    for i in range(len(items)):
        for j in range(i + 1, len(items)):
            if is_ancestor_descendant_pair(items[i], items[j]):
                return False

    try:
        layers = [parse_mixed_item(item)['layer'] for item in items]
    except ValueError:
        return False

    # Rules 2 & 3: must contain both a path layer and a name layer.
    return 'path' in layers and 'name' in layers


def _filter_reason(itemset) -> str:
    """Name the statistics bucket for an itemset rejected by the filter.

    Mirrors the rejection order of is_cross_level_combination():
    ancestor/descendant pairs first, then all-path, then same-branch names.
    """
    items = list(itemset)
    for i in range(len(items)):
        for j in range(i + 1, len(items)):
            if is_ancestor_descendant_pair(items[i], items[j]):
                return 'filtered_ancestor_descendant'
    layers = []
    for item in items:
        try:
            layers.append(parse_mixed_item(item)['layer'])
        except ValueError:
            pass  # unparseable items carry no layer information
    if layers.count('name') == 0:
        return 'filtered_all_path'
    return 'filtered_same_path_names'


def filter_mixed_depth_itemsets(frequent_itemsets: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
    """Filter mixed-depth itemsets, keeping only meaningful cross-level ones.

    The kept/rejected decision is unchanged from the original implementation;
    only the statistics were fixed: 'filtered_ancestor_descendant' was
    declared but never incremented, and itemsets shorter than 2 were dropped
    without being counted (BUG FIX — buckets now sum to 'total').

    Args:
        frequent_itemsets: mining-result DataFrame.

    Returns:
        (filtered_itemsets sorted by length/support descending, filter_stats)
    """
    if frequent_itemsets.empty:
        return frequent_itemsets, {}

    filtered_rows = []
    filter_stats = {
        'total': len(frequent_itemsets),
        'filtered_too_short': 0,
        'filtered_all_path': 0,
        'filtered_same_path_names': 0,
        'filtered_ancestor_descendant': 0,
        'kept': 0,
    }

    for _, row in frequent_itemsets.iterrows():
        itemset = row['itemsets']
        # Minimum length check: singletons cannot be cross-level.
        if len(itemset) < 2:
            filter_stats['filtered_too_short'] += 1
            continue
        if not is_cross_level_combination(itemset):
            filter_stats[_filter_reason(itemset)] += 1
            continue
        filtered_rows.append(row)
        filter_stats['kept'] += 1

    if not filtered_rows:
        return pd.DataFrame(), filter_stats

    result = pd.DataFrame(filtered_rows)
    return result.sort_values(['length', 'support'], ascending=[False, False]), filter_stats


def format_results_for_json(frequent_itemsets: pd.DataFrame, total_transactions: int,
                            transactions: List[List[str]], post_ids: List[str],
                            depth=None, dimension_mode: str = 'full') -> Dict:
    """Format mining results as a JSON-serializable dict.

    Args:
        frequent_itemsets: mining-result DataFrame.
        total_transactions: number of transactions mined.
        transactions: transaction list (used to match posts back).
        post_ids: post ID per transaction.
        depth: optional depth tag echoed into the output.
        dimension_mode: see classify_itemset_by_point_type().

    Returns:
        {'depth', 'total_transactions', 'min_support_used',
         'frequent_itemsets_by_type': {combo_type: {'count', 'itemsets'}}}
        ('min_support_used' is omitted when there are no itemsets.)
    """
    if frequent_itemsets.empty:
        return {
            'depth': depth,
            'total_transactions': total_transactions,
            'frequent_itemsets_by_type': {}
        }

    # Group by combination type (dimension_mode is forwarded).
    categorized = categorize_frequent_itemsets(frequent_itemsets, dimension_mode)

    # Match posts for all itemsets in a single pass over the transactions.
    all_itemsets = list(frequent_itemsets['itemsets'])
    import time
    t0 = time.time()
    itemset_post_map = batch_find_post_ids(all_itemsets, transactions, post_ids)
    print(f" 批量匹配帖子耗时: {time.time() - t0:.2f} 秒 ({len(all_itemsets)} 个项集 × {len(transactions)} 个帖子)")

    results_by_type = {}
    for combo_type, itemsets_df in categorized.items():
        itemsets_list = []
        for _, row in itemsets_df.iterrows():
            itemset = row['itemsets']
            matched_posts = itemset_post_map[itemset]
            classification = classify_itemset_by_point_type(itemset, dimension_mode)
            itemsets_list.append({
                'itemset': list(itemset),
                'support': float(row['support']),
                'absolute_support': int(row['absolute_support']),
                'length': int(row['length']),
                'post_count': len(matched_posts),
                'matched_posts': matched_posts,
                'point_types': classification['point_types'],
                'dimensions': classification['dimensions'],
                'is_cross_point': classification['is_cross_point'],
            })
        results_by_type[combo_type] = {
            'count': len(itemsets_list),
            'itemsets': itemsets_list,
        }

    return {
        'depth': depth,
        'total_transactions': total_transactions,
        # The empty case returned above, so the DataFrame is non-empty here.
        'min_support_used': float(frequent_itemsets['support'].min()),
        'frequent_itemsets_by_type': results_by_type,
    }