#!/usr/bin/env python3 from __future__ import annotations import json import os import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from content_agent.business_modules.m6_acceptance_report import build_report from content_agent.dashboard_service import DashboardService from content_agent.run_service import RunService from content_agent.schemas import RunStartRequest PLATFORMS = ("douyin", "kuaishou", "shipinhao") ALLOWED_DATA_ORIGINS = {"production_db", "mixed_with_runtime_export"} SMOKE_DEFAULTS = { "CONTENT_AGENT_GEMINI_MAX_WORKERS": "2", "CONTENT_AGENT_WALK_MAX_TOTAL_ACTIONS_PER_RUN": "4", } def main() -> int: if not _db_runtime_enabled(): raise SystemExit("CONTENT_AGENT_DB_RUNTIME_ENABLED=1 is required for M6 real acceptance") _apply_smoke_defaults() service = RunService.from_env() dashboard_service = DashboardService.from_runtime(service.runtime) results = [] for platform in PLATFORMS: state = service.start_run( RunStartRequest(platform=platform, platform_mode="real", strategy_version="V4") ) run_id = state["run_id"] result = _collect_run_result(service, dashboard_service, state, platform) results.append(result) result["acceptance_status"] = _acceptance_status(result) payload = { "schema_version": "v4_m6_real_acceptance.v1", "source_kind": "db_default_safe_source", "smoke_limits": _smoke_limits(), "platforms": list(PLATFORMS), "results": results, "status": "pass" if all(row["acceptance_status"] == "pass" for row in results) else "fail", } print(json.dumps(payload, ensure_ascii=False, indent=2, default=str)) return 0 if payload["status"] == "pass" else 1 def _apply_smoke_defaults() -> None: for key, value in SMOKE_DEFAULTS.items(): os.environ.setdefault(key, value) def _smoke_limits() -> dict[str, str | None]: return {key: os.environ.get(key) for key in SMOKE_DEFAULTS} def _collect_run_result( service: RunService, dashboard_service: DashboardService, state: dict[str, Any], platform: str, ) -> dict[str, Any]: run_id = state["run_id"] summary = _safe_call(lambda: service.get_summary(run_id), {}) validation = _safe_call(lambda: service.validate_run(run_id), {"status": "fail", "findings": []}) dashboard = _safe_call(lambda: dashboard_service.dashboard(run_id), {}) timeline = _safe_call(lambda: dashboard_service.timeline(run_id), {}) strategy_review = _safe_call(lambda: service.strategy_review(run_id), {}) runtime_files = _safe_call(lambda: dashboard_service.runtime_files(run_id), {}) report = _safe_call( lambda: build_report(run_id, service.runtime, platform=platform, platform_mode="real"), {}, ) data_origin = dashboard.get("data_origin") or runtime_files.get("data_origin") return { "platform": platform, "platform_mode": "real", "run_id": run_id, "policy_run_id": state.get("policy_run_id") or summary.get("policy_run_id"), "status": state.get("status"), "validation_status": validation.get("status"), "output_dir": summary.get("output_dir"), "data_origin": data_origin, "db_run_record_present": _db_run_record_present(dashboard, data_origin), "runtime_files": _runtime_file_status(runtime_files), "real_interface_progress": _real_interface_progress(runtime_files, timeline), "dashboard_summary": dashboard.get("summary", {}), "timeline_summary": timeline.get("summary", {}), "strategy_review_status": strategy_review.get("review_status"), "failure_classification": _failure_classification(state, dashboard), "m6_report": report, } def _safe_call(fn: Any, fallback: Any) -> Any: try: return fn() except Exception as exc: return {"error": type(exc).__name__, "message": str(exc)} if isinstance(fallback, dict) else fallback def _runtime_file_status(runtime_files: dict[str, Any]) -> dict[str, bool]: return { item.get("filename"): bool(item.get("exists")) for item in runtime_files.get("files", []) if item.get("filename") } def _real_interface_progress( runtime_files: dict[str, Any], timeline: dict[str, Any], ) -> dict[str, bool]: file_status = _runtime_file_status(runtime_files) stages = (timeline.get("summary") or {}).get("stage_duration_ms") or {} return { "source_loaded": bool(file_status.get("source_context.json")), "query_generated": bool(file_status.get("search_queries.jsonl")), "platform_attempted": "search_platform" in stages or bool(file_status.get("discovered_content_items.jsonl")), "scoring_attempted": "evaluate_rules" in stages or bool(file_status.get("rule_decisions.jsonl")), "final_output_generated": bool(file_status.get("final_output.json")), } def _acceptance_status(result: dict[str, Any]) -> str: if not result["db_run_record_present"]: return "fail" if not result["real_interface_progress"]["query_generated"]: return "blocked_before_platform" if result["status"] == "failed" and not result["failure_classification"]: return "fail" return "pass" def _db_run_record_present(dashboard: dict[str, Any], data_origin: str | None) -> bool: if data_origin not in ALLOWED_DATA_ORIGINS: return False summary = dashboard.get("summary") or {} return bool(summary.get("run_id")) def _failure_classification( state: dict[str, Any], dashboard: dict[str, Any], ) -> dict[str, Any] | None: if state.get("status") not in {"failed", "partial_success"}: return None primary = dashboard.get("primary_failure_reason") or {} error_code = state.get("error_code") or primary.get("reason_code") error_detail = state.get("error_detail") or {} return { "category": _failure_category(error_code, error_detail), "error_code": error_code, "message": state.get("error_message") or primary.get("message"), "error_detail": error_detail, "primary_failure_reason": primary, } def _failure_category(error_code: str | None, detail: dict[str, Any]) -> str: code = str(error_code or "") text = json.dumps(detail, ensure_ascii=False, default=str).lower() if code in {"PLATFORM_CONFIG_MISSING", "DB_CONFIG_MISSING", "INVALID_SOURCE", "INVALID_REQUEST"}: return "configuration" if code in {"PLATFORM_RATE_LIMITED"} or "429" in text or "rate" in text: return "rate_limit" if code.startswith("PLATFORM_"): return "upstream_platform" if "gemini" in text or "openrouter" in text: return "gemini_technical_failure" if "timeout" in text or "network" in text or "connection" in text: return "network_or_vpn" return "runtime_or_validator" def _db_runtime_enabled() -> bool: value = os.environ.get("CONTENT_AGENT_DB_RUNTIME_ENABLED") if value is None: value = _env_file_value("CONTENT_AGENT_DB_RUNTIME_ENABLED") return str(value or "").lower() in {"1", "true", "yes", "on"} def _env_file_value(key: str) -> str | None: try: lines = open(".env", encoding="utf-8").read().splitlines() except FileNotFoundError: return None for line in lines: stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue name, value = stripped.split("=", 1) if name.strip() == key: return value.strip().strip('"').strip("'") return None if __name__ == "__main__": raise SystemExit(main())