| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
- from __future__ import annotations
- import os
- from collections import Counter
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any
- from uuid import uuid4
- from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION, RUNTIME_SCHEMA_VERSION
- from content_agent.business_modules import run_record
- from content_agent.errors import ContentAgentError, ErrorCode, error_from_exception
- from content_agent.graph import RunDependencies, build_run_graph
- from content_agent.integrations.composite_runtime import CompositeRuntimeStore
- from content_agent.integrations.database_runtime import ContentSupplyDbConfig, DatabaseRuntimeStore
- from content_agent.integrations.demand_source import DemandSourceService
- from content_agent.integrations.douyin import CrawapiDouyinClient
- from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
- from content_agent.integrations.gemini_video import GeminiVideoClient as RealGeminiVideoClient
- from content_agent.integrations.walk_graph_json import WalkGraphStore
- from content_agent.integrations.mock_platform import MockPlatformClient
- from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
- from content_agent.integrations.policy_json import JsonPolicyBundleStore
- from content_agent.integrations.query_variant import (
- MissingQueryVariantClient,
- query_variant_client_from_env,
- )
- from content_agent.integrations.runtime_files import LocalRuntimeFileStore
- from content_agent.interfaces import (
- GeminiVideoClient,
- PlatformSearchClient,
- PolicyBundleStore,
- QueryVariantClient,
- RuntimeStore,
- )
- from content_agent.models import RunState
- from content_agent.record_payload import with_raw_payload
- from content_agent.schemas import RunStartRequest
class RunService:
    """Run orchestration and read-side access for content-agent runs.

    ``start_run`` prepares a run in the runtime store, resolves its source,
    invokes the run graph and records the outcome (run-record updates plus
    lifecycle events in ``run_events.jsonl``). The remaining public methods
    read back what a run left in the runtime store.
    """

    def __init__(
        self,
        runtime_root: Path | str = Path("runtime/v1"),
        runtime: RuntimeStore | None = None,
        policy_store: PolicyBundleStore | None = None,
        demand_source: DemandSourceService | None = None,
        query_variant_client: QueryVariantClient | None = None,
        gemini_video_client: GeminiVideoClient | None = None,
    ) -> None:
        """Wire collaborators, falling back to local/offline defaults.

        Args:
            runtime_root: Root for the local runtime store; only used when
                ``runtime`` is not given.
            runtime: Runtime store; defaults to ``LocalRuntimeFileStore(runtime_root)``.
            policy_store: Policy bundle store; defaults to a JSON store rooted at ``.``.
            demand_source: Demand-content DB service. When ``None``, any
                DB-backed source selection raises ``DB_CONFIG_MISSING``.
            query_variant_client: Defaults to ``MissingQueryVariantClient``.
            gemini_video_client: Defaults to the deterministic offline stub.
        """
        self.runtime = runtime or LocalRuntimeFileStore(runtime_root)
        self.policy_store = policy_store or JsonPolicyBundleStore(Path("."))
        self.demand_source = demand_source
        self.query_variant_client = query_variant_client or MissingQueryVariantClient(
            "query variant client is not configured"
        )
        self._gemini_video_client = gemini_video_client or _DeterministicGeminiVideoClient()

    @classmethod
    def from_env(cls, runtime_root: Path | str = Path("runtime/v1")) -> "RunService":
        """Build a service from the process environment merged with ``.env``.

        ``CONTENT_AGENT_DB_RUNTIME_ENABLED`` chooses between the local file
        runtime and a composite (DB + local) runtime. The demand-content DB
        service is wired whenever ``ContentSupplyDbConfig.from_env()``
        succeeds; a failing config is only an error when the DB runtime is
        enabled.

        Raises:
            ContentAgentError: ``DB_CONFIG_MISSING`` when the DB runtime is
                enabled but the DB config cannot be loaded.
        """
        local_runtime = LocalRuntimeFileStore(runtime_root)
        env = _merged_project_env()
        query_variant_client = query_variant_client_from_env(env)
        gemini_video_client = _gemini_video_client_from_env(env)
        db_runtime_enabled = _env_enabled("CONTENT_AGENT_DB_RUNTIME_ENABLED")
        try:
            config = ContentSupplyDbConfig.from_env()
        except Exception as exc:
            if db_runtime_enabled:
                raise ContentAgentError(
                    ErrorCode.DB_CONFIG_MISSING,
                    "content supply db config is missing",
                    {"exception_type": type(exc).__name__},
                ) from exc
            # No DB config and no DB runtime requested: run fully local.
            return cls(
                runtime_root=runtime_root,
                runtime=local_runtime,
                query_variant_client=query_variant_client,
                gemini_video_client=gemini_video_client,
            )
        demand_source = DemandSourceService(config)
        runtime = (
            CompositeRuntimeStore(DatabaseRuntimeStore(config), local_runtime)
            if db_runtime_enabled
            else local_runtime
        )
        return cls(
            runtime_root=runtime_root,
            runtime=runtime,
            demand_source=demand_source,
            query_variant_client=query_variant_client,
            gemini_video_client=gemini_video_client,
        )

    def start_run(self, request: RunStartRequest) -> RunState:
        """Execute one run end to end and return its final state.

        The run id defaults to ``v1_run_<12 hex chars>``; the policy run id is
        derived from it. Exceptions raised while preparing or executing the
        run are not propagated: they are classified with ``_classify_error``,
        recorded best-effort via ``_record_failure_metadata`` and returned as
        a ``status="failed"`` state.
        """
        run_id = request.run_id or f"v1_run_{uuid4().hex[:12]}"
        policy_run_id = f"policy_run_{run_id.removeprefix('v1_run_')}"
        source_ref = self._source_ref_from_request(request)
        # Replaced once the source is resolved, so a later failure event carries
        # the same reference that ``lifecycle_start`` recorded.
        resolved_source_ref = source_ref
        initial_state: RunState = {
            "run_id": run_id,
            "policy_run_id": policy_run_id,
            "schema_version": RUNTIME_SCHEMA_VERSION,
            "platform": request.platform,
            "platform_mode": request.platform_mode,
            "source": request.source,
            "strategy_version": request.strategy_version,
            "current_step": "start",
            "status": "running",
            "errors": [],
        }
        try:
            self.runtime.prepare_run(run_id)
            self._create_run_record(run_id, request, source_ref)
            source = self._resolve_source(request)
            resolved_source_ref = self._source_ref_from_resolved_source(source, source_ref)
            if resolved_source_ref != source_ref:
                self.runtime.update_run_record(
                    run_id,
                    {
                        "demand_content_id": _demand_content_id_from_source(source),
                        "source_ref": resolved_source_ref,
                    },
                )
            initial_state["source"] = source
            self._append_lifecycle_event(
                run_id,
                policy_run_id,
                event_id="lifecycle_start",
                event_type="run_started",
                status="running",
                message="run started",
                raw_payload={"source_ref": resolved_source_ref},
            )
            # M5C: a fresh quota wrapper per run (per-run counter, shared by the
            # initial recall and the walk recall).
            gemini_client = QuotaCappedGeminiVideoClient(
                self._gemini_video_client, _gemini_calls_cap()
            )
            deps = RunDependencies(
                runtime=self.runtime,
                platform_client=self._platform_client(request.platform, request.platform_mode),
                policy_store=self.policy_store,
                query_variant_client=self.query_variant_client,
                gemini_video_client=gemini_client,
            )
            state = build_run_graph(deps).invoke(initial_state)
            if gemini_client.cap is not None and gemini_client.used >= gemini_client.cap:
                self._append_gemini_quota_event(run_id, policy_run_id, gemini_client)
            self._record_success_metadata(state)
            return state
        except Exception as exc:
            error = self._classify_error(exc)
            failed_state = self._failed_state(initial_state, error)
            self._record_failure_metadata(
                failed_state,
                request,
                resolved_source_ref,
                error,
            )
            return failed_state

    def _append_gemini_quota_event(
        self,
        run_id: str,
        policy_run_id: str,
        gemini_client: QuotaCappedGeminiVideoClient,
    ) -> None:
        """Append a ``gemini_quota_exhausted`` event with the cap and usage.

        It goes through the ``run_events.jsonl`` channel (same as stage
        events), so it is observable in both file and DB modes.
        """
        quota_event = with_raw_payload(
            {
                "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
                "run_id": run_id,
                "policy_run_id": policy_run_id,
                "event_id": "gemini_quota",
                "event_type": "gemini_quota_exhausted",
                "status": "running",
                "input_ref": None,
                "output_ref": None,
                "error_code": None,
                "message": "gemini call quota reached; remaining judgments truncated",
                "created_at": _utc_now(),
            }
        )
        quota_event["raw_payload"].update({"cap": gemini_client.cap, "used": gemini_client.used})
        self.runtime.append_jsonl(run_id, "run_events.jsonl", [quota_event])

    def _source_ref_from_request(self, request: RunStartRequest) -> dict[str, Any]:
        """Describe the requested source before any DB lookup.

        Precedence: ``demand_content_id``, then ``run_label``, then a local
        ``source``, then the default ``pg_pattern_v2_passed_first`` selector.
        """
        if request.demand_content_id is not None:
            return {
                "source_type": "demand_content",
                "demand_content_id": request.demand_content_id,
            }
        if request.run_label:
            return {
                "source_type": "demand_content",
                "run_label": request.run_label,
            }
        if request.source:
            return {"source_type": "local_source", "source": request.source}
        return {
            "source_type": "demand_content_default",
            "selector": "pg_pattern_v2_passed_first",
        }

    def _require_demand_source(self, selector: str) -> DemandSourceService:
        """Return the configured demand source or raise ``DB_CONFIG_MISSING``."""
        if not self.demand_source:
            raise ContentAgentError(
                ErrorCode.DB_CONFIG_MISSING,
                "demand source db is not configured",
                {"selector": selector},
            )
        return self.demand_source

    def _resolve_source(self, request: RunStartRequest) -> str | dict[str, Any] | None:
        """Resolve the run source, using the same precedence as ``_source_ref_from_request``.

        Raises:
            ContentAgentError: ``DB_CONFIG_MISSING`` when a DB-backed selector
                is needed but no demand source is configured.
        """
        if request.demand_content_id is not None:
            return self._require_demand_source("demand_content_id").get_by_id(
                request.demand_content_id
            )
        if request.run_label:
            return self._require_demand_source("run_label").get_by_run_label(request.run_label)
        if request.source:
            return request.source
        demand_source = self._require_demand_source("default_pg_pattern_v2_passed")
        return demand_source.get_default_pg_pattern_source()

    def _source_ref_from_resolved_source(
        self,
        source: str | dict[str, Any] | None,
        source_ref: dict[str, Any],
    ) -> dict[str, Any]:
        """Add the picked row's ``demand_content_id`` to a default-selector reference.

        Only applies when ``source_ref`` uses the default selector and the
        resolved source is a dict; otherwise ``source_ref`` itself is returned.
        """
        if source_ref.get("source_type") != "demand_content_default" or not isinstance(
            source,
            dict,
        ):
            return source_ref
        demand_content_id = _demand_content_id_from_source(source)
        resolved = dict(source_ref)
        if demand_content_id is not None:
            resolved["demand_content_id"] = demand_content_id
        return resolved

    def _create_run_record(
        self,
        run_id: str,
        request: RunStartRequest,
        source_ref: dict[str, Any],
    ) -> None:
        """Create the initial run record in ``running`` status at step ``start``."""
        self.runtime.create_run_record(
            {
                "run_id": run_id,
                "demand_content_id": request.demand_content_id,
                "run_label": request.run_label,
                "platform": request.platform,
                "platform_mode": request.platform_mode,
                "strategy_version": request.strategy_version,
                "status": "running",
                "current_step": "start",
                "source_ref": source_ref,
                "started_at": _utc_now(),
            }
        )

    def _record_success_metadata(self, state: RunState) -> None:
        """Finalize a completed run: status, policy run, validation, lifecycle event.

        The status is ``partial_success`` when the state carries
        ``query_failures`` and ``success`` otherwise; it is written back into
        ``state``.
        """
        final_status = "partial_success" if state.get("query_failures") else "success"
        state["status"] = final_status
        self.runtime.record_policy_run(_policy_run_record_from_state(state))
        validation = self.validate_run(state["run_id"])
        self._update_final_output_validation(state["run_id"], validation)
        self.runtime.update_run_record(
            state["run_id"],
            {
                "status": final_status,
                "current_step": state.get("current_step", "review_strategy"),
                "validation_status": validation["status"],
                "completed_at": _utc_now(),
            },
        )
        self._append_lifecycle_event(
            state["run_id"],
            state["policy_run_id"],
            event_id="lifecycle_success",
            event_type="run_succeeded",
            status=final_status,
            message="run succeeded" if final_status == "success" else "run partially succeeded",
            output_ref="final_output.json",
            raw_payload={
                "validation_status": validation["status"],
                "policy_bundle_id": state.get("policy_bundle_id"),
                "query_failures": state.get("query_failures", []),
            },
        )

    def _update_final_output_validation(self, run_id: str, validation: dict[str, Any]) -> None:
        """Write validation results into ``final_output.json``.

        Sets ``validation_status``. Under ``summary`` it sets
        ``run_path_complete`` and ``trace_complete`` (true only when the
        status is ``"pass"``) and a list of finding messages. A missing or
        non-dict ``summary`` is replaced with a fresh dict, and missing or
        ``None`` findings are treated as an empty list.
        """
        final_output = self.runtime.read_json(run_id, "final_output.json")
        validation_status = validation["status"]
        passed = validation_status == "pass"
        final_output["validation_status"] = validation_status
        summary = final_output.get("summary")
        if not isinstance(summary, dict):
            # setdefault() would hand back a stored None and crash below.
            summary = {}
            final_output["summary"] = summary
        summary["run_path_complete"] = passed
        summary["trace_complete"] = passed
        summary["validation_findings_summary"] = [
            finding.get("message", str(finding)) if isinstance(finding, dict) else str(finding)
            for finding in validation.get("findings") or []
        ]
        self.runtime.update_json(run_id, "final_output.json", final_output)

    def _record_failure_metadata(
        self,
        state: RunState,
        request: RunStartRequest,
        source_ref: dict[str, Any],
        error: ContentAgentError,
    ) -> None:
        """Persist a run failure on a best-effort basis.

        There are three steps: update the run record, append per-query
        failure rows, and append the ``lifecycle_failed`` event. Each step is
        attempted independently and its exceptions are suppressed. This keeps
        the original run failure (already in ``state``) from being masked, and
        one failing step (e.g. the run-record update) no longer prevents the
        remaining ones.
        """
        from contextlib import suppress

        run_id = state["run_id"]
        policy_run_id = state["policy_run_id"]
        with suppress(Exception):
            self.runtime.update_run_record(
                run_id,
                {
                    "status": "failed",
                    "current_step": state.get("current_step", "failed"),
                    "error_code": error.error_code.value,
                    "error_message": error.message,
                    "error_detail": error.detail,
                    "completed_at": _utc_now(),
                },
            )
        with suppress(Exception):
            self._record_platform_query_failure_details(run_id, policy_run_id, error)
        with suppress(Exception):
            self._append_lifecycle_event(
                run_id,
                policy_run_id,
                event_id="lifecycle_failed",
                event_type="run_failed",
                status="failed",
                message=error.message,
                error_code=error.error_code.value,
                raw_payload={
                    "error_detail": error.detail,
                    "source_ref": source_ref,
                    "platform_mode": request.platform_mode,
                },
            )

    def _record_platform_query_failure_details(
        self,
        run_id: str,
        policy_run_id: str,
        error: ContentAgentError,
    ) -> None:
        """Append failure rows for the queries listed in ``error.detail["query_failures"]``.

        The step is skipped when there are no query failures, or when
        ``search_queries.jsonl`` is missing (FileNotFoundError) or empty.
        Otherwise rows built by ``run_record.build_platform_query_failure_records``
        are appended to the search-query, search-clue and run-event files.
        """
        query_failures = _query_failures_from_error(error)
        if not query_failures:
            return
        try:
            search_queries = self.runtime.read_jsonl(run_id, "search_queries.jsonl")
        except FileNotFoundError:
            return
        if not search_queries:
            return
        records = run_record.build_platform_query_failure_records(
            run_id,
            policy_run_id,
            search_queries,
            query_failures,
        )
        self.runtime.append_jsonl(run_id, "search_queries.jsonl", records["search_queries"])
        self.runtime.append_jsonl(run_id, "search_clues.jsonl", records["search_clues"])
        self.runtime.append_jsonl(run_id, "run_events.jsonl", records["run_events"])

    def _append_lifecycle_event(
        self,
        run_id: str,
        policy_run_id: str,
        *,
        event_id: str,
        event_type: str,
        status: str,
        message: str,
        input_ref: str | None = None,
        output_ref: str | None = None,
        error_code: str | None = None,
        raw_payload: dict[str, Any] | None = None,
    ) -> None:
        """Append one lifecycle event via ``runtime.append_run_event_records``."""
        self.runtime.append_run_event_records(
            run_id,
            policy_run_id,
            [
                {
                    "event_id": event_id,
                    "event_type": event_type,
                    "status": status,
                    "input_ref": input_ref,
                    "output_ref": output_ref,
                    "error_code": error_code,
                    "message": message,
                    "raw_payload": raw_payload or {},
                    "created_at": _utc_now(),
                }
            ],
        )

    def _failed_state(self, initial_state: RunState, error: ContentAgentError) -> RunState:
        """Return a copy of ``initial_state`` marked failed and carrying the error fields."""
        return {
            **initial_state,
            "current_step": "failed",
            "status": "failed",
            "error_code": error.error_code.value,
            "error_message": error.message,
            "error_detail": error.detail,
            "http_status_code": error.status_code,
            "errors": [error.message],
        }

    def _classify_error(self, exc: Exception) -> ContentAgentError:
        """Map an exception to a ``ContentAgentError``.

        Rules, in order:

        - A ``ContentAgentError`` is returned as is.
        - ``FileNotFoundError`` becomes ``INVALID_SOURCE`` (400).
        - A ``ValueError`` mentioning ``unknown strategy_version`` becomes
          ``POLICY_BUNDLE_NOT_FOUND`` (400).
        - A ``ValueError`` mentioning ``evidence_pack`` becomes
          ``INVALID_SOURCE`` (400).
        - Anything else goes through ``error_from_exception``.
        """
        if isinstance(exc, ContentAgentError):
            return exc
        if isinstance(exc, FileNotFoundError):
            return ContentAgentError(
                ErrorCode.INVALID_SOURCE,
                "source file not found",
                {"exception_type": type(exc).__name__},
                status_code=400,
            )
        if isinstance(exc, ValueError) and "unknown strategy_version" in str(exc):
            return ContentAgentError(
                ErrorCode.POLICY_BUNDLE_NOT_FOUND,
                "policy bundle not found",
                {"exception_type": type(exc).__name__},
                status_code=400,
            )
        if isinstance(exc, ValueError) and "evidence_pack" in str(exc):
            return ContentAgentError(
                ErrorCode.INVALID_SOURCE,
                "invalid source",
                {"exception_type": type(exc).__name__},
                status_code=400,
            )
        return error_from_exception(exc, detail={"exception_type": type(exc).__name__})

    def _platform_client(self, platform: str, platform_mode: str) -> PlatformSearchClient:
        """Build the search client for ``platform`` in ``platform_mode``.

        ``mock`` returns ``MockPlatformClient``. ``real`` builds the Crawapi
        client for ``douyin`` or ``shipinhao`` from the environment.

        Raises:
            ContentAgentError: ``INVALID_REQUEST`` (400) for an unknown mode or
                real platform; ``PLATFORM_CONFIG_MISSING`` when the real
                client cannot be built.
        """
        if platform_mode == "mock":
            return MockPlatformClient()
        if platform_mode == "real":
            real_clients = {
                "douyin": CrawapiDouyinClient.from_env,
                "shipinhao": CrawapiShipinhaoClient.from_env,
            }
            builder = real_clients.get(platform)
            if builder is None:
                raise ContentAgentError(
                    ErrorCode.INVALID_REQUEST,
                    "unsupported real platform",
                    {"platform": platform},
                    status_code=400,
                )
            try:
                return builder()
            except Exception as exc:
                raise ContentAgentError(
                    ErrorCode.PLATFORM_CONFIG_MISSING,
                    "platform config missing",
                    {"exception_type": type(exc).__name__},
                ) from exc
        raise ContentAgentError(
            ErrorCode.INVALID_REQUEST,
            "unsupported platform_mode",
            {"platform_mode": platform_mode},
            status_code=400,
        )

    def get_summary(self, run_id: str) -> dict:
        """Summarize a run from what exists in its runtime directory.

        ``current_step`` is ``review_strategy`` when ``final_output.json``
        exists and ``unknown`` otherwise. Validation is only run when the
        final output exists; ``errors`` is always an empty list here.
        """
        run_dir = self.runtime.run_dir(run_id)
        final_output_exists = (run_dir / "final_output.json").exists()
        status = self._summary_status(run_id, final_output_exists)
        validation = self.validate_run(run_id) if final_output_exists else {"status": "fail"}
        policy_run_id = None
        if final_output_exists:
            policy_run_id = self.runtime.read_json(run_id, "final_output.json").get("policy_run_id")
        return {
            "run_id": run_id,
            "policy_run_id": policy_run_id,
            "status": status,
            "current_step": "review_strategy" if final_output_exists else "unknown",
            "output_dir": str(run_dir),
            "files": self.runtime.file_status(run_id),
            "validation_status": validation["status"],
            "errors": [],
        }

    def _summary_status(self, run_id: str, final_output_exists: bool) -> str:
        """Derive a run status for ``get_summary``.

        The rules are applied in order:

        - If any ``lifecycle_*`` events exist, use the last one's status.
        - Otherwise, if the final output exists and a
          ``platform_query_failed`` event was logged, report
          ``partial_success``.
        - Otherwise, report success or failure based on whether the final
          output exists.

        Unreadable events are treated as no events.
        """
        try:
            run_events = self.runtime.read_jsonl(run_id, "run_events.jsonl")
            lifecycle_events = [
                event for event in run_events if str(event.get("event_id", "")).startswith("lifecycle_")
            ]
        except Exception:
            run_events = []
            lifecycle_events = []
        if lifecycle_events:
            return lifecycle_events[-1].get("status") or (
                "success" if final_output_exists else "failed"
            )
        if final_output_exists and any(
            event.get("event_type") == "platform_query_failed" for event in run_events
        ):
            return "partial_success"
        return "success" if final_output_exists else "failed"

    def read_jsonl(self, run_id: str, filename: str) -> list[dict]:
        """Read a JSONL artifact of ``run_id`` from the runtime store."""
        return self.runtime.read_jsonl(run_id, filename)

    def read_json(self, run_id: str, filename: str) -> dict:
        """Read a JSON artifact of ``run_id`` from the runtime store."""
        return self.runtime.read_json(run_id, filename)

    def strategy_review(self, run_id: str) -> dict:
        """Return ``strategy_review.json``, or a ``not_generated`` placeholder if absent."""
        try:
            return self.runtime.read_json(run_id, "strategy_review.json")
        except FileNotFoundError:
            policy_run_id = None
            try:
                policy_run_id = self.runtime.read_json(run_id, "final_output.json").get(
                    "policy_run_id"
                )
            except FileNotFoundError:
                pass
            return {
                "schema_version": RUNTIME_SCHEMA_VERSION,
                "run_id": run_id,
                "policy_run_id": policy_run_id,
                "review_status": "not_generated",
            }

    def regenerate_strategy_review(self, run_id: str) -> dict:
        """Re-run the learning review under a new, timestamped review id."""
        from content_agent.business_modules.learning_review import run

        final_output = self.runtime.read_json(run_id, "final_output.json")
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
        review_id = f"review_{final_output['policy_run_id']}_{timestamp}"
        return run(run_id, final_output["policy_run_id"], self.runtime, review_id=review_id)

    def validate_run(self, run_id: str) -> dict:
        """Validate the run's artifacts via ``run_record.validate_run``."""
        from content_agent.business_modules.run_record import validate_run

        return validate_run(run_id, self.runtime)
def _policy_run_record_from_state(state: RunState) -> dict[str, Any]:
    """Project a finished run state into the ``policy_run`` record shape.

    Bundle-level fields are read from ``state["policy_bundle"]``; a missing
    bundle or key yields ``None`` (or the explicit empty default shown
    below). Decision counts are aggregated by ``_decision_summary``.
    """
    bundle = state.get("policy_bundle") or {}
    rule_decisions = state.get("rule_decisions") or []
    final_output = state.get("final_output") or {}
    query_count = len(state.get("search_queries") or [])
    discovered_count = len(state.get("discovered_content_items") or [])
    summary_of_decisions = _decision_summary(rule_decisions)

    payload = {
        "current_step": state.get("current_step"),
        "search_query_count": query_count,
        "discovered_content_count": discovered_count,
        "decision_count": len(rule_decisions),
        "final_output_ref": "final_output.json",
        "dispatch": bundle.get("dispatch"),
        "runtime_status_contract": bundle.get("runtime_status_contract", {}),
        "effect_status_mapping": bundle.get("effect_status_mapping", []),
        "query_effect_aggregation": bundle.get("query_effect_aggregation", []),
        "decision_reason_codes": bundle.get("decision_reason_codes", []),
        "policy_bundle_hash": bundle.get("policy_bundle_hash"),
    }

    record: dict[str, Any] = {}
    record["run_id"] = state["run_id"]
    record["policy_run_id"] = state["policy_run_id"]
    record["run_role"] = "primary"
    record["policy_bundle_id"] = state.get("policy_bundle_id")
    record["rule_pack_id"] = bundle.get("rule_pack_id")
    record["strategy_id"] = bundle.get("strategy_id")
    record["strategy_version"] = state.get("strategy_version")
    record["rule_pack_version"] = bundle.get("rule_pack_version")
    record["walk_strategy_version"] = bundle.get("walk_strategy_version")
    record["policy_bundle_hash"] = bundle.get("policy_bundle_hash")
    record["strategy_source_ref"] = bundle.get("strategy_source_ref")
    record["rule_pack_source_ref"] = bundle.get("rule_pack_source_ref")
    record["evidence_bundle_schema_version"] = bundle.get("evidence_bundle_schema_version")
    record["runtime_record_schema_version"] = bundle.get("runtime_record_schema_version")
    record["status"] = state.get("status", "success")
    record["metrics"] = final_output.get("summary", {})
    record["decision_summary"] = summary_of_decisions
    record["raw_payload"] = payload
    return record
- def _decision_summary(decisions: list[dict[str, Any]]) -> dict[str, Any]:
- return {
- "decision_action_counts": dict(Counter(item.get("decision_action") for item in decisions)),
- "decision_reason_code_counts": dict(
- Counter(item.get("decision_reason_code") for item in decisions)
- ),
- "effect_status_counts": dict(
- Counter(item.get("search_query_effect_status") for item in decisions)
- ),
- }
- def _query_failures_from_error(error: ContentAgentError) -> list[dict[str, Any]]:
- query_failures = error.detail.get("query_failures") if isinstance(error.detail, dict) else None
- if not isinstance(query_failures, list):
- return []
- return [failure for failure in query_failures if isinstance(failure, dict)]
def _gemini_video_client_from_env(env: dict[str, str]) -> GeminiVideoClient:
    """Build the real Gemini video client from an environment mapping.

    Thin indirection over ``RealGeminiVideoClient.from_env``; whatever that
    raises propagates to the caller.
    """
    client = RealGeminiVideoClient.from_env(env)
    return client
def _gemini_calls_cap() -> int | None:
    """Return the per-run Gemini call cap from the walk-graph policy.

    Best-effort: any error while loading the policy or looking up
    ``global.gemini_calls_per_run_cap`` yields ``None`` (no cap). The value
    is returned exactly as stored in the policy.
    """
    try:
        policy = WalkGraphStore().load_policy()
        cap = policy["global"]["gemini_calls_per_run_cap"]
    except Exception:
        return None
    return cap
- class _DeterministicGeminiVideoClient:
- """mock/默认判定 client:固定返回适合 50+ 的高分结果,供本地/smoke 无网跑通。"""
- def analyze(
- self,
- content: dict[str, Any],
- media: dict[str, Any],
- source_context: dict[str, Any],
- ) -> dict[str, Any]:
- return {
- "fit_senior_50plus": True,
- "fit_confidence": 0.9,
- "relevance_score": 0.8,
- "reason": "deterministic stub judgment",
- }
- def _env_enabled(key: str) -> bool:
- value = _load_project_env().get(key)
- if os.environ.get(key) is not None:
- value = os.environ[key]
- return (value or "").lower() in {"1", "true", "yes", "on"}
def _merged_project_env() -> dict[str, str]:
    """Return the project ``.env`` values overlaid with ``os.environ``.

    Process environment variables win over ``.env`` entries with the same key.
    """
    merged = dict(_load_project_env())
    merged.update(os.environ)
    return merged
- def _demand_content_id_from_source(source: str | dict[str, Any] | None) -> int | None:
- if not isinstance(source, dict):
- return None
- value = source.get("id") or source.get("demand_content_id")
- if value in (None, ""):
- return None
- try:
- return int(value)
- except (TypeError, ValueError):
- return None
- def _utc_now() -> str:
- return datetime.now(timezone.utc).isoformat()
- def _load_project_env(env_file: Path | str = ".env") -> dict[str, str]:
- path = Path(env_file)
- if not path.exists():
- return {}
- result: dict[str, str] = {}
- for line in path.read_text(encoding="utf-8").splitlines():
- stripped = line.strip()
- if not stripped or stripped.startswith("#") or "=" not in stripped:
- continue
- key, value = stripped.split("=", 1)
- result[key.strip()] = value.strip().strip('"').strip("'")
- return result
|