relay-aws.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package aws
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "github.com/QuantumNous/new-api/common"
  9. "github.com/QuantumNous/new-api/dto"
  10. "github.com/QuantumNous/new-api/relay/channel/claude"
  11. relaycommon "github.com/QuantumNous/new-api/relay/common"
  12. "github.com/QuantumNous/new-api/relay/helper"
  13. "github.com/QuantumNous/new-api/service"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/pkg/errors"
  17. "github.com/aws/aws-sdk-go-v2/aws"
  18. "github.com/aws/aws-sdk-go-v2/credentials"
  19. "github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
  20. bedrockruntimeTypes "github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types"
  21. "github.com/aws/smithy-go/auth/bearer"
  22. )
  // getAwsErrorStatusCode returns the HTTP status code carried by an AWS SDK
// error. The first error in the chain that exposes HTTPStatusCode() (such as
// the SDK's HTTP response error) supplies the code. When no such error is
// found, the function falls back to 500 Internal Server Error.
func getAwsErrorStatusCode(err error) int {
	var withStatus interface{ HTTPStatusCode() int }
	if !errors.As(err, &withStatus) {
		// The status code cannot be determined; report a generic server error.
		return http.StatusInternalServerError
	}
	return withStatus.HTTPStatusCode()
}
  33. func newAwsClient(c *gin.Context, info *relaycommon.RelayInfo) (*bedrockruntime.Client, error) {
  34. var (
  35. httpClient *http.Client
  36. err error
  37. )
  38. if info.ChannelSetting.Proxy != "" {
  39. httpClient, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  40. if err != nil {
  41. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  42. }
  43. } else {
  44. httpClient = service.GetHttpClient()
  45. }
  46. awsSecret := strings.Split(info.ApiKey, "|")
  47. var client *bedrockruntime.Client
  48. switch len(awsSecret) {
  49. case 2:
  50. apiKey := awsSecret[0]
  51. region := awsSecret[1]
  52. client = bedrockruntime.New(bedrockruntime.Options{
  53. Region: region,
  54. BearerAuthTokenProvider: bearer.StaticTokenProvider{Token: bearer.Token{Value: apiKey}},
  55. HTTPClient: httpClient,
  56. })
  57. case 3:
  58. ak := awsSecret[0]
  59. sk := awsSecret[1]
  60. region := awsSecret[2]
  61. client = bedrockruntime.New(bedrockruntime.Options{
  62. Region: region,
  63. Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(ak, sk, "")),
  64. HTTPClient: httpClient,
  65. })
  66. default:
  67. return nil, errors.New("invalid aws secret key")
  68. }
  69. return client, nil
  70. }
  71. func doAwsClientRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor, requestBody io.Reader) (any, error) {
  72. awsCli, err := newAwsClient(c, info)
  73. if err != nil {
  74. return nil, types.NewError(err, types.ErrorCodeChannelAwsClientError)
  75. }
  76. a.AwsClient = awsCli
  77. // 获取对应的AWS模型ID
  78. awsModelId := getAwsModelID(info.UpstreamModelName)
  79. awsRegionPrefix := getAwsRegionPrefix(awsCli.Options().Region)
  80. canCrossRegion := awsModelCanCrossRegion(awsModelId, awsRegionPrefix)
  81. if canCrossRegion {
  82. awsModelId = awsModelCrossRegion(awsModelId, awsRegionPrefix)
  83. }
  84. // init empty request.header
  85. requestHeader := http.Header{}
  86. a.SetupRequestHeader(c, &requestHeader, info)
  87. if isNovaModel(awsModelId) {
  88. var novaReq *NovaRequest
  89. err = common.DecodeJson(requestBody, &novaReq)
  90. if err != nil {
  91. return nil, types.NewError(errors.Wrap(err, "decode nova request fail"), types.ErrorCodeBadRequestBody)
  92. }
  93. // 使用InvokeModel API,但使用Nova格式的请求体
  94. awsReq := &bedrockruntime.InvokeModelInput{
  95. ModelId: aws.String(awsModelId),
  96. Accept: aws.String("application/json"),
  97. ContentType: aws.String("application/json"),
  98. }
  99. reqBody, err := common.Marshal(novaReq)
  100. if err != nil {
  101. return nil, types.NewError(errors.Wrap(err, "marshal nova request"), types.ErrorCodeBadResponseBody)
  102. }
  103. awsReq.Body = reqBody
  104. return nil, nil
  105. } else {
  106. awsClaudeReq, err := formatRequest(requestBody, requestHeader)
  107. if err != nil {
  108. return nil, types.NewError(errors.Wrap(err, "format aws request fail"), types.ErrorCodeBadRequestBody)
  109. }
  110. if info.IsStream {
  111. awsReq := &bedrockruntime.InvokeModelWithResponseStreamInput{
  112. ModelId: aws.String(awsModelId),
  113. Accept: aws.String("application/json"),
  114. ContentType: aws.String("application/json"),
  115. }
  116. awsReq.Body, err = common.Marshal(awsClaudeReq)
  117. if err != nil {
  118. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  119. }
  120. a.AwsReq = awsReq
  121. return nil, nil
  122. } else {
  123. awsReq := &bedrockruntime.InvokeModelInput{
  124. ModelId: aws.String(awsModelId),
  125. Accept: aws.String("application/json"),
  126. ContentType: aws.String("application/json"),
  127. }
  128. awsReq.Body, err = common.Marshal(awsClaudeReq)
  129. if err != nil {
  130. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  131. }
  132. a.AwsReq = awsReq
  133. return nil, nil
  134. }
  135. }
  136. }
  // getAwsRegionPrefix returns the part of an AWS region ID before the first
// "-" (for example "us" for "us-east-1"). If the ID contains no "-", the
// whole string is returned; an empty ID yields "".
func getAwsRegionPrefix(awsRegionId string) string {
	prefix, _, _ := strings.Cut(awsRegionId, "-")
	return prefix
}
  145. func awsModelCanCrossRegion(awsModelId, awsRegionPrefix string) bool {
  146. regionSet, exists := awsModelCanCrossRegionMap[awsModelId]
  147. return exists && regionSet[awsRegionPrefix]
  148. }
  149. func awsModelCrossRegion(awsModelId, awsRegionPrefix string) string {
  150. modelPrefix, find := awsRegionCrossModelPrefixMap[awsRegionPrefix]
  151. if !find {
  152. return awsModelId
  153. }
  154. return modelPrefix + "." + awsModelId
  155. }
  156. func getAwsModelID(requestModel string) string {
  157. if awsModelIDName, ok := awsModelIDMap[requestModel]; ok {
  158. return awsModelIDName
  159. }
  160. return requestModel
  161. }
  162. func awsHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  163. awsResp, err := a.AwsClient.InvokeModel(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelInput))
  164. if err != nil {
  165. statusCode := getAwsErrorStatusCode(err)
  166. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, statusCode), nil
  167. }
  168. claudeInfo := &claude.ClaudeResponseInfo{
  169. ResponseId: helper.GetResponseID(c),
  170. Created: common.GetTimestamp(),
  171. Model: info.UpstreamModelName,
  172. ResponseText: strings.Builder{},
  173. Usage: &dto.Usage{},
  174. }
  175. // 复制上游 Content-Type 到客户端响应头
  176. if awsResp.ContentType != nil && *awsResp.ContentType != "" {
  177. c.Writer.Header().Set("Content-Type", *awsResp.ContentType)
  178. }
  179. handlerErr := claude.HandleClaudeResponseData(c, info, claudeInfo, nil, awsResp.Body, claude.RequestModeMessage)
  180. if handlerErr != nil {
  181. return handlerErr, nil
  182. }
  183. return nil, claudeInfo.Usage
  184. }
  185. func awsStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  186. awsResp, err := a.AwsClient.InvokeModelWithResponseStream(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelWithResponseStreamInput))
  187. if err != nil {
  188. statusCode := getAwsErrorStatusCode(err)
  189. return types.NewOpenAIError(errors.Wrap(err, "InvokeModelWithResponseStream"), types.ErrorCodeAwsInvokeError, statusCode), nil
  190. }
  191. stream := awsResp.GetStream()
  192. defer stream.Close()
  193. claudeInfo := &claude.ClaudeResponseInfo{
  194. ResponseId: helper.GetResponseID(c),
  195. Created: common.GetTimestamp(),
  196. Model: info.UpstreamModelName,
  197. ResponseText: strings.Builder{},
  198. Usage: &dto.Usage{},
  199. }
  200. for event := range stream.Events() {
  201. switch v := event.(type) {
  202. case *bedrockruntimeTypes.ResponseStreamMemberChunk:
  203. info.SetFirstResponseTime()
  204. respErr := claude.HandleStreamResponseData(c, info, claudeInfo, string(v.Value.Bytes), claude.RequestModeMessage)
  205. if respErr != nil {
  206. return respErr, nil
  207. }
  208. case *bedrockruntimeTypes.UnknownUnionMember:
  209. fmt.Println("unknown tag:", v.Tag)
  210. return types.NewError(errors.New("unknown response type"), types.ErrorCodeInvalidRequest), nil
  211. default:
  212. fmt.Println("union is nil or unknown type")
  213. return types.NewError(errors.New("nil or unknown response type"), types.ErrorCodeInvalidRequest), nil
  214. }
  215. }
  216. claude.HandleStreamFinalResponse(c, info, claudeInfo, claude.RequestModeMessage)
  217. return nil, claudeInfo.Usage
  218. }
  219. // Nova模型处理函数
  220. func handleNovaRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  221. awsResp, err := a.AwsClient.InvokeModel(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelInput))
  222. if err != nil {
  223. statusCode := getAwsErrorStatusCode(err)
  224. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, statusCode), nil
  225. }
  226. // 解析Nova响应
  227. var novaResp struct {
  228. Output struct {
  229. Message struct {
  230. Content []struct {
  231. Text string `json:"text"`
  232. } `json:"content"`
  233. } `json:"message"`
  234. } `json:"output"`
  235. Usage struct {
  236. InputTokens int `json:"inputTokens"`
  237. OutputTokens int `json:"outputTokens"`
  238. TotalTokens int `json:"totalTokens"`
  239. } `json:"usage"`
  240. }
  241. if err := json.Unmarshal(awsResp.Body, &novaResp); err != nil {
  242. return types.NewError(errors.Wrap(err, "unmarshal nova response"), types.ErrorCodeBadResponseBody), nil
  243. }
  244. // 构造OpenAI格式响应
  245. response := dto.OpenAITextResponse{
  246. Id: helper.GetResponseID(c),
  247. Object: "chat.completion",
  248. Created: common.GetTimestamp(),
  249. Model: info.UpstreamModelName,
  250. Choices: []dto.OpenAITextResponseChoice{{
  251. Index: 0,
  252. Message: dto.Message{
  253. Role: "assistant",
  254. Content: novaResp.Output.Message.Content[0].Text,
  255. },
  256. FinishReason: "stop",
  257. }},
  258. Usage: dto.Usage{
  259. PromptTokens: novaResp.Usage.InputTokens,
  260. CompletionTokens: novaResp.Usage.OutputTokens,
  261. TotalTokens: novaResp.Usage.TotalTokens,
  262. },
  263. }
  264. c.JSON(http.StatusOK, response)
  265. return nil, &response.Usage
  266. }