#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创作模式分析 V4(完整流程) 整合三步流程: 1. 数据准备:根据帖子图谱 + 人设图谱,提取待分析数据 2. 起点分析:AI分析创意起点(新版prompt) 3. 模式推导:基于共现关系的迭代推导 输入:帖子图谱 + 人设图谱 输出:完整的创作模式分析结果 """ import asyncio import json from pathlib import Path from typing import Dict, List, Optional, Set import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from lib.llm_cached import analyze, LLMConfig, AnalyzeResult from lib.my_trace import set_trace_smith as set_trace from script.data_processing.path_config import PathConfig # ===== 配置 ===== TASK_NAME = "creation_pattern_v4" # 缓存任务名称 OUTPUT_DIR_NAME = "creation_pattern_v4" # 输出目录名称 MATCH_SCORE_THRESHOLD = 0.8 # 匹配分数阈值 GLOBAL_RATIO_THRESHOLD = 0.7 # 全局占比阈值(>=0.7 算常量) ORIGIN_SCORE_THRESHOLD = 0.8 # 起点分数阈值 # ===== 数据加载 ===== def load_json(file_path: Path) -> Dict: """加载JSON文件""" with open(file_path, "r", encoding="utf-8") as f: return json.load(f) def get_post_graph_files(config: PathConfig) -> List[Path]: """获取所有帖子图谱文件""" post_graph_dir = config.intermediate_dir / "post_graph" return sorted(post_graph_dir.glob("*_帖子图谱.json")) # ===== 第一步:数据准备 ===== def extract_post_detail(post_graph: Dict) -> Dict: """提取帖子详情""" meta = post_graph.get("meta", {}) post_detail = meta.get("postDetail", {}) return { "postId": meta.get("postId", ""), "postTitle": meta.get("postTitle", ""), "body_text": post_detail.get("body_text", ""), "images": post_detail.get("images", []), "video": post_detail.get("video"), "publish_time": post_detail.get("publish_time", ""), "like_count": post_detail.get("like_count", 0), "collect_count": post_detail.get("collect_count", 0), } def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> tuple: """ 提取待分析节点列表 待分析节点 = 灵感点 + 目的点 + 关键点 """ nodes = post_graph.get("nodes", {}) edges = post_graph.get("edges", {}) persona_nodes = persona_graph.get("nodes", {}) persona_index = persona_graph.get("index", {}) # 1. 收集关键点信息 keypoints = {} for node_id, node in nodes.items(): if node.get("type") == "标签" and node.get("dimension") == "关键点": keypoints[node_id] = { "名称": node.get("name", ""), "描述": node.get("detail", {}).get("description", ""), } # 2. 分析支撑关系 support_map = {} for edge_id, edge in edges.items(): if edge.get("type") == "支撑": source_id = edge.get("source", "") target_id = edge.get("target", "") if source_id in keypoints: if target_id not in support_map: support_map[target_id] = [] support_map[target_id].append(keypoints[source_id]) # 3. 分析关联关系 relation_map = {} for edge_id, edge in edges.items(): if edge.get("type") == "关联": source_id = edge.get("source", "") target_id = edge.get("target", "") source_name = nodes.get(source_id, {}).get("name", "") target_name = nodes.get(target_id, {}).get("name", "") if source_id not in relation_map: relation_map[source_id] = [] relation_map[source_id].append(target_name) if target_id not in relation_map: relation_map[target_id] = [] relation_map[target_id].append(source_name) # 4. 分析人设匹配 match_map = {} persona_out_edges = persona_index.get("outEdges", {}) def get_node_info(node_id: str) -> Optional[Dict]: """获取人设节点的标准信息""" node = persona_nodes.get(node_id, {}) if not node: return None detail = node.get("detail", {}) parent_path = detail.get("parentPath", []) return { "节点ID": node_id, "节点名称": node.get("name", ""), "节点分类": "/".join(parent_path) if parent_path else "", "节点维度": node.get("dimension", ""), "节点类型": node.get("type", ""), "人设全局占比": detail.get("probGlobal", 0), "父类下占比": detail.get("probToParent", 0), } def get_parent_category_id(node_id: str) -> Optional[str]: """通过属于边获取父分类节点ID""" belong_edges = persona_out_edges.get(node_id, {}).get("属于", []) for edge in belong_edges: target_id = edge.get("target", "") target_node = persona_nodes.get(target_id, {}) if target_node.get("type") == "分类": return target_id return None for edge_id, edge in edges.items(): if edge.get("type") == "匹配": source_id = edge.get("source", "") target_id = edge.get("target", "") if source_id.startswith("帖子:") and target_id.startswith("人设:"): match_score = edge.get("score", 0) persona_node = persona_nodes.get(target_id, {}) if persona_node: node_type = persona_node.get("type", "") match_node_info = get_node_info(target_id) if not match_node_info: continue if node_type == "标签": category_id = get_parent_category_id(target_id) else: category_id = target_id category_info = None if category_id: category_node = persona_nodes.get(category_id, {}) if category_node: category_detail = category_node.get("detail", {}) category_path = category_detail.get("parentPath", []) category_info = { "节点ID": category_id, "节点名称": category_node.get("name", ""), "节点分类": "/".join(category_path) if category_path else "", "节点维度": category_node.get("dimension", ""), "节点类型": "分类", "人设全局占比": category_detail.get("probGlobal", 0), "父类下占比": category_detail.get("probToParent", 0), "历史共现分类": [], } co_occur_edges = persona_out_edges.get(category_id, {}).get("分类共现", []) co_occur_edges_sorted = sorted(co_occur_edges, key=lambda x: x.get("score", 0), reverse=True) for co_edge in co_occur_edges_sorted[:5]: co_target_id = co_edge.get("target", "") co_score = co_edge.get("score", 0) co_node = persona_nodes.get(co_target_id, {}) if co_node: co_detail = co_node.get("detail", {}) co_path = co_detail.get("parentPath", []) category_info["历史共现分类"].append({ "节点ID": co_target_id, "节点名称": co_node.get("name", ""), "节点分类": "/".join(co_path) if co_path else "", "节点维度": co_node.get("dimension", ""), "节点类型": "分类", "人设全局占比": co_detail.get("probGlobal", 0), "父类下占比": co_detail.get("probToParent", 0), "共现度": round(co_score, 4), }) if source_id not in match_map: match_map[source_id] = [] match_map[source_id].append({ "匹配节点": match_node_info, "匹配分数": round(match_score, 4), "所属分类": category_info, }) # 5. 构建待分析节点列表 analysis_nodes = [] for node_id, node in nodes.items(): if node.get("type") == "标签" and node.get("domain") == "帖子": dimension = node.get("dimension", "") if dimension in ["灵感点", "目的点", "关键点"]: match_info = match_map.get(node_id) analysis_nodes.append({ "节点ID": node_id, "节点名称": node.get("name", ""), "节点分类": node.get("category", ""), "节点维度": dimension, "节点类型": node.get("type", ""), "节点描述": node.get("detail", {}).get("description", ""), "人设匹配": match_info, }) # 6. 构建关系列表 relation_list = [] for edge_id, edge in edges.items(): if edge.get("type") == "支撑": source_id = edge.get("source", "") target_id = edge.get("target", "") if source_id in keypoints: relation_list.append({ "来源节点": source_id, "目标节点": target_id, "关系类型": "支撑", }) seen_relations = set() for edge_id, edge in edges.items(): if edge.get("type") == "关联": source_id = edge.get("source", "") target_id = edge.get("target", "") key = tuple(sorted([source_id, target_id])) if key not in seen_relations: seen_relations.add(key) relation_list.append({ "来源节点": source_id, "目标节点": target_id, "关系类型": "关联", }) return analysis_nodes, relation_list def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict: """ 准备完整的分析数据 输出扁平化的节点列表 + 独立的人设共现关系数据 """ analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph) # 扁平化节点,提取人设共现关系数据 flat_nodes = [] persona_co_occur = {} # {分类ID: {名称, 共现分类列表}} for node in analysis_nodes: # 基础节点字段 flat_node = { "节点ID": node["节点ID"], "节点名称": node["节点名称"], "节点分类": node.get("节点分类", ""), "节点维度": node["节点维度"], "节点描述": node.get("节点描述", ""), "是否已知": False, "发现编号": None, } # 提取人设匹配信息(list格式,支持多个匹配) match_list = node.get("人设匹配") or [] if match_list: flat_node["人设匹配"] = [] for match_info in match_list: category_info = match_info.get("所属分类") category_id = category_info.get("节点ID") if category_info else None # 保留完整的匹配信息,但去掉历史共现分类(拆到外面) clean_match = { "匹配节点": match_info.get("匹配节点"), "匹配分数": match_info.get("匹配分数", 0), } if category_info: # 复制所属分类,但不包含历史共现分类 clean_category = {k: v for k, v in category_info.items() if k != "历史共现分类"} clean_match["所属分类"] = clean_category flat_node["人设匹配"].append(clean_match) # 收集人设共现关系(去重)- 从历史共现分类拆出来 if category_id and category_id not in persona_co_occur: co_occur_list = category_info.get("历史共现分类", []) if co_occur_list: persona_co_occur[category_id] = [ { "节点ID": c.get("节点ID"), "节点名称": c.get("节点名称"), "节点分类": c.get("节点分类", ""), "节点维度": c.get("节点维度", ""), "节点类型": c.get("节点类型", ""), "人设全局占比": c.get("人设全局占比", 0), "父类下占比": c.get("父类下占比", 0), "共现度": c.get("共现度", 0), } for c in co_occur_list if c.get("节点ID") ] else: flat_node["人设匹配"] = [] flat_nodes.append(flat_node) return { "帖子详情": extract_post_detail(post_graph), "节点列表": flat_nodes, "关系列表": relation_list, "人设共现关系": persona_co_occur, } # ===== 第二步:起点分析(新版prompt) ===== def get_best_match(node: Dict) -> Optional[Dict]: """获取节点的最佳人设匹配(分数最高的)""" match_list = node.get("人设匹配") or [] if not match_list: return None return max(match_list, key=lambda m: m.get("匹配分数", 0)) def get_match_score(node: Dict) -> float: """获取节点的最高人设匹配分数""" best_match = get_best_match(node) if best_match: return best_match.get("匹配分数", 0) return 0 def get_category_id(node: Dict) -> Optional[str]: """获取节点的所属分类ID(最佳匹配的)""" best_match = get_best_match(node) if best_match: category = best_match.get("所属分类") if category: return category.get("节点ID") return None def get_all_category_ids(node: Dict) -> List[str]: """获取节点所有匹配的分类ID""" match_list = node.get("人设匹配") or [] result = [] for m in match_list: category = m.get("所属分类") if category and category.get("节点ID"): result.append(category.get("节点ID")) return result def get_category_global_ratio(node: Dict) -> float: """获取节点所属分类的人设全局占比(最佳匹配的)""" best_match = get_best_match(node) if best_match: category = best_match.get("所属分类") if category: return category.get("人设全局占比", 0) return 0 def is_persona_constant(node: Dict) -> bool: """判断节点是否为人设常量(匹配分数 >= 0.8 且 分类全局占比 >= 0.8)""" match_score = get_match_score(node) global_ratio = get_category_global_ratio(node) return match_score >= MATCH_SCORE_THRESHOLD and global_ratio >= GLOBAL_RATIO_THRESHOLD def build_origin_context(nodes: List[Dict]) -> Dict: """构造AI分析的上下文(新版格式)""" # 所有创意标签 all_tags = [] for node in nodes: all_tags.append({ "名称": node["节点名称"], "人设匹配度": round(get_match_score(node), 2), "所属分类全局占比": round(get_category_global_ratio(node), 2), }) # 起点候选集(灵感点 + 目的点) candidates = [ node["节点名称"] for node in nodes if node["节点维度"] in ["灵感点", "目的点"] ] return { "all_tags": all_tags, "candidates": candidates, } def format_origin_prompt(context: Dict) -> str: """格式化起点分析的prompt(新版)""" all_tags = context["all_tags"] candidates = context["candidates"] # 创意标签列表 tags_text = "" for tag in all_tags: tags_text += f"- {tag['名称']}\n" tags_text += f" 人设匹配度: {tag['人设匹配度']} | 所属分类全局占比: {tag['所属分类全局占比']}\n\n" # 起点候选集(一行) candidates_text = "、".join(candidates) prompt = f"""# Role 你是小红书爆款内容的"逆向工程"专家。你的核心能力是透过内容的表象,还原创作者最初的脑回路。 # Task 我提供一组笔记的【创意标签】和一个【起点候选集】。 请推理出哪些选项是真正的**创意起点**。 # Input Data ## 创意标签 {tags_text} ## 起点候选集 {candidates_text} # 推理约束 - 无法被其他项或人设推理出的点,即为起点(推理关系局限在起点候选集中) - 包含/被包含关系代表一种顺序:由大节点推导出被包含节点 - 目的推理手段 - 实质推理形式 - 和人设匹配度越低的帖子是起点概率越大,证明这个起点具备外部性 # Output Format 请输出一个标准的 JSON 格式。 - Key: 候选集中的词。 - Value: 一个对象,包含: - `score`: 0.0 到 1.0 的浮点数(代表是起点的可能性)。 - `analysis`: 一句话推理""" return prompt async def analyze_origin(nodes: List[Dict], force_llm: bool = False) -> Dict: """ 执行起点分析 输入: 节点列表 输出: 节点列表(加了起点分析、是否已知、发现编号字段)+ 中间结果 """ context = build_origin_context(nodes) prompt = format_origin_prompt(context) print(f"\n 起点候选: {len(context['candidates'])} 个") result = await analyze( prompt=prompt, task_name=f"{TASK_NAME}/origin", force=force_llm, parse_json=True, ) # 把分析结果合并到节点 llm_result = result.data or {} output_nodes = [] # 同一个步骤出来的节点使用相同的发现编号 step_order = 1 # 起点分析步骤的编号 for node in nodes: new_node = dict(node) # 复制原节点 name = node["节点名称"] if name in llm_result: score = llm_result[name].get("score", 0) analysis = llm_result[name].get("analysis", "") # 加起点分析 new_node["起点分析"] = { "分数": score, "说明": analysis, } # 高分起点标记为已知(同一步骤的节点使用相同编号) if score >= ORIGIN_SCORE_THRESHOLD: new_node["是否已知"] = True new_node["发现编号"] = step_order else: new_node["起点分析"] = None output_nodes.append(new_node) return { "输入上下文": { "创意标签": context["all_tags"], "起点候选": context["candidates"], }, "中间结果": llm_result, "输出节点": output_nodes, "cache_hit": result.cache_hit, "model": result.model_name, "log_url": result.log_url, } # ===== 第三步:模式推导 ===== def derive_patterns( nodes: List[Dict], persona_co_occur: Dict[str, Dict], ) -> Dict: """ 基于共现关系的迭代推导 输入: 带起点分析的节点列表 + 人设共现关系数据 输出: 节点列表(加了推导轮次、未知原因字段)+ 推导边列表 """ node_by_name: Dict[str, Dict] = {n["节点名称"]: n for n in nodes} # 构建共现查找表 {节点ID: {共现节点ID: 共现度}} co_occur_lookup = {} for cat_id, co_occur_list in persona_co_occur.items(): co_occur_lookup[cat_id] = { c["节点ID"]: c["共现度"] for c in co_occur_list } # 1. 初始化已知点集合(已经是已知的节点) known_names: Set[str] = set() node_round: Dict[str, int] = {} # {节点名称: 加入轮次} for node in nodes: if node.get("是否已知"): known_names.add(node["节点名称"]) node_round[node["节点名称"]] = 0 unknown_names: Set[str] = set(node_by_name.keys()) - known_names edges: List[Dict] = [] # 2. 迭代推导 round_num = 0 new_known_this_round = known_names.copy() while new_known_this_round: round_num += 1 new_known_next_round: Set[str] = set() for known_name in new_known_this_round: known_node = node_by_name.get(known_name) if not known_node: continue if get_match_score(known_node) < MATCH_SCORE_THRESHOLD: continue # 获取该节点所属分类的共现列表 known_cat_id = get_category_id(known_node) if not known_cat_id or known_cat_id not in co_occur_lookup: continue co_occur_map = co_occur_lookup[known_cat_id] for unknown_name in list(unknown_names): unknown_node = node_by_name.get(unknown_name) if not unknown_node: continue if get_match_score(unknown_node) < MATCH_SCORE_THRESHOLD: continue # 检查未知节点的分类是否在已知节点的共现列表中 unknown_cat_id = get_category_id(unknown_node) if unknown_cat_id and unknown_cat_id in co_occur_map: co_occur_score = co_occur_map[unknown_cat_id] new_known_next_round.add(unknown_name) node_round[unknown_name] = round_num edges.append({ "来源": known_node["节点ID"], "目标": unknown_node["节点ID"], "关系类型": "共现推导", "推导轮次": round_num, "共现分类ID": unknown_cat_id, "共现度": co_occur_score, }) known_names.update(new_known_next_round) unknown_names -= new_known_next_round new_known_this_round = new_known_next_round if not new_known_next_round: break # 3. 构建输出节点(只更新是否已知、发现编号) # 先找出当前最大发现编号 max_order = 0 for node in nodes: if node.get("发现编号") and node["发现编号"] > max_order: max_order = node["发现编号"] # 按推导轮次排序新发现的节点,分配发现编号 new_known_by_round = {} for name, r in node_round.items(): if r > 0: # 排除起点(轮次0) if r not in new_known_by_round: new_known_by_round[r] = [] new_known_by_round[r].append(name) # 分配发现编号(同一轮次的节点使用相同编号) order_map = {} for r in sorted(new_known_by_round.keys()): step_order = max_order + r # 同一轮次使用相同编号 for name in new_known_by_round[r]: order_map[name] = step_order output_nodes = [] for node in nodes: new_node = dict(node) name = node["节点名称"] # 如果是新推导出来的(非起点),更新已知状态和发现编号 if name in node_round and node_round[name] > 0: new_node["是否已知"] = True new_node["发现编号"] = order_map.get(name) output_nodes.append(new_node) return { "输出节点": output_nodes, "推导边列表": edges, "推导轮次": round_num, } # ===== 第四步:下一步分析 ===== def build_next_step_context(known_nodes: List[Dict], unknown_nodes: List[Dict], all_nodes: List[Dict]) -> Dict: """构造下一步分析的上下文(简化版)""" # 已知点信息(按发现顺序排序,只保留名称和维度) known_sorted = sorted(known_nodes, key=lambda n: n.get("发现编号") or 999) known_info = [ {"名称": n["节点名称"], "维度": n["节点维度"]} for n in known_sorted ] # 未知点信息(只保留名称和维度) unknown_info = [ {"名称": n["节点名称"], "维度": n["节点维度"]} for n in unknown_nodes ] return { "known_nodes": known_info, "unknown_nodes": unknown_info, } def format_next_step_prompt(context: Dict) -> str: """格式化下一步分析的prompt(简化版)""" # 已知点:- 名称 (维度) known_text = "\n".join([ f"- {n['名称']} ({n['维度']})" for n in context["known_nodes"] ]) # 未知点:- 名称 (维度) unknown_text = "\n".join([ f"- {n['名称']} ({n['维度']})" for n in context["unknown_nodes"] ]) prompt = f"""# Role 你是小红书爆款内容的"逆向工程"专家。你的任务是还原创作者的思维路径。 # Task 基于已知的创意点,推理哪些未知点最可能是创作者**下一步直接想到**的点。 可以有多个点同时被想到(如果它们在逻辑上是并列的)。 ## 已知点 {known_text} ## 未知点(待推理) {unknown_text} # 推理约束 - 创作者的思维是有逻辑的:先有实质,再想形式 - 包含/被包含关系代表一种顺序:由大节点推导出被包含节点 - 只输出"下一步直接能想到"的点,不是所有未知点 # Output Format 输出 JSON,对每个未知点评分: - Key: 未知点名称 - Value: 对象,包含: - `score`: 0.0-1.0(下一步被想到的可能性) - `from`: 从哪个已知点推导出来(已知点名称),数组 - `reason`: 如何从该已知点推导出来(一句话)""" return prompt async def analyze_next_step( nodes: List[Dict], force_llm: bool = False ) -> Dict: """ 执行下一步分析 输入: 节点列表(有已知和未知) 输出: 最可能的下一步点列表 """ # 分离已知和未知 known_nodes = [n for n in nodes if n.get("是否已知")] unknown_nodes = [n for n in nodes if not n.get("是否已知")] if not unknown_nodes: return { "输入上下文": {"已知点": [], "未知点": []}, "中间结果": [], "下一步点": [], } context = build_next_step_context(known_nodes, unknown_nodes, nodes) prompt = format_next_step_prompt(context) print(f"\n 已知点: {len(known_nodes)} 个") print(f" 未知点: {len(unknown_nodes)} 个") result = await analyze( prompt=prompt, task_name=f"{TASK_NAME}/next_step", force=force_llm, parse_json=True, ) # 解析结果(现在是 {name: {score, from, reason}} 格式) llm_result = result.data or {} # 构建候选列表,按分数排序 candidates = [] for name, info in llm_result.items(): # from 现在是数组 from_list = info.get("from", []) if isinstance(from_list, str): from_list = [from_list] # 兼容旧格式 candidates.append({ "节点名称": name, "可能性分数": info.get("score", 0), "推导来源": from_list, "推理说明": info.get("reason", ""), }) candidates.sort(key=lambda x: x["可能性分数"], reverse=True) return { "输入上下文": { "已知点": context["known_nodes"], "未知点": context["unknown_nodes"], }, "中间结果": llm_result, "下一步候选": candidates, "cache_hit": result.cache_hit, "model": result.model_name, "log_url": result.log_url, } # ===== 完整流程 ===== def save_result(post_id: str, post_detail: Dict, steps: List, config: PathConfig) -> Path: """保存结果到文件""" output_dir = config.intermediate_dir / OUTPUT_DIR_NAME output_dir.mkdir(parents=True, exist_ok=True) output_file = output_dir / f"{post_id}_创作模式.json" result = { "帖子详情": post_detail, "步骤列表": steps, } with open(output_file, "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) print(f" [已保存] {output_file.name}") return output_file async def process_single_post( post_file: Path, persona_graph: Dict, config: PathConfig, force_llm: bool = False, max_step: int = 3, ) -> Dict: """ 处理单个帖子 Args: force_llm: 强制重新调用LLM(跳过LLM缓存) max_step: 最多运行到第几步 (1=数据准备, 2=起点分析, 3=模式推导) """ post_graph = load_json(post_file) post_id = post_graph.get("meta", {}).get("postId", "unknown") print(f"\n{'=' * 60}") print(f"处理帖子: {post_id}") print("-" * 60) steps = [] # ===== 步骤1:数据准备 ===== print("\n[步骤1] 数据准备...") data = prepare_analysis_data(post_graph, persona_graph) post_detail = data["帖子详情"] nodes_step1 = data["节点列表"] relations_step1 = data["关系列表"] persona_co_occur = data["人设共现关系"] # 步骤1所有节点都是新的 new_known_step1 = [n["节点名称"] for n in nodes_step1 if n.get("是否已知")] step1 = { "步骤": "数据准备", "输入": { "帖子图谱": str(post_file.name), "人设图谱": "人设图谱.json", }, "输出": { "新的已知节点": new_known_step1, "新的边": [], "节点列表": nodes_step1, "边列表": relations_step1, }, "人设共现关系": persona_co_occur, "摘要": { "节点数": len(nodes_step1), "边数": len(relations_step1), "人设共现数": len(persona_co_occur), }, } steps.append(step1) print(f" 节点数: {len(nodes_step1)}") print(f" 关系数: {len(relations_step1)}") print(f" 人设共现数: {len(persona_co_occur)}") # 步骤1完成,保存 save_result(post_id, post_detail, steps, config) if max_step == 1: return {"帖子详情": post_detail, "步骤列表": steps} # ===== 步骤2:起点分析 ===== print("\n[步骤2] 起点分析...") origin_result = await analyze_origin(nodes_step1, force_llm=force_llm) nodes_step2 = origin_result["输出节点"] # 统计高分起点 def get_origin_score(node): analysis = node.get("起点分析") if analysis: return analysis.get("分数", 0) return 0 high_score_origins = [ (n["节点名称"], get_origin_score(n)) for n in nodes_step2 if get_origin_score(n) >= 0.7 ] # 新发现的已知节点(起点) new_known_nodes = [n["节点名称"] for n in nodes_step2 if n.get("是否已知")] step2 = { "步骤": "起点分析", "输入": { "节点列表": nodes_step1, "创意标签": origin_result["输入上下文"]["创意标签"], "起点候选": origin_result["输入上下文"]["起点候选"], }, "中间结果": origin_result["中间结果"], "输出": { "新的已知节点": new_known_nodes, "新的边": [], "节点列表": nodes_step2, "边列表": relations_step1, # 边没变化 }, "摘要": { "新已知数": len(new_known_nodes), "model": origin_result["model"], "cache_hit": origin_result["cache_hit"], "log_url": origin_result.get("log_url"), }, } steps.append(step2) print(f" 高分起点 (>=0.7): {len(high_score_origins)} 个") for name, score in sorted(high_score_origins, key=lambda x: -x[1]): print(f" ★ {name}: {score:.2f}") # 步骤2完成,保存 save_result(post_id, post_detail, steps, config) if max_step == 2: return {"帖子详情": post_detail, "步骤列表": steps} # ===== 步骤3:模式推导 ===== print("\n[步骤3] 模式推导...") derivation_result = derive_patterns(nodes_step2, persona_co_occur) nodes_step3 = derivation_result["输出节点"] edges = derivation_result["推导边列表"] # 统计 known_count = sum(1 for n in nodes_step3 if n.get("是否已知")) unknown_count = len(nodes_step3) - known_count # 新发现的已知节点(本步骤推导出来的,不包括之前的起点) prev_known = {n["节点名称"] for n in nodes_step2 if n.get("是否已知")} new_known_nodes = [n["节点名称"] for n in nodes_step3 if n.get("是否已知") and n["节点名称"] not in prev_known] # 合并边列表(原有边 + 推导边) all_edges = relations_step1 + edges step3 = { "步骤": "模式推导", "输入": { "节点列表": nodes_step2, "人设共现关系": persona_co_occur, }, "输出": { "新的已知节点": new_known_nodes, "新的边": edges, "节点列表": nodes_step3, "边列表": all_edges, }, "摘要": { "已知点数": known_count, "新已知数": len(new_known_nodes), "新边数": len(edges), "未知点数": unknown_count, }, } steps.append(step3) print(f" 已知点: {known_count} 个") print(f" 推导边: {len(edges)} 条") print(f" 未知点: {unknown_count} 个") # 步骤3完成,保存 save_result(post_id, post_detail, steps, config) if max_step == 3: return {"帖子详情": post_detail, "步骤列表": steps} # ===== 步骤4:下一步分析 ===== print("\n[步骤4] 下一步分析...") next_step_result = await analyze_next_step(nodes_step3, force_llm=force_llm) # 获取候选列表 candidates = next_step_result["下一步候选"] # 筛选高分候选 (>= 0.8) NEXT_STEP_THRESHOLD = 0.8 high_score_candidates = [c for c in candidates if c["可能性分数"] >= NEXT_STEP_THRESHOLD] # 构建节点名称到节点的映射 node_by_name = {n["节点名称"]: n for n in nodes_step3} # 找出当前最大发现编号 max_order = max((n.get("发现编号") or 0) for n in nodes_step3) # 更新节点:把高分候选标记为已知(同一步骤的节点使用相同编号) nodes_step4 = [] new_known_names = [] step_order = max_order + 1 # 同一步骤的节点使用相同编号 for node in nodes_step3: new_node = dict(node) name = node["节点名称"] # 检查是否在高分候选中 matching = [c for c in high_score_candidates if c["节点名称"] == name] if matching and not node.get("是否已知"): new_node["是否已知"] = True new_node["发现编号"] = step_order # 同一步骤使用相同编号 new_known_names.append(name) nodes_step4.append(new_node) # 创建新的边(推导边,from 是数组,为每个来源创建一条边) new_edges = [] for c in high_score_candidates: target_node = node_by_name.get(c["节点名称"]) if not target_node: continue for source_name in c["推导来源"]: source_node = node_by_name.get(source_name) if source_node: new_edges.append({ "来源": source_node["节点ID"], "目标": target_node["节点ID"], "关系类型": "AI推导", "可能性分数": c["可能性分数"], "推理说明": c["推理说明"], }) # 合并边列表 all_edges_step4 = all_edges + new_edges step4 = { "步骤": "下一步分析", "输入": { "已知点": next_step_result["输入上下文"]["已知点"], "未知点": next_step_result["输入上下文"]["未知点"], }, "中间结果": next_step_result["中间结果"], "输出": { "新的已知节点": new_known_names, "新的边": new_edges, "节点列表": nodes_step4, "边列表": all_edges_step4, }, "摘要": { "已知点数": sum(1 for n in nodes_step4 if n.get("是否已知")), "新已知数": len(new_known_names), "新边数": len(new_edges), "未知点数": sum(1 for n in nodes_step4 if not n.get("是否已知")), "model": next_step_result.get("model"), "cache_hit": next_step_result.get("cache_hit"), "log_url": next_step_result.get("log_url"), }, } steps.append(step4) # 打印高分候选 print(f" 候选数: {len(candidates)} 个") print(f" 高分候选 (>={NEXT_STEP_THRESHOLD}): {len(high_score_candidates)} 个") for c in high_score_candidates: from_str = " & ".join(c["推导来源"]) print(f" ★ {c['节点名称']} ({c['可能性分数']:.2f}) ← {from_str}") print(f" {c['推理说明']}") # 步骤4完成,保存 save_result(post_id, post_detail, steps, config) if max_step == 4: return {"帖子详情": post_detail, "步骤列表": steps} # ===== 循环:步骤3→步骤4 直到全部已知 ===== iteration = 1 current_nodes = nodes_step4 current_edges = all_edges_step4 MAX_ITERATIONS = 10 # 防止无限循环 while True: # 检查是否还有未知节点 unknown_count = sum(1 for n in current_nodes if not n.get("是否已知")) if unknown_count == 0: print(f"\n[完成] 所有节点已变为已知") break if iteration > MAX_ITERATIONS: print(f"\n[警告] 达到最大迭代次数 {MAX_ITERATIONS},停止循环") break # ===== 迭代步骤3:共现推导 ===== print(f"\n[迭代{iteration}-步骤3] 模式推导...") derivation_result = derive_patterns(current_nodes, persona_co_occur) nodes_iter3 = derivation_result["输出节点"] edges_iter3 = derivation_result["推导边列表"] # 统计新推导的 prev_known_names = {n["节点名称"] for n in current_nodes if n.get("是否已知")} new_known_step3 = [n["节点名称"] for n in nodes_iter3 if n.get("是否已知") and n["节点名称"] not in prev_known_names] new_edges_step3 = edges_iter3 # derive_patterns 返回的是本轮新增的边 all_edges_iter3 = current_edges + new_edges_step3 step_iter3 = { "步骤": f"迭代{iteration}-模式推导", "输入": { "节点列表": current_nodes, "人设共现关系": persona_co_occur, }, "输出": { "新的已知节点": new_known_step3, "新的边": new_edges_step3, "节点列表": nodes_iter3, "边列表": all_edges_iter3, }, "摘要": { "已知点数": sum(1 for n in nodes_iter3 if n.get("是否已知")), "新已知数": len(new_known_step3), "新边数": len(new_edges_step3), "未知点数": sum(1 for n in nodes_iter3 if not n.get("是否已知")), }, } steps.append(step_iter3) print(f" 新已知: {len(new_known_step3)} 个") print(f" 新边: {len(new_edges_step3)} 条") save_result(post_id, post_detail, steps, config) # 检查是否还有未知 unknown_after_step3 = sum(1 for n in nodes_iter3 if not n.get("是否已知")) if unknown_after_step3 == 0: print(f"\n[完成] 所有节点已变为已知") break # ===== 迭代步骤4:AI推导 ===== print(f"\n[迭代{iteration}-步骤4] 下一步分析...") next_step_result = await analyze_next_step(nodes_iter3, force_llm=force_llm) candidates_iter4 = next_step_result["下一步候选"] high_score_iter4 = [c for c in candidates_iter4 if c["可能性分数"] >= NEXT_STEP_THRESHOLD] # 更新节点(同一步骤的节点使用相同编号) node_by_name_iter4 = {n["节点名称"]: n for n in nodes_iter3} max_order_iter4 = max((n.get("发现编号") or 0) for n in nodes_iter3) nodes_iter4 = [] new_known_iter4 = [] step_order_iter4 = max_order_iter4 + 1 # 同一步骤的节点使用相同编号 for node in nodes_iter3: new_node = dict(node) name = node["节点名称"] matching = [c for c in high_score_iter4 if c["节点名称"] == name] if matching and not node.get("是否已知"): new_node["是否已知"] = True new_node["发现编号"] = step_order_iter4 # 同一步骤使用相同编号 new_known_iter4.append(name) nodes_iter4.append(new_node) # 创建新边(from 是数组,为每个来源创建一条边) new_edges_iter4 = [] for c in high_score_iter4: target_node = node_by_name_iter4.get(c["节点名称"]) if not target_node: continue for source_name in c["推导来源"]: source_node = node_by_name_iter4.get(source_name) if source_node: new_edges_iter4.append({ "来源": source_node["节点ID"], "目标": target_node["节点ID"], "关系类型": "AI推导", "可能性分数": c["可能性分数"], "推理说明": c["推理说明"], }) all_edges_iter4 = all_edges_iter3 + new_edges_iter4 step_iter4 = { "步骤": f"迭代{iteration}-下一步分析", "输入": { "已知点": next_step_result["输入上下文"]["已知点"], "未知点": next_step_result["输入上下文"]["未知点"], }, "中间结果": next_step_result["中间结果"], "输出": { "新的已知节点": new_known_iter4, "新的边": new_edges_iter4, "节点列表": nodes_iter4, "边列表": all_edges_iter4, }, "摘要": { "已知点数": sum(1 for n in nodes_iter4 if n.get("是否已知")), "新已知数": len(new_known_iter4), "新边数": len(new_edges_iter4), "未知点数": sum(1 for n in nodes_iter4 if not n.get("是否已知")), "model": next_step_result.get("model"), "cache_hit": next_step_result.get("cache_hit"), }, } steps.append(step_iter4) print(f" 新已知: {len(new_known_iter4)} 个") print(f" 新边: {len(new_edges_iter4)} 条") save_result(post_id, post_detail, steps, config) # 如果这轮没有新进展,停止 if len(new_known_step3) == 0 and len(new_known_iter4) == 0: print(f"\n[停止] 本轮无新进展,停止循环") break # 更新状态,进入下一轮 current_nodes = nodes_iter4 current_edges = all_edges_iter4 iteration += 1 return {"帖子详情": post_detail, "步骤列表": steps} # ===== 主函数 ===== async def main( post_id: str = None, all_posts: bool = False, force_llm: bool = False, max_step: int = 3, ): """主函数""" _, log_url = set_trace() config = PathConfig() print(f"账号: {config.account_name}") print(f"Trace URL: {log_url}") print(f"输出目录: {OUTPUT_DIR_NAME}") # 加载人设图谱 persona_graph_file = config.intermediate_dir / "人设图谱.json" if not persona_graph_file.exists(): print(f"错误: 人设图谱文件不存在: {persona_graph_file}") return persona_graph = load_json(persona_graph_file) print(f"人设图谱节点数: {len(persona_graph.get('nodes', {}))}") # 获取帖子图谱文件 post_graph_files = get_post_graph_files(config) if not post_graph_files: print("错误: 没有找到帖子图谱文件") return # 确定要处理的帖子 if post_id: target_file = next( (f for f in post_graph_files if post_id in f.name), None ) if not target_file: print(f"错误: 未找到帖子 {post_id}") return files_to_process = [target_file] elif all_posts: files_to_process = post_graph_files else: files_to_process = [post_graph_files[0]] print(f"待处理帖子数: {len(files_to_process)}") # 处理 results = [] for i, post_file in enumerate(files_to_process, 1): print(f"\n{'#' * 60}") print(f"# 处理帖子 {i}/{len(files_to_process)}") print(f"{'#' * 60}") result = await process_single_post( post_file=post_file, persona_graph=persona_graph, config=config, force_llm=force_llm, max_step=max_step, ) results.append(result) # 汇总 print(f"\n{'#' * 60}") print(f"# 完成! 共处理 {len(results)} 个帖子") print(f"{'#' * 60}") print(f"Trace: {log_url}") print("\n汇总:") for result in results: post_id = result["帖子详情"]["postId"] steps = result.get("步骤列表", []) num_steps = len(steps) if num_steps == 1: step1_summary = steps[0].get("摘要", {}) print(f" {post_id}: 节点数={step1_summary.get('节点数', 0)} (仅数据准备)") elif num_steps == 2: step2_summary = steps[1].get("摘要", {}) print(f" {post_id}: 起点={step2_summary.get('新已知数', 0)} (未推导)") elif num_steps == 3: step3_summary = steps[2].get("摘要", {}) print(f" {post_id}: 已知={step3_summary.get('已知点数', 0)}, " f"未知={step3_summary.get('未知点数', 0)}") elif num_steps >= 4: step4_summary = steps[3].get("摘要", {}) print(f" {post_id}: 已知={step4_summary.get('已知点数', 0)}, " f"新已知={step4_summary.get('新已知数', 0)}, " f"新边={step4_summary.get('新边数', 0)}, " f"未知={step4_summary.get('未知点数', 0)}") else: print(f" {post_id}: 无步骤数据") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="创作模式分析 V4") parser.add_argument("--post-id", type=str, help="帖子ID") parser.add_argument("--all-posts", action="store_true", help="处理所有帖子") parser.add_argument("--force-llm", action="store_true", help="强制重新调用LLM(跳过LLM缓存)") parser.add_argument("--step", type=int, default=5, choices=[1, 2, 3, 4, 5], help="运行到第几步 (1=数据准备, 2=起点分析, 3=模式推导, 4=下一步分析, 5=完整循环)") args = parser.parse_args() asyncio.run(main( post_id=args.post_id, all_posts=args.all_posts, force_llm=args.force_llm, max_step=args.step, ))