import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useWebSocket } from "../../../hooks/useWebSocket"; import { request } from "../../../api/client"; import type { Goal } from "../../../types/goal"; import type { Message } from "../../../types/message"; // WebSocket 数据解析与状态聚合(goals + msgGroups) const isRecord = (value: unknown): value is Record => !!value && typeof value === "object" && !Array.isArray(value); const isGoalLike = (value: unknown): value is Partial & { id: string } => isRecord(value) && typeof value.id === "string"; const isMessage = (value: unknown): value is Message => isRecord(value) && (typeof value.id === "string" || typeof (value as { message_id?: string }).message_id === "string"); const buildSubGoals = (flatGoals: Goal[]): Goal[] => { const nodeMap = new Map(); flatGoals.forEach((goal) => { nodeMap.set(goal.id, { ...goal, sub_goals: [] }); }); flatGoals.forEach((goal) => { const parentId = typeof goal.parent_id === "string" && goal.parent_id ? goal.parent_id : undefined; if (!parentId) return; const parent = nodeMap.get(parentId); const child = nodeMap.get(goal.id); if (!parent || !child) return; if (!Array.isArray(parent.sub_goals)) parent.sub_goals = []; parent.sub_goals.push(child); }); return flatGoals.map((goal) => { const node = nodeMap.get(goal.id); if (!node) return goal; if (Array.isArray(node.sub_goals) && node.sub_goals.length === 0) { delete node.sub_goals; } return node; }); }; import { processRetryLogic } from "../utils/retryLogic"; // FlowChart 专用数据 Hook:处理实时事件并聚合消息组 export const useFlowChartData = (traceId: string | null, refreshTrigger?: number) => { const [goals, setGoals] = useState([]); const [messages, setMessages] = useState([]); const [msgGroups, setMsgGroups] = useState>({}); const [sinceEventId, setSinceEventId] = useState(0); const [readyToConnect, setReadyToConnect] = useState(false); const currentEventIdRef = useRef(0); const maxSequenceRef = useRef(0); const restReloadingRef = useRef(false); const [reloading, setReloading] = useState(false); const [invalidBranches, setInvalidBranches] = useState([]); const [traceCompleted, setTraceCompleted] = useState(false); const messageComparator = useCallback((a: Message, b: Message): number => { // eslint-disable-next-line @typescript-eslint/no-explicit-any const seqA = typeof (a as any).sequence === "number" ? (a as any).sequence : 0; // eslint-disable-next-line @typescript-eslint/no-explicit-any const seqB = typeof (b as any).sequence === "number" ? (b as any).sequence : 0; return seqA - seqB; }, []); const updateMessageGroups = useCallback( (message: Message) => { const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START"; if (groupKey === "START") { setGoals((prev) => { if (prev.some((g) => g.id === "START")) return prev; const startGoal: Goal = { id: "START", description: "START", status: "completed", created_at: "", }; return [startGoal, ...prev]; }); } setMsgGroups((prev) => { const existing = prev[groupKey] ? [...prev[groupKey]] : []; existing.push(message); existing.sort(messageComparator); return { ...prev, [groupKey]: existing }; }); }, [messageComparator], ); useEffect(() => { setGoals([]); setMessages([]); setMsgGroups({}); setSinceEventId(0); setReadyToConnect(false); currentEventIdRef.current = 0; maxSequenceRef.current = 0; restReloadingRef.current = false; setTraceCompleted(false); }, [traceId]); const reloadViaRest = useCallback(async () => { if (!traceId) return; if (restReloadingRef.current) return; restReloadingRef.current = true; setReloading(true); let nextSinceEventId: number | null = null; try { const [traceJson, messagesJson] = await Promise.all([ request(`/api/traces/${traceId}`), request(`/api/traces/${traceId}/messages?mode=all`), ]); const traceRoot = isRecord(traceJson) ? traceJson : {}; const trace = isRecord(traceRoot.trace) ? traceRoot.trace : undefined; const goalTree = isRecord(traceRoot.goal_tree) ? traceRoot.goal_tree : undefined; const goalList = goalTree && Array.isArray(goalTree.goals) ? (goalTree.goals as Goal[]) : []; const lastEventId = trace && typeof trace.last_event_id === "number" ? trace.last_event_id : undefined; if (typeof lastEventId === "number") { currentEventIdRef.current = Math.max(currentEventIdRef.current, lastEventId); setSinceEventId(lastEventId); nextSinceEventId = lastEventId; } if (goalList.length > 0) { setGoals((prev) => { const mergedFlat = goalList.map((ng) => { const existing = prev.find((p) => p.id === ng.id); if (!existing) return ng; const merged: Goal = { ...existing, ...ng }; if (existing.sub_trace_ids && !merged.sub_trace_ids) { merged.sub_trace_ids = existing.sub_trace_ids; } if (existing.agent_call_mode && !merged.agent_call_mode) { merged.agent_call_mode = existing.agent_call_mode; } if (existing.knowledge && !merged.knowledge) { merged.knowledge = existing.knowledge; } return merged; }); return buildSubGoals(mergedFlat); }); } const messagesRoot = isRecord(messagesJson) ? messagesJson : {}; const list = Array.isArray(messagesRoot.messages) ? (messagesRoot.messages as Message[]) : []; console.log("%c [ list ]-149", "font-size:13px; background:pink; color:#bf2c9f;", list); const filtered = list.filter((message) => (message as { status?: string }).status !== "abandoned"); const nextMessages = [...filtered].sort(messageComparator); const { availableData: finalMessages, invalidBranches: invalidBranchesTemp } = processRetryLogic(nextMessages); const maxSeq = finalMessages.reduce((max, msg) => { const seq = typeof msg.sequence === "number" ? msg.sequence : -1; return Math.max(max, seq); }, 0); maxSequenceRef.current = maxSeq; setMessages(finalMessages); setInvalidBranches(invalidBranchesTemp); const grouped: Record = {}; finalMessages.forEach((message) => { const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START"; if (!grouped[groupKey]) grouped[groupKey] = []; grouped[groupKey].push(message); }); Object.keys(grouped).forEach((key) => { grouped[key].sort(messageComparator); }); setMsgGroups(grouped); if (grouped.START && grouped.START.length > 0) { setGoals((prev) => { if (prev.some((g) => g.id === "START")) return prev; const startGoal: Goal = { id: "START", description: "START", status: "completed", created_at: "", }; return [startGoal, ...prev]; }); } // REST 请求完成后,允许建立 WebSocket 连接 setReadyToConnect(true); } finally { restReloadingRef.current = false; setReloading(false); setReadyToConnect(true); } return nextSinceEventId; }, [messageComparator, traceId]); const prevTraceIdRef = useRef(null); const prevRefreshTriggerRef = useRef(undefined); useEffect(() => { // 确保 traceId 存在 if (!traceId) { prevTraceIdRef.current = null; return; } // 检查是否发生了变化 const traceChanged = traceId !== prevTraceIdRef.current; const refreshTriggerChanged = refreshTrigger !== prevRefreshTriggerRef.current; // 只有当 traceId 真正变化,或者 refreshTrigger 真正变化时,才执行加载 if (traceChanged || (typeof refreshTrigger === "number" && refreshTrigger > 0 && refreshTriggerChanged)) { prevTraceIdRef.current = traceId; prevRefreshTriggerRef.current = refreshTrigger; // 注意:traceId 变化时,另外一个清理 useEffect 也会执行,这里只负责触发加载 // 这里不直接调用,而是通过一个 setTimeout 0 来确保清理操作已经完成 // 但实际上清理操作是在副作用执行前发生的(对于同一组件) void reloadViaRest(); } }, [traceId, refreshTrigger, reloadViaRest]); // 添加 reloadViaRest 到依赖列表,但由于我们用了 ref 来控制,所以不会因为它的变化而重复执行 const handleWebSocketMessage = useCallback( (payload: unknown) => { const raw = isRecord(payload) ? payload : {}; const event = (typeof raw.event === "string" && raw.event) || (typeof raw.type === "string" && raw.type) || ""; const data = isRecord(raw.data) ? raw.data : raw; console.log("%c [ data ]-182", "font-size:13px; background:pink; color:#bf2c9f;", data); const eventId = typeof raw.event_id === "number" ? raw.event_id : undefined; if (typeof eventId === "number") { currentEventIdRef.current = Math.max(currentEventIdRef.current, eventId); } if (event === "error") { const message = (typeof data.message === "string" ? data.message : undefined) || (typeof raw.message === "string" ? raw.message : undefined) || ""; if (message.includes("Too many missed events")) { void reloadViaRest().then((nextSince) => { if (typeof nextSince === "number") return; const fallbackSince = currentEventIdRef.current; if (fallbackSince > 0) setSinceEventId(fallbackSince); }); } return; } if (event === "connected") { const currentEventId = (typeof data.current_event_id === "number" ? data.current_event_id : undefined) || (typeof raw.current_event_id === "number" ? raw.current_event_id : undefined); if (typeof currentEventId === "number") { currentEventIdRef.current = Math.max(currentEventIdRef.current, currentEventId); setSinceEventId(currentEventId); } const goalTree = isRecord(data.goal_tree) ? data.goal_tree : isRecord(raw.goal_tree) ? raw.goal_tree : undefined; if (goalTree && Array.isArray(goalTree.goals)) { setGoals((prev) => { if (prev.length > 0) return prev; return buildSubGoals(goalTree.goals as Goal[]); }); } return; } if (event === "pong") { return; } if (event === "rewind") { console.log("Processing rewind event:", data); const afterSequence = (typeof data.after_sequence === "number" ? data.after_sequence : undefined) || (typeof raw.after_sequence === "number" ? raw.after_sequence : undefined); if (typeof afterSequence === "number") { maxSequenceRef.current = afterSequence; setMessages((prev) => prev.filter((msg) => (typeof msg.sequence === "number" ? msg.sequence : -1) <= afterSequence), ); setMsgGroups((prev) => { const next: Record = {}; Object.entries(prev).forEach(([k, v]) => { const filtered = v.filter( (msg) => (typeof msg.sequence === "number" ? msg.sequence : -1) <= afterSequence, ); if (filtered.length > 0) next[k] = filtered; }); return next; }); // 如果有 goal_tree_snapshot,直接更新 Goals const snapshot = isRecord(data.goal_tree_snapshot) ? data.goal_tree_snapshot : undefined; if (snapshot && Array.isArray(snapshot.goals)) { setGoals(buildSubGoals(snapshot.goals as Goal[])); } else { // 否则触发重载 void reloadViaRest(); } } return; } if (event === "goal_added") { const goal = isGoalLike(data.goal) ? data.goal : isGoalLike(raw.goal) ? raw.goal : null; if (!goal) return; setGoals((prev: Goal[]) => { const next = [...prev]; const idx = next.findIndex((g) => g.id === goal.id); if (idx >= 0) { const existing = next[idx]; const merged = { ...existing, ...goal }; // 保留 sub_trace_ids,如果 WebSocket 数据中缺失但本地已有 if (existing.sub_trace_ids && !merged.sub_trace_ids) { merged.sub_trace_ids = existing.sub_trace_ids; } if (existing.agent_call_mode && !merged.agent_call_mode) { merged.agent_call_mode = existing.agent_call_mode; } next[idx] = merged; return buildSubGoals(next); } next.push(goal as Goal); return buildSubGoals(next); }); return; } if (event === "goal_updated") { const goalId = (typeof data.goal_id === "string" ? data.goal_id : undefined) || (isRecord(data.goal) && typeof data.goal.id === "string" ? data.goal.id : undefined) || (typeof raw.goal_id === "string" ? raw.goal_id : undefined); const updates = (isRecord(data.updates) ? data.updates : undefined) || (isRecord(raw.updates) ? raw.updates : undefined) || (isRecord(data.patch) ? data.patch : undefined) || (isRecord(raw.patch) ? raw.patch : undefined) || {}; if (!goalId) return; setGoals((prev: Goal[]) => prev.map((g: Goal) => { if (g.id !== goalId) return g; const next: Goal = { ...g }; if ("status" in updates) { const status = updates.status; if (typeof status === "string") { next.status = status as Goal["status"]; } } if ("summary" in updates) { const summary = updates.summary; if (typeof summary === "string") { next.summary = summary; } } return next; }), ); return; } if (event === "trace_completed") { setTraceCompleted(true); return; } if (event === "message_added") { const message = isMessage(data.message) ? data.message : isMessage(raw.message) ? raw.message : null; if (message) { // Check sequence continuity const seq = typeof message.sequence === "number" ? message.sequence : -1; if (seq > 0) { if (maxSequenceRef.current > 0 && seq > maxSequenceRef.current + 1) { console.warn( `Message sequence gap detected: current max ${maxSequenceRef.current}, received ${seq}. Triggering reload.`, ); void reloadViaRest(); return; } maxSequenceRef.current = Math.max(maxSequenceRef.current, seq); } setMessages((prev) => { const next = [...prev, message]; next.sort(messageComparator); return next; }); updateMessageGroups(message); } } }, [messageComparator, reloadViaRest, updateMessageGroups], ); // 主 Trace 连接 const wsOptions = useMemo( () => ({ onMessage: handleWebSocketMessage, sinceEventId }), [handleWebSocketMessage, sinceEventId], ); // 只有当 traceId 存在且 REST 加载完成 (readyToConnect) 后才连接 WebSocket const { connected } = useWebSocket(readyToConnect ? traceId : null, wsOptions); return { goals, messages, msgGroups, connected, reloading, invalidBranches, traceCompleted }; };