quote-source.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. #!/usr/bin/env python3
  2. """quote-source.py — 从原文 case json 按字符匹配查找并**原样返回**片段, 供填 value / directive。
  3. 为什么需要: value / directive 要填原文里的真实内容(那段 JSON 风格分析、那句完整提示词),
  4. 不能写 "<JSON 风格分析数据>" 这种空壳, 也不能凭记忆缩写。用本工具把原文那段逐字捞出来,
  5. 再粘进 workflow.json 的 value / directive。
  6. 匹配是**空白无关**的: 原文 body_text 常有乱换行 / 前导空格("交给\\n AI"), 直接子串匹配
  7. 多词查询会落空。本工具匹配时忽略所有空白, 但返回的是**原文逐字片段**(连原排版一起)。
  8. 用法 (Agent 走 bash_command 调):
  9. # 按几个关键词捞出周边片段 (默认前后各 300 字)
  10. python spec/tools/quote-source.py --source input/case-N.json --query "视觉风格分析"
  11. # 捞大块时放宽窗口
  12. python spec/tools/quote-source.py --source input/case-N.json --query "主色调" --window 800
  13. # 命中点落在 {...} 里时, 返回整个 JSON 块 (适合捞那段结构化分析)
  14. python spec/tools/quote-source.py --source input/case-N.json --query "视觉风格分析" --json-block
  15. # 捞一句完整提示词当 directive
  16. python spec/tools/quote-source.py --source input/case-N.json --query "请以 JSON 结构化"
  17. # 范围引用: 用首尾两个短锚点引出之间的整段长原文 (比 --window 猜长度精确)
  18. python spec/tools/quote-source.py --source input/case-N.json --from "请以 JSON" --to "500 字以内"
  19. python spec/tools/quote-source.py --source input/case-N.json --from "视觉风格分析" --to "电影感人像"
  20. 输出: 命中的原文逐字片段 (多处命中各自分隔)。无精确命中时给出最接近的一段 (标 ~approx)。
  21. 退出码: 0 有命中(含 approx) / 1 完全找不到 / 2 CLI/IO 错。
  22. """
  23. import argparse
  24. import json
  25. import sys
  26. from pathlib import Path
  27. for _s in (sys.stdout, sys.stderr):
  28. if hasattr(_s, "reconfigure"):
  29. try:
  30. _s.reconfigure(encoding="utf-8", errors="replace")
  31. except Exception:
  32. pass
  33. def _source_text(path: Path) -> str:
  34. """从 case json 抽可搜索文本: title + body_text (两套 schema 都有 body_text)。"""
  35. data = json.loads(path.read_text(encoding="utf-8"))
  36. if not isinstance(data, dict):
  37. return str(data)
  38. parts = []
  39. for k in ("title", "body_text"):
  40. v = data.get(k)
  41. if isinstance(v, str) and v:
  42. parts.append(v)
  43. return "\n".join(parts)
  44. def _build_norm(text: str):
  45. """去掉所有空白, 返回 (norm_str, idxmap)。idxmap[i] = norm 第 i 个字符在原文里的下标。"""
  46. chars, idxmap = [], []
  47. for i, ch in enumerate(text):
  48. if ch.isspace():
  49. continue
  50. chars.append(ch)
  51. idxmap.append(i)
  52. return "".join(chars), idxmap
  53. def _find_all(ns: str, nq: str, max_hits: int):
  54. hits, start = [], 0
  55. while len(hits) < max_hits:
  56. p = ns.find(nq, start)
  57. if p < 0:
  58. break
  59. hits.append(p)
  60. start = p + max(1, len(nq))
  61. return hits
  62. def _enclosing_braces(text: str, lo: int, hi: int):
  63. """找包住 [lo,hi] 的最小 {...} (按花括号配平)。找不到返回 None。"""
  64. # 向左找候选 '{'
  65. depth = 0
  66. open_pos = None
  67. i = lo
  68. while i >= 0:
  69. if text[i] == "}":
  70. depth += 1
  71. elif text[i] == "{":
  72. if depth == 0:
  73. open_pos = i
  74. break
  75. depth -= 1
  76. i -= 1
  77. if open_pos is None:
  78. return None
  79. # 从 open_pos 向右配平
  80. depth = 0
  81. j = open_pos
  82. while j < len(text):
  83. if text[j] == "{":
  84. depth += 1
  85. elif text[j] == "}":
  86. depth -= 1
  87. if depth == 0:
  88. if j >= hi:
  89. return (open_pos, j)
  90. return None
  91. j += 1
  92. return None
  93. def _approx(ns: str, nq: str, idxmap, text: str, window: int):
  94. """无精确命中时, 用最长公共子串给最接近的一段 (标 approx)。"""
  95. from difflib import SequenceMatcher
  96. if not nq:
  97. return None
  98. m = SequenceMatcher(None, ns, nq, autojunk=False).find_longest_match(0, len(ns), 0, len(nq))
  99. if m.size == 0:
  100. return None
  101. lo_n, hi_n = m.a, m.a + m.size - 1
  102. return _slice(idxmap, text, lo_n, hi_n, window, False)
  103. def _slice(idxmap, text: str, lo_n: int, hi_n: int, window: int, json_block: bool):
  104. o_lo = idxmap[lo_n]
  105. o_hi = idxmap[hi_n]
  106. if json_block:
  107. br = _enclosing_braces(text, o_lo, o_hi)
  108. if br:
  109. return text[br[0]: br[1] + 1], br[0], br[1]
  110. lo = max(0, o_lo - window)
  111. hi = min(len(text), o_hi + 1 + window)
  112. return text[lo:hi], lo, hi
  113. def _range_between(text: str, frm: str, to: str):
  114. """范围引用: 返回 frm 首次出现 → 其后首次 to 结束 之间(含两端)的原文逐字片段。
  115. 匹配空白无关; to 必须落在 frm 之后(取最近的一个), 给出最小包裹区间。找不到返回 None。
  116. """
  117. ns, idxmap = _build_norm(text)
  118. nf, _ = _build_norm(frm)
  119. nt, _ = _build_norm(to)
  120. if not nf or not nt:
  121. return None
  122. p = ns.find(nf)
  123. if p < 0:
  124. return None
  125. q = ns.find(nt, p + len(nf)) # to 必须在 from 之后
  126. if q < 0:
  127. return None
  128. o_lo = idxmap[p]
  129. o_hi = idxmap[q + len(nt) - 1]
  130. return text[o_lo:o_hi + 1], o_lo, o_hi
  131. def main() -> int:
  132. ap = argparse.ArgumentParser(description="从原文 case json 按字符匹配原样捞片段 (供填 value/directive)")
  133. ap.add_argument("--source", required=True, help="原文 case json 路径")
  134. ap.add_argument("--query", default=None, help="关键词模式: 要查找的文字 (空白无关匹配; 配 --window / --json-block)")
  135. ap.add_argument("--from", dest="frm", default=None, help="范围模式: 起始锚点 (短串)")
  136. ap.add_argument("--to", dest="to", default=None, help="范围模式: 结束锚点 (短串); 与 --from 同用, 用两个短参数引出之间(含)的整段长原文")
  137. ap.add_argument("--ocr", default=None, help="配图 OCR 文本文件 (如 <case_dir>/_scratch/ocr.txt); 一并搜图片里的文字")
  138. ap.add_argument("--window", type=int, default=300, help="命中点前后各保留多少字 (default 300)")
  139. ap.add_argument("--max", type=int, default=3, help="每个语料最多返回几处命中 (default 3)")
  140. ap.add_argument("--json-block", action="store_true", help="命中落在 {...} 内时返回整个 JSON 块")
  141. args = ap.parse_args()
  142. src = Path(args.source)
  143. if not src.exists():
  144. print(f"source not found: {src}", file=sys.stderr)
  145. return 2
  146. try:
  147. corpora = [("原文", _source_text(src))]
  148. except (json.JSONDecodeError, OSError) as e:
  149. print(f"read source failed: {e}", file=sys.stderr)
  150. return 2
  151. # 第二语料: 配图 OCR 文本
  152. if args.ocr:
  153. ocr_p = Path(args.ocr)
  154. if ocr_p.exists():
  155. corpora.append(("配图OCR", ocr_p.read_text(encoding="utf-8")))
  156. else:
  157. print(f"# (--ocr {ocr_p} 不存在, 仅搜原文)", file=sys.stderr)
  158. # ── 范围模式: --from / --to 两个短锚点 → 返回之间(含)的整段原文 ──
  159. if args.frm is not None or args.to is not None:
  160. if not (args.frm and args.to):
  161. print("范围模式需同时给 --from 和 --to", file=sys.stderr)
  162. return 2
  163. out = []
  164. for label, text in corpora:
  165. rng = _range_between(text, args.frm, args.to)
  166. if rng:
  167. frag, lo, hi = rng
  168. warn = " ⚠ 跨度很大, 确认锚点是否选窄了" if (hi - lo) > 4000 else ""
  169. out.append(f"# [{label}] 范围命中 (第 {lo}–{hi} 字, 共 {hi - lo + 1} 字{warn}; 下为两锚点间逐字原文)\n{frag}")
  170. if out:
  171. print("\n\n".join(out))
  172. return 0
  173. print(f"# 找不到从 {args.frm!r} 到 {args.to!r} 的范围 (检查: 两锚点是否各自存在、to 是否在 from 之后)", file=sys.stderr)
  174. return 1
  175. # ── 关键词模式: --query ──
  176. if not args.query:
  177. print("需要 --query, 或 (--from + --to) 范围模式", file=sys.stderr)
  178. return 2
  179. nq, _ = _build_norm(args.query)
  180. if not nq:
  181. print("query 去空白后为空", file=sys.stderr)
  182. return 2
  183. out, found = [], False
  184. for label, text in corpora:
  185. ns, idxmap = _build_norm(text)
  186. hits = _find_all(ns, nq, args.max)
  187. if hits:
  188. found = True
  189. out.append(f"# [{label}] {len(hits)} 处命中 (query={args.query!r}, 空白无关匹配; 下为逐字片段)")
  190. for k, p in enumerate(hits, 1):
  191. frag, lo, hi = _slice(idxmap, text, p, p + len(nq) - 1, args.window, args.json_block)
  192. out.append(f"\n--- [{label}] 命中 {k} (第 {lo}–{hi} 字) ---\n{frag}")
  193. if found:
  194. print("\n".join(out))
  195. return 0
  196. # 无精确命中 → 各语料给最接近的一段
  197. for label, text in corpora:
  198. ns, idxmap = _build_norm(text)
  199. ap_res = _approx(ns, nq, idxmap, text, args.window)
  200. if ap_res:
  201. frag, lo, hi = ap_res
  202. out.append(f"# [{label}] 无精确命中, 最接近的一段 (~approx, 第 {lo}–{hi} 字; 请核对):\n\n{frag}")
  203. if out:
  204. print("\n".join(out))
  205. return 0
  206. print(f"# 原文/OCR 里都找不到与 {args.query!r} 相关的内容", file=sys.stderr)
  207. return 1
  208. if __name__ == "__main__":
  209. sys.exit(main())