Просмотр исходного кода

feat: 支持设置流模式超时时间(gemini, claude)

CalciumIon 1 год назад
Родитель
Сommit
402a415c79

+ 8 - 3
relay/channel/claude/relay-claude.go

@@ -8,6 +8,7 @@ import (
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 	"one-api/common"
 	"one-api/common"
+	"one-api/constant"
 	"one-api/dto"
 	"one-api/dto"
 	relaycommon "one-api/relay/common"
 	relaycommon "one-api/relay/common"
 	"one-api/service"
 	"one-api/service"
@@ -267,8 +268,8 @@ func claudeStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.
 		}
 		}
 		return 0, nil, nil
 		return 0, nil, nil
 	})
 	})
-	dataChan := make(chan string)
-	stopChan := make(chan bool)
+	dataChan := make(chan string, 5)
+	stopChan := make(chan bool, 2)
 	go func() {
 	go func() {
 		for scanner.Scan() {
 		for scanner.Scan() {
 			data := scanner.Text()
 			data := scanner.Text()
@@ -276,7 +277,11 @@ func claudeStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.
 				continue
 				continue
 			}
 			}
 			data = strings.TrimPrefix(data, "data: ")
 			data = strings.TrimPrefix(data, "data: ")
-			dataChan <- data
+			if !common.SafeSendStringTimeout(dataChan, data, constant.StreamingTimeout) {
+				// send data timeout, stop the stream
+				common.LogError(c, "send data timeout, stop the stream")
+				break
+			}
 		}
 		}
 		stopChan <- true
 		stopChan <- true
 	}()
 	}()

+ 8 - 3
relay/channel/gemini/relay-gemini.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 	"one-api/common"
 	"one-api/common"
+	"one-api/constant"
 	"one-api/dto"
 	"one-api/dto"
 	relaycommon "one-api/relay/common"
 	relaycommon "one-api/relay/common"
 	"one-api/service"
 	"one-api/service"
@@ -163,8 +164,8 @@ func streamResponseGeminiChat2OpenAI(geminiResponse *GeminiChatResponse) *dto.Ch
 
 
 func geminiChatStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, string) {
 func geminiChatStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, string) {
 	responseText := ""
 	responseText := ""
-	dataChan := make(chan string)
-	stopChan := make(chan bool)
+	dataChan := make(chan string, 5)
+	stopChan := make(chan bool, 2)
 	scanner := bufio.NewScanner(resp.Body)
 	scanner := bufio.NewScanner(resp.Body)
 	scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
 	scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
 		if atEOF && len(data) == 0 {
 		if atEOF && len(data) == 0 {
@@ -187,7 +188,11 @@ func geminiChatStreamHandler(c *gin.Context, resp *http.Response, info *relaycom
 			}
 			}
 			data = strings.TrimPrefix(data, "\"text\": \"")
 			data = strings.TrimPrefix(data, "\"text\": \"")
 			data = strings.TrimSuffix(data, "\"")
 			data = strings.TrimSuffix(data, "\"")
-			dataChan <- data
+			if !common.SafeSendStringTimeout(dataChan, data, constant.StreamingTimeout) {
+				// send data timeout, stop the stream
+				common.LogError(c, "send data timeout, stop the stream")
+				break
+			}
 		}
 		}
 		stopChan <- true
 		stopChan <- true
 	}()
 	}()

+ 1 - 1
relay/channel/openai/relay-openai.go

@@ -54,7 +54,7 @@ func OpenaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.
 			}
 			}
 			if !common.SafeSendStringTimeout(dataChan, data, constant.StreamingTimeout) {
 			if !common.SafeSendStringTimeout(dataChan, data, constant.StreamingTimeout) {
 				// send data timeout, stop the stream
 				// send data timeout, stop the stream
-				common.LogInfo(c, "send data timeout, stop the stream")
+				common.LogError(c, "send data timeout, stop the stream")
 				break
 				break
 			}
 			}
 			data = data[6:]
 			data = data[6:]