package openai

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/QuantumNous/new-api/common"
	"github.com/QuantumNous/new-api/dto"
	"github.com/QuantumNous/new-api/logger"
	relaycommon "github.com/QuantumNous/new-api/relay/common"
	"github.com/QuantumNous/new-api/relay/helper"
	"github.com/QuantumNous/new-api/service"
	"github.com/QuantumNous/new-api/types"

	"github.com/gin-gonic/gin"
)

// responsesStreamIndexKey builds a map key from a stream item ID and an
// optional index. It returns "" when itemID is empty, itemID alone when idx is
// nil, and "<itemID>:<idx>" otherwise.
func responsesStreamIndexKey(itemID string, idx *int) string {
	if itemID == "" {
		return ""
	}
	if idx == nil {
		return itemID
	}
	return fmt.Sprintf("%s:%d", itemID, *idx)
}

// stringDeltaFromPrefix returns the part of next that follows prev when next
// starts with prev, and next unchanged otherwise. An empty next yields "".
// It is used to turn cumulative snapshots into incremental deltas.
func stringDeltaFromPrefix(prev string, next string) string {
	// TrimPrefix already covers every case: empty prev, empty next, and a
	// non-matching prefix all return next as-is.
	return strings.TrimPrefix(next, prev)
}

// OaiResponsesToChatHandler converts a non-streaming Responses API reply into a
// chat-completions reply and writes it to the client.
//
// The upstream body is read in full and decoded. An upstream error object with
// a non-empty type is returned as an error carrying the upstream status code.
// When the converted response has no usage (nil or zero total tokens), usage is
// estimated from the extracted output text. The body written to the client is
// encoded according to info.RelayFormat (Claude, Gemini, or chat completions by
// default).
//
// It returns the usage that was reported or estimated, or a non-nil error if
// the response is missing, unreadable, undecodable, or cannot be re-encoded.
func OaiResponsesToChatHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) {
	if resp == nil || resp.Body == nil {
		return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError)
	}
	defer service.CloseResponseBodyGracefully(resp)

	var responsesResp dto.OpenAIResponsesResponse
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, types.NewOpenAIError(err, types.ErrorCodeReadResponseBodyFailed, http.StatusInternalServerError)
	}
	if err := common.Unmarshal(body, &responsesResp); err != nil {
		return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
	}
	if oaiError := responsesResp.GetOpenAIError(); oaiError != nil && oaiError.Type != "" {
		return nil, types.WithOpenAIError(*oaiError, resp.StatusCode)
	}

	chatId := helper.GetResponseID(c)
	chatResp, usage, err := service.ResponsesResponseToChatCompletionsResponse(&responsesResp, chatId)
	if err != nil {
		return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
	}

	// Upstream did not report usage: estimate it from the output text.
	if usage == nil || usage.TotalTokens == 0 {
		text := service.ExtractOutputTextFromResponses(&responsesResp)
		usage = service.ResponseText2Usage(c, text, info.UpstreamModelName, info.GetEstimatePromptTokens())
		chatResp.Usage = *usage
	}

	var responseBody []byte
	switch info.RelayFormat {
	case types.RelayFormatClaude:
		claudeResp := service.ResponseOpenAI2Claude(chatResp, info)
		responseBody, err = common.Marshal(claudeResp)
	case types.RelayFormatGemini:
		geminiResp := service.ResponseOpenAI2Gemini(chatResp, info)
		responseBody, err = common.Marshal(geminiResp)
	default:
		responseBody, err = common.Marshal(chatResp)
	}
	if err != nil {
		return nil, types.NewOpenAIError(err, types.ErrorCodeJsonMarshalFailed, http.StatusInternalServerError)
	}

	service.IOCopyBytesGracefully(c, resp, responseBody)
	return usage, nil
}

// OaiResponsesToChatStreamHandler converts a streaming Responses API reply into
// chat-completion chunks and forwards them to the client.
//
// Handled upstream event types:
//   - response.created: picks up the model name and creation time.
//   - response.reasoning_summary_text.delta: forwarded as reasoning content.
//   - response.output_text.delta: forwarded as assistant content.
//   - response.output_item.added / .done (function_call items only) and
//     response.function_call_arguments.delta: forwarded as tool-call deltas.
//   - response.completed: records usage and sends the stop chunk.
//   - response.error / response.failed: aborts the stream with an error.
//
// All other event types are ignored. Events that fail to decode are logged and
// skipped.
//
// A start chunk is always sent before the first content chunk, and a stop chunk
// is sent exactly once (on response.completed, or after the stream ends if that
// event never arrived). When upstream reports no usage, usage is estimated from
// the text and tool-call data that was forwarded; the estimate is computed
// before the stop chunk so that the usage attached to info.ClaudeConvertInfo is
// the same value this function returns.
//
// It returns the final usage, or a non-nil error if the response is missing,
// upstream signalled a failure, or writing to the client failed.
func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) {
	if resp == nil || resp.Body == nil {
		return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError)
	}
	defer service.CloseResponseBodyGracefully(resp)

	responseId := helper.GetResponseID(c)
	createAt := time.Now().Unix()
	model := info.UpstreamModelName

	var (
		usage = &dto.Usage{}
		// outputText holds assistant text only; it decides the finish reason
		// and whether tool calls are still forwarded.
		outputText strings.Builder
		// usageText holds everything forwarded (text, reasoning, tool calls)
		// and feeds the fallback token estimate.
		usageText   strings.Builder
		sentStart   bool
		sentStop    bool
		sawToolCall bool
		streamErr   *types.NewAPIError
	)

	toolCallIndexByID := make(map[string]int)
	toolCallNameByID := make(map[string]string)
	toolCallArgsByID := make(map[string]string)
	toolCallNameSent := make(map[string]bool)
	toolCallCanonicalIDByItemID := make(map[string]string)
	//reasoningSummaryTextByKey := make(map[string]string)

	if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo == nil {
		info.ClaudeConvertInfo = &relaycommon.ClaudeConvertInfo{LastMessagesType: relaycommon.LastMessageTypeNone}
	}

	// sendChatChunk writes one chunk to the client in the requested relay
	// format. On failure it records the error in streamErr and returns false.
	sendChatChunk := func(chunk *dto.ChatCompletionsStreamResponse) bool {
		if chunk == nil {
			return true
		}
		if info.RelayFormat == types.RelayFormatOpenAI {
			if err := helper.ObjectData(c, chunk); err != nil {
				streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
				return false
			}
			return true
		}

		chunkData, err := common.Marshal(chunk)
		if err != nil {
			streamErr = types.NewOpenAIError(err, types.ErrorCodeJsonMarshalFailed, http.StatusInternalServerError)
			return false
		}
		if err := HandleStreamFormat(c, info, string(chunkData), false, false); err != nil {
			streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
			return false
		}
		return true
	}

	// newDeltaChunk wraps a single choice delta in a chunk carrying the
	// current response ID, creation time and model.
	newDeltaChunk := func(delta dto.ChatCompletionsStreamResponseChoiceDelta) *dto.ChatCompletionsStreamResponse {
		return &dto.ChatCompletionsStreamResponse{
			Id:      responseId,
			Object:  "chat.completion.chunk",
			Created: createAt,
			Model:   model,
			Choices: []dto.ChatCompletionsStreamResponseChoice{
				{
					Index: 0,
					Delta: delta,
				},
			},
		}
	}

	// sendStartIfNeeded emits the initial empty chunk once.
	sendStartIfNeeded := func() bool {
		if sentStart {
			return true
		}
		if !sendChatChunk(helper.GenerateStartEmptyResponse(responseId, createAt, model, nil)) {
			return false
		}
		sentStart = true
		return true
	}

	// ensureUsage replaces an empty usage with a local estimate built from the
	// forwarded text. It is a no-op once a non-zero total is known.
	ensureUsage := func() {
		if usage.TotalTokens != 0 {
			return
		}
		// The non-stream handler dereferences this result unconditionally, so
		// nil is not expected; the guard only keeps usage non-nil here.
		if est := service.ResponseText2Usage(c, usageText.String(), info.UpstreamModelName, info.GetEstimatePromptTokens()); est != nil {
			usage = est
		}
	}

	// sendStopIfNeeded emits the stop chunk once. Usage is finalized first so
	// the value handed to the Claude converter is the one returned to the
	// caller, not a zeroed placeholder.
	sendStopIfNeeded := func() bool {
		if sentStop {
			return true
		}
		ensureUsage()
		if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil {
			info.ClaudeConvertInfo.Usage = usage
		}
		finishReason := "stop"
		if sawToolCall && outputText.Len() == 0 {
			finishReason = "tool_calls"
		}
		if !sendChatChunk(helper.GenerateStopResponse(responseId, createAt, model, finishReason)) {
			return false
		}
		sentStop = true
		return true
	}

	//sendReasoningDelta := func(delta string) bool {
	//	if delta == "" {
	//		return true
	//	}
	//	if !sendStartIfNeeded() {
	//		return false
	//	}
	//
	//	usageText.WriteString(delta)
	//	chunk := &dto.ChatCompletionsStreamResponse{
	//		Id:      responseId,
	//		Object:  "chat.completion.chunk",
	//		Created: createAt,
	//		Model:   model,
	//		Choices: []dto.ChatCompletionsStreamResponseChoice{
	//			{
	//				Index: 0,
	//				Delta: dto.ChatCompletionsStreamResponseChoiceDelta{
	//					ReasoningContent: &delta,
	//				},
	//			},
	//		},
	//	}
	//	if err := helper.ObjectData(c, chunk); err != nil {
	//		streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
	//		return false
	//	}
	//	return true
	//}

	// sendReasoningSummaryDelta forwards a reasoning-summary fragment as
	// reasoning content. Empty fragments are skipped.
	sendReasoningSummaryDelta := func(delta string) bool {
		if delta == "" {
			return true
		}
		if !sendStartIfNeeded() {
			return false
		}

		usageText.WriteString(delta)
		return sendChatChunk(newDeltaChunk(dto.ChatCompletionsStreamResponseChoiceDelta{
			ReasoningContent: &delta,
		}))
	}

	// sendToolCallDelta forwards a tool-call fragment. Each call ID gets a
	// stable index in order of first appearance, and the function name is sent
	// only on the first chunk that knows it.
	sendToolCallDelta := func(callID string, name string, argsDelta string) bool {
		if callID == "" {
			return true
		}
		if outputText.Len() > 0 {
			// Prefer streaming assistant text over tool calls to match non-stream behavior.
			return true
		}
		if !sendStartIfNeeded() {
			return false
		}

		idx, ok := toolCallIndexByID[callID]
		if !ok {
			idx = len(toolCallIndexByID)
			toolCallIndexByID[callID] = idx
		}
		if name != "" {
			toolCallNameByID[callID] = name
		}
		if toolCallNameByID[callID] != "" {
			name = toolCallNameByID[callID]
		}

		tool := dto.ToolCallResponse{
			ID:   callID,
			Type: "function",
			Function: dto.FunctionResponse{
				Arguments: argsDelta,
			},
		}
		tool.SetIndex(idx)
		if name != "" && !toolCallNameSent[callID] {
			tool.Function.Name = name
			toolCallNameSent[callID] = true
		}

		chunk := newDeltaChunk(dto.ChatCompletionsStreamResponseChoiceDelta{
			ToolCalls: []dto.ToolCallResponse{tool},
		})
		if !sendChatChunk(chunk) {
			return false
		}
		sawToolCall = true

		// Include tool call data in the local builder for fallback token estimation.
		if tool.Function.Name != "" {
			usageText.WriteString(tool.Function.Name)
		}
		if argsDelta != "" {
			usageText.WriteString(argsDelta)
		}
		return true
	}

	helper.StreamScannerHandler(c, resp, info, func(data string) bool {
		if streamErr != nil {
			return false
		}

		var streamResp dto.ResponsesStreamResponse
		if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
			// A single malformed event is logged and skipped, not fatal.
			logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
			return true
		}

		switch streamResp.Type {
		case "response.created":
			if streamResp.Response != nil {
				if streamResp.Response.Model != "" {
					model = streamResp.Response.Model
				}
				if streamResp.Response.CreatedAt != 0 {
					createAt = int64(streamResp.Response.CreatedAt)
				}
			}

		//case "response.reasoning_text.delta":
		//if !sendReasoningDelta(streamResp.Delta) {
		//	return false
		//}

		//case "response.reasoning_text.done":

		case "response.reasoning_summary_text.delta":
			if !sendReasoningSummaryDelta(streamResp.Delta) {
				return false
			}

		case "response.reasoning_summary_text.done":

		//case "response.reasoning_summary_part.added", "response.reasoning_summary_part.done":
		//	key := responsesStreamIndexKey(strings.TrimSpace(streamResp.ItemID), streamResp.SummaryIndex)
		//	if key == "" || streamResp.Part == nil {
		//		break
		//	}
		//	// Only handle summary text parts, ignore other part types.
		//	if streamResp.Part.Type != "" && streamResp.Part.Type != "summary_text" {
		//		break
		//	}
		//	prev := reasoningSummaryTextByKey[key]
		//	next := streamResp.Part.Text
		//	delta := stringDeltaFromPrefix(prev, next)
		//	reasoningSummaryTextByKey[key] = next
		//	if !sendReasoningSummaryDelta(delta) {
		//		return false
		//	}

		case "response.output_text.delta":
			if !sendStartIfNeeded() {
				return false
			}

			if streamResp.Delta != "" {
				outputText.WriteString(streamResp.Delta)
				usageText.WriteString(streamResp.Delta)
				delta := streamResp.Delta
				chunk := newDeltaChunk(dto.ChatCompletionsStreamResponseChoiceDelta{
					Content: &delta,
				})
				if !sendChatChunk(chunk) {
					return false
				}
			}

		case "response.output_item.added", "response.output_item.done":
			if streamResp.Item == nil || streamResp.Item.Type != "function_call" {
				break
			}

			itemID := strings.TrimSpace(streamResp.Item.ID)
			callID := strings.TrimSpace(streamResp.Item.CallId)
			if callID == "" {
				callID = itemID
			}
			// Remember item ID -> call ID so later argument deltas, which only
			// carry the item ID, resolve to the same tool call.
			if itemID != "" && callID != "" {
				toolCallCanonicalIDByItemID[itemID] = callID
			}
			name := strings.TrimSpace(streamResp.Item.Name)
			if name != "" {
				toolCallNameByID[callID] = name
			}

			// Item events carry the cumulative arguments; forward only the
			// part that has not been sent yet.
			newArgs := streamResp.Item.Arguments
			argsDelta := stringDeltaFromPrefix(toolCallArgsByID[callID], newArgs)
			if newArgs != "" {
				toolCallArgsByID[callID] = newArgs
			}

			if !sendToolCallDelta(callID, name, argsDelta) {
				return false
			}

		case "response.function_call_arguments.delta":
			itemID := strings.TrimSpace(streamResp.ItemID)
			callID := toolCallCanonicalIDByItemID[itemID]
			if callID == "" {
				callID = itemID
			}
			if callID == "" {
				break
			}
			toolCallArgsByID[callID] += streamResp.Delta
			if !sendToolCallDelta(callID, "", streamResp.Delta) {
				return false
			}

		case "response.function_call_arguments.done":

		case "response.completed":
			if streamResp.Response != nil {
				if streamResp.Response.Model != "" {
					model = streamResp.Response.Model
				}
				if streamResp.Response.CreatedAt != 0 {
					createAt = int64(streamResp.Response.CreatedAt)
				}
				if streamResp.Response.Usage != nil {
					if streamResp.Response.Usage.InputTokens != 0 {
						usage.PromptTokens = streamResp.Response.Usage.InputTokens
						usage.InputTokens = streamResp.Response.Usage.InputTokens
					}
					if streamResp.Response.Usage.OutputTokens != 0 {
						usage.CompletionTokens = streamResp.Response.Usage.OutputTokens
						usage.OutputTokens = streamResp.Response.Usage.OutputTokens
					}
					if streamResp.Response.Usage.TotalTokens != 0 {
						usage.TotalTokens = streamResp.Response.Usage.TotalTokens
					} else {
						usage.TotalTokens = usage.PromptTokens + usage.CompletionTokens
					}
					if streamResp.Response.Usage.InputTokensDetails != nil {
						usage.PromptTokensDetails.CachedTokens = streamResp.Response.Usage.InputTokensDetails.CachedTokens
						usage.PromptTokensDetails.ImageTokens = streamResp.Response.Usage.InputTokensDetails.ImageTokens
						usage.PromptTokensDetails.AudioTokens = streamResp.Response.Usage.InputTokensDetails.AudioTokens
					}
					if streamResp.Response.Usage.CompletionTokenDetails.ReasoningTokens != 0 {
						usage.CompletionTokenDetails.ReasoningTokens = streamResp.Response.Usage.CompletionTokenDetails.ReasoningTokens
					}
				}
			}

			if !sendStartIfNeeded() {
				return false
			}
			if !sendStopIfNeeded() {
				return false
			}

		case "response.error", "response.failed":
			if streamResp.Response != nil {
				if oaiErr := streamResp.Response.GetOpenAIError(); oaiErr != nil && oaiErr.Type != "" {
					streamErr = types.WithOpenAIError(*oaiErr, http.StatusInternalServerError)
					return false
				}
			}
			streamErr = types.NewOpenAIError(fmt.Errorf("responses stream error: %s", streamResp.Type), types.ErrorCodeBadResponse, http.StatusInternalServerError)
			return false

		default:
		}

		return true
	})

	if streamErr != nil {
		return nil, streamErr
	}

	// The stream may end without response.completed; finalize usage and make
	// sure the client still receives a start and a stop chunk.
	ensureUsage()
	if !sendStartIfNeeded() {
		return nil, streamErr
	}
	if !sendStopIfNeeded() {
		return nil, streamErr
	}

	if info.RelayFormat == types.RelayFormatOpenAI {
		if info.ShouldIncludeUsage && usage != nil {
			if err := helper.ObjectData(c, helper.GenerateFinalUsageResponse(responseId, createAt, model, *usage)); err != nil {
				return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
			}
		}
		helper.Done(c)
	}

	return usage, nil
}