Bladeren bron

feat: 添加数据分析脚本并更新默认账号配置

- 添加 analyze_first_step.py 和 analyze_first_step_v2.py 用于首步分析
- 添加 filter_how_results.py 用于筛选 how 结果
- 更新默认账号为 阿里多多酱

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
yangxiaohui 1 week geleden
bovenliggende
commit
e69fe2671c

+ 1 - 1
config/accounts.json

@@ -32,7 +32,7 @@
       "description": "未启用的示例账号"
     }
   ],
-  "default_account": "阿里多多酱3",
+  "default_account": "阿里多多酱",
   "comment": "数据根目录可通过 data_root 配置(支持绝对路径、~、环境变量),也可通过 DATA_ROOT 环境变量覆盖",
   "filter_mode": "exclude_current_posts",
   "filter_modes": {

+ 346 - 0
script/data_processing/analyze_first_step.py

@@ -0,0 +1,346 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+第一步分析脚本
+
+基于过滤后的 how 解构结果,分析哪些点最有可能是创作者的第一步(创作起点)。
+
+输入:intermediate/filtered_results/ 中的过滤结果
+输出:第一步分析结果
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Dict, List
+import sys
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from agents import Agent, Runner, ModelSettings, trace
+from agents.tracing.create import custom_span
+from lib.client import get_model
+from lib.my_trace import set_trace_smith as set_trace
+from script.data_processing.path_config import PathConfig
+
+# 模型配置
+MODEL_NAME = "google/gemini-3-pro-preview"
+# MODEL_NAME = 'anthropic/claude-sonnet-4.5'
+
+agent = Agent(
+    name="First Step Analyzer",
+    model=get_model(MODEL_NAME),
+    model_settings=ModelSettings(
+        temperature=0.0,
+        max_tokens=65536,
+    ),
+    tools=[],
+)
+
+
+def extract_points_from_filtered_result(how_result: Dict) -> List[Dict]:
+    """
+    从过滤后的 how 解构结果中提取所有点的信息
+
+    Args:
+        how_result: how解构结果
+
+    Returns:
+        点信息列表,不包含原始分类标签
+    """
+    points = []
+
+    for point_type in ["灵感点", "关键点", "目的点"]:
+        point_list_key = f"{point_type}列表"
+        point_list = how_result.get(point_list_key, [])
+
+        for point in point_list:
+            point_name = point.get("名称", "")
+            point_desc = point.get("描述", "")
+
+            # 检查是否有匹配到人设特征
+            has_match = False
+            matched_feature = None
+            similarity = None
+
+            # 遍历 how 步骤列表中的特征
+            for step in point.get("how步骤列表", []):
+                for feature in step.get("特征列表", []):
+                    match_results = feature.get("匹配结果", [])
+                    if match_results:  # 如果有匹配结果(top1)
+                        has_match = True
+                        match = match_results[0]
+                        matched_feature = match.get("人设特征名称", "")
+                        similarity = match.get("匹配结果", {}).get("相似度", 0)
+                        break
+                if has_match:
+                    break
+
+            point_info = {
+                "名称": point_name,
+                "描述": point_desc,
+                "是否匹配到已有人设": has_match
+            }
+
+            if has_match:
+                point_info["匹配的人设特征"] = matched_feature
+                point_info["相似度"] = similarity
+
+            points.append(point_info)
+
+    return points
+
+
+def build_prompt(points: List[Dict]) -> str:
+    """
+    构建分析 prompt
+
+    Args:
+        points: 点信息列表
+
+    Returns:
+        prompt 文本
+    """
+    # 构建点的描述文本
+    points_text = []
+    for i, point in enumerate(points, 1):
+        text = f"{i}. {point['名称']}\n   {point['描述']}"
+
+        if point['是否匹配到已有人设']:
+            text += f"\n   [已匹配] 匹配到人设特征: {point['匹配的人设特征']} (相似度: {point['相似度']:.2f})"
+        else:
+            text += "\n   [未匹配] 未匹配到已有人设特征"
+
+        points_text.append(text)
+
+    points_section = "\n\n".join(points_text)
+
+    return f'''
+以下是一个内容创作的解构结果。这些点已经被分析和分类,但这个分类是分析维度,不代表真实的创作顺序。
+
+请判断:在这些点中,哪些最有可能是创作者的"第一步"(创作起点)?
+
+## 判断标准
+
+**起点特征**:
+- 最先触发创作、不依赖其他点的节点
+- 可能是外部事件、时事热点、商业需求等
+- 起点可能有多个
+
+**参考信息**:
+- **已匹配到人设的点**:来源于创作者已有的人设/风格/习惯
+- **未匹配的点**:可能来自外部触发、人设推导、或新尝试
+
+## 待分析的点
+
+{points_section}
+
+## 输出要求
+
+以 JSON 格式输出:
+{{
+  "推理过程": "详细说明判断逻辑...",
+  "第一步候选": [
+    {{
+      "点名称": "...",
+      "第一步概率": 0.95,  // 0-1之间的数值
+      "推理依据": "...",
+      "来源分析": "外部触发/人设延伸/商业驱动/其他"
+    }}
+  ]
+}}
+
+注意:
+1. 只输出最有可能是第一步的点(通常1-3个)
+2. 按第一步概率降序排列
+3. 不要被点的呈现顺序影响判断
+'''.strip()
+
+
+async def analyze_post(post_data: Dict) -> Dict:
+    """
+    分析单个帖子
+
+    Args:
+        post_data: 帖子数据(包含 how解构结果)
+
+    Returns:
+        分析结果
+    """
+    post_id = post_data.get("帖子id", "")
+    how_result = post_data.get("how解构结果", {})
+
+    # 提取所有点的信息
+    points = extract_points_from_filtered_result(how_result)
+
+    if not points:
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {"点列表": []},
+            "输出": None,
+            "错误": "没有可分析的点"
+        }
+
+    # 构建 prompt
+    prompt = build_prompt(points)
+
+    # 使用 custom_span 标识单个帖子的分析流程
+    with custom_span(
+        name=f"分析第一步 - 帖子 {post_id}",
+        data={
+            "帖子id": post_id,
+            "点数量": len(points),
+            "模型": MODEL_NAME
+        }
+    ):
+        # 调用 agent
+        result = await Runner.run(agent, input=prompt)
+        output = result.final_output
+
+    # 解析 JSON
+    try:
+        if "```json" in output:
+            json_start = output.find("```json") + 7
+            json_end = output.find("```", json_start)
+            json_str = output[json_start:json_end].strip()
+        elif "{" in output and "}" in output:
+            json_start = output.find("{")
+            json_end = output.rfind("}") + 1
+            json_str = output[json_start:json_end]
+        else:
+            json_str = output
+
+        analysis_result = json.loads(json_str)
+
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {
+                "点列表": points,
+                "prompt": prompt
+            },
+            "输出": analysis_result
+        }
+    except Exception as e:
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {
+                "点列表": points,
+                "prompt": prompt
+            },
+            "输出": None,
+            "错误": str(e),
+            "原始输出": output
+        }
+
+
+async def main(current_time: str = None, log_url: str = None):
+    """主函数
+
+    Args:
+        current_time: 当前时间戳(从外部传入)
+        log_url: 日志链接(从外部传入)
+    """
+    # 使用路径配置
+    config = PathConfig()
+
+    # 确保输出目录存在
+    config.ensure_dirs()
+
+    # 获取路径
+    input_dir = config.intermediate_dir / "filtered_results"
+    output_dir = config.intermediate_dir / "first_step_analysis"
+
+    # 确保输出目录存在
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    print(f"账号: {config.account_name}")
+    print(f"输入目录: {input_dir}")
+    print(f"输出目录: {output_dir}")
+    print(f"使用模型: {MODEL_NAME}")
+    if log_url:
+        print(f"Trace URL: {log_url}")
+    print()
+
+    # 读取所有过滤后的文件
+    input_files = list(input_dir.glob("*_filtered.json"))
+
+    if not input_files:
+        print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件")
+        return
+
+    print(f"找到 {len(input_files)} 个文件待分析\n")
+
+    # 批量分析
+    results = []
+    for i, input_file in enumerate(input_files, 1):
+        print(f"[{i}/{len(input_files)}] 分析文件: {input_file.name}")
+
+        # 读取文件
+        with open(input_file, "r", encoding="utf-8") as f:
+            post_data = json.load(f)
+
+        # 分析
+        result = await analyze_post(post_data)
+        results.append(result)
+
+        # 立即保存单个帖子的结果
+        post_id = result.get("帖子id", "unknown")
+        single_output_file = output_dir / f"{post_id}_first_step.json"
+
+        single_result = {
+            "元数据": {
+                "current_time": current_time,
+                "log_url": log_url,
+                "model": MODEL_NAME
+            },
+            "帖子id": post_id,
+            "分析结果": result
+        }
+
+        with open(single_output_file, "w", encoding="utf-8") as f:
+            json.dump(single_result, f, ensure_ascii=False, indent=2)
+
+        # 显示结果
+        output = result.get("输出", {})
+        if output:
+            first_steps = output.get("第一步候选", [])
+            print(f"  第一步候选:")
+            for step in first_steps:
+                print(f"    - {step.get('点名称', 'N/A')} (概率: {step.get('第一步概率', 0):.2f})")
+            print(f"  ✓ 已保存: {single_output_file.name}")
+        else:
+            print(f"  分析失败: {result.get('错误', 'N/A')}")
+        print()
+
+    print(f"✓ 所有分析完成,结果已保存到: {output_dir}")
+    if log_url:
+        print(f"Trace: {log_url}")
+
+    # 打印汇总
+    print("\n" + "=" * 80)
+    print("分析汇总")
+    print("=" * 80)
+    for result in results:
+        post_id = result["帖子id"]
+        output = result.get("输出", {})
+        if output:
+            first_steps = output.get("第一步候选", [])
+            print(f"\n帖子 {post_id}:")
+            for step in first_steps:
+                print(f"  - {step.get('点名称', 'N/A')} ({step.get('来源分析', 'N/A')}, 概率: {step.get('第一步概率', 0):.2f})")
+        else:
+            print(f"\n帖子 {post_id}: 分析失败")
+
+
+if __name__ == "__main__":
+    # 设置 trace
+    current_time, log_url = set_trace()
+
+    # 使用 trace 上下文包裹整个执行流程
+    with trace("第一步分析"):
+        asyncio.run(main(current_time, log_url))

+ 379 - 0
script/data_processing/analyze_first_step_v2.py

@@ -0,0 +1,379 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+第一步分析脚本 v2
+
+基于过滤后的 how 解构结果,分析哪些点最有可能是创作者的第一步(创作起点),
+以及第一步的前序节点(第一步是怎么来的)。
+
+v2 新增功能:
+- 分析第一步的前序节点(已有人设 或 外部触发)
+- 提供搜索关键词(用于后续验证)
+- 严格约束:前序节点只能是已匹配的人设节点,搜索关键词只能来自节点名称
+
+输入:intermediate/filtered_results/ 中的过滤结果
+输出:第一步分析结果(带前序节点信息)
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Dict, List
+import sys
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from agents import Agent, Runner, ModelSettings, trace
+from agents.tracing.create import custom_span
+from lib.client import get_model
+from lib.my_trace import set_trace_smith as set_trace
+from script.data_processing.path_config import PathConfig
+
+# 模型配置
+MODEL_NAME = "google/gemini-3-pro-preview"
+# MODEL_NAME = 'anthropic/claude-sonnet-4.5'
+
+agent = Agent(
+    name="First Step Analyzer V2",
+    model=get_model(MODEL_NAME),
+    model_settings=ModelSettings(
+        temperature=0.0,
+        max_tokens=65536,
+    ),
+    tools=[],
+)
+
+
+def extract_points_from_filtered_result(how_result: Dict) -> List[Dict]:
+    """
+    从过滤后的 how 解构结果中提取所有点的信息
+
+    Args:
+        how_result: how解构结果
+
+    Returns:
+        点信息列表,不包含原始分类标签
+    """
+    points = []
+
+    for point_type in ["灵感点", "关键点", "目的点"]:
+        point_list_key = f"{point_type}列表"
+        point_list = how_result.get(point_list_key, [])
+
+        for point in point_list:
+            point_name = point.get("名称", "")
+            point_desc = point.get("描述", "")
+
+            # 检查是否有匹配到人设特征
+            has_match = False
+            matched_feature = None
+            similarity = None
+
+            # 遍历 how 步骤列表中的特征
+            for step in point.get("how步骤列表", []):
+                for feature in step.get("特征列表", []):
+                    match_results = feature.get("匹配结果", [])
+                    if match_results:  # 如果有匹配结果(top1)
+                        has_match = True
+                        match = match_results[0]
+                        matched_feature = match.get("人设特征名称", "")
+                        similarity = match.get("匹配结果", {}).get("相似度", 0)
+                        break
+                if has_match:
+                    break
+
+            point_info = {
+                "名称": point_name,
+                "描述": point_desc,
+                "是否匹配到已有人设": has_match
+            }
+
+            if has_match:
+                point_info["匹配的人设特征"] = matched_feature
+                point_info["相似度"] = similarity
+
+            points.append(point_info)
+
+    return points
+
+
+def build_prompt(points: List[Dict]) -> str:
+    """
+    构建分析 prompt (v2版本:增加前序节点分析)
+
+    Args:
+        points: 点信息列表
+
+    Returns:
+        prompt 文本
+    """
+    # 构建点的描述文本
+    points_text = []
+    matched_points_names = []  # 收集已匹配的点名称
+
+    for i, point in enumerate(points, 1):
+        text = f"{i}. {point['名称']}\n   {point['描述']}"
+
+        if point['是否匹配到已有人设']:
+            text += f"\n   [已匹配] 匹配到人设特征: {point['匹配的人设特征']} (相似度: {point['相似度']:.2f})"
+            matched_points_names.append(point['名称'])
+        else:
+            text += "\n   [未匹配] 未匹配到已有人设特征"
+
+        points_text.append(text)
+
+    points_section = "\n\n".join(points_text)
+    matched_points_section = "\n".join([f"- {name}" for name in matched_points_names])
+
+    return f'''
+以下是一个内容创作的解构结果。这些点已经被分析和分类,但这个分类是分析维度,不代表真实的创作顺序。
+
+请判断:在这些点中,哪些最有可能是创作者的"第一步"(创作起点),以及第一步的前序节点(第一步是怎么来的)。
+
+## 判断标准
+
+**起点特征**:
+- 最先触发创作、不依赖其他点的节点
+- 可能是外部事件、时事热点、商业需求等
+- 起点可能有多个
+
+**前序节点规则**:
+- 每个第一步只有**一个**直接前序节点
+- 客观分析哪个前序节点最有可能引发第一步,给出前序概率
+- 前序节点只能是以下两种之一:
+  1. **已匹配的人设节点**:从下面的"已匹配的点"列表中选择
+  2. **外部触发**:纯外部事件触发
+- 选择前序概率最高的那个,不要预设倾向
+
+**搜索关键词规则**(重要!):
+- 如果前序是外部触发,必须提供搜索关键词
+- 搜索关键词**只能**从点的名称中提取,不能推导、不能扩展、不能添加任何额外的词
+
+## 已匹配的点(可作为前序节点)
+
+{matched_points_section}
+
+## 待分析的点
+
+{points_section}
+
+## 输出要求
+
+以 JSON 格式输出:
+{{
+  "推理过程": "详细说明判断逻辑...",
+  "第一步候选": [
+    {{
+      "点名称": "...",
+      "第一步概率": 0.95,
+      "推理依据": "...",
+      "来源分析": "外部触发/人设延伸/商业驱动/其他",
+      "前序节点": {{
+        "类型": "已有人设" 或 "外部触发",
+        "人设节点名称": "..." 或 null,  // 如果类型是"已有人设",必须从上面的"已匹配的点"中选择
+        "匹配的人设特征": "..." 或 null,  // 如果有人设节点,填写其匹配的特征
+        "相似度": 0.84 或 null,  // 人设节点与特征的匹配相似度
+        "前序概率": 0.75,  // 前序节点引发第一步的概率(0-1之间)
+        "搜索关键词": ["关键词1", "关键词2"] 或 null,  // 只在"外部触发"时提供,且只能从点名称中提取
+        "推理": "说明为什么这个前序节点可能引发第一步"
+      }}
+    }}
+  ]
+}}
+
+注意:
+1. 只输出最有可能是第一步的点(通常1-3个)
+2. 按第一步概率降序排列
+3. 客观分析前序节点,选择前序概率最高的(无论是已匹配人设还是外部触发)
+4. 搜索关键词只能从点名称中提取,不能推导
+5. 前序节点不需要能完全推导出第一步,只要可能引发创作者关注即可
+'''.strip()
+
+
+async def analyze_post(post_data: Dict) -> Dict:
+    """
+    分析单个帖子
+
+    Args:
+        post_data: 帖子数据(包含 how解构结果)
+
+    Returns:
+        分析结果
+    """
+    post_id = post_data.get("帖子id", "")
+    how_result = post_data.get("how解构结果", {})
+
+    # 提取所有点的信息
+    points = extract_points_from_filtered_result(how_result)
+
+    if not points:
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {"点列表": []},
+            "输出": None,
+            "错误": "没有可分析的点"
+        }
+
+    # 构建 prompt
+    prompt = build_prompt(points)
+
+    # 使用 custom_span 标识单个帖子的分析流程
+    with custom_span(
+        name=f"分析第一步 - 帖子 {post_id}",
+        data={
+            "帖子id": post_id,
+            "点数量": len(points),
+            "模型": MODEL_NAME
+        }
+    ):
+        # 调用 agent
+        result = await Runner.run(agent, input=prompt)
+        output = result.final_output
+
+    # 解析 JSON
+    try:
+        if "```json" in output:
+            json_start = output.find("```json") + 7
+            json_end = output.find("```", json_start)
+            json_str = output[json_start:json_end].strip()
+        elif "{" in output and "}" in output:
+            json_start = output.find("{")
+            json_end = output.rfind("}") + 1
+            json_str = output[json_start:json_end]
+        else:
+            json_str = output
+
+        analysis_result = json.loads(json_str)
+
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {
+                "点列表": points,
+                "prompt": prompt
+            },
+            "输出": analysis_result
+        }
+    except Exception as e:
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {
+                "点列表": points,
+                "prompt": prompt
+            },
+            "输出": None,
+            "错误": str(e),
+            "原始输出": output
+        }
+
+
+async def main(current_time: str = None, log_url: str = None):
+    """主函数
+
+    Args:
+        current_time: 当前时间戳(从外部传入)
+        log_url: 日志链接(从外部传入)
+    """
+    # 使用路径配置
+    config = PathConfig()
+
+    # 确保输出目录存在
+    config.ensure_dirs()
+
+    # 获取路径
+    input_dir = config.intermediate_dir / "filtered_results"
+    output_dir = config.intermediate_dir / "first_step_analysis_v2"
+
+    # 确保输出目录存在
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    print(f"账号: {config.account_name}")
+    print(f"输入目录: {input_dir}")
+    print(f"输出目录: {output_dir}")
+    print(f"使用模型: {MODEL_NAME}")
+    if log_url:
+        print(f"Trace URL: {log_url}")
+    print()
+
+    # 读取所有过滤后的文件
+    input_files = list(input_dir.glob("*_filtered.json"))
+
+    if not input_files:
+        print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件")
+        return
+
+    print(f"找到 {len(input_files)} 个文件待分析\n")
+
+    # 批量分析
+    results = []
+    for i, input_file in enumerate(input_files, 1):
+        print(f"[{i}/{len(input_files)}] 分析文件: {input_file.name}")
+
+        # 读取文件
+        with open(input_file, "r", encoding="utf-8") as f:
+            post_data = json.load(f)
+
+        # 分析
+        result = await analyze_post(post_data)
+        results.append(result)
+
+        # 立即保存单个帖子的结果
+        post_id = result.get("帖子id", "unknown")
+        single_output_file = output_dir / f"{post_id}_first_step.json"
+
+        single_result = {
+            "元数据": {
+                "current_time": current_time,
+                "log_url": log_url,
+                "model": MODEL_NAME
+            },
+            "帖子id": post_id,
+            "分析结果": result
+        }
+
+        with open(single_output_file, "w", encoding="utf-8") as f:
+            json.dump(single_result, f, ensure_ascii=False, indent=2)
+
+        # 显示结果
+        output = result.get("输出", {})
+        if output:
+            first_steps = output.get("第一步候选", [])
+            print(f"  第一步候选:")
+            for step in first_steps:
+                print(f"    - {step.get('点名称', 'N/A')} (概率: {step.get('第一步概率', 0):.2f})")
+            print(f"  ✓ 已保存: {single_output_file.name}")
+        else:
+            print(f"  分析失败: {result.get('错误', 'N/A')}")
+        print()
+
+    print(f"✓ 所有分析完成,结果已保存到: {output_dir}")
+    if log_url:
+        print(f"Trace: {log_url}")
+
+    # 打印汇总
+    print("\n" + "=" * 80)
+    print("分析汇总")
+    print("=" * 80)
+    for result in results:
+        post_id = result["帖子id"]
+        output = result.get("输出", {})
+        if output:
+            first_steps = output.get("第一步候选", [])
+            print(f"\n帖子 {post_id}:")
+            for step in first_steps:
+                print(f"  - {step.get('点名称', 'N/A')} ({step.get('来源分析', 'N/A')}, 概率: {step.get('第一步概率', 0):.2f})")
+        else:
+            print(f"\n帖子 {post_id}: 分析失败")
+
+
+if __name__ == "__main__":
+    # 设置 trace
+    current_time, log_url = set_trace()
+
+    # 使用 trace 上下文包裹整个执行流程
+    with trace("第一步分析"):
+        asyncio.run(main(current_time, log_url))

+ 293 - 0
script/data_processing/filter_how_results.py

@@ -0,0 +1,293 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+How解构结果过滤脚本
+
+从 how 解构结果中过滤出高质量的匹配结果:
+1. 移除 what解构结果 字段
+2. 只保留相似度 >= 0.6 的匹配结果
+3. 保留特征即使其匹配结果为空
+"""
+
+import json
+import argparse
+from pathlib import Path
+from typing import Dict, List
+import sys
+from tqdm import tqdm
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from script.data_processing.path_config import PathConfig
+
+
+def filter_match_results(feature_list: List[Dict], threshold: float = 0.6) -> List[Dict]:
+    """
+    过滤特征列表中的匹配结果
+
+    Args:
+        feature_list: 特征列表
+        threshold: 相似度阈值
+
+    Returns:
+        过滤后的特征列表
+    """
+    filtered_features = []
+
+    for feature in feature_list:
+        filtered_feature = {
+            "特征名称": feature.get("特征名称", ""),
+            "权重": feature.get("权重", 1.0),
+            "匹配结果": []
+        }
+
+        # 过滤匹配结果
+        match_results = feature.get("匹配结果", [])
+        for match in match_results:
+            similarity = match.get("匹配结果", {}).get("相似度", 0)
+            if similarity >= threshold:
+                filtered_feature["匹配结果"].append(match)
+
+        # 按相似度降序排序,只保留 top1
+        if filtered_feature["匹配结果"]:
+            filtered_feature["匹配结果"].sort(
+                key=lambda x: x.get("匹配结果", {}).get("相似度", 0),
+                reverse=True
+            )
+            # 只保留相似度最高的一个
+            filtered_feature["匹配结果"] = [filtered_feature["匹配结果"][0]]
+
+        # 保留特征即使匹配结果为空
+        filtered_features.append(filtered_feature)
+
+    return filtered_features
+
+
+def filter_how_steps(how_steps: List[Dict], threshold: float = 0.6) -> List[Dict]:
+    """
+    过滤 how 步骤列表
+
+    Args:
+        how_steps: how 步骤列表
+        threshold: 相似度阈值
+
+    Returns:
+        过滤后的 how 步骤列表
+    """
+    filtered_steps = []
+
+    for step in how_steps:
+        filtered_step = {
+            "步骤名称": step.get("步骤名称", ""),
+            "特征列表": filter_match_results(step.get("特征列表", []), threshold)
+        }
+        filtered_steps.append(filtered_step)
+
+    return filtered_steps
+
+
+def filter_point_list(point_list: List[Dict], threshold: float = 0.6) -> List[Dict]:
+    """
+    过滤点列表(灵感点/关键点/目的点)
+
+    Args:
+        point_list: 点列表
+        threshold: 相似度阈值
+
+    Returns:
+        过滤后的点列表
+    """
+    filtered_points = []
+
+    for point in point_list:
+        filtered_point = {
+            "名称": point.get("名称", ""),
+            "描述": point.get("描述", ""),
+            "特征列表": point.get("特征列表", []),
+            "how步骤列表": filter_how_steps(point.get("how步骤列表", []), threshold)
+        }
+        filtered_points.append(filtered_point)
+
+    return filtered_points
+
+
+def calculate_statistics(original_point_list: List[Dict], filtered_point_list: List[Dict]) -> Dict:
+    """
+    计算过滤统计信息
+
+    Args:
+        original_point_list: 原始点列表
+        filtered_point_list: 过滤后的点列表
+
+    Returns:
+        统计信息字典
+    """
+    original_count = 0
+    filtered_count = 0
+
+    # 统计原始匹配数量
+    for point in original_point_list:
+        for step in point.get("how步骤列表", []):
+            for feature in step.get("特征列表", []):
+                original_count += len(feature.get("匹配结果", []))
+
+    # 统计过滤后匹配数量
+    for point in filtered_point_list:
+        for step in point.get("how步骤列表", []):
+            for feature in step.get("特征列表", []):
+                filtered_count += len(feature.get("匹配结果", []))
+
+    return {
+        "原始匹配数": original_count,
+        "过滤后匹配数": filtered_count,
+        "保留数量": filtered_count,
+        "移除数量": original_count - filtered_count,
+        "保留比例": f"{filtered_count / original_count * 100:.2f}%" if original_count > 0 else "0%"
+    }
+
+
+def process_single_file(input_file: Path, output_file: Path, threshold: float = 0.6) -> Dict:
+    """
+    处理单个文件
+
+    Args:
+        input_file: 输入文件路径
+        output_file: 输出文件路径
+        threshold: 相似度阈值
+
+    Returns:
+        统计信息
+    """
+    # 读取原始文件
+    with open(input_file, "r", encoding="utf-8") as f:
+        data = json.load(f)
+
+    # 提取基本信息(移除 what解构结果)
+    filtered_data = {
+        "帖子id": data.get("帖子id", ""),
+        "帖子详情": data.get("帖子详情", {})
+    }
+
+    # 处理 how解构结果
+    how_result = data.get("how解构结果", {})
+    filtered_how_result = {}
+
+    stats = {
+        "灵感点": {"原始匹配数": 0, "过滤后匹配数": 0},
+        "关键点": {"原始匹配数": 0, "过滤后匹配数": 0},
+        "目的点": {"原始匹配数": 0, "过滤后匹配数": 0}
+    }
+
+    for point_type in ["灵感点", "关键点", "目的点"]:
+        point_list_key = f"{point_type}列表"
+        original_points = how_result.get(point_list_key, [])
+
+        if original_points:
+            filtered_points = filter_point_list(original_points, threshold)
+            filtered_how_result[point_list_key] = filtered_points
+
+            # 计算统计
+            point_stats = calculate_statistics(original_points, filtered_points)
+            stats[point_type] = point_stats
+
+    filtered_data["how解构结果"] = filtered_how_result
+
+    # 保存过滤后的文件
+    output_file.parent.mkdir(parents=True, exist_ok=True)
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(filtered_data, f, ensure_ascii=False, indent=4)
+
+    # 汇总统计
+    total_stats = {
+        "原始匹配数": sum(s["原始匹配数"] for s in stats.values()),
+        "过滤后匹配数": sum(s["过滤后匹配数"] for s in stats.values()),
+        "详细统计": stats
+    }
+
+    return total_stats
+
+
+def main():
+    """主函数"""
+    parser = argparse.ArgumentParser(description="过滤 how 解构结果,只保留高相似度的匹配")
+    parser.add_argument(
+        "--threshold",
+        type=float,
+        default=0.6,
+        help="相似度阈值(默认 0.6)"
+    )
+
+    args = parser.parse_args()
+
+    # 使用路径配置
+    config = PathConfig()
+
+    # 确保输出目录存在
+    config.ensure_dirs()
+
+    # 获取路径
+    input_dir = config.how_results_dir
+    output_dir = config.intermediate_dir / "filtered_results"
+    threshold = args.threshold
+
+    print(f"账号: {config.account_name}")
+    print(f"输入目录: {input_dir}")
+    print(f"输出目录: {output_dir}")
+    print(f"相似度阈值: {threshold}")
+    print()
+
+    # 确保输出目录存在
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # 获取所有输入文件
+    input_files = list(input_dir.glob("*_how.json"))
+
+    if not input_files:
+        print(f"警告: 在 {input_dir} 中没有找到任何 *_how.json 文件")
+        return
+
+    print(f"找到 {len(input_files)} 个文件待处理\n")
+
+    # 批量处理文件
+    total_original = 0
+    total_filtered = 0
+
+    for input_file in tqdm(input_files, desc="处理文件", unit="文件"):
+        # 生成输出文件名
+        post_id = input_file.stem.replace("_how", "")
+        output_file = output_dir / f"{post_id}_filtered.json"
+
+        # 处理文件
+        stats = process_single_file(input_file, output_file, threshold)
+
+        total_original += stats["原始匹配数"]
+        total_filtered += stats["过滤后匹配数"]
+
+        # 显示单个文件的统计
+        tqdm.write(f"  {input_file.name}:")
+        tqdm.write(f"    原始匹配: {stats['原始匹配数']}")
+        tqdm.write(f"    保留匹配: {stats['过滤后匹配数']}")
+        tqdm.write(f"    移除匹配: {stats['原始匹配数'] - stats['过滤后匹配数']}")
+        if stats['原始匹配数'] > 0:
+            ratio = stats['过滤后匹配数'] / stats['原始匹配数'] * 100
+            tqdm.write(f"    保留比例: {ratio:.2f}%")
+        tqdm.write(f"    → {output_file.name}")
+        tqdm.write("")
+
+    # 输出总体统计
+    print("\n" + "="*60)
+    print("总体统计:")
+    print(f"  处理文件数: {len(input_files)}")
+    print(f"  总原始匹配数: {total_original:,}")
+    print(f"  总保留匹配数: {total_filtered:,}")
+    print(f"  总移除匹配数: {total_original - total_filtered:,}")
+    if total_original > 0:
+        print(f"  总保留比例: {total_filtered / total_original * 100:.2f}%")
+    print("="*60)
+    print(f"\n完成! 输出文件保存在: {output_dir}")
+
+
+if __name__ == "__main__":
+    main()