| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 从过去帖子_pattern聚合结果.json中提取特征名称及其对应的分类层级
- """
- import json
- from pathlib import Path
- from typing import Dict, List, Any, Optional, Set
- import sys
- import re
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from script.detail import get_xiaohongshu_detail
- from script.data_processing.path_config import PathConfig
def extract_post_id_from_filename(filename: str) -> str:
    """Return the post ID encoded at the start of a file name.

    The ID is the non-empty run of characters that precedes the first
    underscore in ``filename``.

    Args:
        filename: Bare file name (no directory part expected).

    Returns:
        The ID prefix, or an empty string when the name contains no
        underscore or starts with one.
    """
    prefix_match = re.match(r'^([^_]+)_', filename)
    return prefix_match.group(1) if prefix_match else ""
def get_post_detail(post_id: str) -> Optional[Dict]:
    """Fetch the detail record of a single post, best effort.

    Delegates to ``get_xiaohongshu_detail``. Any exception raised by that
    call is reported on stdout and turned into a ``None`` result, so a
    single failing post never aborts the caller.

    Args:
        post_id: Identifier of the post to look up.

    Returns:
        Whatever ``get_xiaohongshu_detail`` returns, or ``None`` on failure.
    """
    try:
        return get_xiaohongshu_detail(post_id)
    except Exception as err:
        print(f" 警告: 获取帖子 {post_id} 详情失败: {err}")
    return None
def get_current_post_ids(current_posts_dir: Path) -> Set[str]:
    """Collect the post IDs of every ``*.json`` file in a directory.

    Each ID is derived from the file name via
    ``extract_post_id_from_filename``; files that yield an empty ID are
    ignored. Progress and warnings are printed to stdout.

    Args:
        current_posts_dir: Directory holding the current posts.

    Returns:
        The set of extracted post IDs. Empty when the directory does not
        exist or contains no ``*.json`` files.
    """
    if not current_posts_dir.exists():
        print(f"警告: 当前帖子目录不存在: {current_posts_dir}")
        return set()

    post_files = list(current_posts_dir.glob("*.json"))
    if not post_files:
        print(f"警告: 当前帖子目录为空: {current_posts_dir}")
        return set()

    print(f"\n正在获取当前帖子ID...")
    print(f"找到 {len(post_files)} 个当前帖子")

    candidate_ids = (
        extract_post_id_from_filename(post_file.name) for post_file in post_files
    )
    post_ids = {candidate for candidate in candidate_ids if candidate}

    print(f"提取到 {len(post_ids)} 个帖子ID")
    return post_ids
def get_earliest_publish_time(current_posts_dir: Path) -> Optional[str]:
    """Find the earliest publish time among the posts in a directory.

    For every ``*.json`` file the post ID is taken from the file name, the
    post detail is fetched with ``get_post_detail`` and its
    ``publish_time`` value is compared with the running minimum using
    ``<``. The values are compared as-is; the expected format is
    "YYYY-MM-DD HH:MM:SS" (presumably strings, for which lexical order
    matches chronological order - confirm against the detail API).

    Posts whose ``publish_time`` is empty are skipped. Otherwise an empty
    value would sort before every real timestamp, become the result, and
    (being falsy) make the caller skip time filtering altogether.

    Args:
        current_posts_dir: Directory holding the current posts.

    Returns:
        The earliest publish time found, or ``None`` when the directory is
        missing, has no ``*.json`` files, or no usable time was obtained.
    """
    if not current_posts_dir.exists():
        print(f"警告: 当前帖子目录不存在: {current_posts_dir}")
        return None

    json_files = list(current_posts_dir.glob("*.json"))
    if not json_files:
        print(f"警告: 当前帖子目录为空: {current_posts_dir}")
        return None

    print(f"\n正在获取当前帖子的发布时间...")
    print(f"找到 {len(json_files)} 个当前帖子")

    earliest_time = None
    for file_path in json_files:
        post_id = extract_post_id_from_filename(file_path.name)
        if not post_id:
            continue
        try:
            detail = get_post_detail(post_id)
            if not detail or 'publish_time' not in detail:
                continue
            publish_time = detail['publish_time']
            if not publish_time:
                # An empty/None timestamp must not win the minimum.
                print(f" 警告: 帖子 {post_id} 的发布时间为空,已跳过")
                continue
            if earliest_time is None or publish_time < earliest_time:
                earliest_time = publish_time
                print(f" 更新最早时间: {publish_time} (帖子: {post_id})")
        except Exception as e:
            # Best effort: one malformed detail record must not stop the scan.
            print(f" 警告: 获取帖子 {post_id} 发布时间失败: {e}")

    if earliest_time:
        print(f"\n当前帖子最早发布时间: {earliest_time}")
    else:
        print("\n警告: 未能获取到任何当前帖子的发布时间")
    return earliest_time
def collect_all_post_ids(data: Dict) -> Set[str]:
    """Gather every post ID referenced by the aggregated pattern data.

    Only the top-level sections "灵感点列表", "目的点" and "关键点列表" are
    searched. Inside them, IDs are taken from any "帖子列表" list and from
    the "帖子id" field of entries in any "特征列表" list. Nested dicts and
    lists are walked, except for values stored under the bookkeeping keys
    "_meta", "帖子数", "特征数" and "帖子列表".

    Args:
        data: Aggregated result data.

    Returns:
        The set of post IDs found.
    """
    bookkeeping_keys = ("_meta", "帖子数", "特征数", "帖子列表")
    found_ids = set()

    # Explicit work stack instead of recursion; visiting order is irrelevant
    # because the result is a set.
    pending = [
        data[section]
        for section in ("灵感点列表", "目的点", "关键点列表")
        if section in data
    ]
    while pending:
        current = pending.pop()
        if isinstance(current, list):
            pending.extend(current)
            continue
        if not isinstance(current, dict):
            continue

        listed_posts = current.get("帖子列表")
        if isinstance(listed_posts, list):
            found_ids.update(listed_posts)

        features = current.get("特征列表")
        if isinstance(features, list):
            for feature in features:
                if "帖子id" in feature:
                    found_ids.add(feature["帖子id"])

        pending.extend(
            child for name, child in current.items()
            if name not in bookkeeping_keys
        )
    return found_ids
def filter_data_by_post_ids(data: Dict, exclude_post_ids: Set[str]) -> tuple[Dict, Set[str]]:
    """Remove the given posts from the aggregated data.

    All post IDs present in ``data`` are collected with
    ``collect_all_post_ids``; those also contained in ``exclude_post_ids``
    are dropped and the tree is rebuilt from the remaining ones with
    ``filter_node_by_post_ids``. Statistics are printed to stdout.

    Args:
        data: Original aggregated result data.
        exclude_post_ids: Post IDs that must not appear in the result.

    Returns:
        A ``(filtered_data, filtered_post_ids)`` tuple, where
        ``filtered_post_ids`` are the IDs that were actually removed.
        ``filtered_data`` is an empty dict when nothing survives.
    """
    all_post_ids = collect_all_post_ids(data)
    print(f"\n数据中包含 {len(all_post_ids)} 个不同的帖子")

    print(f"\n正在应用帖子ID过滤,排除当前帖子目录中的 {len(exclude_post_ids)} 个帖子...")
    filtered_post_ids = all_post_ids & exclude_post_ids  # present and excluded
    valid_post_ids = all_post_ids - exclude_post_ids     # present and kept

    if filtered_post_ids:
        print(f" ⚠️ 过滤掉 {len(filtered_post_ids)} 个当前帖子:")
        # Sort first, then truncate, so the preview is deterministic.
        for post_id in sorted(filtered_post_ids)[:10]:
            print(f" - {post_id}")
        if len(filtered_post_ids) > 10:
            print(f" ... 还有 {len(filtered_post_ids) - 10} 个")

    print(f"\n过滤统计: 过滤掉 {len(filtered_post_ids)} 个帖子,保留 {len(valid_post_ids)} 个帖子")

    # filter_node_by_post_ids yields None when the whole tree is pruned;
    # callers expect a dict (they test membership on it).
    filtered_data = filter_node_by_post_ids(data, valid_post_ids) or {}
    return filtered_data, filtered_post_ids
def filter_data_by_time(data: Dict, time_filter: str) -> tuple[Dict, Set[str]]:
    """Keep only posts published strictly before ``time_filter``.

    The detail of every post referenced in ``data`` is fetched with
    ``get_post_detail``. A post whose ``publish_time`` compares ``<``
    ``time_filter`` is kept; any other post with a detail record is
    reported as filtered. Posts whose detail could not be fetched end up
    in neither set and are therefore removed from the data without being
    counted in the statistics.

    Args:
        data: Original aggregated result data.
        time_filter: Publish-time threshold; compared with ``<`` against
            each post's ``publish_time`` value.

    Returns:
        A ``(filtered_data, filtered_post_ids)`` tuple. ``filtered_data``
        is an empty dict when nothing survives.
    """
    all_post_ids = collect_all_post_ids(data)
    print(f"\n数据中包含 {len(all_post_ids)} 个不同的帖子")

    print("正在获取帖子详情...")
    post_details = {}
    for i, post_id in enumerate(all_post_ids, 1):
        print(f"[{i}/{len(all_post_ids)}] 获取帖子 {post_id} 的详情...")
        detail = get_post_detail(post_id)
        if detail:
            post_details[post_id] = detail

    print(f"\n正在应用时间过滤 (< {time_filter}),避免使用晚于当前帖子的数据...")
    filtered_post_ids = set()
    valid_post_ids = set()
    for post_id, detail in post_details.items():
        # `or ''` also covers an explicit None value, which would otherwise
        # raise TypeError in the comparison below and abort the whole run.
        # NOTE(review): a missing/empty publish time compares as earlier
        # than any threshold, so such posts are kept - confirm this is wanted.
        publish_time = detail.get('publish_time') or ''
        if publish_time < time_filter:
            valid_post_ids.add(post_id)
        else:
            filtered_post_ids.add(post_id)
            print(f" ⚠️ 过滤掉帖子 {post_id} (发布时间: {publish_time},晚于阈值)")

    print(f"\n过滤统计: 过滤掉 {len(filtered_post_ids)} 个帖子(穿越),保留 {len(valid_post_ids)} 个帖子")

    # filter_node_by_post_ids yields None when the whole tree is pruned;
    # callers expect a dict (they test membership on it).
    filtered_data = filter_node_by_post_ids(data, valid_post_ids) or {}
    return filtered_data, filtered_post_ids
def filter_node_by_post_ids(node: Any, valid_post_ids: Set[str]) -> Any:
    """Recursively prune a tree so that only data of valid posts remains.

    Dict nodes:
        * "特征列表" keeps only entries whose "帖子id" is in
          ``valid_post_ids``. When at least one entry survives, "_meta" is
          copied and "帖子数", "特征数" and "帖子列表" are recomputed from
          the survivors ("帖子列表" in first-seen order, without duplicates).
        * Every other key (except the bookkeeping keys) is filtered
          recursively and kept only if the result is truthy.
        * A dict that ends up empty is returned as ``None``.
    List nodes: each item is filtered; dict items that were pruned away
        are dropped instead of leaving ``None`` placeholders behind.
    Anything else is returned unchanged.

    Args:
        node: Current node (dict, list or scalar).
        valid_post_ids: IDs of the posts whose data may be kept.

    Returns:
        The filtered node, or ``None`` for a dict with nothing left.
    """
    bookkeeping_keys = ("特征列表", "_meta", "帖子数", "特征数", "帖子列表")

    if isinstance(node, dict):
        filtered_node = {}

        if "特征列表" in node:
            filtered_features = [
                feature for feature in node["特征列表"]
                if "帖子id" in feature and feature["帖子id"] in valid_post_ids
            ]
            # Metadata is only emitted alongside surviving features, so a
            # node without any can still be pruned as empty.
            if filtered_features:
                filtered_node["特征列表"] = filtered_features
                if "_meta" in node:
                    filtered_node["_meta"] = node["_meta"].copy()
                # dict.fromkeys de-duplicates while keeping first-seen order.
                surviving_post_ids = list(
                    dict.fromkeys(f["帖子id"] for f in filtered_features)
                )
                filtered_node["帖子数"] = len(surviving_post_ids)
                filtered_node["特征数"] = len(filtered_features)
                filtered_node["帖子列表"] = surviving_post_ids

        for key, value in node.items():
            if key in bookkeeping_keys:
                continue
            filtered_child = filter_node_by_post_ids(value, valid_post_ids)
            if filtered_child:  # only keep non-empty children
                filtered_node[key] = filtered_child

        return filtered_node if filtered_node else None

    if isinstance(node, list):
        kept_items = []
        for item in node:
            filtered_item = filter_node_by_post_ids(item, valid_post_ids)
            # Only a pruned dict turns a non-None item into None.
            if filtered_item is None and item is not None:
                continue
            kept_items.append(filtered_item)
        return kept_items

    return node
def extract_categories_from_node(node: Dict, current_path: List[str], result: Dict[str, Dict]):
    """Walk a category tree and record the category path of every feature.

    For each entry of a node's "特征列表" that has a truthy "特征名称",
    ``result[name]`` is set to ``{"所属分类": <copy of current_path>}``; a
    later occurrence of the same name overwrites an earlier one. Dict
    values under non-bookkeeping keys are treated as sub-categories and
    visited with the key prepended to the path.

    Args:
        node: Current tree node.
        current_path: Category path of ``node``, innermost category first.
        result: Mapping updated in place (feature name -> category info).
    """
    bookkeeping_keys = ("特征列表", "_meta", "帖子数", "特征数", "帖子列表")

    for feature in node.get("特征列表", ()):
        name = feature.get("特征名称")
        if not name:
            continue
        result[name] = {"所属分类": list(current_path)}

    for child_key, child in node.items():
        if child_key in bookkeeping_keys or not isinstance(child, dict):
            continue
        # Prepend so the path stays ordered from innermost to outermost.
        extract_categories_from_node(child, [child_key] + current_path, result)
def process_category(category_data: Dict, category_key: str) -> Dict[str, Dict]:
    """Build the feature-name -> category mapping for one top-level section.

    Args:
        category_data: Tree of one section (灵感点列表 / 目的点 / 关键点列表).
        category_key: Name of the section; accepted but not used here.

    Returns:
        Mapping filled by ``extract_categories_from_node``; empty when
        ``category_data`` is not a dict.
    """
    mapping = {}
    if not isinstance(category_data, dict):
        return mapping
    extract_categories_from_node(category_data, [], mapping)
    return mapping
def build_category_hierarchy_from_node(
    node: Dict,
    category_hierarchy: Dict[str, Dict],
    current_level: int = 1,
    parent_categories: Optional[List[str]] = None
):
    """Recursively describe every category found below ``node``.

    Each dict value stored under a non-bookkeeping key is a category. Its
    entry in ``category_hierarchy`` (keyed by category name only) holds:

        * "几级分类": depth at which the name was first seen (1-based),
        * "下一级": direct children - sub-categories first, then the named
          features of its "特征列表",
        * "是否是叶子分类": True when it has no sub-categories.

    When a category name occurs more than once in the tree, "下一级"
    reflects the occurrence visited last; the leaf flag is recomputed
    together with it so the two can never contradict each other.

    Args:
        node: Current tree node.
        category_hierarchy: Mapping updated in place.
        current_level: Depth of the categories directly under ``node``.
        parent_categories: Names of the enclosing categories, outermost
            first. Only threaded through the recursion.
    """
    bookkeeping_keys = ("特征列表", "_meta", "帖子数", "特征数", "帖子列表")

    if parent_categories is None:
        parent_categories = []

    for key, value in node.items():
        if key in bookkeeping_keys or not isinstance(value, dict):
            continue

        entry = category_hierarchy.setdefault(key, {
            "几级分类": current_level,
            "是否是叶子分类": False,
            "下一级": [],
        })

        sub_category_nodes = [
            {"节点类型": "分类", "节点名称": sub_key}
            for sub_key, sub_value in value.items()
            if sub_key not in bookkeeping_keys and isinstance(sub_value, dict)
        ]

        feature_nodes = []
        for feature in value.get("特征列表", ()):
            feature_name = feature.get("特征名称")
            if feature_name:
                feature_nodes.append({
                    "节点类型": "特征",
                    "节点名称": feature_name,
                })

        entry["下一级"] = sub_category_nodes + feature_nodes
        # Keep the flag in sync with the children stored just above.
        entry["是否是叶子分类"] = not sub_category_nodes

        build_category_hierarchy_from_node(
            value,
            category_hierarchy,
            current_level + 1,
            parent_categories + [key],
        )
def build_category_hierarchy(category_data: Dict) -> Dict[str, Dict]:
    """Build the category-name -> next-level mapping for one section.

    Args:
        category_data: Tree of one top-level section.

    Returns:
        Mapping filled by ``build_category_hierarchy_from_node``; empty
        when ``category_data`` is not a dict.
    """
    hierarchy = {}
    if not isinstance(category_data, dict):
        return hierarchy
    build_category_hierarchy_from_node(category_data, hierarchy)
    return hierarchy
def main():
    """Generate the feature/category mapping files for the configured account.

    Steps:
        1. Read paths and the filter mode from ``PathConfig``.
        2. Load the aggregated pattern JSON.
        3. Apply the configured filter ("exclude_current_posts",
           "time_based" or "none"; unknown modes only print a warning).
        4. Write the feature-name -> category mapping (output file 1).
        5. Write the category hierarchy mapping (output file 2).
    """
    # (key in the input data, key in the output files), in processing order.
    sections = (
        ("灵感点列表", "灵感点"),
        ("目的点", "目的点"),
        ("关键点列表", "关键点"),
    )

    config = PathConfig()
    config.ensure_dirs()

    input_file = config.pattern_cluster_file
    current_posts_dir = config.current_posts_dir
    output_file_1 = config.feature_category_mapping_file
    output_file_2 = config.category_hierarchy_file

    print(f"账号: {config.account_name}")
    print(f"过滤模式: {config.filter_mode}")
    print(f"输入文件: {input_file}")
    print(f"当前帖子目录: {current_posts_dir}")
    print(f"输出文件1: {output_file_1}")
    print(f"输出文件2: {output_file_2}")
    print()

    print(f"\n正在读取文件: {input_file}")
    with open(input_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Apply the configured filter.
    filtered_post_ids = set()
    filter_mode = config.filter_mode
    if filter_mode == "exclude_current_posts":
        # New rule: drop the posts that live in the current-posts directory.
        print("\n" + "="*60)
        print("应用过滤规则: 排除当前帖子ID")
        current_post_ids = get_current_post_ids(current_posts_dir)
        if current_post_ids:
            data, filtered_post_ids = filter_data_by_post_ids(data, current_post_ids)
        else:
            print("\n未找到当前帖子ID,跳过过滤")
    elif filter_mode == "time_based":
        # Old rule: drop posts not published before the earliest current post.
        print("\n" + "="*60)
        print("应用过滤规则: 基于发布时间")
        earliest_time = get_earliest_publish_time(current_posts_dir)
        if earliest_time:
            data, filtered_post_ids = filter_data_by_time(data, earliest_time)
        else:
            print("\n未能获取时间信息,跳过过滤")
    elif filter_mode == "none":
        print("\n过滤模式: none,不应用任何过滤")
    else:
        print(f"\n警告: 未知的过滤模式 '{filter_mode}',不应用过滤")

    # Filtering can prune the whole tree and hand back None; the membership
    # tests below need a container, so fall back to an empty dict.
    if not data:
        data = {}

    # Result 1: feature name -> category path.
    output_1 = {}
    for source_key, output_key in sections:
        if source_key not in data:
            continue
        print(f"正在处理: {source_key} (特征名称映射)")
        output_1[output_key] = process_category(data[source_key], source_key)
        print(f" 提取了 {len(output_1[output_key])} 个特征")

    print(f"\n正在保存结果到: {output_file_1}")
    with open(output_file_1, "w", encoding="utf-8") as f:
        json.dump(output_1, f, ensure_ascii=False, indent=4)
    print("完成!")

    if filtered_post_ids:
        print(f"\n总计 (特征名称映射,已过滤掉 {len(filtered_post_ids)} 个帖子):")
    else:
        print(f"\n总计 (特征名称映射):")
    for category, features in output_1.items():
        print(f" {category}: {len(features)} 个特征")

    # Result 2: category hierarchy.
    print("\n" + "="*60)
    print("开始生成分类层级映射...")
    output_2 = {}
    for source_key, output_key in sections:
        if source_key not in data:
            continue
        print(f"正在处理: {source_key} (分类层级)")
        output_2[output_key] = build_category_hierarchy(data[source_key])
        print(f" 提取了 {len(output_2[output_key])} 个分类")

    print(f"\n正在保存结果到: {output_file_2}")
    with open(output_file_2, "w", encoding="utf-8") as f:
        json.dump(output_2, f, ensure_ascii=False, indent=4)
    print("完成!")

    if filtered_post_ids:
        print(f"\n总计 (分类层级映射,已过滤掉 {len(filtered_post_ids)} 个帖子):")
    else:
        print(f"\n总计 (分类层级映射):")
    for category, hierarchies in output_2.items():
        print(f" {category}: {len(hierarchies)} 个分类")


if __name__ == "__main__":
    main()
|