#!/usr/bin/env python3
"""
步骤1:500K 窗口故事分析
定位方案:让 LLM 复述每个 beat 开头的原文片段作为锚点,
用 str.find() + 渐进缩短前缀 定位精确字符位置,
不依赖章节标题格式(适用于任意命名风格的小说)。
用法:通常通过 run_pipeline.py 自动调用,也可单独手动运行:
  python step1_analyze.py --novel ../input/大奉打更人.txt --output analysis/w0.json
  python step1_analyze.py --novel ../input/大奉打更人.txt \\
      --window-index 1 --prev-analysis analysis/w0.json --output analysis/w1.json
环境变量(.env):
  ALI_API_KEY   阿里云 DashScope API Key
  ALI_BASE_URL  (可选)默认 https://dashscope.aliyuncs.com/compatible-mode/v1
"""
# Standard library
import argparse
import asyncio
import json
import os
import re
from pathlib import Path
from typing import List, Optional

# Third-party
from dotenv import load_dotenv
from openai import APIError, AsyncOpenAI, BadRequestError, RateLimitError

# Load .env from the working directory first, then also from the parent dir.
load_dotenv()
load_dotenv(Path(__file__).parent.parent / ".env")

# DashScope exposes an OpenAI-compatible endpoint; the key comes from .env.
client = AsyncOpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url=os.getenv(
        "ALI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    ),
)

# Characters analyzed per window.
WINDOW_SIZE = 500_000

# ──────────────────────────────────────────────────────────────
# Text loading
# ──────────────────────────────────────────────────────────────
def load_text(path: str) -> str:
    """Read a text file, trying common Chinese encodings in order.

    Tries utf-8, then gbk, gb2312 and finally gb18030; returns the first
    successful decode.  Raises ValueError if none of them works.
    """
    candidate_encodings = ("utf-8", "gbk", "gb2312", "gb18030")
    for encoding in candidate_encodings:
        try:
            return Path(path).read_text(encoding=encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError(f"无法解码文件: {path}")
- # ──────────────────────────────────────────────────────────────
- # 锚点定位
- # ──────────────────────────────────────────────────────────────
- def find_anchor(window_text: str, anchor: str, search_from: int = 0) -> int:
- """
- 在 window_text[search_from:] 中查找锚点,返回窗口内局部位置。
- 找不到时渐进缩短前缀(25→20→15→10→8字),仍找不到返回 -1。
- """
- if not anchor:
- return -1
- for length in range(min(len(anchor), 40), 7, -1):
- pos = window_text.find(anchor[:length], search_from)
- if pos >= 0:
- return pos
- return -1
def resolve_positions(beats: List[dict], window_text: str, window_offset: int, window_end: int) -> None:
    """
    Resolve each beat's ``start_anchor`` to an absolute character position,
    writing ``position_start`` / ``position_end`` into the beat dicts in place.

    Strategy: anchors are searched sequentially, each search starting just
    after the previous match, which avoids accidental matches earlier in the
    window.  Beats whose anchor is not found get a position interpolated
    midway between their nearest resolved neighbours (falling back to the
    window bounds).  ``position_end`` is the next beat's ``position_start``
    (``window_end`` for the last beat).
    """
    search_from = 0
    unresolved = []
    for beat in beats:
        anchor = beat.get("start_anchor", "")
        pos = find_anchor(window_text, anchor, search_from)
        if pos >= 0:
            beat["position_start"] = window_offset + pos
            beat["_anchor_resolved"] = True
            search_from = pos + 1
        else:
            beat["position_start"] = -1  # placeholder; interpolated below
            beat["_anchor_resolved"] = False
            unresolved.append(beat["id"])
    # Estimate positions for unresolved beats by interpolation.
    if unresolved:
        print(f" 警告:{len(unresolved)} 个 beat 锚点未找到,将按比例估算位置:{unresolved}")
        total_beats = len(beats)
        for i, beat in enumerate(beats):
            if not beat["_anchor_resolved"]:
                # Midpoint between the nearest resolved neighbours.
                prev_pos = next(
                    (beats[j]["position_start"] for j in range(i - 1, -1, -1) if beats[j]["_anchor_resolved"]),
                    window_offset,
                )
                next_pos = next(
                    (beats[j]["position_start"] for j in range(i + 1, total_beats) if beats[j]["_anchor_resolved"]),
                    window_end,
                )
                beat["position_start"] = (prev_pos + next_pos) // 2
    # position_end: each beat ends where the next one starts.
    for i, beat in enumerate(beats):
        if i + 1 < len(beats):
            beat["position_end"] = beats[i + 1]["position_start"]
        else:
            beat["position_end"] = window_end
    # Strip the internal bookkeeping flag.
    for beat in beats:
        beat.pop("_anchor_resolved", None)
- # ──────────────────────────────────────────────────────────────
- # LLM 调用
- # ──────────────────────────────────────────────────────────────
- class ContentFilterError(Exception):
- """内容审查不通过"""
async def llm_call(messages: list, model: str, temperature: float = 0.3, max_retries: int = 3) -> str:
    """
    Call the chat-completions endpoint and return the response text.

    Retries on RateLimitError / APIError with exponential backoff (5s,
    doubling, capped at 60s), for at most ``max_retries + 1`` attempts.

    Raises:
        ContentFilterError: when the provider rejects the request on
            content-moderation grounds (DashScope ``data_inspection_failed``
            marker or an OpenAI-style ``content_filter`` error code).
        BadRequestError / RateLimitError / APIError: propagated when not a
            moderation failure or when retries are exhausted.
    """
    delay = 5.0
    for attempt in range(1, max_retries + 2):
        try:
            resp = await client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=8192,
            )
            return resp.choices[0].message.content
        except BadRequestError as e:
            # Moderation failures are permanent — surface them as a distinct
            # exception type instead of retrying.
            if "data_inspection_failed" in str(e) or "content_filter" in (getattr(e, "code", "") or ""):
                raise ContentFilterError(f"内容审查不通过: {e}") from e
            raise
        except (RateLimitError, APIError) as e:
            if attempt > max_retries:
                raise
            print(f" [重试 {attempt}/{max_retries}] {type(e).__name__}: {e},{delay:.0f}s 后重试...")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)
def extract_json(text: str) -> dict:
    """
    Extract a JSON object from raw LLM output.

    Handles output wrapped in a ```json ... ``` fence (or a bare ``` ... ```
    fence, which some models emit), and falls back to stripping trailing
    commas — the most common LLM JSON malformation — before giving up.

    Raises:
        json.JSONDecodeError: if the payload still is not valid JSON.
    """
    # Accept both ```json and plain ``` fences.
    m = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    json_str = m.group(1) if m else text.strip()
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        # Remove trailing commas before } or ] and retry once.
        json_str = re.sub(r",\s*([}\]])", r"\1", json_str)
        return json.loads(json_str)
- # ──────────────────────────────────────────────────────────────
- # 提示词
- # ──────────────────────────────────────────────────────────────
- SYSTEM_ANALYST = (
- "你是资深故事分析专家,精通 Scene-Sequel 结构、MICE 线程理论(Milieu/Idea/Character/Event)"
- "以及中国网文的爽点与钩子设计。请严格按指定 JSON 格式输出,不要有多余文字。"
- )
def build_prompt(window_text: str, prev_meta: Optional[dict], novel_title: str) -> str:
    """
    Build the user prompt for one analysis window.

    If ``prev_meta`` (the previous window's parsed analysis) is given, its
    characters, plot lines and main-plot summary are prepended so the model
    keeps continuity across windows.  The prompt also embeds a rough
    beat-count estimate and the full window text to analyze.
    """
    prev_section = ""
    if prev_meta:
        prev_section = f"""## 前序窗口元信息(保持连贯性)
### 已知人物
{json.dumps(prev_meta.get("characters", []), ensure_ascii=False, indent=2)}
### 已知剧情线索(MICE 线程)
{json.dumps(prev_meta.get("outline", {}).get("plot_lines", []), ensure_ascii=False, indent=2)}
### 前序主线摘要
{prev_meta.get("outline", {}).get("main_plot", "无")}
---
"""
    # Rough beat-count guidance (~20,000 chars per beat), while the prompt
    # text stresses that actual story structure takes precedence.
    window_chars = len(window_text)
    rough_beats = max(3, round(window_chars / 20000))
    beat_guidance = (
        f"本窗口约 {window_chars:,} 字。以 Scene-Sequel 叙事功能为切分依据:"
        f"Scene 在主角目标受阻并遭遇 Disaster 时结束,Sequel 在主角做出新 Decision 时结束,二者严格交替。"
        f"切分边界是叙事功能单元的完结,与章节标题、地点切换、视角变化无关。"
        f"根据本文实际节奏,预计大约 {rough_beats} 个节拍,但若故事结构明显更多或更少,以实际为准。"
    )
    return f"""{prev_section}## 分析任务
书名:{novel_title}
### 1. 故事大纲
- **main_plot**:本窗口主线剧情摘要(200-300 字)
- **plot_lines**:活跃/新增剧情线索,每条包含:
- name、mice_type(M/I/C/E)、status(进行中/已解决/待推进)
- description:线索核心矛盾与当前进展(50-80字)
- core_question:一句话概括"这条线索要解答的根本问题"(≤30字)
- next_steps:推进此线索的下一个关键动作或待揭示信息(≤40字)
### 2. 人物小传
主要人物(新出现 + 已有人物状态更新),每人包含:
- name、role、goal(当前目标)
- traits:性格特质(3-5个词组)
- speaking_style:说话风格(2-3条典型特征,如"夹杂黑话与文言""关键处斩钉截铁")
- current_state:本窗口末的最新状态(一句话,描述动态处境而非静态属性)
- relationships:与其他角色的关系
### 3. 写作亮点
分析本窗口的叙事技法,每条15-30字:
- techniques:叙事/结构技巧(2-3条)
- shuang_designs:爽点设计方式(2-3条,说明实现机制)
- pacing:节奏处理特点(1-2条)
### 4. 节拍切分(Scene-Sequel 交替)
**切分粒度**:{beat_guidance}
**start_anchor 说明**(非常重要,直接影响定位精度):
- 从下方【待分析文本】中,**一字不差**地逐字复制该节拍开头的 30-40 个字符(含标点)
- **禁止**填写"从原文逐字复制"之类的说明文字,也**禁止**照抄上方示例中的占位符——必须是待分析文本中的真实字符
- 选择该节拍真正开始的位置,而非章节标题
- 避免选择可能多处出现的通用短语(如"他说""道"等)
**节拍要素**:
- Scene:goal / conflict_type(人物冲突|环境冲突|内心冲突|信息冲突)/ conflict_description / disaster
- Sequel:reaction / dilemma / decision
- 每个 beat 必须有:mice_thread(推进的 MICE 线程名称)、shuang_point、state_changes
**shuang_point 格式**:
```
"shuang_point": {{
"has_shuang": true/false,
"type": "打脸|升级|装逼|获得|碾压|无",
"intensity": "low|medium|high|none",
"description": "具体内容(如有)"
}}
```
## 输出格式(严格 JSON)
```json
{{
"outline": {{
"main_plot": "...",
"plot_lines": [
{{
"name": "税银案", "mice_type": "E", "status": "进行中",
"description": "...",
"core_question": "真银被谁调包?",
"next_steps": "锁定御刀卫陆姓经手人"
}}
]
}},
"characters": [
{{
"name": "...", "role": "主角", "goal": "...",
"traits": ["机智"],
"speaking_style": ["夹杂现代俚语与古语混搭", "关键处斩钉截铁"],
"current_state": "刚凭推理翻盘,获临时协查资格,尚未脱牢",
"relationships": {{"角色A": "关系"}}
}}
],
"writing_insights": {{
"techniques": ["信息差分层释放:主角全知,古代角色见表象,层层迟滞"],
"shuang_designs": ["逻辑型装逼:靠算术/化学原理碾压,非武力打脸"],
"pacing": ["对话占比65%,每章2-3次场景切换,无大段独白"]
}},
"beats": [
{{
"id": "beat_001",
"type": "scene",
"start_anchor": "【此处填入待分析文本原文开头20-30字】",
"mice_thread": "税银案",
"summary": "...",
"goal": "...",
"conflict_type": "人物冲突",
"conflict_description": "...",
"disaster": "...",
"shuang_point": {{
"has_shuang": true,
"type": "智商碾压",
"intensity": "high",
"description": "..."
}},
"state_changes": {{
"plot_lines": [{{"name": "税银案", "old_state": "调查中", "new_state": "发现破绽"}}],
"characters": [{{"name": "许七安", "change": "从囚犯转变为关键证人"}}]
}}
}},
{{
"id": "beat_002",
"type": "sequel",
"start_anchor": "【此处填入待分析文本原文开头20-30字】",
"mice_thread": "身份成长",
"summary": "...",
"reaction": "...",
"dilemma": "...",
"decision": "...",
"shuang_point": {{"has_shuang": false, "type": "无", "intensity": "none", "description": ""}},
"state_changes": {{
"characters": [{{"name": "许七安", "change": "心态从被动转为主动"}}]
}}
}}
]
}}
```
## 待分析文本
{window_text}
"""
- # ──────────────────────────────────────────────────────────────
- # 主流程
- # ──────────────────────────────────────────────────────────────
- async def analyze_window(
- novel_path: str,
- window_index: int,
- prev_analysis_path: Optional[str],
- output_path: str,
- model: str,
- window_size: int = WINDOW_SIZE,
- ):
- print(f"\n{'='*60}")
- print(f"窗口 {window_index} 分析")
- print(f"{'='*60}")
- text = load_text(novel_path)
- total = len(text)
- print(f"全文:{total:,} 字符")
- start = window_index * window_size
- end = min(start + window_size, total)
- if start >= total:
- print("起始位置超过文件长度,退出。")
- return
- window = text[start:end]
- print(f"窗口范围:{start:,} - {end:,}({end - start:,} 字符)")
- prev_meta = None
- if prev_analysis_path and Path(prev_analysis_path).exists():
- with open(prev_analysis_path, encoding="utf-8") as f:
- prev_meta = json.load(f)
- print(f"加载前序分析:{prev_analysis_path}")
- novel_title = Path(novel_path).stem
- prompt = build_prompt(window, prev_meta, novel_title)
- messages = [
- {"role": "system", "content": SYSTEM_ANALYST},
- {"role": "user", "content": prompt},
- ]
- print(f"调用 LLM({model})...")
- raw = await llm_call(messages, model=model)
- print("解析 JSON...")
- try:
- analysis = extract_json(raw)
- except json.JSONDecodeError as e:
- err_path = Path(output_path).with_suffix(".error.txt")
- Path(output_path).parent.mkdir(parents=True, exist_ok=True)
- err_path.write_text(raw, encoding="utf-8")
- print(f"JSON 解析失败:{e}\n原始响应已保存到 {err_path}")
- raise
- # 锚点定位
- beats = analysis.get("beats", [])
- resolve_positions(beats, window, window_offset=start, window_end=end)
- analysis["_meta"] = {
- "novel_title": novel_title,
- "window_index": window_index,
- "window_start": start,
- "window_end": end,
- "total_chars": total,
- "window_size": window_size,
- "beats_count": len(beats),
- "model": model,
- }
- out = Path(output_path)
- out.parent.mkdir(parents=True, exist_ok=True)
- out.write_text(json.dumps(analysis, ensure_ascii=False, indent=2), encoding="utf-8")
- print(f"\n分析完成 → {output_path}")
- print(f" 节拍数:{len(beats)} 人物:{len(analysis.get('characters', []))} 线索:{len(analysis.get('outline', {}).get('plot_lines', []))}")
- print()
- print(f" {'id':<12} {'type':<8} {'position':<24} {'mice':<14} {'sp'} summary")
- print(f" {'-'*75}")
- for b in beats:
- pos = f"{b['position_start']:,}-{b['position_end']:,}"
- sp = "⭐" if b.get("shuang_point", {}).get("has_shuang") else " "
- anchor_ok = "✓" if b.get("start_anchor") else "✗"
- print(
- f" {b['id']:<12} {b['type']:<8} {pos:<24} "
- f"{b.get('mice_thread','?'):<14} {sp} [{anchor_ok}] {b['summary'][:28]}..."
- )
def main():
    """CLI entry point: parse arguments and run a single window analysis."""
    ap = argparse.ArgumentParser(description="步骤1:500K 窗口故事分析")
    ap.add_argument("--novel", required=True, help="小说 txt 文件路径")
    ap.add_argument("--window-index", type=int, default=0, help="窗口序号(0-based)")
    ap.add_argument("--window-size", type=int, default=WINDOW_SIZE, help="窗口大小(字符数)")
    ap.add_argument("--prev-analysis", default=None, help="前一窗口的分析 JSON")
    ap.add_argument("--output", required=True, help="输出 JSON 文件路径")
    ap.add_argument("--model", default="qwen-plus", help="模型名称")
    opts = ap.parse_args()
    coro = analyze_window(
        opts.novel,
        opts.window_index,
        opts.prev_analysis,
        opts.output,
        opts.model,
        opts.window_size,
    )
    asyncio.run(coro)


if __name__ == "__main__":
    main()