| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
#!/usr/bin/env python3
from __future__ import annotations

import json
import os
import re
import sys
from pathlib import Path
from typing import Any
# Repository root: the parent of the directory containing this script. It is
# put at the front of sys.path so the ``content_agent`` package imports that
# follow resolve when the script is executed directly.
ROOT = Path(__file__).resolve().parents[1]
_root_entry = str(ROOT)
if _root_entry not in sys.path:
    sys.path.insert(0, _root_entry)
- from content_agent.business_modules.m6_acceptance_report import build_report
- from content_agent.dashboard_service import DashboardService
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
# Platforms exercised, in order; one real-mode run is started per entry.
PLATFORMS: tuple[str, ...] = ("douyin", "kuaishou", "shipinhao")

# ``data_origin`` values accepted as evidence that the run was recorded in the DB.
ALLOWED_DATA_ORIGINS: set[str] = {"production_db", "mixed_with_runtime_export"}

# Conservative smoke-run limits; only applied when the variable is not already
# present in the environment.
SMOKE_DEFAULTS: dict[str, str] = {
    "CONTENT_AGENT_GEMINI_MAX_WORKERS": "2",
    "CONTENT_AGENT_WALK_MAX_TOTAL_ACTIONS_PER_RUN": "4",
}
def main() -> int:
    """Run one real-mode V4 pipeline per platform and print an acceptance report.

    DB runtime mode must be enabled (``CONTENT_AGENT_DB_RUNTIME_ENABLED``
    truthy in the process environment or in ``.env``). Smoke-test limits
    from ``SMOKE_DEFAULTS`` are seeded into the environment (without
    overriding existing values) before the services are constructed.

    Returns:
        Process exit code: 0 when every platform row has
        ``acceptance_status == "pass"``, otherwise 1. The JSON report is
        printed to stdout.

    Raises:
        SystemExit: if DB runtime mode is not enabled.
    """
    if not _db_runtime_enabled():
        raise SystemExit("CONTENT_AGENT_DB_RUNTIME_ENABLED=1 is required for M6 real acceptance")
    # Seed limits first; presumably RunService.from_env() reads them from the
    # environment -- TODO confirm.
    _apply_smoke_defaults()
    service = RunService.from_env()
    dashboard_service = DashboardService.from_runtime(service.runtime)

    results: list[dict[str, Any]] = []
    for platform in PLATFORMS:
        state = service.start_run(
            RunStartRequest(platform=platform, platform_mode="real", strategy_version="V4")
        )
        result = _collect_run_result(service, dashboard_service, state, platform)
        # Finalize the row before publishing it to ``results``.
        result["acceptance_status"] = _acceptance_status(result)
        results.append(result)

    overall = "pass" if all(row["acceptance_status"] == "pass" for row in results) else "fail"
    payload = {
        "schema_version": "v4_m6_real_acceptance.v1",
        "source_kind": "db_default_safe_source",
        "smoke_limits": _smoke_limits(),
        "platforms": list(PLATFORMS),
        "results": results,
        "status": overall,
    }
    # default=str: service payloads may contain non-JSON types (datetimes, paths, ...).
    print(json.dumps(payload, ensure_ascii=False, indent=2, default=str))
    return 0 if overall == "pass" else 1
def _apply_smoke_defaults() -> None:
    """Seed each ``SMOKE_DEFAULTS`` entry into ``os.environ`` unless already set."""
    for name in SMOKE_DEFAULTS:
        if name not in os.environ:
            os.environ[name] = SMOKE_DEFAULTS[name]
def _smoke_limits() -> dict[str, str | None]:
    """Return the effective environment value of every smoke-limit key (``None`` if unset)."""
    limits: dict[str, str | None] = {}
    for name in SMOKE_DEFAULTS:
        limits[name] = os.environ.get(name)
    return limits
def _collect_run_result(
    service: RunService,
    dashboard_service: DashboardService,
    state: dict[str, Any],
    platform: str,
) -> dict[str, Any]:
    """Build the acceptance-report row for one platform run.

    Every service read is wrapped in ``_safe_call``. An exception from a
    single probe therefore becomes a dict entry in the row instead of
    aborting the whole acceptance run.

    Args:
        service: Run service that started the run.
        dashboard_service: Dashboard service over the same runtime.
        state: Value returned by ``service.start_run``; must contain ``run_id``.
        platform: Platform name the run targeted.

    Returns:
        A dict describing run status, validation, data origin, runtime
        artifacts, interface progress, and failure classification.
    """
    rid = state["run_id"]

    # Probes are issued in a fixed order; each degrades to a dict on failure.
    summary = _safe_call(lambda: service.get_summary(rid), {})
    validation = _safe_call(
        lambda: service.validate_run(rid),
        {"status": "fail", "findings": []},
    )
    dashboard = _safe_call(lambda: dashboard_service.dashboard(rid), {})
    timeline = _safe_call(lambda: dashboard_service.timeline(rid), {})
    review = _safe_call(lambda: service.strategy_review(rid), {})
    files_info = _safe_call(lambda: dashboard_service.runtime_files(rid), {})
    m6_report = _safe_call(
        lambda: build_report(rid, service.runtime, platform=platform, platform_mode="real"),
        {},
    )

    # Prefer the dashboard's provenance; fall back to the runtime-file listing.
    origin = dashboard.get("data_origin") or files_info.get("data_origin")
    policy_run_id = state.get("policy_run_id") or summary.get("policy_run_id")
    db_record = _db_run_record_present(dashboard, origin)
    file_flags = _runtime_file_status(files_info)
    progress = _real_interface_progress(files_info, timeline)
    failure = _failure_classification(state, dashboard)

    return {
        "platform": platform,
        "platform_mode": "real",
        "run_id": rid,
        "policy_run_id": policy_run_id,
        "status": state.get("status"),
        "validation_status": validation.get("status"),
        "output_dir": summary.get("output_dir"),
        "data_origin": origin,
        "db_run_record_present": db_record,
        "runtime_files": file_flags,
        "real_interface_progress": progress,
        "dashboard_summary": dashboard.get("summary", {}),
        "timeline_summary": timeline.get("summary", {}),
        "strategy_review_status": review.get("review_status"),
        "failure_classification": failure,
        "m6_report": m6_report,
    }
- def _safe_call(fn: Any, fallback: Any) -> Any:
- try:
- return fn()
- except Exception as exc:
- return {"error": type(exc).__name__, "message": str(exc)} if isinstance(fallback, dict) else fallback
- def _runtime_file_status(runtime_files: dict[str, Any]) -> dict[str, bool]:
- return {
- item.get("filename"): bool(item.get("exists"))
- for item in runtime_files.get("files", [])
- if item.get("filename")
- }
def _real_interface_progress(
    runtime_files: dict[str, Any],
    timeline: dict[str, Any],
) -> dict[str, bool]:
    """Summarise how far the real pipeline got, as a set of boolean milestones.

    Milestones come from runtime artifact presence. For the platform-search
    and rule-evaluation steps, a recorded stage duration in the timeline
    also counts as evidence.
    """
    present = _runtime_file_status(runtime_files)
    timeline_summary = timeline.get("summary") or {}
    durations = timeline_summary.get("stage_duration_ms") or {}

    def reached(stage: str, artifact: str) -> bool:
        # Either the stage was timed or its output file exists.
        return stage in durations or bool(present.get(artifact))

    return {
        "source_loaded": bool(present.get("source_context.json")),
        "query_generated": bool(present.get("search_queries.jsonl")),
        "platform_attempted": reached("search_platform", "discovered_content_items.jsonl"),
        "scoring_attempted": reached("evaluate_rules", "rule_decisions.jsonl"),
        "final_output_generated": bool(present.get("final_output.json")),
    }
- def _acceptance_status(result: dict[str, Any]) -> str:
- if not result["db_run_record_present"]:
- return "fail"
- if not result["real_interface_progress"]["query_generated"]:
- return "blocked_before_platform"
- if result["status"] == "failed" and not result["failure_classification"]:
- return "fail"
- return "pass"
def _db_run_record_present(dashboard: dict[str, Any], data_origin: str | None) -> bool:
    """Return True when the data came from an allowed DB origin and the dashboard summary names a run.

    Args:
        dashboard: Dashboard payload (possibly an error dict from ``_safe_call``).
        data_origin: Provenance label; must be in ``ALLOWED_DATA_ORIGINS``.
    """
    if data_origin in ALLOWED_DATA_ORIGINS:
        run_summary = dashboard.get("summary") or {}
        return bool(run_summary.get("run_id"))
    return False
def _failure_classification(
    state: dict[str, Any],
    dashboard: dict[str, Any],
) -> dict[str, Any] | None:
    """Describe why a run failed, or return ``None`` for runs that did not fail.

    Only runs whose status is ``failed`` or ``partial_success`` are
    classified. Values on the run ``state`` take precedence over the
    dashboard's ``primary_failure_reason``.

    Returns:
        A dict with ``category``, ``error_code``, ``message``,
        ``error_detail`` and ``primary_failure_reason`` keys, or ``None``.
    """
    if state.get("status") in {"failed", "partial_success"}:
        primary = dashboard.get("primary_failure_reason") or {}
        code = state.get("error_code") or primary.get("reason_code")
        detail = state.get("error_detail") or {}
        return {
            "category": _failure_category(code, detail),
            "error_code": code,
            "message": state.get("error_message") or primary.get("message"),
            "error_detail": detail,
            "primary_failure_reason": primary,
        }
    return None
- def _failure_category(error_code: str | None, detail: dict[str, Any]) -> str:
- code = str(error_code or "")
- text = json.dumps(detail, ensure_ascii=False, default=str).lower()
- if code in {"PLATFORM_CONFIG_MISSING", "DB_CONFIG_MISSING", "INVALID_SOURCE", "INVALID_REQUEST"}:
- return "configuration"
- if code in {"PLATFORM_RATE_LIMITED"} or "429" in text or "rate" in text:
- return "rate_limit"
- if code.startswith("PLATFORM_"):
- return "upstream_platform"
- if "gemini" in text or "openrouter" in text:
- return "gemini_technical_failure"
- if "timeout" in text or "network" in text or "connection" in text:
- return "network_or_vpn"
- return "runtime_or_validator"
def _db_runtime_enabled() -> bool:
    """Report whether DB runtime mode is switched on.

    Reads ``CONTENT_AGENT_DB_RUNTIME_ENABLED`` from the process environment.
    Only if the variable is entirely unset does it fall back to ``.env``.
    The values ``1``, ``true``, ``yes`` and ``on`` (case-insensitive) count
    as enabled.
    """
    name = "CONTENT_AGENT_DB_RUNTIME_ENABLED"
    raw = os.environ.get(name)
    if raw is None:
        raw = _env_file_value(name)
    normalized = str(raw or "").lower()
    return normalized in ("1", "true", "yes", "on")
- def _env_file_value(key: str) -> str | None:
- try:
- lines = open(".env", encoding="utf-8").read().splitlines()
- except FileNotFoundError:
- return None
- for line in lines:
- stripped = line.strip()
- if not stripped or stripped.startswith("#") or "=" not in stripped:
- continue
- name, value = stripped.split("=", 1)
- if name.strip() == key:
- return value.strip().strip('"').strip("'")
- return None
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
|