import json from content_agent.business_modules.run_record import recorder from content_agent.dashboard_service import DashboardService, _effect_status_counts, _timeline_summary from content_agent.run_service import RunService from content_agent.schemas import RunStartRequest from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE def _run(tmp_path, **request_overrides): service = RunService( runtime_root=tmp_path / "runtime" / "v1", query_variant_client=FakeQueryVariantClient(), ) state = service.start_run( RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE), **request_overrides) ) return service, state def _stage_events(service, run_id): return [ row for row in service.read_jsonl(run_id, "run_events.jsonl") if str(row.get("event_type") or "").startswith("stage_") ] def test_stage_started_event_records_attempt(tmp_path): service, state = _run(tmp_path) events = _stage_events(service, state["run_id"]) started = [row for row in events if row["event_type"] == "stage_started"] assert started first = started[0] assert first["event_id"] == "evt_stage_load_source_1_started" assert first["raw_payload"]["stage"] == "load_source" assert first["raw_payload"]["attempt"] == 1 assert first["status"] == "running" def test_stage_completed_event_records_duration_and_attempt(tmp_path): service, state = _run(tmp_path) events = _stage_events(service, state["run_id"]) completed = {row["raw_payload"]["stage"]: row for row in events if row["event_type"] == "stage_completed"} assert set(completed) >= {"load_source", "plan_queries", "evaluate_rules", "record_run"} row = completed["load_source"] assert isinstance(row["raw_payload"]["duration_ms"], int) assert row["raw_payload"]["duration_ms"] >= 0 assert row["raw_payload"]["ended_at"] assert row["raw_payload"]["attempt"] == 1 assert row["status"] == "success" def test_stage_failed_event_records_error_code_and_sanitized_message(tmp_path): service, state = _run(tmp_path, strategy_version="missing_strategy") assert state["status"] == "failed" events = _stage_events(service, state["run_id"]) failed = [row for row in events if row["event_type"] == "stage_failed"] assert len(failed) == 1 row = failed[0] assert row["raw_payload"]["stage"] == "load_policy" assert row["status"] == "failed" message = str(row["message"] or "") assert message assert "Traceback" not in message assert len(message) < 200 def test_stage_event_id_is_stable(): assert recorder._stage_event_id("search_platform", 1, "started") == ( "evt_stage_search_platform_1_started" ) assert recorder._stage_event_id("recall_pattern", 2, "failed") == ( "evt_stage_recall_pattern_2_failed" ) def test_run_events_append_to_existing_file_contract(tmp_path): service, state = _run(tmp_path) rows = service.read_jsonl(state["run_id"], "run_events.jsonl") assert rows for row in rows: for field in ("record_schema_version", "run_id", "policy_run_id", "event_id", "event_type", "created_at"): assert row.get(field) is not None, field event_ids = [row["event_id"] for row in rows] assert len(event_ids) == len(set(event_ids)) def test_timeline_summary_counts_query_failures(): walk_actions = [ {"edge_id": "query_next_page", "walk_status": "failed"}, {"edge_id": "hashtag_to_query", "walk_status": "failed"}, {"edge_id": "author_to_works", "walk_status": "failed"}, {"edge_id": "query_next_page", "walk_status": "success"}, ] summary = _timeline_summary([], walk_actions, []) assert summary["query_failure_count"] == 2 def test_timeline_summary_counts_platform_rate_limited(): events = [ {"event_type": "platform_query_failed", "error_code": "PLATFORM_RATE_LIMITED"}, {"event_type": "platform_query_failed", "error_code": "PLATFORM_RATE_LIMITED"}, {"event_type": "platform_query_failed", "error_code": "PLATFORM_REQUEST_FAILED"}, ] summary = _timeline_summary(events, [], []) assert summary["platform_rate_limited_count"] == 2 assert summary["error_counts"] == { "PLATFORM_RATE_LIMITED": 2, "PLATFORM_REQUEST_FAILED": 1, } def test_timeline_summary_counts_decode_statuses(): events = [ {"event_type": "decode_submitted"}, {"event_type": "decode_polling"}, {"event_type": "decode_polling"}, {"event_type": "decode_succeeded"}, ] summary = _timeline_summary(events, [], []) # 历史数据带 decode 事件时按事件计数呈现。 assert summary["decode_status_counts"] == {"submitted": 1, "polling": 2, "succeeded": 1} # V3 清理: recalls 终态回退已删,无 decode 事件时计数恒为空。 no_decode = _timeline_summary([], [], []) assert no_decode["decode_status_counts"] == {} def test_timeline_summary_counts_walk_statuses(): walk_actions = [ {"edge_id": "budget_downgrade", "walk_status": "success"}, {"edge_id": "hashtag_to_query", "walk_status": "skipped"}, {"edge_id": "hashtag_to_query", "walk_status": "skipped"}, ] summary = _timeline_summary([], walk_actions, []) assert summary["walk_status_counts"] == {"success": 1, "skipped": 2} def test_timeline_does_not_emit_stall_or_blocked_judgement(tmp_path): service, state = _run(tmp_path) dashboard = DashboardService(service.runtime) result = dashboard.timeline(state["run_id"]) assert set(result["summary"]) == { "total_duration_ms", "stage_duration_ms", "query_failure_count", "platform_rate_limited_count", "decode_status_counts", "error_counts", "walk_status_counts", } dumped = json.dumps(result, ensure_ascii=False, default=str) assert "stalled" not in dumped assert "is_blocked" not in dumped def test_effect_status_summary_reads_search_query_effect_status(): counts = _effect_status_counts( [ {"search_query_effect_status": "pending"}, {"search_query_effect_status": "pending", "content_effect_status": "failed"}, {"content_effect_status": "rule_blocked"}, ] ) assert counts == {"pending": 2, "rule_blocked": 1} def test_timeline_response_keeps_items_total_data_origin(tmp_path): service, state = _run(tmp_path) dashboard = DashboardService(service.runtime) result = dashboard.timeline(state["run_id"]) assert result["total"] == len(result["items"]) > 0 assert result["data_origin"] # 本地 runtime 无 lifecycle 行,total_duration_ms 回退为 stage 耗时求和。 assert result["summary"]["total_duration_ms"] == sum( result["summary"]["stage_duration_ms"].values() ) assert result["summary"]["stage_duration_ms"]