| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 |
- package channel
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- "regexp"
- "strings"
- "sync"
- "time"
- common2 "github.com/QuantumNous/new-api/common"
- "github.com/QuantumNous/new-api/logger"
- "github.com/QuantumNous/new-api/relay/common"
- "github.com/QuantumNous/new-api/relay/constant"
- "github.com/QuantumNous/new-api/relay/helper"
- "github.com/QuantumNous/new-api/service"
- "github.com/QuantumNous/new-api/setting/operation_setting"
- "github.com/QuantumNous/new-api/types"
- "github.com/bytedance/gopkg/util/gopool"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- )
- func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
- if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
- // multipart/form-data
- } else if info.RelayMode == constant.RelayModeRealtime {
- // websocket
- } else {
- req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
- req.Set("Accept", c.Request.Header.Get("Accept"))
- if info.IsStream && c.Request.Header.Get("Accept") == "" {
- req.Set("Accept", "text/event-stream")
- }
- }
- }
- const clientHeaderPlaceholderPrefix = "{client_header:"
- const (
- headerPassthroughAllKey = "*"
- headerPassthroughRegexPrefix = "re:"
- headerPassthroughRegexPrefixV2 = "regex:"
- )
- var passthroughSkipHeaderNamesLower = map[string]struct{}{
- // RFC 7230 hop-by-hop headers.
- "connection": {},
- "keep-alive": {},
- "proxy-authenticate": {},
- "proxy-authorization": {},
- "te": {},
- "trailer": {},
- "transfer-encoding": {},
- "upgrade": {},
- "cookie": {},
- // Additional headers that should not be forwarded by name-matching passthrough rules.
- "host": {},
- "content-length": {},
- // Do not passthrough credentials by wildcard/regex.
- "authorization": {},
- "x-api-key": {},
- "x-goog-api-key": {},
- // WebSocket handshake headers are generated by the client/dialer.
- "sec-websocket-key": {},
- "sec-websocket-version": {},
- "sec-websocket-extensions": {},
- }
- var headerPassthroughRegexCache sync.Map // map[string]*regexp.Regexp
- func getHeaderPassthroughRegex(pattern string) (*regexp.Regexp, error) {
- pattern = strings.TrimSpace(pattern)
- if pattern == "" {
- return nil, errors.New("empty regex pattern")
- }
- if v, ok := headerPassthroughRegexCache.Load(pattern); ok {
- if re, ok := v.(*regexp.Regexp); ok {
- return re, nil
- }
- headerPassthroughRegexCache.Delete(pattern)
- }
- compiled, err := regexp.Compile(pattern)
- if err != nil {
- return nil, err
- }
- actual, _ := headerPassthroughRegexCache.LoadOrStore(pattern, compiled)
- if re, ok := actual.(*regexp.Regexp); ok {
- return re, nil
- }
- return compiled, nil
- }
- func isHeaderPassthroughRuleKey(key string) bool {
- key = strings.TrimSpace(key)
- if key == "" {
- return false
- }
- if key == headerPassthroughAllKey {
- return true
- }
- lower := strings.ToLower(key)
- return strings.HasPrefix(lower, headerPassthroughRegexPrefix) || strings.HasPrefix(lower, headerPassthroughRegexPrefixV2)
- }
- func shouldSkipPassthroughHeader(name string) bool {
- name = strings.TrimSpace(name)
- if name == "" {
- return true
- }
- lower := strings.ToLower(name)
- if _, ok := passthroughSkipHeaderNamesLower[lower]; ok {
- return true
- }
- return false
- }
- func applyHeaderOverridePlaceholders(template string, c *gin.Context, apiKey string) (string, bool, error) {
- trimmed := strings.TrimSpace(template)
- if strings.HasPrefix(trimmed, clientHeaderPlaceholderPrefix) {
- afterPrefix := trimmed[len(clientHeaderPlaceholderPrefix):]
- end := strings.Index(afterPrefix, "}")
- if end < 0 || end != len(afterPrefix)-1 {
- return "", false, fmt.Errorf("client_header placeholder must be the full value: %q", template)
- }
- name := strings.TrimSpace(afterPrefix[:end])
- if name == "" {
- return "", false, fmt.Errorf("client_header placeholder name is empty: %q", template)
- }
- if c == nil || c.Request == nil {
- return "", false, fmt.Errorf("missing request context for client_header placeholder")
- }
- clientHeaderValue := c.Request.Header.Get(name)
- if strings.TrimSpace(clientHeaderValue) == "" {
- return "", false, nil
- }
- // Do not interpolate {api_key} inside client-supplied content.
- return clientHeaderValue, true, nil
- }
- if strings.Contains(template, "{api_key}") {
- template = strings.ReplaceAll(template, "{api_key}", apiKey)
- }
- if strings.TrimSpace(template) == "" {
- return "", false, nil
- }
- return template, true, nil
- }
- // processHeaderOverride applies channel header overrides, with placeholder substitution.
- // Supported placeholders:
- // - {api_key}: resolved to the channel API key
- // - {client_header:<name>}: resolved to the incoming request header value
- //
- // Header passthrough rules (keys only; values are ignored):
- // - "*": passthrough all incoming headers by name (excluding unsafe headers)
- // - "re:<regex>" / "regex:<regex>": passthrough headers whose names match the regex (Go regexp)
- //
- // Passthrough rules are applied first, then normal overrides are applied, so explicit overrides win.
- func processHeaderOverride(info *common.RelayInfo, c *gin.Context) (map[string]string, error) {
- headerOverride := make(map[string]string)
- if info == nil {
- return headerOverride, nil
- }
- headerOverrideSource := info.HeadersOverride
- if info.UseRuntimeHeadersOverride {
- headerOverrideSource = info.RuntimeHeadersOverride
- }
- passAll := false
- var passthroughRegex []*regexp.Regexp
- if !info.IsChannelTest {
- for k := range headerOverrideSource {
- key := strings.TrimSpace(k)
- if key == "" {
- continue
- }
- if key == headerPassthroughAllKey {
- passAll = true
- continue
- }
- lower := strings.ToLower(key)
- var pattern string
- switch {
- case strings.HasPrefix(lower, headerPassthroughRegexPrefix):
- pattern = strings.TrimSpace(key[len(headerPassthroughRegexPrefix):])
- case strings.HasPrefix(lower, headerPassthroughRegexPrefixV2):
- pattern = strings.TrimSpace(key[len(headerPassthroughRegexPrefixV2):])
- default:
- continue
- }
- if pattern == "" {
- return nil, types.NewError(fmt.Errorf("header passthrough regex pattern is empty: %q", k), types.ErrorCodeChannelHeaderOverrideInvalid)
- }
- compiled, err := getHeaderPassthroughRegex(pattern)
- if err != nil {
- return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
- }
- passthroughRegex = append(passthroughRegex, compiled)
- }
- }
- if passAll || len(passthroughRegex) > 0 {
- if c == nil || c.Request == nil {
- return nil, types.NewError(fmt.Errorf("missing request context for header passthrough"), types.ErrorCodeChannelHeaderOverrideInvalid)
- }
- for name := range c.Request.Header {
- if shouldSkipPassthroughHeader(name) {
- continue
- }
- if !passAll {
- matched := false
- for _, re := range passthroughRegex {
- if re.MatchString(name) {
- matched = true
- break
- }
- }
- if !matched {
- continue
- }
- }
- value := strings.TrimSpace(c.Request.Header.Get(name))
- if value == "" {
- continue
- }
- headerOverride[name] = value
- }
- }
- for k, v := range headerOverrideSource {
- if isHeaderPassthroughRuleKey(k) {
- continue
- }
- key := strings.TrimSpace(k)
- if key == "" {
- continue
- }
- str, ok := v.(string)
- if !ok {
- return nil, types.NewError(nil, types.ErrorCodeChannelHeaderOverrideInvalid)
- }
- if info.IsChannelTest && strings.HasPrefix(strings.TrimSpace(str), clientHeaderPlaceholderPrefix) {
- continue
- }
- value, include, err := applyHeaderOverridePlaceholders(str, c, info.ApiKey)
- if err != nil {
- return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
- }
- if !include {
- continue
- }
- headerOverride[key] = value
- }
- return headerOverride, nil
- }
- func applyHeaderOverrideToRequest(req *http.Request, headerOverride map[string]string) {
- if req == nil {
- return
- }
- for key, value := range headerOverride {
- req.Header.Set(key, value)
- // set Host in req
- if strings.EqualFold(key, "Host") {
- req.Host = value
- }
- }
- }
- func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
- fullRequestURL, err := a.GetRequestURL(info)
- if err != nil {
- return nil, fmt.Errorf("get request url failed: %w", err)
- }
- if common2.DebugEnabled {
- println("fullRequestURL:", fullRequestURL)
- }
- req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
- if err != nil {
- return nil, fmt.Errorf("new request failed: %w", err)
- }
- headers := req.Header
- err = a.SetupRequestHeader(c, &headers, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
- // 这样可以覆盖默认的 Authorization header 设置
- headerOverride, err := processHeaderOverride(info, c)
- if err != nil {
- return nil, err
- }
- applyHeaderOverrideToRequest(req, headerOverride)
- resp, err := doRequest(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("do request failed: %w", err)
- }
- return resp, nil
- }
- func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
- fullRequestURL, err := a.GetRequestURL(info)
- if err != nil {
- return nil, fmt.Errorf("get request url failed: %w", err)
- }
- if common2.DebugEnabled {
- println("fullRequestURL:", fullRequestURL)
- }
- req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
- if err != nil {
- return nil, fmt.Errorf("new request failed: %w", err)
- }
- // set form data
- req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
- headers := req.Header
- err = a.SetupRequestHeader(c, &headers, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
- // 这样可以覆盖默认的 Authorization header 设置
- headerOverride, err := processHeaderOverride(info, c)
- if err != nil {
- return nil, err
- }
- applyHeaderOverrideToRequest(req, headerOverride)
- resp, err := doRequest(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("do request failed: %w", err)
- }
- return resp, nil
- }
- func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
- fullRequestURL, err := a.GetRequestURL(info)
- if err != nil {
- return nil, fmt.Errorf("get request url failed: %w", err)
- }
- targetHeader := http.Header{}
- err = a.SetupRequestHeader(c, &targetHeader, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
- // 这样可以覆盖默认的 Authorization header 设置
- headerOverride, err := processHeaderOverride(info, c)
- if err != nil {
- return nil, err
- }
- for key, value := range headerOverride {
- targetHeader.Set(key, value)
- }
- targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
- targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
- if err != nil {
- return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
- }
- // send request body
- //all, err := io.ReadAll(requestBody)
- //err = service.WssString(c, targetConn, string(all))
- return targetConn, nil
- }
- func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.CancelFunc {
- pingerCtx, stopPinger := context.WithCancel(context.Background())
- gopool.Go(func() {
- defer func() {
- // 增加panic恢复处理
- if r := recover(); r != nil {
- if common2.DebugEnabled {
- println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
- }
- }
- if common2.DebugEnabled {
- println("SSE ping goroutine stopped.")
- }
- }()
- if pingInterval <= 0 {
- pingInterval = helper.DefaultPingInterval
- }
- ticker := time.NewTicker(pingInterval)
- // 确保在任何情况下都清理ticker
- defer func() {
- ticker.Stop()
- if common2.DebugEnabled {
- println("SSE ping ticker stopped")
- }
- }()
- var pingMutex sync.Mutex
- if common2.DebugEnabled {
- println("SSE ping goroutine started")
- }
- // 增加超时控制,防止goroutine长时间运行
- maxPingDuration := 120 * time.Minute // 最大ping持续时间
- pingTimeout := time.NewTimer(maxPingDuration)
- defer pingTimeout.Stop()
- for {
- select {
- // 发送 ping 数据
- case <-ticker.C:
- if err := sendPingData(c, &pingMutex); err != nil {
- if common2.DebugEnabled {
- println("SSE ping error, stopping goroutine:", err.Error())
- }
- return
- }
- // 收到退出信号
- case <-pingerCtx.Done():
- return
- // request 结束
- case <-c.Request.Context().Done():
- return
- // 超时保护,防止goroutine无限运行
- case <-pingTimeout.C:
- if common2.DebugEnabled {
- println("SSE ping goroutine timeout, stopping")
- }
- return
- }
- }
- })
- return stopPinger
- }
- func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
- // 增加超时控制,防止锁死等待
- done := make(chan error, 1)
- go func() {
- mutex.Lock()
- defer mutex.Unlock()
- err := helper.PingData(c)
- if err != nil {
- logger.LogError(c, "SSE ping error: "+err.Error())
- done <- err
- return
- }
- if common2.DebugEnabled {
- println("SSE ping data sent.")
- }
- done <- nil
- }()
- // 设置发送ping数据的超时时间
- select {
- case err := <-done:
- return err
- case <-time.After(10 * time.Second):
- return errors.New("SSE ping data send timeout")
- case <-c.Request.Context().Done():
- return errors.New("request context cancelled during ping")
- }
- }
- func DoRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
- return doRequest(c, req, info)
- }
- func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
- var client *http.Client
- var err error
- if info.ChannelSetting.Proxy != "" {
- client, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
- if err != nil {
- return nil, fmt.Errorf("new proxy http client failed: %w", err)
- }
- } else {
- client = service.GetHttpClient()
- }
- var stopPinger context.CancelFunc
- if info.IsStream {
- helper.SetEventStreamHeaders(c)
- // 处理流式请求的 ping 保活
- generalSettings := operation_setting.GetGeneralSetting()
- if generalSettings.PingIntervalEnabled && !info.DisablePing {
- pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
- stopPinger = startPingKeepAlive(c, pingInterval)
- // 使用defer确保在任何情况下都能停止ping goroutine
- defer func() {
- if stopPinger != nil {
- stopPinger()
- if common2.DebugEnabled {
- println("SSE ping goroutine stopped by defer")
- }
- }
- }()
- }
- }
- resp, err := client.Do(req)
- if err != nil {
- logger.LogError(c, "do request failed: "+err.Error())
- return nil, types.NewError(err, types.ErrorCodeDoRequestFailed, types.ErrOptionWithHideErrMsg("upstream error: do request failed"))
- }
- if resp == nil {
- return nil, errors.New("resp is nil")
- }
- _ = req.Body.Close()
- _ = c.Request.Body.Close()
- return resp, nil
- }
- func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
- fullRequestURL, err := a.BuildRequestURL(info)
- if err != nil {
- return nil, err
- }
- req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
- if err != nil {
- return nil, fmt.Errorf("new request failed: %w", err)
- }
- req.GetBody = func() (io.ReadCloser, error) {
- return io.NopCloser(requestBody), nil
- }
- err = a.BuildRequestHeader(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- resp, err := doRequest(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("do request failed: %w", err)
- }
- return resp, nil
- }
|