// stream_result.go (1.5 KB)

  1. package helper
  2. import (
  3. relaycommon "github.com/QuantumNous/new-api/relay/common"
  4. )
  5. // StreamResult is passed to each dataHandler invocation, providing methods
  6. // to record soft errors, signal fatal stops, or mark normal completion.
  7. // StreamScannerHandler checks IsStopped() after each callback invocation.
  8. type StreamResult struct {
  9. status *relaycommon.StreamStatus
  10. stopped bool
  11. }
  12. func newStreamResult(status *relaycommon.StreamStatus) *StreamResult {
  13. return &StreamResult{status: status}
  14. }
  15. // Error records a soft error. The stream continues processing.
  16. // Can be called multiple times per chunk.
  17. func (r *StreamResult) Error(err error) {
  18. if err == nil {
  19. return
  20. }
  21. r.status.RecordError(err.Error())
  22. }
  23. // Stop records a fatal error and marks the stream to stop after this chunk.
  24. func (r *StreamResult) Stop(err error) {
  25. if err != nil {
  26. r.status.RecordError(err.Error())
  27. }
  28. r.status.SetEndReason(relaycommon.StreamEndReasonHandlerStop, err)
  29. r.stopped = true
  30. }
  31. // Done signals that the handler has finished processing normally
  32. // (e.g., Dify "message_end"). The stream stops after this chunk.
  33. func (r *StreamResult) Done() {
  34. r.status.SetEndReason(relaycommon.StreamEndReasonDone, nil)
  35. r.stopped = true
  36. }
  37. // IsStopped returns whether Stop() or Done() was called during this chunk.
  38. func (r *StreamResult) IsStopped() bool {
  39. return r.stopped
  40. }
  41. // reset clears the per-chunk stopped flag so the object can be reused.
  42. func (r *StreamResult) reset() {
  43. r.stopped = false
  44. }