"""Harvest a run's runtime artifacts into a scrubbed test corpus (V2-M0A).

Reads the 13 runtime artifacts of one run via the RuntimeStore interface
(LocalRuntimeFileStore for on-disk runs, DatabaseRuntimeStore for the
production DB) and writes a SCRUBBED copy under
tests/fixtures/cases/{case_id}/input/ for offline replay (see
tests/replay_harness.py).

Scrubbing: the on-disk / DB artifacts are NOT guaranteed secret-free
(LocalRuntimeFileStore does not enforce FORBIDDEN_RAW_PAYLOAD_KEYS), so this
tool masks every value whose KEY is sensitive — recursively — before the
corpus is committed. The key set extends FORBIDDEN_RAW_PAYLOAD_KEYS with
platform identifiers (sec_uid / account_id) and signed media URLs. String
values that look like (or embed) a Douyin sec_uid are masked as well.

Usage:
    python scripts/harvest_case_corpus.py --run-id v1_run_xxx --from files --case-id smoke_local
    python scripts/harvest_case_corpus.py --run-id v1_run_xxx --from db --case-id real_id45 --env-file .env

Exit 0 on success (corpus written + scrub self-check clean), 1 otherwise.
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    # Make the repo root importable when run as `python scripts/...`.
    sys.path.insert(0, str(ROOT))

from content_agent.integrations.database_runtime import FORBIDDEN_RAW_PAYLOAD_KEYS  # noqa: E402
from content_agent.integrations.runtime_files import (  # noqa: E402
    RUNTIME_FILENAMES,
    LocalRuntimeFileStore,
)

CASES_DIR = ROOT / "tests" / "fixtures" / "cases"
# Artifacts read with store.read_json; every other runtime file uses read_jsonl.
JSON_FILES = {"source_context.json", "pattern_seed_pack.json", "final_output.json", "strategy_review.json"}
PLACEHOLDER = ""

# Sensitive KEY names: DB-backend blocklist + platform identifiers + signed URLs.
SCRUB_KEYS = {key.lower() for key in FORBIDDEN_RAW_PAYLOAD_KEYS} | {
    "sec_uid",
    "account_id",
    "author_id",
    "play_url",
    "oss_url",
    "platform_author_id",
}

# A sec_uid-shaped run ("MS4w" + 17+ URL-safe base64 chars) anywhere in a string,
# e.g. as a path segment of a URL. 4 + 17 = 21 chars, matching the > 20 length rule.
_SEC_UID_RE = re.compile(r"MS4w[A-Za-z0-9_-]{17,}")


def _is_secret_value(value: Any) -> bool:
    """Return True for strings that look like, or embed, a Douyin sec_uid.

    sec_uid values leak into non-sensitive keys (e.g. walk node ids), so the key
    blocklist alone is not enough. A string is flagged when it starts with
    "MS4w" and is longer than 20 chars, or when a sec_uid-shaped run appears
    anywhere inside it.
    """
    if not isinstance(value, str):
        return False
    if value.startswith("MS4w") and len(value) > 20:
        return True
    return _SEC_UID_RE.search(value) is not None


def scrub(obj: Any) -> Any:
    """Return a rebuilt copy of *obj* with sensitive values replaced by PLACEHOLDER.

    Dicts and lists are rebuilt recursively. A dict value is masked wholesale
    when its key (compared case-insensitively) is in SCRUB_KEYS; any other
    value flagged by _is_secret_value() is masked too. Dict keys themselves are
    NOT rewritten; find_forbidden() reports secret-looking keys so the
    self-check fails instead of committing them.
    """
    if isinstance(obj, dict):
        return {
            key: (PLACEHOLDER if str(key).lower() in SCRUB_KEYS else scrub(value))
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [scrub(value) for value in obj]
    if _is_secret_value(obj):
        return PLACEHOLDER
    return obj


def find_forbidden(obj: Any, path: str = "") -> list[str]:
    """Return key paths (e.g. ".a.b[0]") that still hold a sensitive, unmasked value.

    Flags values under SCRUB_KEYS that differ from PLACEHOLDER, any value
    matched by _is_secret_value(), and any dict KEY matched by
    _is_secret_value(). Secret-looking keys appear in the returned path as
    "<secret_key>" so the report printed by main() does not echo the secret.
    """
    hits: list[str] = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            if _is_secret_value(key):
                here = f"{path}.<secret_key>"
                hits.append(here)
            else:
                here = f"{path}.{key}"
            if str(key).lower() in SCRUB_KEYS and value != PLACEHOLDER:
                hits.append(here)
            hits.extend(find_forbidden(value, here))
    elif isinstance(obj, list):
        for index, value in enumerate(obj):
            hits.extend(find_forbidden(value, f"{path}[{index}]"))
    elif _is_secret_value(obj):
        hits.append(path)
    return hits


def harvest(run_id: str, store: Any) -> dict[str, Any]:
    """Read every runtime artifact that the store reports present for *run_id*.

    Returns a mapping of filename -> parsed content: read_json for JSON_FILES,
    read_jsonl for the rest. Files whose status entry is falsy are skipped.
    """
    status = store.file_status(run_id)
    artifacts: dict[str, Any] = {}
    for filename in RUNTIME_FILENAMES:
        if not status.get(filename):
            continue
        if filename in JSON_FILES:
            artifacts[filename] = store.read_json(run_id, filename)
        else:
            artifacts[filename] = store.read_jsonl(run_id, filename)
    return artifacts


def write_corpus(case_id: str, artifacts: dict[str, Any]) -> Path:
    """Scrub *artifacts* and write each to tests/fixtures/cases/{case_id}/input/.

    Runtime artifacts left in that directory by an earlier harvest of the same
    case, but absent from this run, are deleted. This keeps the corpus from
    mixing two runs and leaves no file unchecked by main()'s self-check. Only
    names in RUNTIME_FILENAMES are removed; other files are left alone.

    Returns the destination directory.
    """
    dest = CASES_DIR / case_id / "input"
    dest.mkdir(parents=True, exist_ok=True)
    for filename in RUNTIME_FILENAMES:
        stale = dest / filename
        if filename not in artifacts and stale.is_file():
            stale.unlink()
    for filename, data in artifacts.items():
        scrubbed = scrub(data)
        # NOTE(review): JSONL artifacts are written as one pretty-printed JSON
        # array, not one object per line. Keep in sync with the reader that
        # tests/replay_harness.py uses — TODO confirm.
        (dest / filename).write_text(
            json.dumps(scrubbed, ensure_ascii=False, indent=2) + "\n", encoding="utf-8"
        )
    return dest


def _store(source: str, env_file: str) -> Any:
    """Build the runtime store for --from: runtime/v1 on disk, or the DB via *env_file*."""
    if source == "files":
        return LocalRuntimeFileStore(ROOT / "runtime" / "v1")
    # Only needed for --from db, so imported here rather than at module top.
    from content_agent.integrations.database_runtime import (
        ContentSupplyDbConfig,
        DatabaseRuntimeStore,
    )

    return DatabaseRuntimeStore(ContentSupplyDbConfig.from_env(env_file))


def _is_safe_case_id(case_id: str) -> bool:
    """Return True if *case_id* is one plain path component (no separators, not "." / "..").

    Prevents --case-id values like "../../x" from writing outside CASES_DIR.
    """
    return case_id not in ("", ".", "..") and Path(case_id).name == case_id


def main() -> int:
    """Harvest, scrub, write and self-check one case; print a JSON report.

    Returns 0 when the corpus was written and the self-check found no leaks,
    1 otherwise (invalid case id, no artifacts, or leaks found).
    """
    args = _parse_args()
    if not _is_safe_case_id(args.case_id):
        print(
            json.dumps(
                {"status": "fail", "reason": "invalid_case_id", "case_id": args.case_id},
                ensure_ascii=False,
            )
        )
        return 1
    store = _store(args.source, args.env_file)
    artifacts = harvest(args.run_id, store)
    if not artifacts:
        print(json.dumps({"status": "fail", "reason": "no_artifacts", "run_id": args.run_id}))
        return 1
    dest = write_corpus(args.case_id, artifacts)

    # Self-check: re-read what was actually written; it must be free of sensitive values.
    leaks: dict[str, list[str]] = {}
    for filename in artifacts:
        data = json.loads((dest / filename).read_text(encoding="utf-8"))
        hits = find_forbidden(data)
        if hits:
            leaks[filename] = hits

    result = {
        "status": "fail" if leaks else "pass",
        "case_id": args.case_id,
        "run_id": args.run_id,
        "files": sorted(artifacts),
        "corpus_dir": str(dest.relative_to(ROOT)),
        "leaks": leaks,
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 1 if leaks else 0


def _parse_args() -> argparse.Namespace:
    """Parse CLI arguments; the module docstring is the --help description."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the docstring's line breaks so the Usage examples stay readable.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--run-id", required=True)
    parser.add_argument("--from", dest="source", choices=["files", "db"], default="files")
    parser.add_argument("--case-id", required=True)
    parser.add_argument("--env-file", default=".env")
    return parser.parse_args()


if __name__ == "__main__":
    sys.exit(main())