#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Extract feature names and their category hierarchy from the aggregated
pattern results of past posts (过去帖子_pattern聚合结果.json).

Two JSON files are produced:

* a mapping from feature name to the category path it belongs to;
* a mapping from category name to its level, leaf flag and direct children.

Past posts published at or after the earliest publish time of the current
posts are filtered out first, so that no "future" data leaks into the result.
"""

import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Add the project root to the import path so that `script.detail` resolves.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from script.detail import get_xiaohongshu_detail

# Keys that carry node payload/metadata rather than child categories.
_RESERVED_KEYS = frozenset(["特征列表", "_meta", "帖子数", "特征数", "帖子列表"])

# Keys skipped while collecting post IDs. "特征列表" is deliberately not part
# of this set: the feature entries themselves are still traversed.
_COLLECT_SKIP_KEYS = frozenset(["_meta", "帖子数", "特征数", "帖子列表"])

# (key in the input data, key in the output files) for each top-level section.
_CATEGORY_SECTIONS = (
    ("灵感点列表", "灵感点"),
    ("目的点", "目的点"),
    ("关键点列表", "关键点"),
)


def extract_post_id_from_filename(filename: str) -> str:
    """Return the post ID encoded at the start of a file name.

    The ID is everything before the first underscore.

    Args:
        filename: File name such as ``"<post_id>_something.json"``.

    Returns:
        The post ID, or an empty string when the name contains no underscore
        or starts with one.
    """
    match = re.match(r'^([^_]+)_', filename)
    if match:
        return match.group(1)
    return ""


def get_post_detail(post_id: str) -> Optional[Dict]:
    """Fetch the detail record of a post, best effort.

    Args:
        post_id: ID passed to ``get_xiaohongshu_detail``.

    Returns:
        Whatever ``get_xiaohongshu_detail`` returns, or ``None`` when it
        raises (a warning is printed in that case).
    """
    try:
        return get_xiaohongshu_detail(post_id)
    except Exception as e:
        # Deliberately broad: one failing post must not abort the whole run.
        print(f" 警告: 获取帖子 {post_id} 详情失败: {e}")
        return None


def get_earliest_publish_time(current_posts_dir: Path) -> Optional[str]:
    """Return the earliest publish time among the current posts.

    Every ``*.json`` file in the directory is mapped to a post ID via
    ``extract_post_id_from_filename`` and its detail is fetched.

    Args:
        current_posts_dir: Directory holding the current posts' JSON files.

    Returns:
        The smallest ``publish_time`` value found (documented by the original
        author as a ``"YYYY-MM-DD HH:MM:SS"`` string), or ``None`` when the
        directory is missing/empty or no post yields a usable publish time.
    """
    if not current_posts_dir.exists():
        print(f"警告: 当前帖子目录不存在: {current_posts_dir}")
        return None

    json_files = sorted(current_posts_dir.glob("*.json"))
    if not json_files:
        print(f"警告: 当前帖子目录为空: {current_posts_dir}")
        return None

    print("\n正在获取当前帖子的发布时间...")
    print(f"找到 {len(json_files)} 个当前帖子")

    earliest_time = None
    for file_path in json_files:
        post_id = extract_post_id_from_filename(file_path.name)
        if not post_id:
            continue

        detail = get_post_detail(post_id)
        if not isinstance(detail, dict):
            continue

        publish_time = detail.get('publish_time')
        if not publish_time:
            # An empty value would compare lower than every real timestamp,
            # become the "earliest" time and, being falsy, silently disable
            # the time filter in main(). Skip it instead.
            continue

        try:
            is_earlier = earliest_time is None or publish_time < earliest_time
        except TypeError as e:
            # Values of incomparable types; ignore this post.
            print(f" 警告: 获取帖子 {post_id} 发布时间失败: {e}")
            continue

        if is_earlier:
            earliest_time = publish_time
            print(f" 更新最早时间: {publish_time} (帖子: {post_id})")

    if earliest_time:
        print(f"\n当前帖子最早发布时间: {earliest_time}")
    else:
        print("\n警告: 未能获取到任何当前帖子的发布时间")

    return earliest_time


def collect_all_post_ids(data: Dict) -> Set[str]:
    """Collect every post ID referenced in the aggregated data.

    IDs are taken from "帖子列表" lists and from the "帖子id" field of the
    entries in "特征列表" lists, anywhere below the three top-level sections.

    Args:
        data: Aggregated result data.

    Returns:
        Set of post IDs.
    """
    post_ids: Set[str] = set()

    def traverse_node(node: Any) -> None:
        if isinstance(node, dict):
            post_list = node.get("帖子列表")
            if isinstance(post_list, list):
                post_ids.update(post_list)

            features = node.get("特征列表")
            if isinstance(features, list):
                for feature in features:
                    # Ignore malformed (non-dict) feature entries.
                    if isinstance(feature, dict) and "帖子id" in feature:
                        post_ids.add(feature["帖子id"])

            for key, value in node.items():
                if key not in _COLLECT_SKIP_KEYS:
                    traverse_node(value)
        elif isinstance(node, list):
            for item in node:
                traverse_node(item)

    for section_key, _ in _CATEGORY_SECTIONS:
        if section_key in data:
            traverse_node(data[section_key])

    return post_ids


def filter_data_by_time(data: Dict, time_filter: str) -> Tuple[Dict, Set[str]]:
    """Drop all data belonging to posts published at or after a threshold.

    Args:
        data: Original aggregated result data.
        time_filter: Publish-time threshold; only posts with
            ``publish_time < time_filter`` are kept.

    Returns:
        ``(filtered_data, filtered_post_ids)`` where ``filtered_post_ids`` are
        the posts rejected because of their publish time. ``filtered_data``
        is an empty dict when nothing survives the filter.
    """
    all_post_ids = collect_all_post_ids(data)
    total = len(all_post_ids)
    print(f"\n数据中包含 {total} 个不同的帖子")

    print("正在获取帖子详情...")
    post_details = {}
    for i, post_id in enumerate(all_post_ids, 1):
        print(f"[{i}/{total}] 获取帖子 {post_id} 的详情...")
        detail = get_post_detail(post_id)
        if detail:
            post_details[post_id] = detail

    # Posts without a detail record are in neither result set below, so their
    # data is dropped. Say so instead of discarding it silently.
    missing_count = total - len(post_details)
    if missing_count:
        print(f" 警告: {missing_count} 个帖子未获取到详情,其数据将不会保留")

    # Reject posts published at or after the threshold to avoid data leakage.
    print(f"\n正在应用时间过滤 (< {time_filter}),避免使用晚于当前帖子的数据...")
    filtered_post_ids: Set[str] = set()
    valid_post_ids: Set[str] = set()

    for post_id, detail in post_details.items():
        # A missing or None publish time is treated as '' and therefore kept,
        # matching the original handling of a missing key.
        # NOTE(review): keeping posts of unknown age may not be intended.
        publish_time = detail.get('publish_time') or ''
        if publish_time < time_filter:
            valid_post_ids.add(post_id)
        else:
            filtered_post_ids.add(post_id)
            print(f" ⚠️ 过滤掉帖子 {post_id} (发布时间: {publish_time},晚于阈值)")

    print(f"\n过滤统计: 过滤掉 {len(filtered_post_ids)} 个帖子(穿越),保留 {len(valid_post_ids)} 个帖子")

    # filter_node_by_post_ids returns None for a fully pruned dict; callers
    # expect a dict, so fall back to an empty one.
    filtered_data = filter_node_by_post_ids(data, valid_post_ids) or {}

    return filtered_data, filtered_post_ids


def filter_node_by_post_ids(node: Any, valid_post_ids: Set[str]) -> Any:
    """Recursively keep only the data that belongs to valid posts.

    For a dict, the "特征列表" entries are restricted to valid posts and the
    counters ("帖子数", "特征数") and "帖子列表" are recomputed from what is
    left; children that end up empty are dropped. For a list, each item is
    filtered and items pruned away entirely are removed. Any other value is
    returned unchanged.

    Args:
        node: Current node (dict, list or scalar).
        valid_post_ids: IDs of the posts to keep.

    Returns:
        The filtered node; ``None`` for a dict with nothing left in it.
    """
    if isinstance(node, dict):
        filtered_node: Dict[str, Any] = {}

        if "特征列表" in node:
            kept_features = [
                feature
                for feature in node["特征列表"]
                if isinstance(feature, dict)
                and "帖子id" in feature
                and feature["帖子id"] in valid_post_ids
            ]

            if kept_features:
                filtered_node["特征列表"] = kept_features
                if "_meta" in node:
                    filtered_node["_meta"] = node["_meta"].copy()

                # De-duplicate while keeping first-seen order so the result
                # is deterministic across runs.
                kept_post_ids = list(
                    dict.fromkeys(feature["帖子id"] for feature in kept_features)
                )
                filtered_node["帖子数"] = len(kept_post_ids)
                filtered_node["特征数"] = len(kept_features)
                filtered_node["帖子列表"] = kept_post_ids

        for key, value in node.items():
            if key in _RESERVED_KEYS:
                continue
            filtered_child = filter_node_by_post_ids(value, valid_post_ids)
            if filtered_child:  # only keep non-empty children
                filtered_node[key] = filtered_child

        return filtered_node if filtered_node else None

    if isinstance(node, list):
        kept_items = []
        for item in node:
            filtered_item = filter_node_by_post_ids(item, valid_post_ids)
            if filtered_item is None and item is not None:
                # The item was a dict that got pruned completely; do not
                # leave a None placeholder behind.
                continue
            kept_items.append(filtered_item)
        return kept_items

    return node


def extract_categories_from_node(node: Dict, current_path: List[str], result: Dict[str, Dict]):
    """Walk the category tree and record the category path of every feature.

    Args:
        node: Current node.
        current_path: Category path of ``node``, ordered from the innermost
            category to the outermost one.
        result: Output mapping, updated in place:
            ``feature name -> {"所属分类": path}``. A feature name seen more
            than once keeps the path of its last occurrence.
    """
    for feature in node.get("特征列表", []):
        if not isinstance(feature, dict):
            continue
        feature_name = feature.get("特征名称")
        if feature_name:
            result[feature_name] = {"所属分类": current_path.copy()}

    for key, value in node.items():
        if key in _RESERVED_KEYS:
            continue
        if isinstance(value, dict):
            # Prepend so the path stays ordered innermost-first.
            extract_categories_from_node(value, [key] + current_path, result)


def process_category(category_data: Dict, category_key: str) -> Dict[str, Dict]:
    """Build the feature-name-to-category mapping for one top-level section.

    Args:
        category_data: Data of the section (灵感点列表 / 目的点 / 关键点列表).
        category_key: Key of the section; currently unused, kept for
            interface compatibility.

    Returns:
        Mapping ``feature name -> {"所属分类": path}``; empty when
        ``category_data`` is not a dict.
    """
    result: Dict[str, Dict] = {}
    if isinstance(category_data, dict):
        extract_categories_from_node(category_data, [], result)
    return result


def build_category_hierarchy_from_node(
    node: Dict,
    category_hierarchy: Dict[str, Dict],
    current_level: int = 1,
    parent_categories: Optional[List[str]] = None
):
    """Recursively record level, leaf flag and direct children per category.

    Args:
        node: Current node.
        category_hierarchy: Output mapping, updated in place, keyed by
            category name.
        current_level: Level of the categories directly inside ``node``
            (1-based).
        parent_categories: Ancestor category names, outermost first. Only
            threaded through the recursion; not stored in the result.

    Note:
        Entries are keyed by category name alone. When the same name occurs
        at several places in the tree, the level of the first occurrence is
        kept while "下一级" reflects the last occurrence.
    """
    if parent_categories is None:
        parent_categories = []

    for key, value in node.items():
        if key in _RESERVED_KEYS or not isinstance(value, dict):
            continue

        if key not in category_hierarchy:
            category_hierarchy[key] = {
                "几级分类": current_level,
                "是否是叶子分类": False,
                "下一级": []
            }

        # Direct children: sub-categories first, then features.
        next_level_items = []
        has_sub_categories = False
        for sub_key, sub_value in value.items():
            if sub_key not in _RESERVED_KEYS and isinstance(sub_value, dict):
                has_sub_categories = True
                next_level_items.append({
                    "节点类型": "分类",
                    "节点名称": sub_key
                })

        for feature in value.get("特征列表", []):
            if not isinstance(feature, dict):
                continue
            feature_name = feature.get("特征名称")
            if feature_name:
                next_level_items.append({
                    "节点类型": "特征",
                    "节点名称": feature_name
                })

        category_hierarchy[key]["下一级"] = next_level_items

        # A category without sub-categories is a leaf.
        if not has_sub_categories:
            category_hierarchy[key]["是否是叶子分类"] = True

        build_category_hierarchy_from_node(
            value,
            category_hierarchy,
            current_level + 1,
            parent_categories + [key]
        )


def build_category_hierarchy(category_data: Dict) -> Dict[str, Dict]:
    """Build the category-name-to-children mapping for one section.

    Args:
        category_data: Data of the section.

    Returns:
        Mapping ``category name -> {"几级分类", "是否是叶子分类", "下一级"}``;
        empty when ``category_data`` is not a dict.
    """
    category_hierarchy: Dict[str, Dict] = {}
    if isinstance(category_data, dict):
        build_category_hierarchy_from_node(category_data, category_hierarchy)
    return category_hierarchy


def main():
    """Read the aggregated data, apply the time filter and write both mappings.

    Input and output files live in ``data/data_1117`` under the project root.
    """
    script_dir = Path(__file__).parent
    project_root = script_dir.parent.parent
    data_dir = project_root / "data" / "data_1117"

    input_file = data_dir / "过去帖子_pattern聚合结果.json"
    current_posts_dir = data_dir / "当前帖子_what解构结果"
    output_file_1 = data_dir / "特征名称_分类映射.json"
    output_file_2 = data_dir / "分类层级映射.json"

    # The earliest publish time of the current posts is the filter threshold.
    earliest_time = get_earliest_publish_time(current_posts_dir)

    print(f"\n正在读取文件: {input_file}")
    with open(input_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    filtered_post_ids = set()
    if earliest_time:
        print("\n" + "="*60)
        print("开始应用时间过滤...")
        data, filtered_post_ids = filter_data_by_time(data, earliest_time)

        if filtered_post_ids:
            print(f"\n⚠️ 警告: 以下 {len(filtered_post_ids)} 个帖子因发布时间晚于阈值被过滤:")
            for post_id in sorted(filtered_post_ids, key=str):
                print(f" - {post_id}")
    else:
        print("\n未启用时间过滤")

    # Result 1: feature name -> category path.
    output_1 = {}
    for source_key, output_key in _CATEGORY_SECTIONS:
        if source_key in data:
            print(f"正在处理: {source_key} (特征名称映射)")
            output_1[output_key] = process_category(data[source_key], source_key)
            print(f" 提取了 {len(output_1[output_key])} 个特征")

    print(f"\n正在保存结果到: {output_file_1}")
    with open(output_file_1, "w", encoding="utf-8") as f:
        json.dump(output_1, f, ensure_ascii=False, indent=4)

    print("完成!")
    if earliest_time:
        print(f"\n总计 (特征名称映射,已过滤掉发布时间 >= {earliest_time} 的帖子):")
    else:
        print("\n总计 (特征名称映射):")
    for category, features in output_1.items():
        print(f" {category}: {len(features)} 个特征")

    # Result 2: category name -> level / leaf flag / direct children.
    print("\n" + "="*60)
    print("开始生成分类层级映射...")
    output_2 = {}
    for source_key, output_key in _CATEGORY_SECTIONS:
        if source_key in data:
            print(f"正在处理: {source_key} (分类层级)")
            output_2[output_key] = build_category_hierarchy(data[source_key])
            print(f" 提取了 {len(output_2[output_key])} 个分类")

    print(f"\n正在保存结果到: {output_file_2}")
    with open(output_file_2, "w", encoding="utf-8") as f:
        json.dump(output_2, f, ensure_ascii=False, indent=4)

    print("完成!")
    if earliest_time:
        print(f"\n总计 (分类层级映射,已过滤掉发布时间 >= {earliest_time} 的帖子):")
    else:
        print("\n总计 (分类层级映射):")
    for category, hierarchies in output_2.items():
        print(f" {category}: {len(hierarchies)} 个分类")


if __name__ == "__main__":
    main()