relay-text.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. package controller
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gin-gonic/gin"
  8. "io"
  9. "log"
  10. "net/http"
  11. "one-api/common"
  12. "one-api/model"
  13. "strings"
  14. "time"
  15. )
  16. const (
  17. APITypeOpenAI = iota
  18. APITypeClaude
  19. APITypePaLM
  20. APITypeBaidu
  21. APITypeZhipu
  22. APITypeAli
  23. APITypeXunfei
  24. )
  25. var httpClient *http.Client
  26. var impatientHTTPClient *http.Client
  27. func init() {
  28. httpClient = &http.Client{}
  29. impatientHTTPClient = &http.Client{
  30. Timeout: 5 * time.Second,
  31. }
  32. }
  33. func relayTextHelper(c *gin.Context, relayMode int) *OpenAIErrorWithStatusCode {
  34. channelType := c.GetInt("channel")
  35. tokenId := c.GetInt("token_id")
  36. userId := c.GetInt("id")
  37. consumeQuota := c.GetBool("consume_quota")
  38. group := c.GetString("group")
  39. var textRequest GeneralOpenAIRequest
  40. if consumeQuota || channelType == common.ChannelTypeAzure || channelType == common.ChannelTypePaLM {
  41. err := common.UnmarshalBodyReusable(c, &textRequest)
  42. if err != nil {
  43. return errorWrapper(err, "bind_request_body_failed", http.StatusBadRequest)
  44. }
  45. }
  46. if relayMode == RelayModeModerations && textRequest.Model == "" {
  47. textRequest.Model = "text-moderation-latest"
  48. }
  49. if relayMode == RelayModeEmbeddings && textRequest.Model == "" {
  50. textRequest.Model = c.Param("model")
  51. }
  52. // request validation
  53. if textRequest.Model == "" {
  54. return errorWrapper(errors.New("model is required"), "required_field_missing", http.StatusBadRequest)
  55. }
  56. switch relayMode {
  57. case RelayModeCompletions:
  58. if textRequest.Prompt == "" {
  59. return errorWrapper(errors.New("field prompt is required"), "required_field_missing", http.StatusBadRequest)
  60. }
  61. case RelayModeChatCompletions:
  62. if textRequest.Messages == nil || len(textRequest.Messages) == 0 {
  63. return errorWrapper(errors.New("field messages is required"), "required_field_missing", http.StatusBadRequest)
  64. }
  65. case RelayModeEmbeddings:
  66. case RelayModeModerations:
  67. if textRequest.Input == "" {
  68. return errorWrapper(errors.New("field input is required"), "required_field_missing", http.StatusBadRequest)
  69. }
  70. case RelayModeEdits:
  71. if textRequest.Instruction == "" {
  72. return errorWrapper(errors.New("field instruction is required"), "required_field_missing", http.StatusBadRequest)
  73. }
  74. }
  75. // map model name
  76. modelMapping := c.GetString("model_mapping")
  77. isModelMapped := false
  78. if modelMapping != "" && modelMapping != "{}" {
  79. modelMap := make(map[string]string)
  80. err := json.Unmarshal([]byte(modelMapping), &modelMap)
  81. if err != nil {
  82. return errorWrapper(err, "unmarshal_model_mapping_failed", http.StatusInternalServerError)
  83. }
  84. if modelMap[textRequest.Model] != "" {
  85. textRequest.Model = modelMap[textRequest.Model]
  86. isModelMapped = true
  87. }
  88. }
  89. apiType := APITypeOpenAI
  90. switch channelType {
  91. case common.ChannelTypeAnthropic:
  92. apiType = APITypeClaude
  93. case common.ChannelTypeBaidu:
  94. apiType = APITypeBaidu
  95. case common.ChannelTypePaLM:
  96. apiType = APITypePaLM
  97. case common.ChannelTypeZhipu:
  98. apiType = APITypeZhipu
  99. case common.ChannelTypeAli:
  100. apiType = APITypeAli
  101. case common.ChannelTypeXunfei:
  102. apiType = APITypeXunfei
  103. }
  104. isStable := c.GetBool("stable")
  105. baseURL := common.ChannelBaseURLs[channelType]
  106. requestURL := c.Request.URL.String()
  107. if c.GetString("base_url") != "" {
  108. baseURL = c.GetString("base_url")
  109. }
  110. fullRequestURL := fmt.Sprintf("%s%s", baseURL, requestURL)
  111. switch apiType {
  112. case APITypeOpenAI:
  113. if channelType == common.ChannelTypeAzure {
  114. // https://learn.microsoft.com/en-us/azure/cognitive-services/openai/chatgpt-quickstart?pivots=rest-api&tabs=command-line#rest-api
  115. query := c.Request.URL.Query()
  116. apiVersion := query.Get("api-version")
  117. if apiVersion == "" {
  118. apiVersion = c.GetString("api_version")
  119. }
  120. requestURL := strings.Split(requestURL, "?")[0]
  121. requestURL = fmt.Sprintf("%s?api-version=%s", requestURL, apiVersion)
  122. baseURL = c.GetString("base_url")
  123. task := strings.TrimPrefix(requestURL, "/v1/")
  124. model_ := textRequest.Model
  125. model_ = strings.Replace(model_, ".", "", -1)
  126. // https://github.com/songquanpeng/one-api/issues/67
  127. model_ = strings.TrimSuffix(model_, "-0301")
  128. model_ = strings.TrimSuffix(model_, "-0314")
  129. model_ = strings.TrimSuffix(model_, "-0613")
  130. fullRequestURL = fmt.Sprintf("%s/openai/deployments/%s/%s", baseURL, model_, task)
  131. }
  132. case APITypeClaude:
  133. fullRequestURL = "https://api.anthropic.com/v1/complete"
  134. if baseURL != "" {
  135. fullRequestURL = fmt.Sprintf("%s/v1/complete", baseURL)
  136. }
  137. case APITypeBaidu:
  138. switch textRequest.Model {
  139. case "ERNIE-Bot":
  140. fullRequestURL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions"
  141. case "ERNIE-Bot-turbo":
  142. fullRequestURL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant"
  143. case "BLOOMZ-7B":
  144. fullRequestURL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/bloomz_7b1"
  145. case "Embedding-V1":
  146. fullRequestURL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings/embedding-v1"
  147. }
  148. apiKey := c.Request.Header.Get("Authorization")
  149. apiKey = strings.TrimPrefix(apiKey, "Bearer ")
  150. var err error
  151. if apiKey, err = getBaiduAccessToken(apiKey); err != nil {
  152. return errorWrapper(err, "invalid_baidu_config", http.StatusInternalServerError)
  153. }
  154. fullRequestURL += "?access_token=" + apiKey
  155. case APITypePaLM:
  156. fullRequestURL = "https://generativelanguage.googleapis.com/v1beta2/models/chat-bison-001:generateMessage"
  157. if baseURL != "" {
  158. fullRequestURL = fmt.Sprintf("%s/v1beta2/models/chat-bison-001:generateMessage", baseURL)
  159. }
  160. apiKey := c.Request.Header.Get("Authorization")
  161. apiKey = strings.TrimPrefix(apiKey, "Bearer ")
  162. fullRequestURL += "?key=" + apiKey
  163. case APITypeZhipu:
  164. method := "invoke"
  165. if textRequest.Stream {
  166. method = "sse-invoke"
  167. }
  168. fullRequestURL = fmt.Sprintf("https://open.bigmodel.cn/api/paas/v3/model-api/%s/%s", textRequest.Model, method)
  169. case APITypeAli:
  170. fullRequestURL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
  171. }
  172. var promptTokens int
  173. var completionTokens int
  174. switch relayMode {
  175. case RelayModeChatCompletions:
  176. promptTokens = countTokenMessages(textRequest.Messages, textRequest.Model)
  177. case RelayModeCompletions:
  178. promptTokens = countTokenInput(textRequest.Prompt, textRequest.Model)
  179. case RelayModeModerations:
  180. promptTokens = countTokenInput(textRequest.Input, textRequest.Model)
  181. }
  182. preConsumedTokens := common.PreConsumedQuota
  183. if textRequest.MaxTokens != 0 {
  184. preConsumedTokens = promptTokens + textRequest.MaxTokens
  185. }
  186. modelRatio := common.GetModelRatio(textRequest.Model)
  187. stableRatio := modelRatio
  188. groupRatio := common.GetGroupRatio(group)
  189. ratio := modelRatio * groupRatio
  190. preConsumedQuota := int(float64(preConsumedTokens) * ratio)
  191. userQuota, err := model.CacheGetUserQuota(userId)
  192. if isStable {
  193. stableRatio = (common.StablePrice / common.BasePrice) * modelRatio
  194. ratio = stableRatio * groupRatio
  195. }
  196. if err != nil {
  197. return errorWrapper(err, "get_user_quota_failed", http.StatusInternalServerError)
  198. }
  199. err = model.CacheDecreaseUserQuota(userId, preConsumedQuota)
  200. if err != nil {
  201. return errorWrapper(err, "decrease_user_quota_failed", http.StatusInternalServerError)
  202. }
  203. if userQuota > 100*preConsumedQuota {
  204. // in this case, we do not pre-consume quota
  205. // because the user has enough quota
  206. preConsumedQuota = 0
  207. }
  208. if consumeQuota && preConsumedQuota > 0 {
  209. err := model.PreConsumeTokenQuota(tokenId, preConsumedQuota)
  210. if err != nil {
  211. return errorWrapper(err, "pre_consume_token_quota_failed", http.StatusForbidden)
  212. }
  213. }
  214. var requestBody io.Reader
  215. if isModelMapped {
  216. jsonStr, err := json.Marshal(textRequest)
  217. if err != nil {
  218. return errorWrapper(err, "marshal_text_request_failed", http.StatusInternalServerError)
  219. }
  220. requestBody = bytes.NewBuffer(jsonStr)
  221. } else {
  222. requestBody = c.Request.Body
  223. }
  224. switch apiType {
  225. case APITypeClaude:
  226. claudeRequest := requestOpenAI2Claude(textRequest)
  227. jsonStr, err := json.Marshal(claudeRequest)
  228. if err != nil {
  229. return errorWrapper(err, "marshal_text_request_failed", http.StatusInternalServerError)
  230. }
  231. requestBody = bytes.NewBuffer(jsonStr)
  232. case APITypeBaidu:
  233. var jsonData []byte
  234. var err error
  235. switch relayMode {
  236. case RelayModeEmbeddings:
  237. baiduEmbeddingRequest := embeddingRequestOpenAI2Baidu(textRequest)
  238. jsonData, err = json.Marshal(baiduEmbeddingRequest)
  239. default:
  240. baiduRequest := requestOpenAI2Baidu(textRequest)
  241. jsonData, err = json.Marshal(baiduRequest)
  242. }
  243. if err != nil {
  244. return errorWrapper(err, "marshal_text_request_failed", http.StatusInternalServerError)
  245. }
  246. requestBody = bytes.NewBuffer(jsonData)
  247. case APITypePaLM:
  248. palmRequest := requestOpenAI2PaLM(textRequest)
  249. jsonStr, err := json.Marshal(palmRequest)
  250. if err != nil {
  251. return errorWrapper(err, "marshal_text_request_failed", http.StatusInternalServerError)
  252. }
  253. requestBody = bytes.NewBuffer(jsonStr)
  254. case APITypeZhipu:
  255. zhipuRequest := requestOpenAI2Zhipu(textRequest)
  256. jsonStr, err := json.Marshal(zhipuRequest)
  257. if err != nil {
  258. return errorWrapper(err, "marshal_text_request_failed", http.StatusInternalServerError)
  259. }
  260. requestBody = bytes.NewBuffer(jsonStr)
  261. case APITypeAli:
  262. aliRequest := requestOpenAI2Ali(textRequest)
  263. jsonStr, err := json.Marshal(aliRequest)
  264. if err != nil {
  265. return errorWrapper(err, "marshal_text_request_failed", http.StatusInternalServerError)
  266. }
  267. requestBody = bytes.NewBuffer(jsonStr)
  268. }
  269. var req *http.Request
  270. var resp *http.Response
  271. isStream := textRequest.Stream
  272. if apiType != APITypeXunfei { // cause xunfei use websocket
  273. req, err = http.NewRequest(c.Request.Method, fullRequestURL, requestBody)
  274. // 设置GetBody函数,该函数返回一个新的io.ReadCloser,该io.ReadCloser返回与原始请求体相同的数据
  275. req.GetBody = func() (io.ReadCloser, error) {
  276. return io.NopCloser(requestBody), nil
  277. }
  278. if err != nil {
  279. return errorWrapper(err, "new_request_failed", http.StatusInternalServerError)
  280. }
  281. apiKey := c.Request.Header.Get("Authorization")
  282. apiKey = strings.TrimPrefix(apiKey, "Bearer ")
  283. switch apiType {
  284. case APITypeOpenAI:
  285. if channelType == common.ChannelTypeAzure {
  286. req.Header.Set("api-key", apiKey)
  287. } else {
  288. req.Header.Set("Authorization", c.Request.Header.Get("Authorization"))
  289. }
  290. case APITypeClaude:
  291. req.Header.Set("x-api-key", apiKey)
  292. anthropicVersion := c.Request.Header.Get("anthropic-version")
  293. if anthropicVersion == "" {
  294. anthropicVersion = "2023-06-01"
  295. }
  296. req.Header.Set("anthropic-version", anthropicVersion)
  297. case APITypeZhipu:
  298. token := getZhipuToken(apiKey)
  299. req.Header.Set("Authorization", token)
  300. case APITypeAli:
  301. req.Header.Set("Authorization", "Bearer "+apiKey)
  302. if textRequest.Stream {
  303. req.Header.Set("X-DashScope-SSE", "enable")
  304. }
  305. }
  306. req.Header.Set("Content-Type", c.Request.Header.Get("Content-Type"))
  307. req.Header.Set("Accept", c.Request.Header.Get("Accept"))
  308. //req.Header.Set("Connection", c.Request.Header.Get("Connection"))
  309. req.Close = true
  310. resp, err = httpClient.Do(req)
  311. if err != nil {
  312. return errorWrapper(err, "do_request_failed", http.StatusInternalServerError)
  313. }
  314. err = req.Body.Close()
  315. if err != nil {
  316. return errorWrapper(err, "close_request_body_failed", http.StatusInternalServerError)
  317. }
  318. err = c.Request.Body.Close()
  319. if err != nil {
  320. return errorWrapper(err, "close_request_body_failed", http.StatusInternalServerError)
  321. }
  322. isStream = isStream || strings.HasPrefix(resp.Header.Get("Content-Type"), "text/event-stream")
  323. if resp.StatusCode != http.StatusOK {
  324. //print resp body
  325. body, err := io.ReadAll(resp.Body)
  326. if err != nil {
  327. log.Println("read resp err body failed", err)
  328. }
  329. log.Println("resp body:", string(body))
  330. errStr := fmt.Sprintf("bad status code: %d", resp.StatusCode)
  331. if resp.StatusCode == 503 {
  332. errStr = string(body)
  333. }
  334. return errorWrapper(
  335. fmt.Errorf(errStr), "bad_status_code", resp.StatusCode)
  336. }
  337. }
  338. var textResponse TextResponse
  339. tokenName := c.GetString("token_name")
  340. channelId := c.GetInt("channel_id")
  341. defer func() {
  342. // c.Writer.Flush()
  343. go func() {
  344. if consumeQuota {
  345. quota := 0
  346. completionRatio := common.GetCompletionRatio(textRequest.Model)
  347. promptTokens = textResponse.Usage.PromptTokens
  348. completionTokens = textResponse.Usage.CompletionTokens
  349. quota = promptTokens + int(float64(completionTokens)*completionRatio)
  350. quota = int(float64(quota) * ratio)
  351. if ratio != 0 && quota <= 0 {
  352. quota = 1
  353. }
  354. totalTokens := promptTokens + completionTokens
  355. if totalTokens == 0 {
  356. // in this case, must be some error happened
  357. // we cannot just return, because we may have to return the pre-consumed quota
  358. quota = 0
  359. }
  360. quotaDelta := quota - preConsumedQuota
  361. err := model.PostConsumeTokenQuota(tokenId, quotaDelta)
  362. if err != nil {
  363. common.SysError("error consuming token remain quota: " + err.Error())
  364. }
  365. err = model.CacheUpdateUserQuota(userId)
  366. if err != nil {
  367. common.SysError("error update user quota cache: " + err.Error())
  368. }
  369. if quota != 0 {
  370. logContent := fmt.Sprintf("模型倍率 %.2f,分组倍率 %.2f", modelRatio, groupRatio)
  371. model.RecordConsumeLog(userId, promptTokens, completionTokens, textRequest.Model, tokenName, quota, logContent, tokenId)
  372. model.UpdateUserUsedQuotaAndRequestCount(userId, quota)
  373. model.UpdateChannelUsedQuota(channelId, quota)
  374. }
  375. }
  376. }()
  377. }()
  378. switch apiType {
  379. case APITypeOpenAI:
  380. if isStream {
  381. err, responseText := openaiStreamHandler(c, resp, relayMode)
  382. if err != nil {
  383. return err
  384. }
  385. textResponse.Usage.PromptTokens = promptTokens
  386. textResponse.Usage.CompletionTokens = countTokenText(responseText, textRequest.Model)
  387. return nil
  388. } else {
  389. err, usage := openaiHandler(c, resp, consumeQuota, promptTokens, textRequest.Model)
  390. if err != nil {
  391. return err
  392. }
  393. if usage != nil {
  394. textResponse.Usage = *usage
  395. }
  396. return nil
  397. }
  398. case APITypeClaude:
  399. if isStream {
  400. err, responseText := claudeStreamHandler(c, resp)
  401. if err != nil {
  402. return err
  403. }
  404. textResponse.Usage.PromptTokens = promptTokens
  405. textResponse.Usage.CompletionTokens = countTokenText(responseText, textRequest.Model)
  406. return nil
  407. } else {
  408. err, usage := claudeHandler(c, resp, promptTokens, textRequest.Model)
  409. if err != nil {
  410. return err
  411. }
  412. if usage != nil {
  413. textResponse.Usage = *usage
  414. }
  415. return nil
  416. }
  417. case APITypeBaidu:
  418. if isStream {
  419. err, usage := baiduStreamHandler(c, resp)
  420. if err != nil {
  421. return err
  422. }
  423. if usage != nil {
  424. textResponse.Usage = *usage
  425. }
  426. return nil
  427. } else {
  428. var err *OpenAIErrorWithStatusCode
  429. var usage *Usage
  430. switch relayMode {
  431. case RelayModeEmbeddings:
  432. err, usage = baiduEmbeddingHandler(c, resp)
  433. default:
  434. err, usage = baiduHandler(c, resp)
  435. }
  436. if err != nil {
  437. return err
  438. }
  439. if usage != nil {
  440. textResponse.Usage = *usage
  441. }
  442. return nil
  443. }
  444. case APITypePaLM:
  445. if textRequest.Stream { // PaLM2 API does not support stream
  446. err, responseText := palmStreamHandler(c, resp)
  447. if err != nil {
  448. return err
  449. }
  450. textResponse.Usage.PromptTokens = promptTokens
  451. textResponse.Usage.CompletionTokens = countTokenText(responseText, textRequest.Model)
  452. return nil
  453. } else {
  454. err, usage := palmHandler(c, resp, promptTokens, textRequest.Model)
  455. if err != nil {
  456. return err
  457. }
  458. if usage != nil {
  459. textResponse.Usage = *usage
  460. }
  461. return nil
  462. }
  463. case APITypeZhipu:
  464. if isStream {
  465. err, usage := zhipuStreamHandler(c, resp)
  466. if err != nil {
  467. return err
  468. }
  469. if usage != nil {
  470. textResponse.Usage = *usage
  471. }
  472. // zhipu's API does not return prompt tokens & completion tokens
  473. textResponse.Usage.PromptTokens = textResponse.Usage.TotalTokens
  474. return nil
  475. } else {
  476. err, usage := zhipuHandler(c, resp)
  477. if err != nil {
  478. return err
  479. }
  480. if usage != nil {
  481. textResponse.Usage = *usage
  482. }
  483. // zhipu's API does not return prompt tokens & completion tokens
  484. textResponse.Usage.PromptTokens = textResponse.Usage.TotalTokens
  485. return nil
  486. }
  487. case APITypeAli:
  488. if isStream {
  489. err, usage := aliStreamHandler(c, resp)
  490. if err != nil {
  491. return err
  492. }
  493. if usage != nil {
  494. textResponse.Usage = *usage
  495. }
  496. return nil
  497. } else {
  498. err, usage := aliHandler(c, resp)
  499. if err != nil {
  500. return err
  501. }
  502. if usage != nil {
  503. textResponse.Usage = *usage
  504. }
  505. return nil
  506. }
  507. case APITypeXunfei:
  508. if isStream {
  509. auth := c.Request.Header.Get("Authorization")
  510. auth = strings.TrimPrefix(auth, "Bearer ")
  511. splits := strings.Split(auth, "|")
  512. if len(splits) != 3 {
  513. return errorWrapper(errors.New("invalid auth"), "invalid_auth", http.StatusBadRequest)
  514. }
  515. err, usage := xunfeiStreamHandler(c, textRequest, splits[0], splits[1], splits[2])
  516. if err != nil {
  517. return err
  518. }
  519. if usage != nil {
  520. textResponse.Usage = *usage
  521. }
  522. return nil
  523. } else {
  524. return errorWrapper(errors.New("xunfei api does not support non-stream mode"), "invalid_api_type", http.StatusBadRequest)
  525. }
  526. default:
  527. return errorWrapper(errors.New("unknown api type"), "unknown_api_type", http.StatusInternalServerError)
  528. }
  529. }