|
@@ -0,0 +1,925 @@
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
|
+
|
|
|
|
|
+from datetime import date, datetime
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+from typing import Any
|
|
|
|
|
+
|
|
|
|
|
+from content_agent.business_modules.run_record import validate_run
|
|
|
|
|
+from content_agent.integrations.composite_runtime import CompositeRuntimeStore
|
|
|
|
|
+from content_agent.integrations.database_runtime import DatabaseRuntimeStore
|
|
|
|
|
+from content_agent.integrations.database_runtime import RUNTIME_FILE_TABLES
|
|
|
|
|
+from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
|
|
|
|
|
+from content_agent.interfaces import RuntimeStore
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
# Labels reported under the "data_origin" key of the dashboard payloads.
# Returned when the primary runtime store is a DatabaseRuntimeStore.
DATA_ORIGIN_PRODUCTION_DB = "production_db"
# Returned when the data comes from a non-database (export) runtime store.
DATA_ORIGIN_RUNTIME_EXPORT = "runtime_export"
# NOTE(review): not referenced in the visible part of this module —
# presumably meant for payloads combining both stores; confirm usage.
DATA_ORIGIN_MIXED = "mixed_with_runtime_export"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class DashboardService:
    """Assemble read-only dashboard payloads for content-agent runs.

    Data is read from ``runtime``. JSON/JSONL reads that raise fall back to
    ``export_runtime`` when one is configured. When ``runtime`` is a
    ``DatabaseRuntimeStore`` the run list, run summary and per-file record
    counts are queried from the database tables directly.
    """

    def __init__(
        self,
        runtime: RuntimeStore,
        export_runtime: RuntimeStore | None = None,
    ) -> None:
        """Keep the primary store and the optional export fallback store."""
        self.runtime = runtime
        self.export_runtime = export_runtime

    @classmethod
    def from_runtime(cls, runtime: RuntimeStore) -> "DashboardService":
        """Build a service, unpacking a ``CompositeRuntimeStore`` into its
        ``primary`` and ``export`` parts."""
        if isinstance(runtime, CompositeRuntimeStore):
            return cls(runtime.primary, runtime.export)
        return cls(runtime)

    @property
    def data_origin(self) -> str:
        """Origin label implied by the type of the primary store."""
        if isinstance(self.runtime, DatabaseRuntimeStore):
            return DATA_ORIGIN_PRODUCTION_DB
        return DATA_ORIGIN_RUNTIME_EXPORT

    def run_exists(self, run_id: str) -> bool:
        """Return whether ``run_id`` is known to the primary store.

        Database stores are checked with a row count on
        ``content_agent_runs``. Other stores are checked through
        ``run_dir(run_id).exists()``; if that raises, the run is considered
        present when any of its runtime files is reported as existing.
        """
        if isinstance(self.runtime, DatabaseRuntimeStore):
            row = self.runtime._fetch_one(
                "SELECT COUNT(*) AS cnt FROM `content_agent_runs` WHERE `run_id` = %s",
                (run_id,),
            )
            return bool(row and row.get("cnt", 0))
        try:
            return self.runtime.run_dir(run_id).exists()
        except Exception:
            # Best-effort fallback for stores that cannot resolve a run
            # directory: rely on the per-file status map instead.
            return any(self.runtime.file_status(run_id).values())

    def list_runs(
        self,
        *,
        status: str | None = None,
        platform: str | None = None,
        platform_mode: str | None = None,
        strategy_version: str | None = None,
        validation_status: str | None = None,
        error_code: str | None = None,
        page: int = 1,
        page_size: int = 20,
    ) -> dict[str, Any]:
        """Return one page of run summaries, optionally filtered.

        ``page`` is clamped to at least 1 and ``page_size`` to the range
        1..100. Filters that are ``None`` or empty are ignored. The result
        holds ``items``, ``page``, ``page_size``, ``total`` (matches before
        paging) and ``data_origin``.
        """
        page = max(page, 1)
        page_size = min(max(page_size, 1), 100)
        if isinstance(self.runtime, DatabaseRuntimeStore):
            return self._list_db_runs(
                status=status,
                platform=platform,
                platform_mode=platform_mode,
                strategy_version=strategy_version,
                validation_status=validation_status,
                error_code=error_code,
                page=page,
                page_size=page_size,
            )
        # Stores without a ``list_runs`` method yield an empty listing.
        items = [
            self._local_run_list_item(run_id)
            for run_id in getattr(self.runtime, "list_runs", lambda: [])()
        ]
        items = [
            item
            for item in items
            if _matches(item, "status", status)
            and _matches(item, "platform", platform)
            and _matches(item, "platform_mode", platform_mode)
            and _matches(item, "strategy_version", strategy_version)
            and _matches(item, "validation_status", validation_status)
            and _matches(item, "error_code", error_code)
        ]
        total = len(items)
        offset = (page - 1) * page_size
        return {
            "items": items[offset:offset + page_size],
            "page": page,
            "page_size": page_size,
            "total": total,
            "data_origin": self.data_origin,
        }

    def dashboard(self, run_id: str) -> dict[str, Any]:
        """Build the full dashboard payload for one run.

        Combines the runtime-file inventory, the run summary, validation
        result, per-stage conclusions, rule-application summaries, the walk
        graph and navigation links into a single JSON-safe dictionary.
        """
        runtime_files_response = self.runtime_files(run_id)
        runtime_files = runtime_files_response["files"]
        file_status = {item["filename"]: item["exists"] for item in runtime_files}
        record_counts = {item["filename"]: item["record_count"] for item in runtime_files}
        final_output = self._read_json_optional(run_id, "final_output.json")
        strategy_review = self._read_json_optional(run_id, "strategy_review.json")
        source_context = self._read_json_optional(run_id, "source_context.json") or {}
        run_item = (
            self._db_run_item(run_id)
            if isinstance(self.runtime, DatabaseRuntimeStore)
            else self._local_run_list_item(run_id)
        )
        # Hand the already-fetched summary over so database mode does not
        # issue the same run query a second time.
        validation = self._validate_optional(run_id, run_item)
        queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
        run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
        content_items = self._read_jsonl_optional(run_id, "discovered_content_items.jsonl")
        decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
        walk_actions = self._read_jsonl_optional(run_id, "walk_actions.jsonl")
        source_paths = self._read_jsonl_optional(run_id, "source_path_records.jsonl")
        counts = {
            "queries": record_counts.get("search_queries.jsonl", 0),
            "discovered_content_items": record_counts.get("discovered_content_items.jsonl", 0),
            "rule_decisions": record_counts.get("rule_decisions.jsonl", 0),
            "walk_actions": record_counts.get("walk_actions.jsonl", 0),
            "source_path_records": record_counts.get("source_path_records.jsonl", 0),
            "run_events": record_counts.get("run_events.jsonl", 0),
        }
        primary_failure_reason = _primary_failure_reason(run_item, run_events)
        return _json_safe({
            "run_id": run_id,
            "summary": run_item,
            "counts": counts,
            "files": file_status,
            "runtime_files": runtime_files,
            "validation": validation,
            "final_output_summary": (final_output or {}).get("summary", {}),
            "strategy_review_status": (strategy_review or {}).get(
                "review_status",
                "not_generated",
            ),
            "business_summary": _business_summary(
                run_item,
                counts,
                final_output or {},
                primary_failure_reason,
            ),
            "stage_conclusions": _stage_conclusions(
                file_status,
                counts,
                run_item,
                queries,
                run_events,
                decisions,
                walk_actions,
                final_output or {},
                strategy_review or {},
                source_context,
            ),
            "rule_application_summary": _rule_application_summary(decisions, content_items),
            "walk_graph": _walk_graph(queries, content_items, walk_actions, source_paths),
            "primary_failure_reason": primary_failure_reason,
            "technical_refs": {
                "runtime_files_url": f"/runs/{run_id}/runtime-files",
                "validation_url": f"/runs/{run_id}/validation",
                "data_origin": runtime_files_response["data_origin"],
                "runtime_file_count": len(runtime_files),
            },
            "data_origin": runtime_files_response["data_origin"],
            "links": {
                "queries": f"/runs/{run_id}/queries",
                "timeline": f"/runs/{run_id}/timeline",
                "content_items": f"/runs/{run_id}/content-items",
                "runtime_files": f"/runs/{run_id}/runtime-files",
            },
        })

    def runtime_files(self, run_id: str) -> dict[str, Any]:
        """List every name in ``RUNTIME_FILENAMES`` with existence, record
        count, file type and origin for ``run_id``."""
        if isinstance(self.runtime, DatabaseRuntimeStore):
            counts = self._db_runtime_file_counts(run_id)
            files = [
                {
                    "filename": filename,
                    # In database mode a file "exists" when it has rows.
                    "exists": counts.get(filename, 0) > 0,
                    "record_count": counts.get(filename, 0),
                    "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
                    "contract_status": "current",
                    "data_origin": DATA_ORIGIN_PRODUCTION_DB,
                }
                for filename in RUNTIME_FILENAMES
            ]
            return {
                "run_id": run_id,
                "files": files,
                "data_origin": DATA_ORIGIN_PRODUCTION_DB,
            }
        status = self.runtime.file_status(run_id)
        files = []
        for filename in RUNTIME_FILENAMES:
            exists = bool(status.get(filename))
            files.append({
                "filename": filename,
                "exists": exists,
                "record_count": self._runtime_record_count(run_id, filename) if exists else 0,
                "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
                "contract_status": "current",
                "data_origin": self._file_origin(run_id, filename),
            })
        return {
            "run_id": run_id,
            "files": files,
            "data_origin": self._combined_origin(run_id),
        }

    def runtime_file(
        self,
        run_id: str,
        filename: str,
        *,
        limit: int = 100,
        offset: int = 0,
    ) -> dict[str, Any]:
        """Return the content of one runtime file.

        ``.jsonl`` files are returned as a ``records`` window (``limit``
        clamped to 1..500, ``offset`` clamped to >= 0) together with the
        ``total`` record count; other files are returned whole under
        ``data``.

        Raises:
            ValueError: if ``filename`` is not an allowed runtime filename.
        """
        self._validate_runtime_filename(filename)
        limit = min(max(limit, 1), 500)
        offset = max(offset, 0)
        if filename.endswith(".jsonl"):
            records = self._read_jsonl_optional(run_id, filename)
            return {
                "run_id": run_id,
                "filename": filename,
                "records": _json_safe(records[offset:offset + limit]),
                "offset": offset,
                "limit": limit,
                "total": len(records),
                "data_origin": self._file_origin(run_id, filename),
            }
        data = self._read_json_optional(run_id, filename) or {}
        return {
            "run_id": run_id,
            "filename": filename,
            "data": _json_safe(data),
            "data_origin": self._file_origin(run_id, filename),
        }

    def queries(self, run_id: str) -> dict[str, Any]:
        """Return the search queries of a run, each enriched with its search
        clue and a per-action count of the rule decisions that reference it."""
        queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
        clues_by_query = self._index_rows(
            self._read_jsonl_optional(run_id, "search_clues.jsonl"),
            "search_query_id",
        )
        decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
        decision_counts: dict[str, dict[str, int]] = {}
        for decision in decisions:
            query_id = decision.get("search_query_id")
            if not query_id:
                continue
            counts = decision_counts.setdefault(query_id, {})
            action = decision.get("decision_action") or "unknown"
            counts[action] = counts.get(action, 0) + 1
        items = []
        for query in queries:
            query_id = query.get("search_query_id")
            clue = clues_by_query.get(query_id) or {}
            items.append(_json_safe({
                **query,
                "search_clue": clue,
                "decision_action_counts": decision_counts.get(query_id, {}),
                # The clue's effect status wins; the query's own is the fallback.
                "search_query_effect_status": clue.get(
                    "search_query_effect_status",
                    query.get("search_query_effect_status"),
                ),
                "walk_next_step": clue.get("walk_next_step"),
                "failure_reason": clue.get("failure_reason")
                or (clue.get("raw_payload") or {}).get("failure_reason"),
            }))
        return {
            "run_id": run_id,
            "items": items,
            "total": len(items),
            "data_origin": self._combined_origin(run_id),
        }

    def timeline(self, run_id: str) -> dict[str, Any]:
        """Merge run events, walk actions and source-path records into one
        list of timeline entries ordered by timestamp."""
        events = [
            {
                "source": "run_events.jsonl",
                "stage": row.get("stage") or row.get("event_type"),
                "event_type": row.get("event_type"),
                "status": row.get("status"),
                "timestamp": row.get("created_at") or row.get("timestamp"),
                "error_code": row.get("error_code"),
                "walk_action_id": (row.get("raw_payload") or {}).get("walk_action_id"),
                "record": row,
            }
            for row in self._read_jsonl_optional(run_id, "run_events.jsonl")
        ]
        events.extend(
            {
                "source": "walk_actions.jsonl",
                "stage": "walk",
                "event_type": row.get("edge_type"),
                "status": row.get("walk_status"),
                "timestamp": row.get("created_at"),
                "error_code": None,
                "walk_action_id": row.get("walk_action_id"),
                "record": row,
            }
            for row in self._read_jsonl_optional(run_id, "walk_actions.jsonl")
        )
        events.extend(
            {
                "source": "source_path_records.jsonl",
                "stage": "source_path",
                "event_type": row.get("source_path_type"),
                "status": row.get("status"),
                "timestamp": row.get("created_at"),
                "error_code": None,
                "walk_action_id": row.get("walk_action_id")
                or (row.get("raw_payload") or {}).get("walk_action_id"),
                "record": row,
            }
            for row in self._read_jsonl_optional(run_id, "source_path_records.jsonl")
        )
        events.sort(key=lambda item: self._timestamp_sort_key(item.get("timestamp")))
        return {
            "run_id": run_id,
            "items": _json_safe(events),
            "total": len(events),
            "data_origin": self._combined_origin(run_id),
        }

    def content_items(self, run_id: str) -> dict[str, Any]:
        """Return discovered content items, each joined with its media
        record, pattern-recall evidence and rule decision when available."""
        media_by_platform_id = self._index_rows(
            self._read_jsonl_optional(run_id, "content_media_records.jsonl"),
            "platform_content_id",
        )
        recall_by_platform_id = self._index_rows(
            self._read_jsonl_optional(run_id, "pattern_recall_evidence.jsonl"),
            "platform_content_id",
        )
        decisions_by_target = self._index_rows(
            self._read_jsonl_optional(run_id, "rule_decisions.jsonl"),
            "decision_target_id",
        )
        items = []
        for content in self._read_jsonl_optional(run_id, "discovered_content_items.jsonl"):
            platform_content_id = content.get("platform_content_id")
            discovery_id = content.get("content_discovery_id")
            items.append(_json_safe({
                **content,
                "media_record": media_by_platform_id.get(platform_content_id),
                "pattern_recall_evidence": recall_by_platform_id.get(platform_content_id),
                # A decision may target either the discovery id or the platform id.
                "rule_decision": decisions_by_target.get(discovery_id)
                or decisions_by_target.get(platform_content_id),
            }))
        return {
            "run_id": run_id,
            "items": items,
            "total": len(items),
            "data_origin": self._combined_origin(run_id),
        }

    @staticmethod
    def _index_rows(rows: list[dict[str, Any]], key: str) -> dict[Any, dict[str, Any]]:
        """Map ``row[key]`` to its row; later rows replace earlier ones.

        Rows whose key is missing or empty are skipped so that records
        without an id are never joined to each other through ``None``.
        """
        indexed: dict[Any, dict[str, Any]] = {}
        for row in rows:
            value = row.get(key)
            if value is None or value == "":
                continue
            indexed[value] = row
        return indexed

    @staticmethod
    def _timestamp_sort_key(value: Any) -> str:
        """Return a string sort key for a timeline timestamp.

        ``datetime``/``date`` values are rendered with ``isoformat()`` so
        they order consistently against ISO-8601 strings (``str(datetime)``
        uses a space separator instead of ``T``). Missing values sort first.
        """
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        return str(value or "")

    def _list_db_runs(self, **filters: Any) -> dict[str, Any]:
        """Database implementation of :meth:`list_runs`.

        Expects ``page`` and ``page_size`` among ``filters``; the remaining
        entries are turned into a parameterised ``WHERE`` clause.
        """
        page = filters.pop("page")
        page_size = filters.pop("page_size")
        where_sql, params = _db_run_filters(filters)
        count_row = self.runtime._fetch_one(
            f"SELECT COUNT(*) AS cnt FROM `content_agent_runs` r {where_sql}",
            tuple(params),
        )
        rows = self.runtime._fetch_all(
            "SELECT r.run_id, "
            "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
            "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
            "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
            "r.validation_status, r.error_code, r.started_at, r.completed_at "
            f"FROM `content_agent_runs` r {where_sql} "
            "ORDER BY r.started_at DESC, r.id DESC LIMIT %s OFFSET %s",
            tuple([*params, page_size, (page - 1) * page_size]),
        )
        return {
            "items": [_json_safe(_db_run_row_to_item(row)) for row in rows],
            "page": page,
            "page_size": page_size,
            "total": int((count_row or {}).get("cnt") or 0),
            "data_origin": self.data_origin,
        }

    def _db_run_item(self, run_id: str) -> dict[str, Any]:
        """Fetch the summary row of one run from the database.

        When no row is found a placeholder with status ``"unknown"`` is
        returned instead.
        """
        row = self.runtime._fetch_one(
            "SELECT r.run_id, "
            "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
            "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
            "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
            "r.validation_status, r.error_code, r.started_at, r.completed_at "
            "FROM `content_agent_runs` r WHERE r.run_id = %s LIMIT 1",
            (run_id,),
        )
        return _json_safe(_db_run_row_to_item(row or {"run_id": run_id, "status": "unknown"}))

    def _local_run_list_item(self, run_id: str) -> dict[str, Any]:
        """Derive a run summary from ``final_output.json`` and the last
        ``lifecycle_*`` entry of ``run_events.jsonl``."""
        final_output = self._read_json_optional(run_id, "final_output.json") or {}
        run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
        lifecycle = [
            row for row in run_events if str(row.get("event_id", "")).startswith("lifecycle_")
        ]
        latest = lifecycle[-1] if lifecycle else {}
        return _json_safe({
            "run_id": run_id,
            "policy_run_id": final_output.get("policy_run_id"),
            # Without a lifecycle status, the presence of a final output
            # decides between "success" and "failed".
            "status": latest.get("status") or ("success" if final_output else "failed"),
            "current_step": "review_strategy" if final_output else "unknown",
            "platform": final_output.get("platform"),
            "platform_mode": final_output.get("platform_mode"),
            "strategy_version": final_output.get("strategy_version"),
            "validation_status": final_output.get("validation_status"),
            "error_code": latest.get("error_code"),
            "started_at": latest.get("created_at"),
            "completed_at": None,
        })

    def _read_json_optional(self, run_id: str, filename: str) -> dict[str, Any] | None:
        """Read a JSON runtime file, falling back to the export store.

        Returns ``None`` when the primary read raises and the export store
        is absent or raises as well. Errors are swallowed on purpose: a
        missing file must not break the dashboard.
        """
        try:
            return self.runtime.read_json(run_id, filename)
        except Exception:
            if self.export_runtime:
                try:
                    return self.export_runtime.read_json(run_id, filename)
                except Exception:
                    return None
            return None

    def _read_jsonl_optional(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Read a JSONL runtime file, falling back to the export store.

        Returns an empty list when neither store can serve the file.
        """
        try:
            return self.runtime.read_jsonl(run_id, filename)
        except Exception:
            if self.export_runtime:
                try:
                    return self.export_runtime.read_jsonl(run_id, filename)
                except Exception:
                    return []
            return []

    def _validate_optional(
        self,
        run_id: str,
        run_item: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Return the validation result for a run without raising.

        In database mode the stored ``validation_status`` is reported with
        an empty findings list; ``run_item`` may be passed to reuse an
        already-fetched summary. Otherwise ``validate_run`` is executed and
        any exception is converted into a failing result.
        """
        if isinstance(self.runtime, DatabaseRuntimeStore):
            if run_item is None:
                run_item = self._db_run_item(run_id)
            status = run_item.get("validation_status")
            return {
                "run_id": run_id,
                "status": status or "unknown",
                "findings": [],
            }
        try:
            return validate_run(run_id, self.runtime)
        except Exception as exc:
            return {
                "run_id": run_id,
                "status": "fail",
                "findings": [{"severity": "error", "message": str(exc)}],
            }

    def _runtime_record_count(self, run_id: str, filename: str) -> int:
        """Count the records of one runtime file: table rows in database
        mode, lines for ``.jsonl`` files, and 0/1 for JSON documents."""
        if isinstance(self.runtime, DatabaseRuntimeStore):
            return self._db_runtime_file_counts(run_id).get(filename, 0)
        if filename.endswith(".jsonl"):
            return len(self._read_jsonl_optional(run_id, filename))
        return 1 if self._read_json_optional(run_id, filename) is not None else 0

    def _combined_origin(self, run_id: str) -> str:
        """Origin label for payloads built from several files.

        NOTE(review): ``run_id`` is currently unused and the result depends
        only on the primary store type — confirm whether a mixed origin
        should be reported when the export fallback served some files.
        """
        if not isinstance(self.runtime, DatabaseRuntimeStore):
            return DATA_ORIGIN_RUNTIME_EXPORT
        return DATA_ORIGIN_PRODUCTION_DB

    def _file_origin(self, run_id: str, filename: str) -> str:
        """Origin label for a single runtime file.

        In database mode the primary store is asked first; the export store
        is only consulted when the primary does not report the file.
        """
        if not isinstance(self.runtime, DatabaseRuntimeStore):
            return DATA_ORIGIN_RUNTIME_EXPORT
        if self.runtime.file_status(run_id).get(filename, False):
            return DATA_ORIGIN_PRODUCTION_DB
        if self.export_runtime and self.export_runtime.file_status(run_id).get(filename):
            return DATA_ORIGIN_RUNTIME_EXPORT
        return self.data_origin

    def _validate_runtime_filename(self, filename: str) -> None:
        """Reject filenames outside ``RUNTIME_FILENAMES`` or containing path
        components.

        Raises:
            ValueError: if the filename is not allowed.
        """
        if (
            filename not in RUNTIME_FILENAMES
            or ".." in filename
            or "/" in filename
            or "\\" in filename
            or Path(filename).is_absolute()
        ):
            raise ValueError("runtime filename is not allowed")

    def _db_runtime_file_counts(self, run_id: str) -> dict[str, int]:
        """Count the rows stored for ``run_id`` in the table backing each
        runtime file; files without a table mapping count as 0."""
        counts: dict[str, int] = {}
        with self.runtime._connection_factory() as conn:
            with conn.cursor() as cur:
                for filename in RUNTIME_FILENAMES:
                    table = RUNTIME_FILE_TABLES.get(filename)
                    if not table:
                        counts[filename] = 0
                        continue
                    # Table names come from the RUNTIME_FILE_TABLES constant,
                    # never from request input; run_id is bound as a parameter.
                    cur.execute(
                        f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
                        (run_id,),
                    )
                    row = cur.fetchone() or {}
                    counts[filename] = int(row.get("cnt") or 0)
        return counts
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _db_run_filters(filters: dict[str, Any]) -> tuple[str, list[Any]]:
|
|
|
|
|
+ allowed = {
|
|
|
|
|
+ "status": "status",
|
|
|
|
|
+ "platform": "platform",
|
|
|
|
|
+ "platform_mode": "platform_mode",
|
|
|
|
|
+ "strategy_version": "strategy_version",
|
|
|
|
|
+ "validation_status": "validation_status",
|
|
|
|
|
+ "error_code": "error_code",
|
|
|
|
|
+ }
|
|
|
|
|
+ clauses = []
|
|
|
|
|
+ params = []
|
|
|
|
|
+ for key, column in allowed.items():
|
|
|
|
|
+ value = filters.get(key)
|
|
|
|
|
+ if value:
|
|
|
|
|
+ clauses.append(f"r.`{column}` = %s")
|
|
|
|
|
+ params.append(value)
|
|
|
|
|
+ if not clauses:
|
|
|
|
|
+ return "", params
|
|
|
|
|
+ return "WHERE " + " AND ".join(clauses), params
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _db_run_row_to_item(row: dict[str, Any]) -> dict[str, Any]:
|
|
|
|
|
+ return {
|
|
|
|
|
+ "run_id": row.get("run_id"),
|
|
|
|
|
+ "policy_run_id": row.get("policy_run_id"),
|
|
|
|
|
+ "status": row.get("status"),
|
|
|
|
|
+ "current_step": row.get("current_step"),
|
|
|
|
|
+ "platform": row.get("platform"),
|
|
|
|
|
+ "platform_mode": row.get("platform_mode"),
|
|
|
|
|
+ "strategy_version": row.get("strategy_version"),
|
|
|
|
|
+ "validation_status": row.get("validation_status"),
|
|
|
|
|
+ "error_code": row.get("error_code"),
|
|
|
|
|
+ "started_at": row.get("started_at"),
|
|
|
|
|
+ "completed_at": row.get("completed_at"),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _matches(item: dict[str, Any], key: str, expected: str | None) -> bool:
|
|
|
|
|
+ return not expected or item.get(key) == expected
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _json_safe(value: Any) -> Any:
|
|
|
|
|
+ if isinstance(value, dict):
|
|
|
|
|
+ return {key: _json_safe(item) for key, item in value.items()}
|
|
|
|
|
+ if isinstance(value, list):
|
|
|
|
|
+ return [_json_safe(item) for item in value]
|
|
|
|
|
+ if isinstance(value, (datetime, date)):
|
|
|
|
|
+ return value.isoformat()
|
|
|
|
|
+ return value
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def _business_summary(
    run_item: dict[str, Any],
    counts: dict[str, int],
    final_output: dict[str, Any],
    primary_failure_reason: dict[str, Any] | None,
) -> dict[str, Any]:
    """Build the business-facing summary card of a run.

    The headline describes the primary failure when there is one and is
    otherwise chosen from the run status. Content and asset totals are
    taken from ``final_output["summary"]``, defaulting to 0.
    """
    totals = final_output.get("summary") or {}
    run_status = run_item.get("status") or "unknown"

    def total_of(field: str) -> int:
        return int(totals.get(field) or 0)

    if primary_failure_reason:
        stage_label = primary_failure_reason.get('stage_label')
        reason_label = primary_failure_reason.get('reason_label')
        headline = f"本次运行停在{stage_label}:{reason_label}"
    else:
        # Fallback headline for any status without a dedicated message.
        headline = "本次运行未形成完整成功链路"
        status_headlines = (
            ("success", "本次运行已完成,可查看内容判断和资产沉淀结果"),
            ("partial_success", "本次运行部分成功,有 query 或平台请求失败"),
            ("running", "本次运行仍在进行中"),
        )
        for known_status, text in status_headlines:
            if run_status == known_status:
                headline = text
                break

    return {
        "headline": headline,
        "status": run_status,
        "source_label": _source_label(run_item),
        "query_count": counts.get("queries", 0),
        "content_count": counts.get("discovered_content_items", 0),
        "kept_count": total_of("pooled_content_count"),
        "review_count": total_of("review_content_count"),
        "rejected_count": total_of("rejected_content_count"),
        "asset_count": total_of("author_asset_count"),
        "primary_failure_reason": primary_failure_reason,
    }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def _stage_conclusions(
    files: dict[str, bool],
    counts: dict[str, int],
    run_item: dict[str, Any],
    queries: list[dict[str, Any]],
    run_events: list[dict[str, Any]],
    decisions: list[dict[str, Any]],
    walk_actions: list[dict[str, Any]],
    final_output: dict[str, Any],
    strategy_review: dict[str, Any],
    source_context: dict[str, Any],
) -> list[dict[str, Any]]:
    """Produce one conclusion card per pipeline stage.

    The stages are, in order: source, query, platform, judge, walk, asset
    and learning. Each card carries ``stage_id``, ``label``, ``status``,
    ``headline``, ``detail`` and ``metric``. ``run_item`` is accepted but
    not used by this function.
    """
    # An event counts as a query failure when it is a platform failure, or
    # when it is a failed event that is tied to a search query.
    failed_query_events = []
    for event in run_events:
        platform_failed = event.get("event_type") == "platform_query_failed"
        query_failed = event.get("status") == "failed" and event.get("search_query_id")
        if platform_failed or query_failed:
            failed_query_events.append(event)
    failure_total = len(failed_query_events)

    decision_counts = _effect_status_counts(decisions)
    walk_counts = _walk_status_counts(walk_actions)
    asset_totals = final_output.get("summary") or {}

    def asset_total(field: str) -> Any:
        return asset_totals.get(field, 0) or 0

    has_source = files.get("source_context.json")
    query_total = counts.get("queries", 0)
    content_total = counts.get("discovered_content_items", 0)
    decision_total = counts.get("rule_decisions", 0)
    walk_total = counts.get("walk_actions", 0)
    review_generated = strategy_review.get("review_status") == "generated"

    stages: list[dict[str, Any]] = []

    if has_source:
        source_status = "success"
        source_headline = "真实需求已读取"
        source_detail = _source_stage_detail(source_context)
    else:
        source_status = "failed"
        source_headline = "需求来源缺失"
        source_detail = "未找到 source_context,无法解释输入来源"
    stages.append({
        "stage_id": "source",
        "label": "数据源",
        "status": source_status,
        "headline": source_headline,
        "detail": source_detail,
        "metric": "已就绪" if files.get("pattern_seed_pack.json") else "种子缺失",
    })

    stages.append({
        "stage_id": "query",
        "label": "Query",
        "status": "success" if query_total else "failed",
        "headline": f"生成 {query_total} 条搜索意图",
        "detail": f"{failure_total} 条 query 有失败记录,其余进入平台搜索或后续链路",
        "metric": _query_generation_label(queries, run_events),
    })

    if content_total:
        platform_status = "success"
        platform_detail = "平台结果已进入内容发现"
    else:
        platform_status = "failed"
        platform_detail = "未形成发现内容,通常需要先查看平台请求或 query 失败原因"
    stages.append({
        "stage_id": "platform",
        "label": "平台 / 内容",
        "status": platform_status,
        "headline": f"发现 {content_total} 条内容",
        "detail": platform_detail,
        "metric": f"{failure_total} 条平台失败",
    })

    pooled = decision_counts.get('success', 0)
    blocked = decision_counts.get('rule_blocked', 0)
    stages.append({
        "stage_id": "judge",
        "label": "判断",
        "status": "success" if decision_total else "pending",
        "headline": f"{decision_total} 条内容完成判断",
        "detail": _decision_status_label(decision_counts),
        "metric": f"入池 {pooled} / 阻断 {blocked}",
    })

    walks_ok = walk_counts.get('success', 0)
    walks_failed = walk_counts.get('failed', 0)
    stages.append({
        "stage_id": "walk",
        "label": "游走",
        "status": "success" if walk_total else "pending",
        "headline": f"触发 {walk_total} 个游走动作",
        "detail": _walk_status_label(walk_counts),
        "metric": f"成功 {walks_ok} / 失败 {walks_failed}",
    })

    pooled_assets = asset_total('pooled_content_count')
    review_assets = asset_total('review_content_count')
    rejected_assets = asset_total('rejected_content_count')
    author_assets = asset_total('author_asset_count')
    stages.append({
        "stage_id": "asset",
        "label": "资产",
        "status": "success" if files.get("final_output.json") else "pending",
        "headline": f"沉淀 {pooled_assets} 条内容资产",
        "detail": f"待复看 {review_assets},淘汰 {rejected_assets}",
        "metric": f"作者资产 {author_assets}",
    })

    if review_generated:
        learning_status = "success"
        learning_headline = "策略复盘已生成"
        learning_detail = "可查看 query / rule / walk 的优化建议"
    else:
        learning_status = "pending"
        learning_headline = "策略复盘未生成"
        learning_detail = "当前 run 还没有可展示的策略学习建议"
    stages.append({
        "stage_id": "learning",
        "label": "学习",
        "status": learning_status,
        "headline": learning_headline,
        "detail": learning_detail,
        "metric": strategy_review.get("review_status", "not_generated"),
    })

    return stages
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def _rule_application_summary(
    decisions: list[dict[str, Any]],
    content_items: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Summarise every rule decision together with the content it targets.

    Content items are resolvable by either ``content_discovery_id`` or
    ``platform_content_id``, because a decision's ``decision_target_id`` may
    carry either one. When no content matches, the target id itself is used
    as title and platform id.
    """
    content_by_id: dict[Any, dict[str, Any]] = {}
    # Platform ids are indexed first so that a discovery id wins if the two
    # id spaces ever collide. Items without an id are not indexed at all.
    for id_field in ("platform_content_id", "content_discovery_id"):
        for item in content_items:
            item_id = item.get(id_field)
            if item_id:
                content_by_id[item_id] = item

    summaries = []
    for decision in decisions:
        replay = decision.get("decision_replay_data") or {}
        target_id = decision.get("decision_target_id")
        content = content_by_id.get(target_id) or {}
        triggered_rules = decision.get("triggered_blocking_rules") or []
        scorecard = decision.get("scorecard") or {}
        # Only fall back to the scorecard when the decision has no score at
        # all; a score of 0 is a real value and must be kept.
        score = decision.get("score")
        if score is None:
            score = scorecard.get("total_score")
        summaries.append({
            "decision_id": decision.get("decision_id"),
            "content_title": content.get("title") or target_id,
            "platform_content_id": content.get("platform_content_id") or target_id,
            "rule_pack": decision.get("rule_pack_id")
            or replay.get("rule_pack_id")
            or "Content Rule Pack V1",
            # Any triggered blocking rule means the hard gate was not passed.
            "hard_gate_status": "没通过" if triggered_rules else "通过",
            "score": score,
            "decision_action": decision.get("decision_action"),
            "decision_reason_code": decision.get("decision_reason_code"),
            "content_effect_status": decision.get("content_effect_status"),
            "primary_reason": _reason_label(decision.get("decision_reason_code")),
            "technical_ref": {
                "decision_id": decision.get("decision_id"),
                "target_id": target_id,
                "has_replay_data": bool(replay),
            },
        })
    return summaries
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _walk_graph(
|
|
|
|
|
+ queries: list[dict[str, Any]],
|
|
|
|
|
+ content_items: list[dict[str, Any]],
|
|
|
|
|
+ walk_actions: list[dict[str, Any]],
|
|
|
|
|
+ source_paths: list[dict[str, Any]],
|
|
|
|
|
+) -> dict[str, Any]:
|
|
|
|
|
+ nodes: dict[str, dict[str, Any]] = {
|
|
|
|
|
+ "source": {
|
|
|
|
|
+ "id": "source",
|
|
|
|
|
+ "type": "source",
|
|
|
|
|
+ "label": "需求",
|
|
|
|
|
+ "status": "success",
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ edges: list[dict[str, Any]] = []
|
|
|
|
|
+ for query in queries[:12]:
|
|
|
|
|
+ node_id = f"query:{query.get('search_query_id')}"
|
|
|
|
|
+ nodes[node_id] = {
|
|
|
|
|
+ "id": node_id,
|
|
|
|
|
+ "type": "query",
|
|
|
|
|
+ "label": query.get("search_query") or query.get("search_query_id"),
|
|
|
|
|
+ "status": query.get("search_query_effect_status") or "success",
|
|
|
|
|
+ }
|
|
|
|
|
+ edges.append({
|
|
|
|
|
+ "id": f"source->{node_id}",
|
|
|
|
|
+ "source": "source",
|
|
|
|
|
+ "target": node_id,
|
|
|
|
|
+ "label": query.get("search_query_generation_method") or "query",
|
|
|
|
|
+ "status": "success",
|
|
|
|
|
+ "rule_pack": None,
|
|
|
|
|
+ })
|
|
|
|
|
+ for content in content_items[:20]:
|
|
|
|
|
+ node_id = f"content:{content.get('platform_content_id') or content.get('content_discovery_id')}"
|
|
|
|
|
+ query_id = content.get("search_query_id")
|
|
|
|
|
+ source_id = f"query:{query_id}" if query_id else "source"
|
|
|
|
|
+ nodes[node_id] = {
|
|
|
|
|
+ "id": node_id,
|
|
|
|
|
+ "type": "content",
|
|
|
|
|
+ "label": content.get("title") or content.get("platform_content_id") or "内容",
|
|
|
|
|
+ "status": (content.get("pattern_match_result") or {}).get("recall_status") or "pending",
|
|
|
|
|
+ }
|
|
|
|
|
+ edges.append({
|
|
|
|
|
+ "id": f"{source_id}->{node_id}",
|
|
|
|
|
+ "source": source_id if source_id in nodes else "source",
|
|
|
|
|
+ "target": node_id,
|
|
|
|
|
+ "label": "搜索命中",
|
|
|
|
|
+ "status": "success",
|
|
|
|
|
+ "rule_pack": "Content Rule Pack V1",
|
|
|
|
|
+ })
|
|
|
|
|
+ for action in walk_actions:
|
|
|
|
|
+ source_id = _walk_node_id(action.get("from_node_type"), action.get("from_node_id"))
|
|
|
|
|
+ target_id = _walk_node_id(action.get("to_node_type"), action.get("to_node_id"))
|
|
|
|
|
+ nodes.setdefault(source_id, {
|
|
|
|
|
+ "id": source_id,
|
|
|
|
|
+ "type": action.get("from_node_type") or "node",
|
|
|
|
|
+ "label": action.get("from_node_id") or "起点",
|
|
|
|
|
+ "status": "success",
|
|
|
|
|
+ })
|
|
|
|
|
+ nodes.setdefault(target_id, {
|
|
|
|
|
+ "id": target_id,
|
|
|
|
|
+ "type": action.get("to_node_type") or "node",
|
|
|
|
|
+ "label": action.get("to_node_id") or action.get("edge_type") or "下一跳",
|
|
|
|
|
+ "status": action.get("walk_status") or "pending",
|
|
|
|
|
+ })
|
|
|
|
|
+ edges.append({
|
|
|
|
|
+ "id": action.get("walk_action_id") or f"{source_id}->{target_id}",
|
|
|
|
|
+ "source": source_id,
|
|
|
|
|
+ "target": target_id,
|
|
|
|
|
+ "label": action.get("edge_id") or action.get("edge_type") or action.get("walk_action"),
|
|
|
|
|
+ "status": action.get("walk_status") or "pending",
|
|
|
|
|
+ "rule_pack": action.get("rule_pack_id"),
|
|
|
|
|
+ "budget_tier": action.get("budget_tier"),
|
|
|
|
|
+ "reason_code": action.get("reason_code"),
|
|
|
|
|
+ })
|
|
|
|
|
+ return {
|
|
|
|
|
+ "nodes": list(nodes.values()),
|
|
|
|
|
+ "edges": edges,
|
|
|
|
|
+ "source_path_count": len(source_paths),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _primary_failure_reason(
|
|
|
|
|
+ run_item: dict[str, Any],
|
|
|
|
|
+ run_events: list[dict[str, Any]],
|
|
|
|
|
+) -> dict[str, Any] | None:
|
|
|
|
|
+ error_code = run_item.get("error_code")
|
|
|
|
|
+ if not error_code and run_item.get("status") not in {"failed", "partial_success"}:
|
|
|
|
|
+ return None
|
|
|
|
|
+ failed_events = [
|
|
|
|
|
+ event for event in run_events
|
|
|
|
|
+ if event.get("status") == "failed" or event.get("error_code")
|
|
|
|
|
+ ]
|
|
|
|
|
+ event = failed_events[-1] if failed_events else {}
|
|
|
|
|
+ code = error_code or event.get("error_code") or "UNKNOWN"
|
|
|
|
|
+ return {
|
|
|
|
|
+ "reason_code": code,
|
|
|
|
|
+ "reason_label": _reason_label(code),
|
|
|
|
|
+ "stage": event.get("stage") or _stage_from_error_code(code),
|
|
|
|
|
+ "stage_label": _stage_label(event.get("stage") or _stage_from_error_code(code)),
|
|
|
|
|
+ "message": event.get("message") or run_item.get("error_message") or code,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _source_label(run_item: dict[str, Any]) -> str:
|
|
|
|
|
+ demand_id = run_item.get("demand_content_id")
|
|
|
|
|
+ if demand_id:
|
|
|
|
|
+ return f"真实需求 #{demand_id}"
|
|
|
|
|
+ return "真实需求池 / 默认样例"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _source_stage_detail(source_context: dict[str, Any]) -> str:
|
|
|
|
|
+ demand_id = source_context.get("demand_content_id") or source_context.get("id")
|
|
|
|
|
+ if demand_id:
|
|
|
|
|
+ return f"需求池 ID:{demand_id}"
|
|
|
|
|
+ raw_demand = source_context.get("raw_demand_content")
|
|
|
|
|
+ if isinstance(raw_demand, dict) and raw_demand.get("id"):
|
|
|
|
|
+ return f"需求池 ID:{raw_demand.get('id')}"
|
|
|
|
|
+ return "需求池 ID:缺失"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _query_generation_label(queries: list[dict[str, Any]], run_events: list[dict[str, Any]]) -> str:
|
|
|
|
|
+ generated_count = len(queries)
|
|
|
|
|
+ generation_failed = any(
|
|
|
|
|
+ event.get("error_code") == "QUERY_GENERATION_FAILED"
|
|
|
|
|
+ or (event.get("event_type") == "query_generation_failed")
|
|
|
|
|
+ for event in run_events
|
|
|
|
|
+ )
|
|
|
|
|
+ if generated_count and generation_failed:
|
|
|
|
|
+ return f"{generated_count} 条生成成功 / 生成失败"
|
|
|
|
|
+ if generated_count:
|
|
|
|
|
+ return f"{generated_count} 条生成成功"
|
|
|
|
|
+ if generation_failed:
|
|
|
|
|
+ return "生成失败"
|
|
|
|
|
+ return "0 条生成成功"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _effect_status_counts(decisions: list[dict[str, Any]]) -> dict[str, int]:
|
|
|
|
|
+ counts: dict[str, int] = {}
|
|
|
|
|
+ for decision in decisions:
|
|
|
|
|
+ status = str(decision.get("content_effect_status") or "unknown")
|
|
|
|
|
+ counts[status] = counts.get(status, 0) + 1
|
|
|
|
|
+ return counts
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _walk_status_counts(walk_actions: list[dict[str, Any]]) -> dict[str, int]:
|
|
|
|
|
+ counts: dict[str, int] = {}
|
|
|
|
|
+ for action in walk_actions:
|
|
|
|
|
+ status = str(action.get("walk_status") or "unknown")
|
|
|
|
|
+ counts[status] = counts.get(status, 0) + 1
|
|
|
|
|
+ return counts
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _decision_status_label(counts: dict[str, int]) -> str:
|
|
|
|
|
+ if not counts:
|
|
|
|
|
+ return "当前没有内容进入规则判断"
|
|
|
|
|
+ return ",".join(f"{_reason_label(key)} {value}" for key, value in counts.items())
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _walk_status_label(counts: dict[str, int]) -> str:
|
|
|
|
|
+ if not counts:
|
|
|
|
|
+ return "当前没有触发游走动作"
|
|
|
|
|
+ return ",".join(f"{_reason_label(key)} {value}" for key, value in counts.items())
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _reason_label(code: Any) -> str:
|
|
|
|
|
+ labels = {
|
|
|
|
|
+ "success": "成功",
|
|
|
|
|
+ "pending": "待复看",
|
|
|
|
|
+ "failed": "失败",
|
|
|
|
|
+ "rule_blocked": "规则阻断",
|
|
|
|
|
+ "PLATFORM_REQUEST_FAILED": "平台请求失败",
|
|
|
|
|
+ "QUERY_GENERATION_FAILED": "Query 生成失败",
|
|
|
|
|
+ "INVALID_SOURCE": "需求来源无效",
|
|
|
|
|
+ "missing_score": "分数缺失",
|
|
|
|
|
+ "pattern_recall_failed": "Pattern 回扣失败",
|
|
|
|
|
+ "missing_source_evidence": "来源证据缺失",
|
|
|
|
|
+ "missing_content_portrait": "画像缺失",
|
|
|
|
|
+ "high_risk_content": "高风险内容",
|
|
|
|
|
+ }
|
|
|
|
|
+ value = str(code or "unknown")
|
|
|
|
|
+ return labels.get(value, value)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _stage_from_error_code(code: Any) -> str:
|
|
|
|
|
+ value = str(code or "")
|
|
|
|
|
+ if "QUERY" in value:
|
|
|
|
|
+ return "query"
|
|
|
|
|
+ if "PLATFORM" in value:
|
|
|
|
|
+ return "platform"
|
|
|
|
|
+ if "SOURCE" in value:
|
|
|
|
|
+ return "source"
|
|
|
|
|
+ return "run"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _stage_label(stage: Any) -> str:
|
|
|
|
|
+ labels = {
|
|
|
|
|
+ "source": "数据源阶段",
|
|
|
|
|
+ "query": "Query 阶段",
|
|
|
|
|
+ "platform": "平台搜索阶段",
|
|
|
|
|
+ "judge": "规则判断阶段",
|
|
|
|
|
+ "walk": "游走阶段",
|
|
|
|
|
+ "run": "运行阶段",
|
|
|
|
|
+ }
|
|
|
|
|
+ return labels.get(str(stage or "run"), str(stage or "运行阶段"))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _walk_node_id(node_type: Any, node_id: Any) -> str:
|
|
|
|
|
+ return f"{node_type or 'node'}:{node_id or 'unknown'}"
|