| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043 |
- from __future__ import annotations
- from datetime import date, datetime
- from pathlib import Path
- from typing import Any
- from content_agent.business_modules.run_record import validate_run
- from content_agent.integrations.composite_runtime import CompositeRuntimeStore
- from content_agent.integrations.database_runtime import DatabaseRuntimeStore
- from content_agent.integrations.database_runtime import RUNTIME_FILE_TABLES
- from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
- from content_agent.interfaces import RuntimeStore
# Labels placed in the "data_origin" field of dashboard responses.
# Run data read from the production database.
DATA_ORIGIN_PRODUCTION_DB: str = "production_db"
# Run data read from exported runtime files.
DATA_ORIGIN_RUNTIME_EXPORT: str = "runtime_export"
# Database data supplemented by runtime exports (not referenced in the visible
# part of this module).
DATA_ORIGIN_MIXED: str = "mixed_with_runtime_export"
class DashboardService:
    """Read-only query layer behind the run dashboard endpoints.

    Public methods take a ``run_id`` and return JSON-ready dicts. When the
    primary ``runtime`` is a ``DatabaseRuntimeStore``, run summaries and file
    counts come from SQL; otherwise they are derived from the run's runtime
    files. JSON / JSONL reads that raise on ``runtime`` are retried on
    ``export_runtime`` (when one is configured) before giving up.
    """

    # Shared SELECT/FROM prefix for run summaries so the list endpoint and the
    # single-run lookup always return the same columns. The correlated
    # subquery picks the newest policy run (highest id) for each run.
    _DB_RUN_SELECT_SQL = (
        "SELECT r.run_id, "
        "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
        "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
        "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
        "r.validation_status, r.error_code, r.started_at, r.completed_at "
        "FROM `content_agent_runs` r "
    )

    def __init__(
        self,
        runtime: RuntimeStore,
        export_runtime: RuntimeStore | None = None,
    ) -> None:
        """Wrap a primary store and an optional fallback store for exported files."""
        self.runtime = runtime
        self.export_runtime = export_runtime

    @classmethod
    def from_runtime(cls, runtime: RuntimeStore) -> "DashboardService":
        """Build a service, splitting a ``CompositeRuntimeStore`` into primary + export."""
        if isinstance(runtime, CompositeRuntimeStore):
            return cls(runtime.primary, runtime.export)
        return cls(runtime)

    @property
    def data_origin(self) -> str:
        """Origin label of the primary store: production DB or runtime export."""
        if isinstance(self.runtime, DatabaseRuntimeStore):
            return DATA_ORIGIN_PRODUCTION_DB
        return DATA_ORIGIN_RUNTIME_EXPORT

    def run_exists(self, run_id: str) -> bool:
        """Return whether ``run_id`` is known to the primary store.

        DB mode counts rows in ``content_agent_runs``. Otherwise the run
        directory is probed; if that lookup raises, the run exists when any
        runtime file is reported present.
        """
        if isinstance(self.runtime, DatabaseRuntimeStore):
            row = self.runtime._fetch_one(
                "SELECT COUNT(*) AS cnt FROM `content_agent_runs` WHERE `run_id` = %s",
                (run_id,),
            )
            return bool(row and row.get("cnt", 0))
        try:
            return self.runtime.run_dir(run_id).exists()
        except Exception:
            # Fallback for stores whose run_dir() lookup fails.
            return any(self.runtime.file_status(run_id).values())

    def list_runs(
        self,
        *,
        status: str | None = None,
        platform: str | None = None,
        platform_mode: str | None = None,
        strategy_version: str | None = None,
        validation_status: str | None = None,
        error_code: str | None = None,
        page: int = 1,
        page_size: int = 20,
    ) -> dict[str, Any]:
        """Return one page of run summaries, filtered by exact field matches.

        ``page`` is clamped to >= 1 and ``page_size`` to 1..100. Falsy filter
        values are ignored. DB mode filters and paginates in SQL; file mode
        builds every summary and then filters and slices in memory.
        """
        page = max(page, 1)
        page_size = min(max(page_size, 1), 100)
        if isinstance(self.runtime, DatabaseRuntimeStore):
            return self._list_db_runs(
                status=status,
                platform=platform,
                platform_mode=platform_mode,
                strategy_version=strategy_version,
                validation_status=validation_status,
                error_code=error_code,
                page=page,
                page_size=page_size,
            )
        items = [
            self._local_run_list_item(run_id)
            for run_id in getattr(self.runtime, "list_runs", lambda: [])()
        ]
        items = [
            item
            for item in items
            if _matches(item, "status", status)
            and _matches(item, "platform", platform)
            and _matches(item, "platform_mode", platform_mode)
            and _matches(item, "strategy_version", strategy_version)
            and _matches(item, "validation_status", validation_status)
            and _matches(item, "error_code", error_code)
        ]
        total = len(items)
        offset = (page - 1) * page_size
        return {
            "items": items[offset:offset + page_size],
            "page": page,
            "page_size": page_size,
            "total": total,
            "data_origin": self.data_origin,
        }

    def dashboard(self, run_id: str) -> dict[str, Any]:
        """Aggregate everything the run detail page needs into one JSON-safe payload."""
        runtime_files_response = self.runtime_files(run_id)
        runtime_files = runtime_files_response["files"]
        file_status = {item["filename"]: item["exists"] for item in runtime_files}
        record_counts = {item["filename"]: item["record_count"] for item in runtime_files}
        final_output = self._read_json_optional(run_id, "final_output.json") or {}
        strategy_review = self._read_json_optional(run_id, "strategy_review.json") or {}
        source_context = self._read_json_optional(run_id, "source_context.json") or {}
        run_item = (
            self._db_run_item(run_id)
            if isinstance(self.runtime, DatabaseRuntimeStore)
            else self._local_run_list_item(run_id)
        )
        # Reuse the run row in DB mode instead of fetching it a second time.
        validation = self._validate_optional(run_id, run_item=run_item)
        queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
        run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
        content_items = self._read_jsonl_optional(run_id, "discovered_content_items.jsonl")
        decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
        walk_actions = self._read_jsonl_optional(run_id, "walk_actions.jsonl")
        source_paths = self._read_jsonl_optional(run_id, "source_path_records.jsonl")
        counts = {
            "queries": record_counts.get("search_queries.jsonl", 0),
            "discovered_content_items": record_counts.get("discovered_content_items.jsonl", 0),
            "rule_decisions": record_counts.get("rule_decisions.jsonl", 0),
            "walk_actions": record_counts.get("walk_actions.jsonl", 0),
            "source_path_records": record_counts.get("source_path_records.jsonl", 0),
            "run_events": record_counts.get("run_events.jsonl", 0),
        }
        primary_failure_reason = _primary_failure_reason(run_item, run_events)
        return _json_safe({
            "run_id": run_id,
            "summary": run_item,
            "counts": counts,
            "files": file_status,
            "runtime_files": runtime_files,
            "validation": validation,
            "final_output_summary": final_output.get("summary", {}),
            "strategy_review_status": strategy_review.get(
                "review_status",
                "not_generated",
            ),
            "business_summary": _business_summary(
                run_item,
                counts,
                final_output,
                primary_failure_reason,
            ),
            "stage_conclusions": _stage_conclusions(
                file_status,
                counts,
                run_item,
                queries,
                run_events,
                decisions,
                walk_actions,
                final_output,
                strategy_review,
                source_context,
            ),
            "rule_application_summary": _rule_application_summary(decisions, content_items),
            "walk_graph": _walk_graph(queries, content_items, walk_actions, source_paths),
            "primary_failure_reason": primary_failure_reason,
            "technical_refs": {
                "runtime_files_url": f"/runs/{run_id}/runtime-files",
                "validation_url": f"/runs/{run_id}/validation",
                "data_origin": runtime_files_response["data_origin"],
                "runtime_file_count": len(runtime_files),
            },
            "data_origin": runtime_files_response["data_origin"],
            "links": {
                "queries": f"/runs/{run_id}/queries",
                "timeline": f"/runs/{run_id}/timeline",
                "content_items": f"/runs/{run_id}/content-items",
                "runtime_files": f"/runs/{run_id}/runtime-files",
            },
        })

    def runtime_files(self, run_id: str) -> dict[str, Any]:
        """Describe every known runtime file for a run: presence, record count, origin.

        DB mode derives presence from per-table row counts. File mode asks the
        store's ``file_status`` and counts records only for files that exist.
        """
        if isinstance(self.runtime, DatabaseRuntimeStore):
            counts = self._db_runtime_file_counts(run_id)
            files = [
                {
                    "filename": filename,
                    "exists": counts.get(filename, 0) > 0,
                    "record_count": counts.get(filename, 0),
                    "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
                    "contract_status": "current",
                    "data_origin": DATA_ORIGIN_PRODUCTION_DB,
                }
                for filename in RUNTIME_FILENAMES
            ]
            return {
                "run_id": run_id,
                "files": files,
                "data_origin": DATA_ORIGIN_PRODUCTION_DB,
            }
        status = self.runtime.file_status(run_id)
        files = []
        for filename in RUNTIME_FILENAMES:
            exists = bool(status.get(filename))
            files.append({
                "filename": filename,
                "exists": exists,
                "record_count": self._runtime_record_count(run_id, filename) if exists else 0,
                "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
                "contract_status": "current",
                "data_origin": self._file_origin(run_id, filename),
            })
        return {
            "run_id": run_id,
            "files": files,
            "data_origin": self._combined_origin(run_id),
        }

    def runtime_file(
        self,
        run_id: str,
        filename: str,
        *,
        limit: int = 100,
        offset: int = 0,
    ) -> dict[str, Any]:
        """Return the contents of one whitelisted runtime file.

        JSONL files are paginated (``limit`` clamped to 1..500, ``offset`` to
        >= 0); JSON files are returned whole. Raises ``ValueError`` for
        filenames outside ``RUNTIME_FILENAMES``.
        """
        self._validate_runtime_filename(filename)
        limit = min(max(limit, 1), 500)
        offset = max(offset, 0)
        if filename.endswith(".jsonl"):
            records = self._read_jsonl_optional(run_id, filename)
            return {
                "run_id": run_id,
                "filename": filename,
                "records": _json_safe(records[offset:offset + limit]),
                "offset": offset,
                "limit": limit,
                "total": len(records),
                "data_origin": self._file_origin(run_id, filename),
            }
        data = self._read_json_optional(run_id, filename) or {}
        return {
            "run_id": run_id,
            "filename": filename,
            "data": _json_safe(data),
            "data_origin": self._file_origin(run_id, filename),
        }

    def queries(self, run_id: str) -> dict[str, Any]:
        """List the run's search queries, each joined with its clue and decision tallies."""
        queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
        # One clue per query id; if several exist, the last one read wins.
        clues_by_query = {
            row.get("search_query_id"): row
            for row in self._read_jsonl_optional(run_id, "search_clues.jsonl")
        }
        decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
        decision_counts: dict[str, dict[str, int]] = {}
        for decision in decisions:
            query_id = decision.get("search_query_id")
            if not query_id:
                continue
            counts = decision_counts.setdefault(query_id, {})
            action = decision.get("decision_action") or "unknown"
            counts[action] = counts.get(action, 0) + 1
        items = []
        for query in queries:
            query_id = query.get("search_query_id")
            clue = clues_by_query.get(query_id) or {}
            items.append(_json_safe({
                **query,
                "search_clue": clue,
                "decision_action_counts": decision_counts.get(query_id, {}),
                "search_query_effect_status": clue.get(
                    "search_query_effect_status",
                    query.get("search_query_effect_status"),
                ),
                "walk_next_step": clue.get("walk_next_step"),
                "failure_reason": clue.get("failure_reason")
                or (clue.get("raw_payload") or {}).get("failure_reason"),
            }))
        return {
            "run_id": run_id,
            "items": items,
            "total": len(items),
            "data_origin": self._combined_origin(run_id),
        }

    def timeline(self, run_id: str) -> dict[str, Any]:
        """Merge run events, walk actions and source-path records into one timeline.

        Entries are sorted by their timestamp rendered as a string; entries
        without a timestamp sort first.
        """
        run_event_rows = self._read_jsonl_optional(run_id, "run_events.jsonl")
        walk_action_rows = self._read_jsonl_optional(run_id, "walk_actions.jsonl")
        source_path_rows = self._read_jsonl_optional(run_id, "source_path_records.jsonl")
        events = [
            {
                "source": "run_events.jsonl",
                "stage": row.get("stage") or row.get("event_type"),
                "event_type": row.get("event_type"),
                "status": row.get("status"),
                "timestamp": row.get("created_at") or row.get("timestamp"),
                "error_code": row.get("error_code"),
                "walk_action_id": (row.get("raw_payload") or {}).get("walk_action_id"),
                "record": row,
            }
            for row in run_event_rows
        ]
        events.extend(
            {
                "source": "walk_actions.jsonl",
                "stage": "walk",
                "event_type": row.get("edge_type"),
                "status": row.get("walk_status"),
                "timestamp": row.get("created_at"),
                "error_code": None,
                "walk_action_id": row.get("walk_action_id"),
                "record": row,
            }
            for row in walk_action_rows
        )
        events.extend(
            {
                "source": "source_path_records.jsonl",
                "stage": "source_path",
                "event_type": row.get("source_path_type"),
                "status": row.get("status"),
                "timestamp": row.get("created_at"),
                "error_code": None,
                "walk_action_id": row.get("walk_action_id")
                or (row.get("raw_payload") or {}).get("walk_action_id"),
                "record": row,
            }
            for row in source_path_rows
        )
        events.sort(key=lambda item: str(item.get("timestamp") or ""))
        return {
            "run_id": run_id,
            "items": _json_safe(events),
            "total": len(events),
            "data_origin": self._combined_origin(run_id),
            "summary": _timeline_summary(
                run_event_rows, walk_action_rows, source_path_rows
            ),
        }

    def content_items(self, run_id: str) -> dict[str, Any]:
        """List discovered content, each joined with its media record, recall evidence and decision.

        A decision is matched by discovery id first, then by platform content id.
        """
        media_by_platform_id = {
            row.get("platform_content_id"): row
            for row in self._read_jsonl_optional(run_id, "content_media_records.jsonl")
        }
        recall_by_platform_id = {
            row.get("platform_content_id"): row
            for row in self._read_jsonl_optional(run_id, "pattern_recall_evidence.jsonl")
        }
        decisions_by_target = {
            row.get("decision_target_id"): row
            for row in self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
        }
        items = []
        for content in self._read_jsonl_optional(run_id, "discovered_content_items.jsonl"):
            platform_content_id = content.get("platform_content_id")
            discovery_id = content.get("content_discovery_id")
            items.append(_json_safe({
                **content,
                "media_record": media_by_platform_id.get(platform_content_id),
                "pattern_recall_evidence": recall_by_platform_id.get(platform_content_id),
                "rule_decision": decisions_by_target.get(discovery_id)
                or decisions_by_target.get(platform_content_id),
            }))
        return {
            "run_id": run_id,
            "items": items,
            "total": len(items),
            "data_origin": self._combined_origin(run_id),
        }

    def _list_db_runs(self, **filters: Any) -> dict[str, Any]:
        """Paginated run list straight from ``content_agent_runs``.

        ``filters`` must contain ``page`` and ``page_size`` (already clamped by
        ``list_runs``); the remaining keys are turned into a parameterized
        WHERE clause by ``_db_run_filters``.
        """
        page = filters.pop("page")
        page_size = filters.pop("page_size")
        where_sql, params = _db_run_filters(filters)
        count_row = self.runtime._fetch_one(
            f"SELECT COUNT(*) AS cnt FROM `content_agent_runs` r {where_sql}",
            tuple(params),
        )
        rows = self.runtime._fetch_all(
            f"{self._DB_RUN_SELECT_SQL}{where_sql} "
            "ORDER BY r.started_at DESC, r.id DESC LIMIT %s OFFSET %s",
            (*params, page_size, (page - 1) * page_size),
        )
        return {
            "items": [_json_safe(_db_run_row_to_item(row)) for row in rows],
            "page": page,
            "page_size": page_size,
            "total": int((count_row or {}).get("cnt") or 0),
            "data_origin": self.data_origin,
        }

    def _db_run_item(self, run_id: str) -> dict[str, Any]:
        """Fetch one run summary from the DB; unknown runs get status "unknown"."""
        row = self.runtime._fetch_one(
            f"{self._DB_RUN_SELECT_SQL}WHERE r.run_id = %s LIMIT 1",
            (run_id,),
        )
        return _json_safe(_db_run_row_to_item(row or {"run_id": run_id, "status": "unknown"}))

    def _local_run_list_item(self, run_id: str) -> dict[str, Any]:
        """Build a run summary row from the run's runtime files (non-DB mode)."""
        final_output = self._read_json_optional(run_id, "final_output.json") or {}
        run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
        lifecycle = [
            row for row in run_events if str(row.get("event_id", "")).startswith("lifecycle_")
        ]
        # The newest lifecycle event carries the current status / error code;
        # the oldest one marks when the run started.
        first_lifecycle = lifecycle[0] if lifecycle else {}
        latest = lifecycle[-1] if lifecycle else {}
        # Dispatch info (platform / content format / strategy) lives under
        # "dispatch", not at the top level: prefer top-level, fall back to dispatch.
        dispatch = final_output.get("dispatch") or {}
        # Demand name/description come from source_context.json (1 run = 1 demand).
        # ext_data.type is sometimes a clean label and sometimes the placeholder
        # "pattern"; name is a comma-joined list of seed words that often repeats.
        # Prefer a meaningful type, otherwise the de-duplicated name.
        source_ctx = self._read_json_optional(run_id, "source_context.json") or {}
        ext_data = source_ctx.get("ext_data") or {}
        raw_type = str(ext_data.get("type") or "").strip()
        good_type = raw_type if raw_type and raw_type.lower() != "pattern" else ""
        name_parts = [s.strip() for s in str(source_ctx.get("name") or "").split(",") if s.strip()]
        deduped_name = "、".join(dict.fromkeys(name_parts))
        demand_name = good_type or deduped_name or None
        demand_desc = ext_data.get("desc") or ext_data.get("reason")
        # Runs without lifecycle_ events have no lifecycle start time, so fall
        # back to the first/last created_at across all events. String min/max
        # assumes ISO 8601 timestamps in a single timezone.
        event_times = [str(row.get("created_at")) for row in run_events if row.get("created_at")]
        return _json_safe({
            "run_id": run_id,
            "policy_run_id": final_output.get("policy_run_id"),
            "status": latest.get("status") or ("success" if final_output else "failed"),
            "current_step": "review_strategy" if final_output else "unknown",
            "platform": final_output.get("platform") or dispatch.get("platform"),
            "platform_mode": final_output.get("platform_mode") or dispatch.get("runtime_stage"),
            "content_format": final_output.get("content_format") or dispatch.get("content_format"),
            "strategy_version": final_output.get("strategy_version") or dispatch.get("strategy_version"),
            "demand_name": demand_name,
            "demand_desc": demand_desc,
            "validation_status": final_output.get("validation_status"),
            "error_code": latest.get("error_code"),
            "started_at": first_lifecycle.get("created_at") or (min(event_times) if event_times else None),
            "completed_at": max(event_times) if event_times else None,
        })

    def _read_with_fallback(self, method: str, run_id: str, filename: str, default: Any) -> Any:
        """Call ``runtime.<method>(run_id, filename)``, then the export store, else ``default``.

        Failures are swallowed on purpose: dashboard views treat missing or
        unreadable artifacts as absent rather than erroring.
        """
        try:
            return getattr(self.runtime, method)(run_id, filename)
        except Exception:
            pass
        if self.export_runtime:
            try:
                return getattr(self.export_runtime, method)(run_id, filename)
            except Exception:
                pass
        return default

    def _read_json_optional(self, run_id: str, filename: str) -> dict[str, Any] | None:
        """Read a JSON runtime file with export fallback; ``None`` if unreadable."""
        return self._read_with_fallback("read_json", run_id, filename, None)

    def _read_jsonl_optional(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Read a JSONL runtime file with export fallback; ``[]`` if unreadable."""
        return self._read_with_fallback("read_jsonl", run_id, filename, [])

    def _validate_optional(
        self,
        run_id: str,
        run_item: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Return a validation result for the run without raising.

        DB mode reports the stored ``validation_status`` (without findings),
        reusing ``run_item`` when the caller already fetched it. File mode runs
        ``validate_run``; any exception becomes a "fail" result carrying the
        exception message.
        """
        if isinstance(self.runtime, DatabaseRuntimeStore):
            if run_item is None:
                run_item = self._db_run_item(run_id)
            status = run_item.get("validation_status")
            return {
                "run_id": run_id,
                "status": status or "unknown",
                "findings": [],
            }
        try:
            return validate_run(run_id, self.runtime)
        except Exception as exc:
            return {
                "run_id": run_id,
                "status": "fail",
                "findings": [{"severity": "error", "message": str(exc)}],
            }

    def _runtime_record_count(self, run_id: str, filename: str) -> int:
        """Count records in one runtime file (a JSON file counts as 1 when readable)."""
        if isinstance(self.runtime, DatabaseRuntimeStore):
            return self._db_runtime_file_counts(run_id).get(filename, 0)
        if filename.endswith(".jsonl"):
            return len(self._read_jsonl_optional(run_id, filename))
        return 1 if self._read_json_optional(run_id, filename) is not None else 0

    def _combined_origin(self, run_id: str) -> str:
        """Origin label for a whole-run response (``run_id`` is currently unused)."""
        if not isinstance(self.runtime, DatabaseRuntimeStore):
            return DATA_ORIGIN_RUNTIME_EXPORT
        return DATA_ORIGIN_PRODUCTION_DB

    def _file_origin(self, run_id: str, filename: str) -> str:
        """Origin label for one file: DB if the primary has it, else export if the export has it."""
        if not isinstance(self.runtime, DatabaseRuntimeStore):
            return DATA_ORIGIN_RUNTIME_EXPORT
        if self.runtime.file_status(run_id).get(filename, False):
            return DATA_ORIGIN_PRODUCTION_DB
        # Only consult the export store when the primary does not have the file.
        if self.export_runtime and self.export_runtime.file_status(run_id).get(filename):
            return DATA_ORIGIN_RUNTIME_EXPORT
        return self.data_origin

    def _validate_runtime_filename(self, filename: str) -> None:
        """Raise ``ValueError`` unless ``filename`` is a whitelisted bare runtime filename."""
        if (
            filename not in RUNTIME_FILENAMES
            or ".." in filename
            or "/" in filename
            or "\\" in filename
            or Path(filename).is_absolute()
        ):
            raise ValueError("runtime filename is not allowed")

    def _db_runtime_file_counts(self, run_id: str) -> dict[str, int]:
        """Count DB rows per runtime file for a run, using one connection for all tables.

        Files without a mapped table in ``RUNTIME_FILE_TABLES`` count as 0.
        Table names come from that mapping, never from caller input.
        """
        counts: dict[str, int] = {}
        with self.runtime._connection_factory() as conn:
            with conn.cursor() as cur:
                for filename in RUNTIME_FILENAMES:
                    table = RUNTIME_FILE_TABLES.get(filename)
                    if not table:
                        counts[filename] = 0
                        continue
                    cur.execute(
                        f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
                        (run_id,),
                    )
                    row = cur.fetchone() or {}
                    counts[filename] = int(row.get("cnt") or 0)
        return counts
- def _db_run_filters(filters: dict[str, Any]) -> tuple[str, list[Any]]:
- allowed = {
- "status": "status",
- "platform": "platform",
- "platform_mode": "platform_mode",
- "strategy_version": "strategy_version",
- "validation_status": "validation_status",
- "error_code": "error_code",
- }
- clauses = []
- params = []
- for key, column in allowed.items():
- value = filters.get(key)
- if value:
- clauses.append(f"r.`{column}` = %s")
- params.append(value)
- if not clauses:
- return "", params
- return "WHERE " + " AND ".join(clauses), params
- def _db_run_row_to_item(row: dict[str, Any]) -> dict[str, Any]:
- return {
- "run_id": row.get("run_id"),
- "policy_run_id": row.get("policy_run_id"),
- "status": row.get("status"),
- "current_step": row.get("current_step"),
- "platform": row.get("platform"),
- "platform_mode": row.get("platform_mode"),
- "strategy_version": row.get("strategy_version"),
- "validation_status": row.get("validation_status"),
- "error_code": row.get("error_code"),
- "started_at": row.get("started_at"),
- "completed_at": row.get("completed_at"),
- }
- def _matches(item: dict[str, Any], key: str, expected: str | None) -> bool:
- return not expected or item.get(key) == expected
- def _json_safe(value: Any) -> Any:
- if isinstance(value, dict):
- return {key: _json_safe(item) for key, item in value.items()}
- if isinstance(value, list):
- return [_json_safe(item) for item in value]
- if isinstance(value, (datetime, date)):
- return value.isoformat()
- return value
def _business_summary(
    run_item: dict[str, Any],
    counts: dict[str, int],
    final_output: dict[str, Any],
    primary_failure_reason: dict[str, Any] | None,
) -> dict[str, Any]:
    """Build the top-of-page business summary: a headline plus key counts.

    A known failure reason always drives the headline. Otherwise the headline
    depends on the run status; unknown statuses get a generic "incomplete" line.
    """
    status = run_item.get("status") or "unknown"
    status_headlines = (
        ("success", "本次运行已完成,可查看内容判断和资产沉淀结果"),
        ("partial_success", "本次运行部分成功,有 query 或平台请求失败"),
        ("running", "本次运行仍在进行中"),
    )
    if primary_failure_reason:
        stage_label = primary_failure_reason.get("stage_label")
        reason_label = primary_failure_reason.get("reason_label")
        headline = f"本次运行停在{stage_label}:{reason_label}"
    else:
        headline = next(
            (text for known_status, text in status_headlines if known_status == status),
            "本次运行未形成完整成功链路",
        )

    final_summary = final_output.get("summary") or {}

    def _summary_int(key: str) -> int:
        # Missing / None / 0 all count as zero.
        return int(final_summary.get(key) or 0)

    return {
        "headline": headline,
        "status": status,
        "source_label": _source_label(run_item),
        "query_count": counts.get("queries", 0),
        "content_count": counts.get("discovered_content_items", 0),
        "kept_count": _summary_int("pooled_content_count"),
        "review_count": _summary_int("review_content_count"),
        "rejected_count": _summary_int("rejected_content_count"),
        "asset_count": _summary_int("author_asset_count"),
        "primary_failure_reason": primary_failure_reason,
    }
def _stage_conclusions(
    files: dict[str, bool],
    counts: dict[str, int],
    run_item: dict[str, Any],
    queries: list[dict[str, Any]],
    run_events: list[dict[str, Any]],
    decisions: list[dict[str, Any]],
    walk_actions: list[dict[str, Any]],
    final_output: dict[str, Any],
    strategy_review: dict[str, Any],
    source_context: dict[str, Any],
) -> list[dict[str, Any]]:
    """Return one status card per pipeline stage, in display order.

    Stages: source, query, platform, judge, walk, asset, learning. ``run_item``
    is part of the signature but is not read here.
    """

    def _is_query_failure(event: dict[str, Any]) -> bool:
        # Either a dedicated platform failure event, or any failed event tied to a query.
        if event.get("event_type") == "platform_query_failed":
            return True
        return bool(event.get("status") == "failed" and event.get("search_query_id"))

    failure_total = sum(1 for event in run_events if _is_query_failure(event))
    decision_counts = _effect_status_counts(decisions)
    walk_counts = _walk_status_counts(walk_actions)
    final_summary = final_output.get("summary") or {}

    def _final_count(key: str) -> Any:
        return final_summary.get(key, 0) or 0

    has_source = files.get("source_context.json")
    query_total = counts.get("queries", 0)
    content_total = counts.get("discovered_content_items", 0)
    decision_total = counts.get("rule_decisions", 0)
    walk_total = counts.get("walk_actions", 0)
    review_status = strategy_review.get("review_status", "not_generated")
    review_ready = review_status == "generated"

    source_stage = {
        "stage_id": "source",
        "label": "数据源",
        "status": "success" if has_source else "failed",
        "headline": "真实需求已读取" if has_source else "需求来源缺失",
        "detail": (
            _source_stage_detail(source_context)
            if has_source
            else "未找到 source_context,无法解释输入来源"
        ),
        "metric": "已就绪" if files.get("pattern_seed_pack.json") else "种子缺失",
    }
    query_stage = {
        "stage_id": "query",
        "label": "Query",
        "status": "success" if query_total else "failed",
        "headline": f"生成 {query_total} 条搜索意图",
        "detail": f"{failure_total} 条 query 有失败记录,其余进入平台搜索或后续链路",
        "metric": _query_generation_label(queries, run_events),
    }
    platform_stage = {
        "stage_id": "platform",
        "label": "平台 / 内容",
        "status": "success" if content_total else "failed",
        "headline": f"发现 {content_total} 条内容",
        "detail": (
            "平台结果已进入内容发现"
            if content_total
            else "未形成发现内容,通常需要先查看平台请求或 query 失败原因"
        ),
        "metric": f"{failure_total} 条平台失败",
    }
    judge_stage = {
        "stage_id": "judge",
        "label": "判断",
        "status": "success" if decision_total else "pending",
        "headline": f"{decision_total} 条内容完成判断",
        "detail": _decision_status_label(decision_counts),
        "metric": f"入池 {decision_counts.get('success', 0)} / 阻断 {decision_counts.get('rule_blocked', 0)}",
    }
    walk_stage = {
        "stage_id": "walk",
        "label": "游走",
        "status": "success" if walk_total else "pending",
        "headline": f"触发 {walk_total} 个游走动作",
        "detail": _walk_status_label(walk_counts),
        "metric": f"成功 {walk_counts.get('success', 0)} / 失败 {walk_counts.get('failed', 0)}",
    }
    asset_stage = {
        "stage_id": "asset",
        "label": "资产",
        "status": "success" if files.get("final_output.json") else "pending",
        "headline": f"沉淀 {_final_count('pooled_content_count')} 条内容资产",
        "detail": f"待复看 {_final_count('review_content_count')},淘汰 {_final_count('rejected_content_count')}",
        "metric": f"作者资产 {_final_count('author_asset_count')}",
    }
    learning_stage = {
        "stage_id": "learning",
        "label": "学习",
        "status": "success" if review_ready else "pending",
        "headline": "策略复盘已生成" if review_ready else "策略复盘未生成",
        "detail": (
            "可查看 query / rule / walk 的优化建议"
            if review_ready
            else "当前 run 还没有可展示的策略学习建议"
        ),
        "metric": review_status,
    }
    return [
        source_stage,
        query_stage,
        platform_stage,
        judge_stage,
        walk_stage,
        asset_stage,
        learning_stage,
    ]
def _rule_application_summary(
    decisions: list[dict[str, Any]],
    content_items: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Flatten rule decisions into rows for the rule-application table.

    A decision's ``decision_target_id`` may be either a content discovery id or
    a platform content id (``DashboardService.content_items`` tries both). Content
    is therefore indexed under both ids, and discovery ids win on lookup.
    """
    by_discovery_id = {
        item["content_discovery_id"]: item
        for item in content_items
        if item.get("content_discovery_id")
    }
    by_platform_id = {
        item["platform_content_id"]: item
        for item in content_items
        if item.get("platform_content_id")
    }
    summaries = []
    for decision in decisions:
        replay = decision.get("decision_replay_data") or {}
        target_id = decision.get("decision_target_id")
        content = by_discovery_id.get(target_id) or by_platform_id.get(target_id) or {}
        triggered_rules = decision.get("triggered_blocking_rules") or []
        scorecard = decision.get("scorecard") or {}
        score = decision.get("score")
        if score is None:
            # Fall back only when the decision carries no score; 0 is a real score.
            score = scorecard.get("total_score")
        summaries.append({
            "decision_id": decision.get("decision_id"),
            "content_title": content.get("title") or target_id,
            "platform_content_id": content.get("platform_content_id") or target_id,
            "rule_pack": decision.get("rule_pack_id")
            or replay.get("rule_pack_id")
            or "Content Rule Pack V1",
            "hard_gate_status": "没通过" if triggered_rules else "通过",
            "score": score,
            "decision_action": decision.get("decision_action"),
            "decision_reason_code": decision.get("decision_reason_code"),
            # Same basis as _effect_status_counts: the decision's canonical field is
            # search_query_effect_status; content_effect_status is only a fallback for
            # older records (decisions never carried it, so reading it alone gave None).
            "content_effect_status": decision.get("search_query_effect_status")
            or decision.get("content_effect_status"),
            "primary_reason": _reason_label(decision.get("decision_reason_code")),
            "technical_ref": {
                "decision_id": decision.get("decision_id"),
                "target_id": target_id,
                "has_replay_data": bool(replay),
            },
        })
    return summaries
- def _walk_graph(
- queries: list[dict[str, Any]],
- content_items: list[dict[str, Any]],
- walk_actions: list[dict[str, Any]],
- source_paths: list[dict[str, Any]],
- ) -> dict[str, Any]:
- nodes: dict[str, dict[str, Any]] = {
- "source": {
- "id": "source",
- "type": "source",
- "label": "需求",
- "status": "success",
- }
- }
- edges: list[dict[str, Any]] = []
- for query in queries[:12]:
- node_id = f"query:{query.get('search_query_id')}"
- nodes[node_id] = {
- "id": node_id,
- "type": "query",
- "label": query.get("search_query") or query.get("search_query_id"),
- "status": query.get("search_query_effect_status") or "success",
- }
- edges.append({
- "id": f"source->{node_id}",
- "source": "source",
- "target": node_id,
- "label": query.get("search_query_generation_method") or "query",
- "status": "success",
- "rule_pack": None,
- })
- for content in content_items[:20]:
- node_id = f"content:{content.get('platform_content_id') or content.get('content_discovery_id')}"
- query_id = content.get("search_query_id")
- source_id = f"query:{query_id}" if query_id else "source"
- nodes[node_id] = {
- "id": node_id,
- "type": "content",
- "label": content.get("title") or content.get("platform_content_id") or "内容",
- "status": (content.get("pattern_match_result") or {}).get("recall_status") or "pending",
- }
- edges.append({
- "id": f"{source_id}->{node_id}",
- "source": source_id if source_id in nodes else "source",
- "target": node_id,
- "label": "搜索命中",
- "status": "success",
- "rule_pack": "Content Rule Pack V1",
- })
- for action in walk_actions:
- source_id = _walk_node_id(action.get("from_node_type"), action.get("from_node_id"))
- target_id = _walk_node_id(action.get("to_node_type"), action.get("to_node_id"))
- nodes.setdefault(source_id, {
- "id": source_id,
- "type": action.get("from_node_type") or "node",
- "label": action.get("from_node_id") or "起点",
- "status": "success",
- })
- nodes.setdefault(target_id, {
- "id": target_id,
- "type": action.get("to_node_type") or "node",
- "label": action.get("to_node_id") or action.get("edge_type") or "下一跳",
- "status": action.get("walk_status") or "pending",
- })
- execution = (action.get("raw_payload") or {}).get("rule_pack_execution") or {}
- edges.append({
- "id": action.get("walk_action_id") or f"{source_id}->{target_id}",
- "source": source_id,
- "target": target_id,
- "label": action.get("edge_id") or action.get("edge_type") or action.get("walk_action"),
- "status": action.get("walk_status") or "pending",
- "rule_pack": action.get("rule_pack_id"),
- "rule_pack_executed": execution.get("executed"),
- "executed_rule_pack_id": execution.get("executed_rule_pack_id"),
- "budget_tier": action.get("budget_tier"),
- "reason_code": action.get("reason_code"),
- })
- return {
- "nodes": list(nodes.values()),
- "edges": edges,
- "source_path_count": len(source_paths),
- }
- def _primary_failure_reason(
- run_item: dict[str, Any],
- run_events: list[dict[str, Any]],
- ) -> dict[str, Any] | None:
- error_code = run_item.get("error_code")
- if not error_code and run_item.get("status") not in {"failed", "partial_success"}:
- return None
- failed_events = [
- event for event in run_events
- if event.get("status") == "failed" or event.get("error_code")
- ]
- event = failed_events[-1] if failed_events else {}
- code = error_code or event.get("error_code") or "UNKNOWN"
- return {
- "reason_code": code,
- "reason_label": _reason_label(code),
- "stage": event.get("stage") or _stage_from_error_code(code),
- "stage_label": _stage_label(event.get("stage") or _stage_from_error_code(code)),
- "message": event.get("message") or run_item.get("error_message") or code,
- }
def _source_label(run_item: dict[str, Any]) -> str:
    """Return a display label for where a run's demand came from.

    A truthy ``demand_content_id`` yields a label naming that demand; otherwise
    the run is labelled as using the demand pool / default sample.
    """
    demand_id = run_item.get("demand_content_id")
    return f"真实需求 #{demand_id}" if demand_id else "真实需求池 / 默认样例"
def _source_stage_detail(source_context: dict[str, Any]) -> str:
    """Return the demand-pool ID detail line for the source stage.

    The ID is looked up in priority order: ``demand_content_id``, then ``id``,
    then ``raw_demand_content["id"]`` (only if ``raw_demand_content`` is a dict).
    The first truthy value wins; if none is found a "missing" marker is returned.
    """
    raw_demand = source_context.get("raw_demand_content")
    nested_id = raw_demand.get("id") if isinstance(raw_demand, dict) else None
    resolved_id = (
        source_context.get("demand_content_id")
        or source_context.get("id")
        or nested_id
    )
    if not resolved_id:
        return "需求池 ID:缺失"
    return f"需求池 ID:{resolved_id}"
def _query_generation_label(queries: list[dict[str, Any]], run_events: list[dict[str, Any]]) -> str:
    """Summarize query generation as "<n> generated" and/or "generation failed".

    Generation is considered failed if any run event has error code
    ``QUERY_GENERATION_FAILED`` or event type ``query_generation_failed``.
    """
    count = len(queries)
    failed = False
    for event in run_events:
        if (
            event.get("error_code") == "QUERY_GENERATION_FAILED"
            or event.get("event_type") == "query_generation_failed"
        ):
            failed = True
            break

    if not count:
        return "生成失败" if failed else "0 条生成成功"
    suffix = " / 生成失败" if failed else ""
    return f"{count} 条生成成功{suffix}"
# Run-event types produced by the decode pipeline. `_timeline_summary` counts
# them keyed by the part after the "decode_" prefix; only historical runs are
# expected to contain them.
DECODE_EVENT_TYPES: tuple[str, ...] = (
    "decode_submitted", "decode_polling", "decode_succeeded",
    "decode_failed", "decode_timeout",
)
- def _timeline_summary(
- events: list[dict[str, Any]],
- walk_actions: list[dict[str, Any]],
- source_paths: list[dict[str, Any]],
- ) -> dict[str, Any]:
- stage_duration_ms: dict[str, int] = {}
- error_counts: dict[str, int] = {}
- decode_event_counts: dict[str, int] = {}
- platform_rate_limited_count = 0
- run_started_at: str | None = None
- run_ended_at: str | None = None
- for event in events:
- event_type = str(event.get("event_type") or "")
- event_id = str(event.get("event_id") or "")
- payload = event.get("raw_payload") or {}
- if event_type in {"stage_completed", "stage_failed"}:
- stage = payload.get("stage")
- duration = payload.get("duration_ms")
- if stage and isinstance(duration, (int, float)):
- stage_duration_ms[stage] = stage_duration_ms.get(stage, 0) + int(duration)
- error_code = event.get("error_code")
- if error_code:
- error_counts[str(error_code)] = error_counts.get(str(error_code), 0) + 1
- if error_code == "PLATFORM_RATE_LIMITED":
- platform_rate_limited_count += 1
- if event_type in DECODE_EVENT_TYPES:
- key = event_type.removeprefix("decode_")
- decode_event_counts[key] = decode_event_counts.get(key, 0) + 1
- if event_type == "run_started" or event_id.startswith("lifecycle_start"):
- run_started_at = run_started_at or event.get("created_at")
- if event_type in {"run_succeeded", "run_failed"} or event_id.startswith(
- ("lifecycle_success", "lifecycle_failed")
- ):
- run_ended_at = event.get("created_at")
- # 固定回退顺序: run started/completed 差值 -> stage 耗时求和 -> null,不估算。
- total_duration_ms: int | None = None
- if run_started_at and run_ended_at:
- try:
- total_duration_ms = int(
- (
- datetime.fromisoformat(str(run_ended_at))
- - datetime.fromisoformat(str(run_started_at))
- ).total_seconds()
- * 1000
- )
- except ValueError:
- total_duration_ms = None
- if total_duration_ms is None and stage_duration_ms:
- total_duration_ms = sum(stage_duration_ms.values())
- # 唯一来源: walk_actions 中 query 链动作(query_next_page/hashtag_to_query)且 walk_status=="failed";
- # 不读 run_events 失败事件,不求并集。首轮 keyword 搜索失败属 stage 失败,计入 error_counts。
- query_failure_count = sum(
- 1
- for action in walk_actions
- if action.get("edge_id") in {"query_next_page", "hashtag_to_query"}
- and action.get("walk_status") == "failed"
- )
- # V3 判定为 Gemini 直读,正常 run 无 decode 事件,此计数恒 {};仅当历史数据带 decode 事件时呈现。
- decode_status_counts = decode_event_counts
- return {
- "total_duration_ms": total_duration_ms,
- "stage_duration_ms": stage_duration_ms,
- "query_failure_count": query_failure_count,
- "platform_rate_limited_count": platform_rate_limited_count,
- "decode_status_counts": decode_status_counts,
- "error_counts": error_counts,
- "walk_status_counts": _walk_status_counts(walk_actions),
- }
def _effect_status_counts(decisions: list[dict[str, Any]]) -> dict[str, int]:
    """Count decisions by effect status, preserving first-seen order.

    The status is ``search_query_effect_status`` if truthy, else
    ``content_effect_status``, else ``"unknown"``; it is stringified before counting.
    """
    tally: dict[str, int] = {}
    for decision in decisions:
        status_key = str(
            decision.get("search_query_effect_status")
            or decision.get("content_effect_status")
            or "unknown"
        )
        if status_key in tally:
            tally[status_key] += 1
        else:
            tally[status_key] = 1
    return tally
def _walk_status_counts(walk_actions: list[dict[str, Any]]) -> dict[str, int]:
    """Count walk actions by ``walk_status`` (missing/empty -> ``"unknown"``), in first-seen order."""
    tally: dict[str, int] = {}
    for action in walk_actions:
        status_key = str(action.get("walk_status") or "unknown")
        if status_key in tally:
            tally[status_key] += 1
        else:
            tally[status_key] = 1
    return tally
def _decision_status_label(counts: dict[str, int]) -> str:
    """Render decision status counts as "<label> <count>" pairs joined by commas.

    Returns a fixed "no content has entered rule judgement" message when
    ``counts`` is empty.
    """
    if counts:
        parts = [f"{_reason_label(status)} {total}" for status, total in counts.items()]
        return ",".join(parts)
    return "当前没有内容进入规则判断"
def _walk_status_label(counts: dict[str, int]) -> str:
    """Render walk status counts as "<label> <count>" pairs joined by commas.

    Returns a fixed "no walk action was triggered" message when ``counts`` is empty.
    """
    if counts:
        parts = [f"{_reason_label(status)} {total}" for status, total in counts.items()]
        return ",".join(parts)
    return "当前没有触发游走动作"
# Display labels for status / reason codes; unknown codes are shown verbatim.
_REASON_LABELS: dict[str, str] = {
    "success": "成功",
    "pending": "待复看",
    "failed": "失败",
    "rule_blocked": "规则阻断",
    "PLATFORM_REQUEST_FAILED": "平台请求失败",
    "QUERY_GENERATION_FAILED": "Query 生成失败",
    "INVALID_SOURCE": "需求来源无效",
    "missing_score": "分数缺失",
    "missing_source_evidence": "来源证据缺失",
    "high_risk_content": "高风险内容",
    "budget_exhausted": "预算耗尽",
}


def _reason_label(code: Any) -> str:
    """Map a status/reason code to its display label.

    Falsy codes are treated as ``"unknown"``; codes without a label are
    returned as their string form.
    """
    key = str(code) if code else "unknown"
    return _REASON_LABELS.get(key, key)
def _stage_from_error_code(code: Any) -> str:
    """Infer the pipeline stage from substrings of an error code.

    Markers are checked in priority order (QUERY, PLATFORM, SOURCE), so a code
    containing several markers maps to the first one found; anything else is
    attributed to ``"run"``.
    """
    text = str(code or "")
    for marker, stage in (("QUERY", "query"), ("PLATFORM", "platform"), ("SOURCE", "source")):
        if marker in text:
            return stage
    return "run"
def _stage_label(stage: Any) -> str:
    """Map a stage key to its display label.

    A falsy stage is treated as ``"run"``; unrecognized stages are returned as
    their string form.
    """
    key = str(stage) if stage else "run"
    stage_names = {
        "source": "数据源阶段",
        "query": "Query 阶段",
        "platform": "平台搜索阶段",
        "judge": "规则判断阶段",
        "walk": "游走阶段",
        "run": "运行阶段",
    }
    return stage_names.get(key, key)
def _walk_node_id(node_type: Any, node_id: Any) -> str:
    """Build a graph node id of the form ``"<type>:<id>"``.

    Falsy parts default to ``"node"`` and ``"unknown"`` respectively.
    """
    kind = node_type if node_type else "node"
    ident = node_id if node_id else "unknown"
    return f"{kind}:{ident}"
|