|
|
@@ -40,28 +40,26 @@ const buildSubGoals = (flatGoals: Goal[]): Goal[] => {
|
|
|
});
|
|
|
};
|
|
|
|
|
|
+import { processRetryLogic } from "../utils/retryLogic";
|
|
|
+
|
|
|
// FlowChart 专用数据 Hook:处理实时事件并聚合消息组
|
|
|
-export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], refreshTrigger?: number) => {
|
|
|
- const [goals, setGoals] = useState<Goal[]>(initialGoals);
|
|
|
+export const useFlowChartData = (traceId: string | null, refreshTrigger?: number) => {
|
|
|
+ const [goals, setGoals] = useState<Goal[]>([]);
|
|
|
const [messages, setMessages] = useState<Message[]>([]);
|
|
|
const [msgGroups, setMsgGroups] = useState<Record<string, Message[]>>({});
|
|
|
const [sinceEventId, setSinceEventId] = useState(0);
|
|
|
+ const [readyToConnect, setReadyToConnect] = useState(false);
|
|
|
const currentEventIdRef = useRef(0);
|
|
|
const restReloadingRef = useRef(false);
|
|
|
const [reloading, setReloading] = useState(false);
|
|
|
+ const [invalidBranches, setInvalidBranches] = useState<Message[][]>([]);
|
|
|
|
|
|
- const messageSortKey = useCallback((message: Message): number => {
|
|
|
- const mid =
|
|
|
- typeof message.message_id === "string"
|
|
|
- ? message.message_id
|
|
|
- : typeof message.id === "string"
|
|
|
- ? message.id
|
|
|
- : undefined;
|
|
|
- if (!mid) return 0;
|
|
|
- if (!mid.includes("-")) return 0;
|
|
|
- const suffix = mid.slice(mid.lastIndexOf("-") + 1);
|
|
|
- const num = Number.parseInt(suffix, 10);
|
|
|
- return Number.isFinite(num) ? num : 0;
|
|
|
+ const messageComparator = useCallback((a: Message, b: Message): number => {
|
|
|
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
|
+ const seqA = typeof (a as any).sequence === "number" ? (a as any).sequence : 0;
|
|
|
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
|
+ const seqB = typeof (b as any).sequence === "number" ? (b as any).sequence : 0;
|
|
|
+ return seqA - seqB;
|
|
|
}, []);
|
|
|
|
|
|
const updateMessageGroups = useCallback(
|
|
|
@@ -82,21 +80,19 @@ export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], r
|
|
|
setMsgGroups((prev) => {
|
|
|
const existing = prev[groupKey] ? [...prev[groupKey]] : [];
|
|
|
existing.push(message);
|
|
|
- existing.sort((a, b) => messageSortKey(a) - messageSortKey(b));
|
|
|
+ existing.sort(messageComparator);
|
|
|
return { ...prev, [groupKey]: existing };
|
|
|
});
|
|
|
},
|
|
|
- [messageSortKey],
|
|
|
+ [messageComparator],
|
|
|
);
|
|
|
|
|
|
useEffect(() => {
|
|
|
- setGoals(initialGoals);
|
|
|
- }, [initialGoals]);
|
|
|
-
|
|
|
- useEffect(() => {
|
|
|
+ setGoals([]);
|
|
|
setMessages([]);
|
|
|
setMsgGroups({});
|
|
|
setSinceEventId(0);
|
|
|
+ setReadyToConnect(false);
|
|
|
currentEventIdRef.current = 0;
|
|
|
restReloadingRef.current = false;
|
|
|
}, [traceId]);
|
|
|
@@ -150,17 +146,22 @@ export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], r
|
|
|
const json = (await messagesRes.json()) as unknown;
|
|
|
const root = isRecord(json) ? json : {};
|
|
|
const list = Array.isArray(root.messages) ? (root.messages as Message[]) : [];
|
|
|
+
|
|
|
const filtered = list.filter((message) => (message as { status?: string }).status !== "abandoned");
|
|
|
- const nextMessages = [...filtered].sort((a, b) => messageSortKey(a) - messageSortKey(b));
|
|
|
- setMessages(nextMessages);
|
|
|
+ const nextMessages = [...filtered].sort(messageComparator);
|
|
|
+
|
|
|
+ const { availableData: finalMessages, invalidBranches: invalidBranchesTemp } = processRetryLogic(nextMessages);
|
|
|
+
|
|
|
+ setMessages(finalMessages);
|
|
|
+ setInvalidBranches(invalidBranchesTemp);
|
|
|
const grouped: Record<string, Message[]> = {};
|
|
|
- nextMessages.forEach((message) => {
|
|
|
+ finalMessages.forEach((message) => {
|
|
|
const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START";
|
|
|
if (!grouped[groupKey]) grouped[groupKey] = [];
|
|
|
grouped[groupKey].push(message);
|
|
|
});
|
|
|
Object.keys(grouped).forEach((key) => {
|
|
|
- grouped[key].sort((a, b) => messageSortKey(a) - messageSortKey(b));
|
|
|
+ grouped[key].sort(messageComparator);
|
|
|
});
|
|
|
setMsgGroups(grouped);
|
|
|
|
|
|
@@ -177,18 +178,42 @@ export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], r
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // REST 请求完成后,允许建立 WebSocket 连接
|
|
|
+ setReadyToConnect(true);
|
|
|
} finally {
|
|
|
restReloadingRef.current = false;
|
|
|
setReloading(false);
|
|
|
}
|
|
|
return nextSinceEventId;
|
|
|
- }, [messageSortKey, traceId]);
|
|
|
+ }, [messageComparator, traceId]);
|
|
|
+
|
|
|
+ const prevTraceIdRef = useRef<string | null>(null);
|
|
|
+ const prevRefreshTriggerRef = useRef<number | undefined>(undefined);
|
|
|
|
|
|
useEffect(() => {
|
|
|
- if (!traceId) return;
|
|
|
- if (!refreshTrigger) return;
|
|
|
- void reloadViaRest();
|
|
|
- }, [refreshTrigger, reloadViaRest, traceId]);
|
|
|
+ // 确保 traceId 存在
|
|
|
+ if (!traceId) {
|
|
|
+ prevTraceIdRef.current = null;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查是否发生了变化
|
|
|
+ const traceChanged = traceId !== prevTraceIdRef.current;
|
|
|
+ const refreshTriggerChanged = refreshTrigger !== prevRefreshTriggerRef.current;
|
|
|
+
|
|
|
+ // 只有当 traceId 真正变化,或者 refreshTrigger 真正变化时,才执行加载
|
|
|
+ if (traceChanged || (typeof refreshTrigger === "number" && refreshTrigger > 0 && refreshTriggerChanged)) {
|
|
|
+ prevTraceIdRef.current = traceId;
|
|
|
+ prevRefreshTriggerRef.current = refreshTrigger;
|
|
|
+
|
|
|
+ // 注意:traceId 变化时,另外一个清理 useEffect 也会执行,这里只负责触发加载
|
|
|
+ // 这里不直接调用,而是通过一个 setTimeout 0 来确保清理操作已经完成
|
|
|
+ // 但实际上清理操作是在副作用执行前发生的(对于同一组件)
|
|
|
+
|
|
|
+ void reloadViaRest();
|
|
|
+ }
|
|
|
+ }, [traceId, refreshTrigger, reloadViaRest]); // 添加 reloadViaRest 到依赖列表,但由于我们用了 ref 来控制,所以不会因为它的变化而重复执行
|
|
|
|
|
|
const handleWebSocketMessage = useCallback(
|
|
|
(payload: unknown) => {
|
|
|
@@ -312,14 +337,14 @@ export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], r
|
|
|
if (message) {
|
|
|
setMessages((prev) => {
|
|
|
const next = [...prev, message];
|
|
|
- next.sort((a, b) => messageSortKey(a) - messageSortKey(b));
|
|
|
+ next.sort(messageComparator);
|
|
|
return next;
|
|
|
});
|
|
|
updateMessageGroups(message);
|
|
|
}
|
|
|
}
|
|
|
},
|
|
|
- [messageSortKey, reloadViaRest, updateMessageGroups],
|
|
|
+ [messageComparator, reloadViaRest, updateMessageGroups],
|
|
|
);
|
|
|
|
|
|
// 主 Trace 连接
|
|
|
@@ -327,7 +352,8 @@ export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], r
|
|
|
() => ({ onMessage: handleWebSocketMessage, sinceEventId }),
|
|
|
[handleWebSocketMessage, sinceEventId],
|
|
|
);
|
|
|
- const { connected } = useWebSocket(traceId, wsOptions);
|
|
|
+ // 只有当 traceId 存在且 REST 加载完成 (readyToConnect) 后才连接 WebSocket
|
|
|
+ const { connected } = useWebSocket(readyToConnect ? traceId : null, wsOptions);
|
|
|
|
|
|
- return { goals, messages, msgGroups, connected, reloading };
|
|
|
+ return { goals, messages, msgGroups, connected, reloading, invalidBranches };
|
|
|
};
|