| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- """Harvest a run's runtime artifacts into a scrubbed test corpus (V2-M0A).
- Reads the 13 runtime artifacts of one run via the RuntimeStore interface
- (LocalRuntimeFileStore for on-disk runs, DatabaseRuntimeStore for the production
- DB) and writes a SCRUBBED copy under tests/fixtures/cases/{case_id}/input/ for
- offline replay (see tests/replay_harness.py).
- Scrubbing: the on-disk / DB artifacts are NOT guaranteed secret-free
- (LocalRuntimeFileStore does not enforce FORBIDDEN_RAW_PAYLOAD_KEYS), so this tool
- masks every value whose KEY is sensitive — recursively — before the corpus is
- committed. The key set extends FORBIDDEN_RAW_PAYLOAD_KEYS with platform
- identifiers (sec_uid / account_id) and signed media URLs.
- Usage:
- python scripts/harvest_case_corpus.py --run-id v1_run_xxx --from files --case-id smoke_local
- python scripts/harvest_case_corpus.py --run-id v1_run_xxx --from db --case-id real_id45 --env-file .env
- Exit 0 on success (corpus written + scrub self-check clean), 1 otherwise.
- """
from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
# Repository root: two levels above this file once symlinks are resolved.
ROOT = Path(__file__).resolve().parent.parent
# Prepend the root to the import path (only once) so the content_agent imports
# that follow resolve when the script is executed directly.
if str(ROOT) not in sys.path:
    sys.path[:0] = [str(ROOT)]
- from content_agent.integrations.database_runtime import FORBIDDEN_RAW_PAYLOAD_KEYS
- from content_agent.integrations.runtime_files import RUNTIME_FILENAMES, LocalRuntimeFileStore
# Destination root for harvested cases: <root>/tests/fixtures/cases/<case_id>/input.
CASES_DIR = ROOT.joinpath("tests", "fixtures", "cases")
# Artifacts read with store.read_json; every other runtime file goes through read_jsonl.
JSON_FILES = {
    "final_output.json",
    "pattern_seed_pack.json",
    "source_context.json",
    "strategy_review.json",
}
# Replacement written in place of every masked value.
PLACEHOLDER = "<scrubbed>"
# Sensitive KEY names, compared lower-case: the DB-backend blocklist, platform
# identifiers, and signed media URLs.
SCRUB_KEYS = {
    *(name.lower() for name in FORBIDDEN_RAW_PAYLOAD_KEYS),
    "sec_uid",
    "account_id",
    "author_id",
    "platform_author_id",
    "play_url",
    "oss_url",
}
- def _is_secret_value(value: Any) -> bool:
- """Douyin sec_uid values leak into non-sensitive keys (e.g. walk node ids)."""
- return isinstance(value, str) and value.startswith("MS4w") and len(value) > 20
def scrub(obj: Any) -> Any:
    """Return a copy of ``obj`` with sensitive data masked, recursively.

    - A dict entry whose key name (case-insensitive) is in SCRUB_KEYS has its whole
      value replaced by PLACEHOLDER, nested containers included.
    - A dict key that ``_is_secret_value`` flags (e.g. a sec_uid used as a mapping
      key) is replaced by a numbered placeholder ("<scrubbed>0", "<scrubbed>1", ...)
      so distinct entries in the same dict are not merged into one.
    - Lists are scrubbed element-wise. A string flagged by ``_is_secret_value``
      becomes PLACEHOLDER. Any other value is returned unchanged.
    """
    if isinstance(obj, dict):
        cleaned: dict[Any, Any] = {}
        masked_keys = 0
        for key, value in obj.items():
            new_value = PLACEHOLDER if str(key).lower() in SCRUB_KEYS else scrub(value)
            if _is_secret_value(key):
                # Keys are written to the corpus just like values; mask them too.
                key = f"{PLACEHOLDER}{masked_keys}"
                masked_keys += 1
            cleaned[key] = new_value
        return cleaned
    if isinstance(obj, list):
        return [scrub(value) for value in obj]
    if _is_secret_value(obj):
        return PLACEHOLDER
    return obj
def find_forbidden(obj: Any, path: str = "") -> list[str]:
    """Return the paths of sensitive data still present (unmasked) in ``obj``.

    A hit is any of:
    - a SCRUB_KEYS-named dict entry whose value is not PLACEHOLDER;
    - a dict key that ``_is_secret_value`` flags;
    - a leaf string that ``_is_secret_value`` flags.

    Paths are built from ``path`` plus ``.key`` / ``[index]`` segments. A flagged key
    is shown as PLACEHOLDER in the path, so the report itself (printed by ``main``)
    does not reproduce the secret.
    """
    hits: list[str] = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            key_is_secret = _is_secret_value(key)
            here = f"{path}.{PLACEHOLDER if key_is_secret else key}"
            unmasked_named = str(key).lower() in SCRUB_KEYS and value != PLACEHOLDER
            if key_is_secret or unmasked_named:
                hits.append(here)
            hits.extend(find_forbidden(value, here))
    elif isinstance(obj, list):
        for index, value in enumerate(obj):
            hits.extend(find_forbidden(value, f"{path}[{index}]"))
    elif _is_secret_value(obj):
        hits.append(path)
    return hits
def harvest(run_id: str, store: Any) -> dict[str, Any]:
    """Collect the artifacts a run actually has, keyed by filename.

    ``store.file_status(run_id)`` decides which files are present (truthy entry).
    Names in JSON_FILES are loaded with ``store.read_json``; every other runtime file
    with ``store.read_jsonl``. Iteration follows RUNTIME_FILENAMES order.
    """
    present = store.file_status(run_id)
    return {
        name: (store.read_json if name in JSON_FILES else store.read_jsonl)(run_id, name)
        for name in RUNTIME_FILENAMES
        if present.get(name)
    }
def write_corpus(case_id: str, artifacts: dict[str, Any]) -> Path:
    """Write a scrubbed copy of every artifact under CASES_DIR/<case_id>/input.

    Each artifact is passed through ``scrub`` and saved as pretty-printed UTF-8 JSON
    (indent=2, non-ASCII kept, trailing newline) under its original filename. The
    directory is created if needed. Returns the input directory path.
    """
    target = CASES_DIR.joinpath(case_id, "input")
    target.mkdir(parents=True, exist_ok=True)
    for name in artifacts:
        rendered = json.dumps(scrub(artifacts[name]), ensure_ascii=False, indent=2)
        target.joinpath(name).write_text(f"{rendered}\n", encoding="utf-8")
    return target
def _store(source: str, env_file: str) -> Any:
    """Build the runtime store for ``source``.

    "files" reads runs from <root>/runtime/v1 on disk. Anything else builds a
    database-backed store from ``env_file``. The DB module is imported lazily, only
    on that branch.
    """
    if source != "files":
        from content_agent.integrations.database_runtime import (
            ContentSupplyDbConfig,
            DatabaseRuntimeStore,
        )

        config = ContentSupplyDbConfig.from_env(env_file)
        return DatabaseRuntimeStore(config)
    return LocalRuntimeFileStore(ROOT / "runtime" / "v1")
def _self_check(dest: Path) -> dict[str, list[str]]:
    """Re-read every runtime artifact file in ``dest`` and map filename -> leak paths.

    Scans all RUNTIME_FILENAMES present in ``dest``, not just the ones written by the
    current run. Leftovers from an earlier harvest of the same case id are therefore
    verified too instead of silently staying in the corpus. A file that cannot be
    decoded/parsed as JSON is reported as a failure because it cannot be verified.
    """
    leaks: dict[str, list[str]] = {}
    for filename in RUNTIME_FILENAMES:
        path = dest / filename
        if not path.is_file():
            continue
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except ValueError:  # JSONDecodeError / UnicodeDecodeError
            leaks[filename] = ["<unparseable>"]
            continue
        hits = find_forbidden(data)
        if hits:
            leaks[filename] = hits
    return leaks


def main() -> int:
    """CLI entry point: harvest a run, write the scrubbed corpus, self-check it.

    Prints a JSON report to stdout. Returns 1 when the run has no artifacts or when
    any runtime artifact file in the corpus directory still holds sensitive data.
    Returns 0 otherwise.
    """
    args = _parse_args()
    store = _store(args.source, args.env_file)
    artifacts = harvest(args.run_id, store)
    if not artifacts:
        print(json.dumps({"status": "fail", "reason": "no_artifacts", "run_id": args.run_id}))
        return 1
    dest = write_corpus(args.case_id, artifacts)
    # Self-check: the whole corpus directory must be free of sensitive values.
    leaks = _self_check(dest)
    result = {
        "status": "fail" if leaks else "pass",
        "case_id": args.case_id,
        "run_id": args.run_id,
        "files": sorted(artifacts),
        "corpus_dir": str(dest.relative_to(ROOT)),
        "leaks": leaks,
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 1 if leaks else 0
- def _parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(description=__doc__)
- parser.add_argument("--run-id", required=True)
- parser.add_argument("--from", dest="source", choices=["files", "db"], default="files")
- parser.add_argument("--case-id", required=True)
- parser.add_argument("--env-file", default=".env")
- return parser.parse_args()
# Script entry: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|