| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package channel
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- common2 "one-api/common"
- "one-api/relay/common"
- "one-api/relay/constant"
- "one-api/relay/helper"
- "one-api/service"
- "one-api/setting/operation_setting"
- "sync"
- "time"
- "github.com/bytedance/gopkg/util/gopool"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- )
- func SetupApiRequestHeader(info *common.RelayInfo, c *gin.Context, req *http.Header) {
- if info.RelayMode == constant.RelayModeAudioTranscription || info.RelayMode == constant.RelayModeAudioTranslation {
- // multipart/form-data
- } else if info.RelayMode == constant.RelayModeRealtime {
- // websocket
- } else {
- req.Set("Content-Type", c.Request.Header.Get("Content-Type"))
- req.Set("Accept", c.Request.Header.Get("Accept"))
- if info.IsStream && c.Request.Header.Get("Accept") == "" {
- req.Set("Accept", "text/event-stream")
- }
- }
- }
- func DoApiRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
- fullRequestURL, err := a.GetRequestURL(info)
- if err != nil {
- return nil, fmt.Errorf("get request url failed: %w", err)
- }
- if common2.DebugEnabled {
- println("fullRequestURL:", fullRequestURL)
- }
- req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
- if err != nil {
- return nil, fmt.Errorf("new request failed: %w", err)
- }
- err = a.SetupRequestHeader(c, &req.Header, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- resp, err := doRequest(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("do request failed: %w", err)
- }
- return resp, nil
- }
- func DoFormRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*http.Response, error) {
- fullRequestURL, err := a.GetRequestURL(info)
- if err != nil {
- return nil, fmt.Errorf("get request url failed: %w", err)
- }
- req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
- if err != nil {
- return nil, fmt.Errorf("new request failed: %w", err)
- }
- // set form data
- req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
- err = a.SetupRequestHeader(c, &req.Header, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- resp, err := doRequest(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("do request failed: %w", err)
- }
- return resp, nil
- }
- func DoWssRequest(a Adaptor, c *gin.Context, info *common.RelayInfo, requestBody io.Reader) (*websocket.Conn, error) {
- fullRequestURL, err := a.GetRequestURL(info)
- if err != nil {
- return nil, fmt.Errorf("get request url failed: %w", err)
- }
- targetHeader := http.Header{}
- err = a.SetupRequestHeader(c, &targetHeader, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- targetHeader.Set("Content-Type", c.Request.Header.Get("Content-Type"))
- targetConn, _, err := websocket.DefaultDialer.Dial(fullRequestURL, targetHeader)
- if err != nil {
- return nil, fmt.Errorf("dial failed to %s: %w", fullRequestURL, err)
- }
- // send request body
- //all, err := io.ReadAll(requestBody)
- //err = service.WssString(c, targetConn, string(all))
- return targetConn, nil
- }
- func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) {
- var client *http.Client
- var err error
- if proxyURL, ok := info.ChannelSetting["proxy"]; ok {
- client, err = service.NewProxyHttpClient(proxyURL.(string))
- if err != nil {
- return nil, fmt.Errorf("new proxy http client failed: %w", err)
- }
- } else {
- client = service.GetHttpClient()
- }
- // 流式请求 ping 保活
- var stopPinger func()
- generalSettings := operation_setting.GetGeneralSetting()
- pingEnabled := generalSettings.PingIntervalEnabled
- var pingerWg sync.WaitGroup
- if info.IsStream {
- helper.SetEventStreamHeaders(c)
- pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second
- var pingerCtx context.Context
- pingerCtx, stopPinger = context.WithCancel(c.Request.Context())
- if pingEnabled {
- pingerWg.Add(1)
- gopool.Go(func() {
- defer pingerWg.Done()
- if pingInterval <= 0 {
- pingInterval = helper.DefaultPingInterval
- }
- ticker := time.NewTicker(pingInterval)
- defer ticker.Stop()
- var pingMutex sync.Mutex
- if common2.DebugEnabled {
- println("SSE ping goroutine started")
- }
- for {
- select {
- case <-ticker.C:
- pingMutex.Lock()
- err2 := helper.PingData(c)
- pingMutex.Unlock()
- if err2 != nil {
- common2.LogError(c, "SSE ping error: "+err.Error())
- return
- }
- if common2.DebugEnabled {
- println("SSE ping data sent.")
- }
- case <-pingerCtx.Done():
- if common2.DebugEnabled {
- println("SSE ping goroutine stopped.")
- }
- return
- }
- }
- })
- }
- }
- resp, err := client.Do(req)
- // request结束后停止ping
- if info.IsStream && pingEnabled {
- stopPinger()
- pingerWg.Wait()
- }
- if err != nil {
- return nil, err
- }
- if resp == nil {
- return nil, errors.New("resp is nil")
- }
- _ = req.Body.Close()
- _ = c.Request.Body.Close()
- return resp, nil
- }
- func DoTaskApiRequest(a TaskAdaptor, c *gin.Context, info *common.TaskRelayInfo, requestBody io.Reader) (*http.Response, error) {
- fullRequestURL, err := a.BuildRequestURL(info)
- if err != nil {
- return nil, err
- }
- req, err := http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
- if err != nil {
- return nil, fmt.Errorf("new request failed: %w", err)
- }
- req.GetBody = func() (io.ReadCloser, error) {
- return io.NopCloser(requestBody), nil
- }
- err = a.BuildRequestHeader(c, req, info)
- if err != nil {
- return nil, fmt.Errorf("setup request header failed: %w", err)
- }
- resp, err := doRequest(c, req, info.RelayInfo)
- if err != nil {
- return nil, fmt.Errorf("do request failed: %w", err)
- }
- return resp, nil
- }
|