Talegorithm 2 недели назад
Родитель
Commit
c7a94a9cb8

+ 15 - 7
agent/trace/compaction.py

@@ -272,13 +272,21 @@ COMPRESSION_PROMPT = """请对以上对话历史进行压缩总结。
 REFLECT_PROMPT = """请回顾以上整个执行过程,提取有价值的经验教训。
 
 关注以下方面:
-1. **人工干预**:用户中途的指令是否说明了原来的执行过程哪里有问题
-2. **弯路**:哪些尝试是不必要的,有没有更直接的方法
-3. **好的决策**:哪些判断和选择是正确的,值得记住
-4. **工具使用**:哪些工具用法是高效的,哪些可以改进
-
-请以简洁的规则列表形式输出,每条规则格式为:
-- 当遇到 [条件] 时,应该 [动作](原因:[简短说明])。具体案例:[案例]
+1. 人工干预:用户中途的指令是否说明了原来的执行过程哪里有问题
+2. 弯路:哪些尝试是不必要的,有没有更直接的方法
+3. 好的决策:哪些判断和选择是正确的,值得记住
+4. 工具使用:哪些工具用法是高效的,哪些可以改进
+
+输出格式(严格遵守):
+- 每条经验单独成段,格式固定为:- 当 [条件] 时,应该 [动作](原因:[一句话说明])。具体案例:[案例]
+- 条目之间用一个空行分隔
+- 不输出任何标题、分类、编号、分隔线或其他结构
+- 不使用 markdown 加粗、表格、代码块等格式
+- 每条经验自包含,读者无需上下文即可理解
+- 只提取最有价值的 5-10 条,宁少勿滥
+
+示例(仅供参考格式,不要复制内容):
+- 当用户说"给我示例"时,应该用真实数据而不是编造(原因:编造的示例无法验证质量)。具体案例:training_samples.json 中的示例全是 LLM 自己编造的,用户明确要求"基于我指定的样本"。
 """
 
 

+ 388 - 0
examples/analyze_story/sft/run_pipeline.py

@@ -0,0 +1,388 @@
+#!/usr/bin/env python3
+"""
+Pipeline Runner:批量执行完整分析 + SFT 数据生成流程
+
+功能:
+- 自动按 window_size 切分小说,串行调用 step1_analyze.py
+- 每个窗口的分析完成后自动传给下一窗口(保持人物/线索连贯)
+- 并行调用 step2_build_sft.py 生成三类 SFT 数据
+- 所有窗口完成后合并 JSONL 到 merged/ 目录
+- **支持断点续跑**:已存在的输出文件自动跳过,直接从中断处继续
+
+用法:
+  cd examples/analyze_story/sft
+  python run_pipeline.py --novel ../input/大奉打更人.txt
+
+  # 指定输出目录(默认在 sft/ 下以文件名命名)
+  python run_pipeline.py --novel ../input/大奉打更人.txt --output-dir runs/大奉/
+
+  # 跳过某个任务,调整并发数
+  python run_pipeline.py --novel ../input/大奉打更人.txt --skip-task 3 --concurrency 8
+
+  # 只重新跑 step2(分析已完成的情况下)
+  python run_pipeline.py --novel ../input/大奉打更人.txt --only-step 2
+
+  # 强制重新跑(忽略已有文件)
+  python run_pipeline.py --novel ../input/大奉打更人.txt --force
+
+输出结构:
+  {output_dir}/
+    analysis/
+      w0.json          ← 第一个窗口分析
+      w1.json          ← 第二个窗口分析(如有)
+      ...
+    sft_raw/
+      w0/
+        task1_structure_planning.jsonl
+        task2_scene_continuation.jsonl
+        task3_shuang_injection.jsonl
+        stats.json
+      w1/
+        ...
+    merged/            ← 所有窗口合并后的最终数据
+      task1_structure_planning.jsonl
+      task2_scene_continuation.jsonl
+      task3_shuang_injection.jsonl
+      stats.json       ← 汇总统计
+    pipeline.log       ← 运行日志(追加写入)
+"""
+
+import sys
+import json
+import math
+import argparse
+import subprocess
+import datetime
+from pathlib import Path
+
+SCRIPT_DIR = Path(__file__).parent
+STEP1 = SCRIPT_DIR / "step1_analyze.py"
+STEP2 = SCRIPT_DIR / "step2_build_sft.py"
+
# Names of the three SFT output files that step2 produces per window;
# merge_jsonl concatenates these per-window files into merged/.
SFT_TASKS = [
    "task1_structure_planning.jsonl",
    "task2_scene_continuation.jsonl",
    "task3_shuang_injection.jsonl",
]
+
+# ──────────────────────────────────────────────────────────────
+# 工具
+# ──────────────────────────────────────────────────────────────
+
+
def load_text_size(path: str) -> int:
    """Return the exact character count of a text file.

    Tries common Chinese encodings in order and returns the length of the
    fully decoded text.  (The previous docstring claimed a byte-based
    estimate, but the code has always decoded the whole file and returned
    an exact count.)

    Raises:
        ValueError: if none of the candidate encodings can decode the file.
    """
    for enc in ["utf-8", "gbk", "gb2312", "gb18030"]:
        try:
            return len(Path(path).read_text(encoding=enc))
        except UnicodeDecodeError:
            continue
    raise ValueError(f"无法解码文件: {path}")
+
+
def count_jsonl_lines(path: Path) -> int:
    """Count non-blank lines in a JSONL file; 0 when the file does not exist."""
    if not path.exists():
        return 0
    # Stream line by line so very large JSONL files are never fully loaded.
    with open(path, encoding="utf-8") as f:
        return sum(1 for line in f if line.strip())
+
+
class Logger:
    """Minimal logger that mirrors every message to stdout and a log file."""

    def __init__(self, log_path: Path):
        self.log_path = log_path
        log_path.parent.mkdir(parents=True, exist_ok=True)

    def log(self, msg: str):
        """Print *msg* with a timestamp and append the same line to the file."""
        stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        entry = f"[{stamp}] {msg}"
        print(entry)
        with open(self.log_path, "a", encoding="utf-8") as fh:
            fh.write(entry + "\n")
+
+
def run_cmd(cmd: list[str], logger: Logger) -> bool:
    """Run *cmd* as a subprocess, echoing its output live; True on success."""
    logger.log(f"运行: {' '.join(str(c) for c in cmd)}")
    proc = subprocess.Popen(
        [str(c) for c in cmd],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
    )
    # stderr is merged into stdout, so a single stream drain cannot deadlock.
    for out_line in proc.stdout:
        print(out_line, end="", flush=True)
    proc.wait()
    if proc.returncode == 0:
        return True
    logger.log(f"失败(返回码 {proc.returncode})")
    return False
+
+
+# ──────────────────────────────────────────────────────────────
+# Step 1:逐窗口分析
+# ──────────────────────────────────────────────────────────────
+
+
def run_step1_all(
    novel: str,
    analysis_dir: Path,
    n_windows: int,
    window_size: int,
    model: str,
    force: bool,
    logger: Logger,
    only_step: int | None,
) -> list[Path]:
    """Analyze every window sequentially via step1_analyze.py.

    Each window is handed the previous window's analysis file so character
    and plot-line state stays coherent across windows.  Existing outputs are
    reused unless *force* is set.  Returns the available analysis files.
    """
    if only_step == 2:
        # Step 1 skipped entirely: reuse whatever analyses already exist.
        files = sorted(analysis_dir.glob("w*.json"))
        logger.log(f"[Step1 跳过] 使用已有分析文件 {len(files)} 个")
        return files

    analysis_dir.mkdir(parents=True, exist_ok=True)
    done: list[Path] = []
    prev_analysis: Path | None = None

    for idx in range(n_windows):
        target = analysis_dir / f"w{idx}.json"

        if target.exists() and not force:
            # Resume support: keep the cached analysis and chain from it.
            logger.log(f"[Step1 w{idx}] 已存在,跳过 → {target}")
            done.append(target)
            prev_analysis = target
            continue

        logger.log(f"[Step1 w{idx}/{n_windows-1}] 开始分析")
        cmd = [
            sys.executable, STEP1,
            "--novel", novel,
            "--window-index", str(idx),
            "--window-size", str(window_size),
            "--output", str(target),
            "--model", model,
        ]
        if prev_analysis:
            cmd += ["--prev-analysis", str(prev_analysis)]

        if not run_cmd(cmd, logger):
            # A failed window would break continuity, so stop the chain here.
            logger.log(f"[Step1 w{idx}] 失败,跳过后续窗口")
            break

        done.append(target)
        prev_analysis = target

    return done
+
+
+# ──────────────────────────────────────────────────────────────
+# Step 2:为每个分析文件生成 SFT 数据
+# ──────────────────────────────────────────────────────────────
+
+
def run_step2_all(
    novel: str,
    analysis_files: list[Path],
    sft_raw_dir: Path,
    context_chars: int,
    concurrency: int,
    skip_tasks: list[int],
    model: str,
    force: bool,
    logger: Logger,
    only_step: int | None,
) -> list[Path]:
    """Generate SFT data via step2_build_sft.py for every analysis file.

    A window whose stats.json already exists is treated as done (resume
    support) unless *force* is set.  A failing window does not stop the
    others.  Returns the per-window SFT output directories that succeeded.
    """
    if only_step == 1:
        logger.log("[Step2 跳过] --only-step 1")
        return []

    done_dirs: list[Path] = []

    for analysis_path in analysis_files:
        window_name = analysis_path.stem          # e.g. "w0"
        sft_dir = sft_raw_dir / window_name

        # stats.json is written last by step2, so it doubles as a done-flag.
        if (sft_dir / "stats.json").exists() and not force:
            logger.log(f"[Step2 {window_name}] 已存在,跳过 → {sft_dir}")
            done_dirs.append(sft_dir)
            continue

        logger.log(f"[Step2 {window_name}] 开始生成 SFT 数据")
        cmd = [
            sys.executable, STEP2,
            "--analysis", str(analysis_path),
            "--novel", novel,
            "--output-dir", str(sft_dir),
            "--context-chars", str(context_chars),
            "--concurrency", str(concurrency),
            "--model", model,
        ]
        for task_no in skip_tasks:
            cmd += ["--skip-task", str(task_no)]

        if run_cmd(cmd, logger):
            done_dirs.append(sft_dir)
        else:
            logger.log(f"[Step2 {window_name}] 失败,继续处理其他窗口")

    return done_dirs
+
+
+# ──────────────────────────────────────────────────────────────
+# 合并
+# ──────────────────────────────────────────────────────────────
+
+
def merge_jsonl(sft_dirs: list[Path], merged_dir: Path, logger: Logger):
    """Concatenate every window's JSONL files into merged/ and write stats.

    Blank lines are dropped.  Source files are streamed line by line so the
    merge stays memory-bounded even for very large per-window files (the
    previous version read each file fully into memory).
    """
    if not sft_dirs:
        logger.log("[Merge] 无 SFT 数据可合并")
        return

    merged_dir.mkdir(parents=True, exist_ok=True)
    total_stats: dict[str, int] = {}

    for task_file in SFT_TASKS:
        out_path = merged_dir / task_file
        count = 0
        with open(out_path, "w", encoding="utf-8") as out_f:
            for sft_dir in sft_dirs:
                src = sft_dir / task_file
                if not src.exists():
                    continue
                with open(src, encoding="utf-8") as in_f:
                    for raw in in_f:
                        row = raw.rstrip("\n")
                        if row.strip():
                            out_f.write(row + "\n")
                            count += 1
        total_stats[task_file] = count
        logger.log(f"[Merge] {task_file}: {count} 条")

    # Aggregate statistics for downstream bookkeeping.
    stats_path = merged_dir / "stats.json"
    total = sum(total_stats.values())
    stats = {
        "total_samples": total,
        "by_task": total_stats,
        "windows": len(sft_dirs),
        "merged_at": datetime.datetime.now().isoformat(),
    }
    stats_path.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")
    logger.log(f"[Merge] 完成,总计 {total} 条样本 → {merged_dir}")

    # Human-readable summary table.
    print(f"\n{'='*50}")
    print("合并结果汇总")
    print(f"{'='*50}")
    for task_file, count in total_stats.items():
        name = task_file.replace(".jsonl", "")
        print(f"  {name:<40} {count:>6} 条")
    print(f"  {'总计':<40} {total:>6} 条")
    print(f"{'='*50}\n")
+
+
+# ──────────────────────────────────────────────────────────────
+# 主入口
+# ──────────────────────────────────────────────────────────────
+
+
def main():
    """CLI entry point: parse args, run step1/step2 per window, merge results."""
    parser = argparse.ArgumentParser(
        description="Pipeline Runner:批量分析小说并生成 SFT 训练数据",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--novel", required=True, help="小说 txt 文件路径")
    parser.add_argument(
        "--output-dir", default=None,
        help="输出根目录(默认:sft/ 目录下以文件名命名,如 runs/大奉打更人/)",
    )
    parser.add_argument(
        "--window-size", type=int, default=500_000,
        help="每个分析窗口的字符数(默认 500000)",
    )
    parser.add_argument("--model", default="qwen-plus", help="使用的模型名称")
    parser.add_argument(
        "--context-chars", type=int, default=800,
        help="Step2 中提取上文的字符数(默认 800)",
    )
    parser.add_argument(
        "--concurrency", type=int, default=5,
        help="Step2 并发 LLM 调用数(默认 5)",
    )
    parser.add_argument(
        "--skip-task", type=int, action="append", default=[],
        metavar="N", help="跳过 Step2 的某个任务(1/2/3),可多次指定",
    )
    parser.add_argument(
        "--only-step", type=int, choices=[1, 2], default=None,
        help="只执行某个步骤(1=只分析,2=只生成SFT,需要analysis已存在)",
    )
    parser.add_argument(
        "--force", action="store_true",
        help="强制重新运行,忽略已有输出文件",
    )
    args = parser.parse_args()

    novel_path = Path(args.novel).resolve()
    if not novel_path.exists():
        print(f"错误:文件不存在 {novel_path}")
        sys.exit(1)

    # Resolve the output root (default: sft/runs/<novel stem>/).
    if args.output_dir:
        output_dir = Path(args.output_dir).resolve()
    else:
        output_dir = SCRIPT_DIR / "runs" / novel_path.stem

    analysis_dir = output_dir / "analysis"
    sft_raw_dir = output_dir / "sft_raw"
    merged_dir = output_dir / "merged"
    log_path = output_dir / "pipeline.log"

    output_dir.mkdir(parents=True, exist_ok=True)
    logger = Logger(log_path)

    # Window count from total character length (the last window may be short).
    total_chars = load_text_size(str(novel_path))
    n_windows = math.ceil(total_chars / args.window_size)

    logger.log(f"{'='*60}")
    logger.log(f"小说:{novel_path.name}  ({total_chars:,} 字符)")
    logger.log(f"窗口:{n_windows} 个(每窗口 {args.window_size:,} 字符)")
    logger.log(f"输出目录:{output_dir}")
    logger.log(f"模型:{args.model}  并发:{args.concurrency}")
    logger.log(f"跳过任务:{args.skip_task or '无'}  只执行步骤:{args.only_step or '全部'}")
    logger.log(f"强制重跑:{'是' if args.force else '否(已有文件将跳过)'}")
    logger.log(f"{'='*60}")

    # Step 1: sequential per-window analysis (serial to preserve continuity).
    analysis_files = run_step1_all(
        str(novel_path), analysis_dir, n_windows,
        args.window_size, args.model, args.force, logger, args.only_step,
    )

    if not analysis_files:
        logger.log("没有可用的分析文件,退出。")
        sys.exit(1)

    # Step 2: SFT data generation, one invocation per analysis file.
    sft_dirs = run_step2_all(
        str(novel_path), analysis_files, sft_raw_dir,
        args.context_chars, args.concurrency, args.skip_task,
        args.model, args.force, logger, args.only_step,
    )

    # Merge (skipped when only step 1 was requested).
    if args.only_step != 1:
        merge_jsonl(sft_dirs, merged_dir, logger)

    logger.log("Pipeline 完成。")
    print(f"\n日志文件:{log_path}")
    print(f"最终数据:{merged_dir}")


if __name__ == "__main__":
    main()

+ 387 - 0
examples/analyze_story/sft/step1_analyze.py

@@ -0,0 +1,387 @@
+#!/usr/bin/env python3
+"""
+步骤1:500K 窗口故事分析
+
+定位方案:让 LLM 复述每个 beat 开头的原文片段作为锚点,
+         用 str.find() + 渐进缩短前缀 定位精确字符位置,
+         不依赖章节标题格式(适用于任意命名风格的小说)。
+
+用法:通常通过 run_pipeline.py 自动调用,也可单独手动运行:
+  python step1_analyze.py --novel ../input/大奉打更人.txt --output analysis/w0.json
+  python step1_analyze.py --novel ../input/大奉打更人.txt \\
+    --window-index 1 --prev-analysis analysis/w0.json --output analysis/w1.json
+
+环境变量(.env):
+  ALI_API_KEY   阿里云 DashScope API Key
+  ALI_BASE_URL  (可选)默认 https://dashscope.aliyuncs.com/compatible-mode/v1
+"""
+
+import os
+import re
+import json
+import asyncio
+import argparse
+from pathlib import Path
+from openai import AsyncOpenAI
+from dotenv import load_dotenv
+
load_dotenv()
# Also try the parent directory's .env so the script works from subfolders.
load_dotenv(Path(__file__).parent.parent / ".env")

# Shared async OpenAI-compatible client pointed at DashScope.
client = AsyncOpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url=os.getenv(
        "ALI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    ),
)

# Default analysis window size, in characters.
WINDOW_SIZE = 500_000
+
+# ──────────────────────────────────────────────────────────────
+# 文本加载
+# ──────────────────────────────────────────────────────────────
+
+
def load_text(path: str) -> str:
    """Read a text file, trying common Chinese encodings in order."""
    candidates = ("utf-8", "gbk", "gb2312", "gb18030")
    for encoding in candidates:
        try:
            return Path(path).read_text(encoding=encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError(f"无法解码文件: {path}")
+
+
+# ──────────────────────────────────────────────────────────────
+# 锚点定位
+# ──────────────────────────────────────────────────────────────
+
+
def find_anchor(window_text: str, anchor: str, search_from: int = 0) -> int:
    """Locate *anchor* in window_text[search_from:]; return a window-local index.

    The full prefix (capped at 25 chars) is tried first; on a miss the prefix
    is shortened one character at a time down to 8 chars, which tolerates
    small copying errors in the LLM-provided anchor.  Anchors shorter than
    8 characters now get a single exact-match attempt (previously they could
    never match because the shortening range was empty).  Returns -1 when
    nothing matches.
    """
    if not anchor:
        return -1
    longest = min(len(anchor), 25)
    if longest < 8:
        # Too short for progressive shortening; try the anchor verbatim once.
        pos = window_text.find(anchor, search_from)
        return pos if pos >= 0 else -1
    for length in range(longest, 7, -1):
        pos = window_text.find(anchor[:length], search_from)
        if pos >= 0:
            return pos
    return -1


def resolve_positions(beats: list[dict], window_text: str, window_offset: int, window_end: int) -> None:
    """Resolve each beat's start_anchor to absolute character positions.

    Writes position_start / position_end into every beat in place.  Anchors
    are searched in order, each search starting just past the previous hit,
    so an early duplicate phrase cannot capture a later beat.  Beats whose
    anchor cannot be found get a position interpolated midway between their
    nearest resolved neighbours (or the window bounds).
    """
    search_from = 0
    unresolved = []

    for beat in beats:
        pos = find_anchor(window_text, beat.get("start_anchor", ""), search_from)
        if pos >= 0:
            beat["position_start"] = window_offset + pos
            beat["_anchor_resolved"] = True
            search_from = pos + 1
        else:
            beat["position_start"] = -1  # placeholder until interpolation below
            beat["_anchor_resolved"] = False
            unresolved.append(beat["id"])

    if unresolved:
        print(f"  警告:{len(unresolved)} 个 beat 锚点未找到,将按比例估算位置:{unresolved}")
        total_beats = len(beats)
        for i, beat in enumerate(beats):
            if not beat["_anchor_resolved"]:
                # Interpolate between the nearest resolved neighbours.
                prev_pos = next(
                    (beats[j]["position_start"] for j in range(i - 1, -1, -1) if beats[j]["_anchor_resolved"]),
                    window_offset,
                )
                next_pos = next(
                    (beats[j]["position_start"] for j in range(i + 1, total_beats) if beats[j]["_anchor_resolved"]),
                    window_end,
                )
                beat["position_start"] = (prev_pos + next_pos) // 2

    # A beat ends where the next one starts; the last beat ends at window_end.
    for i, beat in enumerate(beats):
        if i + 1 < len(beats):
            beat["position_end"] = beats[i + 1]["position_start"]
        else:
            beat["position_end"] = window_end

    # Drop the internal bookkeeping flag before the analysis is serialized.
    for beat in beats:
        beat.pop("_anchor_resolved", None)
+
+
+# ──────────────────────────────────────────────────────────────
+# LLM 调用
+# ──────────────────────────────────────────────────────────────
+
+
async def llm_call(messages: list, model: str, temperature: float = 0.3) -> str:
    """Send one chat-completion request and return the raw response text."""
    resp = await client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=8192,
    )
    return resp.choices[0].message.content
+
+
def extract_json(text: str) -> dict:
    """Parse a JSON object from LLM output, tolerating fences and trailing commas.

    Accepts a bare JSON body or one wrapped in ``` fences; the ``json``
    language tag is now optional (models sometimes emit a plain fence).
    On a first parse failure, trailing commas before } or ] are stripped
    and parsing is retried once.
    """
    m = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    json_str = m.group(1) if m else text.strip()
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        # Common LLM artifact: trailing commas; remove them and retry.
        json_str = re.sub(r",\s*([}\]])", r"\1", json_str)
        return json.loads(json_str)
+
+
+# ──────────────────────────────────────────────────────────────
+# 提示词
+# ──────────────────────────────────────────────────────────────
+
# System prompt establishing the analyst persona; demands strict-JSON output.
SYSTEM_ANALYST = (
    "你是资深故事分析专家,精通 Scene-Sequel 结构、MICE 线程理论(Milieu/Idea/Character/Event)"
    "以及中国网文的爽点与钩子设计。请严格按指定 JSON 格式输出,不要有多余文字。"
)
+
+
def build_prompt(window_text: str, prev_meta: dict | None, novel_title: str) -> str:
    """Build the analysis prompt for one window.

    When *prev_meta* (the previous window's analysis dict) is given, its
    characters, plot lines, and main-plot summary are prepended so the LLM
    keeps continuity across windows.
    """
    prev_section = ""
    if prev_meta:
        prev_section = f"""## 前序窗口元信息(保持连贯性)

### 已知人物
{json.dumps(prev_meta.get("characters", []), ensure_ascii=False, indent=2)}

### 已知剧情线索(MICE 线程)
{json.dumps(prev_meta.get("outline", {}).get("plot_lines", []), ensure_ascii=False, indent=2)}

### 前序主线摘要
{prev_meta.get("outline", {}).get("main_plot", "无")}

---
"""

    return f"""{prev_section}## 分析任务

书名:{novel_title}

### 1. 故事大纲
- **main_plot**:本窗口主线剧情摘要(200-300 字)
- **plot_lines**:活跃/新增剧情线索,每条包含:
  - name、mice_type(M/I/C/E)、status(进行中/已解决/待推进)、description(30-60字)

### 2. 人物小传
主要人物(新出现 + 已有人物状态更新):name、role、goal、traits(3-5个)、relationships

### 3. 节拍切分(Scene-Sequel 交替)

**start_anchor 说明**(非常重要):
- 从原文中逐字复制该节拍开头的 20-25 个字符,包含标点
- 必须和原文完全一致,一字不差
- 选择该节拍真正开始的位置,而非章节标题
- 避免选择可能重复出现的通用短语

**节拍要素**:
- Scene:goal / conflict_type(人物冲突|环境冲突|内心冲突|信息冲突)/ conflict_description / disaster
- Sequel:reaction / dilemma / decision
- 每个 beat 必须有:mice_thread(推进的 MICE 线程名称)、shuang_point、state_changes

**shuang_point 格式**:
```
"shuang_point": {{
  "has_shuang": true/false,
  "type": "打脸|升级|装逼|获得|碾压|无",
  "intensity": "low|medium|high|none",
  "description": "具体内容(如有)"
}}
```

## 输出格式(严格 JSON)

```json
{{
  "outline": {{
    "main_plot": "...",
    "plot_lines": [
      {{"name": "税银案", "mice_type": "E", "status": "进行中", "description": "..."}}
    ]
  }},
  "characters": [
    {{"name": "...", "role": "主角", "goal": "...", "traits": ["机智"], "relationships": {{"角色A": "关系"}}}}
  ],
  "beats": [
    {{
      "id": "beat_001",
      "type": "scene",
      "start_anchor": "从原文逐字复制的开头20字",
      "mice_thread": "税银案",
      "summary": "...",
      "goal": "...",
      "conflict_type": "人物冲突",
      "conflict_description": "...",
      "disaster": "...",
      "shuang_point": {{
        "has_shuang": true,
        "type": "智商碾压",
        "intensity": "high",
        "description": "..."
      }},
      "state_changes": {{
        "plot_lines": [{{"name": "税银案", "old_state": "调查中", "new_state": "发现破绽"}}],
        "characters": [{{"name": "许七安", "change": "从囚犯转变为关键证人"}}]
      }}
    }},
    {{
      "id": "beat_002",
      "type": "sequel",
      "start_anchor": "从原文逐字复制的开头20字",
      "mice_thread": "身份成长",
      "summary": "...",
      "reaction": "...",
      "dilemma": "...",
      "decision": "...",
      "shuang_point": {{"has_shuang": false, "type": "无", "intensity": "none", "description": ""}},
      "state_changes": {{
        "characters": [{{"name": "许七安", "change": "心态从被动转为主动"}}]
      }}
    }}
  ]
}}
```

## 待分析文本

{window_text}
"""
+
+
+# ──────────────────────────────────────────────────────────────
+# 主流程
+# ──────────────────────────────────────────────────────────────
+
+
async def analyze_window(
    novel_path: str,
    window_index: int,
    prev_analysis_path: str | None,
    output_path: str,
    model: str,
    window_size: int = WINDOW_SIZE,
):
    """Analyze one fixed-size window of the novel and write the result JSON.

    Slices window *window_index* out of the novel, optionally loads the
    previous window's analysis for continuity, calls the LLM, resolves each
    beat's anchor to absolute character positions, and writes the annotated
    analysis (plus a ``_meta`` section) to *output_path*.  On a JSON parse
    failure the raw LLM response is saved next to the output as
    ``*.error.txt`` and the exception is re-raised.
    """
    print(f"\n{'='*60}")
    print(f"窗口 {window_index} 分析")
    print(f"{'='*60}")

    text = load_text(novel_path)
    total = len(text)
    print(f"全文:{total:,} 字符")

    start = window_index * window_size
    end = min(start + window_size, total)

    if start >= total:
        print("起始位置超过文件长度,退出。")
        return

    window = text[start:end]
    print(f"窗口范围:{start:,} - {end:,}({end - start:,} 字符)")

    prev_meta = None
    if prev_analysis_path and Path(prev_analysis_path).exists():
        with open(prev_analysis_path, encoding="utf-8") as f:
            prev_meta = json.load(f)
        print(f"加载前序分析:{prev_analysis_path}")

    novel_title = Path(novel_path).stem
    prompt = build_prompt(window, prev_meta, novel_title)
    messages = [
        {"role": "system", "content": SYSTEM_ANALYST},
        {"role": "user", "content": prompt},
    ]

    print(f"调用 LLM({model})...")
    raw = await llm_call(messages, model=model)

    print("解析 JSON...")
    try:
        analysis = extract_json(raw)
    except json.JSONDecodeError as e:
        # Keep the raw response for post-mortem before propagating.
        err_path = Path(output_path).with_suffix(".error.txt")
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        err_path.write_text(raw, encoding="utf-8")
        print(f"JSON 解析失败:{e}\n原始响应已保存到 {err_path}")
        raise

    # Resolve anchors to absolute character positions within the novel.
    beats = analysis.get("beats", [])
    resolve_positions(beats, window, window_offset=start, window_end=end)

    analysis["_meta"] = {
        "novel_title": novel_title,
        "window_index": window_index,
        "window_start": start,
        "window_end": end,
        "total_chars": total,
        "window_size": window_size,
        "beats_count": len(beats),
        "model": model,
    }

    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(analysis, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"\n分析完成 → {output_path}")
    print(f"  节拍数:{len(beats)}  人物:{len(analysis.get('characters', []))}  线索:{len(analysis.get('outline', {}).get('plot_lines', []))}")
    print()
    print(f"  {'id':<12} {'type':<8} {'position':<24} {'mice':<14} {'sp'} summary")
    print(f"  {'-'*75}")
    for b in beats:
        pos = f"{b['position_start']:,}-{b['position_end']:,}"
        sp = "⭐" if b.get("shuang_point", {}).get("has_shuang") else "  "
        anchor_ok = "✓" if b.get("start_anchor") else "✗"
        print(
            f"  {b['id']:<12} {b['type']:<8} {pos:<24} "
            f"{b.get('mice_thread','?'):<14} {sp} [{anchor_ok}] {b['summary'][:28]}..."
        )
+
+
def main():
    """CLI wrapper around analyze_window (normally invoked by run_pipeline.py)."""
    parser = argparse.ArgumentParser(description="步骤1:500K 窗口故事分析")
    parser.add_argument("--novel", required=True, help="小说 txt 文件路径")
    parser.add_argument("--window-index", type=int, default=0, help="窗口序号(0-based)")
    parser.add_argument("--window-size", type=int, default=WINDOW_SIZE, help="窗口大小(字符数)")
    parser.add_argument("--prev-analysis", default=None, help="前一窗口的分析 JSON")
    parser.add_argument("--output", required=True, help="输出 JSON 文件路径")
    parser.add_argument("--model", default="qwen-plus", help="模型名称")
    args = parser.parse_args()

    asyncio.run(
        analyze_window(
            args.novel,
            args.window_index,
            args.prev_analysis,
            args.output,
            args.model,
            args.window_size,
        )
    )


if __name__ == "__main__":
    main()

+ 784 - 0
examples/analyze_story/sft/step2_build_sft.py

@@ -0,0 +1,784 @@
+#!/usr/bin/env python3
+"""
+步骤2:从分析结果生成三类 SFT 训练数据
+
+三个任务(参考 00_task_definition.md):
+
+  Task 1 - 结构规划(Structure Planning)
+    输入:故事状态(MICE线程、last disaster/decision、位置)+ 上文
+    输出:<think>叙事状态分析 + 续写决策</think> + 结构规划 JSON
+    目标:让模型学会规划下一个 Scene-Sequel 单元的结构
+
+  Task 2 - 场景续写(Scene Continuation)
+    输入:上文 + 结构规划(Task 1 的输出)
+    输出:<think>上文理解 + 写法决策</think> + 续写正文
+    目标:让模型学会根据规划生成高质量正文
+
+  Task 3 - 爽点注入(Shuang Point Injection)
+    输入:平淡草稿 + 爽点类型 + 强度要求
+    输出:<think>草稿分析 + 爽点设计</think> + 增强版正文 + 修改说明
+    目标:让模型学会识别并注入爽点
+
+用法:
+  python step2_build_sft.py \\
+    --analysis analysis_w0.json \\
+    --novel input/大奉打更人.txt \\
+    --output-dir sft/dafeng/ \\
+    [--context-chars 800] \\
+    [--skip-task 3] \\
+    [--concurrency 5] \\
+    [--model qwen-plus]
+
+输出文件:
+  sft/dafeng/task1_structure_planning.jsonl
+  sft/dafeng/task2_scene_continuation.jsonl
+  sft/dafeng/task3_shuang_injection.jsonl
+  sft/dafeng/stats.json
+"""
+
+import os
+import re
+import json
+import asyncio
+import argparse
+from copy import deepcopy
+from pathlib import Path
+from openai import AsyncOpenAI
+from dotenv import load_dotenv
+
load_dotenv()

# Shared async OpenAI-compatible client pointed at DashScope.
client = AsyncOpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url=os.getenv(
        "ALI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    ),
)
+
+# ──────────────────────────────────────────────────────────────
+# 基础工具
+# ──────────────────────────────────────────────────────────────
+
+
def load_text(path: str) -> str:
    """Read a text file, trying common Chinese encodings in order."""
    candidates = ("utf-8", "gbk", "gb2312", "gb18030")
    for encoding in candidates:
        try:
            return Path(path).read_text(encoding=encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError(f"无法解码文件: {path}")
+
+
async def llm_call(
    messages: list,
    model: str,
    temperature: float = 0.6,
    max_tokens: int = 4096,
) -> str:
    """Send one chat-completion request and return the raw response text."""
    resp = await client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return resp.choices[0].message.content
+
+
def extract_json_block(text: str) -> dict:
    """Parse a JSON object from LLM output, tolerating fences and trailing commas.

    Accepts a bare JSON body or one wrapped in ``` fences; the ``json``
    language tag is now optional, matching step1's extract_json.  On a first
    parse failure, trailing commas before } or ] are stripped and parsing
    is retried once.
    """
    m = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    json_str = m.group(1) if m else text.strip()
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        # Common LLM artifact: trailing commas; remove them and retry.
        json_str = re.sub(r",\s*([}\]])", r"\1", json_str)
        return json.loads(json_str)
+
+
def write_jsonl(samples: list[dict], path: Path) -> None:
    """Write samples as JSONL, dropping internal keys that start with '_'."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as fh:
        for sample in samples:
            # Internal bookkeeping fields never reach the training data.
            public = {k: v for k, v in sample.items() if not k.startswith("_")}
            fh.write(json.dumps(public, ensure_ascii=False) + "\n")
    print(f"  写入 {len(samples)} 条 → {path}")
+
+
+# ──────────────────────────────────────────────────────────────
+# 故事状态累积
+# ──────────────────────────────────────────────────────────────
+
+
def apply_state_changes(state: dict, changes: dict) -> dict:
    """Return a new state snapshot with one beat's state_changes applied.

    The input snapshot is never mutated.  Unknown plot lines are appended
    as new threads; character changes for unknown names are ignored.
    """
    snapshot = deepcopy(state)

    for change in changes.get("plot_lines", []):
        existing = next(
            (pl for pl in snapshot["plot_lines"] if pl["name"] == change["name"]),
            None,
        )
        if existing is not None:
            existing["status"] = change["new_state"]
        else:
            snapshot["plot_lines"].append({
                "name": change["name"],
                "status": change["new_state"],
                "mice_type": "?",
                "description": change.get("new_state", ""),
            })

    for change in changes.get("characters", []):
        for char in snapshot["characters"]:
            if char["name"] == change["name"]:
                history = char.setdefault("recent_changes", [])
                history.append(change["change"])
                # Keep only the three most recent changes per character.
                char["recent_changes"] = history[-3:]
                break

    return snapshot
+
+
def build_state_snapshot(analysis: dict, beat_index: int) -> dict:
    """Replay state_changes of all beats before *beat_index* onto the initial state."""
    snapshot = {
        "plot_lines": deepcopy(analysis.get("outline", {}).get("plot_lines", [])),
        "characters": deepcopy(analysis.get("characters", [])),
    }
    for beat in analysis.get("beats", [])[:beat_index]:
        snapshot = apply_state_changes(snapshot, beat.get("state_changes", {}))
    return snapshot
+
+
def get_last_disaster_decision(beats: list[dict], before_index: int) -> tuple[str, str]:
    """Return the most recent scene disaster and sequel decision before *before_index*."""
    disaster, decision = "无(故事开局)", "无(故事开局)"
    for beat in beats[:before_index]:
        kind = beat["type"]
        if kind == "scene":
            disaster = beat.get("disaster", "")
        elif kind == "sequel":
            decision = beat.get("decision", "")
    return disaster, decision
+
+
def format_mice_threads(plot_lines: list[dict]) -> str:
    """Render active (unresolved) plot lines as an indented bullet list."""
    closed_states = ("已解决", "已关闭")
    active = [pl for pl in plot_lines if pl.get("status") not in closed_states]
    if not active:
        return "(无活跃线程)"
    return "\n".join(
        f"  [{pl.get('mice_type', '?')}] {pl['name']}({pl['status']}):{pl.get('description', '')}"
        for pl in active
    )
+
+
def format_characters(characters: list[dict]) -> str:
    """Render character snapshots, one indented line each."""
    rendered = []
    for ch in characters:
        changes = "、".join(ch.get("recent_changes", []))
        suffix = f"近期:{changes}" if changes else ""
        rendered.append(
            f"  {ch['name']}({ch.get('role', '?')})目标:{ch.get('goal', '')}  {suffix}"
        )
    return "\n".join(rendered)
+
+
def calc_position_percent(beat: dict, total_chars: int) -> float:
    """Beat start position as a percentage of the whole novel, 1 decimal place."""
    denominator = max(total_chars, 1)  # guard against a zero-length novel
    return round(beat.get("position_start", 0) / denominator * 100, 1)
+
+
+# ──────────────────────────────────────────────────────────────
+# Task 1:结构规划(Structure Planning)
+# ──────────────────────────────────────────────────────────────
+
+TASK1_SYSTEM = (
+    "你是一位专业的长篇小说结构规划师,精通 Scene-Sequel 结构、MICE 线程理论、"
+    "以及中国网文爽点与钩子设计。请严格按指定格式输出。"
+)
+
+TASK1_USER_TMPL = """\
+## 故事状态
+
+- 书名:{title}
+- 当前位置:第 {chapter} 章,约 {position_pct}% 处
+- 已激活的 MICE 线程:
+{mice_threads}
+- 上一个 Scene 的 Disaster:{last_disaster}
+- 上一个 Sequel 的 Decision:{last_decision}
+
+## 当前人物状态
+
+{characters}
+
+## 上文(最近 {context_chars} 字)
+
+{context_text}
+
+## 任务
+
+请规划下一个 Scene-Sequel 单元的结构。"""
+
+TASK1_COT_GEN_TMPL = """\
+## 故事状态
+
+- 书名:{title}
+- 当前位置:第 {chapter} 章,约 {position_pct}% 处
+- 已激活的 MICE 线程:
+{mice_threads}
+- 上一个 Scene 的 Disaster:{last_disaster}
+- 上一个 Sequel 的 Decision:{last_decision}
+
+## 当前人物状态
+
+{characters}
+
+## 上文(最近 {context_chars} 字)
+
+{context_text}
+
+## 参考信息(该节拍的实际内容摘要,仅用于帮你构建 CoT,禁止直接引用)
+
+类型:{beat_type}
+摘要:{beat_summary}
+核心要素:{beat_core}
+爽点:{shuang_info}
+
+---
+
+请以"事前规划"的视角写出你的思考过程和最终规划。
+
+**输出格式**:
+
+<think>
+## 叙事状态分析
+[分析当前处于哪个 MICE 线程、节拍、读者情绪积累]
+[分析上一个 Disaster/Decision 对下一步的约束]
+
+## 续写决策
+[决定下一个 Scene 的 Goal、Conflict 类型、Disaster 方向]
+[决定是否需要爽点/钩子,类型和强度]
+[决定节奏:快/慢,对话比例]
+</think>
+
+```json
+{{
+  "scene": {{
+    "goal": "...",
+    "conflict_type": "人物冲突|环境冲突|内心冲突|信息冲突",
+    "conflict_description": "...",
+    "disaster": "...",
+    "pacing": "fast|medium|slow",
+    "dialogue_ratio": 0.4
+  }},
+  "sequel": {{
+    "reaction": "...",
+    "dilemma": "...",
+    "decision": "..."
+  }},
+  "hooks": [
+    {{"type": "chapter_end|mid_chapter", "content": "..."}}
+  ],
+  "shuang_point": {{
+    "has_shuang": true,
+    "type": "打脸|升级|装逼|获得|碾压",
+    "position": "scene_start|scene_mid|scene_end"
+  }},
+  "mice_advancement": "M|I|C|E",
+  "estimated_words": 2000
+}}
+```"""
+
+
+def _beat_core_str(beat: dict) -> str:
+    if beat["type"] == "scene":
+        return (
+            f"goal={beat.get('goal', '')}  "
+            f"conflict={beat.get('conflict_description', '')}  "
+            f"disaster={beat.get('disaster', '')}"
+        )
+    return (
+        f"reaction={beat.get('reaction', '')}  "
+        f"dilemma={beat.get('dilemma', '')}  "
+        f"decision={beat.get('decision', '')}"
+    )
+
+
+def _shuang_str(beat: dict) -> str:
+    sp = beat.get("shuang_point", {})
+    if not sp.get("has_shuang"):
+        return "无"
+    return f"{sp.get('type', '')}({sp.get('intensity', '')}):{sp.get('description', '')}"
+
+
async def gen_task1_sample(
    i: int,
    beat: dict,
    analysis: dict,
    novel_text: str,
    context_chars: int,
    model: str,
    sem: asyncio.Semaphore,
) -> dict | None:
    """Build one Task-1 (structure planning) SFT sample for beat *i*.

    The generating LLM sees the story state plus a ground-truth hint and
    writes a planning CoT + JSON; the stored sample pairs that answer with
    a hint-free user prompt. Returns None when the LLM call fails.
    """
    async with sem:
        meta = analysis.get("_meta", {})
        all_beats = analysis.get("beats", [])
        total_chars = meta.get("total_chars", len(novel_text))

        # Snapshot of plot lines / character state as of this beat.
        snapshot = build_state_snapshot(analysis, i)
        prev_disaster, prev_decision = get_last_disaster_decision(all_beats, i)

        start = beat["position_start"]
        preceding_text = novel_text[max(0, start - context_chars): start].strip()

        # Fields shared by the generation prompt and the training prompt.
        prompt_fields = dict(
            title=meta.get("novel_title", "未知"),
            chapter=beat.get("chapter_start", "?"),
            position_pct=calc_position_percent(beat, total_chars),
            mice_threads=format_mice_threads(snapshot["plot_lines"]),
            last_disaster=prev_disaster,
            last_decision=prev_decision,
            characters=format_characters(snapshot["characters"]),
            context_chars=context_chars,
            context_text=preceding_text,
        )

        # Generation prompt: state + ground-truth hint for CoT construction.
        cot_user = TASK1_COT_GEN_TMPL.format(
            beat_type=beat["type"],
            beat_summary=beat.get("summary", ""),
            beat_core=_beat_core_str(beat),
            shuang_info=_shuang_str(beat),
            **prompt_fields,
        )
        try:
            assistant_content = await llm_call(
                [
                    {"role": "system", "content": TASK1_SYSTEM},
                    {"role": "user", "content": cot_user},
                ],
                model=model,
            )
        except Exception as e:
            print(f"  [Task1] beat {i+1} LLM 调用失败:{e}")
            return None

        # Training sample: the user turn carries only state + context and
        # never reveals the actual beat content.
        return {
            "messages": [
                {"role": "system", "content": TASK1_SYSTEM},
                {"role": "user", "content": TASK1_USER_TMPL.format(**prompt_fields)},
                {"role": "assistant", "content": assistant_content},
            ],
            "metadata": {
                "task_type": "structure_planning",
                "source_file": meta.get("novel_title", ""),
                "chapter": f"第{prompt_fields['chapter']}章",
                "position_percent": prompt_fields["position_pct"],
                "mice_thread": beat.get("mice_thread", ""),
                "beat_id": beat.get("id", ""),
                "beat_type": beat["type"],
                "word_count": beat["position_end"] - beat["position_start"],
            },
        }
+
+
# ──────────────────────────────────────────────────────────────
# Task 2: Scene Continuation (场景续写)
# ──────────────────────────────────────────────────────────────

# System prompt for Task 2: frames the model as a professional web-novel
# author who continues prose from a structure plan.
TASK2_SYSTEM = (
    "你是一位专业的网文作家,擅长写爽文、悬疑和情感类长篇小说,"
    "能够根据结构规划生成节奏流畅、爽点鲜明的正文。"
)

# User-turn template for the stored training sample: preceding context plus
# the Task-1 structure plan; no access to the real continuation.
TASK2_USER_TMPL = """\
## 上文

{context_text}

## 结构规划

{structure_plan}

## 任务

请续写下一段(约 {target_words} 字),风格与上文保持一致。"""

# Generation-time template: additionally exposes a short excerpt of the
# real continuation as a hint, and echoes the full actual text so the LLM
# only has to prepend a <think> block.
TASK2_COT_GEN_TMPL = """\
## 上文

{context_text}

## 结构规划

{structure_plan}

## 参考信息(该节拍的实际续写内容,仅用于帮你构建 CoT,禁止逐句引用)

{beat_text_hint}

---

请以"事前决策"的视角写出写作思考过程,然后直接输出实际续写内容。

**输出格式**:

<think>
## 上文理解
[识别上文的叙事状态:最后一个 Scene/Sequel 的位置,主角的情绪状态]
[识别关键信息:哪些细节需要在续写中呼应]

## 写法决策
[开头如何衔接:直接延续/场景切换/时间跳跃]
[爽点如何植入:在哪个位置,用什么方式]
[钩子如何设置:章末悬念的具体内容]
[对话设计:谁说什么,潜台词是什么]
</think>

{actual_text}"""
+
+
async def gen_task2_sample(
    i: int,
    beat: dict,
    analysis: dict,
    novel_text: str,
    task1_samples: list[dict | None],
    context_chars: int,
    model: str,
    sem: asyncio.Semaphore,
) -> dict | None:
    """Build one Task-2 (scene continuation) SFT sample for beat *i*.

    The user prompt pairs the preceding context with the structure plan
    taken from the matching Task-1 sample; the assistant turn is a
    generated writing CoT followed by the beat's actual text. Returns
    None when the beat text is empty or the LLM call fails.
    """
    async with sem:
        meta = analysis.get("_meta", {})
        total_chars = meta.get("total_chars", len(novel_text))

        # Up to `context_chars` characters immediately before the beat.
        ctx_start = max(0, beat["position_start"] - context_chars)
        context_text = novel_text[ctx_start: beat["position_start"]].strip()

        beat_text = novel_text[beat["position_start"]: beat["position_end"]].strip()
        if not beat_text:
            return None

        # Pull the structure plan (the assistant message) out of the
        # positionally-matching Task-1 sample.
        structure_plan = ""
        if i < len(task1_samples) and task1_samples[i]:
            for msg in task1_samples[i]["messages"]:
                if msg["role"] == "assistant":
                    structure_plan = msg["content"]
                    break
        if not structure_plan:
            # Task 1 failed or was skipped — fall back to the beat summary.
            structure_plan = f"(Task1 未生成,beat 摘要:{beat.get('summary', '')})"

        # Target length: half the beat's character span, floored at 500.
        target_words = max(500, (beat["position_end"] - beat["position_start"]) // 2)

        # Only the first 300 chars go to the LLM as a hint, to limit leakage.
        beat_hint = beat_text[:300] + "..." if len(beat_text) > 300 else beat_text

        cot_prompt = TASK2_COT_GEN_TMPL.format(
            context_text=context_text,
            structure_plan=structure_plan,
            beat_text_hint=beat_hint,
            actual_text=beat_text,
        )
        messages = [
            {"role": "system", "content": TASK2_SYSTEM},
            {"role": "user", "content": cot_prompt},
        ]
        try:
            cot_part = await llm_call(messages, model=model)
        except Exception as e:
            print(f"  [Task2] beat {i+1} LLM 调用失败:{e}")
            return None

        # Normalize the assistant output to: <think>...</think>\n\n{real text}.
        if "<think>" in cot_part and beat_text not in cot_part:
            # The LLM produced only the CoT — append the actual beat text.
            think_end = cot_part.find("</think>")
            if think_end != -1:
                think_block = cot_part[: think_end + len("</think>")]
                assistant_content = f"{think_block}\n\n{beat_text}"
            else:
                # No closing tag: keep the raw CoT and still append the text.
                assistant_content = f"{cot_part}\n\n{beat_text}"
        else:
            assistant_content = cot_part

        user_content = TASK2_USER_TMPL.format(
            context_text=context_text,
            structure_plan=structure_plan,
            target_words=target_words,
        )

        return {
            "messages": [
                {"role": "system", "content": TASK2_SYSTEM},
                {"role": "user", "content": user_content},
                {"role": "assistant", "content": assistant_content},
            ],
            "metadata": {
                "task_type": "scene_continuation",
                "source_file": meta.get("novel_title", ""),
                "chapter": f"第{beat.get('chapter_start', '?')}章",
                "position_percent": calc_position_percent(beat, total_chars),
                "mice_thread": beat.get("mice_thread", ""),
                "beat_id": beat.get("id", ""),
                "beat_type": beat["type"],
                "word_count": len(beat_text),
            },
        }
+
+
# ──────────────────────────────────────────────────────────────
# Task 3: Shuang Point Injection (爽点注入)
# ──────────────────────────────────────────────────────────────

# System prompt for Task 3: frames the model as a web-novel editor who
# designs shuang points (face-slap / power-up / showing-off / gain / crush).
TASK3_SYSTEM = (
    "你是一位专业的网文编辑,擅长识别和设计爽点(打脸、升级、装逼、获得、碾压),"
    "能在不改变核心情节的前提下大幅提升情感冲击力。"
)

# Generation-time template: given the real (already-enhanced) text, asks the
# LLM to produce a de-shuang'd "flat draft", an editor CoT, and modification
# notes, as strict JSON (parsed by extract_json_block downstream).
TASK3_GEN_TMPL = """\
## 原文(包含爽点的增强版)

{beat_text}

---

## 任务

1. 判断这段文字是否包含明显爽点(打脸/升级/装逼/获得/碾压)
2. 如果有,生成去掉爽点后的"平淡草稿"(保留核心情节事件,但去掉爽感设计)
3. 以编辑视角,写出重新注入爽点的完整思考过程(CoT)和修改说明

**输出格式(严格 JSON)**:

```json
{{
  "has_shuang": true,
  "shuang_type": "打脸|升级|装逼|获得|碾压",
  "intensity": "low|medium|high",
  "flat_draft": "去掉爽点后的平淡版本(完整文字)",
  "cot": "<think>\\n## 草稿分析\\n[识别草稿问题]\\n\\n## 爽点设计\\n[注入方案]\\n</think>",
  "modification_notes": "注入位置:...\\n爽点类型:...\\n关键改动:..."
}}
```

如果不包含明显爽点,输出:`{{"has_shuang": false}}`"""

# User-turn template for the stored training sample: flat draft in,
# enhanced version (the real text) expected out.
TASK3_USER_TMPL = """\
## 平淡草稿

{flat_draft}

## 要求

- 爽点类型:{shuang_type}
- 强度:{intensity}(low=轻微强化 | medium=明显提升 | high=大幅改写)
- 不改变核心情节,只增强情感冲击力

## 任务

请注入爽点,输出增强版本。"""
+
+
async def gen_task3_sample(
    i: int,
    beat: dict,
    analysis: dict,
    novel_text: str,
    model: str,
    sem: asyncio.Semaphore,
) -> dict | None:
    """Build one Task-3 (shuang-point injection) SFT sample for beat *i*.

    Only beats flagged with a shuang point are processed. The LLM strips
    the shuang point out of the real text ("flat draft") and explains,
    editor-style, how to re-inject it; the sample then maps flat draft ->
    CoT + enhanced original + notes. Returns None when the beat is
    unsuitable or any generation/parsing step fails.
    """
    # Skip beats without an annotated shuang point (before taking the semaphore).
    shuang = beat.get("shuang_point", {})
    if not shuang.get("has_shuang"):
        return None

    async with sem:
        meta = analysis.get("_meta", {})
        total_chars = meta.get("total_chars", len(novel_text))

        beat_text = novel_text[beat["position_start"]: beat["position_end"]].strip()
        if len(beat_text) < 200:
            # Too short to de-shuang meaningfully.
            return None

        # Ask the LLM for the flat draft + editor CoT (strict JSON).
        try:
            raw = await llm_call(
                [
                    {"role": "system", "content": TASK3_SYSTEM},
                    {"role": "user", "content": TASK3_GEN_TMPL.format(beat_text=beat_text)},
                ],
                model=model,
            )
        except Exception as e:
            print(f"  [Task3] beat {i+1} LLM 调用失败:{e}")
            return None

        try:
            parsed = extract_json_block(raw)
        except Exception:
            print(f"  [Task3] beat {i+1} JSON 解析失败,跳过")
            return None

        if not parsed.get("has_shuang"):
            return None

        flat_draft = parsed.get("flat_draft", "")
        cot = parsed.get("cot", "")
        notes = parsed.get("modification_notes", "")
        if not (flat_draft and cot):
            return None

        # Fall back to the analysis annotations when the LLM omits a field.
        shuang_type = parsed.get("shuang_type", shuang.get("type", ""))
        intensity = parsed.get("intensity", shuang.get("intensity", "medium"))

        user_content = TASK3_USER_TMPL.format(
            flat_draft=flat_draft,
            shuang_type=shuang_type,
            intensity=intensity,
        )

        # Assistant turn: CoT + enhanced version (the original text) + notes.
        assistant_content = (
            f"{cot}\n\n"
            f"{beat_text}\n\n"
            f"---\n**修改说明**:\n{notes}"
        )

        return {
            "messages": [
                {"role": "system", "content": TASK3_SYSTEM},
                {"role": "user", "content": user_content},
                {"role": "assistant", "content": assistant_content},
            ],
            "metadata": {
                "task_type": "shuang_injection",
                "source_file": meta.get("novel_title", ""),
                "chapter": f"第{beat.get('chapter_start', '?')}章",
                "position_percent": calc_position_percent(beat, total_chars),
                "shuang_type": shuang_type,
                "intensity": intensity,
                "beat_id": beat.get("id", ""),
                "word_count": len(beat_text),
            },
        }
+
+
# ──────────────────────────────────────────────────────────────
# Main pipeline
# ──────────────────────────────────────────────────────────────


async def build_all(
    analysis_path: str,
    novel_path: str,
    output_dir: str,
    context_chars: int,
    skip_tasks: set[int],
    model: str,
    concurrency: int,
):
    """Generate all three SFT datasets from one analysis JSON.

    Runs Task 1 (structure planning), Task 2 (scene continuation, which
    reuses Task 1 outputs positionally) and Task 3 (shuang-point
    injection) over every beat, writing one JSONL per task plus a
    stats.json into *output_dir*. Tasks in *skip_tasks* are omitted; LLM
    calls are throttled by a shared semaphore of size *concurrency*.
    """
    with open(analysis_path, encoding="utf-8") as f:
        analysis = json.load(f)

    novel_text = load_text(novel_path)
    beats = analysis.get("beats", [])
    out = Path(output_dir)
    sem = asyncio.Semaphore(concurrency)

    print(f"\n分析文件:{analysis_path}")
    print(f"节拍数:{len(beats)}")
    print(f"输出目录:{out}")
    print(f"并发数:{concurrency}\n")

    stats: dict = {}

    # ── Task 1 ──────────────────────────────────
    # Keep per-beat results (None on failure) because Task 2 indexes them.
    task1_samples: list[dict | None] = [None] * len(beats)
    if 1 not in skip_tasks:
        print("[Task 1] 结构规划(Structure Planning)...")
        task1_samples = list(
            await asyncio.gather(
                *(
                    gen_task1_sample(i, b, analysis, novel_text, context_chars, model, sem)
                    for i, b in enumerate(beats)
                )
            )
        )
        valid = [s for s in task1_samples if s]
        write_jsonl(valid, out / "task1_structure_planning.jsonl")
        stats["task1"] = {"total": len(beats), "valid": len(valid)}
        print(f"  Task1 完成:{len(valid)}/{len(beats)} 条有效\n")

    # ── Task 2 ──────────────────────────────────
    if 2 not in skip_tasks:
        print("[Task 2] 场景续写(Scene Continuation)...")
        task2_results = await asyncio.gather(
            *(
                gen_task2_sample(
                    i, b, analysis, novel_text, task1_samples, context_chars, model, sem
                )
                for i, b in enumerate(beats)
            )
        )
        valid = [s for s in task2_results if s]
        write_jsonl(valid, out / "task2_scene_continuation.jsonl")
        stats["task2"] = {"total": len(beats), "valid": len(valid)}
        print(f"  Task2 完成:{len(valid)}/{len(beats)} 条有效\n")

    # ── Task 3 ──────────────────────────────────
    # gen_task3_sample itself skips beats without a shuang point; the
    # shuang_beats count is only used for reporting.
    if 3 not in skip_tasks:
        shuang_beats = [b for b in beats if b.get("shuang_point", {}).get("has_shuang")]
        print(f"[Task 3] 爽点注入(Shuang Point Injection)... (共 {len(shuang_beats)} 个有爽点的 beat)")
        task3_results = await asyncio.gather(
            *(
                gen_task3_sample(i, b, analysis, novel_text, model, sem)
                for i, b in enumerate(beats)
            )
        )
        valid = [s for s in task3_results if s]
        write_jsonl(valid, out / "task3_shuang_injection.jsonl")
        stats["task3"] = {
            "total": len(shuang_beats),
            "valid": len(valid),
        }
        print(f"  Task3 完成:{len(valid)}/{len(shuang_beats)} 条有效\n")

    # ── Summary ──────────────────────────────────
    stats_path = out / "stats.json"
    stats_path.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"统计信息 → {stats_path}")

    total_valid = sum(v.get("valid", 0) for v in stats.values())
    print(f"\n全部完成。总有效样本数:{total_valid}")
+
+
def main():
    """CLI entry point: parse arguments and run the full SFT build pipeline.

    Fix: `--skip-task` previously accepted any integer and silently ignored
    unknown values; `choices=(1, 2, 3)` now makes argparse reject them with
    a clear error instead.
    """
    parser = argparse.ArgumentParser(description="步骤2:生成三类 SFT 训练数据")
    parser.add_argument("--analysis", required=True, help="step1 输出的 analysis JSON")
    parser.add_argument("--novel", required=True, help="小说 txt 文件路径")
    parser.add_argument("--output-dir", required=True, help="输出目录")
    parser.add_argument(
        "--context-chars", type=int, default=800,
        help="Task1/2 的上文字符数(默认 800)",
    )
    parser.add_argument(
        "--skip-task", type=int, action="append", default=[],
        choices=(1, 2, 3),  # only tasks 1-3 exist; fail fast on anything else
        metavar="N", help="跳过某个任务(1/2/3),可多次指定",
    )
    parser.add_argument(
        "--concurrency", type=int, default=5,
        help="并发 LLM 调用数(默认 5)",
    )
    parser.add_argument("--model", default="qwen-plus", help="使用的模型名称")
    args = parser.parse_args()

    # De-duplicate repeated --skip-task values via the set conversion.
    asyncio.run(
        build_all(
            args.analysis,
            args.novel,
            args.output_dir,
            args.context_chars,
            set(args.skip_task),
            args.model,
            args.concurrency,
        )
    )
+
+
+if __name__ == "__main__":
+    main()