// relay-dify.go
  1. package dify
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "github.com/gin-gonic/gin"
  6. "io"
  7. "net/http"
  8. "one-api/common"
  9. "one-api/dto"
  10. relaycommon "one-api/relay/common"
  11. "one-api/service"
  12. "strings"
  13. )
  14. func requestOpenAI2Dify(request dto.GeneralOpenAIRequest) *DifyChatRequest {
  15. content := ""
  16. for _, message := range request.Messages {
  17. if message.Role == "system" {
  18. content += "SYSTEM: \n" + message.StringContent() + "\n"
  19. } else if message.Role == "assistant" {
  20. content += "ASSISTANT: \n" + message.StringContent() + "\n"
  21. } else {
  22. content += "USER: \n" + message.StringContent() + "\n"
  23. }
  24. }
  25. mode := "blocking"
  26. if request.Stream {
  27. mode = "streaming"
  28. }
  29. user := request.User
  30. if user == "" {
  31. user = "api-user"
  32. }
  33. return &DifyChatRequest{
  34. Inputs: make(map[string]interface{}),
  35. Query: content,
  36. ResponseMode: mode,
  37. User: user,
  38. AutoGenerateName: false,
  39. }
  40. }
  41. func streamResponseDify2OpenAI(difyResponse DifyChunkChatCompletionResponse) *dto.ChatCompletionsStreamResponse {
  42. response := dto.ChatCompletionsStreamResponse{
  43. Object: "chat.completion.chunk",
  44. Created: common.GetTimestamp(),
  45. Model: "dify",
  46. }
  47. var choice dto.ChatCompletionsStreamResponseChoice
  48. if difyResponse.Event == "workflow_started" {
  49. choice.Delta.SetContentString("Workflow: " + difyResponse.Data.WorkflowId + "\n")
  50. } else if difyResponse.Event == "node_started" {
  51. choice.Delta.SetContentString("Node: " + difyResponse.Data.NodeId + "\n")
  52. } else if difyResponse.Event == "message" {
  53. choice.Delta.SetContentString(difyResponse.Answer)
  54. }
  55. response.Choices = append(response.Choices, choice)
  56. return &response
  57. }
// difyStreamHandler relays a Dify streaming (SSE) response to the client as
// OpenAI-style chunk events and collects token usage.
//
// Usage is taken from the terminal "message_end" event when one arrives;
// otherwise it is approximated after the loop from info.PromptTokens plus a
// token count of the accumulated response text.
func difyStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
	var responseText string
	usage := &dto.Usage{}
	scanner := bufio.NewScanner(resp.Body)
	scanner.Split(bufio.ScanLines)
	service.SetEventStreamHeaders(c)
	for scanner.Scan() {
		data := scanner.Text()
		// Skip blank/keep-alive lines; SSE payload lines start with "data:".
		if len(data) < 5 || !strings.HasPrefix(data, "data:") {
			continue
		}
		data = strings.TrimPrefix(data, "data:")
		var difyResponse DifyChunkChatCompletionResponse
		err := json.Unmarshal([]byte(data), &difyResponse)
		if err != nil {
			// Malformed chunk: log and keep streaming rather than abort.
			common.SysError("error unmarshalling stream response: " + err.Error())
			continue
		}
		var openaiResponse dto.ChatCompletionsStreamResponse
		if difyResponse.Event == "message_end" {
			// Terminal event carries authoritative usage; stop reading.
			usage = &difyResponse.MetaData.Usage
			break
		} else if difyResponse.Event == "error" {
			break
		} else {
			openaiResponse = *streamResponseDify2OpenAI(difyResponse)
			if len(openaiResponse.Choices) != 0 {
				// Accumulate streamed text for the token-count fallback below.
				responseText += openaiResponse.Choices[0].Delta.GetContentString()
			}
		}
		// Forward the converted chunk to the client; a send failure is
		// logged but does not stop the stream.
		err = service.ObjectData(c, openaiResponse)
		if err != nil {
			common.SysError(err.Error())
		}
	}
	if err := scanner.Err(); err != nil {
		common.SysError("error reading stream: " + err.Error())
	}
	service.Done(c)
	err := resp.Body.Close()
	if err != nil {
		//return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
		common.SysError("close_response_body_failed: " + err.Error())
	}
	if usage.TotalTokens == 0 {
		// No message_end was seen (e.g. the stream ended on an error event):
		// reconstruct usage from the prompt size and the streamed text.
		usage.PromptTokens = info.PromptTokens
		usage.CompletionTokens, _ = service.CountTokenText("gpt-3.5-turbo", responseText)
		usage.TotalTokens = usage.PromptTokens + usage.CompletionTokens
	}
	return nil, usage
}
  109. func difyHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
  110. var difyResponse DifyChatCompletionResponse
  111. responseBody, err := io.ReadAll(resp.Body)
  112. if err != nil {
  113. return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
  114. }
  115. err = resp.Body.Close()
  116. if err != nil {
  117. return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
  118. }
  119. err = json.Unmarshal(responseBody, &difyResponse)
  120. if err != nil {
  121. return service.OpenAIErrorWrapper(err, "unmarshal_response_body_failed", http.StatusInternalServerError), nil
  122. }
  123. fullTextResponse := dto.OpenAITextResponse{
  124. Id: difyResponse.ConversationId,
  125. Object: "chat.completion",
  126. Created: common.GetTimestamp(),
  127. Usage: difyResponse.MetaData.Usage,
  128. }
  129. content, _ := json.Marshal(difyResponse.Answers)
  130. choice := dto.OpenAITextResponseChoice{
  131. Index: 0,
  132. Message: dto.Message{
  133. Role: "assistant",
  134. Content: content,
  135. },
  136. FinishReason: "stop",
  137. }
  138. fullTextResponse.Choices = append(fullTextResponse.Choices, choice)
  139. jsonResponse, err := json.Marshal(fullTextResponse)
  140. if err != nil {
  141. return service.OpenAIErrorWrapper(err, "marshal_response_body_failed", http.StatusInternalServerError), nil
  142. }
  143. c.Writer.Header().Set("Content-Type", "application/json")
  144. c.Writer.WriteHeader(resp.StatusCode)
  145. _, err = c.Writer.Write(jsonResponse)
  146. return nil, &difyResponse.MetaData.Usage
  147. }