api_request.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package channel
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "sync"
  10. "time"
  11. common2 "github.com/QuantumNous/new-api/common"
  12. "github.com/QuantumNous/new-api/logger"
  13. "github.com/QuantumNous/new-api/relay/common"
  14. "github.com/QuantumNous/new-api/relay/constant"
  15. "github.com/QuantumNous/new-api/relay/helper"
  16. "github.com/QuantumNous/new-api/service"
  17. "github.com/QuantumNous/new-api/setting/operation_setting"
  18. "github.com/QuantumNous/new-api/types"
  19. "github.com/bytedance/gopkg/util/gopool"
  20. "github.com/gin-gonic/gin"
  21. "github.com/gorilla/websocket"
  22. )
  23. func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
  24. if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
  25. // multipart/form-data
  26. } else if info.RelayMode == constant.RelayModeRealtime {
  27. // websocket
  28. } else {
  29. req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  30. req.Set("Accept", c.Request.Header.Get("Accept"))
  31. if info.IsStream && c.Request.Header.Get("Accept") == "" {
  32. req.Set("Accept", "text/event-stream")
  33. }
  34. }
  35. }
  36. const clientHeaderPlaceholderPrefix = "{client_header:"
  37. func applyHeaderOverridePlaceholders(template string, c *gin.Context, apiKey string) (string, bool, error) {
  38. trimmed := strings.TrimSpace(template)
  39. if strings.HasPrefix(trimmed, clientHeaderPlaceholderPrefix) {
  40. afterPrefix := trimmed[len(clientHeaderPlaceholderPrefix):]
  41. end := strings.Index(afterPrefix, "}")
  42. if end < 0 || end != len(afterPrefix)-1 {
  43. return "", false, fmt.Errorf("client_header placeholder must be the full value: %q", template)
  44. }
  45. name := strings.TrimSpace(afterPrefix[:end])
  46. if name == "" {
  47. return "", false, fmt.Errorf("client_header placeholder name is empty: %q", template)
  48. }
  49. if c == nil || c.Request == nil {
  50. return "", false, fmt.Errorf("missing request context for client_header placeholder")
  51. }
  52. clientHeaderValue := c.Request.Header.Get(name)
  53. if strings.TrimSpace(clientHeaderValue) == "" {
  54. return "", false, nil
  55. }
  56. // Do not interpolate {api_key} inside client-supplied content.
  57. return clientHeaderValue, true, nil
  58. }
  59. if strings.Contains(template, "{api_key}") {
  60. template = strings.ReplaceAll(template, "{api_key}", apiKey)
  61. }
  62. if strings.TrimSpace(template) == "" {
  63. return "", false, nil
  64. }
  65. return template, true, nil
  66. }
  67. // processHeaderOverride applies channel header overrides, with placeholder substitution.
  68. // Supported placeholders:
  69. // - {api_key}: resolved to the channel API key
  70. // - {client_header:<name>}: resolved to the incoming request header value
  71. func processHeaderOverride(info *common.RelayInfo, c *gin.Context) (map[string]string, error) {
  72. headerOverride := make(map[string]string)
  73. for k, v := range info.HeadersOverride {
  74. str, ok := v.(string)
  75. if !ok {
  76. return nil, types.NewError(nil, types.ErrorCodeChannelHeaderOverrideInvalid)
  77. }
  78. value, include, err := applyHeaderOverridePlaceholders(str, c, info.ApiKey)
  79. if err != nil {
  80. return nil, types.NewError(err, types.ErrorCodeChannelHeaderOverrideInvalid)
  81. }
  82. if !include {
  83. continue
  84. }
  85. headerOverride[k] = value
  86. }
  87. return headerOverride, nil
  88. }
  89. func applyHeaderOverrideToRequest(req *http.Request, headerOverride map[string]string) {
  90. if req == nil {
  91. return
  92. }
  93. for key, value := range headerOverride {
  94. req.Header.Set(key, value)
  95. // set Host in req
  96. if strings.EqualFold(key, "Host") {
  97. req.Host = value
  98. }
  99. }
  100. }
  101. func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  102. fullRequestURL, err := a.GetRequestURL(info)
  103. if err != nil {
  104. return nil, fmt.Errorf("get request url failed: %w", err)
  105. }
  106. if common2.DebugEnabled {
  107. println("fullRequestURL:", fullRequestURL)
  108. }
  109. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  110. if err != nil {
  111. return nil, fmt.Errorf("new request failed: %w", err)
  112. }
  113. headers := req.Header
  114. err = a.SetupRequestHeader(c, &headers, info)
  115. if err != nil {
  116. return nil, fmt.Errorf("setup request header failed: %w", err)
  117. }
  118. // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
  119. // 这样可以覆盖默认的 Authorization header 设置
  120. headerOverride, err := processHeaderOverride(info, c)
  121. if err != nil {
  122. return nil, err
  123. }
  124. applyHeaderOverrideToRequest(req, headerOverride)
  125. resp, err := doRequest(c, req, info)
  126. if err != nil {
  127. return nil, fmt.Errorf("do request failed: %w", err)
  128. }
  129. return resp, nil
  130. }
  131. func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  132. fullRequestURL, err := a.GetRequestURL(info)
  133. if err != nil {
  134. return nil, fmt.Errorf("get request url failed: %w", err)
  135. }
  136. if common2.DebugEnabled {
  137. println("fullRequestURL:", fullRequestURL)
  138. }
  139. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  140. if err != nil {
  141. return nil, fmt.Errorf("new request failed: %w", err)
  142. }
  143. // set form data
  144. req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  145. headers := req.Header
  146. err = a.SetupRequestHeader(c, &headers, info)
  147. if err != nil {
  148. return nil, fmt.Errorf("setup request header failed: %w", err)
  149. }
  150. // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
  151. // 这样可以覆盖默认的 Authorization header 设置
  152. headerOverride, err := processHeaderOverride(info, c)
  153. if err != nil {
  154. return nil, err
  155. }
  156. applyHeaderOverrideToRequest(req, headerOverride)
  157. resp, err := doRequest(c, req, info)
  158. if err != nil {
  159. return nil, fmt.Errorf("do request failed: %w", err)
  160. }
  161. return resp, nil
  162. }
  163. func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
  164. fullRequestURL, err := a.GetRequestURL(info)
  165. if err != nil {
  166. return nil, fmt.Errorf("get request url failed: %w", err)
  167. }
  168. targetHeader := http.Header{}
  169. err = a.SetupRequestHeader(c, &targetHeader, info)
  170. if err != nil {
  171. return nil, fmt.Errorf("setup request header failed: %w", err)
  172. }
  173. // 在 SetupRequestHeader 之后应用 Header Override,确保用户设置优先级最高
  174. // 这样可以覆盖默认的 Authorization header 设置
  175. headerOverride, err := processHeaderOverride(info, c)
  176. if err != nil {
  177. return nil, err
  178. }
  179. for key, value := range headerOverride {
  180. targetHeader.Set(key, value)
  181. }
  182. targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  183. targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
  184. if err != nil {
  185. return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
  186. }
  187. // send request body
  188. //all, err := io.ReadAll(requestBody)
  189. //err = service.WssString(c, targetConn, string(all))
  190. return targetConn, nil
  191. }
  192. func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.CancelFunc {
  193. pingerCtx, stopPinger := context.WithCancel(context.Background())
  194. gopool.Go(func() {
  195. defer func() {
  196. // 增加panic恢复处理
  197. if r := recover(); r != nil {
  198. if common2.DebugEnabled {
  199. println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
  200. }
  201. }
  202. if common2.DebugEnabled {
  203. println("SSE ping goroutine stopped.")
  204. }
  205. }()
  206. if pingInterval <= 0 {
  207. pingInterval = helper.DefaultPingInterval
  208. }
  209. ticker := time.NewTicker(pingInterval)
  210. // 确保在任何情况下都清理ticker
  211. defer func() {
  212. ticker.Stop()
  213. if common2.DebugEnabled {
  214. println("SSE ping ticker stopped")
  215. }
  216. }()
  217. var pingMutex sync.Mutex
  218. if common2.DebugEnabled {
  219. println("SSE ping goroutine started")
  220. }
  221. // 增加超时控制,防止goroutine长时间运行
  222. maxPingDuration := 120 * time.Minute // 最大ping持续时间
  223. pingTimeout := time.NewTimer(maxPingDuration)
  224. defer pingTimeout.Stop()
  225. for {
  226. select {
  227. // 发送 ping 数据
  228. case <-ticker.C:
  229. if err := sendPingData(c, &pingMutex); err != nil {
  230. if common2.DebugEnabled {
  231. println("SSE ping error, stopping goroutine:", err.Error())
  232. }
  233. return
  234. }
  235. // 收到退出信号
  236. case <-pingerCtx.Done():
  237. return
  238. // request 结束
  239. case <-c.Request.Context().Done():
  240. return
  241. // 超时保护,防止goroutine无限运行
  242. case <-pingTimeout.C:
  243. if common2.DebugEnabled {
  244. println("SSE ping goroutine timeout, stopping")
  245. }
  246. return
  247. }
  248. }
  249. })
  250. return stopPinger
  251. }
  252. func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
  253. // 增加超时控制,防止锁死等待
  254. done := make(chan error, 1)
  255. go func() {
  256. mutex.Lock()
  257. defer mutex.Unlock()
  258. err := helper.PingData(c)
  259. if err != nil {
  260. logger.LogError(c, "SSE ping error: "+err.Error())
  261. done <- err
  262. return
  263. }
  264. if common2.DebugEnabled {
  265. println("SSE ping data sent.")
  266. }
  267. done <- nil
  268. }()
  269. // 设置发送ping数据的超时时间
  270. select {
  271. case err := <-done:
  272. return err
  273. case <-time.After(10 * time.Second):
  274. return errors.New("SSE ping data send timeout")
  275. case <-c.Request.Context().Done():
  276. return errors.New("request context cancelled during ping")
  277. }
  278. }
  279. func DoRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  280. return doRequest(c, req, info)
  281. }
  282. func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  283. var client *http.Client
  284. var err error
  285. if info.ChannelSetting.Proxy != "" {
  286. client, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  287. if err != nil {
  288. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  289. }
  290. } else {
  291. client = service.GetHttpClient()
  292. }
  293. var stopPinger context.CancelFunc
  294. if info.IsStream {
  295. helper.SetEventStreamHeaders(c)
  296. // 处理流式请求的 ping 保活
  297. generalSettings := operation_setting.GetGeneralSetting()
  298. if generalSettings.PingIntervalEnabled && !info.DisablePing {
  299. pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
  300. stopPinger = startPingKeepAlive(c, pingInterval)
  301. // 使用defer确保在任何情况下都能停止ping goroutine
  302. defer func() {
  303. if stopPinger != nil {
  304. stopPinger()
  305. if common2.DebugEnabled {
  306. println("SSE ping goroutine stopped by defer")
  307. }
  308. }
  309. }()
  310. }
  311. }
  312. resp, err := client.Do(req)
  313. if err != nil {
  314. logger.LogError(c, "do request failed: "+err.Error())
  315. return nil, types.NewError(err, types.ErrorCodeDoRequestFailed, types.ErrOptionWithHideErrMsg("upstream error: do request failed"))
  316. }
  317. if resp == nil {
  318. return nil, errors.New("resp is nil")
  319. }
  320. _ = req.Body.Close()
  321. _ = c.Request.Body.Close()
  322. return resp, nil
  323. }
  324. func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  325. fullRequestURL, err := a.BuildRequestURL(info)
  326. if err != nil {
  327. return nil, err
  328. }
  329. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  330. if err != nil {
  331. return nil, fmt.Errorf("new request failed: %w", err)
  332. }
  333. req.GetBody = func() (io.ReadCloser, error) {
  334. return io.NopCloser(requestBody), nil
  335. }
  336. err = a.BuildRequestHeader(c, req, info)
  337. if err != nil {
  338. return nil, fmt.Errorf("setup request header failed: %w", err)
  339. }
  340. resp, err := doRequest(c, req, info)
  341. if err != nil {
  342. return nil, fmt.Errorf("do request failed: %w", err)
  343. }
  344. return resp, nil
  345. }