#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Extract feature combinations and their source information from the
"过去帖子_what解构结果" (past-post "what" deconstruction results) directory.

A feature combination is a list of feature names: ['name1', 'name2', ...].
The script groups every point (inspiration / purpose / key point) found in the
past-post JSON files by its feature combination, attaches the post details of
each source post, optionally drops posts published at or after the earliest
current post, and writes the result to "特征组合_帖子来源.json".
"""

import json
from pathlib import Path
from typing import Dict, List, Optional
import re
import sys

# Add the project root to sys.path so the project-local `script` package
# can be imported when this file is run directly.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from script.detail import get_xiaohongshu_detail

# The three point categories, in the order they are processed and emitted.
_CATEGORIES = ("灵感点", "目的点", "关键点")

# For each category: the keys inside its section of "三点解构" that hold lists
# of point items. The category name doubles as the key that stores a point's
# name inside each item.
_SECTION_LIST_KEYS = {
    "灵感点": ("全新内容", "共性差异", "共性内容"),
    "目的点": ("purposes",),
    "关键点": ("key_points",),
}


def extract_post_id_from_filename(filename: str) -> str:
    """Return the post ID encoded in a result file name.

    The ID is everything before the first underscore.

    Args:
        filename: File name (not a full path).

    Returns:
        The post ID, or "" when the name has no non-empty prefix followed by
        an underscore.
    """
    match = re.match(r'^([^_]+)_', filename)
    return match.group(1) if match else ""


def get_post_detail(post_id: str) -> Optional[Dict]:
    """Fetch the detail record of a post.

    Args:
        post_id: Post ID.

    Returns:
        Whatever `get_xiaohongshu_detail` returns, or None when it raises
        (a warning is printed in that case).
    """
    try:
        return get_xiaohongshu_detail(post_id)
    except Exception as e:  # best-effort: one failing post must not stop the batch
        print(f" 警告: 获取帖子 {post_id} 详情失败: {e}")
        return None


def extract_feature_combination_from_point(
    point_data: Dict,
    post_id: str,
    point_name: str,
    point_description: str,
) -> Optional[Dict]:
    """Build the feature-combination record for a single point.

    Args:
        point_data: The point item; its "提取的特征" entry is expected to be a
            list of dicts that each carry a "特征名称".
        post_id: ID of the post the point belongs to.
        point_name: Name of the point.
        point_description: Description of the point.

    Returns:
        A dict with the keys "特征组合", "点的名称", "点的描述" and "帖子id",
        or None when the point has no usable feature names.
    """
    if "提取的特征" not in point_data:
        return None
    features = point_data["提取的特征"]
    if not isinstance(features, list) or not features:
        return None

    # Only dict entries can carry a feature name; without the isinstance check
    # `"特征名称" in f` would be a substring test on strings or raise on scalars.
    feature_names = [
        f["特征名称"] for f in features
        if isinstance(f, dict) and "特征名称" in f
    ]
    if not feature_names:
        return None

    return {
        "特征组合": feature_names,
        "点的名称": point_name,
        "点的描述": point_description,
        "帖子id": post_id,
    }


def _collect_section_combos(section, list_keys, name_key: str, post_id: str) -> List[Dict]:
    """Collect feature-combination records from one section of "三点解构".

    Malformed data (non-dict section, non-list entry, non-dict item) is
    skipped rather than raising.
    """
    combos: List[Dict] = []
    if not isinstance(section, dict):
        return combos

    for list_key in list_keys:
        items = section.get(list_key)
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            combo = extract_feature_combination_from_point(
                item,
                post_id,
                item.get(name_key, ""),
                item.get("描述", ""),
            )
            if combo:
                combos.append(combo)
    return combos


def process_single_file(file_path: Path) -> Dict[str, List[Dict]]:
    """Extract all feature-combination records from one JSON result file.

    Args:
        file_path: Path of the JSON file.

    Returns:
        A dict mapping each of "灵感点", "目的点", "关键点" to the list of
        records found. The lists are empty when the file cannot be read or
        parsed (an error message is printed) or has no "三点解构" section.
    """
    result: Dict[str, List[Dict]] = {category: [] for category in _CATEGORIES}

    # The post ID is encoded in the file name.
    post_id = extract_post_id_from_filename(file_path.name)

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"处理文件 {file_path.name} 时出错: {e}")
        return result

    if not isinstance(data, dict) or "三点解构" not in data:
        return result
    three_points = data["三点解构"]
    if not isinstance(three_points, dict):
        return result

    for category in _CATEGORIES:
        result[category] = _collect_section_combos(
            three_points.get(category),
            _SECTION_LIST_KEYS[category],
            category,
            post_id,
        )

    return result


def merge_results(all_results: List[Dict]) -> Dict:
    """Merge per-file results, grouping sources by feature combination.

    Args:
        all_results: One result dict per file, as returned by
            `process_single_file`.

    Returns:
        A dict mapping each category to `{combo_key: [source, ...]}`, where
        `combo_key` is the sorted tuple of feature names and every source has
        the keys "点的名称", "点的描述" and "帖子id".
    """
    merged: Dict[str, Dict] = {category: {} for category in _CATEGORIES}

    for result in all_results:
        for category in _CATEGORIES:
            for combo_data in result[category]:
                # Lists are unhashable; a sorted tuple makes the key
                # independent of the order the features were listed in.
                combo_key = tuple(sorted(combo_data["特征组合"]))
                merged[category].setdefault(combo_key, []).append({
                    "点的名称": combo_data["点的名称"],
                    "点的描述": combo_data["点的描述"],
                    "帖子id": combo_data["帖子id"],
                })

    return merged


def convert_to_array_format(
    merged_dict: Dict,
    fetch_details: bool = True,
    time_filter: Optional[str] = None,
) -> Dict:
    """Convert the merged dict into list form and attach post details.

    Args:
        merged_dict: Result of `merge_results`.
        fetch_details: Whether to fetch the detail of every source post.
        time_filter: Publish-time threshold ("YYYY-MM-DD HH:MM:SS"). When set
            (and `fetch_details` is true) only sources whose post was
            published strictly before it are kept; sources whose detail could
            not be fetched are dropped as well.

    Returns:
        A dict mapping each category to a list of
        `{"特征组合": [...], "特征来源": [...]}` entries. Combinations left
        without any source are omitted.
    """
    result: Dict[str, List[Dict]] = {category: [] for category in _CATEGORIES}

    # Always bound, so the lookups below are valid even without fetching.
    post_details: Dict[str, Dict] = {}

    if fetch_details:
        # Collect every distinct post ID so each post is fetched only once.
        post_ids = {
            source["帖子id"]
            for category in _CATEGORIES
            for sources in merged_dict[category].values()
            for source in sources
        }

        print(f"\n正在获取 {len(post_ids)} 个帖子的详情...")
        for i, post_id in enumerate(post_ids, 1):
            print(f"[{i}/{len(post_ids)}] 获取帖子 {post_id} 的详情...")
            detail = get_post_detail(post_id)
            if detail:
                post_details[post_id] = detail
        print(f"成功获取 {len(post_details)} 个帖子详情")

        # Drop posts published at or after the threshold so that no data
        # newer than the current posts leaks into the result.
        if time_filter:
            print(f"\n正在应用时间过滤 (< {time_filter}),避免使用晚于当前帖子的数据...")
            kept_details: Dict[str, Dict] = {}
            filtered_count = 0
            for post_id, detail in post_details.items():
                # `or ""` also covers a key that is present with a null value,
                # which would otherwise raise on the comparison.
                # NOTE(review): a post without a publish time compares as
                # earlier than any threshold and is therefore kept - confirm
                # this is the intended policy.
                publish_time = detail.get('publish_time') or ""
                if publish_time < time_filter:
                    kept_details[post_id] = detail
                else:
                    filtered_count += 1
                    print(f" ⚠️ 过滤掉帖子 {post_id} (发布时间: {publish_time},晚于阈值)")

            print(f"过滤掉 {filtered_count} 个帖子(穿越),保留 {len(kept_details)} 个帖子")
            post_details = kept_details

    for category in _CATEGORIES:
        for combo_key, sources in merged_dict[category].items():
            enhanced_sources = []
            for source in sources:
                has_detail = source["帖子id"] in post_details

                # With time filtering on, a post without a retained detail
                # either failed the filter or could not be fetched.
                if fetch_details and time_filter and not has_detail:
                    continue

                enhanced_source = source.copy()
                if has_detail:
                    enhanced_source["帖子详情"] = post_details[source["帖子id"]]
                enhanced_sources.append(enhanced_source)

            # Only emit combinations that still have at least one source.
            if enhanced_sources:
                result[category].append({
                    "特征组合": list(combo_key),  # tuple key back to a list
                    "特征来源": enhanced_sources,
                })

    return result


def get_earliest_publish_time(current_posts_dir: Path) -> Optional[str]:
    """Return the earliest publish time among the current posts.

    Args:
        current_posts_dir: Directory holding the current posts' JSON files;
            the post ID is taken from each file name.

    Returns:
        The smallest non-empty 'publish_time' found in the fetched post
        details, or None when the directory is missing or empty or no publish
        time could be obtained.
    """
    if not current_posts_dir.exists():
        print(f"警告: 当前帖子目录不存在: {current_posts_dir}")
        return None

    json_files = list(current_posts_dir.glob("*.json"))
    if not json_files:
        print(f"警告: 当前帖子目录为空: {current_posts_dir}")
        return None

    print("\n正在获取当前帖子的发布时间...")
    print(f"找到 {len(json_files)} 个当前帖子")

    earliest_time = None
    for file_path in json_files:
        post_id = extract_post_id_from_filename(file_path.name)
        if not post_id:
            continue

        try:
            detail = get_post_detail(post_id)
            if not detail or 'publish_time' not in detail:
                continue
            publish_time = detail['publish_time']
            # An empty or null time would sort before every real timestamp
            # and end up as a falsy "earliest" value, which silently turns
            # the time filter off downstream.
            if not publish_time:
                continue
            if earliest_time is None or publish_time < earliest_time:
                earliest_time = publish_time
                print(f" 更新最早时间: {publish_time} (帖子: {post_id})")
        except Exception as e:
            print(f" 警告: 获取帖子 {post_id} 发布时间失败: {e}")

    if earliest_time:
        print(f"\n当前帖子最早发布时间: {earliest_time}")
    else:
        print("\n警告: 未能获取到任何当前帖子的发布时间")

    return earliest_time


def main():
    """Run the extraction over data/data_1118 and write the JSON result."""
    # Input and output locations, relative to the project root.
    script_dir = Path(__file__).parent
    project_root = script_dir.parent.parent
    data_dir = project_root / "data" / "data_1118"

    input_dir = data_dir / "过去帖子_what解构结果"
    current_posts_dir = data_dir / "当前帖子_what解构结果"
    output_file = data_dir / "特征组合_帖子来源.json"

    # The earliest current post defines the time-filter threshold.
    earliest_time = get_earliest_publish_time(current_posts_dir)

    print(f"\n正在扫描目录: {input_dir}")

    json_files = list(input_dir.glob("*.json"))
    print(f"找到 {len(json_files)} 个JSON文件")

    all_results = []
    for i, file_path in enumerate(json_files, 1):
        print(f"处理文件 [{i}/{len(json_files)}]: {file_path.name}")
        all_results.append(process_single_file(file_path))

    print("\n正在合并结果...")
    merged_result = merge_results(all_results)

    print("正在转换为数组格式...")
    final_result = convert_to_array_format(
        merged_result,
        fetch_details=True,
        time_filter=earliest_time,
    )

    if earliest_time:
        print(f"\n提取统计 (已过滤掉发布时间 >= {earliest_time} 的帖子):")
    else:
        print(f"\n提取统计:")
    for category in _CATEGORIES:
        combo_count = len(final_result[category])
        source_count = sum(len(item["特征来源"]) for item in final_result[category])
        print(f" {category}: {combo_count} 个特征组合, {source_count} 个来源")

    print(f"\n正在保存结果到: {output_file}")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(final_result, f, ensure_ascii=False, indent=4)

    print("完成!")


if __name__ == "__main__":
    main()