// useFlowChartData.ts
  1. import { useCallback, useEffect, useMemo, useRef, useState } from "react";
  2. import { useWebSocket } from "../../../hooks/useWebSocket";
  3. import { request } from "../../../api/client";
  4. import type { Goal } from "../../../types/goal";
  5. import type { Message } from "../../../types/message";
  6. // WebSocket 数据解析与状态聚合(goals + msgGroups)
  /**
   * Type guard for plain object-like values.
   *
   * Accepts any non-null value whose `typeof` is "object" and that is not an
   * array; primitives, functions, `null` and `undefined` are rejected.
   */
  const isRecord = (value: unknown): value is Record<string, unknown> => {
    if (value === null || typeof value !== "object") {
      return false;
    }
    return !Array.isArray(value);
  };
  /**
   * Type guard for goal payloads: an object-like value carrying a string `id`.
   * No other goal field is validated.
   */
  const isGoalLike = (value: unknown): value is Partial<Goal> & { id: string } => {
    if (!isRecord(value)) {
      return false;
    }
    return typeof value.id === "string";
  };
  /**
   * Type guard for message payloads: an object-like value identified by either
   * a string `id` or a string `message_id`. No other field is validated.
   */
  const isMessage = (value: unknown): value is Message => {
    if (!isRecord(value)) {
      return false;
    }
    if (typeof value.id === "string") {
      return true;
    }
    return typeof (value as { message_id?: string }).message_id === "string";
  };
  /**
   * Rebuilds the parent/child links of a flat goal list.
   *
   * Every input goal is shallow-copied (any `sub_goals` it already carried is
   * discarded) and attached to the copy of the goal named by its `parent_id`.
   * The result keeps the input order and length: each entry is the copy for the
   * goal at the same position, with `sub_goals` present only when it has at
   * least one child.
   *
   * Links that cannot be honoured are skipped: a `parent_id` that matches no
   * goal in the list, and a `parent_id` that would close a cycle (a goal that
   * is its own parent, or its own ancestor). Cyclic `sub_goals` would otherwise
   * make any recursive walk of the tree loop forever.
   *
   * @param flatGoals Goals in any order; the input objects are not mutated.
   * @returns One linked copy per input goal, in input order.
   */
  const buildSubGoals = (flatGoals: Goal[]): Goal[] => {
    const nodeMap = new Map<string, Goal>();
    flatGoals.forEach((goal) => {
      nodeMap.set(goal.id, { ...goal, sub_goals: [] });
    });

    const parentIdOf = (goal: Goal): string | undefined =>
      typeof goal.parent_id === "string" && goal.parent_id ? goal.parent_id : undefined;

    // True when `childId` is reachable by walking upwards from `parentId`,
    // i.e. attaching the child there would make it its own ancestor.
    const wouldCreateCycle = (childId: string, parentId: string): boolean => {
      const visited = new Set<string>();
      let cursor: string | undefined = parentId;
      while (cursor !== undefined) {
        if (cursor === childId) return true;
        // A loop further up that does not involve the child is not this link's problem.
        if (visited.has(cursor)) return false;
        visited.add(cursor);
        const ancestor = nodeMap.get(cursor);
        cursor = ancestor ? parentIdOf(ancestor) : undefined;
      }
      return false;
    };

    flatGoals.forEach((goal) => {
      const parentId = parentIdOf(goal);
      if (!parentId) return;
      const parent = nodeMap.get(parentId);
      const child = nodeMap.get(goal.id);
      if (!parent || !child) return;
      if (wouldCreateCycle(goal.id, parentId)) return;
      if (!Array.isArray(parent.sub_goals)) parent.sub_goals = [];
      parent.sub_goals.push(child);
    });

    return flatGoals.map((goal) => {
      const node = nodeMap.get(goal.id);
      if (!node) return goal;
      // Leaves expose no `sub_goals` key at all rather than an empty array.
      if (Array.isArray(node.sub_goals) && node.sub_goals.length === 0) {
        delete node.sub_goals;
      }
      return node;
    });
  };
  37. import { processRetryLogic } from "../utils/retryLogic";
  /** Identifier of the synthetic goal that collects messages carrying no `goal_id`. */
  const START_GOAL_ID = "START";

  /** Orders messages by ascending numeric `sequence`; a missing sequence sorts as 0. */
  const compareBySequence = (a: Message, b: Message): number => {
    const seqA = typeof a.sequence === "number" ? a.sequence : 0;
    const seqB = typeof b.sequence === "number" ? b.sequence : 0;
    return seqA - seqB;
  };

  /** Group key of a message: its non-empty string `goal_id`, otherwise the START group. */
  const groupKeyOf = (message: Message): string =>
    typeof message.goal_id === "string" && message.goal_id ? message.goal_id : START_GOAL_ID;

  /** Returns `goals` with the synthetic START goal prepended, unless it is already present. */
  const withStartGoal = (goals: Goal[]): Goal[] => {
    if (goals.some((goal) => goal.id === START_GOAL_ID)) return goals;
    const startGoal: Goal = {
      id: START_GOAL_ID,
      description: "START",
      status: "completed",
      created_at: "",
    };
    return [startGoal, ...goals];
  };

  /**
   * Overlays `incoming` on `existing`, keeping the locally known
   * `sub_trace_ids`, `agent_call_mode` and `knowledge` when the incoming
   * payload leaves them empty.
   */
  const mergeGoal = (existing: Goal, incoming: Partial<Goal>): Goal => {
    const merged: Goal = { ...existing, ...incoming };
    if (existing.sub_trace_ids && !merged.sub_trace_ids) {
      merged.sub_trace_ids = existing.sub_trace_ids;
    }
    if (existing.agent_call_mode && !merged.agent_call_mode) {
      merged.agent_call_mode = existing.agent_call_mode;
    }
    if (existing.knowledge && !merged.knowledge) {
      merged.knowledge = existing.knowledge;
    }
    return merged;
  };

  /** First numeric value stored under `key` in `sources`, in order; a value of 0 counts as present. */
  const firstNumber = (key: string, ...sources: Record<string, unknown>[]): number | undefined => {
    for (const source of sources) {
      const candidate = source[key];
      if (typeof candidate === "number") return candidate;
    }
    return undefined;
  };

  /**
   * Data hook dedicated to the FlowChart: loads a trace over REST, then applies
   * real-time WebSocket events and keeps goals and per-goal message groups in sync.
   *
   * Lifecycle:
   * 1. When `traceId` changes, all state is reset.
   * 2. The trace and its messages are fetched over REST. The same fetch runs
   *    again whenever `refreshTrigger` changes to a new positive number.
   * 3. Only after the first REST attempt has settled (successfully or not) is
   *    the WebSocket opened; its events then patch the state incrementally.
   *
   * @param traceId Trace to display, or `null` to show nothing and stay disconnected.
   * @param refreshTrigger Optional counter; a change to a value > 0 forces a REST reload.
   * @returns `goals` (flat list whose entries also carry nested `sub_goals`),
   *   `messages` (sorted by sequence), `msgGroups` (messages keyed by goal id,
   *   "START" for messages without one), `connected` (WebSocket state),
   *   `reloading` (REST fetch in flight), `invalidBranches` (as reported by
   *   `processRetryLogic`) and `traceCompleted`.
   */
  export const useFlowChartData = (traceId: string | null, refreshTrigger?: number) => {
    const [goals, setGoals] = useState<Goal[]>([]);
    const [messages, setMessages] = useState<Message[]>([]);
    const [msgGroups, setMsgGroups] = useState<Record<string, Message[]>>({});
    const [sinceEventId, setSinceEventId] = useState(0);
    const [readyToConnect, setReadyToConnect] = useState(false);
    const [reloading, setReloading] = useState(false);
    const [invalidBranches, setInvalidBranches] = useState<Message[][]>([]);
    const [traceCompleted, setTraceCompleted] = useState(false);

    // Highest event id seen so far (REST or WebSocket).
    const currentEventIdRef = useRef(0);
    // Highest message sequence known locally; used to detect gaps in the stream.
    const maxSequenceRef = useRef(0);
    // Guards against overlapping REST reloads of the same trace.
    const restReloadingRef = useRef(false);
    // Trace currently shown; lets an in-flight reload detect that it became stale.
    const activeTraceIdRef = useRef<string | null>(traceId);

    /** Appends one message to its goal group, creating the START goal when needed. */
    const updateMessageGroups = useCallback((message: Message) => {
      const groupKey = groupKeyOf(message);
      if (groupKey === START_GOAL_ID) {
        setGoals(withStartGoal);
      }
      setMsgGroups((prev) => {
        const group = [...(prev[groupKey] ?? []), message];
        group.sort(compareBySequence);
        return { ...prev, [groupKey]: group };
      });
    }, []);

    // Reset everything that belongs to the previous trace. This effect is
    // declared before the loading effect so it runs first in the same commit.
    useEffect(() => {
      activeTraceIdRef.current = traceId;
      setGoals([]);
      setMessages([]);
      setMsgGroups({});
      setInvalidBranches([]);
      setSinceEventId(0);
      setReadyToConnect(false);
      setReloading(false);
      setTraceCompleted(false);
      currentEventIdRef.current = 0;
      maxSequenceRef.current = 0;
      restReloadingRef.current = false;
    }, [traceId]);

    /**
     * Fetches the trace and all of its messages over REST and replaces the
     * local state with the result.
     *
     * Never rejects: a failed request is logged and reported as `null`, so
     * fire-and-forget callers cannot cause unhandled rejections.
     *
     * @returns The trace's `last_event_id` when the response carried one,
     *   otherwise `null` (also when skipped, failed, or superseded by a trace switch).
     */
    const reloadViaRest = useCallback(async (): Promise<number | null> => {
      if (!traceId || restReloadingRef.current) return null;
      restReloadingRef.current = true;
      setReloading(true);

      // A response for a trace the user has already left must not touch state.
      const isStale = (): boolean => activeTraceIdRef.current !== traceId;
      let nextSinceEventId: number | null = null;

      try {
        const [traceJson, messagesJson] = await Promise.all([
          request<unknown>(`/api/traces/${traceId}`),
          request<unknown>(`/api/traces/${traceId}/messages?mode=all`),
        ]);
        if (isStale()) return null;

        const traceRoot = isRecord(traceJson) ? traceJson : {};
        const trace = isRecord(traceRoot.trace) ? traceRoot.trace : undefined;
        const goalTree = isRecord(traceRoot.goal_tree) ? traceRoot.goal_tree : undefined;
        const goalList = goalTree && Array.isArray(goalTree.goals) ? (goalTree.goals as Goal[]) : [];

        const lastEventId = trace && typeof trace.last_event_id === "number" ? trace.last_event_id : undefined;
        if (typeof lastEventId === "number") {
          currentEventIdRef.current = Math.max(currentEventIdRef.current, lastEventId);
          setSinceEventId(lastEventId);
          nextSinceEventId = lastEventId;
        }

        if (goalList.length > 0) {
          setGoals((prev) => {
            // Index once; on duplicate ids the first occurrence wins.
            const previousById = new Map<string, Goal>();
            prev.forEach((goal) => {
              if (!previousById.has(goal.id)) previousById.set(goal.id, goal);
            });
            const mergedFlat = goalList.map((incoming) => {
              const existing = previousById.get(incoming.id);
              return existing ? mergeGoal(existing, incoming) : incoming;
            });
            return buildSubGoals(mergedFlat);
          });
        }

        const messagesRoot = isRecord(messagesJson) ? messagesJson : {};
        const list = Array.isArray(messagesRoot.messages) ? (messagesRoot.messages as Message[]) : [];
        const nextMessages = list
          .filter((message) => (message as { status?: string }).status !== "abandoned")
          .sort(compareBySequence);
        const { availableData: finalMessages, invalidBranches: invalidBranchesTemp } = processRetryLogic(nextMessages);

        maxSequenceRef.current = finalMessages.reduce((max, msg) => {
          const seq = typeof msg.sequence === "number" ? msg.sequence : -1;
          return Math.max(max, seq);
        }, 0);
        setMessages(finalMessages);
        setInvalidBranches(invalidBranchesTemp);

        const grouped: Record<string, Message[]> = {};
        finalMessages.forEach((message) => {
          const groupKey = groupKeyOf(message);
          if (!grouped[groupKey]) grouped[groupKey] = [];
          grouped[groupKey].push(message);
        });
        Object.keys(grouped).forEach((key) => {
          grouped[key].sort(compareBySequence);
        });
        setMsgGroups(grouped);

        const startGroup = grouped[START_GOAL_ID];
        if (startGroup && startGroup.length > 0) {
          setGoals(withStartGoal);
        }
      } catch (error: unknown) {
        console.error(`Failed to reload trace ${traceId} via REST:`, error);
      } finally {
        // A superseded request must leave the bookkeeping of the current trace
        // alone; in particular it must not open the WebSocket gate early.
        if (!isStale()) {
          restReloadingRef.current = false;
          setReloading(false);
          // Allow the WebSocket connection once the REST attempt has settled,
          // even if it failed.
          setReadyToConnect(true);
        }
      }
      return nextSinceEventId;
    }, [traceId]);

    const prevTraceIdRef = useRef<string | null>(null);
    const prevRefreshTriggerRef = useRef<number | undefined>(undefined);

    // Trigger a REST load when the trace changes or when refreshTrigger moves
    // to a new positive value. The refs make sure a changed identity of
    // reloadViaRest alone never causes another load.
    useEffect(() => {
      if (!traceId) {
        prevTraceIdRef.current = null;
        return;
      }
      const traceChanged = traceId !== prevTraceIdRef.current;
      const refreshTriggerChanged = refreshTrigger !== prevRefreshTriggerRef.current;
      const refreshRequested = typeof refreshTrigger === "number" && refreshTrigger > 0 && refreshTriggerChanged;
      if (traceChanged || refreshRequested) {
        prevTraceIdRef.current = traceId;
        prevRefreshTriggerRef.current = refreshTrigger;
        void reloadViaRest();
      }
    }, [traceId, refreshTrigger, reloadViaRest]);

    /** Applies one WebSocket payload to the local state. Unknown events are ignored. */
    const handleWebSocketMessage = useCallback(
      (payload: unknown) => {
        const raw = isRecord(payload) ? payload : {};
        const event = (typeof raw.event === "string" && raw.event) || (typeof raw.type === "string" && raw.type) || "";
        // Event fields may live in a nested `data` object or at the top level.
        const data = isRecord(raw.data) ? raw.data : raw;

        const eventId = typeof raw.event_id === "number" ? raw.event_id : undefined;
        if (typeof eventId === "number") {
          currentEventIdRef.current = Math.max(currentEventIdRef.current, eventId);
        }

        switch (event) {
          case "error": {
            const message =
              (typeof data.message === "string" ? data.message : undefined) ||
              (typeof raw.message === "string" ? raw.message : undefined) ||
              "";
            if (message.includes("Too many missed events")) {
              void reloadViaRest().then((nextSince) => {
                if (typeof nextSince === "number") return;
                // No event id from REST: resume from the last event seen locally.
                const fallbackSince = currentEventIdRef.current;
                if (fallbackSince > 0) setSinceEventId(fallbackSince);
              });
            }
            return;
          }

          case "connected": {
            const currentEventId = firstNumber("current_event_id", data, raw);
            if (typeof currentEventId === "number") {
              currentEventIdRef.current = Math.max(currentEventIdRef.current, currentEventId);
              setSinceEventId(currentEventId);
            }
            const goalTree = isRecord(data.goal_tree)
              ? data.goal_tree
              : isRecord(raw.goal_tree)
                ? raw.goal_tree
                : undefined;
            if (goalTree && Array.isArray(goalTree.goals)) {
              const connectedGoals = goalTree.goals as Goal[];
              // Goals already loaded (e.g. via REST) take precedence.
              setGoals((prev) => (prev.length > 0 ? prev : buildSubGoals(connectedGoals)));
            }
            return;
          }

          case "pong":
            return;

          case "rewind": {
            // A rewind to sequence 0 is valid, so 0 must not be treated as "missing".
            const afterSequence = firstNumber("after_sequence", data, raw);
            if (typeof afterSequence !== "number") return;

            maxSequenceRef.current = afterSequence;
            const isKept = (msg: Message): boolean =>
              (typeof msg.sequence === "number" ? msg.sequence : -1) <= afterSequence;
            setMessages((prev) => prev.filter(isKept));
            setMsgGroups((prev) => {
              const next: Record<string, Message[]> = {};
              Object.entries(prev).forEach(([key, group]) => {
                const kept = group.filter(isKept);
                if (kept.length > 0) next[key] = kept;
              });
              return next;
            });

            const snapshot = isRecord(data.goal_tree_snapshot) ? data.goal_tree_snapshot : undefined;
            if (snapshot && Array.isArray(snapshot.goals)) {
              // A snapshot is available: replace the goals directly. The server
              // snapshot does not contain the synthetic START goal, so keep it
              // if it was being displayed.
              const snapshotGoals = snapshot.goals as Goal[];
              setGoals((prev) => {
                const rebuilt = buildSubGoals(snapshotGoals);
                return prev.some((goal) => goal.id === START_GOAL_ID) ? withStartGoal(rebuilt) : rebuilt;
              });
            } else {
              // Otherwise fall back to a full reload.
              void reloadViaRest();
            }
            return;
          }

          case "goal_added": {
            const goal = isGoalLike(data.goal) ? data.goal : isGoalLike(raw.goal) ? raw.goal : null;
            if (!goal) return;
            setGoals((prev) => {
              const next = [...prev];
              const idx = next.findIndex((candidate) => candidate.id === goal.id);
              if (idx >= 0) {
                // Known goal: merge, keeping local fields the event leaves out.
                next[idx] = mergeGoal(next[idx], goal);
              } else {
                next.push(goal as Goal);
              }
              return buildSubGoals(next);
            });
            return;
          }

          case "goal_updated": {
            const goalId =
              (typeof data.goal_id === "string" ? data.goal_id : undefined) ||
              (isRecord(data.goal) && typeof data.goal.id === "string" ? data.goal.id : undefined) ||
              (typeof raw.goal_id === "string" ? raw.goal_id : undefined);
            if (!goalId) return;
            const updates: Record<string, unknown> =
              (isRecord(data.updates) ? data.updates : undefined) ??
              (isRecord(raw.updates) ? raw.updates : undefined) ??
              (isRecord(data.patch) ? data.patch : undefined) ??
              (isRecord(raw.patch) ? raw.patch : undefined) ??
              {};
            setGoals((prev) => {
              if (!prev.some((goal) => goal.id === goalId)) return prev;
              const patched = prev.map((goal) => {
                if (goal.id !== goalId) return goal;
                const next: Goal = { ...goal };
                if (typeof updates.status === "string") {
                  next.status = updates.status as Goal["status"];
                }
                if (typeof updates.summary === "string") {
                  next.summary = updates.summary;
                }
                return next;
              });
              // Relink so that parents' nested sub_goals reference the updated
              // object instead of the stale one.
              return buildSubGoals(patched);
            });
            return;
          }

          case "trace_completed":
            setTraceCompleted(true);
            return;

          case "message_added": {
            const message = isMessage(data.message) ? data.message : isMessage(raw.message) ? raw.message : null;
            if (!message) return;

            // Check sequence continuity: a jump of more than one past the
            // highest known sequence means events were missed.
            const seq = typeof message.sequence === "number" ? message.sequence : -1;
            if (seq > 0) {
              if (maxSequenceRef.current > 0 && seq > maxSequenceRef.current + 1) {
                console.warn(
                  `Message sequence gap detected: current max ${maxSequenceRef.current}, received ${seq}. Triggering reload.`,
                );
                void reloadViaRest();
                return;
              }
              maxSequenceRef.current = Math.max(maxSequenceRef.current, seq);
            }

            setMessages((prev) => [...prev, message].sort(compareBySequence));
            updateMessageGroups(message);
            return;
          }

          default:
            return;
        }
      },
      [reloadViaRest, updateMessageGroups],
    );

    // Main trace connection options.
    const wsOptions = useMemo(
      () => ({ onMessage: handleWebSocketMessage, sinceEventId }),
      [handleWebSocketMessage, sinceEventId],
    );

    // Connect only when a traceId exists and the REST load has settled (readyToConnect).
    const { connected } = useWebSocket(readyToConnect ? traceId : null, wsOptions);

    return { goals, messages, msgGroups, connected, reloading, invalidBranches, traceCompleted };
  };