api_request.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. package channel
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "time"
  12. common2 "github.com/QuantumNous/new-api/common"
  13. "github.com/QuantumNous/new-api/logger"
  14. "github.com/QuantumNous/new-api/relay/common"
  15. "github.com/QuantumNous/new-api/relay/constant"
  16. "github.com/QuantumNous/new-api/relay/helper"
  17. "github.com/QuantumNous/new-api/service"
  18. "github.com/QuantumNous/new-api/setting/operation_setting"
  19. "github.com/QuantumNous/new-api/types"
  20. "github.com/bytedance/gopkg/util/gopool"
  21. "github.com/gin-gonic/gin"
  22. "github.com/gorilla/websocket"
  23. )
  24. func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
  25. if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
  26. // multipart/form-data
  27. } else if info.RelayMode == constant.RelayModeRealtime {
  28. // websocket
  29. } else {
  30. req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  31. req.Set("Accept", c.Request.Header.Get("Accept"))
  32. if info.IsStream && c.Request.Header.Get("Accept") == "" {
  33. req.Set("Accept", "text/event-stream")
  34. }
  35. }
  36. }
  37. const clientHeaderPlaceholderPrefix = "{client_header:"
  38. const (
  39. headerPassthroughAllKey = "*"
  40. headerPassthroughRegexPrefix = "re:"
  41. headerPassthroughRegexPrefixV2 = "regex:"
  42. )
  43. var passthroughSkipHeaderNamesLower = map[string]struct{}{
  44. // RFC 7230 hop-by-hop headers.
  45. "connection": {},
  46. "keep-alive": {},
  47. "proxy-authenticate": {},
  48. "proxy-authorization": {},
  49. "te": {},
  50. "trailer": {},
  51. "transfer-encoding": {},
  52. "upgrade": {},
  53. "cookie": {},
  54. // Additional headers that should not be forwarded by name-matching passthrough rules.
  55. "host": {},
  56. "content-length": {},
  57. "accept-encoding": {},
  58. // Do not passthrough credentials by wildcard/regex.
  59. "authorization": {},
  60. "x-api-key": {},
  61. "x-goog-api-key": {},
  62. // WebSocket handshake headers are generated by the client/dialer.
  63. "sec-websocket-key": {},
  64. "sec-websocket-version": {},
  65. "sec-websocket-extensions": {},
  66. }
  67. var headerPassthroughRegexCache sync.Map // map[string]*regexp.Regexp
  68. func getHeaderPassthroughRegex(pattern string) (*regexp.Regexp, error) {
  69. pattern = strings.TrimSpace(pattern)
  70. if pattern == "" {
  71. return nil, errors.New("empty regex pattern")
  72. }
  73. if v, ok := headerPassthroughRegexCache.Load(pattern); ok {
  74. if re, ok := v.(*regexp.Regexp); ok {
  75. return re, nil
  76. }
  77. headerPassthroughRegexCache.Delete(pattern)
  78. }
  79. compiled, err := regexp.Compile(pattern)
  80. if err != nil {
  81. return nil, err
  82. }
  83. actual, _ := headerPassthroughRegexCache.LoadOrStore(pattern, compiled)
  84. if re, ok := actual.(*regexp.Regexp); ok {
  85. return re, nil
  86. }
  87. return compiled, nil
  88. }
  89. func IsHeaderPassthroughRuleKey(key string) bool {
  90. return isHeaderPassthroughRuleKey(key)
  91. }
  92. func isHeaderPassthroughRuleKey(key string) bool {
  93. key = strings.TrimSpace(key)
  94. if key == "" {
  95. return false
  96. }
  97. if key == headerPassthroughAllKey {
  98. return true
  99. }
  100. lower := strings.ToLower(key)
  101. return strings.HasPrefix(lower, headerPassthroughRegexPrefix) || strings.HasPrefix(lower, headerPassthroughRegexPrefixV2)
  102. }
  103. func shouldSkipPassthroughHeader(name string) bool {
  104. name = strings.TrimSpace(name)
  105. if name == "" {
  106. return true
  107. }
  108. lower := strings.ToLower(name)
  109. if _, ok := passthroughSkipHeaderNamesLower[lower]; ok {
  110. return true
  111. }
  112. return false
  113. }
  114. func applyHeaderOverridePlaceholders(template string, c *gin.Context, apiKey string) (string, bool, error) {
  115. trimmed := strings.TrimSpace(template)
  116. if strings.HasPrefix(trimmed, clientHeaderPlaceholderPrefix) {
  117. afterPrefix := trimmed[len(clientHeaderPlaceholderPrefix):]
  118. end := strings.Index(afterPrefix, "}")
  119. if end < 0 || end != len(afterPrefix)-1 {
  120. return "", false, fmt.Errorf("client_header placeholder must be the full value: %q", template)
  121. }
  122. name := strings.TrimSpace(afterPrefix[:end])
  123. if name == "" {
  124. return "", false, fmt.Errorf("client_header placeholder name is empty: %q", template)
  125. }
  126. if c == nil || c.Request == nil {
  127. return "", false, fmt.Errorf("missing request context for client_header placeholder")
  128. }
  129. clientHeaderValue := c.Request.Header.Get(name)
  130. if strings.TrimSpace(clientHeaderValue) == "" {
  131. return "", false, nil
  132. }
  133. // Do not interpolate {api_key} inside client-supplied content.
  134. return clientHeaderValue, true, nil
  135. }
  136. if strings.Contains(template, "{api_key}") {
  137. template = strings.ReplaceAll(template, "{api_key}", apiKey)
  138. }
  139. if strings.TrimSpace(template) == "" {
  140. return "", false, nil
  141. }
  142. return template, true, nil
  143. }
  144. // processHeaderOverride applies channel header overrides, with placeholder substitution.
  145. // Supported placeholders:
  146. // - {api_key}: resolved to the channel API key
  147. // - {client_header:<name>}: resolved to the incoming request header value
  148. //
  149. // Header passthrough rules (keys only; values are ignored):
  150. // - "*": passthrough all incoming headers by name (excluding unsafe headers)
  151. // - "re:<regex>" / "regex:<regex>": passthrough headers whose names match the regex (Go regexp)
  152. //
  153. // Passthrough rules are applied first, then normal overrides are applied, so explicit overrides win.
  154. func processHeaderOverride(info *common.RelayInfo, c *gin.Context) (map[string]string, error) {
  155. headerOverride := make(map[string]string)
  156. if info == nil {
  157. return headerOverride, nil
  158. }
  159. headerOverrideSource := common.GetEffectiveHeaderOverride(info)
  160. passAll := false
  161. var passthroughRegex []*regexp.Regexp
  162. if !info.IsChannelTest {
  163. for k := range headerOverrideSource {
  164. key := strings.TrimSpace(strings.ToLower(k))
  165. if key == "" {
  166. continue
  167. }
  168. if key == headerPassthroughAllKey {
  169. passAll = true
  170. continue
  171. }
  172. var pattern string
  173. switch {
  174. case strings.HasPrefix(key, headerPassthroughRegexPrefix):
  175. pattern = strings.TrimSpace(key[len(headerPassthroughRegexPrefix):])
  176. case strings.HasPrefix(key, headerPassthroughRegexPrefixV2):
  177. pattern = strings.TrimSpace(key[len(headerPassthroughRegexPrefixV2):])
  178. default:
  179. continue
  180. }
  181. if pattern == "" {
  182. return nil, types.NewError(fmt.Errorf("header passthrough regex pattern is empty: %q", k), types.ErrorCodeChannelHeaderOverrideInvalid)
  183. }
  184. compiled, err := getHeaderPassthroughRegex(pattern)
  185. if err != nil {
  186. return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
  187. }
  188. passthroughRegex = append(passthroughRegex, compiled)
  189. }
  190. }
  191. if passAll || len(passthroughRegex) > 0 {
  192. if c == nil || c.Request == nil {
  193. return nil, types.NewError(fmt.Errorf("missing request context for header passthrough"), types.ErrorCodeChannelHeaderOverrideInvalid)
  194. }
  195. for name := range c.Request.Header {
  196. if shouldSkipPassthroughHeader(name) {
  197. continue
  198. }
  199. if !passAll {
  200. matched := false
  201. for _, re := range passthroughRegex {
  202. if re.MatchString(name) {
  203. matched = true
  204. break
  205. }
  206. }
  207. if !matched {
  208. continue
  209. }
  210. }
  211. value := strings.TrimSpace(c.Request.Header.Get(name))
  212. if value == "" {
  213. continue
  214. }
  215. headerOverride[strings.ToLower(strings.TrimSpace(name))] = value
  216. }
  217. }
  218. for k, v := range headerOverrideSource {
  219. if isHeaderPassthroughRuleKey(k) {
  220. continue
  221. }
  222. key := strings.TrimSpace(strings.ToLower(k))
  223. if key == "" {
  224. continue
  225. }
  226. str, ok := v.(string)
  227. if !ok {
  228. return nil, types.NewError(nil, types.ErrorCodeChannelHeaderOverrideInvalid)
  229. }
  230. if info.IsChannelTest && strings.HasPrefix(strings.TrimSpace(str), clientHeaderPlaceholderPrefix) {
  231. continue
  232. }
  233. value, include, err := applyHeaderOverridePlaceholders(str, c, info.ApiKey)
  234. if err != nil {
  235. return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
  236. }
  237. if !include {
  238. continue
  239. }
  240. headerOverride[key] = value
  241. }
  242. return headerOverride, nil
  243. }
  244. func ResolveHeaderOverride(info *common.RelayInfo, c *gin.Context) (map[string]string, error) {
  245. return processHeaderOverride(info, c)
  246. }
  247. func applyHeaderOverrideToRequest(req *http.Request, headerOverride map[string]string) {
  248. if req == nil {
  249. return
  250. }
  251. for key, value := range headerOverride {
  252. req.Header.Set(key, value)
  253. // set Host in req
  254. if strings.EqualFold(key, "Host") {
  255. req.Host = value
  256. }
  257. }
  258. }
  259. func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  260. fullRequestURL, err := a.GetRequestURL(info)
  261. if err != nil {
  262. return nil, fmt.Errorf("get request url failed: %w", err)
  263. }
  264. if common2.DebugEnabled {
  265. println("fullRequestURL:", fullRequestURL)
  266. }
  267. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  268. if err != nil {
  269. return nil, fmt.Errorf("new request failed: %w", err)
  270. }
  271. headers := req.Header
  272. err = a.SetupRequestHeader(c, &headers, info)
  273. if err != nil {
  274. return nil, fmt.Errorf("setup request header failed: %w", err)
  275. }
  276. // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
  277. // 这样可以覆盖默认的 Authorization header 设置
  278. headerOverride, err := processHeaderOverride(info, c)
  279. if err != nil {
  280. return nil, err
  281. }
  282. applyHeaderOverrideToRequest(req, headerOverride)
  283. resp, err := doRequest(c, req, info)
  284. if err != nil {
  285. return nil, fmt.Errorf("do request failed: %w", err)
  286. }
  287. return resp, nil
  288. }
  289. func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  290. fullRequestURL, err := a.GetRequestURL(info)
  291. if err != nil {
  292. return nil, fmt.Errorf("get request url failed: %w", err)
  293. }
  294. if common2.DebugEnabled {
  295. println("fullRequestURL:", fullRequestURL)
  296. }
  297. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  298. if err != nil {
  299. return nil, fmt.Errorf("new request failed: %w", err)
  300. }
  301. // set form data
  302. req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  303. headers := req.Header
  304. err = a.SetupRequestHeader(c, &headers, info)
  305. if err != nil {
  306. return nil, fmt.Errorf("setup request header failed: %w", err)
  307. }
  308. // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
  309. // 这样可以覆盖默认的 Authorization header 设置
  310. headerOverride, err := processHeaderOverride(info, c)
  311. if err != nil {
  312. return nil, err
  313. }
  314. applyHeaderOverrideToRequest(req, headerOverride)
  315. resp, err := doRequest(c, req, info)
  316. if err != nil {
  317. return nil, fmt.Errorf("do request failed: %w", err)
  318. }
  319. return resp, nil
  320. }
  321. func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
  322. fullRequestURL, err := a.GetRequestURL(info)
  323. if err != nil {
  324. return nil, fmt.Errorf("get request url failed: %w", err)
  325. }
  326. targetHeader := http.Header{}
  327. err = a.SetupRequestHeader(c, &targetHeader, info)
  328. if err != nil {
  329. return nil, fmt.Errorf("setup request header failed: %w", err)
  330. }
  331. // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
  332. // 这样可以覆盖默认的 Authorization header 设置
  333. headerOverride, err := processHeaderOverride(info, c)
  334. if err != nil {
  335. return nil, err
  336. }
  337. for key, value := range headerOverride {
  338. targetHeader.Set(key, value)
  339. }
  340. targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  341. targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
  342. if err != nil {
  343. return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
  344. }
  345. // send request body
  346. //all, err := io.ReadAll(requestBody)
  347. //err = service.WssString(c, targetConn, string(all))
  348. return targetConn, nil
  349. }
  350. func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.CancelFunc {
  351. pingerCtx, stopPinger := context.WithCancel(context.Background())
  352. gopool.Go(func() {
  353. defer func() {
  354. // 增加panic恢复处理
  355. if r := recover(); r != nil {
  356. if common2.DebugEnabled {
  357. println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
  358. }
  359. }
  360. if common2.DebugEnabled {
  361. println("SSE ping goroutine stopped.")
  362. }
  363. }()
  364. if pingInterval <= 0 {
  365. pingInterval = helper.DefaultPingInterval
  366. }
  367. ticker := time.NewTicker(pingInterval)
  368. // 确保在任何情况下都清理ticker
  369. defer func() {
  370. ticker.Stop()
  371. if common2.DebugEnabled {
  372. println("SSE ping ticker stopped")
  373. }
  374. }()
  375. var pingMutex sync.Mutex
  376. if common2.DebugEnabled {
  377. println("SSE ping goroutine started")
  378. }
  379. // 增加超时控制,防止goroutine长时间运行
  380. maxPingDuration := 120 * time.Minute // 最大ping持续时间
  381. pingTimeout := time.NewTimer(maxPingDuration)
  382. defer pingTimeout.Stop()
  383. for {
  384. select {
  385. // 发送 ping 数据
  386. case <-ticker.C:
  387. if err := sendPingData(c, &pingMutex); err != nil {
  388. if common2.DebugEnabled {
  389. println("SSE ping error, stopping goroutine:", err.Error())
  390. }
  391. return
  392. }
  393. // 收到退出信号
  394. case <-pingerCtx.Done():
  395. return
  396. // request 结束
  397. case <-c.Request.Context().Done():
  398. return
  399. // 超时保护,防止goroutine无限运行
  400. case <-pingTimeout.C:
  401. if common2.DebugEnabled {
  402. println("SSE ping goroutine timeout, stopping")
  403. }
  404. return
  405. }
  406. }
  407. })
  408. return stopPinger
  409. }
  410. func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
  411. // 增加超时控制,防止锁死等待
  412. done := make(chan error, 1)
  413. go func() {
  414. mutex.Lock()
  415. defer mutex.Unlock()
  416. err := helper.PingData(c)
  417. if err != nil {
  418. logger.LogError(c, "SSE ping error: "+err.Error())
  419. done <- err
  420. return
  421. }
  422. if common2.DebugEnabled {
  423. println("SSE ping data sent.")
  424. }
  425. done <- nil
  426. }()
  427. // 设置发送ping数据的超时时间
  428. select {
  429. case err := <-done:
  430. return err
  431. case <-time.After(10 * time.Second):
  432. return errors.New("SSE ping data send timeout")
  433. case <-c.Request.Context().Done():
  434. return errors.New("request context cancelled during ping")
  435. }
  436. }
  437. func DoRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  438. return doRequest(c, req, info)
  439. }
  440. func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  441. var client *http.Client
  442. var err error
  443. if info.ChannelSetting.Proxy != "" {
  444. client, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  445. if err != nil {
  446. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  447. }
  448. } else {
  449. client = service.GetHttpClient()
  450. }
  451. var stopPinger context.CancelFunc
  452. if info.IsStream {
  453. helper.SetEventStreamHeaders(c)
  454. // 处理流式请求的 ping 保活
  455. generalSettings := operation_setting.GetGeneralSetting()
  456. if generalSettings.PingIntervalEnabled && !info.DisablePing {
  457. pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
  458. stopPinger = startPingKeepAlive(c, pingInterval)
  459. // 使用defer确保在任何情况下都能停止ping goroutine
  460. defer func() {
  461. if stopPinger != nil {
  462. stopPinger()
  463. if common2.DebugEnabled {
  464. println("SSE ping goroutine stopped by defer")
  465. }
  466. }
  467. }()
  468. }
  469. }
  470. resp, err := client.Do(req)
  471. if err != nil {
  472. logger.LogError(c, "do request failed: "+err.Error())
  473. return nil, types.NewError(err, types.ErrorCodeDoRequestFailed, types.ErrOptionWithHideErrMsg("upstream error: do request failed"))
  474. }
  475. if resp == nil {
  476. return nil, errors.New("resp is nil")
  477. }
  478. _ = req.Body.Close()
  479. _ = c.Request.Body.Close()
  480. return resp, nil
  481. }
  482. func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  483. fullRequestURL, err := a.BuildRequestURL(info)
  484. if err != nil {
  485. return nil, err
  486. }
  487. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  488. if err != nil {
  489. return nil, fmt.Errorf("new request failed: %w", err)
  490. }
  491. req.GetBody = func() (io.ReadCloser, error) {
  492. return io.NopCloser(requestBody), nil
  493. }
  494. err = a.BuildRequestHeader(c, req, info)
  495. if err != nil {
  496. return nil, fmt.Errorf("setup request header failed: %w", err)
  497. }
  498. resp, err := doRequest(c, req, info)
  499. if err != nil {
  500. return nil, fmt.Errorf("do request failed: %w", err)
  501. }
  502. return resp, nil
  503. }