#!/usr/bin/env python3
"""Step 1: 500K-character window story analysis.

Anchor-based positioning: the LLM is asked to quote, verbatim, the opening
fragment of each beat; ``str.find()`` with progressively shortened prefixes
then pins the exact character offset.  No reliance on chapter-heading
formats, so it works for novels with arbitrary naming styles.

Usage (normally invoked by run_pipeline.py; can also be run standalone):

    python step1_analyze.py --novel ../input/大奉打更人.txt --output analysis/w0.json
    python step1_analyze.py --novel ../input/大奉打更人.txt \\
        --window-index 1 --prev-analysis analysis/w0.json --output analysis/w1.json

Environment variables (.env):
    ALI_API_KEY    Aliyun DashScope API key
    ALI_BASE_URL   (optional) defaults to
                   https://dashscope.aliyuncs.com/compatible-mode/v1
"""
import os
import re
import json
import asyncio
import argparse
from pathlib import Path
from typing import Optional, List

from openai import AsyncOpenAI, BadRequestError, RateLimitError, APIError
from dotenv import load_dotenv

load_dotenv()
# Also try the parent directory's .env (for when this runs from a subdir).
load_dotenv(Path(__file__).parent.parent / ".env")

client = AsyncOpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url=os.getenv(
        "ALI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    ),
)

WINDOW_SIZE = 500_000  # characters per analysis window


# ──────────────────────────────────────────────────────────────
# Text loading
# ──────────────────────────────────────────────────────────────
def load_text(path: str) -> str:
    """Read *path* as text, trying common Chinese encodings in order.

    Raises:
        ValueError: if none of the candidate encodings can decode the file.
    """
    for enc in ["utf-8", "gbk", "gb2312", "gb18030"]:
        try:
            return Path(path).read_text(encoding=enc)
        except UnicodeDecodeError:
            continue
    raise ValueError(f"无法解码文件: {path}")


# ──────────────────────────────────────────────────────────────
# Anchor resolution
# ──────────────────────────────────────────────────────────────
def find_anchor(window_text: str, anchor: str, search_from: int = 0) -> int:
    """Locate *anchor* in ``window_text[search_from:]``; return a window-local offset.

    Tries progressively shorter prefixes of the anchor — from
    ``min(len(anchor), 40)`` characters down to 8 — to tolerate minor
    transcription errors in the LLM-quoted text.  Returns -1 when no
    prefix of length >= 8 is found (or the anchor is empty/too short).
    """
    if not anchor:
        return -1
    for length in range(min(len(anchor), 40), 7, -1):
        pos = window_text.find(anchor[:length], search_from)
        if pos >= 0:
            return pos
    return -1


def resolve_positions(beats: List[dict], window_text: str,
                      window_offset: int, window_end: int) -> None:
    """Resolve each beat's ``start_anchor`` to absolute character positions.

    Writes ``position_start`` / ``position_end`` into each beat dict in place.
    Strategy: search sequentially, each lookup starting just past the previous
    beat's position, to avoid matching an earlier duplicate of the phrase.
    Beats whose anchor cannot be found are interpolated midway between their
    nearest resolved neighbours (falling back to the window edges).
    """
    search_from = 0
    unresolved = []
    for beat in beats:
        anchor = beat.get("start_anchor", "")
        pos = find_anchor(window_text, anchor, search_from)
        if pos >= 0:
            beat["position_start"] = window_offset + pos
            beat["_anchor_resolved"] = True
            search_from = pos + 1
        else:
            beat["position_start"] = -1  # mark as unresolved for now
            beat["_anchor_resolved"] = False
            unresolved.append(beat["id"])

    # Estimate positions proportionally for beats whose anchor was not found.
    if unresolved:
        print(f" 警告:{len(unresolved)} 个 beat 锚点未找到,将按比例估算位置:{unresolved}")
        total_beats = len(beats)
        for i, beat in enumerate(beats):
            if not beat["_anchor_resolved"]:
                # Interpolate between the nearest resolved beats on each side.
                prev_pos = next(
                    (beats[j]["position_start"] for j in range(i - 1, -1, -1)
                     if beats[j]["_anchor_resolved"]),
                    window_offset,
                )
                next_pos = next(
                    (beats[j]["position_start"] for j in range(i + 1, total_beats)
                     if beats[j]["_anchor_resolved"]),
                    window_end,
                )
                beat["position_start"] = (prev_pos + next_pos) // 2

    # Fill position_end: each beat ends where the next one starts.
    for i, beat in enumerate(beats):
        if i + 1 < len(beats):
            beat["position_end"] = beats[i + 1]["position_start"]
        else:
            beat["position_end"] = window_end

    # Strip the internal bookkeeping flag.
    for beat in beats:
        beat.pop("_anchor_resolved", None)


# ──────────────────────────────────────────────────────────────
# LLM calls
# ──────────────────────────────────────────────────────────────
class ContentFilterError(Exception):
    """Raised when the provider's content inspection rejects the request."""


async def llm_call(messages: list, model: str, temperature: float = 0.3,
                   max_retries: int = 3) -> str:
    """Call the chat-completions API with exponential-backoff retries.

    Retries ``RateLimitError`` / ``APIError`` up to *max_retries* times
    (delay doubles from 5s, capped at 60s).  A ``BadRequestError`` caused by
    content inspection is re-raised as :class:`ContentFilterError`; other
    bad requests propagate unchanged.

    Returns:
        The assistant message content ("" if the API returned no text).
    """
    delay = 5.0
    for attempt in range(1, max_retries + 2):
        try:
            resp = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=8192,
            )
            # message.content is Optional[str] in the SDK; normalize None to ""
            # so downstream regex/JSON parsing fails loudly via extract_json
            # instead of a TypeError.
            return resp.choices[0].message.content or ""
        except BadRequestError as e:
            if "data_inspection_failed" in str(e) or "content_filter" in (getattr(e, "code", "") or ""):
                raise ContentFilterError(f"内容审查不通过: {e}") from e
            raise
        except (RateLimitError, APIError) as e:
            if attempt > max_retries:
                raise
            print(f" [重试 {attempt}/{max_retries}] {type(e).__name__}: {e},{delay:.0f}s 后重试...")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)


def extract_json(text: str) -> dict:
    """Extract JSON from LLM output, tolerating a ```json ... ``` code fence.

    On a first parse failure, strips trailing commas before closing
    brackets/braces (a common LLM artifact) and retries.
    """
    m = re.search(r"```json\s*(.*?)\s*```", text, re.DOTALL)
    json_str = m.group(1) if m else text.strip()
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        json_str = re.sub(r",\s*([}\]])", r"\1", json_str)
        return json.loads(json_str)


# ──────────────────────────────────────────────────────────────
# Prompts
# ──────────────────────────────────────────────────────────────
SYSTEM_ANALYST = (
    "你是资深故事分析专家,精通 Scene-Sequel 结构、MICE 线程理论(Milieu/Idea/Character/Event)"
    "以及中国网文的爽点与钩子设计。请严格按指定 JSON 格式输出,不要有多余文字。"
)


def build_prompt(window_text: str, prev_meta: Optional[dict], novel_title: str) -> str:
    """Build the analysis user prompt for one window.

    Args:
        window_text: the raw novel text of this window.
        prev_meta: the previous window's analysis JSON (for continuity),
            or None for the first window.
        novel_title: book title (derived from the input filename).
    """
    prev_section = ""
    if prev_meta:
        prev_section = f"""## 前序窗口元信息(保持连贯性)

### 已知人物
{json.dumps(prev_meta.get("characters", []), ensure_ascii=False, indent=2)}

### 已知剧情线索(MICE 线程)
{json.dumps(prev_meta.get("outline", {}).get("plot_lines", []), ensure_ascii=False, indent=2)}

### 前序主线摘要
{prev_meta.get("outline", {}).get("main_plot", "无")}

---

"""

    # Suggest a beat count from window size (~20,000 chars/beat as a coarse
    # baseline) while stressing that actual story structure takes precedence.
    window_chars = len(window_text)
    rough_beats = max(3, round(window_chars / 20000))
    beat_guidance = (
        f"本窗口约 {window_chars:,} 字。以 Scene-Sequel 叙事功能为切分依据:"
        f"Scene 在主角目标受阻并遭遇 Disaster 时结束,Sequel 在主角做出新 Decision 时结束,二者严格交替。"
        f"切分边界是叙事功能单元的完结,与章节标题、地点切换、视角变化无关。"
        f"根据本文实际节奏,预计大约 {rough_beats} 个节拍,但若故事结构明显更多或更少,以实际为准。"
    )

    return f"""{prev_section}## 分析任务

书名:{novel_title}

### 1. 故事大纲
- **main_plot**:本窗口主线剧情摘要(200-300 字)
- **plot_lines**:活跃/新增剧情线索,每条包含:
  - name、mice_type(M/I/C/E)、status(进行中/已解决/待推进)
  - description:线索核心矛盾与当前进展(50-80字)
  - core_question:一句话概括"这条线索要解答的根本问题"(≤30字)
  - next_steps:推进此线索的下一个关键动作或待揭示信息(≤40字)

### 2. 人物小传
主要人物(新出现 + 已有人物状态更新),每人包含:
- name、role、goal(当前目标)
- traits:性格特质(3-5个词组)
- speaking_style:说话风格(2-3条典型特征,如"夹杂黑话与文言""关键处斩钉截铁")
- current_state:本窗口末的最新状态(一句话,描述动态处境而非静态属性)
- relationships:与其他角色的关系

### 3. 写作亮点
分析本窗口的叙事技法,每条15-30字:
- techniques:叙事/结构技巧(2-3条)
- shuang_designs:爽点设计方式(2-3条,说明实现机制)
- pacing:节奏处理特点(1-2条)

### 4. 节拍切分(Scene-Sequel 交替)

**切分粒度**:{beat_guidance}

**start_anchor 说明**(非常重要,直接影响定位精度):
- 从下方【待分析文本】中,**一字不差**地逐字复制该节拍开头的 30-40 个字符(含标点)
- **禁止**填写"从原文逐字复制"之类的说明文字,也**禁止**照抄上方示例中的占位符——必须是待分析文本中的真实字符
- 选择该节拍真正开始的位置,而非章节标题
- 避免选择可能多处出现的通用短语(如"他说""道"等)

**节拍要素**:
- Scene:goal / conflict_type(人物冲突|环境冲突|内心冲突|信息冲突)/ conflict_description / disaster
- Sequel:reaction / dilemma / decision
- 每个 beat 必须有:mice_thread(推进的 MICE 线程名称)、shuang_point、state_changes

**shuang_point 格式**:
```
"shuang_point": {{
  "has_shuang": true/false,
  "type": "打脸|升级|装逼|获得|碾压|无",
  "intensity": "low|medium|high|none",
  "description": "具体内容(如有)"
}}
```

## 输出格式(严格 JSON)

```json
{{
  "outline": {{
    "main_plot": "...",
    "plot_lines": [
      {{
        "name": "税银案",
        "mice_type": "E",
        "status": "进行中",
        "description": "...",
        "core_question": "真银被谁调包?",
        "next_steps": "锁定御刀卫陆姓经手人"
      }}
    ]
  }},
  "characters": [
    {{
      "name": "...",
      "role": "主角",
      "goal": "...",
      "traits": ["机智"],
      "speaking_style": ["夹杂现代俚语与古语混搭", "关键处斩钉截铁"],
      "current_state": "刚凭推理翻盘,获临时协查资格,尚未脱牢",
      "relationships": {{"角色A": "关系"}}
    }}
  ],
  "writing_insights": {{
    "techniques": ["信息差分层释放:主角全知,古代角色见表象,层层迟滞"],
    "shuang_designs": ["逻辑型装逼:靠算术/化学原理碾压,非武力打脸"],
    "pacing": ["对话占比65%,每章2-3次场景切换,无大段独白"]
  }},
  "beats": [
    {{
      "id": "beat_001",
      "type": "scene",
      "start_anchor": "【此处填入待分析文本原文开头20-30字】",
      "mice_thread": "税银案",
      "summary": "...",
      "goal": "...",
      "conflict_type": "人物冲突",
      "conflict_description": "...",
      "disaster": "...",
      "shuang_point": {{
        "has_shuang": true,
        "type": "智商碾压",
        "intensity": "high",
        "description": "..."
      }},
      "state_changes": {{
        "plot_lines": [{{"name": "税银案", "old_state": "调查中", "new_state": "发现破绽"}}],
        "characters": [{{"name": "许七安", "change": "从囚犯转变为关键证人"}}]
      }}
    }},
    {{
      "id": "beat_002",
      "type": "sequel",
      "start_anchor": "【此处填入待分析文本原文开头20-30字】",
      "mice_thread": "身份成长",
      "summary": "...",
      "reaction": "...",
      "dilemma": "...",
      "decision": "...",
      "shuang_point": {{"has_shuang": false, "type": "无", "intensity": "none", "description": ""}},
      "state_changes": {{
        "characters": [{{"name": "许七安", "change": "心态从被动转为主动"}}]
      }}
    }}
  ]
}}
```

## 待分析文本

{window_text}
"""


# ──────────────────────────────────────────────────────────────
# Main flow
# ──────────────────────────────────────────────────────────────
async def analyze_window(
    novel_path: str,
    window_index: int,
    prev_analysis_path: Optional[str],
    output_path: str,
    model: str,
    window_size: int = WINDOW_SIZE,
):
    """Analyze one fixed-size character window of the novel and write JSON.

    Loads the novel, slices window *window_index*, optionally loads the
    previous window's analysis for continuity, calls the LLM, resolves beat
    anchors to absolute positions, and writes the annotated analysis to
    *output_path*.  On a JSON parse failure the raw LLM response is saved
    next to the output (``.error.txt``) before re-raising.
    """
    print(f"\n{'='*60}")
    print(f"窗口 {window_index} 分析")
    print(f"{'='*60}")

    text = load_text(novel_path)
    total = len(text)
    print(f"全文:{total:,} 字符")

    start = window_index * window_size
    end = min(start + window_size, total)
    if start >= total:
        print("起始位置超过文件长度,退出。")
        return
    window = text[start:end]
    print(f"窗口范围:{start:,} - {end:,}({end - start:,} 字符)")

    prev_meta = None
    if prev_analysis_path and Path(prev_analysis_path).exists():
        with open(prev_analysis_path, encoding="utf-8") as f:
            prev_meta = json.load(f)
        print(f"加载前序分析:{prev_analysis_path}")

    novel_title = Path(novel_path).stem
    prompt = build_prompt(window, prev_meta, novel_title)
    messages = [
        {"role": "system", "content": SYSTEM_ANALYST},
        {"role": "user", "content": prompt},
    ]

    print(f"调用 LLM({model})...")
    raw = await llm_call(messages, model=model)

    print("解析 JSON...")
    try:
        analysis = extract_json(raw)
    except json.JSONDecodeError as e:
        # Preserve the raw response for post-mortem debugging, then re-raise.
        err_path = Path(output_path).with_suffix(".error.txt")
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        err_path.write_text(raw, encoding="utf-8")
        print(f"JSON 解析失败:{e}\n原始响应已保存到 {err_path}")
        raise

    # Resolve beat anchors to absolute character positions.
    beats = analysis.get("beats", [])
    resolve_positions(beats, window, window_offset=start, window_end=end)

    analysis["_meta"] = {
        "novel_title": novel_title,
        "window_index": window_index,
        "window_start": start,
        "window_end": end,
        "total_chars": total,
        "window_size": window_size,
        "beats_count": len(beats),
        "model": model,
    }

    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(analysis, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"\n分析完成 → {output_path}")
    print(f" 节拍数:{len(beats)} 人物:{len(analysis.get('characters', []))} 线索:{len(analysis.get('outline', {}).get('plot_lines', []))}")
    print()
    print(f" {'id':<12} {'type':<8} {'position':<24} {'mice':<14} {'sp'} summary")
    print(f" {'-'*75}")
    for b in beats:
        pos = f"{b['position_start']:,}-{b['position_end']:,}"
        sp = "⭐" if b.get("shuang_point", {}).get("has_shuang") else " "
        anchor_ok = "✓" if b.get("start_anchor") else "✗"
        # Use .get for LLM-supplied fields, consistent with mice_thread above.
        print(
            f" {b['id']:<12} {b['type']:<8} {pos:<24} "
            f"{b.get('mice_thread','?'):<14} {sp} [{anchor_ok}] {b.get('summary','')[:28]}..."
        )


def main():
    """CLI entry point: parse arguments and run one window analysis."""
    parser = argparse.ArgumentParser(description="步骤1:500K 窗口故事分析")
    parser.add_argument("--novel", required=True, help="小说 txt 文件路径")
    parser.add_argument("--window-index", type=int, default=0, help="窗口序号(0-based)")
    parser.add_argument("--window-size", type=int, default=WINDOW_SIZE, help="窗口大小(字符数)")
    parser.add_argument("--prev-analysis", default=None, help="前一窗口的分析 JSON")
    parser.add_argument("--output", required=True, help="输出 JSON 文件路径")
    parser.add_argument("--model", default="qwen-plus", help="模型名称")
    args = parser.parse_args()

    asyncio.run(
        analyze_window(
            args.novel,
            args.window_index,
            args.prev_analysis,
            args.output,
            args.model,
            args.window_size,
        )
    )


if __name__ == "__main__":
    main()