relay-aws.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package aws
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "github.com/QuantumNous/new-api/common"
  11. "github.com/QuantumNous/new-api/dto"
  12. "github.com/QuantumNous/new-api/relay/channel"
  13. "github.com/QuantumNous/new-api/relay/channel/claude"
  14. relaycommon "github.com/QuantumNous/new-api/relay/common"
  15. "github.com/QuantumNous/new-api/relay/helper"
  16. "github.com/QuantumNous/new-api/service"
  17. "github.com/QuantumNous/new-api/types"
  18. "github.com/gin-gonic/gin"
  19. "github.com/pkg/errors"
  20. "github.com/QuantumNous/new-api/setting/model_setting"
  21. "github.com/aws/aws-sdk-go-v2/aws"
  22. "github.com/aws/aws-sdk-go-v2/credentials"
  23. "github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
  24. bedrockruntimeTypes "github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types"
  25. "github.com/aws/smithy-go/auth/bearer"
  26. )
  // getAwsErrorStatusCode returns the HTTP status code exposed by err.
  //
  // The error chain is searched for any value that implements
  // HTTPStatusCode() int; when none is found (including a nil err) the
  // function falls back to http.StatusInternalServerError.
  func getAwsErrorStatusCode(err error) int {
  	type statusCoder interface{ HTTPStatusCode() int }

  	var sc statusCoder
  	if !errors.As(err, &sc) {
  		// Nothing in the chain carries a status code.
  		return http.StatusInternalServerError
  	}
  	return sc.HTTPStatusCode()
  }
  37. func newAwsInvokeContext() (context.Context, context.CancelFunc) {
  38. if common.RelayTimeout <= 0 {
  39. return context.Background(), func() {}
  40. }
  41. return context.WithTimeout(context.Background(), time.Duration(common.RelayTimeout)*time.Second)
  42. }
  43. func newAwsClient(c *gin.Context, info *relaycommon.RelayInfo) (*bedrockruntime.Client, error) {
  44. var (
  45. httpClient *http.Client
  46. err error
  47. )
  48. if info.ChannelSetting.Proxy != "" {
  49. httpClient, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  50. if err != nil {
  51. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  52. }
  53. } else {
  54. httpClient = service.GetHttpClient()
  55. }
  56. awsSecret := strings.Split(info.ApiKey, "|")
  57. var client *bedrockruntime.Client
  58. switch len(awsSecret) {
  59. case 2:
  60. apiKey := awsSecret[0]
  61. region := awsSecret[1]
  62. client = bedrockruntime.New(bedrockruntime.Options{
  63. Region: region,
  64. BearerAuthTokenProvider: bearer.StaticTokenProvider{Token: bearer.Token{Value: apiKey}},
  65. HTTPClient: httpClient,
  66. })
  67. case 3:
  68. ak := awsSecret[0]
  69. sk := awsSecret[1]
  70. region := awsSecret[2]
  71. client = bedrockruntime.New(bedrockruntime.Options{
  72. Region: region,
  73. Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(ak, sk, "")),
  74. HTTPClient: httpClient,
  75. })
  76. default:
  77. return nil, errors.New("invalid aws secret key")
  78. }
  79. return client, nil
  80. }
  81. func doAwsClientRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor, requestBody io.Reader) (any, error) {
  82. awsCli, err := newAwsClient(c, info)
  83. if err != nil {
  84. return nil, types.NewError(err, types.ErrorCodeChannelAwsClientError)
  85. }
  86. a.AwsClient = awsCli
  87. // 获取对应的AWS模型ID
  88. awsModelId := getAwsModelID(info.UpstreamModelName)
  89. awsRegionPrefix := getAwsRegionPrefix(awsCli.Options().Region)
  90. canCrossRegion := awsModelCanCrossRegion(awsModelId, awsRegionPrefix)
  91. if canCrossRegion {
  92. awsModelId = awsModelCrossRegion(awsModelId, awsRegionPrefix)
  93. }
  94. // init empty request.header
  95. requestHeader := http.Header{}
  96. a.SetupRequestHeader(c, &requestHeader, info)
  97. headerOverride, err := channel.ResolveHeaderOverride(info, c)
  98. if err != nil {
  99. return nil, err
  100. }
  101. for key, value := range headerOverride {
  102. requestHeader.Set(key, value)
  103. }
  104. if isNovaModel(awsModelId) {
  105. var novaReq *NovaRequest
  106. err = common.DecodeJson(requestBody, &novaReq)
  107. if err != nil {
  108. return nil, types.NewError(errors.Wrap(err, "decode nova request fail"), types.ErrorCodeBadRequestBody)
  109. }
  110. // 使用InvokeModel API,但使用Nova格式的请求体
  111. awsReq := &bedrockruntime.InvokeModelInput{
  112. ModelId: aws.String(awsModelId),
  113. Accept: aws.String("application/json"),
  114. ContentType: aws.String("application/json"),
  115. }
  116. reqBody, err := common.Marshal(novaReq)
  117. if err != nil {
  118. return nil, types.NewError(errors.Wrap(err, "marshal nova request"), types.ErrorCodeBadResponseBody)
  119. }
  120. awsReq.Body = reqBody
  121. a.AwsReq = awsReq
  122. return nil, nil
  123. } else {
  124. awsClaudeReq, err := formatRequest(requestBody, requestHeader)
  125. if err != nil {
  126. return nil, types.NewError(errors.Wrap(err, "format aws request fail"), types.ErrorCodeBadRequestBody)
  127. }
  128. if info.IsStream {
  129. awsReq := &bedrockruntime.InvokeModelWithResponseStreamInput{
  130. ModelId: aws.String(awsModelId),
  131. Accept: aws.String("application/json"),
  132. ContentType: aws.String("application/json"),
  133. }
  134. awsReq.Body, err = buildAwsRequestBody(c, info, awsClaudeReq)
  135. if err != nil {
  136. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  137. }
  138. a.AwsReq = awsReq
  139. return nil, nil
  140. } else {
  141. awsReq := &bedrockruntime.InvokeModelInput{
  142. ModelId: aws.String(awsModelId),
  143. Accept: aws.String("application/json"),
  144. ContentType: aws.String("application/json"),
  145. }
  146. awsReq.Body, err = buildAwsRequestBody(c, info, awsClaudeReq)
  147. if err != nil {
  148. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  149. }
  150. a.AwsReq = awsReq
  151. return nil, nil
  152. }
  153. }
  154. }
  155. // buildAwsRequestBody prepares the payload for AWS requests, applying passthrough rules when enabled.
  156. func buildAwsRequestBody(c *gin.Context, info *relaycommon.RelayInfo, awsClaudeReq any) ([]byte, error) {
  157. if model_setting.GetGlobalSettings().PassThroughRequestEnabled || info.ChannelSetting.PassThroughBodyEnabled {
  158. storage, err := common.GetBodyStorage(c)
  159. if err != nil {
  160. return nil, errors.Wrap(err, "get request body for pass-through fail")
  161. }
  162. body, err := storage.Bytes()
  163. if err != nil {
  164. return nil, errors.Wrap(err, "get request body bytes fail")
  165. }
  166. var data map[string]interface{}
  167. if err := common.Unmarshal(body, &data); err != nil {
  168. return nil, errors.Wrap(err, "pass-through unmarshal request body fail")
  169. }
  170. delete(data, "model")
  171. delete(data, "stream")
  172. return common.Marshal(data)
  173. }
  174. return common.Marshal(awsClaudeReq)
  175. }
  // getAwsRegionPrefix returns the part of awsRegionId before its first "-",
  // e.g. "us" for "us-east-1". An ID without "-" is returned unchanged, and an
  // empty ID yields an empty prefix.
  func getAwsRegionPrefix(awsRegionId string) string {
  	// strings.Cut returns the whole input as "before" when the separator is
  	// absent, so no length check or intermediate slice is needed.
  	prefix, _, _ := strings.Cut(awsRegionId, "-")
  	return prefix
  }
  184. func awsModelCanCrossRegion(awsModelId, awsRegionPrefix string) bool {
  185. regionSet, exists := awsModelCanCrossRegionMap[awsModelId]
  186. return exists && regionSet[awsRegionPrefix]
  187. }
  188. func awsModelCrossRegion(awsModelId, awsRegionPrefix string) string {
  189. modelPrefix, find := awsRegionCrossModelPrefixMap[awsRegionPrefix]
  190. if !find {
  191. return awsModelId
  192. }
  193. return modelPrefix + "." + awsModelId
  194. }
  195. func getAwsModelID(requestModel string) string {
  196. if awsModelIDName, ok := awsModelIDMap[requestModel]; ok {
  197. return awsModelIDName
  198. }
  199. return requestModel
  200. }
  201. func awsHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  202. ctx, cancel := newAwsInvokeContext()
  203. defer cancel()
  204. awsResp, err := a.AwsClient.InvokeModel(ctx, a.AwsReq.(*bedrockruntime.InvokeModelInput))
  205. if err != nil {
  206. statusCode := getAwsErrorStatusCode(err)
  207. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, statusCode), nil
  208. }
  209. claudeInfo := &claude.ClaudeResponseInfo{
  210. ResponseId: helper.GetResponseID(c),
  211. Created: common.GetTimestamp(),
  212. Model: info.UpstreamModelName,
  213. ResponseText: strings.Builder{},
  214. Usage: &dto.Usage{},
  215. }
  216. // 复制上游 Content-Type 到客户端响应头
  217. if awsResp.ContentType != nil && *awsResp.ContentType != "" {
  218. c.Writer.Header().Set("Content-Type", *awsResp.ContentType)
  219. }
  220. handlerErr := claude.HandleClaudeResponseData(c, info, claudeInfo, nil, awsResp.Body)
  221. if handlerErr != nil {
  222. return handlerErr, nil
  223. }
  224. return nil, claudeInfo.Usage
  225. }
  226. func awsStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  227. ctx, cancel := newAwsInvokeContext()
  228. defer cancel()
  229. awsResp, err := a.AwsClient.InvokeModelWithResponseStream(ctx, a.AwsReq.(*bedrockruntime.InvokeModelWithResponseStreamInput))
  230. if err != nil {
  231. statusCode := getAwsErrorStatusCode(err)
  232. return types.NewOpenAIError(errors.Wrap(err, "InvokeModelWithResponseStream"), types.ErrorCodeAwsInvokeError, statusCode), nil
  233. }
  234. stream := awsResp.GetStream()
  235. defer stream.Close()
  236. claudeInfo := &claude.ClaudeResponseInfo{
  237. ResponseId: helper.GetResponseID(c),
  238. Created: common.GetTimestamp(),
  239. Model: info.UpstreamModelName,
  240. ResponseText: strings.Builder{},
  241. Usage: &dto.Usage{},
  242. }
  243. for event := range stream.Events() {
  244. switch v := event.(type) {
  245. case *bedrockruntimeTypes.ResponseStreamMemberChunk:
  246. info.SetFirstResponseTime()
  247. respErr := claude.HandleStreamResponseData(c, info, claudeInfo, string(v.Value.Bytes))
  248. if respErr != nil {
  249. return respErr, nil
  250. }
  251. case *bedrockruntimeTypes.UnknownUnionMember:
  252. fmt.Println("unknown tag:", v.Tag)
  253. return types.NewError(errors.New("unknown response type"), types.ErrorCodeInvalidRequest), nil
  254. default:
  255. fmt.Println("union is nil or unknown type")
  256. return types.NewError(errors.New("nil or unknown response type"), types.ErrorCodeInvalidRequest), nil
  257. }
  258. }
  259. claude.HandleStreamFinalResponse(c, info, claudeInfo)
  260. return nil, claudeInfo.Usage
  261. }
  262. // Nova模型处理函数
  263. func handleNovaRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  264. ctx, cancel := newAwsInvokeContext()
  265. defer cancel()
  266. awsResp, err := a.AwsClient.InvokeModel(ctx, a.AwsReq.(*bedrockruntime.InvokeModelInput))
  267. if err != nil {
  268. statusCode := getAwsErrorStatusCode(err)
  269. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, statusCode), nil
  270. }
  271. // 解析Nova响应
  272. var novaResp struct {
  273. Output struct {
  274. Message struct {
  275. Content []struct {
  276. Text string `json:"text"`
  277. } `json:"content"`
  278. } `json:"message"`
  279. } `json:"output"`
  280. Usage struct {
  281. InputTokens int `json:"inputTokens"`
  282. OutputTokens int `json:"outputTokens"`
  283. TotalTokens int `json:"totalTokens"`
  284. } `json:"usage"`
  285. }
  286. if err := json.Unmarshal(awsResp.Body, &novaResp); err != nil {
  287. return types.NewError(errors.Wrap(err, "unmarshal nova response"), types.ErrorCodeBadResponseBody), nil
  288. }
  289. // 构造OpenAI格式响应
  290. response := dto.OpenAITextResponse{
  291. Id: helper.GetResponseID(c),
  292. Object: "chat.completion",
  293. Created: common.GetTimestamp(),
  294. Model: info.UpstreamModelName,
  295. Choices: []dto.OpenAITextResponseChoice{{
  296. Index: 0,
  297. Message: dto.Message{
  298. Role: "assistant",
  299. Content: novaResp.Output.Message.Content[0].Text,
  300. },
  301. FinishReason: "stop",
  302. }},
  303. Usage: dto.Usage{
  304. PromptTokens: novaResp.Usage.InputTokens,
  305. CompletionTokens: novaResp.Usage.OutputTokens,
  306. TotalTokens: novaResp.Usage.TotalTokens,
  307. },
  308. }
  309. c.JSON(http.StatusOK, response)
  310. return nil, &response.Usage
  311. }