# step1_analyze.py
#!/usr/bin/env python3
"""
Step 1: story analysis over 500K-character windows.

Anchoring scheme: the LLM quotes the verbatim opening snippet of each beat
as an anchor; str.find() with progressively shortened prefixes then locates
the exact character position. This does not rely on chapter-heading formats,
so it works for novels with arbitrary naming styles.

Usage: normally invoked automatically by run_pipeline.py, but can also be
run manually:
    python step1_analyze.py --novel ../input/大奉打更人.txt --output analysis/w0.json
    python step1_analyze.py --novel ../input/大奉打更人.txt \\
        --window-index 1 --prev-analysis analysis/w0.json --output analysis/w1.json

Environment variables (.env):
    ALI_API_KEY   Aliyun DashScope API key
    ALI_BASE_URL  (optional) defaults to https://dashscope.aliyuncs.com/compatible-mode/v1
"""
import os
import re
import json
import asyncio
import argparse
from pathlib import Path
from openai import AsyncOpenAI, BadRequestError, RateLimitError, APIError
from typing import Optional, List
from dotenv import load_dotenv

load_dotenv()
# Also try the parent directory's .env (the pipeline root).
load_dotenv(Path(__file__).parent.parent / ".env")

# Async client for the DashScope OpenAI-compatible endpoint.
client = AsyncOpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url=os.getenv(
        "ALI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    ),
)

# Analysis window size, in characters.
WINDOW_SIZE = 500_000

# ──────────────────────────────────────────────────────────────
# Text loading
# ──────────────────────────────────────────────────────────────
  37. def load_text(path: str) -> str:
  38. for enc in ["utf-8", "gbk", "gb2312", "gb18030"]:
  39. try:
  40. return Path(path).read_text(encoding=enc)
  41. except UnicodeDecodeError:
  42. continue
  43. raise ValueError(f"无法解码文件: {path}")
  44. # ──────────────────────────────────────────────────────────────
  45. # 锚点定位
  46. # ──────────────────────────────────────────────────────────────
  47. def find_anchor(window_text: str, anchor: str, search_from: int = 0) -> int:
  48. """
  49. 在 window_text[search_from:] 中查找锚点,返回窗口内局部位置。
  50. 找不到时渐进缩短前缀(25→20→15→10→8字),仍找不到返回 -1。
  51. """
  52. if not anchor:
  53. return -1
  54. for length in range(min(len(anchor), 40), 7, -1):
  55. pos = window_text.find(anchor[:length], search_from)
  56. if pos >= 0:
  57. return pos
  58. return -1
  59. def resolve_positions(beats: List[dict], window_text: str, window_offset: int, window_end: int) -> None:
  60. """
  61. 用 start_anchor 将每个 beat 定位到绝对字符位置,原地写入
  62. position_start / position_end。
  63. 策略:顺序搜索,每次从上一个 beat 的位置向后找,避免误匹配。
  64. """
  65. search_from = 0
  66. unresolved = []
  67. for i, beat in enumerate(beats):
  68. anchor = beat.get("start_anchor", "")
  69. pos = find_anchor(window_text, anchor, search_from)
  70. if pos >= 0:
  71. beat["position_start"] = window_offset + pos
  72. beat["_anchor_resolved"] = True
  73. search_from = pos + 1
  74. else:
  75. beat["position_start"] = -1 # 标记未解析
  76. beat["_anchor_resolved"] = False
  77. unresolved.append(beat["id"])
  78. # 对未解析的 beat 按比例估算位置
  79. if unresolved:
  80. print(f" 警告:{len(unresolved)} 个 beat 锚点未找到,将按比例估算位置:{unresolved}")
  81. resolved = [b for b in beats if b["_anchor_resolved"]]
  82. total_beats = len(beats)
  83. for i, beat in enumerate(beats):
  84. if not beat["_anchor_resolved"]:
  85. # 在前后已解析 beat 之间插值
  86. prev_pos = next(
  87. (beats[j]["position_start"] for j in range(i - 1, -1, -1) if beats[j]["_anchor_resolved"]),
  88. window_offset,
  89. )
  90. next_pos = next(
  91. (beats[j]["position_start"] for j in range(i + 1, total_beats) if beats[j]["_anchor_resolved"]),
  92. window_end,
  93. )
  94. beat["position_start"] = (prev_pos + next_pos) // 2
  95. # 填写 position_end:每个 beat 的结束 = 下一个 beat 的开始
  96. for i, beat in enumerate(beats):
  97. if i + 1 < len(beats):
  98. beat["position_end"] = beats[i + 1]["position_start"]
  99. else:
  100. beat["position_end"] = window_end
  101. # 清理内部标记字段
  102. for beat in beats:
  103. beat.pop("_anchor_resolved", None)
# ──────────────────────────────────────────────────────────────
# LLM calls
# ──────────────────────────────────────────────────────────────
class ContentFilterError(Exception):
    """Raised when the provider's content-moderation check rejects the request."""
  109. async def llm_call(messages: list, model: str, temperature: float = 0.3, max_retries: int = 3) -> str:
  110. delay = 5.0
  111. for attempt in range(1, max_retries + 2):
  112. try:
  113. resp = await client.chat.completions.create(
  114. model=model,
  115. messages=messages,
  116. temperature=temperature,
  117. max_tokens=8192,
  118. )
  119. return resp.choices[0].message.content
  120. except BadRequestError as e:
  121. if "data_inspection_failed" in str(e) or "content_filter" in (getattr(e, "code", "") or ""):
  122. raise ContentFilterError(f"内容审查不通过: {e}") from e
  123. raise
  124. except (RateLimitError, APIError) as e:
  125. if attempt > max_retries:
  126. raise
  127. print(f" [重试 {attempt}/{max_retries}] {type(e).__name__}: {e},{delay:.0f}s 后重试...")
  128. await asyncio.sleep(delay)
  129. delay = min(delay * 2, 60)
  130. def extract_json(text: str) -> dict:
  131. """从 LLM 输出中提取 JSON,兼容 ```json...``` 包裹"""
  132. m = re.search(r"```json\s*(.*?)\s*```", text, re.DOTALL)
  133. json_str = m.group(1) if m else text.strip()
  134. try:
  135. return json.loads(json_str)
  136. except json.JSONDecodeError:
  137. json_str = re.sub(r",\s*([}\]])", r"\1", json_str)
  138. return json.loads(json_str)
# ──────────────────────────────────────────────────────────────
# Prompts
# ──────────────────────────────────────────────────────────────
# System prompt shared by all analysis calls: fixes the analyst persona
# (Scene-Sequel, MICE threads, webnovel hooks) and demands JSON-only output.
SYSTEM_ANALYST = (
    "你是资深故事分析专家,精通 Scene-Sequel 结构、MICE 线程理论(Milieu/Idea/Character/Event)"
    "以及中国网文的爽点与钩子设计。请严格按指定 JSON 格式输出,不要有多余文字。"
)
def build_prompt(window_text: str, prev_meta: Optional[dict], novel_title: str) -> str:
    """Build the user prompt for one analysis window.

    prev_meta, when given, is the previous window's analysis JSON; its
    known characters, MICE plot lines and main-plot summary are prepended
    so the model keeps continuity across windows.
    """
    prev_section = ""
    if prev_meta:
        prev_section = f"""## 前序窗口元信息(保持连贯性)
### 已知人物
{json.dumps(prev_meta.get("characters", []), ensure_ascii=False, indent=2)}
### 已知剧情线索(MICE 线程)
{json.dumps(prev_meta.get("outline", {}).get("plot_lines", []), ensure_ascii=False, indent=2)}
### 前序主线摘要
{prev_meta.get("outline", {}).get("main_plot", "无")}
---
"""
    # Suggest a beat count from window size (rough basis: 20,000 chars/beat),
    # while stressing that actual story structure takes precedence.
    window_chars = len(window_text)
    rough_beats = max(3, round(window_chars / 20000))
    beat_guidance = (
        f"本窗口约 {window_chars:,} 字。以 Scene-Sequel 叙事功能为切分依据:"
        f"Scene 在主角目标受阻并遭遇 Disaster 时结束,Sequel 在主角做出新 Decision 时结束,二者严格交替。"
        f"切分边界是叙事功能单元的完结,与章节标题、地点切换、视角变化无关。"
        f"根据本文实际节奏,预计大约 {rough_beats} 个节拍,但若故事结构明显更多或更少,以实际为准。"
    )
    return f"""{prev_section}## 分析任务
书名:{novel_title}
### 1. 故事大纲
- **main_plot**:本窗口主线剧情摘要(200-300 字)
- **plot_lines**:活跃/新增剧情线索,每条包含:
- name、mice_type(M/I/C/E)、status(进行中/已解决/待推进)
- description:线索核心矛盾与当前进展(50-80字)
- core_question:一句话概括"这条线索要解答的根本问题"(≤30字)
- next_steps:推进此线索的下一个关键动作或待揭示信息(≤40字)
### 2. 人物小传
主要人物(新出现 + 已有人物状态更新),每人包含:
- name、role、goal(当前目标)
- traits:性格特质(3-5个词组)
- speaking_style:说话风格(2-3条典型特征,如"夹杂黑话与文言""关键处斩钉截铁")
- current_state:本窗口末的最新状态(一句话,描述动态处境而非静态属性)
- relationships:与其他角色的关系
### 3. 写作亮点
分析本窗口的叙事技法,每条15-30字:
- techniques:叙事/结构技巧(2-3条)
- shuang_designs:爽点设计方式(2-3条,说明实现机制)
- pacing:节奏处理特点(1-2条)
### 4. 节拍切分(Scene-Sequel 交替)
**切分粒度**:{beat_guidance}
**start_anchor 说明**(非常重要,直接影响定位精度):
- 从下方【待分析文本】中,**一字不差**地逐字复制该节拍开头的 30-40 个字符(含标点)
- **禁止**填写"从原文逐字复制"之类的说明文字,也**禁止**照抄上方示例中的占位符——必须是待分析文本中的真实字符
- 选择该节拍真正开始的位置,而非章节标题
- 避免选择可能多处出现的通用短语(如"他说""道"等)
**节拍要素**:
- Scene:goal / conflict_type(人物冲突|环境冲突|内心冲突|信息冲突)/ conflict_description / disaster
- Sequel:reaction / dilemma / decision
- 每个 beat 必须有:mice_thread(推进的 MICE 线程名称)、shuang_point、state_changes
**shuang_point 格式**:
```
"shuang_point": {{
"has_shuang": true/false,
"type": "打脸|升级|装逼|获得|碾压|无",
"intensity": "low|medium|high|none",
"description": "具体内容(如有)"
}}
```
## 输出格式(严格 JSON)
```json
{{
"outline": {{
"main_plot": "...",
"plot_lines": [
{{
"name": "税银案", "mice_type": "E", "status": "进行中",
"description": "...",
"core_question": "真银被谁调包?",
"next_steps": "锁定御刀卫陆姓经手人"
}}
]
}},
"characters": [
{{
"name": "...", "role": "主角", "goal": "...",
"traits": ["机智"],
"speaking_style": ["夹杂现代俚语与古语混搭", "关键处斩钉截铁"],
"current_state": "刚凭推理翻盘,获临时协查资格,尚未脱牢",
"relationships": {{"角色A": "关系"}}
}}
],
"writing_insights": {{
"techniques": ["信息差分层释放:主角全知,古代角色见表象,层层迟滞"],
"shuang_designs": ["逻辑型装逼:靠算术/化学原理碾压,非武力打脸"],
"pacing": ["对话占比65%,每章2-3次场景切换,无大段独白"]
}},
"beats": [
{{
"id": "beat_001",
"type": "scene",
"start_anchor": "【此处填入待分析文本原文开头20-30字】",
"mice_thread": "税银案",
"summary": "...",
"goal": "...",
"conflict_type": "人物冲突",
"conflict_description": "...",
"disaster": "...",
"shuang_point": {{
"has_shuang": true,
"type": "智商碾压",
"intensity": "high",
"description": "..."
}},
"state_changes": {{
"plot_lines": [{{"name": "税银案", "old_state": "调查中", "new_state": "发现破绽"}}],
"characters": [{{"name": "许七安", "change": "从囚犯转变为关键证人"}}]
}}
}},
{{
"id": "beat_002",
"type": "sequel",
"start_anchor": "【此处填入待分析文本原文开头20-30字】",
"mice_thread": "身份成长",
"summary": "...",
"reaction": "...",
"dilemma": "...",
"decision": "...",
"shuang_point": {{"has_shuang": false, "type": "无", "intensity": "none", "description": ""}},
"state_changes": {{
"characters": [{{"name": "许七安", "change": "心态从被动转为主动"}}]
}}
}}
]
}}
```
## 待分析文本
{window_text}
"""
# ──────────────────────────────────────────────────────────────
# Main flow
# ──────────────────────────────────────────────────────────────
async def analyze_window(
    novel_path: str,
    window_index: int,
    prev_analysis_path: Optional[str],
    output_path: str,
    model: str,
    window_size: int = WINDOW_SIZE,
):
    """Analyze one window of the novel and write the analysis JSON.

    Loads the novel text, slices out window `window_index`, optionally
    feeds the previous window's analysis for continuity, calls the LLM,
    resolves beat anchors to absolute positions, and writes the result
    (plus a `_meta` section) to `output_path`. On a JSON parse failure the
    raw LLM response is saved next to the output as `<name>.error.txt`
    before re-raising.
    """
    print(f"\n{'='*60}")
    print(f"窗口 {window_index} 分析")
    print(f"{'='*60}")
    text = load_text(novel_path)
    total = len(text)
    print(f"全文:{total:,} 字符")
    start = window_index * window_size
    end = min(start + window_size, total)
    # Nothing to do when the window starts past the end of the text.
    if start >= total:
        print("起始位置超过文件长度,退出。")
        return
    window = text[start:end]
    print(f"窗口范围:{start:,} - {end:,}({end - start:,} 字符)")
    # Previous window's analysis, if provided, seeds the prompt for continuity.
    prev_meta = None
    if prev_analysis_path and Path(prev_analysis_path).exists():
        with open(prev_analysis_path, encoding="utf-8") as f:
            prev_meta = json.load(f)
        print(f"加载前序分析:{prev_analysis_path}")
    novel_title = Path(novel_path).stem
    prompt = build_prompt(window, prev_meta, novel_title)
    messages = [
        {"role": "system", "content": SYSTEM_ANALYST},
        {"role": "user", "content": prompt},
    ]
    print(f"调用 LLM({model})...")
    raw = await llm_call(messages, model=model)
    print("解析 JSON...")
    try:
        analysis = extract_json(raw)
    except json.JSONDecodeError as e:
        # Keep the raw response for post-mortem before propagating the error.
        err_path = Path(output_path).with_suffix(".error.txt")
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        err_path.write_text(raw, encoding="utf-8")
        print(f"JSON 解析失败:{e}\n原始响应已保存到 {err_path}")
        raise
    # Resolve each beat's start_anchor to absolute character positions.
    beats = analysis.get("beats", [])
    resolve_positions(beats, window, window_offset=start, window_end=end)
    # Bookkeeping consumed by later pipeline steps.
    analysis["_meta"] = {
        "novel_title": novel_title,
        "window_index": window_index,
        "window_start": start,
        "window_end": end,
        "total_chars": total,
        "window_size": window_size,
        "beats_count": len(beats),
        "model": model,
    }
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(analysis, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\n分析完成 → {output_path}")
    print(f" 节拍数:{len(beats)} 人物:{len(analysis.get('characters', []))} 线索:{len(analysis.get('outline', {}).get('plot_lines', []))}")
    print()
    # Per-beat summary table for quick eyeballing of the result.
    print(f" {'id':<12} {'type':<8} {'position':<24} {'mice':<14} {'sp'} summary")
    print(f" {'-'*75}")
    for b in beats:
        pos = f"{b['position_start']:,}-{b['position_end']:,}"
        sp = "⭐" if b.get("shuang_point", {}).get("has_shuang") else " "
        anchor_ok = "✓" if b.get("start_anchor") else "✗"
        print(
            f" {b['id']:<12} {b['type']:<8} {pos:<24} "
            f"{b.get('mice_thread','?'):<14} {sp} [{anchor_ok}] {b['summary'][:28]}..."
        )
  353. def main():
  354. parser = argparse.ArgumentParser(description="步骤1:500K 窗口故事分析")
  355. parser.add_argument("--novel", required=True, help="小说 txt 文件路径")
  356. parser.add_argument("--window-index", type=int, default=0, help="窗口序号(0-based)")
  357. parser.add_argument("--window-size", type=int, default=WINDOW_SIZE, help="窗口大小(字符数)")
  358. parser.add_argument("--prev-analysis", default=None, help="前一窗口的分析 JSON")
  359. parser.add_argument("--output", required=True, help="输出 JSON 文件路径")
  360. parser.add_argument("--model", default="qwen-plus", help="模型名称")
  361. args = parser.parse_args()
  362. asyncio.run(
  363. analyze_window(
  364. args.novel,
  365. args.window_index,
  366. args.prev_analysis,
  367. args.output,
  368. args.model,
  369. args.window_size,
  370. )
  371. )
  372. if __name__ == "__main__":
  373. main()