| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- import { useCallback, useEffect, useMemo, useRef, useState } from "react";
- import { useWebSocket } from "../../../hooks/useWebSocket";
- import { request } from "../../../api/client";
- import type { Goal } from "../../../types/goal";
- import type { Message } from "../../../types/message";
- // WebSocket 数据解析与状态聚合(goals + msgGroups)
/**
 * Type guard for plain "record-like" values: any non-null object that is not
 * an array.
 *
 * @param value - Arbitrary value, typically parsed JSON.
 * @returns `true` when `value` can be indexed safely by string keys.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  return !Array.isArray(value);
}
/**
 * Type guard for goal payloads: a record whose `id` is a string. All other
 * goal fields are treated as optional.
 *
 * @param value - Arbitrary value, typically a field of a WebSocket payload.
 * @returns `true` when `value` is a record with a string `id`.
 */
const isGoalLike = (value: unknown): value is Partial<Goal> & { id: string } => {
  if (!isRecord(value)) {
    return false;
  }
  return typeof value.id === "string";
};
/**
 * Type guard for message payloads: a record identified by either a string
 * `id` or a string `message_id`.
 *
 * @param value - Arbitrary value, typically a field of a WebSocket payload.
 * @returns `true` when `value` is a record carrying one of the two identifiers.
 */
const isMessage = (value: unknown): value is Message => {
  if (!isRecord(value)) {
    return false;
  }
  const identifiers = [value.id, value.message_id];
  return identifiers.some((identifier) => typeof identifier === "string");
};
/**
 * Rebuilds the parent/child links of a flat goal list.
 *
 * Every input goal is shallow-copied (the inputs are never mutated) and any
 * incoming `sub_goals` value is discarded. Each copy is then attached to the
 * copy of its parent, looked up by `parent_id`. Goals whose `parent_id` is
 * missing, empty, or unknown are simply not attached to anything.
 *
 * @param flatGoals - Goals in display order, linked through `parent_id`.
 * @returns One copy per input goal, in input order. Copies with children carry
 *   them in `sub_goals`; copies without children have no `sub_goals` key.
 */
const buildSubGoals = (flatGoals: Goal[]): Goal[] => {
  // Pass 1: fresh copy per goal, each starting with an empty child list.
  const copiesById = new Map<string, Goal>();
  for (const source of flatGoals) {
    copiesById.set(source.id, { ...source, sub_goals: [] });
  }

  // Pass 2: hang every copy under its parent's copy.
  for (const source of flatGoals) {
    const hasParentId = typeof source.parent_id === "string" && source.parent_id !== "";
    if (!hasParentId) continue;

    const parentCopy = copiesById.get(source.parent_id as string);
    const childCopy = copiesById.get(source.id);
    if (parentCopy === undefined || childCopy === undefined) continue;

    if (!Array.isArray(parentCopy.sub_goals)) {
      parentCopy.sub_goals = [];
    }
    parentCopy.sub_goals.push(childCopy);
  }

  // Pass 3: emit copies in input order, dropping empty child lists.
  const result: Goal[] = [];
  for (const source of flatGoals) {
    const copy = copiesById.get(source.id);
    if (copy === undefined) {
      result.push(source);
      continue;
    }
    if (Array.isArray(copy.sub_goals) && copy.sub_goals.length === 0) {
      delete copy.sub_goals;
    }
    result.push(copy);
  }
  return result;
};
- import { processRetryLogic } from "../utils/retryLogic";
/** Group key (and synthetic goal id) used for messages that carry no `goal_id`. */
const START_GROUP_KEY = "START";

/** Reads a message's numeric `sequence`, or `fallback` when it is missing or not a number. */
const sequenceOf = (message: Message, fallback: number): number =>
  typeof message.sequence === "number" ? message.sequence : fallback;

/** Orders messages by ascending `sequence`; messages without one sort as 0. */
const compareBySequence = (a: Message, b: Message): number => sequenceOf(a, 0) - sequenceOf(b, 0);

/** Returns the message-group key for a message: its non-empty `goal_id`, else "START". */
const groupKeyOf = (message: Message): string =>
  typeof message.goal_id === "string" && message.goal_id ? message.goal_id : START_GROUP_KEY;

/**
 * Ensures the synthetic START goal is present. Returns the SAME array when it
 * already is, so React state setters can bail out of a re-render.
 */
const withStartGoal = (goals: Goal[]): Goal[] => {
  if (goals.some((goal) => goal.id === START_GROUP_KEY)) return goals;
  const startGoal: Goal = {
    id: START_GROUP_KEY,
    description: "START",
    status: "completed",
    created_at: "",
  };
  return [startGoal, ...goals];
};

/**
 * Overlays `incoming` on `existing`, but keeps locally known `sub_trace_ids`,
 * `agent_call_mode` and `knowledge` when the incoming goal carries them as
 * empty/falsy values.
 */
const mergeGoal = (existing: Goal, incoming: Partial<Goal>): Goal => {
  const merged: Goal = { ...existing, ...incoming };
  if (existing.sub_trace_ids && !merged.sub_trace_ids) {
    merged.sub_trace_ids = existing.sub_trace_ids;
  }
  if (existing.agent_call_mode && !merged.agent_call_mode) {
    merged.agent_call_mode = existing.agent_call_mode;
  }
  if (existing.knowledge && !merged.knowledge) {
    merged.knowledge = existing.knowledge;
  }
  return merged;
};

/** Returns the first candidate that is a number. Unlike `a || b`, a value of 0 is honoured. */
const firstNumber = (...candidates: unknown[]): number | undefined =>
  candidates.find((candidate): candidate is number => typeof candidate === "number");

/**
 * Data hook for the FlowChart: loads a trace over REST, then keeps it up to
 * date from WebSocket events, aggregating messages into per-goal groups.
 *
 * The WebSocket is only connected once the first REST load for the trace has
 * settled (successfully or not), resuming from the event id the REST response
 * reported.
 *
 * @param traceId - Trace to display, or `null` for none. Changing it resets all state.
 * @param refreshTrigger - Optional counter; a change to a positive number forces a REST reload.
 * @returns `goals` (flat list with nested `sub_goals`), `messages` (sorted by
 *   sequence), `msgGroups` (messages keyed by goal id, "START" for goal-less
 *   messages), `connected` (WebSocket state), `reloading` (REST load in
 *   flight), `invalidBranches` (from `processRetryLogic`), and `traceCompleted`.
 */
export const useFlowChartData = (traceId: string | null, refreshTrigger?: number) => {
  const [goals, setGoals] = useState<Goal[]>([]);
  const [messages, setMessages] = useState<Message[]>([]);
  const [msgGroups, setMsgGroups] = useState<Record<string, Message[]>>({});
  const [sinceEventId, setSinceEventId] = useState(0);
  const [readyToConnect, setReadyToConnect] = useState(false);
  const currentEventIdRef = useRef(0);
  const maxSequenceRef = useRef(0);
  const restReloadingRef = useRef(false);
  const [reloading, setReloading] = useState(false);
  const [invalidBranches, setInvalidBranches] = useState<Message[][]>([]);
  const [traceCompleted, setTraceCompleted] = useState(false);
  // Trace the hook currently displays; lets in-flight REST loads detect that
  // they have become stale. Compared by id (not by a counter) so that a
  // re-run of the reset effect for the SAME trace does not discard its load.
  const latestTraceIdRef = useRef<string | null>(traceId);

  /** Appends one live message to its group (creating the START goal if needed). */
  const updateMessageGroups = useCallback((message: Message) => {
    const groupKey = groupKeyOf(message);
    if (groupKey === START_GROUP_KEY) {
      setGoals(withStartGoal);
    }
    setMsgGroups((prev) => {
      const group = [...(prev[groupKey] ?? []), message];
      group.sort(compareBySequence);
      return { ...prev, [groupKey]: group };
    });
  }, []);

  // Reset everything when the trace changes. Declared before the loading
  // effect below so that, within one commit, the reset runs first.
  useEffect(() => {
    latestTraceIdRef.current = traceId;
    setGoals([]);
    setMessages([]);
    setMsgGroups({});
    setInvalidBranches([]);
    setSinceEventId(0);
    setReadyToConnect(false);
    setReloading(false);
    currentEventIdRef.current = 0;
    maxSequenceRef.current = 0;
    restReloadingRef.current = false;
    setTraceCompleted(false);
  }, [traceId]);

  /**
   * Reloads goals and messages over REST and replaces the local state.
   *
   * Never rejects: failures are logged and reported as `null`, because every
   * caller invokes this fire-and-forget.
   *
   * @returns The trace's `last_event_id` when the response carried one;
   *   `null` when there is no trace, a reload is already running, the request
   *   failed, or the response arrived after the trace changed.
   */
  const reloadViaRest = useCallback(async (): Promise<number | null> => {
    const requestedTraceId = traceId;
    if (!requestedTraceId || latestTraceIdRef.current !== requestedTraceId) return null;
    if (restReloadingRef.current) return null;
    restReloadingRef.current = true;
    setReloading(true);
    let nextSinceEventId: number | null = null;
    try {
      const [traceJson, messagesJson] = await Promise.all([
        request<unknown>(`/api/traces/${requestedTraceId}`),
        request<unknown>(`/api/traces/${requestedTraceId}/messages?mode=all`),
      ]);
      // The trace changed while the requests were in flight: drop the
      // response instead of writing another trace's data into the state.
      if (latestTraceIdRef.current !== requestedTraceId) return null;

      const traceRoot = isRecord(traceJson) ? traceJson : {};
      const trace = isRecord(traceRoot.trace) ? traceRoot.trace : undefined;
      const goalTree = isRecord(traceRoot.goal_tree) ? traceRoot.goal_tree : undefined;
      const goalList = goalTree && Array.isArray(goalTree.goals) ? (goalTree.goals as Goal[]) : [];
      const lastEventId = trace && typeof trace.last_event_id === "number" ? trace.last_event_id : undefined;
      if (typeof lastEventId === "number") {
        currentEventIdRef.current = Math.max(currentEventIdRef.current, lastEventId);
        setSinceEventId(lastEventId);
        nextSinceEventId = lastEventId;
      }

      if (goalList.length > 0) {
        setGoals((prev) => {
          // First occurrence wins, mirroring a linear `find`.
          const previousById = new Map<string, Goal>();
          for (const goal of prev) {
            if (!previousById.has(goal.id)) previousById.set(goal.id, goal);
          }
          const mergedFlat = goalList.map((incoming) => {
            const existing = previousById.get(incoming.id);
            return existing ? mergeGoal(existing, incoming) : incoming;
          });
          return buildSubGoals(mergedFlat);
        });
      }

      const messagesRoot = isRecord(messagesJson) ? messagesJson : {};
      const list = Array.isArray(messagesRoot.messages) ? (messagesRoot.messages as Message[]) : [];
      const nextMessages = list
        .filter((message) => (message as { status?: string }).status !== "abandoned")
        .sort(compareBySequence);
      const { availableData: finalMessages, invalidBranches: invalidBranchesTemp } = processRetryLogic(nextMessages);

      maxSequenceRef.current = finalMessages.reduce((max, message) => Math.max(max, sequenceOf(message, -1)), 0);
      setMessages(finalMessages);
      setInvalidBranches(invalidBranchesTemp);

      const grouped: Record<string, Message[]> = {};
      finalMessages.forEach((message) => {
        const groupKey = groupKeyOf(message);
        if (!grouped[groupKey]) grouped[groupKey] = [];
        grouped[groupKey].push(message);
      });
      Object.keys(grouped).forEach((key) => {
        grouped[key].sort(compareBySequence);
      });
      setMsgGroups(grouped);

      if (grouped[START_GROUP_KEY] && grouped[START_GROUP_KEY].length > 0) {
        setGoals(withStartGoal);
      }
    } catch (error: unknown) {
      console.error(`Failed to reload trace ${requestedTraceId} via REST:`, error);
    } finally {
      // A stale load must not clear the flags of the load that replaced it.
      if (latestTraceIdRef.current === requestedTraceId) {
        restReloadingRef.current = false;
        setReloading(false);
        // Whether or not the REST load succeeded, the WebSocket may now connect.
        setReadyToConnect(true);
      }
    }
    return nextSinceEventId;
  }, [traceId]);

  const prevTraceIdRef = useRef<string | null>(null);
  const prevRefreshTriggerRef = useRef<number | undefined>(undefined);

  useEffect(() => {
    // Nothing to load without a trace.
    if (!traceId) {
      prevTraceIdRef.current = null;
      return;
    }
    const traceChanged = traceId !== prevTraceIdRef.current;
    const refreshTriggerChanged = refreshTrigger !== prevRefreshTriggerRef.current;
    // Load only when the trace really changed, or when refreshTrigger really
    // changed to a positive number. The refs make this immune to identity
    // changes of `reloadViaRest` in the dependency list.
    if (traceChanged || (typeof refreshTrigger === "number" && refreshTrigger > 0 && refreshTriggerChanged)) {
      prevTraceIdRef.current = traceId;
      prevRefreshTriggerRef.current = refreshTrigger;
      // The reset effect above has already run in this commit; this effect
      // only triggers the load.
      void reloadViaRest();
    }
  }, [traceId, refreshTrigger, reloadViaRest]);

  /** Applies one WebSocket event to the local state. Unknown events are ignored. */
  const handleWebSocketMessage = useCallback(
    (payload: unknown) => {
      const raw = isRecord(payload) ? payload : {};
      const event = (typeof raw.event === "string" && raw.event) || (typeof raw.type === "string" && raw.type) || "";
      // Event fields may live under `data` or directly on the envelope.
      const data = isRecord(raw.data) ? raw.data : raw;

      const eventId = typeof raw.event_id === "number" ? raw.event_id : undefined;
      if (typeof eventId === "number") {
        currentEventIdRef.current = Math.max(currentEventIdRef.current, eventId);
      }

      switch (event) {
        case "error": {
          const message =
            (typeof data.message === "string" ? data.message : undefined) ||
            (typeof raw.message === "string" ? raw.message : undefined) ||
            "";
          if (message.includes("Too many missed events")) {
            void reloadViaRest().then((nextSince) => {
              if (typeof nextSince === "number") return;
              // No event id from REST: resume from the newest one seen locally.
              const fallbackSince = currentEventIdRef.current;
              if (fallbackSince > 0) setSinceEventId(fallbackSince);
            });
          }
          return;
        }

        case "connected": {
          const currentEventId = firstNumber(data.current_event_id, raw.current_event_id);
          if (typeof currentEventId === "number") {
            currentEventIdRef.current = Math.max(currentEventIdRef.current, currentEventId);
            setSinceEventId(currentEventId);
          }
          const goalTree = isRecord(data.goal_tree)
            ? data.goal_tree
            : isRecord(raw.goal_tree)
              ? raw.goal_tree
              : undefined;
          if (goalTree && Array.isArray(goalTree.goals)) {
            const snapshotGoals = goalTree.goals as Goal[];
            // Only seed goals from the handshake when none are loaded yet.
            setGoals((prev) => (prev.length > 0 ? prev : buildSubGoals(snapshotGoals)));
          }
          return;
        }

        case "pong":
          return;

        case "rewind": {
          // 0 is a valid target (rewind to the very beginning).
          const afterSequence = firstNumber(data.after_sequence, raw.after_sequence);
          if (typeof afterSequence !== "number") return;

          maxSequenceRef.current = afterSequence;
          const isKept = (message: Message): boolean => sequenceOf(message, -1) <= afterSequence;
          setMessages((prev) => prev.filter(isKept));
          setMsgGroups((prev) => {
            const next: Record<string, Message[]> = {};
            Object.entries(prev).forEach(([key, group]) => {
              const kept = group.filter(isKept);
              if (kept.length > 0) next[key] = kept;
            });
            return next;
          });

          // Use the goal snapshot when the event carries one; otherwise reload.
          const snapshot = isRecord(data.goal_tree_snapshot) ? data.goal_tree_snapshot : undefined;
          if (snapshot && Array.isArray(snapshot.goals)) {
            setGoals(buildSubGoals(snapshot.goals as Goal[]));
          } else {
            void reloadViaRest();
          }
          return;
        }

        case "goal_added": {
          const goal = isGoalLike(data.goal) ? data.goal : isGoalLike(raw.goal) ? raw.goal : null;
          if (!goal) return;
          setGoals((prev: Goal[]) => {
            const next = [...prev];
            const index = next.findIndex((candidate) => candidate.id === goal.id);
            if (index >= 0) {
              next[index] = mergeGoal(next[index], goal);
            } else {
              next.push(goal as Goal);
            }
            return buildSubGoals(next);
          });
          return;
        }

        case "goal_updated": {
          const goalId =
            (typeof data.goal_id === "string" ? data.goal_id : undefined) ||
            (isRecord(data.goal) && typeof data.goal.id === "string" ? data.goal.id : undefined) ||
            (typeof raw.goal_id === "string" ? raw.goal_id : undefined);
          const updates =
            (isRecord(data.updates) ? data.updates : undefined) ||
            (isRecord(raw.updates) ? raw.updates : undefined) ||
            (isRecord(data.patch) ? data.patch : undefined) ||
            (isRecord(raw.patch) ? raw.patch : undefined) ||
            {};
          if (!goalId) return;
          setGoals((prev: Goal[]) =>
            prev.map((goal: Goal) => {
              if (goal.id !== goalId) return goal;
              // Only `status` and `summary` are patched, and only when they are strings.
              const next: Goal = { ...goal };
              if (typeof updates.status === "string") {
                next.status = updates.status as Goal["status"];
              }
              if (typeof updates.summary === "string") {
                next.summary = updates.summary;
              }
              return next;
            }),
          );
          return;
        }

        case "trace_completed":
          setTraceCompleted(true);
          return;

        case "message_added": {
          const message = isMessage(data.message) ? data.message : isMessage(raw.message) ? raw.message : null;
          if (!message) return;

          // Sequence continuity check: a gap means events were missed.
          const seq = sequenceOf(message, -1);
          if (seq > 0) {
            if (maxSequenceRef.current > 0 && seq > maxSequenceRef.current + 1) {
              console.warn(
                `Message sequence gap detected: current max ${maxSequenceRef.current}, received ${seq}. Triggering reload.`,
              );
              void reloadViaRest();
              return;
            }
            maxSequenceRef.current = Math.max(maxSequenceRef.current, seq);
          }

          setMessages((prev) => [...prev, message].sort(compareBySequence));
          updateMessageGroups(message);
          return;
        }

        default:
          return;
      }
    },
    [reloadViaRest, updateMessageGroups],
  );

  // Main trace connection.
  const wsOptions = useMemo(
    () => ({ onMessage: handleWebSocketMessage, sinceEventId }),
    [handleWebSocketMessage, sinceEventId],
  );
  // Connect only when a trace is selected AND its REST load has settled.
  const { connected } = useWebSocket(readyToConnect ? traceId : null, wsOptions);

  return { goals, messages, msgGroups, connected, reloading, invalidBranches, traceCompleted };
};
|