relay-aws.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package aws
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "github.com/QuantumNous/new-api/common"
  9. "github.com/QuantumNous/new-api/dto"
  10. "github.com/QuantumNous/new-api/relay/channel/claude"
  11. relaycommon "github.com/QuantumNous/new-api/relay/common"
  12. "github.com/QuantumNous/new-api/relay/helper"
  13. "github.com/QuantumNous/new-api/service"
  14. "github.com/QuantumNous/new-api/types"
  15. "github.com/gin-gonic/gin"
  16. "github.com/pkg/errors"
  17. "github.com/aws/aws-sdk-go-v2/aws"
  18. "github.com/aws/aws-sdk-go-v2/credentials"
  19. "github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
  20. bedrockruntimeTypes "github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types"
  21. "github.com/aws/smithy-go/auth/bearer"
  22. )
  23. func newAwsClient(c *gin.Context, info *relaycommon.RelayInfo) (*bedrockruntime.Client, error) {
  24. var (
  25. httpClient *http.Client
  26. err error
  27. )
  28. if info.ChannelSetting.Proxy != "" {
  29. httpClient, err = service.NewProxyHttpClient(info.ChannelSetting.Proxy)
  30. if err != nil {
  31. return nil, fmt.Errorf("new proxy http client failed: %w", err)
  32. }
  33. } else {
  34. httpClient = service.GetHttpClient()
  35. }
  36. awsSecret := strings.Split(info.ApiKey, "|")
  37. var client *bedrockruntime.Client
  38. switch len(awsSecret) {
  39. case 2:
  40. apiKey := awsSecret[0]
  41. region := awsSecret[1]
  42. client = bedrockruntime.New(bedrockruntime.Options{
  43. Region: region,
  44. BearerAuthTokenProvider: bearer.StaticTokenProvider{Token: bearer.Token{Value: apiKey}},
  45. HTTPClient: httpClient,
  46. })
  47. case 3:
  48. ak := awsSecret[0]
  49. sk := awsSecret[1]
  50. region := awsSecret[2]
  51. client = bedrockruntime.New(bedrockruntime.Options{
  52. Region: region,
  53. Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(ak, sk, "")),
  54. HTTPClient: httpClient,
  55. })
  56. default:
  57. return nil, errors.New("invalid aws secret key")
  58. }
  59. return client, nil
  60. }
  61. func doAwsClientRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor, requestBody io.Reader) (any, error) {
  62. awsCli, err := newAwsClient(c, info)
  63. if err != nil {
  64. return nil, types.NewError(err, types.ErrorCodeChannelAwsClientError)
  65. }
  66. a.AwsClient = awsCli
  67. println(info.UpstreamModelName)
  68. // 获取对应的AWS模型ID
  69. awsModelId := getAwsModelID(info.UpstreamModelName)
  70. awsRegionPrefix := getAwsRegionPrefix(awsCli.Options().Region)
  71. canCrossRegion := awsModelCanCrossRegion(awsModelId, awsRegionPrefix)
  72. if canCrossRegion {
  73. awsModelId = awsModelCrossRegion(awsModelId, awsRegionPrefix)
  74. }
  75. if isNovaModel(awsModelId) {
  76. var novaReq *NovaRequest
  77. err = common.DecodeJson(requestBody, &novaReq)
  78. if err != nil {
  79. return nil, types.NewError(errors.Wrap(err, "decode nova request fail"), types.ErrorCodeBadRequestBody)
  80. }
  81. // 使用InvokeModel API,但使用Nova格式的请求体
  82. awsReq := &bedrockruntime.InvokeModelInput{
  83. ModelId: aws.String(awsModelId),
  84. Accept: aws.String("application/json"),
  85. ContentType: aws.String("application/json"),
  86. }
  87. reqBody, err := common.Marshal(novaReq)
  88. if err != nil {
  89. return nil, types.NewError(errors.Wrap(err, "marshal nova request"), types.ErrorCodeBadResponseBody)
  90. }
  91. awsReq.Body = reqBody
  92. return nil, nil
  93. } else {
  94. awsClaudeReq, err := formatRequest(requestBody)
  95. if err != nil {
  96. return nil, types.NewError(errors.Wrap(err, "format aws request fail"), types.ErrorCodeBadRequestBody)
  97. }
  98. if info.IsStream {
  99. awsReq := &bedrockruntime.InvokeModelWithResponseStreamInput{
  100. ModelId: aws.String(awsModelId),
  101. Accept: aws.String("application/json"),
  102. ContentType: aws.String("application/json"),
  103. }
  104. awsReq.Body, err = common.Marshal(awsClaudeReq)
  105. if err != nil {
  106. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  107. }
  108. a.AwsReq = awsReq
  109. return nil, nil
  110. } else {
  111. awsReq := &bedrockruntime.InvokeModelInput{
  112. ModelId: aws.String(awsModelId),
  113. Accept: aws.String("application/json"),
  114. ContentType: aws.String("application/json"),
  115. }
  116. awsReq.Body, err = common.Marshal(awsClaudeReq)
  117. if err != nil {
  118. return nil, types.NewError(errors.Wrap(err, "marshal aws request fail"), types.ErrorCodeBadRequestBody)
  119. }
  120. a.AwsReq = awsReq
  121. return nil, nil
  122. }
  123. }
  124. }
  // getAwsRegionPrefix returns the part of an AWS region ID before the first
  // "-" (for example "us" for "us-east-1"). An ID without "-" is returned
  // unchanged, and an empty ID yields "".
  func getAwsRegionPrefix(awsRegionId string) string {
  	prefix, _, _ := strings.Cut(awsRegionId, "-")
  	return prefix
  }
  133. func awsModelCanCrossRegion(awsModelId, awsRegionPrefix string) bool {
  134. regionSet, exists := awsModelCanCrossRegionMap[awsModelId]
  135. return exists && regionSet[awsRegionPrefix]
  136. }
  137. func awsModelCrossRegion(awsModelId, awsRegionPrefix string) string {
  138. modelPrefix, find := awsRegionCrossModelPrefixMap[awsRegionPrefix]
  139. if !find {
  140. return awsModelId
  141. }
  142. return modelPrefix + "." + awsModelId
  143. }
  144. func getAwsModelID(requestModel string) string {
  145. if awsModelIDName, ok := awsModelIDMap[requestModel]; ok {
  146. return awsModelIDName
  147. }
  148. return requestModel
  149. }
  150. func awsHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  151. awsResp, err := a.AwsClient.InvokeModel(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelInput))
  152. if err != nil {
  153. return types.NewOpenAIError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeAwsInvokeError, http.StatusInternalServerError), nil
  154. }
  155. claudeInfo := &claude.ClaudeResponseInfo{
  156. ResponseId: helper.GetResponseID(c),
  157. Created: common.GetTimestamp(),
  158. Model: info.UpstreamModelName,
  159. ResponseText: strings.Builder{},
  160. Usage: &dto.Usage{},
  161. }
  162. // 复制上游 Content-Type 到客户端响应头
  163. if awsResp.ContentType != nil && *awsResp.ContentType != "" {
  164. c.Writer.Header().Set("Content-Type", *awsResp.ContentType)
  165. }
  166. handlerErr := claude.HandleClaudeResponseData(c, info, claudeInfo, nil, awsResp.Body, claude.RequestModeMessage)
  167. if handlerErr != nil {
  168. return handlerErr, nil
  169. }
  170. return nil, claudeInfo.Usage
  171. }
  172. func awsStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  173. awsResp, err := a.AwsClient.InvokeModelWithResponseStream(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelWithResponseStreamInput))
  174. if err != nil {
  175. return types.NewOpenAIError(errors.Wrap(err, "InvokeModelWithResponseStream"), types.ErrorCodeAwsInvokeError, http.StatusInternalServerError), nil
  176. }
  177. stream := awsResp.GetStream()
  178. defer stream.Close()
  179. claudeInfo := &claude.ClaudeResponseInfo{
  180. ResponseId: helper.GetResponseID(c),
  181. Created: common.GetTimestamp(),
  182. Model: info.UpstreamModelName,
  183. ResponseText: strings.Builder{},
  184. Usage: &dto.Usage{},
  185. }
  186. for event := range stream.Events() {
  187. switch v := event.(type) {
  188. case *bedrockruntimeTypes.ResponseStreamMemberChunk:
  189. info.SetFirstResponseTime()
  190. respErr := claude.HandleStreamResponseData(c, info, claudeInfo, string(v.Value.Bytes), claude.RequestModeMessage)
  191. if respErr != nil {
  192. return respErr, nil
  193. }
  194. case *bedrockruntimeTypes.UnknownUnionMember:
  195. fmt.Println("unknown tag:", v.Tag)
  196. return types.NewError(errors.New("unknown response type"), types.ErrorCodeInvalidRequest), nil
  197. default:
  198. fmt.Println("union is nil or unknown type")
  199. return types.NewError(errors.New("nil or unknown response type"), types.ErrorCodeInvalidRequest), nil
  200. }
  201. }
  202. claude.HandleStreamFinalResponse(c, info, claudeInfo, claude.RequestModeMessage)
  203. return nil, claudeInfo.Usage
  204. }
  205. // Nova模型处理函数
  206. func handleNovaRequest(c *gin.Context, info *relaycommon.RelayInfo, a *Adaptor) (*types.NewAPIError, *dto.Usage) {
  207. awsResp, err := a.AwsClient.InvokeModel(c.Request.Context(), a.AwsReq.(*bedrockruntime.InvokeModelInput))
  208. if err != nil {
  209. return types.NewError(errors.Wrap(err, "InvokeModel"), types.ErrorCodeChannelAwsClientError), nil
  210. }
  211. // 解析Nova响应
  212. var novaResp struct {
  213. Output struct {
  214. Message struct {
  215. Content []struct {
  216. Text string `json:"text"`
  217. } `json:"content"`
  218. } `json:"message"`
  219. } `json:"output"`
  220. Usage struct {
  221. InputTokens int `json:"inputTokens"`
  222. OutputTokens int `json:"outputTokens"`
  223. TotalTokens int `json:"totalTokens"`
  224. } `json:"usage"`
  225. }
  226. if err := json.Unmarshal(awsResp.Body, &novaResp); err != nil {
  227. return types.NewError(errors.Wrap(err, "unmarshal nova response"), types.ErrorCodeBadResponseBody), nil
  228. }
  229. // 构造OpenAI格式响应
  230. response := dto.OpenAITextResponse{
  231. Id: helper.GetResponseID(c),
  232. Object: "chat.completion",
  233. Created: common.GetTimestamp(),
  234. Model: info.UpstreamModelName,
  235. Choices: []dto.OpenAITextResponseChoice{{
  236. Index: 0,
  237. Message: dto.Message{
  238. Role: "assistant",
  239. Content: novaResp.Output.Message.Content[0].Text,
  240. },
  241. FinishReason: "stop",
  242. }},
  243. Usage: dto.Usage{
  244. PromptTokens: novaResp.Usage.InputTokens,
  245. CompletionTokens: novaResp.Usage.OutputTokens,
  246. TotalTokens: novaResp.Usage.TotalTokens,
  247. },
  248. }
  249. c.JSON(http.StatusOK, response)
  250. return nil, &response.Usage
  251. }