api_request.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package channel
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "sync"
  10. "time"
  11. common2 "github.com/QuantumNous/new-api/common"
  12. "github.com/QuantumNous/new-api/logger"
  13. "github.com/QuantumNous/new-api/relay/common"
  14. "github.com/QuantumNous/new-api/relay/constant"
  15. "github.com/QuantumNous/new-api/relay/helper"
  16. "github.com/QuantumNous/new-api/service"
  17. "github.com/QuantumNous/new-api/setting/operation_setting"
  18. "github.com/QuantumNous/new-api/types"
  19. "github.com/bytedance/gopkg/util/gopool"
  20. "github.com/gin-gonic/gin"
  21. "github.com/gorilla/websocket"
  22. )
  23. func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
  24. if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
  25. // multipart/form-data
  26. } else if info.RelayMode == constant.RelayModeImagesEdits {
  27. // multipart/form-data
  28. } else if info.RelayMode == constant.RelayModeRealtime {
  29. // websocket
  30. } else {
  31. req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  32. req.Set("Accept", c.Request.Header.Get("Accept"))
  33. if info.IsStream && c.Request.Header.Get("Accept") == "" {
  34. req.Set("Accept", "text/event-stream")
  35. }
  36. }
  37. }
  38. // processHeaderOverride 处理请求头覆盖,支持变量替换
  39. // 支持的变量:{api_key}
  40. func processHeaderOverride(info *common.RelayInfo) (map[string]string, error) {
  41. headerOverride := make(map[string]string)
  42. for k, v := range info.HeadersOverride {
  43. str, ok := v.(string)
  44. if !ok {
  45. return nil, types.NewError(nil, types.ErrorCodeChannelHeaderOverrideInvalid)
  46. }
  47. // 替换支持的变量
  48. if strings.Contains(str, "{api_key}") {
  49. str = strings.ReplaceAll(str, "{api_key}", info.ApiKey)
  50. }
  51. headerOverride[k] = str
  52. }
  53. return headerOverride, nil
  54. }
  55. func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  56. fullRequestURL, err := a.GetRequestURL(info)
  57. if err != nil {
  58. return nil, fmt.Errorf("get request url failed: %w", err)
  59. }
  60. if common2.DebugEnabled {
  61. println("fullRequestURL:", fullRequestURL)
  62. }
  63. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  64. if err != nil {
  65. return nil, fmt.Errorf("new request failed: %w", err)
  66. }
  67. headers := req.Header
  68. headerOverride, err := processHeaderOverride(info)
  69. if err != nil {
  70. return nil, err
  71. }
  72. for key, value := range headerOverride {
  73. headers.Set(key, value)
  74. }
  75. err = a.SetupRequestHeader(c, &headers, info)
  76. if err != nil {
  77. return nil, fmt.Errorf("setup request header failed: %w", err)
  78. }
  79. resp, err := doRequest(c, req, info)
  80. if err != nil {
  81. return nil, fmt.Errorf("do request failed: %w", err)
  82. }
  83. return resp, nil
  84. }
  85. func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  86. fullRequestURL, err := a.GetRequestURL(info)
  87. if err != nil {
  88. return nil, fmt.Errorf("get request url failed: %w", err)
  89. }
  90. if common2.DebugEnabled {
  91. println("fullRequestURL:", fullRequestURL)
  92. }
  93. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  94. if err != nil {
  95. return nil, fmt.Errorf("new request failed: %w", err)
  96. }
  97. // set form data
  98. req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  99. headers := req.Header
  100. headerOverride, err := processHeaderOverride(info)
  101. if err != nil {
  102. return nil, err
  103. }
  104. for key, value := range headerOverride {
  105. headers.Set(key, value)
  106. }
  107. err = a.SetupRequestHeader(c, &headers, info)
  108. if err != nil {
  109. return nil, fmt.Errorf("setup request header failed: %w", err)
  110. }
  111. resp, err := doRequest(c, req, info)
  112. if err != nil {
  113. return nil, fmt.Errorf("do request failed: %w", err)
  114. }
  115. return resp, nil
  116. }
  117. func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
  118. fullRequestURL, err := a.GetRequestURL(info)
  119. if err != nil {
  120. return nil, fmt.Errorf("get request url failed: %w", err)
  121. }
  122. targetHeader := http.Header{}
  123. headerOverride, err := processHeaderOverride(info)
  124. if err != nil {
  125. return nil, err
  126. }
  127. for key, value := range headerOverride {
  128. targetHeader.Set(key, value)
  129. }
  130. err = a.SetupRequestHeader(c, &targetHeader, info)
  131. if err != nil {
  132. return nil, fmt.Errorf("setup request header failed: %w", err)
  133. }
  134. targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  135. targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
  136. if err != nil {
  137. return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
  138. }
  139. // send request body
  140. //all, err := io.ReadAll(requestBody)
  141. //err = service.WssString(c, targetConn, string(all))
  142. return targetConn, nil
  143. }
  144. func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.CancelFunc {
  145. pingerCtx, stopPinger := context.WithCancel(context.Background())
  146. gopool.Go(func() {
  147. defer func() {
  148. // 增加panic恢复处理
  149. if r := recover(); r != nil {
  150. if common2.DebugEnabled {
  151. println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r))
  152. }
  153. }
  154. if common2.DebugEnabled {
  155. println("SSE ping goroutine stopped.")
  156. }
  157. }()
  158. if pingInterval <= 0 {
  159. pingInterval = helper.DefaultPingInterval
  160. }
  161. ticker := time.NewTicker(pingInterval)
  162. // 确保在任何情况下都清理ticker
  163. defer func() {
  164. ticker.Stop()
  165. if common2.DebugEnabled {
  166. println("SSE ping ticker stopped")
  167. }
  168. }()
  169. var pingMutex sync.Mutex
  170. if common2.DebugEnabled {
  171. println("SSE ping goroutine started")
  172. }
  173. // 增加超时控制,防止goroutine长时间运行
  174. maxPingDuration := 120 * time.Minute // 最大ping持续时间
  175. pingTimeout := time.NewTimer(maxPingDuration)
  176. defer pingTimeout.Stop()
  177. for {
  178. select {
  179. // 发送 ping 数据
  180. case <-ticker.C:
  181. if err := sendPingData(c, &pingMutex); err != nil {
  182. if common2.DebugEnabled {
  183. println("SSE ping error, stopping goroutine:", err.Error())
  184. }
  185. return
  186. }
  187. // 收到退出信号
  188. case <-pingerCtx.Done():
  189. return
  190. // request 结束
  191. case <-c.Request.Context().Done():
  192. return
  193. // 超时保护,防止goroutine无限运行
  194. case <-pingTimeout.C:
  195. if common2.DebugEnabled {
  196. println("SSE ping goroutine timeout, stopping")
  197. }
  198. return
  199. }
  200. }
  201. })
  202. return stopPinger
  203. }
  204. func sendPingData(c *gin.Context, mutex *sync.Mutex) error {
  205. // 增加超时控制,防止锁死等待
  206. done := make(chan error, 1)
  207. go func() {
  208. mutex.Lock()
  209. defer mutex.Unlock()
  210. err := helper.PingData(c)
  211. if err != nil {
  212. logger.LogError(c, "SSE ping error: "+err.Error())
  213. done <- err
  214. return
  215. }
  216. if common2.DebugEnabled {
  217. println("SSE ping data sent.")
  218. }
  219. done <- nil
  220. }()
  221. // 设置发送ping数据的超时时间
  222. select {
  223. case err := <-done:
  224. return err
  225. case <-time.After(10 * time.Second):
  226. return errors.New("SSE ping data send timeout")
  227. case <-c.Request.Context().Done():
  228. return errors.New("request context cancelled during ping")
  229. }
  230. }
  231. func DoRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  232. return doRequest(c, req, info)
  233. }
  234. func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
  235. var client *http.Client
  236. var err error
  237. if info.ChannelSetting.Proxy != "" {
  238. client, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  239. if err != nil {
  240. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  241. }
  242. } else {
  243. client = service.GetHttpClient()
  244. }
  245. var stopPinger context.CancelFunc
  246. if info.IsStream {
  247. helper.SetEventStreamHeaders(c)
  248. // 处理流式请求的 ping 保活
  249. generalSettings := operation_setting.GetGeneralSetting()
  250. if generalSettings.PingIntervalEnabled && !info.DisablePing {
  251. pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
  252. stopPinger = startPingKeepAlive(c, pingInterval)
  253. // 使用defer确保在任何情况下都能停止ping goroutine
  254. defer func() {
  255. if stopPinger != nil {
  256. stopPinger()
  257. if common2.DebugEnabled {
  258. println("SSE ping goroutine stopped by defer")
  259. }
  260. }
  261. }()
  262. }
  263. }
  264. resp, err := client.Do(req)
  265. if err != nil {
  266. logger.LogError(c, "do request failed: "+err.Error())
  267. return nil, types.NewError(err, types.ErrorCodeDoRequestFailed, types.ErrOptionWithHideErrMsg("upstream error: do request failed"))
  268. }
  269. if resp == nil {
  270. return nil, errors.New("resp is nil")
  271. }
  272. _ = req.Body.Close()
  273. _ = c.Request.Body.Close()
  274. return resp, nil
  275. }
  276. func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
  277. fullRequestURL, err := a.BuildRequestURL(info)
  278. if err != nil {
  279. return nil, err
  280. }
  281. req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  282. if err != nil {
  283. return nil, fmt.Errorf("new request failed: %w", err)
  284. }
  285. req.GetBody = func() (io.ReadCloser, error) {
  286. return io.NopCloser(requestBody), nil
  287. }
  288. err = a.BuildRequestHeader(c, req, info)
  289. if err != nil {
  290. return nil, fmt.Errorf("setup request header failed: %w", err)
  291. }
  292. resp, err := doRequest(c, req, info)
  293. if err != nil {
  294. return nil, fmt.Errorf("do request failed: %w", err)
  295. }
  296. return resp, nil
  297. }