useFlowChartData.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. import { useCallback, useEffect, useMemo, useRef, useState } from "react";
  2. import { useWebSocket } from "../../../hooks/useWebSocket";
  3. import type { Goal } from "../../../types/goal";
  4. import type { Message } from "../../../types/message";
  5. // WebSocket 数据解析与状态聚合(goals + msgGroups)
  6. const isRecord = (value: unknown): value is Record<string, unknown> =>
  7. !!value && typeof value === "object" && !Array.isArray(value);
  8. const isGoalLike = (value: unknown): value is Partial<Goal> & { id: string } =>
  9. isRecord(value) && typeof value.id === "string";
  10. const isMessage = (value: unknown): value is Message =>
  11. isRecord(value) &&
  12. (typeof value.id === "string" || typeof (value as { message_id?: string }).message_id === "string");
  13. const buildSubGoals = (flatGoals: Goal[]): Goal[] => {
  14. const nodeMap = new Map<string, Goal>();
  15. flatGoals.forEach((goal) => {
  16. nodeMap.set(goal.id, { ...goal, sub_goals: [] });
  17. });
  18. flatGoals.forEach((goal) => {
  19. const parentId = typeof goal.parent_id === "string" && goal.parent_id ? goal.parent_id : undefined;
  20. if (!parentId) return;
  21. const parent = nodeMap.get(parentId);
  22. const child = nodeMap.get(goal.id);
  23. if (!parent || !child) return;
  24. if (!Array.isArray(parent.sub_goals)) parent.sub_goals = [];
  25. parent.sub_goals.push(child);
  26. });
  27. return flatGoals.map((goal) => {
  28. const node = nodeMap.get(goal.id);
  29. if (!node) return goal;
  30. if (Array.isArray(node.sub_goals) && node.sub_goals.length === 0) {
  31. delete node.sub_goals;
  32. }
  33. return node;
  34. });
  35. };
  36. import { processRetryLogic } from "../utils/retryLogic";
  37. // FlowChart 专用数据 Hook:处理实时事件并聚合消息组
  38. export const useFlowChartData = (traceId: string | null, refreshTrigger?: number) => {
  39. const [goals, setGoals] = useState<Goal[]>([]);
  40. const [messages, setMessages] = useState<Message[]>([]);
  41. const [msgGroups, setMsgGroups] = useState<Record<string, Message[]>>({});
  42. const [sinceEventId, setSinceEventId] = useState(0);
  43. const [readyToConnect, setReadyToConnect] = useState(false);
  44. const currentEventIdRef = useRef(0);
  45. const restReloadingRef = useRef(false);
  46. const [reloading, setReloading] = useState(false);
  47. const [invalidBranches, setInvalidBranches] = useState<Message[][]>([]);
  48. const messageComparator = useCallback((a: Message, b: Message): number => {
  49. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  50. const seqA = typeof (a as any).sequence === "number" ? (a as any).sequence : 0;
  51. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  52. const seqB = typeof (b as any).sequence === "number" ? (b as any).sequence : 0;
  53. return seqA - seqB;
  54. }, []);
  55. const updateMessageGroups = useCallback(
  56. (message: Message) => {
  57. const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START";
  58. if (groupKey === "START") {
  59. setGoals((prev) => {
  60. if (prev.some((g) => g.id === "START")) return prev;
  61. const startGoal: Goal = {
  62. id: "START",
  63. description: "START",
  64. status: "completed",
  65. created_at: "",
  66. };
  67. return [startGoal, ...prev];
  68. });
  69. }
  70. setMsgGroups((prev) => {
  71. const existing = prev[groupKey] ? [...prev[groupKey]] : [];
  72. existing.push(message);
  73. existing.sort(messageComparator);
  74. return { ...prev, [groupKey]: existing };
  75. });
  76. },
  77. [messageComparator],
  78. );
  79. useEffect(() => {
  80. setGoals([]);
  81. setMessages([]);
  82. setMsgGroups({});
  83. setSinceEventId(0);
  84. setReadyToConnect(false);
  85. currentEventIdRef.current = 0;
  86. restReloadingRef.current = false;
  87. }, [traceId]);
  88. const reloadViaRest = useCallback(async () => {
  89. if (!traceId) return;
  90. if (restReloadingRef.current) return;
  91. restReloadingRef.current = true;
  92. setReloading(true);
  93. let nextSinceEventId: number | null = null;
  94. try {
  95. const [traceRes, messagesRes] = await Promise.all([
  96. fetch(`http://localhost:8000/api/traces/${traceId}`),
  97. fetch(`http://localhost:8000/api/traces/${traceId}/messages?mode=all`),
  98. ]);
  99. if (traceRes.ok) {
  100. const json = (await traceRes.json()) as unknown;
  101. const root = isRecord(json) ? json : {};
  102. const trace = isRecord(root.trace) ? root.trace : undefined;
  103. const goalTree = isRecord(root.goal_tree) ? root.goal_tree : undefined;
  104. const goalList = goalTree && Array.isArray(goalTree.goals) ? (goalTree.goals as Goal[]) : [];
  105. const lastEventId = trace && typeof trace.last_event_id === "number" ? trace.last_event_id : undefined;
  106. if (typeof lastEventId === "number") {
  107. currentEventIdRef.current = Math.max(currentEventIdRef.current, lastEventId);
  108. setSinceEventId(lastEventId);
  109. nextSinceEventId = lastEventId;
  110. }
  111. if (goalList.length > 0) {
  112. setGoals((prev) => {
  113. const mergedFlat = goalList.map((ng) => {
  114. const existing = prev.find((p) => p.id === ng.id);
  115. if (!existing) return ng;
  116. const merged: Goal = { ...existing, ...ng };
  117. if (existing.sub_trace_ids && !merged.sub_trace_ids) {
  118. merged.sub_trace_ids = existing.sub_trace_ids;
  119. }
  120. if (existing.agent_call_mode && !merged.agent_call_mode) {
  121. merged.agent_call_mode = existing.agent_call_mode;
  122. }
  123. return merged;
  124. });
  125. return buildSubGoals(mergedFlat);
  126. });
  127. }
  128. }
  129. if (messagesRes.ok) {
  130. const json = (await messagesRes.json()) as unknown;
  131. const root = isRecord(json) ? json : {};
  132. const list = Array.isArray(root.messages) ? (root.messages as Message[]) : [];
  133. console.log("%c [ list ]-149", "font-size:13px; background:pink; color:#bf2c9f;", list);
  134. const filtered = list.filter((message) => (message as { status?: string }).status !== "abandoned");
  135. const nextMessages = [...filtered].sort(messageComparator);
  136. const { availableData: finalMessages, invalidBranches: invalidBranchesTemp } = processRetryLogic(nextMessages);
  137. setMessages(finalMessages);
  138. setInvalidBranches(invalidBranchesTemp);
  139. const grouped: Record<string, Message[]> = {};
  140. finalMessages.forEach((message) => {
  141. const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START";
  142. if (!grouped[groupKey]) grouped[groupKey] = [];
  143. grouped[groupKey].push(message);
  144. });
  145. Object.keys(grouped).forEach((key) => {
  146. grouped[key].sort(messageComparator);
  147. });
  148. setMsgGroups(grouped);
  149. if (grouped.START && grouped.START.length > 0) {
  150. setGoals((prev) => {
  151. if (prev.some((g) => g.id === "START")) return prev;
  152. const startGoal: Goal = {
  153. id: "START",
  154. description: "START",
  155. status: "completed",
  156. created_at: "",
  157. };
  158. return [startGoal, ...prev];
  159. });
  160. }
  161. }
  162. // REST 请求完成后,允许建立 WebSocket 连接
  163. setReadyToConnect(true);
  164. } finally {
  165. restReloadingRef.current = false;
  166. setReloading(false);
  167. }
  168. return nextSinceEventId;
  169. }, [messageComparator, traceId]);
  170. const prevTraceIdRef = useRef<string | null>(null);
  171. const prevRefreshTriggerRef = useRef<number | undefined>(undefined);
  172. useEffect(() => {
  173. // 确保 traceId 存在
  174. if (!traceId) {
  175. prevTraceIdRef.current = null;
  176. return;
  177. }
  178. // 检查是否发生了变化
  179. const traceChanged = traceId !== prevTraceIdRef.current;
  180. const refreshTriggerChanged = refreshTrigger !== prevRefreshTriggerRef.current;
  181. // 只有当 traceId 真正变化,或者 refreshTrigger 真正变化时,才执行加载
  182. if (traceChanged || (typeof refreshTrigger === "number" && refreshTrigger > 0 && refreshTriggerChanged)) {
  183. prevTraceIdRef.current = traceId;
  184. prevRefreshTriggerRef.current = refreshTrigger;
  185. // 注意:traceId 变化时,另外一个清理 useEffect 也会执行,这里只负责触发加载
  186. // 这里不直接调用,而是通过一个 setTimeout 0 来确保清理操作已经完成
  187. // 但实际上清理操作是在副作用执行前发生的(对于同一组件)
  188. void reloadViaRest();
  189. }
  190. }, [traceId, refreshTrigger, reloadViaRest]); // 添加 reloadViaRest 到依赖列表,但由于我们用了 ref 来控制,所以不会因为它的变化而重复执行
  191. const handleWebSocketMessage = useCallback(
  192. (payload: unknown) => {
  193. const raw = isRecord(payload) ? payload : {};
  194. const event = (typeof raw.event === "string" && raw.event) || (typeof raw.type === "string" && raw.type) || "";
  195. const data = isRecord(raw.data) ? raw.data : raw;
  196. console.log("%c [ data ]-182", "font-size:13px; background:pink; color:#bf2c9f;", data);
  197. const eventId = typeof raw.event_id === "number" ? raw.event_id : undefined;
  198. if (typeof eventId === "number") {
  199. currentEventIdRef.current = Math.max(currentEventIdRef.current, eventId);
  200. }
  201. if (event === "error") {
  202. const message =
  203. (typeof data.message === "string" ? data.message : undefined) ||
  204. (typeof raw.message === "string" ? raw.message : undefined) ||
  205. "";
  206. if (message.includes("Too many missed events")) {
  207. void reloadViaRest().then((nextSince) => {
  208. if (typeof nextSince === "number") return;
  209. const fallbackSince = currentEventIdRef.current;
  210. if (fallbackSince > 0) setSinceEventId(fallbackSince);
  211. });
  212. }
  213. return;
  214. }
  215. if (event === "connected") {
  216. const currentEventId =
  217. (typeof data.current_event_id === "number" ? data.current_event_id : undefined) ||
  218. (typeof raw.current_event_id === "number" ? raw.current_event_id : undefined);
  219. if (typeof currentEventId === "number") {
  220. currentEventIdRef.current = Math.max(currentEventIdRef.current, currentEventId);
  221. }
  222. const trace = isRecord(data.trace) ? data.trace : undefined;
  223. const rawTrace = isRecord(raw.trace) ? raw.trace : undefined;
  224. const goalTree =
  225. (isRecord(data.goal_tree) ? data.goal_tree : undefined) ||
  226. (trace && isRecord(trace.goal_tree) ? trace.goal_tree : undefined) ||
  227. (isRecord(raw.goal_tree) ? raw.goal_tree : undefined) ||
  228. (rawTrace && isRecord(rawTrace.goal_tree) ? rawTrace.goal_tree : undefined) ||
  229. {};
  230. const goalList = isRecord(goalTree) ? goalTree.goals : undefined;
  231. const nextGoals = Array.isArray(goalList) ? (goalList as Goal[]) : [];
  232. setGoals((prev) => {
  233. const mergedFlat = nextGoals.map((ng) => {
  234. const existing = prev.find((p) => p.id === ng.id);
  235. if (!existing) return ng;
  236. const merged: Goal = { ...existing, ...ng };
  237. if (existing.sub_trace_ids && !merged.sub_trace_ids) {
  238. merged.sub_trace_ids = existing.sub_trace_ids;
  239. }
  240. if (existing.agent_call_mode && !merged.agent_call_mode) {
  241. merged.agent_call_mode = existing.agent_call_mode;
  242. }
  243. return merged;
  244. });
  245. return buildSubGoals(mergedFlat);
  246. });
  247. return;
  248. }
  249. if (event === "goal_added") {
  250. const goal = isGoalLike(data.goal) ? data.goal : isGoalLike(raw.goal) ? raw.goal : null;
  251. if (!goal) return;
  252. setGoals((prev: Goal[]) => {
  253. const next = [...prev];
  254. const idx = next.findIndex((g) => g.id === goal.id);
  255. if (idx >= 0) {
  256. const existing = next[idx];
  257. const merged = { ...existing, ...goal };
  258. // 保留 sub_trace_ids,如果 WebSocket 数据中缺失但本地已有
  259. if (existing.sub_trace_ids && !merged.sub_trace_ids) {
  260. merged.sub_trace_ids = existing.sub_trace_ids;
  261. }
  262. if (existing.agent_call_mode && !merged.agent_call_mode) {
  263. merged.agent_call_mode = existing.agent_call_mode;
  264. }
  265. next[idx] = merged;
  266. return buildSubGoals(next);
  267. }
  268. next.push(goal as Goal);
  269. return buildSubGoals(next);
  270. });
  271. return;
  272. }
  273. if (event === "goal_updated") {
  274. const goalId =
  275. (typeof data.goal_id === "string" ? data.goal_id : undefined) ||
  276. (isRecord(data.goal) && typeof data.goal.id === "string" ? data.goal.id : undefined) ||
  277. (typeof raw.goal_id === "string" ? raw.goal_id : undefined);
  278. const updates = isRecord(data.updates) ? data.updates : isRecord(raw.updates) ? raw.updates : {};
  279. if (!goalId) return;
  280. setGoals((prev: Goal[]) =>
  281. prev.map((g: Goal) => {
  282. if (g.id !== goalId) return g;
  283. const next: Goal = { ...g };
  284. if ("status" in updates) {
  285. const status = updates.status;
  286. if (typeof status === "string") {
  287. next.status = status as Goal["status"];
  288. }
  289. }
  290. if ("summary" in updates) {
  291. const summary = updates.summary;
  292. if (typeof summary === "string") {
  293. next.summary = summary;
  294. }
  295. }
  296. return next;
  297. }),
  298. );
  299. return;
  300. }
  301. if (event === "message_added") {
  302. const message = isMessage(data.message) ? data.message : isMessage(raw.message) ? raw.message : null;
  303. if (message) {
  304. setMessages((prev) => {
  305. const next = [...prev, message];
  306. next.sort(messageComparator);
  307. return next;
  308. });
  309. updateMessageGroups(message);
  310. }
  311. }
  312. },
  313. [messageComparator, reloadViaRest, updateMessageGroups],
  314. );
  315. // 主 Trace 连接
  316. const wsOptions = useMemo(
  317. () => ({ onMessage: handleWebSocketMessage, sinceEventId }),
  318. [handleWebSocketMessage, sinceEventId],
  319. );
  320. // 只有当 traceId 存在且 REST 加载完成 (readyToConnect) 后才连接 WebSocket
  321. const { connected } = useWebSocket(readyToConnect ? traceId : null, wsOptions);
  322. return { goals, messages, msgGroups, connected, reloading, invalidBranches };
  323. };