|
|
@@ -0,0 +1,827 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+节点来源分析脚本 V4
|
|
|
+
|
|
|
+采用两步法:
|
|
|
+1. 第一步(筛选):筛选可能的来源特征
|
|
|
+2. 第二步(评估):对筛选出的特征进行可能性评估
|
|
|
+
|
|
|
+输入:post_graph 目录中的帖子图谱文件
|
|
|
+输出:节点来源分析结果
|
|
|
+"""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+from pathlib import Path
|
|
|
+from typing import Dict, List
|
|
|
+import sys
|
|
|
+
|
|
|
+# 添加项目根目录到路径
|
|
|
+project_root = Path(__file__).parent.parent.parent
|
|
|
+sys.path.insert(0, str(project_root))
|
|
|
+
|
|
|
+from agents import Agent, Runner, ModelSettings, trace
|
|
|
+from agents.tracing.create import custom_span
|
|
|
+from lib.client import get_model
|
|
|
+from lib.my_trace import set_trace_smith as set_trace
|
|
|
+from script.data_processing.path_config import PathConfig
|
|
|
+
|
|
|
+# 模型配置
|
|
|
+MODEL_NAME = "google/gemini-3-pro-preview"
|
|
|
+# MODEL_NAME = 'deepseek/deepseek-v3.2'
|
|
|
+# MODEL_NAME = 'anthropic/claude-sonnet-4.5'
|
|
|
+
|
|
|
+# 第一步筛选 Agent
|
|
|
+filter_agent = Agent(
|
|
|
+ name="Feature Filter",
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ model_settings=ModelSettings(
|
|
|
+ temperature=0.0,
|
|
|
+ max_tokens=16384,
|
|
|
+ ),
|
|
|
+ tools=[],
|
|
|
+)
|
|
|
+
|
|
|
+# 第二步评估 Agent
|
|
|
+evaluate_agent = Agent(
|
|
|
+ name="Feature Evaluator",
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ model_settings=ModelSettings(
|
|
|
+ temperature=0.0,
|
|
|
+ max_tokens=32768,
|
|
|
+ ),
|
|
|
+ tools=[],
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ===== 数据提取函数 =====
|
|
|
+
|
|
|
+def get_post_graph_files(config: PathConfig) -> List[Path]:
|
|
|
+ """获取所有帖子图谱文件"""
|
|
|
+ post_graph_dir = config.intermediate_dir / "post_graph"
|
|
|
+ return sorted(post_graph_dir.glob("*_帖子图谱.json"))
|
|
|
+
|
|
|
+
|
|
|
+def load_post_graph(file_path: Path) -> Dict:
|
|
|
+ """加载帖子图谱"""
|
|
|
+ with open(file_path, "r", encoding="utf-8") as f:
|
|
|
+ return json.load(f)
|
|
|
+
|
|
|
+
|
|
|
+def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
|
|
|
+ """从帖子图谱中提取标签节点"""
|
|
|
+ tags = []
|
|
|
+ for node_id, node in post_graph.get("nodes", {}).items():
|
|
|
+ if node.get("type") == "标签" and node.get("domain") == "帖子":
|
|
|
+ tags.append({
|
|
|
+ "id": node_id,
|
|
|
+ "name": node.get("name", ""),
|
|
|
+ "dimension": node.get("dimension", ""),
|
|
|
+ })
|
|
|
+ return tags
|
|
|
+
|
|
|
+
|
|
|
+def prepare_analyze_input(post_graph: Dict, target_name: str = None) -> Dict:
|
|
|
+ """准备分析输入数据"""
|
|
|
+ tags = extract_tags_from_post_graph(post_graph)
|
|
|
+
|
|
|
+ if not tags:
|
|
|
+ raise ValueError("帖子图谱中没有找到标签节点")
|
|
|
+
|
|
|
+ # 确定目标节点
|
|
|
+ if target_name:
|
|
|
+ target_tag = next((t for t in tags if t["name"] == target_name), None)
|
|
|
+ if not target_tag:
|
|
|
+ raise ValueError(f"未找到目标节点: {target_name}")
|
|
|
+ else:
|
|
|
+ key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
|
|
|
+ if not key_point_tags:
|
|
|
+ raise ValueError("没有找到关键点标签")
|
|
|
+ target_tag = key_point_tags[0]
|
|
|
+
|
|
|
+ # 候选节点筛选:灵感点/目的点的候选集排除关键点
|
|
|
+ target_dimension = target_tag["dimension"]
|
|
|
+ candidate_tags = []
|
|
|
+ for t in tags:
|
|
|
+ if t["name"] == target_tag["name"]:
|
|
|
+ continue
|
|
|
+ if target_dimension in ["灵感点", "目的点"] and t["dimension"] == "关键点":
|
|
|
+ continue
|
|
|
+ candidate_tags.append(t)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "目标特征": {
|
|
|
+ "特征名称": target_tag["name"],
|
|
|
+ "特征类型": target_tag["dimension"]
|
|
|
+ },
|
|
|
+ "候选特征": [
|
|
|
+ {
|
|
|
+ "特征名称": t["name"],
|
|
|
+ "特征类型": t["dimension"]
|
|
|
+ }
|
|
|
+ for t in candidate_tags
|
|
|
+ ]
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ===== Prompt 构建 =====
|
|
|
+
|
|
|
+def build_filter_prompt(input_data: Dict) -> str:
|
|
|
+ """构建第一步筛选 prompt"""
|
|
|
+ target = input_data["目标特征"]
|
|
|
+ candidates = input_data["候选特征"]
|
|
|
+
|
|
|
+ # 构建候选特征列表
|
|
|
+ candidates_text = []
|
|
|
+ for c in candidates:
|
|
|
+ candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
|
|
|
+ candidates_section = "\n".join(candidates_text)
|
|
|
+
|
|
|
+ return f'''# 背景
|
|
|
+推理一个小红书帖子选题前脑海中的点,在创作者脑中的因果顺序
|
|
|
+
|
|
|
+# Task
|
|
|
+请分析【输入数据】与【目标点】的关系,按以下两类筛选证据:
|
|
|
+1. **单独推理**:哪个特征单凭自己就能有可能指向目标特征?
|
|
|
+2. **组合推理**:哪几个特征必须结合在一起,才能指向目标特征?(缺一不可才算组合)
|
|
|
+
|
|
|
+如果能独立推出则无需组合。
|
|
|
+
|
|
|
+# 筛选原则
|
|
|
+1. 实质推形式,而不是形式推实质
|
|
|
+2. 因推果而不是果推因
|
|
|
+3. 目的推理手段而不是手段推理目的
|
|
|
+4. 只有当 A 是 B 的充分必要条件的时候,A 可以推理出 B
|
|
|
+
|
|
|
+**本次分析的目标特征是:{target['特征名称']}({target['特征类型']})**
|
|
|
+
|
|
|
+# 输入数据
|
|
|
+{candidates_section}
|
|
|
+
|
|
|
+# 输出格式
|
|
|
+请严格按照以下 JSON 结构输出:
|
|
|
+
|
|
|
+```json
|
|
|
+{{
|
|
|
+ "目标特征": "{target['特征名称']}",
|
|
|
+ "预备分析列表": {{
|
|
|
+ "单独推理": [
|
|
|
+ {{
|
|
|
+ "来源特征": "特征A",
|
|
|
+ "来源特征类型": "灵感点/目的点/关键点",
|
|
|
+ "初步理由": "简要说明为什么这个特征可能推导出目标"
|
|
|
+ }}
|
|
|
+ ],
|
|
|
+ "组合推理": [
|
|
|
+ {{
|
|
|
+ "组合成员": ["特征B", "特征C"],
|
|
|
+ "成员类型": ["目的点", "关键点"],
|
|
|
+ "初步理由": "简要说明为什么这些特征需要组合才能推导出目标"
|
|
|
+ }}
|
|
|
+ ]
|
|
|
+ }}
|
|
|
+}}
|
|
|
+```
|
|
|
+
|
|
|
+注意:
|
|
|
+- 单独推理的来源特征必须是输入数据中的原话
|
|
|
+- 组合推理的成员数量通常为 2-3 个
|
|
|
+- 如果某个特征完全无法推导出目标,不要勉强添加
|
|
|
+'''.strip()
|
|
|
+
|
|
|
+
|
|
|
+def build_evaluate_prompt(input_data: Dict, filter_result: Dict) -> str:
|
|
|
+ """构建第二步评估 prompt"""
|
|
|
+ target = input_data["目标特征"]
|
|
|
+ prep_list = filter_result.get("预备分析列表", {})
|
|
|
+
|
|
|
+ # 构建单独推理列表
|
|
|
+ single_items = prep_list.get("单独推理", [])
|
|
|
+ single_text = ""
|
|
|
+ if single_items:
|
|
|
+ for item in single_items:
|
|
|
+ single_text += f"- {item.get('来源特征', '')}({item.get('来源特征类型', '')})\n"
|
|
|
+ else:
|
|
|
+ single_text = "(无)\n"
|
|
|
+
|
|
|
+ # 构建组合推理列表
|
|
|
+ combo_items = prep_list.get("组合推理", [])
|
|
|
+ combo_text = ""
|
|
|
+ if combo_items:
|
|
|
+ for item in combo_items:
|
|
|
+ members = " + ".join(item.get("组合成员", []))
|
|
|
+ combo_text += f"- {members}\n"
|
|
|
+ else:
|
|
|
+ combo_text = "(无)\n"
|
|
|
+
|
|
|
+ return f'''# 背景
|
|
|
+推理一个小红书帖子选题前的点,在创作者脑中的因果顺序
|
|
|
+
|
|
|
+# Task
|
|
|
+请判断以下筛选出的特征推理出【{target['特征名称']}】的可能性
|
|
|
+
|
|
|
+## 待评估的单独推理特征:
|
|
|
+{single_text}
|
|
|
+## 待评估的组合推理特征:
|
|
|
+{combo_text}
|
|
|
+# 推理约束
|
|
|
+1. 实质推形式,而不是形式推实质
|
|
|
+2. 因推果而不是果推因
|
|
|
+3. 目的推理手段而不是手段推理目的
|
|
|
+4. 只有当 A 是 B 的充分必要条件的时候,A 可以推理出 B
|
|
|
+
|
|
|
+# 评分标准
|
|
|
+| 分数范围 | 等级 | 说明 |
|
|
|
+|---------|------|------|
|
|
|
+| 0.80 - 1.00 | 逻辑必然 | A 是 B 的充分必要条件,必然推导 |
|
|
|
+| 0.50 - 0.79 | 高可能性 | A 高度倾向于推导出 B,但非唯一选择 |
|
|
|
+| 0.20 - 0.49 | 创意偏好 | A 可以推导出 B,但其他选择同样可行 |
|
|
|
+| 0.00 - 0.19 | 弱关联 | A 与 B 关联性很弱,不建议保留 |
|
|
|
+
|
|
|
+# 输出格式
|
|
|
+请严格按照以下 JSON 结构输出:
|
|
|
+
|
|
|
+```json
|
|
|
+{{
|
|
|
+ "目标关键特征": "{target['特征名称']}",
|
|
|
+ "推理分析": {{
|
|
|
+ "单独推理": [
|
|
|
+ {{
|
|
|
+ "来源特征": "特征A",
|
|
|
+ "来源特征类型": "灵感点/目的点/关键点",
|
|
|
+ "可能性": 0.xx,
|
|
|
+ "结论": "详细说明推导逻辑..."
|
|
|
+ }}
|
|
|
+ ],
|
|
|
+ "组合推理": [
|
|
|
+ {{
|
|
|
+ "组合成员": ["特征B", "特征C"],
|
|
|
+ "成员类型": ["目的点", "关键点"],
|
|
|
+ "可能性": 0.xx,
|
|
|
+ "结论": "详细说明组合推导逻辑..."
|
|
|
+ }}
|
|
|
+ ]
|
|
|
+ }}
|
|
|
+}}
|
|
|
+```
|
|
|
+
|
|
|
+注意:
|
|
|
+- 如果某个特征经评估后可能性低于 0.2,可以标注但建议说明原因
|
|
|
+- 结论要清晰说明推导逻辑,避免空洞表述
|
|
|
+'''.strip()
|
|
|
+
|
|
|
+
|
|
|
+# ===== 主分析函数 =====
|
|
|
+
|
|
|
+async def analyze_node_origin(
|
|
|
+ post_id: str = None,
|
|
|
+ target_name: str = None,
|
|
|
+ config: PathConfig = None
|
|
|
+) -> Dict:
|
|
|
+ """分析目标节点可能由哪些候选节点推导而来(两步法)"""
|
|
|
+ if config is None:
|
|
|
+ config = PathConfig()
|
|
|
+
|
|
|
+ # 获取帖子图谱文件
|
|
|
+ post_graph_files = get_post_graph_files(config)
|
|
|
+ if not post_graph_files:
|
|
|
+ raise ValueError("没有找到帖子图谱文件")
|
|
|
+
|
|
|
+ # 选择帖子
|
|
|
+ if post_id:
|
|
|
+ target_file = next(
|
|
|
+ (f for f in post_graph_files if post_id in f.name),
|
|
|
+ None
|
|
|
+ )
|
|
|
+ if not target_file:
|
|
|
+ raise ValueError(f"未找到帖子: {post_id}")
|
|
|
+ else:
|
|
|
+ target_file = post_graph_files[0]
|
|
|
+
|
|
|
+ # 加载帖子图谱
|
|
|
+ post_graph = load_post_graph(target_file)
|
|
|
+ actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
|
|
|
+
|
|
|
+ # 准备输入数据
|
|
|
+ input_data = prepare_analyze_input(post_graph, target_name)
|
|
|
+ actual_target_name = input_data["目标特征"]["特征名称"]
|
|
|
+
|
|
|
+ print(f"帖子ID: {actual_post_id}")
|
|
|
+ print(f"目标特征: {actual_target_name}")
|
|
|
+ print(f"候选特征数: {len(input_data['候选特征'])}")
|
|
|
+
|
|
|
+ # ===== 第一步:筛选 =====
|
|
|
+ filter_prompt = build_filter_prompt(input_data)
|
|
|
+
|
|
|
+ with custom_span(
|
|
|
+ name=f"Step1 筛选 - {actual_target_name}",
|
|
|
+ data={"目标特征": actual_target_name}
|
|
|
+ ):
|
|
|
+ filter_result_raw = await Runner.run(filter_agent, input=filter_prompt)
|
|
|
+ filter_output = filter_result_raw.final_output
|
|
|
+
|
|
|
+ # 解析筛选结果
|
|
|
+ try:
|
|
|
+ filter_result = parse_json_output(filter_output)
|
|
|
+ except Exception as e:
|
|
|
+ return {
|
|
|
+ "帖子id": actual_post_id,
|
|
|
+ "目标节点": actual_target_name,
|
|
|
+ "模型": MODEL_NAME,
|
|
|
+ "输入": input_data,
|
|
|
+ "输出": None,
|
|
|
+ "错误": f"筛选步骤解析失败: {str(e)}",
|
|
|
+ "原始输出_筛选": filter_output
|
|
|
+ }
|
|
|
+
|
|
|
+ # 检查是否有可评估的特征
|
|
|
+ prep_list = filter_result.get("预备分析列表", {})
|
|
|
+ single_count = len(prep_list.get("单独推理", []))
|
|
|
+ combo_count = len(prep_list.get("组合推理", []))
|
|
|
+
|
|
|
+ if single_count == 0 and combo_count == 0:
|
|
|
+ return {
|
|
|
+ "帖子id": actual_post_id,
|
|
|
+ "目标节点": actual_target_name,
|
|
|
+ "模型": MODEL_NAME,
|
|
|
+ "输入": input_data,
|
|
|
+ "筛选结果": filter_result,
|
|
|
+ "输出": {
|
|
|
+ "目标关键特征": actual_target_name,
|
|
|
+ "推理分析": {
|
|
|
+ "单独推理": [],
|
|
|
+ "组合推理": []
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "说明": "筛选步骤未找到可推导的特征"
|
|
|
+ }
|
|
|
+
|
|
|
+ print(f" 筛选结果: 单独推理 {single_count} 个, 组合推理 {combo_count} 个")
|
|
|
+
|
|
|
+ # ===== 第二步:评估 =====
|
|
|
+ evaluate_prompt = build_evaluate_prompt(input_data, filter_result)
|
|
|
+
|
|
|
+ with custom_span(
|
|
|
+ name=f"Step2 评估 - {actual_target_name}",
|
|
|
+ data={"单独推理数": single_count, "组合推理数": combo_count}
|
|
|
+ ):
|
|
|
+ evaluate_result_raw = await Runner.run(evaluate_agent, input=evaluate_prompt)
|
|
|
+ evaluate_output = evaluate_result_raw.final_output
|
|
|
+
|
|
|
+ # 解析评估结果
|
|
|
+ try:
|
|
|
+ evaluate_result = parse_json_output(evaluate_output)
|
|
|
+ except Exception as e:
|
|
|
+ return {
|
|
|
+ "帖子id": actual_post_id,
|
|
|
+ "目标节点": actual_target_name,
|
|
|
+ "模型": MODEL_NAME,
|
|
|
+ "输入": input_data,
|
|
|
+ "筛选结果": filter_result,
|
|
|
+ "输出": None,
|
|
|
+ "错误": f"评估步骤解析失败: {str(e)}",
|
|
|
+ "原始输出_评估": evaluate_output
|
|
|
+ }
|
|
|
+
|
|
|
+ return {
|
|
|
+ "帖子id": actual_post_id,
|
|
|
+ "目标节点": actual_target_name,
|
|
|
+ "模型": MODEL_NAME,
|
|
|
+ "输入": input_data,
|
|
|
+ "筛选结果": filter_result,
|
|
|
+ "输出": evaluate_result
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def parse_json_output(output: str) -> Dict:
|
|
|
+ """解析 JSON 输出"""
|
|
|
+ if "```json" in output:
|
|
|
+ json_start = output.find("```json") + 7
|
|
|
+ json_end = output.find("```", json_start)
|
|
|
+ json_str = output[json_start:json_end].strip()
|
|
|
+ elif "{" in output and "}" in output:
|
|
|
+ json_start = output.find("{")
|
|
|
+ json_end = output.rfind("}") + 1
|
|
|
+ json_str = output[json_start:json_end]
|
|
|
+ else:
|
|
|
+ json_str = output
|
|
|
+
|
|
|
+ return json.loads(json_str)
|
|
|
+
|
|
|
+
|
|
|
+# ===== 图谱构建函数 =====
|
|
|
+
|
|
|
+def build_origin_graph(all_results: List[Dict], post_id: str) -> Dict:
|
|
|
+ """将分析结果转换为图谱格式"""
|
|
|
+ nodes = {}
|
|
|
+ edges = {}
|
|
|
+
|
|
|
+ for result in all_results:
|
|
|
+ target_input = result.get("输入", {})
|
|
|
+
|
|
|
+ # 添加目标节点
|
|
|
+ target_info = target_input.get("目标特征", {})
|
|
|
+ target_name = target_info.get("特征名称", "")
|
|
|
+ target_type = target_info.get("特征类型", "关键点")
|
|
|
+ node_id = f"帖子:{target_type}:标签:{target_name}"
|
|
|
+ if node_id not in nodes:
|
|
|
+ nodes[node_id] = {
|
|
|
+ "name": target_name,
|
|
|
+ "type": "标签",
|
|
|
+ "dimension": target_type,
|
|
|
+ "domain": "帖子",
|
|
|
+ "detail": {}
|
|
|
+ }
|
|
|
+
|
|
|
+ # 添加候选特征节点
|
|
|
+ for candidate in target_input.get("候选特征", []):
|
|
|
+ c_name = candidate.get("特征名称", "")
|
|
|
+ c_type = candidate.get("特征类型", "关键点")
|
|
|
+ c_node_id = f"帖子:{c_type}:标签:{c_name}"
|
|
|
+ if c_node_id not in nodes:
|
|
|
+ nodes[c_node_id] = {
|
|
|
+ "name": c_name,
|
|
|
+ "type": "标签",
|
|
|
+ "dimension": c_type,
|
|
|
+ "domain": "帖子",
|
|
|
+ "detail": {}
|
|
|
+ }
|
|
|
+
|
|
|
+ # 构建推导边
|
|
|
+ for result in all_results:
|
|
|
+ target_name = result.get("目标特征", "")
|
|
|
+ target_input = result.get("输入", {})
|
|
|
+ target_info = target_input.get("目标特征", {})
|
|
|
+ target_type = target_info.get("特征类型", "关键点")
|
|
|
+ target_node_id = f"帖子:{target_type}:标签:{target_name}"
|
|
|
+
|
|
|
+ # V4 的推理分析在顶层,不是在 输出 下面
|
|
|
+ reasoning = result.get("推理分析", {})
|
|
|
+
|
|
|
+ # 单独推理的边
|
|
|
+ for item in reasoning.get("单独推理", []):
|
|
|
+ source_name = item.get("来源特征", "")
|
|
|
+ source_type = item.get("来源特征类型", "关键点")
|
|
|
+ source_node_id = f"帖子:{source_type}:标签:{source_name}"
|
|
|
+ probability = item.get("可能性", 0)
|
|
|
+
|
|
|
+ edge_id = f"{source_node_id}|推导|{target_node_id}"
|
|
|
+ edges[edge_id] = {
|
|
|
+ "source": source_node_id,
|
|
|
+ "target": target_node_id,
|
|
|
+ "type": "推导",
|
|
|
+ "score": probability,
|
|
|
+ "detail": {
|
|
|
+ "推理类型": "单独推理",
|
|
|
+ "结论": item.get("结论", "")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ # 组合推理的边
|
|
|
+ for item in reasoning.get("组合推理", []):
|
|
|
+ members = item.get("组合成员", [])
|
|
|
+ member_types = item.get("成员类型", [])
|
|
|
+ probability = item.get("可能性", 0)
|
|
|
+
|
|
|
+ # 创建组合虚拟节点
|
|
|
+ member_pairs = list(zip(members, member_types)) if len(member_types) == len(members) else [(m, "关键点") for m in members]
|
|
|
+ sorted_pairs = sorted(member_pairs, key=lambda x: x[0])
|
|
|
+ sorted_members = [p[0] for p in sorted_pairs]
|
|
|
+ sorted_types = [p[1] for p in sorted_pairs]
|
|
|
+
|
|
|
+ combo_parts = [f"{sorted_types[i]}:{m}" for i, m in enumerate(sorted_members)]
|
|
|
+ combo_name = " + ".join(combo_parts)
|
|
|
+ combo_node_id = f"帖子:组合:组合:{combo_name}"
|
|
|
+ if combo_node_id not in nodes:
|
|
|
+ nodes[combo_node_id] = {
|
|
|
+ "name": combo_name,
|
|
|
+ "type": "组合",
|
|
|
+ "dimension": "组合",
|
|
|
+ "domain": "帖子",
|
|
|
+ "detail": {
|
|
|
+ "成员": sorted_members,
|
|
|
+ "成员类型": sorted_types
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ # 组合节点到目标的边
|
|
|
+ edge_id = f"{combo_node_id}|推导|{target_node_id}"
|
|
|
+ edges[edge_id] = {
|
|
|
+ "source": combo_node_id,
|
|
|
+ "target": target_node_id,
|
|
|
+ "type": "推导",
|
|
|
+ "score": probability,
|
|
|
+ "detail": {
|
|
|
+ "推理类型": "组合推理",
|
|
|
+ "结论": item.get("结论", "")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ # 成员到组合节点的边
|
|
|
+ for i, member in enumerate(sorted_members):
|
|
|
+ m_type = sorted_types[i]
|
|
|
+ m_node_id = f"帖子:{m_type}:标签:{member}"
|
|
|
+ m_edge_id = f"{m_node_id}|组成|{combo_node_id}"
|
|
|
+ if m_edge_id not in edges:
|
|
|
+ edges[m_edge_id] = {
|
|
|
+ "source": m_node_id,
|
|
|
+ "target": combo_node_id,
|
|
|
+ "type": "组成",
|
|
|
+ "score": 1.0,
|
|
|
+ "detail": {}
|
|
|
+ }
|
|
|
+
|
|
|
+ return {
|
|
|
+ "meta": {
|
|
|
+ "postId": post_id,
|
|
|
+ "type": "推导图谱",
|
|
|
+ "version": "v4",
|
|
|
+ "stats": {
|
|
|
+ "nodeCount": len(nodes),
|
|
|
+ "edgeCount": len(edges)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "nodes": nodes,
|
|
|
+ "edges": edges
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ===== 辅助函数 =====
|
|
|
+
|
|
|
+def get_all_target_names(post_graph: Dict, dimensions: List[str] = None) -> List[str]:
|
|
|
+ """获取所有可作为目标的特征名称"""
|
|
|
+ if dimensions is None:
|
|
|
+ dimensions = ["灵感点", "目的点", "关键点"]
|
|
|
+ tags = extract_tags_from_post_graph(post_graph)
|
|
|
+ return [t["name"] for t in tags if t["dimension"] in dimensions]
|
|
|
+
|
|
|
+
|
|
|
+def get_score_level(score: float) -> str:
|
|
|
+ """根据分数返回等级"""
|
|
|
+ if score >= 0.80:
|
|
|
+ return "逻辑必然"
|
|
|
+ elif score >= 0.50:
|
|
|
+ return "高可能性"
|
|
|
+ elif score >= 0.20:
|
|
|
+ return "创意偏好"
|
|
|
+ else:
|
|
|
+ return "弱关联"
|
|
|
+
|
|
|
+
|
|
|
+def display_result(result: Dict):
|
|
|
+ """显示单个分析结果"""
|
|
|
+ output = result.get("输出")
|
|
|
+ if output:
|
|
|
+ print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
|
|
|
+
|
|
|
+ reasoning = output.get("推理分析", {})
|
|
|
+
|
|
|
+ # 显示单独推理
|
|
|
+ single = reasoning.get("单独推理", [])
|
|
|
+ if single:
|
|
|
+ print(" 【单独推理】")
|
|
|
+ for item in single[:5]:
|
|
|
+ score = item.get("可能性", 0)
|
|
|
+ level = get_score_level(score)
|
|
|
+ print(f" [{score:.2f} {level}] {item.get('来源特征', '')}")
|
|
|
+
|
|
|
+ # 显示组合推理
|
|
|
+ combo = reasoning.get("组合推理", [])
|
|
|
+ if combo:
|
|
|
+ print(" 【组合推理】")
|
|
|
+ for item in combo[:3]:
|
|
|
+ members = " + ".join(item.get("组合成员", []))
|
|
|
+ score = item.get("可能性", 0)
|
|
|
+ level = get_score_level(score)
|
|
|
+ print(f" [{score:.2f} {level}] {members}")
|
|
|
+ else:
|
|
|
+ error = result.get("错误", "")
|
|
|
+ if error:
|
|
|
+ print(f" 分析失败: {error}")
|
|
|
+ else:
|
|
|
+ print(f" {result.get('说明', '无结果')}")
|
|
|
+
|
|
|
+
|
|
|
+# ===== 单帖子处理函数 =====
|
|
|
+
|
|
|
+async def process_single_post(
|
|
|
+ post_file: Path,
|
|
|
+ config: PathConfig,
|
|
|
+ target_name: str = None,
|
|
|
+ num_targets: int = 999,
|
|
|
+ dimensions: List[str] = None
|
|
|
+):
|
|
|
+ """处理单个帖子"""
|
|
|
+ if dimensions is None:
|
|
|
+ dimensions = ["灵感点", "目的点", "关键点"]
|
|
|
+
|
|
|
+ # 为每个帖子生成独立的 trace
|
|
|
+ current_time, log_url = set_trace()
|
|
|
+
|
|
|
+ # 加载帖子图谱
|
|
|
+ post_graph = load_post_graph(post_file)
|
|
|
+ actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
|
|
|
+
|
|
|
+ print(f"\n{'=' * 60}")
|
|
|
+ print(f"帖子ID: {actual_post_id}")
|
|
|
+ print(f"Trace URL: {log_url}")
|
|
|
+
|
|
|
+ # 确定要分析的目标特征列表
|
|
|
+ if target_name:
|
|
|
+ target_names = [target_name]
|
|
|
+ else:
|
|
|
+ all_targets = get_all_target_names(post_graph, dimensions)
|
|
|
+ target_names = all_targets[:num_targets]
|
|
|
+
|
|
|
+ print(f"待分析目标特征: {target_names}")
|
|
|
+ print("-" * 60)
|
|
|
+
|
|
|
+ # 输出目录
|
|
|
+ output_dir = config.intermediate_dir / "node_origin_analysis"
|
|
|
+ output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # 使用 trace 上下文包裹单个帖子的分析
|
|
|
+ with trace(f"节点来源分析 V4 - {actual_post_id}"):
|
|
|
+ # 并发分析所有目标特征
|
|
|
+ async def analyze_single(name: str, index: int):
|
|
|
+ print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
|
|
|
+ result = await analyze_node_origin(
|
|
|
+ post_id=actual_post_id,
|
|
|
+ target_name=name,
|
|
|
+ config=config
|
|
|
+ )
|
|
|
+ print(f"[{index}/{len(target_names)}] 完成: {name}")
|
|
|
+ display_result(result)
|
|
|
+
|
|
|
+ output = result.get("输出", {})
|
|
|
+ return {
|
|
|
+ "目标特征": result.get("目标节点"),
|
|
|
+ "筛选结果": result.get("筛选结果"),
|
|
|
+ "推理分析": output.get("推理分析", {}) if output else {},
|
|
|
+ "输入": result.get("输入"),
|
|
|
+ "错误": result.get("错误"),
|
|
|
+ "说明": result.get("说明")
|
|
|
+ }
|
|
|
+
|
|
|
+ # 创建并发任务
|
|
|
+ tasks = [
|
|
|
+ analyze_single(name, i)
|
|
|
+ for i, name in enumerate(target_names, 1)
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 并发执行
|
|
|
+ all_results = await asyncio.gather(*tasks)
|
|
|
+
|
|
|
+ # 合并保存到一个文件
|
|
|
+ merged_output = {
|
|
|
+ "元数据": {
|
|
|
+ "current_time": current_time,
|
|
|
+ "log_url": log_url,
|
|
|
+ "model": MODEL_NAME,
|
|
|
+ "version": "v4"
|
|
|
+ },
|
|
|
+ "帖子id": actual_post_id,
|
|
|
+ "分析结果列表": all_results
|
|
|
+ }
|
|
|
+
|
|
|
+ output_file = output_dir / f"{actual_post_id}_来源分析_v4.json"
|
|
|
+ with open(output_file, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(merged_output, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ # 生成推导关系图谱
|
|
|
+ graph_output = build_origin_graph(all_results, actual_post_id)
|
|
|
+ graph_file = output_dir / f"{actual_post_id}_推导图谱_v4.json"
|
|
|
+ with open(graph_file, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(graph_output, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ print(f"\n完成! 共分析 {len(target_names)} 个目标特征")
|
|
|
+ print(f"分析结果: {output_file}")
|
|
|
+ print(f"推导图谱: {graph_file}")
|
|
|
+ print(f"Trace: {log_url}")
|
|
|
+
|
|
|
+ return actual_post_id
|
|
|
+
|
|
|
+
|
|
|
+# ===== 主函数 =====
|
|
|
+
|
|
|
+async def main(
|
|
|
+ post_id: str = None,
|
|
|
+ target_name: str = None,
|
|
|
+ num_targets: int = 999,
|
|
|
+ dimensions: List[str] = None,
|
|
|
+ all_posts: bool = False
|
|
|
+):
|
|
|
+ """主函数"""
|
|
|
+ if dimensions is None:
|
|
|
+ dimensions = ["灵感点", "目的点", "关键点"]
|
|
|
+
|
|
|
+ config = PathConfig()
|
|
|
+
|
|
|
+ print(f"账号: {config.account_name}")
|
|
|
+ print(f"使用模型: {MODEL_NAME}")
|
|
|
+ print(f"分析维度: {dimensions}")
|
|
|
+ print(f"版本: V4 (两步法: 筛选 + 评估)")
|
|
|
+
|
|
|
+ # 获取帖子图谱文件
|
|
|
+ post_graph_files = get_post_graph_files(config)
|
|
|
+ if not post_graph_files:
|
|
|
+ print("错误: 没有找到帖子图谱文件")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 确定要处理的帖子列表
|
|
|
+ if post_id:
|
|
|
+ target_file = next(
|
|
|
+ (f for f in post_graph_files if post_id in f.name),
|
|
|
+ None
|
|
|
+ )
|
|
|
+ if not target_file:
|
|
|
+ print(f"错误: 未找到帖子 {post_id}")
|
|
|
+ return
|
|
|
+ files_to_process = [target_file]
|
|
|
+ elif all_posts:
|
|
|
+ files_to_process = post_graph_files
|
|
|
+ else:
|
|
|
+ files_to_process = [post_graph_files[0]]
|
|
|
+
|
|
|
+ print(f"待处理帖子数: {len(files_to_process)}")
|
|
|
+
|
|
|
+ # 逐个处理帖子
|
|
|
+ processed_posts = []
|
|
|
+ for i, post_file in enumerate(files_to_process, 1):
|
|
|
+ print(f"\n{'#' * 60}")
|
|
|
+ print(f"# 处理帖子 {i}/{len(files_to_process)}")
|
|
|
+ print(f"{'#' * 60}")
|
|
|
+
|
|
|
+ post_id_result = await process_single_post(
|
|
|
+ post_file=post_file,
|
|
|
+ config=config,
|
|
|
+ target_name=target_name,
|
|
|
+ num_targets=num_targets,
|
|
|
+ dimensions=dimensions
|
|
|
+ )
|
|
|
+ processed_posts.append(post_id_result)
|
|
|
+
|
|
|
+ print(f"\n{'#' * 60}")
|
|
|
+ print(f"# 全部完成! 共处理 {len(processed_posts)} 个帖子")
|
|
|
+ print(f"{'#' * 60}")
|
|
|
+
|
|
|
+
|
|
|
+def rebuild_graph_from_file(analysis_file: Path) -> None:
|
|
|
+ """从已有的分析结果文件重建图谱"""
|
|
|
+ with open(analysis_file, "r", encoding="utf-8") as f:
|
|
|
+ data = json.load(f)
|
|
|
+
|
|
|
+ post_id = data.get("帖子id", "unknown")
|
|
|
+ all_results = data.get("分析结果列表", [])
|
|
|
+
|
|
|
+ print(f"从分析文件重建图谱: {analysis_file.name}")
|
|
|
+ print(f"帖子ID: {post_id}")
|
|
|
+ print(f"分析结果数: {len(all_results)}")
|
|
|
+
|
|
|
+ # 构建图谱
|
|
|
+ graph_output = build_origin_graph(all_results, post_id)
|
|
|
+
|
|
|
+ # 保存图谱
|
|
|
+ graph_file = analysis_file.parent / f"{post_id}_推导图谱_v4.json"
|
|
|
+ with open(graph_file, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(graph_output, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ print(f"图谱已保存: {graph_file}")
|
|
|
+ print(f"节点数: {graph_output['meta']['stats']['nodeCount']}")
|
|
|
+ print(f"边数: {graph_output['meta']['stats']['edgeCount']}")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ import argparse
|
|
|
+
|
|
|
+ parser = argparse.ArgumentParser(description="分析节点来源 (V4 两步法)")
|
|
|
+ parser.add_argument("--post-id", type=str, help="帖子ID(指定则只处理该帖子)")
|
|
|
+ parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个特征)")
|
|
|
+ parser.add_argument("--num", type=int, default=999, help="要分析的目标特征数量")
|
|
|
+ parser.add_argument("--dims", type=str, nargs="+",
|
|
|
+ choices=["灵感点", "目的点", "关键点"],
|
|
|
+ help="指定要分析的维度(默认全部)")
|
|
|
+ parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
|
|
|
+ parser.add_argument("--rebuild-graph", type=str, metavar="FILE",
|
|
|
+ help="从已有分析文件重建图谱(不重新分析)")
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ # 如果指定了 --rebuild-graph,只重建图谱
|
|
|
+ if args.rebuild_graph:
|
|
|
+ rebuild_graph_from_file(Path(args.rebuild_graph))
|
|
|
+ else:
|
|
|
+ # 确定维度(默认所有维度)
|
|
|
+ if args.dims:
|
|
|
+ dimensions = args.dims
|
|
|
+ else:
|
|
|
+ dimensions = ["灵感点", "目的点", "关键点"]
|
|
|
+
|
|
|
+ # 运行主函数
|
|
|
+ asyncio.run(main(
|
|
|
+ post_id=args.post_id,
|
|
|
+ target_name=args.target,
|
|
|
+ num_targets=args.num,
|
|
|
+ dimensions=dimensions,
|
|
|
+ all_posts=args.all_posts
|
|
|
+ ))
|