| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- import json
- from content_agent.business_modules.run_record import recorder
- from content_agent.dashboard_service import DashboardService, _effect_status_counts, _timeline_summary
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
- from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
def _run(tmp_path, **request_overrides):
    """Start one mock-platform run and return ``(service, state)``.

    A ``RunService`` is created with its runtime root at
    ``tmp_path / "runtime" / "v1"`` and a ``FakeQueryVariantClient``. The run
    is started from ``REAL_SOURCE_FIXTURE``; any ``request_overrides`` are
    forwarded as extra keyword arguments to ``RunStartRequest``.
    """
    runtime_root = tmp_path / "runtime" / "v1"
    service = RunService(
        runtime_root=runtime_root,
        query_variant_client=FakeQueryVariantClient(),
    )
    request = RunStartRequest(
        platform_mode="mock",
        source=str(REAL_SOURCE_FIXTURE),
        **request_overrides,
    )
    return service, service.start_run(request)
def _stage_events(service, run_id):
    """Return the ``run_events.jsonl`` rows whose event type starts with ``stage_``.

    A missing or falsy ``event_type`` is treated as an empty string, so such
    rows are filtered out.
    """
    stage_rows = []
    for event in service.read_jsonl(run_id, "run_events.jsonl"):
        event_type = str(event.get("event_type") or "")
        if event_type.startswith("stage_"):
            stage_rows.append(event)
    return stage_rows
def test_stage_started_event_records_attempt(tmp_path):
    """The first ``stage_started`` event names its stage, attempt and status."""
    service, state = _run(tmp_path)
    started_events = [
        event
        for event in _stage_events(service, state["run_id"])
        if event["event_type"] == "stage_started"
    ]
    assert started_events

    head = started_events[0]
    assert head["event_id"] == "evt_stage_load_source_1_started"
    payload = head["raw_payload"]
    assert payload["stage"] == "load_source"
    assert payload["attempt"] == 1
    assert head["status"] == "running"
def test_stage_completed_event_records_duration_and_attempt(tmp_path):
    """``stage_completed`` events exist for the core stages and carry timing data."""
    service, state = _run(tmp_path)

    # Index completed events by stage name; a later event for the same stage
    # replaces an earlier one.
    completed_by_stage = {}
    for event in _stage_events(service, state["run_id"]):
        if event["event_type"] == "stage_completed":
            completed_by_stage[event["raw_payload"]["stage"]] = event

    required_stages = {"load_source", "plan_queries", "evaluate_rules", "record_run"}
    assert required_stages <= set(completed_by_stage)

    load_source_event = completed_by_stage["load_source"]
    payload = load_source_event["raw_payload"]
    assert isinstance(payload["duration_ms"], int)
    assert payload["duration_ms"] >= 0
    assert payload["ended_at"]
    assert payload["attempt"] == 1
    assert load_source_event["status"] == "success"
def test_stage_failed_event_records_error_code_and_sanitized_message(tmp_path):
    """A run with an unknown strategy version fails once, in ``load_policy``.

    The single ``stage_failed`` event must carry a non-empty message that is
    shorter than 200 characters and does not contain the word "Traceback".
    """
    # NOTE(review): the test name mentions an error code, but no error-code
    # field is asserted here - confirm whether that check is still wanted.
    service, state = _run(tmp_path, strategy_version="missing_strategy")
    assert state["status"] == "failed"

    failures = [
        event
        for event in _stage_events(service, state["run_id"])
        if event["event_type"] == "stage_failed"
    ]
    assert len(failures) == 1

    failure = failures[0]
    assert failure["raw_payload"]["stage"] == "load_policy"
    assert failure["status"] == "failed"

    text = str(failure["message"] or "")
    assert text
    assert "Traceback" not in text
    assert len(text) < 200
def test_stage_event_id_is_stable():
    """``recorder._stage_event_id`` yields the expected id for fixed inputs."""
    cases = (
        (("search_platform", 1, "started"), "evt_stage_search_platform_1_started"),
        (("recall_pattern", 2, "failed"), "evt_stage_recall_pattern_2_failed"),
    )
    for arguments, expected_id in cases:
        assert recorder._stage_event_id(*arguments) == expected_id
def test_run_events_append_to_existing_file_contract(tmp_path):
    """Every run event has the required fields and event ids are unique."""
    service, state = _run(tmp_path)
    events = service.read_jsonl(state["run_id"], "run_events.jsonl")
    assert events

    required_fields = (
        "record_schema_version",
        "run_id",
        "policy_run_id",
        "event_id",
        "event_type",
        "created_at",
    )
    for event in events:
        for name in required_fields:
            # The field name is the assertion message so a failure names the gap.
            assert event.get(name) is not None, name

    seen_ids = [event["event_id"] for event in events]
    assert len(seen_ids) == len(set(seen_ids))
def test_timeline_summary_counts_query_failures():
    """``query_failure_count`` is 2 for three failed and one successful walk action.

    The failed actions use the edges ``query_next_page``, ``hashtag_to_query``
    and ``author_to_works``; presumably the last one is not treated as a query
    edge - verify against ``_timeline_summary``.
    """
    edge_status_pairs = (
        ("query_next_page", "failed"),
        ("hashtag_to_query", "failed"),
        ("author_to_works", "failed"),
        ("query_next_page", "success"),
    )
    walk_actions = [
        {"edge_id": edge, "walk_status": status}
        for edge, status in edge_status_pairs
    ]
    result = _timeline_summary([], walk_actions, [])
    assert result["query_failure_count"] == 2
def test_timeline_summary_counts_platform_rate_limited():
    """Rate-limited failures are counted separately and within ``error_counts``."""
    error_codes = (
        "PLATFORM_RATE_LIMITED",
        "PLATFORM_RATE_LIMITED",
        "PLATFORM_REQUEST_FAILED",
    )
    events = [
        {"event_type": "platform_query_failed", "error_code": code}
        for code in error_codes
    ]
    result = _timeline_summary(events, [], [])

    expected_error_counts = {
        "PLATFORM_RATE_LIMITED": 2,
        "PLATFORM_REQUEST_FAILED": 1,
    }
    assert result["platform_rate_limited_count"] == 2
    assert result["error_counts"] == expected_error_counts
def test_timeline_summary_counts_decode_statuses():
    """``decode_status_counts`` tallies decode events and is empty without them."""
    decode_event_types = (
        "decode_submitted",
        "decode_polling",
        "decode_polling",
        "decode_succeeded",
    )
    events = [{"event_type": event_type} for event_type in decode_event_types]

    # Historical data that carries decode events is presented as per-event counts.
    with_decode = _timeline_summary(events, [], [])
    assert with_decode["decode_status_counts"] == {
        "submitted": 1,
        "polling": 2,
        "succeeded": 1,
    }

    # V3 cleanup: the recalls terminal-state fallback was removed, so the
    # counts are always empty when there are no decode events.
    without_decode = _timeline_summary([], [], [])
    assert without_decode["decode_status_counts"] == {}
def test_timeline_summary_counts_walk_statuses():
    """``walk_status_counts`` groups walk actions by their ``walk_status``."""
    edge_status_pairs = (
        ("budget_downgrade", "success"),
        ("hashtag_to_query", "skipped"),
        ("hashtag_to_query", "skipped"),
    )
    walk_actions = [
        {"edge_id": edge, "walk_status": status}
        for edge, status in edge_status_pairs
    ]
    result = _timeline_summary([], walk_actions, [])
    assert result["walk_status_counts"] == {"success": 1, "skipped": 2}
def test_timeline_does_not_emit_stall_or_blocked_judgement(tmp_path):
    """The timeline summary has a fixed key set and no stall/blocked markers.

    The whole timeline response is serialised to JSON and checked for the
    substrings ``stalled`` and ``is_blocked``.
    """
    service, state = _run(tmp_path)
    timeline = DashboardService(service.runtime).timeline(state["run_id"])

    expected_summary_keys = {
        "total_duration_ms",
        "stage_duration_ms",
        "query_failure_count",
        "platform_rate_limited_count",
        "decode_status_counts",
        "error_counts",
        "walk_status_counts",
    }
    assert set(timeline["summary"]) == expected_summary_keys

    serialized = json.dumps(timeline, ensure_ascii=False, default=str)
    for forbidden in ("stalled", "is_blocked"):
        assert forbidden not in serialized
def test_effect_status_summary_reads_search_query_effect_status():
    """``_effect_status_counts`` counts ``search_query_effect_status`` when present.

    The row carrying both status fields is expected to count as ``pending``;
    the row carrying only ``content_effect_status`` counts under that value.
    """
    rows = [
        {"search_query_effect_status": "pending"},
        {"search_query_effect_status": "pending", "content_effect_status": "failed"},
        {"content_effect_status": "rule_blocked"},
    ]
    expected = {"pending": 2, "rule_blocked": 1}
    assert _effect_status_counts(rows) == expected
def test_timeline_response_keeps_items_total_data_origin(tmp_path):
    """The timeline response exposes ``items``, ``total`` and ``data_origin``.

    It also checks that ``total_duration_ms`` equals the sum of the per-stage
    durations and that the per-stage mapping is non-empty.
    """
    service, state = _run(tmp_path)
    timeline = DashboardService(service.runtime).timeline(state["run_id"])

    reported_total = timeline["total"]
    item_count = len(timeline["items"])
    assert reported_total == item_count
    assert item_count > 0
    assert timeline["data_origin"]

    # The local runtime has no lifecycle rows, so total_duration_ms falls back
    # to the sum of the stage durations.
    summary = timeline["summary"]
    stage_total = sum(summary["stage_duration_ms"].values())
    assert summary["total_duration_ms"] == stage_total
    assert summary["stage_duration_ms"]
|