relay-aws.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package aws
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "github.com/QuantumNous/new-api/common"
  9. "github.com/QuantumNous/new-api/dto"
  10. "github.com/QuantumNous/new-api/relay/channel/claude"
  11. relaycommon "github.com/QuantumNous/new-api/relay/common"
  12. "github.com/QuantumNous/new-api/relay/helper"
  13. "github.com/QuantumNous/new-api/service"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/pkg/errors"
  17. "github.com/QuantumNous/new-api/setting/model_setting"
  18. "github.com/aws/aws-sdk-go-v2/aws"
  19. "github.com/aws/aws-sdk-go-v2/credentials"
  20. "github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
  21. bedrockruntimeTypes "github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types"
  22. "github.com/aws/smithy-go/auth/bearer"
  23. )
  24. // getAwsErrorStatusCode extracts HTTP status code from AWS SDK error
  25. func getAwsErrorStatusCode(err error) int {
  26. // Check for HTTP response error which contains status code
  27. var httpErr interface{ HTTPStatusCode() int }
  28. if errors.As(err, &httpErr) {
  29. return httpErr.HTTPStatusCode()
  30. }
  31. // Default to 500 if we can't determine the status code
  32. return http.StatusInternalServerError
  33. }
  34. func newAwsClient(c *gin.Context, info *relaycommon.RelayInfo) (*bedrockruntime.Client, error) {
  35. var (
  36. httpClient *http.Client
  37. err error
  38. )
  39. if info.ChannelSetting.Proxy != "" {
  40. httpClient, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  41. if err != nil {
  42. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  43. }
  44. } else {
  45. httpClient = service.GetHttpClient()
  46. }
  47. awsSecret := strings.Split(info.ApiKey, "|")
  48. var client *bedrockruntime.Client
  49. switch len(awsSecret) {
  50. case 2:
  51. apiKey := awsSecret[0]
  52. region := awsSecret[1]
  53. client = bedrockruntime.New(bedrockruntime.Options{
  54. Region: region,
  55. BearerAuthTokenProvider: bearer.StaticTokenProvider{Token: bearer.Token{Value: apiKey}},
  56. HTTPClient: httpClient,
  57. })
  58. case 3:
  59. ak := awsSecret[0]
  60. sk := awsSecret[1]
  61. region := awsSecret[2]
  62. client = bedrockruntime.New(bedrockruntime.Options{
  63. Region: region,
  64. Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(ak, sk, "")),
  65. HTTPClient: httpClient,
  66. })
  67. default:
  68. return nil, errors.New("invalid aws secret key")
  69. }
  70. return client, nil
  71. }
  72. func doAwsClientRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor, requestBody io.Reader) (any, error) {
  73. awsCli, err := newAwsClient(c, info)
  74. if err != nil {
  75. return nil, types.NewError(err, types.ErrorCodeChannelAwsClientError)
  76. }
  77. a.AwsClient = awsCli
  78. // 获取对应的AWS模型ID
  79. awsModelId := getAwsModelID(info.UpstreamModelName)
  80. awsRegionPrefix := getAwsRegionPrefix(awsCli.Options().Region)
  81. canCrossRegion := awsModelCanCrossRegion(awsModelId, awsRegionPrefix)
  82. if canCrossRegion {
  83. awsModelId = awsModelCrossRegion(awsModelId, awsRegionPrefix)
  84. }
  85. // init empty request.header
  86. requestHeader := http.Header{}
  87. a.SetupRequestHeader(c, &requestHeader, info)
  88. if isNovaModel(awsModelId) {
  89. var novaReq *NovaRequest
  90. err = common.DecodeJson(requestBody, &novaReq)
  91. if err != nil {
  92. return nil, types.NewError(errors.Wrap(err, "decode nova request fail"), types.ErrorCodeBadRequestBody)
  93. }
  94. // 使用InvokeModel API,但使用Nova格式的请求体
  95. awsReq := &bedrockruntime.InvokeModelInput{
  96. ModelId: aws.String(awsModelId),
  97. Accept: aws.String("application/json"),
  98. ContentType: aws.String("application/json"),
  99. }
  100. reqBody, err := common.Marshal(novaReq)
  101. if err != nil {
  102. return nil, types.NewError(errors.Wrap(err, "marshal nova request"), types.ErrorCodeBadResponseBody)
  103. }
  104. awsReq.Body = reqBody
  105. return nil, nil
  106. } else {
  107. awsClaudeReq, err := formatRequest(requestBody, requestHeader)
  108. if err != nil {
  109. return nil, types.NewError(errors.Wrap(err, "format aws request fail"), types.ErrorCodeBadRequestBody)
  110. }
  111. if info.IsStream {
  112. awsReq := &bedrockruntime.InvokeModelWithResponseStreamInput{
  113. ModelId: aws.String(awsModelId),
  114. Accept: aws.String("application/json"),
  115. ContentType: aws.String("application/json"),
  116. }
  117. awsReq.Body, err = buildAwsRequestBody(c, info, awsClaudeReq)
  118. if err != nil {
  119. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  120. }
  121. a.AwsReq = awsReq
  122. return nil, nil
  123. } else {
  124. awsReq := &bedrockruntime.InvokeModelInput{
  125. ModelId: aws.String(awsModelId),
  126. Accept: aws.String("application/json"),
  127. ContentType: aws.String("application/json"),
  128. }
  129. awsReq.Body, err = buildAwsRequestBody(c, info, awsClaudeReq)
  130. if err != nil {
  131. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  132. }
  133. a.AwsReq = awsReq
  134. return nil, nil
  135. }
  136. }
  137. }
  138. // buildAwsRequestBody prepares the payload for AWS requests, applying passthrough rules when enabled.
  139. func buildAwsRequestBody(c *gin.Context, info *relaycommon.RelayInfo, awsClaudeReq any) ([]byte, error) {
  140. if model_setting.GetGlobalSettings().PassThroughRequestEnabled || info.ChannelSetting.PassThroughBodyEnabled {
  141. body, err := common.GetRequestBody(c)
  142. if err != nil {
  143. return nil, errors.Wrap(err, "get request body for pass-through fail")
  144. }
  145. var data map[string]interface{}
  146. if err := common.Unmarshal(body, &data); err != nil {
  147. return nil, errors.Wrap(err, "pass-through unmarshal request body fail")
  148. }
  149. delete(data, "model")
  150. delete(data, "stream")
  151. return common.Marshal(data)
  152. }
  153. return common.Marshal(awsClaudeReq)
  154. }
  155. func getAwsRegionPrefix(awsRegionId string) string {
  156. parts := strings.Split(awsRegionId, "-")
  157. regionPrefix := ""
  158. if len(parts) > 0 {
  159. regionPrefix = parts[0]
  160. }
  161. return regionPrefix
  162. }
  163. func awsModelCanCrossRegion(awsModelId, awsRegionPrefix string) bool {
  164. regionSet, exists := awsModelCanCrossRegionMap[awsModelId]
  165. return exists && regionSet[awsRegionPrefix]
  166. }
  167. func awsModelCrossRegion(awsModelId, awsRegionPrefix string) string {
  168. modelPrefix, find := awsRegionCrossModelPrefixMap[awsRegionPrefix]
  169. if !find {
  170. return awsModelId
  171. }
  172. return modelPrefix + "." + awsModelId
  173. }
  174. func getAwsModelID(requestModel string) string {
  175. if awsModelIDName, ok := awsModelIDMap[requestModel]; ok {
  176. return awsModelIDName
  177. }
  178. return requestModel
  179. }
  180. func awsHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  181. awsResp, err := a.AwsClient.InvokeModel(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelInput))
  182. if err != nil {
  183. statusCode := getAwsErrorStatusCode(err)
  184. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, statusCode), nil
  185. }
  186. claudeInfo := &claude.ClaudeResponseInfo{
  187. ResponseId: helper.GetResponseID(c),
  188. Created: common.GetTimestamp(),
  189. Model: info.UpstreamModelName,
  190. ResponseText: strings.Builder{},
  191. Usage: &dto.Usage{},
  192. }
  193. // 复制上游 Content-Type 到客户端响应头
  194. if awsResp.ContentType != nil && *awsResp.ContentType != "" {
  195. c.Writer.Header().Set("Content-Type", *awsResp.ContentType)
  196. }
  197. handlerErr := claude.HandleClaudeResponseData(c, info, claudeInfo, nil, awsResp.Body, claude.RequestModeMessage)
  198. if handlerErr != nil {
  199. return handlerErr, nil
  200. }
  201. return nil, claudeInfo.Usage
  202. }
  203. func awsStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  204. awsResp, err := a.AwsClient.InvokeModelWithResponseStream(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelWithResponseStreamInput))
  205. if err != nil {
  206. statusCode := getAwsErrorStatusCode(err)
  207. return types.NewOpenAIError(errors.Wrap(err, "InvokeModelWithResponseStream"), types.ErrorCodeAwsInvokeError, statusCode), nil
  208. }
  209. stream := awsResp.GetStream()
  210. defer stream.Close()
  211. claudeInfo := &claude.ClaudeResponseInfo{
  212. ResponseId: helper.GetResponseID(c),
  213. Created: common.GetTimestamp(),
  214. Model: info.UpstreamModelName,
  215. ResponseText: strings.Builder{},
  216. Usage: &dto.Usage{},
  217. }
  218. for event := range stream.Events() {
  219. switch v := event.(type) {
  220. case *bedrockruntimeTypes.ResponseStreamMemberChunk:
  221. info.SetFirstResponseTime()
  222. respErr := claude.HandleStreamResponseData(c, info, claudeInfo, string(v.Value.Bytes), claude.RequestModeMessage)
  223. if respErr != nil {
  224. return respErr, nil
  225. }
  226. case *bedrockruntimeTypes.UnknownUnionMember:
  227. fmt.Println("unknown tag:", v.Tag)
  228. return types.NewError(errors.New("unknown response type"), types.ErrorCodeInvalidRequest), nil
  229. default:
  230. fmt.Println("union is nil or unknown type")
  231. return types.NewError(errors.New("nil or unknown response type"), types.ErrorCodeInvalidRequest), nil
  232. }
  233. }
  234. claude.HandleStreamFinalResponse(c, info, claudeInfo, claude.RequestModeMessage)
  235. return nil, claudeInfo.Usage
  236. }
  237. // Nova模型处理函数
  238. func handleNovaRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  239. awsResp, err := a.AwsClient.InvokeModel(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelInput))
  240. if err != nil {
  241. statusCode := getAwsErrorStatusCode(err)
  242. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, statusCode), nil
  243. }
  244. // 解析Nova响应
  245. var novaResp struct {
  246. Output struct {
  247. Message struct {
  248. Content []struct {
  249. Text string `json:"text"`
  250. } `json:"content"`
  251. } `json:"message"`
  252. } `json:"output"`
  253. Usage struct {
  254. InputTokens int `json:"inputTokens"`
  255. OutputTokens int `json:"outputTokens"`
  256. TotalTokens int `json:"totalTokens"`
  257. } `json:"usage"`
  258. }
  259. if err := json.Unmarshal(awsResp.Body, &novaResp); err != nil {
  260. return types.NewError(errors.Wrap(err, "unmarshal nova response"), types.ErrorCodeBadResponseBody), nil
  261. }
  262. // 构造OpenAI格式响应
  263. response := dto.OpenAITextResponse{
  264. Id: helper.GetResponseID(c),
  265. Object: "chat.completion",
  266. Created: common.GetTimestamp(),
  267. Model: info.UpstreamModelName,
  268. Choices: []dto.OpenAITextResponseChoice{{
  269. Index: 0,
  270. Message: dto.Message{
  271. Role: "assistant",
  272. Content: novaResp.Output.Message.Content[0].Text,
  273. },
  274. FinishReason: "stop",
  275. }},
  276. Usage: dto.Usage{
  277. PromptTokens: novaResp.Usage.InputTokens,
  278. CompletionTokens: novaResp.Usage.OutputTokens,
  279. TotalTokens: novaResp.Usage.TotalTokens,
  280. },
  281. }
  282. c.JSON(http.StatusOK, response)
  283. return nil, &response.Usage
  284. }