stream_status_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package common
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "github.com/stretchr/testify/assert"
  7. )
  8. func TestStreamStatus_SetEndReason_FirstWins(t *testing.T) {
  9. t.Parallel()
  10. s := NewStreamStatus()
  11. s.SetEndReason(StreamEndReasonDone, nil)
  12. s.SetEndReason(StreamEndReasonTimeout, nil)
  13. s.SetEndReason(StreamEndReasonClientGone, fmt.Errorf("context canceled"))
  14. assert.Equal(t, StreamEndReasonDone, s.EndReason)
  15. assert.Nil(t, s.EndError)
  16. }
  17. func TestStreamStatus_SetEndReason_WithError(t *testing.T) {
  18. t.Parallel()
  19. s := NewStreamStatus()
  20. expectedErr := fmt.Errorf("read: connection reset")
  21. s.SetEndReason(StreamEndReasonScannerErr, expectedErr)
  22. assert.Equal(t, StreamEndReasonScannerErr, s.EndReason)
  23. assert.Equal(t, expectedErr, s.EndError)
  24. }
  25. func TestStreamStatus_SetEndReason_NilSafe(t *testing.T) {
  26. t.Parallel()
  27. var s *StreamStatus
  28. s.SetEndReason(StreamEndReasonDone, nil)
  29. }
  30. func TestStreamStatus_SetEndReason_Concurrent(t *testing.T) {
  31. t.Parallel()
  32. s := NewStreamStatus()
  33. reasons := []StreamEndReason{
  34. StreamEndReasonDone,
  35. StreamEndReasonTimeout,
  36. StreamEndReasonClientGone,
  37. StreamEndReasonScannerErr,
  38. StreamEndReasonHandlerStop,
  39. StreamEndReasonEOF,
  40. StreamEndReasonPanic,
  41. StreamEndReasonPingFail,
  42. }
  43. var wg sync.WaitGroup
  44. for _, r := range reasons {
  45. wg.Add(1)
  46. go func(reason StreamEndReason) {
  47. defer wg.Done()
  48. s.SetEndReason(reason, nil)
  49. }(r)
  50. }
  51. wg.Wait()
  52. assert.NotEqual(t, StreamEndReasonNone, s.EndReason)
  53. }
  54. func TestStreamStatus_RecordError_Basic(t *testing.T) {
  55. t.Parallel()
  56. s := NewStreamStatus()
  57. s.RecordError("bad json")
  58. s.RecordError("another bad json")
  59. s.RecordError("client gone")
  60. assert.True(t, s.HasErrors())
  61. assert.Equal(t, 3, s.TotalErrorCount())
  62. assert.Len(t, s.Errors, 3)
  63. }
  64. func TestStreamStatus_RecordError_CapAtMax(t *testing.T) {
  65. t.Parallel()
  66. s := NewStreamStatus()
  67. for i := 0; i < 30; i++ {
  68. s.RecordError(fmt.Sprintf("error_%d", i))
  69. }
  70. assert.Equal(t, maxStreamErrorEntries, len(s.Errors))
  71. assert.Equal(t, 30, s.TotalErrorCount())
  72. }
  73. func TestStreamStatus_RecordError_NilSafe(t *testing.T) {
  74. t.Parallel()
  75. var s *StreamStatus
  76. s.RecordError("should not panic")
  77. }
  78. func TestStreamStatus_RecordError_Concurrent(t *testing.T) {
  79. t.Parallel()
  80. s := NewStreamStatus()
  81. var wg sync.WaitGroup
  82. for i := 0; i < 100; i++ {
  83. wg.Add(1)
  84. go func(idx int) {
  85. defer wg.Done()
  86. s.RecordError(fmt.Sprintf("error_%d", idx))
  87. }(i)
  88. }
  89. wg.Wait()
  90. assert.Equal(t, 100, s.TotalErrorCount())
  91. assert.LessOrEqual(t, len(s.Errors), maxStreamErrorEntries)
  92. }
  93. func TestStreamStatus_HasErrors_Empty(t *testing.T) {
  94. t.Parallel()
  95. s := NewStreamStatus()
  96. assert.False(t, s.HasErrors())
  97. assert.Equal(t, 0, s.TotalErrorCount())
  98. }
  99. func TestStreamStatus_HasErrors_NilSafe(t *testing.T) {
  100. t.Parallel()
  101. var s *StreamStatus
  102. assert.False(t, s.HasErrors())
  103. assert.Equal(t, 0, s.TotalErrorCount())
  104. }
  105. func TestStreamStatus_IsNormalEnd(t *testing.T) {
  106. t.Parallel()
  107. tests := []struct {
  108. reason StreamEndReason
  109. normal bool
  110. }{
  111. {StreamEndReasonDone, true},
  112. {StreamEndReasonEOF, true},
  113. {StreamEndReasonHandlerStop, true},
  114. {StreamEndReasonTimeout, false},
  115. {StreamEndReasonClientGone, false},
  116. {StreamEndReasonScannerErr, false},
  117. {StreamEndReasonPanic, false},
  118. {StreamEndReasonPingFail, false},
  119. {StreamEndReasonNone, false},
  120. }
  121. for _, tt := range tests {
  122. s := NewStreamStatus()
  123. s.SetEndReason(tt.reason, nil)
  124. assert.Equal(t, tt.normal, s.IsNormalEnd(), "reason=%s", tt.reason)
  125. }
  126. }
  127. func TestStreamStatus_IsNormalEnd_NilSafe(t *testing.T) {
  128. t.Parallel()
  129. var s *StreamStatus
  130. assert.True(t, s.IsNormalEnd())
  131. }
  132. func TestStreamStatus_Summary(t *testing.T) {
  133. t.Parallel()
  134. s := NewStreamStatus()
  135. s.SetEndReason(StreamEndReasonDone, nil)
  136. summary := s.Summary()
  137. assert.Contains(t, summary, "reason=done")
  138. assert.NotContains(t, summary, "soft_errors")
  139. s2 := NewStreamStatus()
  140. s2.SetEndReason(StreamEndReasonTimeout, nil)
  141. s2.RecordError("bad json")
  142. s2.RecordError("write failed")
  143. summary2 := s2.Summary()
  144. assert.Contains(t, summary2, "reason=timeout")
  145. assert.Contains(t, summary2, "soft_errors=2")
  146. }
  147. func TestStreamStatus_Summary_NilSafe(t *testing.T) {
  148. t.Parallel()
  149. var s *StreamStatus
  150. assert.Equal(t, "StreamStatus<nil>", s.Summary())
  151. }