dashboard_service.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043
  1. from __future__ import annotations
  2. from datetime import date, datetime
  3. from pathlib import Path
  4. from typing import Any
  5. from content_agent.business_modules.run_record import validate_run
  6. from content_agent.integrations.composite_runtime import CompositeRuntimeStore
  7. from content_agent.integrations.database_runtime import DatabaseRuntimeStore
  8. from content_agent.integrations.database_runtime import RUNTIME_FILE_TABLES
  9. from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
  10. from content_agent.interfaces import RuntimeStore
# Labels reported in the "data_origin" fields of the dashboard payloads.
# Used when the primary store is a DatabaseRuntimeStore.
DATA_ORIGIN_PRODUCTION_DB = "production_db"
# Used when the primary store is any non-DB (file / export) runtime.
DATA_ORIGIN_RUNTIME_EXPORT = "runtime_export"
# NOTE(review): declared for DB + export mixed data, but not returned by any
# code visible in this section of the module — confirm before relying on it.
DATA_ORIGIN_MIXED = "mixed_with_runtime_export"
  14. class DashboardService:
  15. def __init__(
  16. self,
  17. runtime: RuntimeStore,
  18. export_runtime: RuntimeStore | None = None,
  19. ) -> None:
  20. self.runtime = runtime
  21. self.export_runtime = export_runtime
  22. @classmethod
  23. def from_runtime(cls, runtime: RuntimeStore) -> "DashboardService":
  24. if isinstance(runtime, CompositeRuntimeStore):
  25. return cls(runtime.primary, runtime.export)
  26. return cls(runtime)
  27. @property
  28. def data_origin(self) -> str:
  29. if isinstance(self.runtime, DatabaseRuntimeStore):
  30. return DATA_ORIGIN_PRODUCTION_DB
  31. return DATA_ORIGIN_RUNTIME_EXPORT
  32. def run_exists(self, run_id: str) -> bool:
  33. if isinstance(self.runtime, DatabaseRuntimeStore):
  34. row = self.runtime._fetch_one(
  35. "SELECT COUNT(*) AS cnt FROM `content_agent_runs` WHERE `run_id` = %s",
  36. (run_id,),
  37. )
  38. return bool(row and row.get("cnt", 0))
  39. try:
  40. return self.runtime.run_dir(run_id).exists()
  41. except Exception:
  42. return any(self.runtime.file_status(run_id).values())
  43. def list_runs(
  44. self,
  45. *,
  46. status: str | None = None,
  47. platform: str | None = None,
  48. platform_mode: str | None = None,
  49. strategy_version: str | None = None,
  50. validation_status: str | None = None,
  51. error_code: str | None = None,
  52. page: int = 1,
  53. page_size: int = 20,
  54. ) -> dict[str, Any]:
  55. page = max(page, 1)
  56. page_size = min(max(page_size, 1), 100)
  57. if isinstance(self.runtime, DatabaseRuntimeStore):
  58. return self._list_db_runs(
  59. status=status,
  60. platform=platform,
  61. platform_mode=platform_mode,
  62. strategy_version=strategy_version,
  63. validation_status=validation_status,
  64. error_code=error_code,
  65. page=page,
  66. page_size=page_size,
  67. )
  68. items = [
  69. self._local_run_list_item(run_id)
  70. for run_id in getattr(self.runtime, "list_runs", lambda: [])()
  71. ]
  72. items = [
  73. item
  74. for item in items
  75. if _matches(item, "status", status)
  76. and _matches(item, "platform", platform)
  77. and _matches(item, "platform_mode", platform_mode)
  78. and _matches(item, "strategy_version", strategy_version)
  79. and _matches(item, "validation_status", validation_status)
  80. and _matches(item, "error_code", error_code)
  81. ]
  82. total = len(items)
  83. offset = (page - 1) * page_size
  84. return {
  85. "items": items[offset:offset + page_size],
  86. "page": page,
  87. "page_size": page_size,
  88. "total": total,
  89. "data_origin": self.data_origin,
  90. }
  91. def dashboard(self, run_id: str) -> dict[str, Any]:
  92. runtime_files_response = self.runtime_files(run_id)
  93. runtime_files = runtime_files_response["files"]
  94. file_status = {item["filename"]: item["exists"] for item in runtime_files}
  95. record_counts = {item["filename"]: item["record_count"] for item in runtime_files}
  96. final_output = self._read_json_optional(run_id, "final_output.json")
  97. strategy_review = self._read_json_optional(run_id, "strategy_review.json")
  98. source_context = self._read_json_optional(run_id, "source_context.json") or {}
  99. validation = self._validate_optional(run_id)
  100. run_item = self._db_run_item(run_id) if isinstance(self.runtime, DatabaseRuntimeStore) else self._local_run_list_item(run_id)
  101. queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
  102. run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
  103. content_items = self._read_jsonl_optional(run_id, "discovered_content_items.jsonl")
  104. decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
  105. walk_actions = self._read_jsonl_optional(run_id, "walk_actions.jsonl")
  106. source_paths = self._read_jsonl_optional(run_id, "source_path_records.jsonl")
  107. counts = {
  108. "queries": record_counts.get("search_queries.jsonl", 0),
  109. "discovered_content_items": record_counts.get("discovered_content_items.jsonl", 0),
  110. "rule_decisions": record_counts.get("rule_decisions.jsonl", 0),
  111. "walk_actions": record_counts.get("walk_actions.jsonl", 0),
  112. "source_path_records": record_counts.get("source_path_records.jsonl", 0),
  113. "run_events": record_counts.get("run_events.jsonl", 0),
  114. }
  115. primary_failure_reason = _primary_failure_reason(run_item, run_events)
  116. return _json_safe({
  117. "run_id": run_id,
  118. "summary": run_item,
  119. "counts": counts,
  120. "files": file_status,
  121. "runtime_files": runtime_files,
  122. "validation": validation,
  123. "final_output_summary": (final_output or {}).get("summary", {}),
  124. "strategy_review_status": (strategy_review or {}).get(
  125. "review_status",
  126. "not_generated",
  127. ),
  128. "business_summary": _business_summary(
  129. run_item,
  130. counts,
  131. final_output or {},
  132. primary_failure_reason,
  133. ),
  134. "stage_conclusions": _stage_conclusions(
  135. file_status,
  136. counts,
  137. run_item,
  138. queries,
  139. run_events,
  140. decisions,
  141. walk_actions,
  142. final_output or {},
  143. strategy_review or {},
  144. source_context,
  145. ),
  146. "rule_application_summary": _rule_application_summary(decisions, content_items),
  147. "walk_graph": _walk_graph(queries, content_items, walk_actions, source_paths),
  148. "primary_failure_reason": primary_failure_reason,
  149. "technical_refs": {
  150. "runtime_files_url": f"/runs/{run_id}/runtime-files",
  151. "validation_url": f"/runs/{run_id}/validation",
  152. "data_origin": runtime_files_response["data_origin"],
  153. "runtime_file_count": len(runtime_files),
  154. },
  155. "data_origin": runtime_files_response["data_origin"],
  156. "links": {
  157. "queries": f"/runs/{run_id}/queries",
  158. "timeline": f"/runs/{run_id}/timeline",
  159. "content_items": f"/runs/{run_id}/content-items",
  160. "runtime_files": f"/runs/{run_id}/runtime-files",
  161. },
  162. })
  163. def runtime_files(self, run_id: str) -> dict[str, Any]:
  164. if isinstance(self.runtime, DatabaseRuntimeStore):
  165. counts = self._db_runtime_file_counts(run_id)
  166. files = [
  167. {
  168. "filename": filename,
  169. "exists": counts.get(filename, 0) > 0,
  170. "record_count": counts.get(filename, 0),
  171. "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
  172. "contract_status": "current",
  173. "data_origin": DATA_ORIGIN_PRODUCTION_DB,
  174. }
  175. for filename in RUNTIME_FILENAMES
  176. ]
  177. return {
  178. "run_id": run_id,
  179. "files": files,
  180. "data_origin": DATA_ORIGIN_PRODUCTION_DB,
  181. }
  182. status = self.runtime.file_status(run_id)
  183. files = []
  184. for filename in RUNTIME_FILENAMES:
  185. exists = bool(status.get(filename))
  186. files.append({
  187. "filename": filename,
  188. "exists": exists,
  189. "record_count": self._runtime_record_count(run_id, filename) if exists else 0,
  190. "file_type": "jsonl" if filename.endswith(".jsonl") else "json",
  191. "contract_status": "current",
  192. "data_origin": self._file_origin(run_id, filename),
  193. })
  194. return {
  195. "run_id": run_id,
  196. "files": files,
  197. "data_origin": self._combined_origin(run_id),
  198. }
  199. def runtime_file(
  200. self,
  201. run_id: str,
  202. filename: str,
  203. *,
  204. limit: int = 100,
  205. offset: int = 0,
  206. ) -> dict[str, Any]:
  207. self._validate_runtime_filename(filename)
  208. limit = min(max(limit, 1), 500)
  209. offset = max(offset, 0)
  210. if filename.endswith(".jsonl"):
  211. records = self._read_jsonl_optional(run_id, filename)
  212. return {
  213. "run_id": run_id,
  214. "filename": filename,
  215. "records": _json_safe(records[offset:offset + limit]),
  216. "offset": offset,
  217. "limit": limit,
  218. "total": len(records),
  219. "data_origin": self._file_origin(run_id, filename),
  220. }
  221. data = self._read_json_optional(run_id, filename) or {}
  222. return {
  223. "run_id": run_id,
  224. "filename": filename,
  225. "data": _json_safe(data),
  226. "data_origin": self._file_origin(run_id, filename),
  227. }
  228. def queries(self, run_id: str) -> dict[str, Any]:
  229. queries = self._read_jsonl_optional(run_id, "search_queries.jsonl")
  230. clues_by_query = {
  231. row.get("search_query_id"): row
  232. for row in self._read_jsonl_optional(run_id, "search_clues.jsonl")
  233. }
  234. decisions = self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
  235. decision_counts: dict[str, dict[str, int]] = {}
  236. for decision in decisions:
  237. query_id = decision.get("search_query_id")
  238. if not query_id:
  239. continue
  240. counts = decision_counts.setdefault(query_id, {})
  241. action = decision.get("decision_action") or "unknown"
  242. counts[action] = counts.get(action, 0) + 1
  243. items = []
  244. for query in queries:
  245. query_id = query.get("search_query_id")
  246. clue = clues_by_query.get(query_id) or {}
  247. items.append(_json_safe({
  248. **query,
  249. "search_clue": clue,
  250. "decision_action_counts": decision_counts.get(query_id, {}),
  251. "search_query_effect_status": clue.get(
  252. "search_query_effect_status",
  253. query.get("search_query_effect_status"),
  254. ),
  255. "walk_next_step": clue.get("walk_next_step"),
  256. "failure_reason": clue.get("failure_reason")
  257. or (clue.get("raw_payload") or {}).get("failure_reason"),
  258. }))
  259. return {
  260. "run_id": run_id,
  261. "items": items,
  262. "total": len(items),
  263. "data_origin": self._combined_origin(run_id),
  264. }
  265. def timeline(self, run_id: str) -> dict[str, Any]:
  266. run_event_rows = self._read_jsonl_optional(run_id, "run_events.jsonl")
  267. walk_action_rows = self._read_jsonl_optional(run_id, "walk_actions.jsonl")
  268. source_path_rows = self._read_jsonl_optional(run_id, "source_path_records.jsonl")
  269. events = [
  270. {
  271. "source": "run_events.jsonl",
  272. "stage": row.get("stage") or row.get("event_type"),
  273. "event_type": row.get("event_type"),
  274. "status": row.get("status"),
  275. "timestamp": row.get("created_at") or row.get("timestamp"),
  276. "error_code": row.get("error_code"),
  277. "walk_action_id": (row.get("raw_payload") or {}).get("walk_action_id"),
  278. "record": row,
  279. }
  280. for row in run_event_rows
  281. ]
  282. events.extend(
  283. {
  284. "source": "walk_actions.jsonl",
  285. "stage": "walk",
  286. "event_type": row.get("edge_type"),
  287. "status": row.get("walk_status"),
  288. "timestamp": row.get("created_at"),
  289. "error_code": None,
  290. "walk_action_id": row.get("walk_action_id"),
  291. "record": row,
  292. }
  293. for row in walk_action_rows
  294. )
  295. events.extend(
  296. {
  297. "source": "source_path_records.jsonl",
  298. "stage": "source_path",
  299. "event_type": row.get("source_path_type"),
  300. "status": row.get("status"),
  301. "timestamp": row.get("created_at"),
  302. "error_code": None,
  303. "walk_action_id": row.get("walk_action_id")
  304. or (row.get("raw_payload") or {}).get("walk_action_id"),
  305. "record": row,
  306. }
  307. for row in source_path_rows
  308. )
  309. events.sort(key=lambda item: str(item.get("timestamp") or ""))
  310. return {
  311. "run_id": run_id,
  312. "items": _json_safe(events),
  313. "total": len(events),
  314. "data_origin": self._combined_origin(run_id),
  315. "summary": _timeline_summary(
  316. run_event_rows, walk_action_rows, source_path_rows
  317. ),
  318. }
  319. def content_items(self, run_id: str) -> dict[str, Any]:
  320. media_by_platform_id = {
  321. row.get("platform_content_id"): row
  322. for row in self._read_jsonl_optional(run_id, "content_media_records.jsonl")
  323. }
  324. recall_by_platform_id = {
  325. row.get("platform_content_id"): row
  326. for row in self._read_jsonl_optional(run_id, "pattern_recall_evidence.jsonl")
  327. }
  328. decisions_by_target = {
  329. row.get("decision_target_id"): row
  330. for row in self._read_jsonl_optional(run_id, "rule_decisions.jsonl")
  331. }
  332. items = []
  333. for content in self._read_jsonl_optional(run_id, "discovered_content_items.jsonl"):
  334. platform_content_id = content.get("platform_content_id")
  335. discovery_id = content.get("content_discovery_id")
  336. items.append(_json_safe({
  337. **content,
  338. "media_record": media_by_platform_id.get(platform_content_id),
  339. "pattern_recall_evidence": recall_by_platform_id.get(platform_content_id),
  340. "rule_decision": decisions_by_target.get(discovery_id)
  341. or decisions_by_target.get(platform_content_id),
  342. }))
  343. return {
  344. "run_id": run_id,
  345. "items": items,
  346. "total": len(items),
  347. "data_origin": self._combined_origin(run_id),
  348. }
  349. def _list_db_runs(self, **filters: Any) -> dict[str, Any]:
  350. page = filters.pop("page")
  351. page_size = filters.pop("page_size")
  352. where_sql, params = _db_run_filters(filters)
  353. count_row = self.runtime._fetch_one(
  354. f"SELECT COUNT(*) AS cnt FROM `content_agent_runs` r {where_sql}",
  355. tuple(params),
  356. )
  357. rows = self.runtime._fetch_all(
  358. "SELECT r.run_id, "
  359. "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
  360. "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
  361. "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
  362. "r.validation_status, r.error_code, r.started_at, r.completed_at "
  363. f"FROM `content_agent_runs` r {where_sql} "
  364. "ORDER BY r.started_at DESC, r.id DESC LIMIT %s OFFSET %s",
  365. tuple([*params, page_size, (page - 1) * page_size]),
  366. )
  367. return {
  368. "items": [_json_safe(_db_run_row_to_item(row)) for row in rows],
  369. "page": page,
  370. "page_size": page_size,
  371. "total": int((count_row or {}).get("cnt") or 0),
  372. "data_origin": self.data_origin,
  373. }
  374. def _db_run_item(self, run_id: str) -> dict[str, Any]:
  375. row = self.runtime._fetch_one(
  376. "SELECT r.run_id, "
  377. "(SELECT p.policy_run_id FROM `content_agent_policy_runs` p "
  378. "WHERE p.run_id = r.run_id ORDER BY p.id DESC LIMIT 1) AS policy_run_id, "
  379. "r.status, r.current_step, r.platform, r.platform_mode, r.strategy_version, "
  380. "r.validation_status, r.error_code, r.started_at, r.completed_at "
  381. "FROM `content_agent_runs` r WHERE r.run_id = %s LIMIT 1",
  382. (run_id,),
  383. )
  384. return _json_safe(_db_run_row_to_item(row or {"run_id": run_id, "status": "unknown"}))
  385. def _local_run_list_item(self, run_id: str) -> dict[str, Any]:
  386. final_output = self._read_json_optional(run_id, "final_output.json") or {}
  387. run_events = self._read_jsonl_optional(run_id, "run_events.jsonl")
  388. lifecycle = [
  389. row for row in run_events if str(row.get("event_id", "")).startswith("lifecycle_")
  390. ]
  391. latest = lifecycle[-1] if lifecycle else {}
  392. # 派单信息(平台 / 内容形态 / 策略)落在 dispatch 里,顶层取不到——优先顶层、回退 dispatch。
  393. dispatch = final_output.get("dispatch") or {}
  394. # 需求名/描述:来自 source_context.json(1 run = 1 需求)。
  395. # ext_data.type 有时是干净标签("中医养生知识需求")、有时是占位垃圾("pattern");
  396. # name 是逗号拼接的种子词且常重复("中医养生,中医养生")。取值:有意义的 type 优先,否则用去重后的 name。
  397. source_ctx = self._read_json_optional(run_id, "source_context.json") or {}
  398. ext_data = source_ctx.get("ext_data") or {}
  399. raw_type = str(ext_data.get("type") or "").strip()
  400. good_type = raw_type if raw_type and raw_type.lower() != "pattern" else ""
  401. name_parts = [s.strip() for s in str(source_ctx.get("name") or "").split(",") if s.strip()]
  402. deduped_name = "、".join(dict.fromkeys(name_parts))
  403. demand_name = good_type or deduped_name or None
  404. demand_desc = ext_data.get("desc") or ext_data.get("reason")
  405. # 这些 run 没有 lifecycle_ 事件,started_at 才一直为空;改成从所有事件的 created_at 取首尾,
  406. # 作为运行起止时间(ISO8601 同时区可按字典序求 min/max)。
  407. event_times = [str(row.get("created_at")) for row in run_events if row.get("created_at")]
  408. return _json_safe({
  409. "run_id": run_id,
  410. "policy_run_id": final_output.get("policy_run_id"),
  411. "status": latest.get("status") or ("success" if final_output else "failed"),
  412. "current_step": "review_strategy" if final_output else "unknown",
  413. "platform": final_output.get("platform") or dispatch.get("platform"),
  414. "platform_mode": final_output.get("platform_mode") or dispatch.get("runtime_stage"),
  415. "content_format": final_output.get("content_format") or dispatch.get("content_format"),
  416. "strategy_version": final_output.get("strategy_version") or dispatch.get("strategy_version"),
  417. "demand_name": demand_name,
  418. "demand_desc": demand_desc,
  419. "validation_status": final_output.get("validation_status"),
  420. "error_code": latest.get("error_code"),
  421. "started_at": (latest.get("created_at") or (min(event_times) if event_times else None)),
  422. "completed_at": (max(event_times) if event_times else None),
  423. })
  424. def _read_json_optional(self, run_id: str, filename: str) -> dict[str, Any] | None:
  425. try:
  426. return self.runtime.read_json(run_id, filename)
  427. except Exception:
  428. if self.export_runtime:
  429. try:
  430. return self.export_runtime.read_json(run_id, filename)
  431. except Exception:
  432. return None
  433. return None
  434. def _read_jsonl_optional(self, run_id: str, filename: str) -> list[dict[str, Any]]:
  435. try:
  436. return self.runtime.read_jsonl(run_id, filename)
  437. except Exception:
  438. if self.export_runtime:
  439. try:
  440. return self.export_runtime.read_jsonl(run_id, filename)
  441. except Exception:
  442. return []
  443. return []
  444. def _validate_optional(self, run_id: str) -> dict[str, Any]:
  445. if isinstance(self.runtime, DatabaseRuntimeStore):
  446. run_item = self._db_run_item(run_id)
  447. status = run_item.get("validation_status")
  448. return {
  449. "run_id": run_id,
  450. "status": status or "unknown",
  451. "findings": [] if status == "pass" else [],
  452. }
  453. try:
  454. return validate_run(run_id, self.runtime)
  455. except Exception as exc:
  456. return {
  457. "run_id": run_id,
  458. "status": "fail",
  459. "findings": [{"severity": "error", "message": str(exc)}],
  460. }
  461. def _runtime_record_count(self, run_id: str, filename: str) -> int:
  462. if isinstance(self.runtime, DatabaseRuntimeStore):
  463. return self._db_runtime_file_counts(run_id).get(filename, 0)
  464. if filename.endswith(".jsonl"):
  465. return len(self._read_jsonl_optional(run_id, filename))
  466. return 1 if self._read_json_optional(run_id, filename) is not None else 0
  467. def _combined_origin(self, run_id: str) -> str:
  468. if not isinstance(self.runtime, DatabaseRuntimeStore):
  469. return DATA_ORIGIN_RUNTIME_EXPORT
  470. return DATA_ORIGIN_PRODUCTION_DB
  471. def _file_origin(self, run_id: str, filename: str) -> str:
  472. if not isinstance(self.runtime, DatabaseRuntimeStore):
  473. return DATA_ORIGIN_RUNTIME_EXPORT
  474. primary_exists = self.runtime.file_status(run_id).get(filename, False)
  475. export_exists = bool(self.export_runtime and self.export_runtime.file_status(run_id).get(filename))
  476. if primary_exists:
  477. return DATA_ORIGIN_PRODUCTION_DB
  478. if export_exists:
  479. return DATA_ORIGIN_RUNTIME_EXPORT
  480. return self.data_origin
  481. def _validate_runtime_filename(self, filename: str) -> None:
  482. if (
  483. filename not in RUNTIME_FILENAMES
  484. or ".." in filename
  485. or "/" in filename
  486. or "\\" in filename
  487. or Path(filename).is_absolute()
  488. ):
  489. raise ValueError("runtime filename is not allowed")
  490. def _db_runtime_file_counts(self, run_id: str) -> dict[str, int]:
  491. counts: dict[str, int] = {}
  492. with self.runtime._connection_factory() as conn:
  493. with conn.cursor() as cur:
  494. for filename in RUNTIME_FILENAMES:
  495. table = RUNTIME_FILE_TABLES.get(filename)
  496. if not table:
  497. counts[filename] = 0
  498. continue
  499. cur.execute(
  500. f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
  501. (run_id,),
  502. )
  503. row = cur.fetchone() or {}
  504. counts[filename] = int(row.get("cnt") or 0)
  505. return counts
  506. def _db_run_filters(filters: dict[str, Any]) -> tuple[str, list[Any]]:
  507. allowed = {
  508. "status": "status",
  509. "platform": "platform",
  510. "platform_mode": "platform_mode",
  511. "strategy_version": "strategy_version",
  512. "validation_status": "validation_status",
  513. "error_code": "error_code",
  514. }
  515. clauses = []
  516. params = []
  517. for key, column in allowed.items():
  518. value = filters.get(key)
  519. if value:
  520. clauses.append(f"r.`{column}` = %s")
  521. params.append(value)
  522. if not clauses:
  523. return "", params
  524. return "WHERE " + " AND ".join(clauses), params
  525. def _db_run_row_to_item(row: dict[str, Any]) -> dict[str, Any]:
  526. return {
  527. "run_id": row.get("run_id"),
  528. "policy_run_id": row.get("policy_run_id"),
  529. "status": row.get("status"),
  530. "current_step": row.get("current_step"),
  531. "platform": row.get("platform"),
  532. "platform_mode": row.get("platform_mode"),
  533. "strategy_version": row.get("strategy_version"),
  534. "validation_status": row.get("validation_status"),
  535. "error_code": row.get("error_code"),
  536. "started_at": row.get("started_at"),
  537. "completed_at": row.get("completed_at"),
  538. }
  539. def _matches(item: dict[str, Any], key: str, expected: str | None) -> bool:
  540. return not expected or item.get(key) == expected
  541. def _json_safe(value: Any) -> Any:
  542. if isinstance(value, dict):
  543. return {key: _json_safe(item) for key, item in value.items()}
  544. if isinstance(value, list):
  545. return [_json_safe(item) for item in value]
  546. if isinstance(value, (datetime, date)):
  547. return value.isoformat()
  548. return value
  549. def _business_summary(
  550. run_item: dict[str, Any],
  551. counts: dict[str, int],
  552. final_output: dict[str, Any],
  553. primary_failure_reason: dict[str, Any] | None,
  554. ) -> dict[str, Any]:
  555. summary = final_output.get("summary") or {}
  556. status = run_item.get("status") or "unknown"
  557. if primary_failure_reason:
  558. headline = f"本次运行停在{primary_failure_reason.get('stage_label')}:{primary_failure_reason.get('reason_label')}"
  559. elif status == "success":
  560. headline = "本次运行已完成,可查看内容判断和资产沉淀结果"
  561. elif status == "partial_success":
  562. headline = "本次运行部分成功,有 query 或平台请求失败"
  563. elif status == "running":
  564. headline = "本次运行仍在进行中"
  565. else:
  566. headline = "本次运行未形成完整成功链路"
  567. return {
  568. "headline": headline,
  569. "status": status,
  570. "source_label": _source_label(run_item),
  571. "query_count": counts.get("queries", 0),
  572. "content_count": counts.get("discovered_content_items", 0),
  573. "kept_count": int(summary.get("pooled_content_count") or 0),
  574. "review_count": int(summary.get("review_content_count") or 0),
  575. "rejected_count": int(summary.get("rejected_content_count") or 0),
  576. "asset_count": int(summary.get("author_asset_count") or 0),
  577. "primary_failure_reason": primary_failure_reason,
  578. }
  579. def _stage_conclusions(
  580. files: dict[str, bool],
  581. counts: dict[str, int],
  582. run_item: dict[str, Any],
  583. queries: list[dict[str, Any]],
  584. run_events: list[dict[str, Any]],
  585. decisions: list[dict[str, Any]],
  586. walk_actions: list[dict[str, Any]],
  587. final_output: dict[str, Any],
  588. strategy_review: dict[str, Any],
  589. source_context: dict[str, Any],
  590. ) -> list[dict[str, Any]]:
  591. query_failures = [
  592. event for event in run_events
  593. if event.get("event_type") == "platform_query_failed"
  594. or event.get("status") == "failed" and event.get("search_query_id")
  595. ]
  596. decision_counts = _effect_status_counts(decisions)
  597. walk_counts = _walk_status_counts(walk_actions)
  598. final_summary = final_output.get("summary") or {}
  599. return [
  600. {
  601. "stage_id": "source",
  602. "label": "数据源",
  603. "status": "success" if files.get("source_context.json") else "failed",
  604. "headline": "真实需求已读取" if files.get("source_context.json") else "需求来源缺失",
  605. "detail": _source_stage_detail(source_context)
  606. if files.get("source_context.json")
  607. else "未找到 source_context,无法解释输入来源",
  608. "metric": "已就绪" if files.get("pattern_seed_pack.json") else "种子缺失",
  609. },
  610. {
  611. "stage_id": "query",
  612. "label": "Query",
  613. "status": "success" if counts.get("queries") else "failed",
  614. "headline": f"生成 {counts.get('queries', 0)} 条搜索意图",
  615. "detail": f"{len(query_failures)} 条 query 有失败记录,其余进入平台搜索或后续链路",
  616. "metric": _query_generation_label(queries, run_events),
  617. },
  618. {
  619. "stage_id": "platform",
  620. "label": "平台 / 内容",
  621. "status": "success" if counts.get("discovered_content_items") else "failed",
  622. "headline": f"发现 {counts.get('discovered_content_items', 0)} 条内容",
  623. "detail": "平台结果已进入内容发现"
  624. if counts.get("discovered_content_items")
  625. else "未形成发现内容,通常需要先查看平台请求或 query 失败原因",
  626. "metric": f"{len(query_failures)} 条平台失败",
  627. },
  628. {
  629. "stage_id": "judge",
  630. "label": "判断",
  631. "status": "success" if counts.get("rule_decisions") else "pending",
  632. "headline": f"{counts.get('rule_decisions', 0)} 条内容完成判断",
  633. "detail": _decision_status_label(decision_counts),
  634. "metric": f"入池 {decision_counts.get('success', 0)} / 阻断 {decision_counts.get('rule_blocked', 0)}",
  635. },
  636. {
  637. "stage_id": "walk",
  638. "label": "游走",
  639. "status": "success" if counts.get("walk_actions") else "pending",
  640. "headline": f"触发 {counts.get('walk_actions', 0)} 个游走动作",
  641. "detail": _walk_status_label(walk_counts),
  642. "metric": f"成功 {walk_counts.get('success', 0)} / 失败 {walk_counts.get('failed', 0)}",
  643. },
  644. {
  645. "stage_id": "asset",
  646. "label": "资产",
  647. "status": "success" if files.get("final_output.json") else "pending",
  648. "headline": f"沉淀 {final_summary.get('pooled_content_count', 0) or 0} 条内容资产",
  649. "detail": f"待复看 {final_summary.get('review_content_count', 0) or 0},淘汰 {final_summary.get('rejected_content_count', 0) or 0}",
  650. "metric": f"作者资产 {final_summary.get('author_asset_count', 0) or 0}",
  651. },
  652. {
  653. "stage_id": "learning",
  654. "label": "学习",
  655. "status": "success" if strategy_review.get("review_status") == "generated" else "pending",
  656. "headline": "策略复盘已生成"
  657. if strategy_review.get("review_status") == "generated"
  658. else "策略复盘未生成",
  659. "detail": "可查看 query / rule / walk 的优化建议"
  660. if strategy_review.get("review_status") == "generated"
  661. else "当前 run 还没有可展示的策略学习建议",
  662. "metric": strategy_review.get("review_status", "not_generated"),
  663. },
  664. ]
  665. def _rule_application_summary(
  666. decisions: list[dict[str, Any]],
  667. content_items: list[dict[str, Any]],
  668. ) -> list[dict[str, Any]]:
  669. content_by_id = {
  670. item.get("content_discovery_id") or item.get("platform_content_id"): item
  671. for item in content_items
  672. }
  673. summaries = []
  674. for decision in decisions:
  675. replay = decision.get("decision_replay_data") or {}
  676. target_id = decision.get("decision_target_id")
  677. content = content_by_id.get(target_id) or {}
  678. triggered_rules = decision.get("triggered_blocking_rules") or []
  679. scorecard = decision.get("scorecard") or {}
  680. summaries.append({
  681. "decision_id": decision.get("decision_id"),
  682. "content_title": content.get("title") or decision.get("decision_target_id"),
  683. "platform_content_id": content.get("platform_content_id") or decision.get("decision_target_id"),
  684. "rule_pack": decision.get("rule_pack_id")
  685. or replay.get("rule_pack_id")
  686. or "Content Rule Pack V1",
  687. "hard_gate_status": "没通过" if triggered_rules else "通过",
  688. "score": decision.get("score") or scorecard.get("total_score"),
  689. "decision_action": decision.get("decision_action"),
  690. "decision_reason_code": decision.get("decision_reason_code"),
  691. # 与 _effect_status_counts 同口径: decision 正式字段是 search_query_effect_status,
  692. # content_effect_status 仅旧数据回退(decision 记录从无该字段时原代码恒读 None)。
  693. "content_effect_status": decision.get("search_query_effect_status")
  694. or decision.get("content_effect_status"),
  695. "primary_reason": _reason_label(decision.get("decision_reason_code")),
  696. "technical_ref": {
  697. "decision_id": decision.get("decision_id"),
  698. "target_id": decision.get("decision_target_id"),
  699. "has_replay_data": bool(replay),
  700. },
  701. })
  702. return summaries
  703. def _walk_graph(
  704. queries: list[dict[str, Any]],
  705. content_items: list[dict[str, Any]],
  706. walk_actions: list[dict[str, Any]],
  707. source_paths: list[dict[str, Any]],
  708. ) -> dict[str, Any]:
  709. nodes: dict[str, dict[str, Any]] = {
  710. "source": {
  711. "id": "source",
  712. "type": "source",
  713. "label": "需求",
  714. "status": "success",
  715. }
  716. }
  717. edges: list[dict[str, Any]] = []
  718. for query in queries[:12]:
  719. node_id = f"query:{query.get('search_query_id')}"
  720. nodes[node_id] = {
  721. "id": node_id,
  722. "type": "query",
  723. "label": query.get("search_query") or query.get("search_query_id"),
  724. "status": query.get("search_query_effect_status") or "success",
  725. }
  726. edges.append({
  727. "id": f"source->{node_id}",
  728. "source": "source",
  729. "target": node_id,
  730. "label": query.get("search_query_generation_method") or "query",
  731. "status": "success",
  732. "rule_pack": None,
  733. })
  734. for content in content_items[:20]:
  735. node_id = f"content:{content.get('platform_content_id') or content.get('content_discovery_id')}"
  736. query_id = content.get("search_query_id")
  737. source_id = f"query:{query_id}" if query_id else "source"
  738. nodes[node_id] = {
  739. "id": node_id,
  740. "type": "content",
  741. "label": content.get("title") or content.get("platform_content_id") or "内容",
  742. "status": (content.get("pattern_match_result") or {}).get("recall_status") or "pending",
  743. }
  744. edges.append({
  745. "id": f"{source_id}->{node_id}",
  746. "source": source_id if source_id in nodes else "source",
  747. "target": node_id,
  748. "label": "搜索命中",
  749. "status": "success",
  750. "rule_pack": "Content Rule Pack V1",
  751. })
  752. for action in walk_actions:
  753. source_id = _walk_node_id(action.get("from_node_type"), action.get("from_node_id"))
  754. target_id = _walk_node_id(action.get("to_node_type"), action.get("to_node_id"))
  755. nodes.setdefault(source_id, {
  756. "id": source_id,
  757. "type": action.get("from_node_type") or "node",
  758. "label": action.get("from_node_id") or "起点",
  759. "status": "success",
  760. })
  761. nodes.setdefault(target_id, {
  762. "id": target_id,
  763. "type": action.get("to_node_type") or "node",
  764. "label": action.get("to_node_id") or action.get("edge_type") or "下一跳",
  765. "status": action.get("walk_status") or "pending",
  766. })
  767. execution = (action.get("raw_payload") or {}).get("rule_pack_execution") or {}
  768. edges.append({
  769. "id": action.get("walk_action_id") or f"{source_id}->{target_id}",
  770. "source": source_id,
  771. "target": target_id,
  772. "label": action.get("edge_id") or action.get("edge_type") or action.get("walk_action"),
  773. "status": action.get("walk_status") or "pending",
  774. "rule_pack": action.get("rule_pack_id"),
  775. "rule_pack_executed": execution.get("executed"),
  776. "executed_rule_pack_id": execution.get("executed_rule_pack_id"),
  777. "budget_tier": action.get("budget_tier"),
  778. "reason_code": action.get("reason_code"),
  779. })
  780. return {
  781. "nodes": list(nodes.values()),
  782. "edges": edges,
  783. "source_path_count": len(source_paths),
  784. }
  785. def _primary_failure_reason(
  786. run_item: dict[str, Any],
  787. run_events: list[dict[str, Any]],
  788. ) -> dict[str, Any] | None:
  789. error_code = run_item.get("error_code")
  790. if not error_code and run_item.get("status") not in {"failed", "partial_success"}:
  791. return None
  792. failed_events = [
  793. event for event in run_events
  794. if event.get("status") == "failed" or event.get("error_code")
  795. ]
  796. event = failed_events[-1] if failed_events else {}
  797. code = error_code or event.get("error_code") or "UNKNOWN"
  798. return {
  799. "reason_code": code,
  800. "reason_label": _reason_label(code),
  801. "stage": event.get("stage") or _stage_from_error_code(code),
  802. "stage_label": _stage_label(event.get("stage") or _stage_from_error_code(code)),
  803. "message": event.get("message") or run_item.get("error_message") or code,
  804. }
  805. def _source_label(run_item: dict[str, Any]) -> str:
  806. demand_id = run_item.get("demand_content_id")
  807. if demand_id:
  808. return f"真实需求 #{demand_id}"
  809. return "真实需求池 / 默认样例"
  810. def _source_stage_detail(source_context: dict[str, Any]) -> str:
  811. demand_id = source_context.get("demand_content_id") or source_context.get("id")
  812. if demand_id:
  813. return f"需求池 ID:{demand_id}"
  814. raw_demand = source_context.get("raw_demand_content")
  815. if isinstance(raw_demand, dict) and raw_demand.get("id"):
  816. return f"需求池 ID:{raw_demand.get('id')}"
  817. return "需求池 ID:缺失"
  818. def _query_generation_label(queries: list[dict[str, Any]], run_events: list[dict[str, Any]]) -> str:
  819. generated_count = len(queries)
  820. generation_failed = any(
  821. event.get("error_code") == "QUERY_GENERATION_FAILED"
  822. or (event.get("event_type") == "query_generation_failed")
  823. for event in run_events
  824. )
  825. if generated_count and generation_failed:
  826. return f"{generated_count} 条生成成功 / 生成失败"
  827. if generated_count:
  828. return f"{generated_count} 条生成成功"
  829. if generation_failed:
  830. return "生成失败"
  831. return "0 条生成成功"
  832. DECODE_EVENT_TYPES = (
  833. "decode_submitted",
  834. "decode_polling",
  835. "decode_succeeded",
  836. "decode_failed",
  837. "decode_timeout",
  838. )
  839. def _timeline_summary(
  840. events: list[dict[str, Any]],
  841. walk_actions: list[dict[str, Any]],
  842. source_paths: list[dict[str, Any]],
  843. ) -> dict[str, Any]:
  844. stage_duration_ms: dict[str, int] = {}
  845. error_counts: dict[str, int] = {}
  846. decode_event_counts: dict[str, int] = {}
  847. platform_rate_limited_count = 0
  848. run_started_at: str | None = None
  849. run_ended_at: str | None = None
  850. for event in events:
  851. event_type = str(event.get("event_type") or "")
  852. event_id = str(event.get("event_id") or "")
  853. payload = event.get("raw_payload") or {}
  854. if event_type in {"stage_completed", "stage_failed"}:
  855. stage = payload.get("stage")
  856. duration = payload.get("duration_ms")
  857. if stage and isinstance(duration, (int, float)):
  858. stage_duration_ms[stage] = stage_duration_ms.get(stage, 0) + int(duration)
  859. error_code = event.get("error_code")
  860. if error_code:
  861. error_counts[str(error_code)] = error_counts.get(str(error_code), 0) + 1
  862. if error_code == "PLATFORM_RATE_LIMITED":
  863. platform_rate_limited_count += 1
  864. if event_type in DECODE_EVENT_TYPES:
  865. key = event_type.removeprefix("decode_")
  866. decode_event_counts[key] = decode_event_counts.get(key, 0) + 1
  867. if event_type == "run_started" or event_id.startswith("lifecycle_start"):
  868. run_started_at = run_started_at or event.get("created_at")
  869. if event_type in {"run_succeeded", "run_failed"} or event_id.startswith(
  870. ("lifecycle_success", "lifecycle_failed")
  871. ):
  872. run_ended_at = event.get("created_at")
  873. # 固定回退顺序: run started/completed 差值 -> stage 耗时求和 -> null,不估算。
  874. total_duration_ms: int | None = None
  875. if run_started_at and run_ended_at:
  876. try:
  877. total_duration_ms = int(
  878. (
  879. datetime.fromisoformat(str(run_ended_at))
  880. - datetime.fromisoformat(str(run_started_at))
  881. ).total_seconds()
  882. * 1000
  883. )
  884. except ValueError:
  885. total_duration_ms = None
  886. if total_duration_ms is None and stage_duration_ms:
  887. total_duration_ms = sum(stage_duration_ms.values())
  888. # 唯一来源: walk_actions 中 query 链动作(query_next_page/hashtag_to_query)且 walk_status=="failed";
  889. # 不读 run_events 失败事件,不求并集。首轮 keyword 搜索失败属 stage 失败,计入 error_counts。
  890. query_failure_count = sum(
  891. 1
  892. for action in walk_actions
  893. if action.get("edge_id") in {"query_next_page", "hashtag_to_query"}
  894. and action.get("walk_status") == "failed"
  895. )
  896. # V3 判定为 Gemini 直读,正常 run 无 decode 事件,此计数恒 {};仅当历史数据带 decode 事件时呈现。
  897. decode_status_counts = decode_event_counts
  898. return {
  899. "total_duration_ms": total_duration_ms,
  900. "stage_duration_ms": stage_duration_ms,
  901. "query_failure_count": query_failure_count,
  902. "platform_rate_limited_count": platform_rate_limited_count,
  903. "decode_status_counts": decode_status_counts,
  904. "error_counts": error_counts,
  905. "walk_status_counts": _walk_status_counts(walk_actions),
  906. }
  907. def _effect_status_counts(decisions: list[dict[str, Any]]) -> dict[str, int]:
  908. counts: dict[str, int] = {}
  909. for decision in decisions:
  910. status = str(
  911. decision.get("search_query_effect_status")
  912. or decision.get("content_effect_status")
  913. or "unknown"
  914. )
  915. counts[status] = counts.get(status, 0) + 1
  916. return counts
  917. def _walk_status_counts(walk_actions: list[dict[str, Any]]) -> dict[str, int]:
  918. counts: dict[str, int] = {}
  919. for action in walk_actions:
  920. status = str(action.get("walk_status") or "unknown")
  921. counts[status] = counts.get(status, 0) + 1
  922. return counts
  923. def _decision_status_label(counts: dict[str, int]) -> str:
  924. if not counts:
  925. return "当前没有内容进入规则判断"
  926. return ",".join(f"{_reason_label(key)} {value}" for key, value in counts.items())
  927. def _walk_status_label(counts: dict[str, int]) -> str:
  928. if not counts:
  929. return "当前没有触发游走动作"
  930. return ",".join(f"{_reason_label(key)} {value}" for key, value in counts.items())
  931. def _reason_label(code: Any) -> str:
  932. labels = {
  933. "success": "成功",
  934. "pending": "待复看",
  935. "failed": "失败",
  936. "rule_blocked": "规则阻断",
  937. "PLATFORM_REQUEST_FAILED": "平台请求失败",
  938. "QUERY_GENERATION_FAILED": "Query 生成失败",
  939. "INVALID_SOURCE": "需求来源无效",
  940. "missing_score": "分数缺失",
  941. "missing_source_evidence": "来源证据缺失",
  942. "high_risk_content": "高风险内容",
  943. "budget_exhausted": "预算耗尽",
  944. }
  945. value = str(code or "unknown")
  946. return labels.get(value, value)
  947. def _stage_from_error_code(code: Any) -> str:
  948. value = str(code or "")
  949. if "QUERY" in value:
  950. return "query"
  951. if "PLATFORM" in value:
  952. return "platform"
  953. if "SOURCE" in value:
  954. return "source"
  955. return "run"
  956. def _stage_label(stage: Any) -> str:
  957. labels = {
  958. "source": "数据源阶段",
  959. "query": "Query 阶段",
  960. "platform": "平台搜索阶段",
  961. "judge": "规则判断阶段",
  962. "walk": "游走阶段",
  963. "run": "运行阶段",
  964. }
  965. return labels.get(str(stage or "run"), str(stage or "运行阶段"))
  966. def _walk_node_id(node_type: Any, node_id: Any) -> str:
  967. return f"{node_type or 'node'}:{node_id or 'unknown'}"