useFlowChartData.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. import { useCallback, useEffect, useMemo, useRef, useState } from "react";
  2. import { useWebSocket } from "../../../hooks/useWebSocket";
  3. import type { Goal } from "../../../types/goal";
  4. import type { Message } from "../../../types/message";
  5. // WebSocket 数据解析与状态聚合(goals + msgGroups)
  6. const isRecord = (value: unknown): value is Record<string, unknown> =>
  7. !!value && typeof value === "object" && !Array.isArray(value);
  8. const isGoalLike = (value: unknown): value is Partial<Goal> & { id: string } =>
  9. isRecord(value) && typeof value.id === "string";
  10. const isMessage = (value: unknown): value is Message =>
  11. isRecord(value) &&
  12. (typeof value.id === "string" || typeof (value as { message_id?: string }).message_id === "string");
  13. const buildSubGoals = (flatGoals: Goal[]): Goal[] => {
  14. const nodeMap = new Map<string, Goal>();
  15. flatGoals.forEach((goal) => {
  16. nodeMap.set(goal.id, { ...goal, sub_goals: [] });
  17. });
  18. flatGoals.forEach((goal) => {
  19. const parentId = typeof goal.parent_id === "string" && goal.parent_id ? goal.parent_id : undefined;
  20. if (!parentId) return;
  21. const parent = nodeMap.get(parentId);
  22. const child = nodeMap.get(goal.id);
  23. if (!parent || !child) return;
  24. if (!Array.isArray(parent.sub_goals)) parent.sub_goals = [];
  25. parent.sub_goals.push(child);
  26. });
  27. return flatGoals.map((goal) => {
  28. const node = nodeMap.get(goal.id);
  29. if (!node) return goal;
  30. if (Array.isArray(node.sub_goals) && node.sub_goals.length === 0) {
  31. delete node.sub_goals;
  32. }
  33. return node;
  34. });
  35. };
  36. // FlowChart 专用数据 Hook:处理实时事件并聚合消息组
  37. export const useFlowChartData = (traceId: string | null, initialGoals: Goal[], refreshTrigger?: number) => {
  38. const [goals, setGoals] = useState<Goal[]>(initialGoals);
  39. const [messages, setMessages] = useState<Message[]>([]);
  40. const [msgGroups, setMsgGroups] = useState<Record<string, Message[]>>({});
  41. const [sinceEventId, setSinceEventId] = useState(0);
  42. const currentEventIdRef = useRef(0);
  43. const restReloadingRef = useRef(false);
  44. const [reloading, setReloading] = useState(false);
  45. const messageSortKey = useCallback((message: Message): number => {
  46. const mid =
  47. typeof message.message_id === "string"
  48. ? message.message_id
  49. : typeof message.id === "string"
  50. ? message.id
  51. : undefined;
  52. if (!mid) return 0;
  53. if (!mid.includes("-")) return 0;
  54. const suffix = mid.slice(mid.lastIndexOf("-") + 1);
  55. const num = Number.parseInt(suffix, 10);
  56. return Number.isFinite(num) ? num : 0;
  57. }, []);
  58. const updateMessageGroups = useCallback(
  59. (message: Message) => {
  60. const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START";
  61. if (groupKey === "START") {
  62. setGoals((prev) => {
  63. if (prev.some((g) => g.id === "START")) return prev;
  64. const startGoal: Goal = {
  65. id: "START",
  66. description: "START",
  67. status: "completed",
  68. created_at: "",
  69. };
  70. return [startGoal, ...prev];
  71. });
  72. }
  73. setMsgGroups((prev) => {
  74. const existing = prev[groupKey] ? [...prev[groupKey]] : [];
  75. existing.push(message);
  76. existing.sort((a, b) => messageSortKey(a) - messageSortKey(b));
  77. return { ...prev, [groupKey]: existing };
  78. });
  79. },
  80. [messageSortKey],
  81. );
  82. useEffect(() => {
  83. setGoals(initialGoals);
  84. }, [initialGoals]);
  85. useEffect(() => {
  86. setMessages([]);
  87. setMsgGroups({});
  88. setSinceEventId(0);
  89. currentEventIdRef.current = 0;
  90. restReloadingRef.current = false;
  91. }, [traceId]);
  92. const reloadViaRest = useCallback(async () => {
  93. if (!traceId) return;
  94. if (restReloadingRef.current) return;
  95. restReloadingRef.current = true;
  96. setReloading(true);
  97. let nextSinceEventId: number | null = null;
  98. try {
  99. const [traceRes, messagesRes] = await Promise.all([
  100. fetch(`http://localhost:8000/api/traces/${traceId}`),
  101. fetch(`http://localhost:8000/api/traces/${traceId}/messages?mode=all`),
  102. ]);
  103. if (traceRes.ok) {
  104. const json = (await traceRes.json()) as unknown;
  105. const root = isRecord(json) ? json : {};
  106. const trace = isRecord(root.trace) ? root.trace : undefined;
  107. const goalTree = isRecord(root.goal_tree) ? root.goal_tree : undefined;
  108. const goalList = goalTree && Array.isArray(goalTree.goals) ? (goalTree.goals as Goal[]) : [];
  109. const lastEventId = trace && typeof trace.last_event_id === "number" ? trace.last_event_id : undefined;
  110. if (typeof lastEventId === "number") {
  111. currentEventIdRef.current = Math.max(currentEventIdRef.current, lastEventId);
  112. setSinceEventId(lastEventId);
  113. nextSinceEventId = lastEventId;
  114. }
  115. if (goalList.length > 0) {
  116. setGoals((prev) => {
  117. const mergedFlat = goalList.map((ng) => {
  118. const existing = prev.find((p) => p.id === ng.id);
  119. if (!existing) return ng;
  120. const merged: Goal = { ...existing, ...ng };
  121. if (existing.sub_trace_ids && !merged.sub_trace_ids) {
  122. merged.sub_trace_ids = existing.sub_trace_ids;
  123. }
  124. if (existing.agent_call_mode && !merged.agent_call_mode) {
  125. merged.agent_call_mode = existing.agent_call_mode;
  126. }
  127. return merged;
  128. });
  129. return buildSubGoals(mergedFlat);
  130. });
  131. }
  132. }
  133. if (messagesRes.ok) {
  134. const json = (await messagesRes.json()) as unknown;
  135. const root = isRecord(json) ? json : {};
  136. const list = Array.isArray(root.messages) ? (root.messages as Message[]) : [];
  137. const filtered = list.filter((message) => (message as { status?: string }).status !== "abandoned");
  138. const nextMessages = [...filtered].sort((a, b) => messageSortKey(a) - messageSortKey(b));
  139. setMessages(nextMessages);
  140. const grouped: Record<string, Message[]> = {};
  141. nextMessages.forEach((message) => {
  142. const groupKey = typeof message.goal_id === "string" && message.goal_id ? message.goal_id : "START";
  143. if (!grouped[groupKey]) grouped[groupKey] = [];
  144. grouped[groupKey].push(message);
  145. });
  146. Object.keys(grouped).forEach((key) => {
  147. grouped[key].sort((a, b) => messageSortKey(a) - messageSortKey(b));
  148. });
  149. setMsgGroups(grouped);
  150. if (grouped.START && grouped.START.length > 0) {
  151. setGoals((prev) => {
  152. if (prev.some((g) => g.id === "START")) return prev;
  153. const startGoal: Goal = {
  154. id: "START",
  155. description: "START",
  156. status: "completed",
  157. created_at: "",
  158. };
  159. return [startGoal, ...prev];
  160. });
  161. }
  162. }
  163. } finally {
  164. restReloadingRef.current = false;
  165. setReloading(false);
  166. }
  167. return nextSinceEventId;
  168. }, [messageSortKey, traceId]);
  169. useEffect(() => {
  170. if (!traceId) return;
  171. if (!refreshTrigger) return;
  172. void reloadViaRest();
  173. }, [refreshTrigger, reloadViaRest, traceId]);
  174. const handleWebSocketMessage = useCallback(
  175. (payload: unknown) => {
  176. const raw = isRecord(payload) ? payload : {};
  177. const event = (typeof raw.event === "string" && raw.event) || (typeof raw.type === "string" && raw.type) || "";
  178. const data = isRecord(raw.data) ? raw.data : raw;
  179. console.log("%c [ data ]-182", "font-size:13px; background:pink; color:#bf2c9f;", data);
  180. const eventId = typeof raw.event_id === "number" ? raw.event_id : undefined;
  181. if (typeof eventId === "number") {
  182. currentEventIdRef.current = Math.max(currentEventIdRef.current, eventId);
  183. }
  184. if (event === "error") {
  185. const message =
  186. (typeof data.message === "string" ? data.message : undefined) ||
  187. (typeof raw.message === "string" ? raw.message : undefined) ||
  188. "";
  189. if (message.includes("Too many missed events")) {
  190. void reloadViaRest().then((nextSince) => {
  191. if (typeof nextSince === "number") return;
  192. const fallbackSince = currentEventIdRef.current;
  193. if (fallbackSince > 0) setSinceEventId(fallbackSince);
  194. });
  195. }
  196. return;
  197. }
  198. if (event === "connected") {
  199. const currentEventId =
  200. (typeof data.current_event_id === "number" ? data.current_event_id : undefined) ||
  201. (typeof raw.current_event_id === "number" ? raw.current_event_id : undefined);
  202. if (typeof currentEventId === "number") {
  203. currentEventIdRef.current = Math.max(currentEventIdRef.current, currentEventId);
  204. }
  205. const trace = isRecord(data.trace) ? data.trace : undefined;
  206. const rawTrace = isRecord(raw.trace) ? raw.trace : undefined;
  207. const goalTree =
  208. (isRecord(data.goal_tree) ? data.goal_tree : undefined) ||
  209. (trace && isRecord(trace.goal_tree) ? trace.goal_tree : undefined) ||
  210. (isRecord(raw.goal_tree) ? raw.goal_tree : undefined) ||
  211. (rawTrace && isRecord(rawTrace.goal_tree) ? rawTrace.goal_tree : undefined) ||
  212. {};
  213. const goalList = isRecord(goalTree) ? goalTree.goals : undefined;
  214. const nextGoals = Array.isArray(goalList) ? (goalList as Goal[]) : [];
  215. setGoals((prev) => {
  216. const mergedFlat = nextGoals.map((ng) => {
  217. const existing = prev.find((p) => p.id === ng.id);
  218. if (!existing) return ng;
  219. const merged: Goal = { ...existing, ...ng };
  220. if (existing.sub_trace_ids && !merged.sub_trace_ids) {
  221. merged.sub_trace_ids = existing.sub_trace_ids;
  222. }
  223. if (existing.agent_call_mode && !merged.agent_call_mode) {
  224. merged.agent_call_mode = existing.agent_call_mode;
  225. }
  226. return merged;
  227. });
  228. return buildSubGoals(mergedFlat);
  229. });
  230. return;
  231. }
  232. if (event === "goal_added") {
  233. const goal = isGoalLike(data.goal) ? data.goal : isGoalLike(raw.goal) ? raw.goal : null;
  234. if (!goal) return;
  235. setGoals((prev: Goal[]) => {
  236. const next = [...prev];
  237. const idx = next.findIndex((g) => g.id === goal.id);
  238. if (idx >= 0) {
  239. const existing = next[idx];
  240. const merged = { ...existing, ...goal };
  241. // 保留 sub_trace_ids,如果 WebSocket 数据中缺失但本地已有
  242. if (existing.sub_trace_ids && !merged.sub_trace_ids) {
  243. merged.sub_trace_ids = existing.sub_trace_ids;
  244. }
  245. if (existing.agent_call_mode && !merged.agent_call_mode) {
  246. merged.agent_call_mode = existing.agent_call_mode;
  247. }
  248. next[idx] = merged;
  249. return buildSubGoals(next);
  250. }
  251. next.push(goal as Goal);
  252. return buildSubGoals(next);
  253. });
  254. return;
  255. }
  256. if (event === "goal_updated") {
  257. const goalId =
  258. (typeof data.goal_id === "string" ? data.goal_id : undefined) ||
  259. (isRecord(data.goal) && typeof data.goal.id === "string" ? data.goal.id : undefined) ||
  260. (typeof raw.goal_id === "string" ? raw.goal_id : undefined);
  261. const updates = isRecord(data.updates) ? data.updates : isRecord(raw.updates) ? raw.updates : {};
  262. if (!goalId) return;
  263. setGoals((prev: Goal[]) =>
  264. prev.map((g: Goal) => {
  265. if (g.id !== goalId) return g;
  266. const next: Goal = { ...g };
  267. if ("status" in updates) {
  268. const status = updates.status;
  269. if (typeof status === "string") {
  270. next.status = status as Goal["status"];
  271. }
  272. }
  273. if ("summary" in updates) {
  274. const summary = updates.summary;
  275. if (typeof summary === "string") {
  276. next.summary = summary;
  277. }
  278. }
  279. return next;
  280. }),
  281. );
  282. return;
  283. }
  284. if (event === "message_added") {
  285. const message = isMessage(data.message) ? data.message : isMessage(raw.message) ? raw.message : null;
  286. if (message) {
  287. setMessages((prev) => {
  288. const next = [...prev, message];
  289. next.sort((a, b) => messageSortKey(a) - messageSortKey(b));
  290. return next;
  291. });
  292. updateMessageGroups(message);
  293. }
  294. }
  295. },
  296. [messageSortKey, reloadViaRest, updateMessageGroups],
  297. );
  298. // 主 Trace 连接
  299. const wsOptions = useMemo(
  300. () => ({ onMessage: handleWebSocketMessage, sinceEventId }),
  301. [handleWebSocketMessage, sinceEventId],
  302. );
  303. const { connected } = useWebSocket(traceId, wsOptions);
  304. return { goals, messages, msgGroups, connected, reloading };
  305. };