- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Creation pattern analysis (full pipeline).
- Integrates a four-step flow (steps 3 and 4 then repeat until convergence):
- 1. Data preparation: extract the data to analyze from the post graph + persona graph
- 2. Origin analysis: an LLM identifies the creative starting points
- 3. Pattern derivation: iterative derivation based on co-occurrence relations
- 4. Next-step analysis: an LLM ranks which unknown points come next
- Input: post graph + persona graph
- Output: complete creation-pattern analysis result
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional, Set
- import sys
- # Add the project root to sys.path
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from lib.llm_cached import analyze
- from lib.my_trace import set_trace_smith as set_trace
- from script.data_processing.path_config import PathConfig
- # ===== Configuration =====
- TASK_NAME = "creation_pattern" # cache task name
- MATCH_SCORE_THRESHOLD = 0.8 # persona match score threshold
- GLOBAL_RATIO_THRESHOLD = 0.8 # category global-ratio threshold
- ORIGIN_SCORE_THRESHOLD = 0.8 # origin score threshold
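- # Example of how the thresholds combine (illustrative numbers): a node whose best
- # persona match scores 0.85 and whose category has a 0.9 global share clears both
- # 0.8 thresholds and counts as a persona constant (see is_persona_constant below);
- # a 0.85 match in a 0.5-share category does not.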
- # ===== Data loading =====
- def load_json(file_path: Path) -> Dict:
- """加载JSON文件"""
- with open(file_path, "r", encoding="utf-8") as f:
- return json.load(f)
- def get_post_graph_files(config: PathConfig) -> List[Path]:
- """获取所有帖子图谱文件"""
- post_graph_dir = config.intermediate_dir / "post_graph"
- return sorted(post_graph_dir.glob("*_帖子图谱.json"))
- # ===== Step 1: data preparation =====
- def extract_post_detail(post_graph: Dict) -> Dict:
- """提取帖子详情"""
- meta = post_graph.get("meta", {})
- post_detail = meta.get("postDetail", {})
- return {
- "postId": meta.get("postId", ""),
- "postTitle": meta.get("postTitle", ""),
- "body_text": post_detail.get("body_text", ""),
- "images": post_detail.get("images", []),
- "video": post_detail.get("video"),
- "publish_time": post_detail.get("publish_time", ""),
- "like_count": post_detail.get("like_count", 0),
- "collect_count": post_detail.get("collect_count", 0),
- }
- def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> tuple:
- """
- Extract the list of nodes to analyze.
- Nodes to analyze = inspiration points (灵感点) + purpose points (目的点) + key points (关键点).
- """
- nodes = post_graph.get("nodes", {})
- edges = post_graph.get("edges", {})
- persona_nodes = persona_graph.get("nodes", {})
- persona_index = persona_graph.get("index", {})
- # 1. Collect key-point info
- keypoints = {}
- for node_id, node in nodes.items():
- if node.get("type") == "标签" and node.get("dimension") == "关键点":
- keypoints[node_id] = {
- "名称": node.get("name", ""),
- "描述": node.get("detail", {}).get("description", ""),
- }
- # 2. Analyze support relations (support_map is currently unused downstream; step 6 rebuilds these from edges)
- support_map = {}
- for edge_id, edge in edges.items():
- if edge.get("type") == "支撑":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- if source_id in keypoints:
- if target_id not in support_map:
- support_map[target_id] = []
- support_map[target_id].append(keypoints[source_id])
- # 3. Analyze association relations (relation_map is likewise unused downstream)
- relation_map = {}
- for edge_id, edge in edges.items():
- if edge.get("type") == "关联":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- source_name = nodes.get(source_id, {}).get("name", "")
- target_name = nodes.get(target_id, {}).get("name", "")
- if source_id not in relation_map:
- relation_map[source_id] = []
- relation_map[source_id].append(target_name)
- if target_id not in relation_map:
- relation_map[target_id] = []
- relation_map[target_id].append(source_name)
- # 4. Analyze persona matches
- match_map = {}
- persona_out_edges = persona_index.get("outEdges", {})
- def get_node_info(node_id: str) -> Optional[Dict]:
- """获取人设节点的标准信息"""
- node = persona_nodes.get(node_id, {})
- if not node:
- return None
- detail = node.get("detail", {})
- parent_path = detail.get("parentPath", [])
- return {
- "节点ID": node_id,
- "节点名称": node.get("name", ""),
- "节点分类": "/".join(parent_path) if parent_path else "",
- "节点维度": node.get("dimension", ""),
- "节点类型": node.get("type", ""),
- "人设全局占比": detail.get("probGlobal", 0),
- "父类下占比": detail.get("probToParent", 0),
- }
- def get_parent_category_id(node_id: str) -> Optional[str]:
- """通过属于边获取父分类节点ID"""
- belong_edges = persona_out_edges.get(node_id, {}).get("属于", [])
- for edge in belong_edges:
- target_id = edge.get("target", "")
- target_node = persona_nodes.get(target_id, {})
- if target_node.get("type") == "分类":
- return target_id
- return None
- for edge_id, edge in edges.items():
- if edge.get("type") == "匹配":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- if source_id.startswith("帖子:") and target_id.startswith("人设:"):
- match_score = edge.get("score", 0)
- persona_node = persona_nodes.get(target_id, {})
- if persona_node:
- node_type = persona_node.get("type", "")
- match_node_info = get_node_info(target_id)
- if not match_node_info:
- continue
- if node_type == "标签":
- category_id = get_parent_category_id(target_id)
- else:
- category_id = target_id
- category_info = None
- if category_id:
- category_node = persona_nodes.get(category_id, {})
- if category_node:
- category_detail = category_node.get("detail", {})
- category_path = category_detail.get("parentPath", [])
- category_info = {
- "节点ID": category_id,
- "节点名称": category_node.get("name", ""),
- "节点分类": "/".join(category_path) if category_path else "",
- "节点维度": category_node.get("dimension", ""),
- "节点类型": "分类",
- "人设全局占比": category_detail.get("probGlobal", 0),
- "父类下占比": category_detail.get("probToParent", 0),
- "历史共现分类": [],
- }
- co_occur_edges = persona_out_edges.get(category_id, {}).get("分类共现", [])
- co_occur_edges_sorted = sorted(co_occur_edges, key=lambda x: x.get("score", 0), reverse=True)
- for co_edge in co_occur_edges_sorted[:5]:
- co_target_id = co_edge.get("target", "")
- co_score = co_edge.get("score", 0)
- co_node = persona_nodes.get(co_target_id, {})
- if co_node:
- co_detail = co_node.get("detail", {})
- co_path = co_detail.get("parentPath", [])
- category_info["历史共现分类"].append({
- "节点ID": co_target_id,
- "节点名称": co_node.get("name", ""),
- "节点分类": "/".join(co_path) if co_path else "",
- "节点维度": co_node.get("dimension", ""),
- "节点类型": "分类",
- "人设全局占比": co_detail.get("probGlobal", 0),
- "父类下占比": co_detail.get("probToParent", 0),
- "共现度": round(co_score, 4),
- })
- if source_id not in match_map:
- match_map[source_id] = []
- match_map[source_id].append({
- "匹配节点": match_node_info,
- "匹配分数": round(match_score, 4),
- "所属分类": category_info,
- })
- # 5. Build the list of nodes to analyze
- analysis_nodes = []
- for node_id, node in nodes.items():
- if node.get("type") == "标签" and node.get("domain") == "帖子":
- dimension = node.get("dimension", "")
- if dimension in ["灵感点", "目的点", "关键点"]:
- match_info = match_map.get(node_id)
- analysis_nodes.append({
- "节点ID": node_id,
- "节点名称": node.get("name", ""),
- "节点分类": node.get("category", ""),
- "节点维度": dimension,
- "节点类型": node.get("type", ""),
- "节点描述": node.get("detail", {}).get("description", ""),
- "人设匹配": match_info,
- })
- # 6. Build the relation list (support + deduplicated association edges)
- relation_list = []
- for edge_id, edge in edges.items():
- if edge.get("type") == "支撑":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- if source_id in keypoints:
- relation_list.append({
- "来源节点": source_id,
- "目标节点": target_id,
- "关系类型": "支撑",
- })
- seen_relations = set()
- for edge_id, edge in edges.items():
- if edge.get("type") == "关联":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- key = tuple(sorted([source_id, target_id]))
- if key not in seen_relations:
- seen_relations.add(key)
- relation_list.append({
- "来源节点": source_id,
- "目标节点": target_id,
- "关系类型": "关联",
- })
- return analysis_nodes, relation_list
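- # Return-shape sketch for extract_analysis_nodes (field values are illustrative):
- # analysis_nodes[0] = {"节点ID": "帖子:n1", "节点名称": "...", "节点分类": "...",
- #   "节点维度": "灵感点", "节点类型": "标签", "节点描述": "...", "人设匹配": [...] or None}
- # relation_list[0] = {"来源节点": "帖子:n1", "目标节点": "帖子:n2", "关系类型": "支撑"}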
- def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict:
- """
- 准备完整的分析数据
- 输出扁平化的节点列表 + 独立的人设共现关系数据
- """
- analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph)
- # Flatten nodes and pull persona co-occurrence data out separately
- flat_nodes = []
- persona_co_occur = {} # {category ID: [co-occurring category entries]}
- for node in analysis_nodes:
- # Base node fields
- flat_node = {
- "节点ID": node["节点ID"],
- "节点名称": node["节点名称"],
- "节点分类": node.get("节点分类", ""),
- "节点维度": node["节点维度"],
- "节点描述": node.get("节点描述", ""),
- "是否已知": False,
- "发现编号": None,
- }
- # Extract persona match info (a list; multiple matches supported)
- match_list = node.get("人设匹配") or []
- if match_list:
- flat_node["人设匹配"] = []
- for match_info in match_list:
- category_info = match_info.get("所属分类")
- category_id = category_info.get("节点ID") if category_info else None
- # Keep the full match info but drop 历史共现分类 (split out below)
- clean_match = {
- "匹配节点": match_info.get("匹配节点"),
- "匹配分数": match_info.get("匹配分数", 0),
- }
- if category_info:
- # Copy the category info without the 历史共现分类 field
- clean_category = {k: v for k, v in category_info.items() if k != "历史共现分类"}
- clean_match["所属分类"] = clean_category
- flat_node["人设匹配"].append(clean_match)
- # Collect persona co-occurrence relations (deduplicated), split out of 历史共现分类
- if category_id and category_id not in persona_co_occur:
- co_occur_list = category_info.get("历史共现分类", [])
- if co_occur_list:
- persona_co_occur[category_id] = [
- {
- "节点ID": c.get("节点ID"),
- "节点名称": c.get("节点名称"),
- "节点分类": c.get("节点分类", ""),
- "节点维度": c.get("节点维度", ""),
- "节点类型": c.get("节点类型", ""),
- "人设全局占比": c.get("人设全局占比", 0),
- "父类下占比": c.get("父类下占比", 0),
- "共现度": c.get("共现度", 0),
- }
- for c in co_occur_list
- if c.get("节点ID")
- ]
- else:
- flat_node["人设匹配"] = []
- flat_nodes.append(flat_node)
- return {
- "帖子详情": extract_post_detail(post_graph),
- "节点列表": flat_nodes,
- "关系列表": relation_list,
- "人设共现关系": persona_co_occur,
- }
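- # Output sketch (keys as built above; values illustrative):
- # {"帖子详情": {...}, "节点列表": [{..., "是否已知": False, "发现编号": None, "人设匹配": [...]}],
- #  "关系列表": [...], "人设共现关系": {"人设:分类A": [{"节点ID": "人设:分类B", "共现度": 0.42, ...}]}}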
- # ===== Step 2: origin analysis =====
- def get_best_match(node: Dict) -> Optional[Dict]:
- """获取节点的最佳人设匹配(分数最高的)"""
- match_list = node.get("人设匹配") or []
- if not match_list:
- return None
- return max(match_list, key=lambda m: m.get("匹配分数", 0))
- def get_match_score(node: Dict) -> float:
- """获取节点的最高人设匹配分数"""
- best_match = get_best_match(node)
- if best_match:
- return best_match.get("匹配分数", 0)
- return 0
- def get_category_id(node: Dict) -> Optional[str]:
- """获取节点的所属分类ID(最佳匹配的)"""
- best_match = get_best_match(node)
- if best_match:
- category = best_match.get("所属分类")
- if category:
- return category.get("节点ID")
- return None
- def get_all_category_ids(node: Dict) -> List[str]:
- """获取节点所有匹配的分类ID"""
- match_list = node.get("人设匹配") or []
- result = []
- for m in match_list:
- category = m.get("所属分类")
- if category and category.get("节点ID"):
- result.append(category.get("节点ID"))
- return result
- def get_category_global_ratio(node: Dict) -> float:
- """获取节点所属分类的人设全局占比(最佳匹配的)"""
- best_match = get_best_match(node)
- if best_match:
- category = best_match.get("所属分类")
- if category:
- return category.get("人设全局占比", 0)
- return 0
- def is_persona_constant(node: Dict) -> bool:
- """判断节点是否为人设常量(匹配分数 >= 0.8 且 分类全局占比 >= 0.8)"""
- match_score = get_match_score(node)
- global_ratio = get_category_global_ratio(node)
- return match_score >= MATCH_SCORE_THRESHOLD and global_ratio >= GLOBAL_RATIO_THRESHOLD
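- # Minimal check of the predicate on a hand-built node (hypothetical data):
- # node = {"人设匹配": [{"匹配分数": 0.9, "所属分类": {"节点ID": "c1", "人设全局占比": 0.85}}]}
- # is_persona_constant(node) -> True, since 0.9 >= 0.8 and 0.85 >= 0.8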
- def build_origin_context(nodes: List[Dict]) -> Dict:
- """构造AI分析的上下文"""
- all_points = []
- for node in nodes:
- all_points.append({
- "名称": node["节点名称"],
- "分类": node.get("节点分类", ""),
- "维度": node.get("节点维度", ""),
- "描述": node.get("节点描述", ""),
- "人设匹配度": round(get_match_score(node), 2),
- "人设全局占比": round(get_category_global_ratio(node), 2),
- })
- # Origin candidate set (inspiration + purpose points)
- candidates = [
- node["节点名称"]
- for node in nodes
- if node["节点维度"] in ["灵感点", "目的点"]
- ]
- # Persona constants (match score >= 0.8 and category global ratio >= 0.8)
- constants = [
- node["节点名称"]
- for node in nodes
- if is_persona_constant(node)
- ]
- return {
- "all_points": all_points,
- "candidates": candidates,
- "constants": constants,
- }
- def format_origin_prompt(context: Dict) -> str:
- """格式化起点分析的prompt"""
- all_points = context["all_points"]
- candidates = context["candidates"]
- constants = context["constants"]
- points_text = ""
- for p in all_points:
- points_text += f"- {p['名称']}\n"
- points_text += f" 维度: {p['维度']} | 分类: {p['分类']}\n"
- points_text += f" 描述: {p['描述']}\n"
- points_text += f" 人设匹配度: {p['人设匹配度']} | 人设全局占比: {p['人设全局占比']}\n"
- points_text += "\n"
- candidates_text = "、".join(candidates)
- constants_text = "、".join(constants) if constants else "无"
- prompt = f"""# Role
- 你是小红书爆款内容的"逆向工程"专家。你的核心能力是透过内容的表象(视觉/形式),还原创作者最初的脑回路(动机/实质)。
- # Task
- 我提供一组笔记的【创意标签】和一个【起点候选集】。
- 请推理出哪些选项是真正的**创意起点**。
- # Input Data
- ## 全部创意点
- {points_text}
- ## 起点候选集
- {candidates_text}
- ## 来自人设的常量
- {constants_text}
- # 推理约束
- 1. 实质推形式,而不是形式推实质,除非形式是一切创意的起点
- 2. 因推果而不是果推因
- 3. 无法被其他项或人设推理出的点,即为起点
- # Output Format
- 请输出一个标准的 JSON 格式。
- - Key: 候选集中的词。
- - Value: 一个对象,包含:
- - `score`: 0.0 到 1.0 的浮点数(代表是起点的可能性)。
- - `analysis`: 一句话推理"""
- return prompt
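- # Expected reply shape for this prompt, per the Output Format above (values made up):
- # {"某个候选点": {"score": 0.9, "analysis": "无法由其他点或人设推出,最可能是起点"}}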
- async def analyze_origin(nodes: List[Dict], force_llm: bool = False) -> Dict:
- """
- 执行起点分析
- 输入: 节点列表
- 输出: 节点列表(加了起点分析、是否已知、发现编号字段)+ 中间结果
- """
- context = build_origin_context(nodes)
- prompt = format_origin_prompt(context)
- print(f"\n 起点候选: {len(context['candidates'])} 个")
- print(f" 人设常量: {len(context['constants'])} 个")
- result = await analyze(
- prompt=prompt,
- task_name=f"{TASK_NAME}/origin",
- force=force_llm,
- parse_json=True,
- )
- # Merge analysis results into the nodes
- llm_result = result.data or {}
- output_nodes = []
- current_order = 1 # discovery-order counter for known nodes
- for node in nodes:
- new_node = dict(node) # shallow-copy the node
- name = node["节点名称"]
- if name in llm_result:
- score = llm_result[name].get("score", 0)
- analysis = llm_result[name].get("analysis", "")
- # Attach origin analysis
- new_node["起点分析"] = {
- "分数": score,
- "说明": analysis,
- }
- # Mark high-scoring origins as known
- if score >= ORIGIN_SCORE_THRESHOLD:
- new_node["是否已知"] = True
- new_node["发现编号"] = current_order
- current_order += 1
- else:
- new_node["起点分析"] = None
- output_nodes.append(new_node)
- return {
- "输入上下文": {
- "起点候选": context["candidates"],
- "人设常量": context["constants"],
- },
- "中间结果": llm_result,
- "输出节点": output_nodes,
- "cache_hit": result.cache_hit,
- "model": result.model_name,
- "log_url": result.log_url,
- }
- # ===== Step 3: pattern derivation =====
- def derive_patterns(
- nodes: List[Dict],
- persona_co_occur: Dict[str, Dict],
- ) -> Dict:
- """
- 基于共现关系的迭代推导
- 输入: 带起点分析的节点列表 + 人设共现关系数据
- 输出: 节点列表(加了推导轮次、未知原因字段)+ 推导边列表
- """
- node_by_name: Dict[str, Dict] = {n["节点名称"]: n for n in nodes}
- # Build the co-occurrence lookup {node ID: {co-occurring node ID: score}}
- co_occur_lookup = {}
- for cat_id, co_occur_list in persona_co_occur.items():
- co_occur_lookup[cat_id] = {
- c["节点ID"]: c["共现度"]
- for c in co_occur_list
- }
- # 1. Initialize the known set (nodes already marked known)
- known_names: Set[str] = set()
- node_round: Dict[str, int] = {} # {node name: round in which it became known}
- for node in nodes:
- if node.get("是否已知"):
- known_names.add(node["节点名称"])
- node_round[node["节点名称"]] = 0
- unknown_names: Set[str] = set(node_by_name.keys()) - known_names
- edges: List[Dict] = []
- # 2. Iterative derivation
- round_num = 0
- new_known_this_round = known_names.copy()
- while new_known_this_round:
- round_num += 1
- new_known_next_round: Set[str] = set()
- for known_name in new_known_this_round:
- known_node = node_by_name.get(known_name)
- if not known_node:
- continue
- if get_match_score(known_node) < MATCH_SCORE_THRESHOLD:
- continue
- # Get the co-occurrence list for this node's category
- known_cat_id = get_category_id(known_node)
- if not known_cat_id or known_cat_id not in co_occur_lookup:
- continue
- co_occur_map = co_occur_lookup[known_cat_id]
- for unknown_name in list(unknown_names):
- unknown_node = node_by_name.get(unknown_name)
- if not unknown_node:
- continue
- if get_match_score(unknown_node) < MATCH_SCORE_THRESHOLD:
- continue
- # Is the unknown node's category in the known node's co-occurrence list?
- unknown_cat_id = get_category_id(unknown_node)
- if unknown_cat_id and unknown_cat_id in co_occur_map:
- co_occur_score = co_occur_map[unknown_cat_id]
- new_known_next_round.add(unknown_name)
- node_round[unknown_name] = round_num
- edges.append({
- "来源": known_node["节点ID"],
- "目标": unknown_node["节点ID"],
- "关系类型": "共现推导",
- "推导轮次": round_num,
- "共现分类ID": unknown_cat_id,
- "共现度": co_occur_score,
- })
- known_names.update(new_known_next_round)
- unknown_names -= new_known_next_round
- new_known_this_round = new_known_next_round # the while condition exits when a round finds nothing new
- # 3. Build output nodes (update only 是否已知 / 发现编号)
- # First find the current max discovery number
- max_order = 0
- for node in nodes:
- if node.get("发现编号") and node["发现编号"] > max_order:
- max_order = node["发现编号"]
- # Group newly discovered nodes by derivation round
- new_known_by_round = {}
- for name, r in node_round.items():
- if r > 0: # exclude origins (round 0)
- if r not in new_known_by_round:
- new_known_by_round[r] = []
- new_known_by_round[r].append(name)
- # Assign discovery numbers
- order_map = {}
- current_order = max_order + 1
- for r in sorted(new_known_by_round.keys()):
- for name in new_known_by_round[r]:
- order_map[name] = current_order
- current_order += 1
- output_nodes = []
- for node in nodes:
- new_node = dict(node)
- name = node["节点名称"]
- # Newly derived (non-origin) nodes: update known status and discovery number
- if name in node_round and node_round[name] > 0:
- new_node["是否已知"] = True
- new_node["发现编号"] = order_map.get(name)
- output_nodes.append(new_node)
- return {
- "输出节点": output_nodes,
- "推导边列表": edges,
- "推导轮次": round_num,
- }
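- # Worked example (hypothetical IDs and scores): known node A maps to category c1,
- # and persona_co_occur says c1 co-occurs with c2 at 0.42. Unknown node B maps to c2
- # with a match score >= 0.8, so round 1 marks B as known and appends
- # {"来源": A, "目标": B, "关系类型": "共现推导", "推导轮次": 1, "共现度": 0.42};
- # round 2 then rescans starting from B, until a round discovers nothing new.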
- # ===== Step 4: next-step analysis =====
- def build_next_step_context(known_nodes: List[Dict], unknown_nodes: List[Dict], all_nodes: List[Dict]) -> Dict:
- """构造下一步分析的上下文"""
- # 已知点信息(按发现顺序排序)
- known_sorted = sorted(known_nodes, key=lambda n: n.get("发现编号") or 999)
- known_info = []
- for n in known_sorted:
- info = {
- "名称": n["节点名称"],
- "维度": n["节点维度"],
- "分类": n.get("节点分类", ""),
- "描述": n.get("节点描述", ""),
- "人设匹配度": round(get_match_score(n), 2),
- "人设全局占比": round(get_category_global_ratio(n), 2),
- "发现编号": n.get("发现编号"),
- }
- # Include origin analysis if present
- if n.get("起点分析"):
- info["起点说明"] = n["起点分析"].get("说明", "")
- known_info.append(info)
- # Unknown points
- unknown_info = []
- for n in unknown_nodes:
- unknown_info.append({
- "名称": n["节点名称"],
- "维度": n["节点维度"],
- "分类": n.get("节点分类", ""),
- "描述": n.get("节点描述", ""),
- "人设匹配度": round(get_match_score(n), 2),
- "人设全局占比": round(get_category_global_ratio(n), 2),
- })
- # Persona constants (filtered from all nodes)
- constants = [
- n["节点名称"]
- for n in all_nodes
- if is_persona_constant(n)
- ]
- return {
- "known_nodes": known_info,
- "unknown_nodes": unknown_info,
- "constants": constants,
- }
- def format_next_step_prompt(context: Dict) -> str:
- """格式化下一步分析的prompt"""
- known_text = ""
- for i, n in enumerate(context["known_nodes"], 1):
- known_text += f"{i}. {n['名称']} ({n['维度']})\n"
- known_text += f" 分类: {n['分类']}\n"
- known_text += f" 描述: {n['描述']}\n"
- known_text += f" 人设匹配度: {n['人设匹配度']} | 人设全局占比: {n['人设全局占比']}\n"
- if n.get("起点说明"):
- known_text += f" 起点说明: {n['起点说明']}\n"
- known_text += "\n"
- unknown_text = ""
- for n in context["unknown_nodes"]:
- unknown_text += f"- {n['名称']} ({n['维度']})\n"
- unknown_text += f" 分类: {n['分类']}\n"
- unknown_text += f" 描述: {n['描述']}\n"
- unknown_text += f" 人设匹配度: {n['人设匹配度']} | 人设全局占比: {n['人设全局占比']}\n\n"
- constants = context.get("constants", [])
- constants_text = "、".join(constants) if constants else "无"
- prompt = f"""# Role
- 你是小红书爆款内容的"逆向工程"专家。你的任务是还原创作者的思维路径。
- # Task
- 基于已知的创意点,推理哪些未知点最可能是创作者**下一步直接想到**的点。
- 可以有多个点同时被想到(如果它们在逻辑上是并列的)。
- ## 已知点(按发现顺序)
- {known_text}
- ## 未知点(待推理)
- {unknown_text}
- ## 人设常量
- {constants_text}
- # 推理约束
- 1. 创作者的思维是有逻辑的:先有动机/目的,再想形式/手法
- 2. 关键点通常是为了支撑灵感点或目的点
- 3. 人设常量是创作者固有的风格,不需要推理
- 4. 只输出"下一步直接能想到"的点,不是所有未知点
- # Output Format
- 输出 JSON,对每个未知点评分:
- - Key: 未知点名称
- - Value: 对象,包含:
- - `score`: 0.0-1.0(下一步被想到的可能性)
- - `from`: 从哪个已知点推导出来(已知点名称)
- - `reason`: 如何从该已知点推导出来(一句话)"""
- return prompt
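- # Expected reply shape, per the Output Format above (values made up):
- # {"某个未知点": {"score": 0.85, "from": "某个已知点", "reason": "为目的点提供支撑"}}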
- async def analyze_next_step(
- nodes: List[Dict],
- force_llm: bool = False
- ) -> Dict:
- """
- 执行下一步分析
- 输入: 节点列表(有已知和未知)
- 输出: 最可能的下一步点列表
- """
- # 分离已知和未知
- known_nodes = [n for n in nodes if n.get("是否已知")]
- unknown_nodes = [n for n in nodes if not n.get("是否已知")]
- if not unknown_nodes:
- return {
- "输入上下文": {"已知点": [], "未知点": [], "人设常量": []},
- "中间结果": [],
- "下一步点": [],
- }
- context = build_next_step_context(known_nodes, unknown_nodes, nodes)
- prompt = format_next_step_prompt(context)
- print(f"\n 已知点: {len(known_nodes)} 个")
- print(f" 未知点: {len(unknown_nodes)} 个")
- result = await analyze(
- prompt=prompt,
- task_name=f"{TASK_NAME}/next_step",
- force=force_llm,
- parse_json=True,
- )
- # Parse the result (format: {name: {score, from, reason}})
- llm_result = result.data or {}
- # Build the candidate list, sorted by score
- candidates = []
- for name, info in llm_result.items():
- candidates.append({
- "节点名称": name,
- "可能性分数": info.get("score", 0),
- "推导来源": info.get("from", ""),
- "推理说明": info.get("reason", ""),
- })
- candidates.sort(key=lambda x: x["可能性分数"], reverse=True)
- return {
- "输入上下文": {
- "已知点": context["known_nodes"],
- "未知点": context["unknown_nodes"],
- "人设常量": context["constants"],
- },
- "中间结果": llm_result,
- "下一步候选": candidates,
- "cache_hit": result.cache_hit,
- "model": result.model_name,
- "log_url": result.log_url,
- }
- # ===== Full pipeline =====
- def save_result(post_id: str, post_detail: Dict, steps: List, config: PathConfig) -> Path:
- """保存结果到文件"""
- output_dir = config.intermediate_dir / "creation_pattern"
- output_dir.mkdir(parents=True, exist_ok=True)
- output_file = output_dir / f"{post_id}_创作模式.json"
- result = {
- "帖子详情": post_detail,
- "步骤列表": steps,
- }
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(result, f, ensure_ascii=False, indent=2)
- print(f" [已保存] {output_file.name}")
- return output_file
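- # Step flow for one post: 1 data prep -> 2 origin analysis (LLM) -> 3 co-occurrence
- # derivation -> 4 next-step analysis (LLM), then steps 3 and 4 alternate until all
- # nodes are known, a round makes no progress, or MAX_ITERATIONS is reached.
- # save_result() runs after every step, so a partial run still leaves usable output.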
- async def process_single_post(
- post_file: Path,
- persona_graph: Dict,
- config: PathConfig,
- force_llm: bool = False,
- max_step: int = 3,
- ) -> Dict:
- """
- 处理单个帖子
- Args:
- force_llm: 强制重新调用LLM(跳过LLM缓存)
- max_step: 最多运行到第几步 (1=数据准备, 2=起点分析, 3=模式推导)
- """
- post_graph = load_json(post_file)
- post_id = post_graph.get("meta", {}).get("postId", "unknown")
- print(f"\n{'=' * 60}")
- print(f"处理帖子: {post_id}")
- print("-" * 60)
- steps = []
- # ===== Step 1: data preparation =====
- print("\n[步骤1] 数据准备...")
- data = prepare_analysis_data(post_graph, persona_graph)
- post_detail = data["帖子详情"]
- nodes_step1 = data["节点列表"]
- relations_step1 = data["关系列表"]
- persona_co_occur = data["人设共现关系"]
- # In step 1 no node is known yet (是否已知 starts False), so this list stays empty
- new_known_step1 = [n["节点名称"] for n in nodes_step1 if n.get("是否已知")]
- step1 = {
- "步骤": "数据准备",
- "输入": {
- "帖子图谱": str(post_file.name),
- "人设图谱": "人设图谱.json",
- },
- "输出": {
- "新的已知节点": new_known_step1,
- "新的边": [],
- "节点列表": nodes_step1,
- "边列表": relations_step1,
- },
- "人设共现关系": persona_co_occur,
- "摘要": {
- "节点数": len(nodes_step1),
- "边数": len(relations_step1),
- "人设共现数": len(persona_co_occur),
- },
- }
- steps.append(step1)
- print(f" 节点数: {len(nodes_step1)}")
- print(f" 关系数: {len(relations_step1)}")
- print(f" 人设共现数: {len(persona_co_occur)}")
- # Step 1 done; save
- save_result(post_id, post_detail, steps, config)
- if max_step == 1:
- return {"帖子详情": post_detail, "步骤列表": steps}
- # ===== Step 2: origin analysis =====
- print("\n[步骤2] 起点分析...")
- origin_result = await analyze_origin(nodes_step1, force_llm=force_llm)
- nodes_step2 = origin_result["输出节点"]
- # Count high-scoring origins
- def get_origin_score(node):
- analysis = node.get("起点分析")
- if analysis:
- return analysis.get("分数", 0)
- return 0
- high_score_origins = [
- (n["节点名称"], get_origin_score(n))
- for n in nodes_step2
- if get_origin_score(n) >= 0.7
- ]
- # Newly discovered known nodes (origins)
- new_known_nodes = [n["节点名称"] for n in nodes_step2 if n.get("是否已知")]
- step2 = {
- "步骤": "起点分析",
- "输入": {
- "节点列表": nodes_step1,
- "起点候选": origin_result["输入上下文"]["起点候选"],
- "人设常量": origin_result["输入上下文"]["人设常量"],
- },
- "中间结果": origin_result["中间结果"],
- "输出": {
- "新的已知节点": new_known_nodes,
- "新的边": [],
- "节点列表": nodes_step2,
- "边列表": relations_step1, # 边没变化
- },
- "摘要": {
- "新已知数": len(new_known_nodes),
- "model": origin_result["model"],
- "cache_hit": origin_result["cache_hit"],
- "log_url": origin_result.get("log_url"),
- },
- }
- steps.append(step2)
- print(f" 高分起点 (>=0.7): {len(high_score_origins)} 个")
- for name, score in sorted(high_score_origins, key=lambda x: -x[1]):
- print(f" ★ {name}: {score:.2f}")
- # Step 2 done; save
- save_result(post_id, post_detail, steps, config)
- if max_step == 2:
- return {"帖子详情": post_detail, "步骤列表": steps}
- # ===== Step 3: pattern derivation =====
- print("\n[步骤3] 模式推导...")
- derivation_result = derive_patterns(nodes_step2, persona_co_occur)
- nodes_step3 = derivation_result["输出节点"]
- edges = derivation_result["推导边列表"]
- # Stats
- known_count = sum(1 for n in nodes_step3 if n.get("是否已知"))
- unknown_count = len(nodes_step3) - known_count
- # Nodes newly derived in this step (excluding earlier origins)
- prev_known = {n["节点名称"] for n in nodes_step2 if n.get("是否已知")}
- new_known_nodes = [n["节点名称"] for n in nodes_step3 if n.get("是否已知") and n["节点名称"] not in prev_known]
- # Merge edge lists (existing + derived)
- all_edges = relations_step1 + edges
- step3 = {
- "步骤": "模式推导",
- "输入": {
- "节点列表": nodes_step2,
- "人设共现关系": persona_co_occur,
- },
- "输出": {
- "新的已知节点": new_known_nodes,
- "新的边": edges,
- "节点列表": nodes_step3,
- "边列表": all_edges,
- },
- "摘要": {
- "已知点数": known_count,
- "新已知数": len(new_known_nodes),
- "新边数": len(edges),
- "未知点数": unknown_count,
- },
- }
- steps.append(step3)
- print(f" 已知点: {known_count} 个")
- print(f" 推导边: {len(edges)} 条")
- print(f" 未知点: {unknown_count} 个")
- # Step 3 done; save
- save_result(post_id, post_detail, steps, config)
- if max_step == 3:
- return {"帖子详情": post_detail, "步骤列表": steps}
- # ===== Step 4: next-step analysis =====
- print("\n[步骤4] 下一步分析...")
- next_step_result = await analyze_next_step(nodes_step3, force_llm=force_llm)
- # Get the candidate list
- candidates = next_step_result["下一步候选"]
- # Keep high-scoring candidates (>= 0.8)
- NEXT_STEP_THRESHOLD = 0.8
- high_score_candidates = [c for c in candidates if c["可能性分数"] >= NEXT_STEP_THRESHOLD]
- # Map node names to nodes
- node_by_name = {n["节点名称"]: n for n in nodes_step3}
- # Find the current max discovery number (default=0 guards an empty node list)
- max_order = max(((n.get("发现编号") or 0) for n in nodes_step3), default=0)
- # Update nodes: mark high-scoring candidates as known
- nodes_step4 = []
- new_known_names = []
- current_order = max_order + 1
- for node in nodes_step3:
- new_node = dict(node)
- name = node["节点名称"]
- # Is this node among the high-scoring candidates?
- matching = [c for c in high_score_candidates if c["节点名称"] == name]
- if matching and not node.get("是否已知"):
- new_node["是否已知"] = True
- new_node["发现编号"] = current_order
- current_order += 1
- new_known_names.append(name)
- nodes_step4.append(new_node)
- # Create new (derived) edges
- new_edges = []
- for c in high_score_candidates:
- target_node = node_by_name.get(c["节点名称"])
- source_name = c["推导来源"]
- source_node = node_by_name.get(source_name)
- if target_node and source_node:
- new_edges.append({
- "来源": source_node["节点ID"],
- "目标": target_node["节点ID"],
- "关系类型": "AI推导",
- "可能性分数": c["可能性分数"],
- "推理说明": c["推理说明"],
- })
- # Merge edge lists
- all_edges_step4 = all_edges + new_edges
- step4 = {
- "步骤": "下一步分析",
- "输入": {
- "已知点": next_step_result["输入上下文"]["已知点"],
- "未知点": next_step_result["输入上下文"]["未知点"],
- "人设常量": next_step_result["输入上下文"]["人设常量"],
- },
- "中间结果": next_step_result["中间结果"],
- "输出": {
- "新的已知节点": new_known_names,
- "新的边": new_edges,
- "节点列表": nodes_step4,
- "边列表": all_edges_step4,
- },
- "摘要": {
- "已知点数": sum(1 for n in nodes_step4 if n.get("是否已知")),
- "新已知数": len(new_known_names),
- "新边数": len(new_edges),
- "未知点数": sum(1 for n in nodes_step4 if not n.get("是否已知")),
- "model": next_step_result.get("model"),
- "cache_hit": next_step_result.get("cache_hit"),
- "log_url": next_step_result.get("log_url"),
- },
- }
- steps.append(step4)
- # Print high-scoring candidates
- print(f" 候选数: {len(candidates)} 个")
- print(f" 高分候选 (>={NEXT_STEP_THRESHOLD}): {len(high_score_candidates)} 个")
- for c in high_score_candidates:
- print(f" ★ {c['节点名称']} ({c['可能性分数']:.2f}) ← {c['推导来源']}")
- print(f" {c['推理说明']}")
- # Step 4 done; save
- save_result(post_id, post_detail, steps, config)
- if max_step == 4:
- return {"帖子详情": post_detail, "步骤列表": steps}
- # ===== Loop: step 3 → step 4 until everything is known =====
- iteration = 1
- current_nodes = nodes_step4
- current_edges = all_edges_step4
- MAX_ITERATIONS = 10 # guard against an infinite loop
- while True:
- # Any unknown nodes left?
- unknown_count = sum(1 for n in current_nodes if not n.get("是否已知"))
- if unknown_count == 0:
- print(f"\n[完成] 所有节点已变为已知")
- break
- if iteration > MAX_ITERATIONS:
- print(f"\n[警告] 达到最大迭代次数 {MAX_ITERATIONS},停止循环")
- break
- # ===== Iteration step 3: co-occurrence derivation =====
- print(f"\n[迭代{iteration}-步骤3] 模式推导...")
- derivation_result = derive_patterns(current_nodes, persona_co_occur)
- nodes_iter3 = derivation_result["输出节点"]
- edges_iter3 = derivation_result["推导边列表"]
- # Count newly derived nodes
- prev_known_names = {n["节点名称"] for n in current_nodes if n.get("是否已知")}
- new_known_step3 = [n["节点名称"] for n in nodes_iter3 if n.get("是否已知") and n["节点名称"] not in prev_known_names]
- new_edges_step3 = edges_iter3 # derive_patterns returns only this round's new edges
- all_edges_iter3 = current_edges + new_edges_step3
- step_iter3 = {
- "步骤": f"迭代{iteration}-模式推导",
- "输入": {
- "节点列表": current_nodes,
- "人设共现关系": persona_co_occur,
- },
- "输出": {
- "新的已知节点": new_known_step3,
- "新的边": new_edges_step3,
- "节点列表": nodes_iter3,
- "边列表": all_edges_iter3,
- },
- "摘要": {
- "已知点数": sum(1 for n in nodes_iter3 if n.get("是否已知")),
- "新已知数": len(new_known_step3),
- "新边数": len(new_edges_step3),
- "未知点数": sum(1 for n in nodes_iter3 if not n.get("是否已知")),
- },
- }
- steps.append(step_iter3)
- print(f" 新已知: {len(new_known_step3)} 个")
- print(f" 新边: {len(new_edges_step3)} 条")
- save_result(post_id, post_detail, steps, config)
- # Any unknown left after step 3?
- unknown_after_step3 = sum(1 for n in nodes_iter3 if not n.get("是否已知"))
- if unknown_after_step3 == 0:
- print(f"\n[完成] 所有节点已变为已知")
- break
- # ===== Iteration step 4: LLM derivation =====
- print(f"\n[迭代{iteration}-步骤4] 下一步分析...")
- next_step_result = await analyze_next_step(nodes_iter3, force_llm=force_llm)
- candidates_iter4 = next_step_result["下一步候选"]
- high_score_iter4 = [c for c in candidates_iter4 if c["可能性分数"] >= NEXT_STEP_THRESHOLD]
- # Update nodes (default=0 guards an empty node list)
- node_by_name_iter4 = {n["节点名称"]: n for n in nodes_iter3}
- max_order_iter4 = max(((n.get("发现编号") or 0) for n in nodes_iter3), default=0)
- nodes_iter4 = []
- new_known_iter4 = []
- current_order_iter4 = max_order_iter4 + 1
- for node in nodes_iter3:
- new_node = dict(node)
- name = node["节点名称"]
- matching = [c for c in high_score_iter4 if c["节点名称"] == name]
- if matching and not node.get("是否已知"):
- new_node["是否已知"] = True
- new_node["发现编号"] = current_order_iter4
- current_order_iter4 += 1
- new_known_iter4.append(name)
- nodes_iter4.append(new_node)
- # Create new edges
- new_edges_iter4 = []
- for c in high_score_iter4:
- target_node = node_by_name_iter4.get(c["节点名称"])
- source_node = node_by_name_iter4.get(c["推导来源"])
- if target_node and source_node:
- new_edges_iter4.append({
- "来源": source_node["节点ID"],
- "目标": target_node["节点ID"],
- "关系类型": "AI推导",
- "可能性分数": c["可能性分数"],
- "推理说明": c["推理说明"],
- })
- all_edges_iter4 = all_edges_iter3 + new_edges_iter4
- step_iter4 = {
- "步骤": f"迭代{iteration}-下一步分析",
- "输入": {
- "已知点": next_step_result["输入上下文"]["已知点"],
- "未知点": next_step_result["输入上下文"]["未知点"],
- "人设常量": next_step_result["输入上下文"]["人设常量"],
- },
- "中间结果": next_step_result["中间结果"],
- "输出": {
- "新的已知节点": new_known_iter4,
- "新的边": new_edges_iter4,
- "节点列表": nodes_iter4,
- "边列表": all_edges_iter4,
- },
- "摘要": {
- "已知点数": sum(1 for n in nodes_iter4 if n.get("是否已知")),
- "新已知数": len(new_known_iter4),
- "新边数": len(new_edges_iter4),
- "未知点数": sum(1 for n in nodes_iter4 if not n.get("是否已知")),
- "model": next_step_result.get("model"),
- "cache_hit": next_step_result.get("cache_hit"),
- },
- }
- steps.append(step_iter4)
- print(f" 新已知: {len(new_known_iter4)} 个")
- print(f" 新边: {len(new_edges_iter4)} 条")
- save_result(post_id, post_detail, steps, config)
- # Stop if this round made no progress
- if len(new_known_step3) == 0 and len(new_known_iter4) == 0:
- print(f"\n[停止] 本轮无新进展,停止循环")
- break
- # Carry state into the next round
- current_nodes = nodes_iter4
- current_edges = all_edges_iter4
- iteration += 1
- return {"帖子详情": post_detail, "步骤列表": steps}
- # ===== Main =====
- async def main(
- post_id: Optional[str] = None,
- all_posts: bool = False,
- force_llm: bool = False,
- max_step: int = 3,
- ):
- """Entry point."""
- _, log_url = set_trace()
- config = PathConfig()
- print(f"账号: {config.account_name}")
- print(f"Trace URL: {log_url}")
- # Load the persona graph
- persona_graph_file = config.intermediate_dir / "人设图谱.json"
- if not persona_graph_file.exists():
- print(f"错误: 人设图谱文件不存在: {persona_graph_file}")
- return
- persona_graph = load_json(persona_graph_file)
- print(f"人设图谱节点数: {len(persona_graph.get('nodes', {}))}")
- # Collect post-graph files
- post_graph_files = get_post_graph_files(config)
- if not post_graph_files:
- print("错误: 没有找到帖子图谱文件")
- return
- # Decide which posts to process
- if post_id:
- target_file = next(
- (f for f in post_graph_files if post_id in f.name),
- None
- )
- if not target_file:
- print(f"错误: 未找到帖子 {post_id}")
- return
- files_to_process = [target_file]
- elif all_posts:
- files_to_process = post_graph_files
- else:
- files_to_process = [post_graph_files[0]]
- print(f"待处理帖子数: {len(files_to_process)}")
- # Process
- results = []
- for i, post_file in enumerate(files_to_process, 1):
- print(f"\n{'#' * 60}")
- print(f"# 处理帖子 {i}/{len(files_to_process)}")
- print(f"{'#' * 60}")
- result = await process_single_post(
- post_file=post_file,
- persona_graph=persona_graph,
- config=config,
- force_llm=force_llm,
- max_step=max_step,
- )
- results.append(result)
- # Summary
- print(f"\n{'#' * 60}")
- print(f"# 完成! 共处理 {len(results)} 个帖子")
- print(f"{'#' * 60}")
- print(f"Trace: {log_url}")
- print("\n汇总:")
- for result in results:
- post_id = result["帖子详情"]["postId"]
- steps = result.get("步骤列表", [])
- num_steps = len(steps)
- if num_steps == 1:
- step1_summary = steps[0].get("摘要", {})
- print(f" {post_id}: 节点数={step1_summary.get('节点数', 0)} (仅数据准备)")
- elif num_steps == 2:
- step2_summary = steps[1].get("摘要", {})
- print(f" {post_id}: 起点={step2_summary.get('新已知数', 0)} (未推导)")
- elif num_steps == 3:
- step3_summary = steps[2].get("摘要", {})
- print(f" {post_id}: 已知={step3_summary.get('已知点数', 0)}, "
- f"未知={step3_summary.get('未知点数', 0)}")
- elif num_steps >= 4:
- step4_summary = steps[3].get("摘要", {})
- print(f" {post_id}: 已知={step4_summary.get('已知点数', 0)}, "
- f"新已知={step4_summary.get('新已知数', 0)}, "
- f"新边={step4_summary.get('新边数', 0)}, "
- f"未知={step4_summary.get('未知点数', 0)}")
- else:
- print(f" {post_id}: 无步骤数据")
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description="Creation pattern analysis")
- parser.add_argument("--post-id", type=str, help="post ID")
- parser.add_argument("--all-posts", action="store_true", help="process all posts")
- parser.add_argument("--force-llm", action="store_true", help="force fresh LLM calls (skip the LLM cache)")
- parser.add_argument("--step", type=int, default=5, choices=[1, 2, 3, 4, 5],
- help="run through step N (1=data prep, 2=origin analysis, 3=pattern derivation, 4=next-step analysis, 5=full loop)")
- args = parser.parse_args()
- asyncio.run(main(
- post_id=args.post_id,
- all_posts=args.all_posts,
- force_llm=args.force_llm,
- max_step=args.step,
- ))
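- # Example invocations (post ID is a placeholder; the script path depends on your checkout):
- #   python creation_pattern.py --step 1                # data prep only, first post found
- #   python creation_pattern.py --post-id abc123        # full loop for one post
- #   python creation_pattern.py --all-posts --force-llm # every post, bypassing the LLM cache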