# test_run_timeline_observability.py
# Tests for run stage events (run_events.jsonl) and the dashboard timeline summary.
  1. import json
  2. from content_agent.business_modules.run_record import recorder
  3. from content_agent.dashboard_service import DashboardService, _effect_status_counts, _timeline_summary
  4. from content_agent.run_service import RunService
  5. from content_agent.schemas import RunStartRequest
  6. from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
  7. def _run(tmp_path, **request_overrides):
  8. service = RunService(
  9. runtime_root=tmp_path / "runtime" / "v1",
  10. query_variant_client=FakeQueryVariantClient(),
  11. )
  12. state = service.start_run(
  13. RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE), **request_overrides)
  14. )
  15. return service, state
  16. def _stage_events(service, run_id):
  17. return [
  18. row
  19. for row in service.read_jsonl(run_id, "run_events.jsonl")
  20. if str(row.get("event_type") or "").startswith("stage_")
  21. ]
  22. def test_stage_started_event_records_attempt(tmp_path):
  23. service, state = _run(tmp_path)
  24. events = _stage_events(service, state["run_id"])
  25. started = [row for row in events if row["event_type"] == "stage_started"]
  26. assert started
  27. first = started[0]
  28. assert first["event_id"] == "evt_stage_load_source_1_started"
  29. assert first["raw_payload"]["stage"] == "load_source"
  30. assert first["raw_payload"]["attempt"] == 1
  31. assert first["status"] == "running"
  32. def test_stage_completed_event_records_duration_and_attempt(tmp_path):
  33. service, state = _run(tmp_path)
  34. events = _stage_events(service, state["run_id"])
  35. completed = {row["raw_payload"]["stage"]: row for row in events if row["event_type"] == "stage_completed"}
  36. assert set(completed) >= {"load_source", "plan_queries", "evaluate_rules", "record_run"}
  37. row = completed["load_source"]
  38. assert isinstance(row["raw_payload"]["duration_ms"], int)
  39. assert row["raw_payload"]["duration_ms"] >= 0
  40. assert row["raw_payload"]["ended_at"]
  41. assert row["raw_payload"]["attempt"] == 1
  42. assert row["status"] == "success"
  43. def test_stage_failed_event_records_error_code_and_sanitized_message(tmp_path):
  44. service, state = _run(tmp_path, strategy_version="missing_strategy")
  45. assert state["status"] == "failed"
  46. events = _stage_events(service, state["run_id"])
  47. failed = [row for row in events if row["event_type"] == "stage_failed"]
  48. assert len(failed) == 1
  49. row = failed[0]
  50. assert row["raw_payload"]["stage"] == "load_policy"
  51. assert row["status"] == "failed"
  52. message = str(row["message"] or "")
  53. assert message
  54. assert "Traceback" not in message
  55. assert len(message) < 200
  56. def test_stage_event_id_is_stable():
  57. assert recorder._stage_event_id("search_platform", 1, "started") == (
  58. "evt_stage_search_platform_1_started"
  59. )
  60. assert recorder._stage_event_id("recall_pattern", 2, "failed") == (
  61. "evt_stage_recall_pattern_2_failed"
  62. )
  63. def test_run_events_append_to_existing_file_contract(tmp_path):
  64. service, state = _run(tmp_path)
  65. rows = service.read_jsonl(state["run_id"], "run_events.jsonl")
  66. assert rows
  67. for row in rows:
  68. for field in ("record_schema_version", "run_id", "policy_run_id", "event_id", "event_type", "created_at"):
  69. assert row.get(field) is not None, field
  70. event_ids = [row["event_id"] for row in rows]
  71. assert len(event_ids) == len(set(event_ids))
  72. def test_timeline_summary_counts_query_failures():
  73. walk_actions = [
  74. {"edge_id": "query_next_page", "walk_status": "failed"},
  75. {"edge_id": "hashtag_to_query", "walk_status": "failed"},
  76. {"edge_id": "author_to_works", "walk_status": "failed"},
  77. {"edge_id": "query_next_page", "walk_status": "success"},
  78. ]
  79. summary = _timeline_summary([], walk_actions, [])
  80. assert summary["query_failure_count"] == 2
  81. def test_timeline_summary_counts_platform_rate_limited():
  82. events = [
  83. {"event_type": "platform_query_failed", "error_code": "PLATFORM_RATE_LIMITED"},
  84. {"event_type": "platform_query_failed", "error_code": "PLATFORM_RATE_LIMITED"},
  85. {"event_type": "platform_query_failed", "error_code": "PLATFORM_REQUEST_FAILED"},
  86. ]
  87. summary = _timeline_summary(events, [], [])
  88. assert summary["platform_rate_limited_count"] == 2
  89. assert summary["error_counts"] == {
  90. "PLATFORM_RATE_LIMITED": 2,
  91. "PLATFORM_REQUEST_FAILED": 1,
  92. }
  93. def test_timeline_summary_counts_decode_statuses():
  94. events = [
  95. {"event_type": "decode_submitted"},
  96. {"event_type": "decode_polling"},
  97. {"event_type": "decode_polling"},
  98. {"event_type": "decode_succeeded"},
  99. ]
  100. summary = _timeline_summary(events, [], [])
  101. # 历史数据带 decode 事件时按事件计数呈现。
  102. assert summary["decode_status_counts"] == {"submitted": 1, "polling": 2, "succeeded": 1}
  103. # V3 清理: recalls 终态回退已删,无 decode 事件时计数恒为空。
  104. no_decode = _timeline_summary([], [], [])
  105. assert no_decode["decode_status_counts"] == {}
  106. def test_timeline_summary_counts_walk_statuses():
  107. walk_actions = [
  108. {"edge_id": "budget_downgrade", "walk_status": "success"},
  109. {"edge_id": "hashtag_to_query", "walk_status": "skipped"},
  110. {"edge_id": "hashtag_to_query", "walk_status": "skipped"},
  111. ]
  112. summary = _timeline_summary([], walk_actions, [])
  113. assert summary["walk_status_counts"] == {"success": 1, "skipped": 2}
  114. def test_timeline_does_not_emit_stall_or_blocked_judgement(tmp_path):
  115. service, state = _run(tmp_path)
  116. dashboard = DashboardService(service.runtime)
  117. result = dashboard.timeline(state["run_id"])
  118. assert set(result["summary"]) == {
  119. "total_duration_ms",
  120. "stage_duration_ms",
  121. "query_failure_count",
  122. "platform_rate_limited_count",
  123. "decode_status_counts",
  124. "error_counts",
  125. "walk_status_counts",
  126. }
  127. dumped = json.dumps(result, ensure_ascii=False, default=str)
  128. assert "stalled" not in dumped
  129. assert "is_blocked" not in dumped
  130. def test_effect_status_summary_reads_search_query_effect_status():
  131. counts = _effect_status_counts(
  132. [
  133. {"search_query_effect_status": "pending"},
  134. {"search_query_effect_status": "pending", "content_effect_status": "failed"},
  135. {"content_effect_status": "rule_blocked"},
  136. ]
  137. )
  138. assert counts == {"pending": 2, "rule_blocked": 1}
  139. def test_timeline_response_keeps_items_total_data_origin(tmp_path):
  140. service, state = _run(tmp_path)
  141. dashboard = DashboardService(service.runtime)
  142. result = dashboard.timeline(state["run_id"])
  143. assert result["total"] == len(result["items"]) > 0
  144. assert result["data_origin"]
  145. # 本地 runtime 无 lifecycle 行,total_duration_ms 回退为 stage 耗时求和。
  146. assert result["summary"]["total_duration_ms"] == sum(
  147. result["summary"]["stage_duration_ms"].values()
  148. )
  149. assert result["summary"]["stage_duration_ms"]