harvest_case_corpus.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. """Harvest a run's runtime artifacts into a scrubbed test corpus (V2-M0A).
Reads every runtime artifact present for one run via the RuntimeStore interface
  3. (LocalRuntimeFileStore for on-disk runs, DatabaseRuntimeStore for the production
  4. DB) and writes a SCRUBBED copy under tests/fixtures/cases/{case_id}/input/ for
  5. offline replay (see tests/replay_harness.py).
Scrubbing: the on-disk / DB artifacts are NOT guaranteed to be secret-free
(LocalRuntimeFileStore does not enforce FORBIDDEN_RAW_PAYLOAD_KEYS), so before the
corpus is committed this tool recursively masks every value whose KEY is sensitive,
plus any string value that looks like a Douyin sec_uid (those leak into
non-sensitive keys). The key set extends FORBIDDEN_RAW_PAYLOAD_KEYS with platform
identifiers (sec_uid / account_id / author_id / platform_author_id) and signed
media URLs (play_url / oss_url).
  11. Usage:
  12. python scripts/harvest_case_corpus.py --run-id v1_run_xxx --from files --case-id smoke_local
  13. python scripts/harvest_case_corpus.py --run-id v1_run_xxx --from db --case-id real_id45 --env-file .env
  14. Exit 0 on success (corpus written + scrub self-check clean), 1 otherwise.
  15. """
from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
  22. ROOT = Path(__file__).resolve().parents[1]
  23. if str(ROOT) not in sys.path:
  24. sys.path.insert(0, str(ROOT))
  25. from content_agent.integrations.database_runtime import FORBIDDEN_RAW_PAYLOAD_KEYS
  26. from content_agent.integrations.runtime_files import RUNTIME_FILENAMES, LocalRuntimeFileStore
  27. CASES_DIR = ROOT / "tests" / "fixtures" / "cases"
  28. JSON_FILES = {"source_context.json", "pattern_seed_pack.json", "final_output.json", "strategy_review.json"}
  29. PLACEHOLDER = "<scrubbed>"
  30. # Sensitive KEY names: DB-backend blocklist + platform identifiers + signed URLs.
  31. SCRUB_KEYS = {key.lower() for key in FORBIDDEN_RAW_PAYLOAD_KEYS} | {
  32. "sec_uid",
  33. "account_id",
  34. "author_id",
  35. "play_url",
  36. "oss_url",
  37. "platform_author_id",
  38. }
  39. def _is_secret_value(value: Any) -> bool:
  40. """Douyin sec_uid values leak into non-sensitive keys (e.g. walk node ids)."""
  41. return isinstance(value, str) and value.startswith("MS4w") and len(value) > 20
  42. def scrub(obj: Any) -> Any:
  43. """Recursively mask sensitive values by key name or value pattern."""
  44. if isinstance(obj, dict):
  45. return {
  46. key: (PLACEHOLDER if str(key).lower() in SCRUB_KEYS else scrub(value))
  47. for key, value in obj.items()
  48. }
  49. if isinstance(obj, list):
  50. return [scrub(value) for value in obj]
  51. if _is_secret_value(obj):
  52. return PLACEHOLDER
  53. return obj
  54. def find_forbidden(obj: Any, path: str = "") -> list[str]:
  55. """Return key paths still holding a sensitive (unmasked) value."""
  56. hits: list[str] = []
  57. if isinstance(obj, dict):
  58. for key, value in obj.items():
  59. here = f"{path}.{key}"
  60. if str(key).lower() in SCRUB_KEYS and value != PLACEHOLDER:
  61. hits.append(here)
  62. hits.extend(find_forbidden(value, here))
  63. elif isinstance(obj, list):
  64. for index, value in enumerate(obj):
  65. hits.extend(find_forbidden(value, f"{path}[{index}]"))
  66. elif _is_secret_value(obj):
  67. hits.append(path)
  68. return hits
  69. def harvest(run_id: str, store: Any) -> dict[str, Any]:
  70. """Read every present runtime artifact of a run via the store interface."""
  71. status = store.file_status(run_id)
  72. artifacts: dict[str, Any] = {}
  73. for filename in RUNTIME_FILENAMES:
  74. if not status.get(filename):
  75. continue
  76. if filename in JSON_FILES:
  77. artifacts[filename] = store.read_json(run_id, filename)
  78. else:
  79. artifacts[filename] = store.read_jsonl(run_id, filename)
  80. return artifacts
  81. def write_corpus(case_id: str, artifacts: dict[str, Any]) -> Path:
  82. dest = CASES_DIR / case_id / "input"
  83. dest.mkdir(parents=True, exist_ok=True)
  84. for filename, data in artifacts.items():
  85. scrubbed = scrub(data)
  86. (dest / filename).write_text(
  87. json.dumps(scrubbed, ensure_ascii=False, indent=2) + "\n", encoding="utf-8"
  88. )
  89. return dest
  90. def _store(source: str, env_file: str) -> Any:
  91. if source == "files":
  92. return LocalRuntimeFileStore(ROOT / "runtime" / "v1")
  93. from content_agent.integrations.database_runtime import (
  94. ContentSupplyDbConfig,
  95. DatabaseRuntimeStore,
  96. )
  97. return DatabaseRuntimeStore(ContentSupplyDbConfig.from_env(env_file))
  98. def main() -> int:
  99. args = _parse_args()
  100. store = _store(args.source, args.env_file)
  101. artifacts = harvest(args.run_id, store)
  102. if not artifacts:
  103. print(json.dumps({"status": "fail", "reason": "no_artifacts", "run_id": args.run_id}))
  104. return 1
  105. dest = write_corpus(args.case_id, artifacts)
  106. # Self-check: corpus must be free of sensitive values.
  107. leaks: dict[str, list[str]] = {}
  108. for filename in artifacts:
  109. data = json.loads((dest / filename).read_text(encoding="utf-8"))
  110. hits = find_forbidden(data)
  111. if hits:
  112. leaks[filename] = hits
  113. result = {
  114. "status": "fail" if leaks else "pass",
  115. "case_id": args.case_id,
  116. "run_id": args.run_id,
  117. "files": sorted(artifacts),
  118. "corpus_dir": str(dest.relative_to(ROOT)),
  119. "leaks": leaks,
  120. }
  121. print(json.dumps(result, ensure_ascii=False, indent=2))
  122. return 1 if leaks else 0
  123. def _parse_args() -> argparse.Namespace:
  124. parser = argparse.ArgumentParser(description=__doc__)
  125. parser.add_argument("--run-id", required=True)
  126. parser.add_argument("--from", dest="source", choices=["files", "db"], default="files")
  127. parser.add_argument("--case-id", required=True)
  128. parser.add_argument("--env-file", default=".env")
  129. return parser.parse_args()
  130. if __name__ == "__main__":
  131. sys.exit(main())