| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048 |
- #!/usr/bin/env python3
- """
- 步骤2:从分析结果生成三类 SFT 训练数据
- 三个任务(参考 00_task_definition.md):
- Task 1 - 结构规划(Structure Planning)
- 输入:故事状态(MICE线程、last disaster/decision、位置)+ 上文
- 输出:<think>叙事状态分析 + 续写决策</think> + 结构规划 JSON
- 目标:让模型学会规划下一个 Scene-Sequel 单元的结构
- Task 2 - 场景续写(Scene Continuation)
- 输入:上文 + 结构规划(Task 1 的输出)
- 输出:<think>上文理解 + 写法决策</think> + 续写正文
- 目标:让模型学会根据规划生成高质量正文
- Task 3 - 爽点注入(Shuang Point Injection)
- 输入:平淡草稿 + 爽点类型 + 强度要求
- 输出:<think>草稿分析 + 爽点设计</think> + 增强版正文 + 修改说明
- 目标:让模型学会识别并注入爽点
- 用法:
- python step2_build_sft.py \\
- --analysis analysis_w0.json \\
- --novel input/大奉打更人.txt \\
- --output-dir sft/dafeng/ \\
- [--context-chars 800] \\
- [--skip-task 3] \\
- [--concurrency 5] \\
- [--model qwen-plus]
- 输出文件:
- sft/dafeng/task1_structure_planning.jsonl
- sft/dafeng/task2_scene_continuation.jsonl
- sft/dafeng/task3_shuang_injection.jsonl
- sft/dafeng/stats.json
- """
- import os
- import re
- import json
- import asyncio
- import argparse
- from copy import deepcopy
- from pathlib import Path
- from openai import AsyncOpenAI, BadRequestError, RateLimitError, APIError
- from typing import Optional, List, Set
- from dotenv import load_dotenv
# Load environment variables: first with python-dotenv's default lookup,
# then explicitly from the .env file two directories above this script.
load_dotenv()
load_dotenv(Path(__file__).parent.parent / ".env")  # project-root .env
# Module-wide async client shared by every LLM call in this script. The API
# key comes from ALI_API_KEY; the endpoint defaults to DashScope's
# OpenAI-compatible URL unless ALI_BASE_URL overrides it.
client = AsyncOpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url=os.getenv(
        "ALI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    ),
)
- # ──────────────────────────────────────────────────────────────
- # 基础工具
- # ──────────────────────────────────────────────────────────────
class ContentFilterError(Exception):
    """Raised when a request is rejected by content moderation.

    The affected sample is skipped; the request is not retried.
    """
def load_text(path: str) -> str:
    """Read a text file, trying several encodings in turn.

    Encodings are attempted in the order utf-8, gbk, gb2312, gb18030; the
    first one that decodes without error wins.

    Args:
        path: Location of the file to read.

    Returns:
        The decoded file content.

    Raises:
        ValueError: If none of the candidate encodings can decode the file.
    """
    source = Path(path)
    candidates = ("utf-8", "gbk", "gb2312", "gb18030")
    for encoding in candidates:
        try:
            content = source.read_text(encoding=encoding)
        except UnicodeDecodeError:
            continue
        else:
            return content
    raise ValueError(f"无法解码文件: {path}")
async def llm_call(
    messages: list,
    model: str,
    temperature: float = 0.6,
    max_tokens: int = 4096,
    max_retries: int = 3,
) -> str:
    """Send one chat-completion request, retrying transient API failures.

    Args:
        messages: Chat messages passed straight to the completions endpoint.
        model: Model name.
        temperature: Sampling temperature.
        max_tokens: Completion token limit.
        max_retries: Number of retries after the first attempt. Values below
            zero are treated as zero so that at least one attempt is made.

    Returns:
        The text of the first choice. An empty string is returned when the
        API reports no content, so callers always receive a ``str``.

    Raises:
        ContentFilterError: The request was rejected by content moderation
            (never retried).
        BadRequestError: Any other 400-class request error (never retried).
        RateLimitError, APIError: Re-raised once all retries are used up.
    """
    max_retries = max(max_retries, 0)
    delay = 5.0
    for attempt in range(1, max_retries + 2):  # +1 for the final attempt
        try:
            resp = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
        except BadRequestError as e:
            err_code = getattr(e, "code", "") or ""
            # Alibaba Cloud moderation errors: data_inspection_failed /
            # content_filter and similar codes.
            if "data_inspection_failed" in str(e) or "content_filter" in err_code:
                raise ContentFilterError(f"内容审查不通过: {e}") from e
            raise  # any other 400 error is propagated unchanged
        except (RateLimitError, APIError) as e:
            if attempt > max_retries:
                raise
            print(f" [重试 {attempt}/{max_retries}] {type(e).__name__}: {e},{delay:.0f}s 后重试...")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)  # exponential back-off, capped at 60s
        else:
            # message.content may be None; normalise so that downstream
            # string operations never see a non-str value.
            return resp.choices[0].message.content or ""
    # Not reachable: the last loop iteration either returns or raises.
    raise RuntimeError("llm_call finished without a response")
def extract_json_block(text: str) -> dict:
    """Parse the JSON payload embedded in an LLM response.

    Candidates are tried in this order:

    1. the body of the first json-tagged Markdown fence, or the whole
       stripped text when no such fence exists;
    2. the span from the first ``{`` to the last ``}``, which covers
       responses that wrap the JSON in prose or in an untagged fence.

    Each candidate is parsed as-is and, if that fails, once more after
    removing trailing commas before ``}`` or ``]``.

    Args:
        text: Raw model output.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If no candidate can be parsed.
    """
    fenced = re.search(r"```json\s*(.*?)\s*```", text, re.DOTALL)
    candidates = [fenced.group(1) if fenced else text.strip()]

    first_brace, last_brace = text.find("{"), text.rfind("}")
    if 0 <= first_brace < last_brace:
        braced = text[first_brace:last_brace + 1]
        if braced not in candidates:
            candidates.append(braced)

    last_error = None
    for candidate in candidates:
        repaired = re.sub(r",\s*([}\]])", r"\1", candidate)
        for attempt in (candidate, repaired):
            try:
                return json.loads(attempt)
            except json.JSONDecodeError as err:
                last_error = err
    # candidates is never empty, so last_error is always set here.
    raise last_error
def write_jsonl(samples: List[dict], path: Path) -> None:
    """Write samples to *path* as JSON Lines, one object per line.

    Keys starting with an underscore are treated as internal and left out
    of the written records. Missing parent directories are created and an
    existing file is overwritten.

    Args:
        samples: Records to serialise.
        path: Destination file.
    """

    def public_fields(record: dict) -> dict:
        # Drop internal "_*" fields before serialising.
        return {key: value for key, value in record.items() if not key.startswith("_")}

    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(public_fields(record), ensure_ascii=False) + "\n"
            for record in samples
        )
    print(f" 写入 {len(samples)} 条 → {path}")
- # ──────────────────────────────────────────────────────────────
- # 故事状态累积
- # ──────────────────────────────────────────────────────────────
def build_state_snapshot(analysis: dict, beat_index: int) -> dict:
    """Return the story state as it stands before beat ``beat_index``.

    Starting from deep copies of the outline's plot lines and the character
    list, the ``state_changes`` of every earlier beat are replayed in order.

    Args:
        analysis: Analysis dict; reads ``outline.plot_lines``,
            ``characters`` and ``beats``.
        beat_index: Beats ``[0, beat_index)`` are applied.

    Returns:
        A dict with four keys:
        - ``plot_lines``: plot lines with updated ``status``; lines first
          seen in a state change are appended with ``mice_type`` "?".
        - ``characters``: characters, each with at most the last three
          ``recent_changes``.
        - ``plot_line_events``: plot-line name -> list of
          "old → new" transition strings.
        - ``recent_beats``: brief records of the last five applied beats.
    """
    plot_lines = deepcopy(analysis.get("outline", {}).get("plot_lines", []))
    characters = deepcopy(analysis.get("characters", []))
    events_by_line = {}
    recent_beats = []

    for beat in analysis.get("beats", [])[:beat_index]:
        changes = beat.get("state_changes", {})

        # Plot lines: update the status in place (or register a new line)
        # and record the transition in the per-line history.
        for update in changes.get("plot_lines", []):
            existing = next(
                (entry for entry in plot_lines if entry["name"] == update["name"]),
                None,
            )
            if existing is None:
                plot_lines.append({
                    "name": update["name"],
                    "status": update["new_state"],
                    "mice_type": "?",
                    "description": update.get("new_state", ""),
                })
            else:
                existing["status"] = update["new_state"]
            transition = f"{update.get('old_state', '?')} → {update['new_state']}"
            events_by_line.setdefault(update["name"], []).append(transition)

        # Characters: only characters already known are updated; the
        # history is trimmed to the three most recent entries.
        for delta in changes.get("characters", []):
            for person in characters:
                if person["name"] != delta["name"]:
                    continue
                history = person.setdefault("recent_changes", [])
                history.append(delta["change"])
                person["recent_changes"] = history[-3:]
                break

        # Rolling window of the five most recent beats.
        is_scene = beat["type"] == "scene"
        recent_beats.append({
            "id": beat.get("id", ""),
            "type": beat["type"],
            "summary": beat.get("summary", ""),
            "outcome": beat.get("disaster", "") if is_scene else beat.get("decision", ""),
        })
        recent_beats = recent_beats[-5:]

    return {
        "plot_lines": plot_lines,
        "characters": characters,
        "plot_line_events": events_by_line,
        "recent_beats": recent_beats,
    }
def get_last_disaster_decision(beats: List[dict], before_index: int) -> tuple:
    """Return the latest scene disaster and sequel decision before an index.

    Args:
        beats: All beats in story order.
        before_index: Only beats ``[0, before_index)`` are inspected.

    Returns:
        ``(last_disaster, last_decision)``. Each element keeps a fixed
        "story opening" placeholder until a beat of the matching type has
        been seen; a matching beat that lacks the field yields "".
    """
    field_for_type = {"scene": "disaster", "sequel": "decision"}
    latest = {"scene": "无(故事开局)", "sequel": "无(故事开局)"}
    for beat in beats[:before_index]:
        kind = beat["type"]
        if kind in field_for_type:
            latest[kind] = beat.get(field_for_type[kind], "")
    return latest["scene"], latest["sequel"]
def _notes_plot_lines(state: dict) -> Optional[str]:
    """Render the active plot lines, followed by a one-line list of closed ones."""
    closed_statuses = ("已解决", "已关闭")
    active = [pl for pl in state["plot_lines"]
              if pl.get("status") not in closed_statuses]
    resolved = [pl for pl in state["plot_lines"]
                if pl.get("status") in closed_statuses]
    if not active:
        return None

    events_by_line = state.get("plot_line_events", {})
    lines = ["**活跃线索**:"]
    for pl in active:
        mice = pl.get("mice_type", "?")
        events = events_by_line.get(pl["name"], [])
        # Only the three most recent transitions are shown.
        ev_str = f"(进展:{';'.join(events[-3:])})" if events else ""
        cq = pl.get("core_question", "")
        ns = pl.get("next_steps", "")
        extra = ""
        if cq:
            extra += f" 核心问:{cq}"
        if ns:
            extra += f" 待推进:{ns}"
        # A line without a status counts as active above, so it must not
        # raise KeyError here; fall back to "?" like mice_type does.
        lines.append(
            f"- [{mice}] {pl['name']}({pl.get('status', '?')}):"
            f"{pl.get('description', '')}{ev_str}{extra}"
        )
    if resolved:
        lines.append(f"- 已结:{'、'.join(p['name'] for p in resolved)}")
    return "\n".join(lines)


def _notes_characters(state: dict) -> Optional[str]:
    """Render one bullet per character with goal, traits, style and recent changes."""
    if not state["characters"]:
        return None
    lines = ["**人物**:"]
    for c in state["characters"]:
        segs = [f"{c['name']}({c.get('role', '?')})目标:{c.get('goal', '')}"]
        traits = c.get("traits", [])
        if traits:
            segs.append(f"性格:{'、'.join(traits)}")
        style = c.get("speaking_style", [])
        if style:
            # speaking_style may be a list of phrases or a single value.
            style_str = ",".join(style) if isinstance(style, list) else str(style)
            segs.append(f"说话风格:{style_str}")
        cur_state = c.get("current_state", "")
        if cur_state:
            segs.append(f"当前处境:{cur_state}")
        rels = c.get("relationships", {})
        if rels:
            # At most four relationships are listed per character.
            rel_items = [f"{k}→{v}" for k, v in list(rels.items())[:4]]
            segs.append(f"关系:{';'.join(rel_items)}")
        recent = c.get("recent_changes", [])
        if recent:
            segs.append(f"近期:{';'.join(recent)}")
        lines.append("- " + "。".join(segs))
    return "\n".join(lines)


def _notes_recent_beats(state: dict) -> Optional[str]:
    """Render the recent-beat records with their disaster or decision outcome."""
    recent_beats = state.get("recent_beats", [])
    if not recent_beats:
        return None
    lines = ["**近期节拍**:"]
    for b in recent_beats:
        is_scene = b["type"] == "scene"
        tag = "场景" if is_scene else "后续"
        outcome_label = "结局" if is_scene else "决定"
        outcome = f" → {outcome_label}:{b['outcome']}" if b.get("outcome") else ""
        lines.append(f"- [{b['id']}·{tag}] {b['summary']}{outcome}")
    return "\n".join(lines)


def _notes_writing_insights(analysis: dict) -> Optional[str]:
    """Render the analysis-level writing_insights (techniques, shuang designs, pacing)."""
    wi = analysis.get("writing_insights", {})
    if not wi:
        return None
    wi_lines = [f"- 技巧:{item}" for item in wi.get("techniques", [])]
    wi_lines += [f"- 爽点设计:{item}" for item in wi.get("shuang_designs", [])]
    wi_lines += [f"- 节奏:{item}" for item in wi.get("pacing", [])]
    if not wi_lines:
        return None
    return "**写作亮点**:\n" + "\n".join(wi_lines)


def format_story_notes(
    analysis: dict,
    state: dict,
    last_disaster: str,
    last_decision: str,
) -> str:
    """Build the Markdown "story notes" text embedded in the prompts.

    Sections, in order and separated by blank lines (empty sections are
    omitted): main plot, active plot lines, characters, recent beats,
    writing insights, and the always-present unresolved-items line.

    Args:
        analysis: Analysis dict; reads ``outline.main_plot`` and
            ``writing_insights``.
        state: Snapshot with ``plot_lines`` and ``characters`` and,
            optionally, ``plot_line_events`` and ``recent_beats``.
        last_disaster: Outcome of the most recent scene.
        last_decision: Decision of the most recent sequel.

    Returns:
        The assembled notes.
    """
    parts = []

    main_plot = analysis.get("outline", {}).get("main_plot", "")
    if main_plot:
        parts.append(f"**主线**:{main_plot}")

    sections = (
        _notes_plot_lines(state),
        _notes_characters(state),
        _notes_recent_beats(state),
        _notes_writing_insights(analysis),
    )
    parts.extend(section for section in sections if section is not None)

    parts.append(
        f"**待解决**:上一场景结局:{last_disaster};上一个决定:{last_decision}"
    )
    return "\n\n".join(parts)
def calc_position_percent(beat: dict, total_chars: int) -> float:
    """Return the beat's start offset as a percentage of the text length.

    Args:
        beat: Beat dict; ``position_start`` defaults to 0 when absent.
        total_chars: Total text length; values below 1 are treated as 1
            so the division is always defined.

    Returns:
        The percentage rounded to one decimal place.
    """
    start_offset = beat.get("position_start", 0)
    denominator = max(total_chars, 1)
    percent = start_offset / denominator * 100
    return round(percent, 1)
- # ──────────────────────────────────────────────────────────────
- # Task 1:结构规划(Structure Planning)
- # ──────────────────────────────────────────────────────────────
# System prompt for Task 1 (structure planning). It is sent both when
# generating the reference answer and as the system message of the emitted
# training sample. It asks for a <think> block, a scene-plan JSON object and
# a Markdown "notes update".
TASK1_SYSTEM = """\
你是资深网文作者,擅长基于故事笔记规划场景。
## 核心能力
1. **分析笔记**:理解当前故事状态、活跃线索、人物动态
2. **规划场景**:基于笔记设计下一个场景的结构
3. **更新笔记**:记录场景对故事状态的改变
## 工作流程
1. 仔细阅读故事笔记(当前状态、活跃线索、待办事项)
2. 在 `<think>` 中展示你的思考过程(800-1500字)
3. 输出场景规划(JSON 格式)
4. 输出笔记更新(Markdown 格式)
---
## Think 要求
在 `<think>` 标签中,展示你真实的创作思维过程。**不要求固定格式**,但需要包含以下核心要素:
必须包含的要素:
1. **笔记分析**:当前故事进行到哪里?哪些线索在推进?主要角色的目标、冲突、关系状态;笔记中标记的待推进事项和风险点
2. **方案推演**:至少考虑 2-3 种不同的场景设计方案;对比各方案的优缺点;说明为什么选择某个方案
3. **笔记更新计划**:这个场景会推进哪些线索?哪些人物状态会变化?需要新增或完成哪些待推进事项?
鼓励的思维方式:
- **跳跃联想**:从笔记的某个细节突然想到类似案例
- **自我质疑**:推翻之前的想法,重新思考
- **细节推敲**:对某个对话、动作、道具的反复打磨
- **灵感闪现**:突然意识到某个巧妙的设计
- **风险预警**:发现可能的逻辑漏洞或人设崩塌
不要求固定章节标题(如【笔记分析】【方案推演】),不需要按固定顺序展开,可以有口语化、跳跃、修正。
---
## 输出格式
### 1. 场景规划(JSON)
```json
{
"scene_type": "scene | sequel",
"goal": "角色目标",
"conflict_type": "冲突类型",
"conflict_description": "...",
"disaster": "场景结尾的灾难/转折(scene 类型必填)",
"sequel": {"reaction": "...", "dilemma": "...", "decision": "..."},
"pacing": "fast|medium|slow",
"dialogue_ratio": 0.4,
"shuang_point": {
"has_shuang": true,
"type": "打脸|升级|装逼|获得|碾压",
"mechanism": "实现机制"
},
"hooks": ["悬念1", "悬念2"],
"mice_threads": {
"推进": ["线索名"],
"开启": ["新线索名"],
"解决": ["已完成线索名"]
},
"estimated_words": 2000
}
```
### 2. 笔记更新(Markdown)
```markdown
## 笔记更新
### 剧情线索变化
- [线索名]:[旧状态] → [新状态]
- [新线索]:开启([简短描述])
### 人物状态变化
- [角色名]:[变化描述]
### 待推进更新
- [✓] [已完成事项]
- [ ] [新增事项](紧急/重要)
### 新增写作亮点(可选)
- [技巧/桥段]:[描述]
```
"""
# User-message template of the emitted Task 1 training sample. Filled via
# str.format with: title, chapter, position_pct, story_notes, context_chars
# and context_text. It carries no information about the beat being planned.
TASK1_USER_TMPL = """\
## 故事笔记
- 书名:{title}
- 当前位置:第 {chapter} 章,约 {position_pct}% 处
{story_notes}
---
## 上文(最近 {context_chars} 字)
{context_text}
## 任务
请基于故事笔记和上文,完成以下任务:
1. 分析当前故事状态(在 `<think>` 中展示你的思考过程)
2. 规划下一个场景的结构(JSON 格式)
3. 输出笔记更新(Markdown 格式)"""
# Generation-time prompt for Task 1. On top of the fields used by
# TASK1_USER_TMPL it takes beat_type, beat_summary, beat_core and
# shuang_info, i.e. a summary of the actual beat, so the model can write a
# plan that matches the source text. Literal JSON braces are doubled because
# the template goes through str.format.
TASK1_COT_GEN_TMPL = """\
## 故事笔记
- 书名:{title}
- 当前位置:第 {chapter} 章,约 {position_pct}% 处
{story_notes}
---
## 上文(最近 {context_chars} 字)
{context_text}
## 参考信息(该节拍的实际内容摘要,仅用于帮你构建 CoT,禁止直接引用)
类型:{beat_type}
摘要:{beat_summary}
核心要素:{beat_core}
爽点:{shuang_info}
---
请以"事前规划"的视角展示你真实的创作思维过程(分析笔记状态、推演至少 2-3 个方案并对比优缺点、规划笔记更新),然后输出规划 JSON 和笔记更新。
<think>
[自由思考过程]
</think>
```json
{{
"scene_type": "scene | sequel",
"goal": "...",
"conflict_type": "人物冲突|环境冲突|内心冲突|信息冲突",
"conflict_description": "...",
"disaster": "...",
"sequel": {{"reaction": "...", "dilemma": "...", "decision": "..."}},
"pacing": "fast|medium|slow",
"dialogue_ratio": 0.4,
"shuang_point": {{
"has_shuang": true,
"type": "打脸|升级|装逼|获得|碾压",
"mechanism": "..."
}},
"hooks": [
{{"type": "chapter_end|mid_chapter", "content": "..."}}
],
"mice_threads": {{
"推进": ["线索名"],
"开启": ["新线索名"],
"解决": ["已完成线索名"]
}},
"estimated_words": 2000
}}
```
```markdown
## 笔记更新
### 剧情线索变化
- [线索名]:[旧状态] → [新状态]
### 人物状态变化
- [角色名]:[变化描述]
### 待推进更新
- [✓] [已完成]
- [ ] [新增](紧急/重要)
### 新增写作亮点(可选)
- [技巧]:[描述]
```"""
def _beat_core_str(beat: dict) -> str:
    """Summarise a beat's core elements as space-separated ``label=value`` pairs.

    Scene beats report goal, conflict (from ``conflict_description``) and
    disaster; every other beat type reports reaction, dilemma and decision.
    Missing fields render as empty values.
    """
    if beat["type"] == "scene":
        fields = (
            ("goal", "goal"),
            ("conflict", "conflict_description"),
            ("disaster", "disaster"),
        )
    else:
        fields = (
            ("reaction", "reaction"),
            ("dilemma", "dilemma"),
            ("decision", "decision"),
        )
    return " ".join(f"{label}={beat.get(key, '')}" for label, key in fields)
def _shuang_str(beat: dict) -> str:
    """Describe the beat's shuang point as ``type(intensity):description``.

    Returns the "none" marker when the beat has no ``shuang_point`` or its
    ``has_shuang`` flag is falsy.
    """
    point = beat.get("shuang_point", {})
    if point.get("has_shuang"):
        kind = point.get("type", "")
        level = point.get("intensity", "")
        detail = point.get("description", "")
        return f"{kind}({level}):{detail}"
    return "无"
async def gen_task1_sample(
    i: int,
    beat: dict,
    analysis: dict,
    novel_text: str,
    context_chars: int,
    model: str,
    sem: asyncio.Semaphore,
) -> Optional[dict]:
    """Generate one Task 1 (structure planning) training sample for a beat.

    The LLM is prompted with the story notes, the preceding text and a
    summary of the actual beat. The emitted sample keeps the LLM answer but
    pairs it with a user message that omits the beat summary.

    Args:
        i: Index of the beat in ``analysis["beats"]``.
        beat: The beat; must provide ``type``, ``position_start`` and
            ``position_end``.
        analysis: Full analysis dict.
        novel_text: Full novel text the beat offsets refer to.
        context_chars: Number of characters of preceding text to include.
        model: Model name for the LLM call.
        sem: Semaphore limiting concurrent LLM calls.

    Returns:
        A dict with ``messages`` and ``metadata``, or None when the LLM
        call fails or is blocked by content moderation.
    """
    async with sem:
        meta = analysis.get("_meta", {})
        all_beats = analysis.get("beats", [])
        book_title = meta.get("novel_title", "未知")
        novel_len = meta.get("total_chars", len(novel_text))

        snapshot = build_state_snapshot(analysis, i)
        prev_disaster, prev_decision = get_last_disaster_decision(all_beats, i)
        chapter_no = beat.get("chapter_start", "?")
        pct = calc_position_percent(beat, novel_len)

        # Preceding text: up to context_chars characters right before the beat.
        beat_start = beat["position_start"]
        window_start = max(0, beat_start - context_chars)
        preceding = novel_text[window_start:beat_start].strip()

        prompt_fields = {
            "title": book_title,
            "chapter": chapter_no,
            "position_pct": pct,
            "story_notes": format_story_notes(
                analysis, snapshot, prev_disaster, prev_decision
            ),
            "context_chars": context_chars,
            "context_text": preceding,
        }

        # Generation prompt: shared fields plus a summary of the real beat.
        generation_prompt = TASK1_COT_GEN_TMPL.format(
            beat_type=beat["type"],
            beat_summary=beat.get("summary", ""),
            beat_core=_beat_core_str(beat),
            shuang_info=_shuang_str(beat),
            **prompt_fields,
        )
        request = [
            {"role": "system", "content": TASK1_SYSTEM},
            {"role": "user", "content": generation_prompt},
        ]
        try:
            reply = await llm_call(request, model=model)
        except ContentFilterError as e:
            print(f" [Task1] beat {i+1} 内容审查拦截,跳过:{e}")
            return None
        except Exception as e:
            print(f" [Task1] beat {i+1} LLM 调用失败:{e}")
            return None

        # Training sample: the user sees only story state + context, never
        # the actual content of the beat.
        sample_messages = [
            {"role": "system", "content": TASK1_SYSTEM},
            {"role": "user", "content": TASK1_USER_TMPL.format(**prompt_fields)},
            {"role": "assistant", "content": reply},
        ]
        sample_metadata = {
            "task_type": "structure_planning",
            "source_file": meta.get("novel_title", ""),
            "chapter": f"第{chapter_no}章",
            "position_percent": pct,
            "mice_thread": beat.get("mice_thread", ""),
            "beat_id": beat.get("id", ""),
            "beat_type": beat["type"],
            "word_count": beat["position_end"] - beat["position_start"],
        }
        return {"messages": sample_messages, "metadata": sample_metadata}
- # ──────────────────────────────────────────────────────────────
- # Task 2:场景续写(Scene Continuation)
- # ──────────────────────────────────────────────────────────────
# System prompt for Task 2 (scene continuation); used for both the
# generation request and the emitted training sample.
TASK2_SYSTEM = (
    "你是一位专业的网文作家,擅长写爽文、悬疑和情感类长篇小说,"
    "能够根据结构规划生成节奏流畅、爽点鲜明的正文。"
)
# User-message template of the emitted Task 2 training sample. Filled via
# str.format with: title, position_pct, story_notes_brief, context_text,
# structure_plan and target_words.
TASK2_USER_TMPL = """\
## 故事笔记(概要)
- 书名:{title},当前位置约 {position_pct}% 处
{story_notes_brief}
---
## 上文
{context_text}
## 结构规划
{structure_plan}
## 任务
请续写下一段(约 {target_words} 字),风格与上文保持一致。"""
# Generation-time prompt for Task 2. Besides the fields of TASK2_USER_TMPL
# (minus target_words) it takes beat_text_hint, an excerpt of the real beat
# text, and actual_text, which is placed after the <think> skeleton at the
# very end of the prompt.
TASK2_COT_GEN_TMPL = """\
## 故事笔记(概要)
- 书名:{title},当前位置约 {position_pct}% 处
{story_notes_brief}
---
## 上文
{context_text}
## 结构规划
{structure_plan}
## 参考信息(该节拍的实际续写内容,仅用于帮你构建 CoT,禁止逐句引用)
{beat_text_hint}
---
请以"事前决策"的视角自由写出写作思考过程(上文衔接方式、爽点植入、人物动机、对话设计等,无需固定段落),然后直接输出实际续写内容。
<think>
[自由思考过程]
</think>
{actual_text}"""
async def gen_task2_sample(
    i: int,
    beat: dict,
    analysis: dict,
    novel_text: str,
    task1_samples: list,
    context_chars: int,
    model: str,
    sem: asyncio.Semaphore,
) -> Optional[dict]:
    """Generate one Task 2 (scene continuation) training sample for a beat.

    The LLM only supplies the ``<think>`` reasoning; the continuation in the
    emitted sample is the original beat text from the novel.

    Args:
        i: Index of the beat in ``analysis["beats"]``.
        beat: The beat; must provide ``type``, ``position_start`` and
            ``position_end``.
        analysis: Full analysis dict.
        novel_text: Full novel text the beat offsets refer to.
        task1_samples: Task 1 results indexed like the beats (entries may be
            None). The assistant message of entry ``i`` is used verbatim as
            the structure plan; a placeholder with the beat summary is used
            when it is missing.
        context_chars: Number of characters of preceding text to include.
        model: Model name for the LLM call.
        sem: Semaphore limiting concurrent LLM calls.

    Returns:
        A dict with ``messages`` and ``metadata``, or None when the beat
        text is empty, the LLM call fails or is blocked, or the LLM returns
        no content.
    """
    async with sem:
        meta = analysis.get("_meta", {})
        title = meta.get("novel_title", "未知")
        total_chars = meta.get("total_chars", len(novel_text))
        beats = analysis.get("beats", [])

        state = build_state_snapshot(analysis, i)
        last_disaster, last_decision = get_last_disaster_decision(beats, i)
        position_pct = calc_position_percent(beat, total_chars)

        ctx_start = max(0, beat["position_start"] - context_chars)
        context_text = novel_text[ctx_start: beat["position_start"]].strip()
        beat_text = novel_text[beat["position_start"]: beat["position_end"]].strip()
        if not beat_text:
            return None

        # NOTE: despite the "brief" name, these are the same full notes that
        # Task 1 uses (format_story_notes has no reduced mode).
        story_notes_brief = format_story_notes(
            analysis, state, last_disaster, last_decision
        )

        # Structure plan: the assistant output of the matching Task 1 sample.
        structure_plan = ""
        if i < len(task1_samples) and task1_samples[i]:
            for msg in task1_samples[i]["messages"]:
                if msg["role"] == "assistant":
                    structure_plan = msg["content"]
                    break
        if not structure_plan:
            structure_plan = f"(Task1 未生成,beat 摘要:{beat.get('summary', '')})"

        target_words = max(500, (beat["position_end"] - beat["position_start"]) // 2)

        # The hint section shows only the first 300 characters of the beat.
        beat_hint = beat_text[:300] + "..." if len(beat_text) > 300 else beat_text

        cot_prompt = TASK2_COT_GEN_TMPL.format(
            title=title,
            position_pct=position_pct,
            story_notes_brief=story_notes_brief,
            context_text=context_text,
            structure_plan=structure_plan,
            beat_text_hint=beat_hint,
            actual_text=beat_text,
        )
        messages = [
            {"role": "system", "content": TASK2_SYSTEM},
            {"role": "user", "content": cot_prompt},
        ]
        try:
            cot_part = await llm_call(messages, model=model)
        except ContentFilterError as e:
            print(f" [Task2] beat {i+1} 内容审查拦截,跳过:{e}")
            return None
        except Exception as e:
            print(f" [Task2] beat {i+1} LLM 调用失败:{e}")
            return None

        # An empty or None reply cannot yield a sample, and the membership
        # tests below would raise TypeError on None and abort the whole run.
        if not cot_part:
            print(f" [Task2] beat {i+1} LLM 返回空内容,跳过")
            return None

        # Target format: <think>...</think>\n\n{original beat text}
        if "<think>" in cot_part and beat_text not in cot_part:
            # The reply lacks the verbatim text: keep only the think block
            # (when it is closed) and append the original beat text.
            think_end = cot_part.find("</think>")
            if think_end != -1:
                think_block = cot_part[: think_end + len("</think>")]
                assistant_content = f"{think_block}\n\n{beat_text}"
            else:
                assistant_content = f"{cot_part}\n\n{beat_text}"
        else:
            assistant_content = cot_part

        user_content = TASK2_USER_TMPL.format(
            title=title,
            position_pct=position_pct,
            story_notes_brief=story_notes_brief,
            context_text=context_text,
            structure_plan=structure_plan,
            target_words=target_words,
        )
        return {
            "messages": [
                {"role": "system", "content": TASK2_SYSTEM},
                {"role": "user", "content": user_content},
                {"role": "assistant", "content": assistant_content},
            ],
            "metadata": {
                "task_type": "scene_continuation",
                "source_file": meta.get("novel_title", ""),
                "chapter": f"第{beat.get('chapter_start', '?')}章",
                "position_percent": position_pct,
                "mice_thread": beat.get("mice_thread", ""),
                "beat_id": beat.get("id", ""),
                "beat_type": beat["type"],
                "word_count": len(beat_text),
            },
        }
- # ──────────────────────────────────────────────────────────────
- # Task 3:爽点注入(Shuang Point Injection)
- # ──────────────────────────────────────────────────────────────
# System prompt for Task 3 (shuang-point injection); used for both the
# generation request and the emitted training sample.
TASK3_SYSTEM = (
    "你是一位专业的网文编辑,擅长识别和设计爽点(打脸、升级、装逼、获得、碾压),"
    "能在不改变核心情节的前提下大幅提升情感冲击力。"
)
# Generation-time prompt for Task 3. Filled via str.format with
# story_notes_brief and beat_text. It asks the model for a strict-JSON
# object holding has_shuang, shuang_type, intensity, flat_draft, cot and
# modification_notes, or {"has_shuang": false}. Literal JSON braces are
# doubled because the template goes through str.format.
TASK3_GEN_TMPL = """\
## 故事背景(用于理解爽点来源)
{story_notes_brief}
---
## 原文(包含爽点的增强版)
{beat_text}
---
## 任务
1. 判断这段文字是否包含明显爽点(打脸/升级/装逼/获得/碾压)
2. 如果有,生成去掉爽点后的"平淡草稿"(保留核心情节事件,但去掉爽感设计)
3. 以编辑视角,写出重新注入爽点的完整思考过程(CoT)和修改说明
注意:CoT 应分析人物性格/关系如何使这个爽点成立,以及与当前剧情线索的联动
**输出格式(严格 JSON)**:
```json
{{
"has_shuang": true,
"shuang_type": "打脸|升级|装逼|获得|碾压",
"intensity": "low|medium|high",
"flat_draft": "去掉爽点后的平淡版本(完整文字)",
"cot": "<think>\\n[自由分析草稿问题和注入方案,结合人物特质和线索背景]\\n</think>",
"modification_notes": "注入位置:...\\n爽点类型:...\\n关键改动:..."
}}
```
如果不包含明显爽点,输出:`{{"has_shuang": false}}`"""
# User-message template of the emitted Task 3 training sample. Filled via
# str.format with: story_notes_brief, flat_draft, shuang_type and intensity.
TASK3_USER_TMPL = """\
## 故事背景
{story_notes_brief}
---
## 平淡草稿
{flat_draft}
## 要求
- 爽点类型:{shuang_type}
- 强度:{intensity}(low=轻微强化 | medium=明显提升 | high=大幅改写)
- 不改变核心情节,只增强情感冲击力
- 结合人物性格特质和当前剧情线索设计爽感
## 任务
请注入爽点,输出增强版本。"""
async def gen_task3_sample(
    i: int,
    beat: dict,
    analysis: dict,
    novel_text: str,
    model: str,
    sem: asyncio.Semaphore,
) -> Optional[dict]:
    """Generate one Task 3 (shuang-point injection) training sample.

    Only beats flagged with ``shuang_point.has_shuang`` are processed. The
    LLM rewrites the original beat into a flat draft and explains how to
    re-inject the shuang point; the emitted sample maps flat draft ->
    reasoning + original beat text + modification notes.

    Args:
        i: Index of the beat in ``analysis["beats"]``.
        beat: The beat; must provide ``position_start`` and ``position_end``.
        analysis: Full analysis dict.
        novel_text: Full novel text the beat offsets refer to.
        model: Model name for the LLM call.
        sem: Semaphore limiting concurrent LLM calls.

    Returns:
        A dict with ``messages`` and ``metadata``, or None when the beat has
        no shuang point, its text is shorter than 200 characters, the LLM
        call fails or is blocked, the reply is not a usable JSON object, or
        the reply lacks a flat draft or reasoning.
    """
    import tempfile  # local import: only needed for the debug-dump path

    # Only beats with a shuang point qualify; tolerate an explicit null.
    sp = beat.get("shuang_point") or {}
    if not sp.get("has_shuang"):
        return None

    async with sem:
        meta = analysis.get("_meta", {})
        total_chars = meta.get("total_chars", len(novel_text))
        beats = analysis.get("beats", [])

        state = build_state_snapshot(analysis, i)
        last_disaster, last_decision = get_last_disaster_decision(beats, i)
        story_notes_brief = format_story_notes(
            analysis, state, last_disaster, last_decision
        )

        beat_text = novel_text[beat["position_start"]: beat["position_end"]].strip()
        if len(beat_text) < 200:
            return None

        # Ask for the flat draft + reasoning in one call.
        gen_prompt = TASK3_GEN_TMPL.format(
            story_notes_brief=story_notes_brief,
            beat_text=beat_text,
        )
        messages = [
            {"role": "system", "content": TASK3_SYSTEM},
            {"role": "user", "content": gen_prompt},
        ]
        try:
            raw = await llm_call(messages, model=model)
        except ContentFilterError as e:
            print(f" [Task3] beat {i+1} 内容审查拦截,跳过:{e}")
            return None
        except Exception as e:
            print(f" [Task3] beat {i+1} LLM 调用失败:{e}")
            return None

        # Normalise a None reply so parsing and the debug dump get a str.
        raw = raw or ""
        try:
            result = extract_json_block(raw)
        except Exception:
            # Keep the raw reply for troubleshooting, in the platform's
            # temporary directory rather than a hard-coded /tmp.
            debug_path = Path(tempfile.gettempdir()) / f"task3_beat{i+1}_debug.txt"
            debug_path.write_text(raw, encoding="utf-8")
            print(f" [Task3] beat {i+1} JSON 解析失败,原始响应已保存至 {debug_path},跳过")
            return None

        # Valid JSON that is not an object (e.g. a list) has no .get().
        if not isinstance(result, dict) or not result.get("has_shuang"):
            return None

        flat_draft = result.get("flat_draft", "")
        cot = result.get("cot", "")
        modification_notes = result.get("modification_notes", "")
        shuang_type = result.get("shuang_type", sp.get("type", ""))
        intensity = result.get("intensity", sp.get("intensity", "medium"))
        if not flat_draft or not cot:
            return None

        user_content = TASK3_USER_TMPL.format(
            story_notes_brief=story_notes_brief,
            flat_draft=flat_draft,
            shuang_type=shuang_type,
            intensity=intensity,
        )
        # Target output: reasoning + enhanced version (the original text)
        # + modification notes.
        assistant_content = (
            f"{cot}\n\n"
            f"{beat_text}\n\n"
            f"---\n**修改说明**:\n{modification_notes}"
        )
        return {
            "messages": [
                {"role": "system", "content": TASK3_SYSTEM},
                {"role": "user", "content": user_content},
                {"role": "assistant", "content": assistant_content},
            ],
            "metadata": {
                "task_type": "shuang_injection",
                "source_file": meta.get("novel_title", ""),
                "chapter": f"第{beat.get('chapter_start', '?')}章",
                "position_percent": calc_position_percent(beat, total_chars),
                "shuang_type": shuang_type,
                "intensity": intensity,
                "beat_id": beat.get("id", ""),
                "word_count": len(beat_text),
            },
        }
- # ──────────────────────────────────────────────────────────────
- # 主流程
- # ──────────────────────────────────────────────────────────────
async def build_all(
    analysis_path: str,
    novel_path: str,
    output_dir: str,
    context_chars: int,
    skip_tasks: Set[int],
    model: str,
    concurrency: int,
    max_beats: Optional[int] = None,
):
    """Run the three sample generators and write their outputs.

    Each task that is not skipped writes one JSONL file into ``output_dir``;
    ``stats.json`` with per-task total/valid counts is always written.

    Args:
        analysis_path: Path to the analysis JSON.
        novel_path: Path to the novel text file.
        output_dir: Directory receiving the JSONL files and ``stats.json``.
        context_chars: Characters of preceding text for Task 1 and Task 2.
        skip_tasks: Task numbers (1, 2, 3) to skip.
        model: Model name for all LLM calls.
        concurrency: Maximum number of concurrent LLM calls; must be >= 1.
        max_beats: When given, only the first N beats are processed.

    Raises:
        ValueError: If ``concurrency`` is below 1.
    """
    # Semaphore(0) would make every task wait forever, so fail fast.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    with open(analysis_path, encoding="utf-8") as f:
        analysis = json.load(f)
    novel_text = load_text(novel_path)

    beats = analysis.get("beats", [])
    if max_beats is not None:
        beats = beats[:max_beats]
        analysis = dict(analysis, beats=beats)  # local view; the file is untouched

    out = Path(output_dir)
    # Create the directory up front: stats.json is written even when every
    # task is skipped and no JSONL writer has created it.
    out.mkdir(parents=True, exist_ok=True)
    sem = asyncio.Semaphore(concurrency)

    print(f"\n分析文件:{analysis_path}")
    print(f"节拍数:{len(beats)}")
    print(f"输出目录:{out}")
    print(f"并发数:{concurrency}\n")

    stats = {}

    # ── Task 1 ──────────────────────────────────
    # Kept index-aligned with beats so Task 2 can look up its plan by index.
    task1_samples: List[Optional[dict]] = [None] * len(beats)
    if 1 not in skip_tasks:
        print("[Task 1] 结构规划(Structure Planning)...")
        tasks = [
            gen_task1_sample(i, b, analysis, novel_text, context_chars, model, sem)
            for i, b in enumerate(beats)
        ]
        results = await asyncio.gather(*tasks)
        task1_samples = list(results)
        valid = [s for s in task1_samples if s]
        write_jsonl(valid, out / "task1_structure_planning.jsonl")
        stats["task1"] = {"total": len(beats), "valid": len(valid)}
        print(f" Task1 完成:{len(valid)}/{len(beats)} 条有效\n")

    # ── Task 2 ──────────────────────────────────
    if 2 not in skip_tasks:
        print("[Task 2] 场景续写(Scene Continuation)...")
        tasks = [
            gen_task2_sample(
                i, b, analysis, novel_text, task1_samples, context_chars, model, sem
            )
            for i, b in enumerate(beats)
        ]
        results = await asyncio.gather(*tasks)
        valid = [s for s in results if s]
        write_jsonl(valid, out / "task2_scene_continuation.jsonl")
        stats["task2"] = {"total": len(beats), "valid": len(valid)}
        print(f" Task2 完成:{len(valid)}/{len(beats)} 条有效\n")

    # ── Task 3 ──────────────────────────────────
    if 3 not in skip_tasks:
        shuang_beats = [b for b in beats if b.get("shuang_point", {}).get("has_shuang")]
        print(f"[Task 3] 爽点注入(Shuang Point Injection)... (共 {len(shuang_beats)} 个有爽点的 beat)")
        # Every beat is submitted with its original index; beats without a
        # shuang point return None immediately.
        tasks = [
            gen_task3_sample(i, b, analysis, novel_text, model, sem)
            for i, b in enumerate(beats)
        ]
        results = await asyncio.gather(*tasks)
        valid = [s for s in results if s]
        write_jsonl(valid, out / "task3_shuang_injection.jsonl")
        stats["task3"] = {
            "total": len(shuang_beats),
            "valid": len(valid),
        }
        print(f" Task3 完成:{len(valid)}/{len(shuang_beats)} 条有效\n")

    # ── Statistics ──────────────────────────────
    stats_path = out / "stats.json"
    stats_path.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"统计信息 → {stats_path}")
    total_valid = sum(v.get("valid", 0) for v in stats.values())
    print(f"\n全部完成。总有效样本数:{total_valid}")
def main():
    """Parse the command-line options and run the full build.

    Options: --analysis, --novel and --output-dir are required;
    --context-chars (default 800), --skip-task (repeatable),
    --concurrency (default 5), --model (default "qwen-plus") and
    --max-beats are optional.
    """
    option_specs = [
        ("--analysis", {"required": True, "help": "step1 输出的 analysis JSON"}),
        ("--novel", {"required": True, "help": "小说 txt 文件路径"}),
        ("--output-dir", {"required": True, "help": "输出目录"}),
        ("--context-chars", {
            "type": int, "default": 800,
            "help": "Task1/2 的上文字符数(默认 800)",
        }),
        ("--skip-task", {
            "type": int, "action": "append", "default": [],
            "metavar": "N", "help": "跳过某个任务(1/2/3),可多次指定",
        }),
        ("--concurrency", {
            "type": int, "default": 5,
            "help": "并发 LLM 调用数(默认 5)",
        }),
        ("--model", {"default": "qwen-plus", "help": "使用的模型名称"}),
        ("--max-beats", {
            "type": int, "default": None,
            "help": "只处理前 N 个 beat(用于试运行验证)",
        }),
    ]
    cli = argparse.ArgumentParser(description="步骤2:生成三类 SFT 训练数据")
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    opts = cli.parse_args()

    job = build_all(
        analysis_path=opts.analysis,
        novel_path=opts.novel,
        output_dir=opts.output_dir,
        context_chars=opts.context_chars,
        skip_tasks=set(opts.skip_task),  # repeated flags collapse into a set
        model=opts.model,
        concurrency=opts.concurrency,
        max_beats=opts.max_beats,
    )
    asyncio.run(job)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|