| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创作起点分析
- 整合数据准备 + AI分析两步流程:
- 1. 根据帖子图谱 + 人设图谱,准备待分析数据
- 2. 调用AI分析起点
- 输入:帖子图谱 + 人设图谱
- 输出:起点分析结果
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional
- import sys
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from agents import Agent, Runner, ModelSettings, trace
- from agents.tracing.create import custom_span
- from lib.client import get_model
- from lib.my_trace import set_trace_smith as set_trace
- from script.data_processing.path_config import PathConfig
- # ===== 配置 =====
- MODEL_NAME = "google/gemini-3-pro-preview"
- # MODEL_NAME = "anthropic/claude-sonnet-4"
- MATCH_SCORE_THRESHOLD = 0.8 # 匹配分数阈值
- GLOBAL_RATIO_THRESHOLD = 0.8 # 全局占比阈值
- agent = Agent(
- name="Creation Origin Analyzer",
- model=get_model(MODEL_NAME),
- model_settings=ModelSettings(
- temperature=0.0,
- max_tokens=8192,
- ),
- tools=[],
- )
- # ===== 数据加载 =====
- def load_json(file_path: Path) -> Dict:
- """加载JSON文件"""
- with open(file_path, "r", encoding="utf-8") as f:
- return json.load(f)
- def get_post_graph_files(config: PathConfig) -> List[Path]:
- """获取所有帖子图谱文件"""
- post_graph_dir = config.intermediate_dir / "post_graph"
- return sorted(post_graph_dir.glob("*_帖子图谱.json"))
- def get_result_file(config: PathConfig, post_id: str) -> Path:
- """获取分析结果文件路径"""
- return config.intermediate_dir / "origin_analysis_result" / f"{post_id}_起点分析.json"
- def is_already_processed(config: PathConfig, post_id: str) -> bool:
- """检查帖子是否已处理过"""
- result_file = get_result_file(config, post_id)
- return result_file.exists()
- # ===== 第一步:数据准备 =====
- def extract_post_detail(post_graph: Dict) -> Dict:
- """提取帖子详情(保留原始字段名)"""
- meta = post_graph.get("meta", {})
- post_detail = meta.get("postDetail", {})
- return {
- "postId": meta.get("postId", ""),
- "postTitle": meta.get("postTitle", ""),
- "body_text": post_detail.get("body_text", ""),
- "images": post_detail.get("images", []),
- "video": post_detail.get("video"),
- "publish_time": post_detail.get("publish_time", ""),
- "like_count": post_detail.get("like_count", 0),
- "collect_count": post_detail.get("collect_count", 0),
- }
- def extract_analysis_nodes(post_graph: Dict, persona_graph: Dict) -> tuple:
- """
- 提取待分析节点列表
- 待分析节点 = 灵感点 + 目的点 + 关键点
- """
- nodes = post_graph.get("nodes", {})
- edges = post_graph.get("edges", {})
- persona_nodes = persona_graph.get("nodes", {})
- persona_index = persona_graph.get("index", {})
- # 1. 收集关键点信息(用于支撑信息)
- keypoints = {}
- for node_id, node in nodes.items():
- if node.get("type") == "标签" and node.get("dimension") == "关键点":
- keypoints[node_id] = {
- "名称": node.get("name", ""),
- "描述": node.get("detail", {}).get("description", ""),
- }
- # 2. 分析支撑关系:关键点 → 灵感点/目的点
- support_map = {} # {target_node_id: [支撑的关键点信息]}
- for edge_id, edge in edges.items():
- if edge.get("type") == "支撑":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- if source_id in keypoints:
- if target_id not in support_map:
- support_map[target_id] = []
- support_map[target_id].append(keypoints[source_id])
- # 3. 分析关联关系
- relation_map = {} # {node_id: [关联的节点名称]}
- for edge_id, edge in edges.items():
- if edge.get("type") == "关联":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- source_name = nodes.get(source_id, {}).get("name", "")
- target_name = nodes.get(target_id, {}).get("name", "")
- # 双向记录
- if source_id not in relation_map:
- relation_map[source_id] = []
- relation_map[source_id].append(target_name)
- if target_id not in relation_map:
- relation_map[target_id] = []
- relation_map[target_id].append(source_name)
- # 4. 分析人设匹配
- match_map = {} # {node_id: 匹配信息}
- persona_out_edges = persona_index.get("outEdges", {})
- def get_node_info(node_id: str) -> Optional[Dict]:
- """获取人设节点的标准信息"""
- node = persona_nodes.get(node_id, {})
- if not node:
- return None
- detail = node.get("detail", {})
- parent_path = detail.get("parentPath", [])
- return {
- "节点ID": node_id,
- "节点名称": node.get("name", ""),
- "节点分类": "/".join(parent_path) if parent_path else "",
- "节点维度": node.get("dimension", ""),
- "节点类型": node.get("type", ""),
- "人设全局占比": detail.get("probGlobal", 0),
- "父类下占比": detail.get("probToParent", 0),
- }
- def get_parent_category_id(node_id: str) -> Optional[str]:
- """通过属于边获取父分类节点ID"""
- belong_edges = persona_out_edges.get(node_id, {}).get("属于", [])
- for edge in belong_edges:
- target_id = edge.get("target", "")
- target_node = persona_nodes.get(target_id, {})
- if target_node.get("type") == "分类":
- return target_id
- return None
- for edge_id, edge in edges.items():
- if edge.get("type") == "匹配":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- # 只处理 帖子节点 → 人设节点 的匹配
- if source_id.startswith("帖子:") and target_id.startswith("人设:"):
- match_score = edge.get("score", 0)
- persona_node = persona_nodes.get(target_id, {})
- if persona_node:
- node_type = persona_node.get("type", "")
- # 获取匹配节点信息
- match_node_info = get_node_info(target_id)
- if not match_node_info:
- continue
- # 确定所属分类节点
- if node_type == "标签":
- # 标签:找父分类
- category_id = get_parent_category_id(target_id)
- else:
- # 分类:就是自己
- category_id = target_id
- # 获取所属分类信息和常见搭配
- category_info = None
- if category_id:
- category_node = persona_nodes.get(category_id, {})
- if category_node:
- category_detail = category_node.get("detail", {})
- category_path = category_detail.get("parentPath", [])
- category_info = {
- "节点ID": category_id,
- "节点名称": category_node.get("name", ""),
- "节点分类": "/".join(category_path) if category_path else "",
- "节点维度": category_node.get("dimension", ""),
- "节点类型": "分类",
- "人设全局占比": category_detail.get("probGlobal", 0),
- "父类下占比": category_detail.get("probToParent", 0),
- "历史共现分类": [],
- }
- # 获取分类共现节点(按共现度降序排列)
- co_occur_edges = persona_out_edges.get(category_id, {}).get("分类共现", [])
- co_occur_edges_sorted = sorted(co_occur_edges, key=lambda x: x.get("score", 0), reverse=True)
- for co_edge in co_occur_edges_sorted[:5]: # 取前5个
- co_target_id = co_edge.get("target", "")
- co_score = co_edge.get("score", 0)
- co_node = persona_nodes.get(co_target_id, {})
- if co_node:
- co_detail = co_node.get("detail", {})
- co_path = co_detail.get("parentPath", [])
- category_info["历史共现分类"].append({
- "节点ID": co_target_id,
- "节点名称": co_node.get("name", ""),
- "节点分类": "/".join(co_path) if co_path else "",
- "节点维度": co_node.get("dimension", ""),
- "节点类型": "分类",
- "人设全局占比": co_detail.get("probGlobal", 0),
- "父类下占比": co_detail.get("probToParent", 0),
- "共现度": round(co_score, 4),
- })
- match_map[source_id] = {
- "匹配节点": match_node_info,
- "匹配分数": round(match_score, 4),
- "所属分类": category_info,
- }
- # 5. 构建待分析节点列表(灵感点、目的点、关键点)
- analysis_nodes = []
- for node_id, node in nodes.items():
- if node.get("type") == "标签" and node.get("domain") == "帖子":
- dimension = node.get("dimension", "")
- if dimension in ["灵感点", "目的点", "关键点"]:
- # 人设匹配信息
- match_info = match_map.get(node_id)
- analysis_nodes.append({
- "节点ID": node_id,
- "节点名称": node.get("name", ""),
- "节点分类": node.get("category", ""), # 根分类:意图/实质/形式
- "节点维度": dimension,
- "节点类型": node.get("type", ""),
- "节点描述": node.get("detail", {}).get("description", ""),
- "人设匹配": match_info,
- })
- # 6. 构建可能的关系列表
- relation_list = []
- # 支撑关系:关键点 → 灵感点/目的点
- for edge_id, edge in edges.items():
- if edge.get("type") == "支撑":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- if source_id in keypoints:
- relation_list.append({
- "来源节点": source_id,
- "目标节点": target_id,
- "关系类型": "支撑",
- })
- # 关联关系:节点之间的关联(去重,只记录一次)
- seen_relations = set()
- for edge_id, edge in edges.items():
- if edge.get("type") == "关联":
- source_id = edge.get("source", "")
- target_id = edge.get("target", "")
- # 用排序后的元组作为key去重
- key = tuple(sorted([source_id, target_id]))
- if key not in seen_relations:
- seen_relations.add(key)
- relation_list.append({
- "来源节点": source_id,
- "目标节点": target_id,
- "关系类型": "关联",
- })
- return analysis_nodes, relation_list
- def prepare_analysis_data(post_graph: Dict, persona_graph: Dict) -> Dict:
- """
- 准备完整的分析数据
- Returns:
- {
- "帖子详情": {...},
- "待分析节点列表": [...],
- "可能的关系列表": [...]
- }
- """
- analysis_nodes, relation_list = extract_analysis_nodes(post_graph, persona_graph)
- return {
- "帖子详情": extract_post_detail(post_graph),
- "待分析节点列表": analysis_nodes,
- "可能的关系列表": relation_list,
- }
- # ===== 第二步:AI分析 =====
- def build_context(data: Dict) -> Dict:
- """
- 构造AI分析的上下文
- Returns:
- {
- "all_points": [...], # 全部创意点(含详细信息)
- "candidates": [...], # 起点候选集(名称列表)
- "constants": [...], # 人设常量(名称列表)
- }
- """
- nodes = data.get("待分析节点列表", [])
- # 全部创意点(含详细信息)
- all_points = []
- for node in nodes:
- match_info = node.get("人设匹配")
- match_score = 0
- category_global_ratio = 0
- if match_info:
- match_score = match_info.get("匹配分数", 0)
- category_info = match_info.get("所属分类", {})
- if category_info:
- category_global_ratio = category_info.get("人设全局占比", 0)
- all_points.append({
- "名称": node["节点名称"],
- "分类": node.get("节点分类", ""),
- "维度": node.get("节点维度", ""),
- "描述": node.get("节点描述", ""),
- "人设匹配度": round(match_score, 2),
- "所属分类全局占比": round(category_global_ratio, 2),
- })
- # 起点候选集(灵感点 + 目的点)
- candidates = [
- node["节点名称"]
- for node in nodes
- if node["节点维度"] in ["灵感点", "目的点"]
- ]
- # 人设常量(匹配分数 > 0.8 且 全局占比 > 0.8)
- constants = []
- for node in nodes:
- match_info = node.get("人设匹配")
- if match_info:
- match_score = match_info.get("匹配分数", 0)
- match_node = match_info.get("匹配节点", {})
- global_ratio = match_node.get("人设全局占比", 0)
- if match_score > MATCH_SCORE_THRESHOLD and global_ratio > GLOBAL_RATIO_THRESHOLD:
- constants.append(node["节点名称"])
- return {
- "all_points": all_points,
- "candidates": candidates,
- "constants": constants,
- }
- def format_prompt(context: Dict) -> str:
- """
- 格式化为AI prompt
- """
- all_points = context["all_points"]
- candidates = context["candidates"]
- constants = context["constants"]
- # 格式化全部创意点为易读文本
- points_text = ""
- for p in all_points:
- points_text += f"- {p['名称']}\n"
- points_text += f" 维度: {p['维度']} | 分类: {p['分类']}\n"
- points_text += f" 描述: {p['描述']}\n"
- points_text += f" 人设匹配度: {p['人设匹配度']} | 所属分类全局占比: {p['所属分类全局占比']}\n"
- points_text += "\n"
- # 格式化起点候选集
- candidates_text = "、".join(candidates)
- # 格式化人设常量
- constants_text = "、".join(constants) if constants else "无"
- prompt = f"""# Role
- 你是小红书爆款内容的"逆向工程"专家。你的核心能力是透过内容的表象(视觉/形式),还原创作者最初的脑回路(动机/实质)。
- # Task
- 我提供一组笔记的【创意标签】和一个【起点候选集】。
- 请推理出哪些选项是真正的**创意起点**。
- # Input Data
- ## 全部创意点
- {points_text}
- ## 起点候选集
- {candidates_text}
- ## 来自人设的常量
- {constants_text}
- # 推理约束
- 1. 实质推形式,而不是形式推实质,除非形式是一切创意的起点
- 2. 因推果而不是果推因
- 3. 无法被其他项或人设推理出的点,即为起点
- # Output Format
- 请输出一个标准的 JSON 格式。
- - Key: 候选集中的词。
- - Value: 一个对象,包含:
- - `score`: 0.0 到 1.0 的浮点数(代表是起点的可能性)。
- - `analysis`: 一句话推理"""
- return prompt
- # ===== 显示函数 =====
- def display_context(context: Dict, post_id: str):
- """显示构造的上下文"""
- print(f"\n帖子: {post_id}")
- print(f"\n全部创意点 ({len(context['all_points'])} 个):")
- for p in context['all_points']:
- print(f" - {p['名称']} ({p['维度']}/{p['分类']}) 匹配度={p['人设匹配度']}, 分类占比={p['所属分类全局占比']}")
- print(f"\n起点候选集 ({len(context['candidates'])} 个):")
- print(f" {context['candidates']}")
- print(f"\n人设常量 ({len(context['constants'])} 个):")
- print(f" {context['constants']}")
- def display_result(result: Dict):
- """显示分析结果"""
- output = result.get("输出")
- if output:
- print("\n起点分析结果:")
- # 按score降序排列
- sorted_items = sorted(output.items(), key=lambda x: x[1].get("score", 0), reverse=True)
- for name, info in sorted_items:
- score = info.get("score", 0)
- analysis = info.get("analysis", "")
- marker = "★" if score >= 0.7 else "○"
- print(f" {marker} {name}: {score:.2f}")
- print(f" {analysis}")
- else:
- print(f" 分析失败: {result.get('错误', 'N/A')}")
- # ===== 处理函数 =====
- async def process_single_post(
- post_file: Path,
- persona_graph: Dict,
- config: PathConfig,
- current_time: str = None,
- log_url: str = None,
- force: bool = False,
- ) -> Dict:
- """
- 处理单个帖子(数据准备 + AI分析)
- """
- # 加载帖子图谱
- post_graph = load_json(post_file)
- post_id = post_graph.get("meta", {}).get("postId", "unknown")
- # 检查是否已处理
- if not force and is_already_processed(config, post_id):
- print(f"\n跳过帖子 {post_id}(已处理,使用 --force 强制重新分析)")
- # 返回已有结果
- result_file = get_result_file(config, post_id)
- return load_json(result_file)
- print(f"\n{'=' * 60}")
- print(f"处理帖子: {post_id}")
- print("-" * 60)
- # 第一步:准备数据
- data = prepare_analysis_data(post_graph, persona_graph)
- # 构造上下文
- context = build_context(data)
- display_context(context, post_id)
- # 格式化prompt
- prompt = format_prompt(context)
- # 第二步:调用AI
- print("\n调用AI分析中...")
- with custom_span(
- name=f"创作起点分析 - {post_id}",
- data={
- "帖子id": post_id,
- "候选数": len(context["candidates"]),
- "模型": MODEL_NAME
- }
- ):
- result = await Runner.run(agent, input=prompt)
- output_text = result.final_output
- # 解析JSON
- try:
- if "```json" in output_text:
- json_start = output_text.find("```json") + 7
- json_end = output_text.find("```", json_start)
- json_str = output_text[json_start:json_end].strip()
- elif "{" in output_text and "}" in output_text:
- json_start = output_text.find("{")
- json_end = output_text.rfind("}") + 1
- json_str = output_text[json_start:json_end]
- else:
- json_str = output_text
- analysis_result = json.loads(json_str)
- result_data = {
- "帖子id": post_id,
- "模型": MODEL_NAME,
- "输入": context,
- "输出": analysis_result
- }
- except Exception as e:
- result_data = {
- "帖子id": post_id,
- "模型": MODEL_NAME,
- "输入": context,
- "输出": None,
- "错误": str(e),
- "原始输出": output_text
- }
- # 显示结果
- display_result(result_data)
- # 保存结果
- output_dir = config.intermediate_dir / "origin_analysis_result"
- output_dir.mkdir(parents=True, exist_ok=True)
- output_with_meta = {
- "元数据": {
- "current_time": current_time,
- "log_url": log_url,
- "model": MODEL_NAME
- },
- **result_data
- }
- output_file = output_dir / f"{post_id}_起点分析.json"
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(output_with_meta, f, ensure_ascii=False, indent=2)
- print(f"\n已保存: {output_file.name}")
- return result_data
- # ===== 主函数 =====
- async def main(
- post_id: str = None,
- all_posts: bool = False,
- force: bool = False,
- ):
- """
- 主函数
- Args:
- post_id: 帖子ID,可选
- all_posts: 是否处理所有帖子
- force: 强制重新分析已处理的帖子
- """
- # 设置 trace
- current_time, log_url = set_trace()
- config = PathConfig()
- print(f"账号: {config.account_name}")
- print(f"使用模型: {MODEL_NAME}")
- print(f"Trace URL: {log_url}")
- # 加载人设图谱
- persona_graph_file = config.intermediate_dir / "人设图谱.json"
- if not persona_graph_file.exists():
- print(f"错误: 人设图谱文件不存在: {persona_graph_file}")
- return
- persona_graph = load_json(persona_graph_file)
- print(f"人设图谱节点数: {len(persona_graph.get('nodes', {}))}")
- # 获取帖子图谱文件
- post_graph_files = get_post_graph_files(config)
- if not post_graph_files:
- print("错误: 没有找到帖子图谱文件")
- return
- # 确定要处理的帖子
- if post_id:
- target_file = next(
- (f for f in post_graph_files if post_id in f.name),
- None
- )
- if not target_file:
- print(f"错误: 未找到帖子 {post_id}")
- return
- files_to_process = [target_file]
- elif all_posts:
- files_to_process = post_graph_files
- else:
- files_to_process = [post_graph_files[0]]
- print(f"待处理帖子数: {len(files_to_process)}")
- # 处理
- with trace("创作起点分析"):
- results = []
- skipped = 0
- for i, post_file in enumerate(files_to_process, 1):
- print(f"\n{'#' * 60}")
- print(f"# 处理帖子 {i}/{len(files_to_process)}")
- print(f"{'#' * 60}")
- result = await process_single_post(
- post_file=post_file,
- persona_graph=persona_graph,
- config=config,
- current_time=current_time,
- log_url=log_url,
- force=force,
- )
- # 检查是否是跳过的
- if not force and "元数据" in result:
- skipped += 1
- results.append(result)
- # 汇总
- print(f"\n{'#' * 60}")
- print(f"# 完成! 共处理 {len(results)} 个帖子 (跳过 {skipped} 个已处理)")
- print(f"{'#' * 60}")
- print(f"Trace: {log_url}")
- print("\n汇总(score >= 0.7 的起点):")
- for result in results:
- post_id = result.get("帖子id")
- output = result.get("输出")
- if output:
- origins = [f"{k}({v['score']:.2f})" for k, v in output.items() if v.get("score", 0) >= 0.7]
- print(f" {post_id}: {', '.join(origins) if origins else '无高置信起点'}")
- else:
- print(f" {post_id}: 分析失败")
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description="创作起点分析")
- parser.add_argument("--post-id", type=str, help="帖子ID")
- parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
- parser.add_argument("--force", action="store_true", help="强制重新分析已处理的帖子")
- args = parser.parse_args()
- asyncio.run(main(
- post_id=args.post_id,
- all_posts=args.all_posts,
- force=args.force,
- ))
|