| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package common
- import (
- "fmt"
- "sync"
- "testing"
- "github.com/stretchr/testify/assert"
- )
- func TestStreamStatus_SetEndReason_FirstWins(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- s.SetEndReason(StreamEndReasonDone, nil)
- s.SetEndReason(StreamEndReasonTimeout, nil)
- s.SetEndReason(StreamEndReasonClientGone, fmt.Errorf("context canceled"))
- assert.Equal(t, StreamEndReasonDone, s.EndReason)
- assert.Nil(t, s.EndError)
- }
- func TestStreamStatus_SetEndReason_WithError(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- expectedErr := fmt.Errorf("read: connection reset")
- s.SetEndReason(StreamEndReasonScannerErr, expectedErr)
- assert.Equal(t, StreamEndReasonScannerErr, s.EndReason)
- assert.Equal(t, expectedErr, s.EndError)
- }
- func TestStreamStatus_SetEndReason_NilSafe(t *testing.T) {
- t.Parallel()
- var s *StreamStatus
- s.SetEndReason(StreamEndReasonDone, nil)
- }
- func TestStreamStatus_SetEndReason_Concurrent(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- reasons := []StreamEndReason{
- StreamEndReasonDone,
- StreamEndReasonTimeout,
- StreamEndReasonClientGone,
- StreamEndReasonScannerErr,
- StreamEndReasonHandlerStop,
- StreamEndReasonEOF,
- StreamEndReasonPanic,
- StreamEndReasonPingFail,
- }
- var wg sync.WaitGroup
- for _, r := range reasons {
- wg.Add(1)
- go func(reason StreamEndReason) {
- defer wg.Done()
- s.SetEndReason(reason, nil)
- }(r)
- }
- wg.Wait()
- assert.NotEqual(t, StreamEndReasonNone, s.EndReason)
- }
- func TestStreamStatus_RecordError_Basic(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- s.RecordError("bad json")
- s.RecordError("another bad json")
- s.RecordError("client gone")
- assert.True(t, s.HasErrors())
- assert.Equal(t, 3, s.TotalErrorCount())
- assert.Len(t, s.Errors, 3)
- }
- func TestStreamStatus_RecordError_CapAtMax(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- for i := 0; i < 30; i++ {
- s.RecordError(fmt.Sprintf("error_%d", i))
- }
- assert.Equal(t, maxStreamErrorEntries, len(s.Errors))
- assert.Equal(t, 30, s.TotalErrorCount())
- }
- func TestStreamStatus_RecordError_NilSafe(t *testing.T) {
- t.Parallel()
- var s *StreamStatus
- s.RecordError("should not panic")
- }
- func TestStreamStatus_RecordError_Concurrent(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- var wg sync.WaitGroup
- for i := 0; i < 100; i++ {
- wg.Add(1)
- go func(idx int) {
- defer wg.Done()
- s.RecordError(fmt.Sprintf("error_%d", idx))
- }(i)
- }
- wg.Wait()
- assert.Equal(t, 100, s.TotalErrorCount())
- assert.LessOrEqual(t, len(s.Errors), maxStreamErrorEntries)
- }
- func TestStreamStatus_HasErrors_Empty(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- assert.False(t, s.HasErrors())
- assert.Equal(t, 0, s.TotalErrorCount())
- }
- func TestStreamStatus_HasErrors_NilSafe(t *testing.T) {
- t.Parallel()
- var s *StreamStatus
- assert.False(t, s.HasErrors())
- assert.Equal(t, 0, s.TotalErrorCount())
- }
- func TestStreamStatus_IsNormalEnd(t *testing.T) {
- t.Parallel()
- tests := []struct {
- reason StreamEndReason
- normal bool
- }{
- {StreamEndReasonDone, true},
- {StreamEndReasonEOF, true},
- {StreamEndReasonHandlerStop, true},
- {StreamEndReasonTimeout, false},
- {StreamEndReasonClientGone, false},
- {StreamEndReasonScannerErr, false},
- {StreamEndReasonPanic, false},
- {StreamEndReasonPingFail, false},
- {StreamEndReasonNone, false},
- }
- for _, tt := range tests {
- s := NewStreamStatus()
- s.SetEndReason(tt.reason, nil)
- assert.Equal(t, tt.normal, s.IsNormalEnd(), "reason=%s", tt.reason)
- }
- }
- func TestStreamStatus_IsNormalEnd_NilSafe(t *testing.T) {
- t.Parallel()
- var s *StreamStatus
- assert.True(t, s.IsNormalEnd())
- }
- func TestStreamStatus_Summary(t *testing.T) {
- t.Parallel()
- s := NewStreamStatus()
- s.SetEndReason(StreamEndReasonDone, nil)
- summary := s.Summary()
- assert.Contains(t, summary, "reason=done")
- assert.NotContains(t, summary, "soft_errors")
- s2 := NewStreamStatus()
- s2.SetEndReason(StreamEndReasonTimeout, nil)
- s2.RecordError("bad json")
- s2.RecordError("write failed")
- summary2 := s2.Summary()
- assert.Contains(t, summary2, "reason=timeout")
- assert.Contains(t, summary2, "soft_errors=2")
- }
- func TestStreamStatus_Summary_NilSafe(t *testing.T) {
- t.Parallel()
- var s *StreamStatus
- assert.Equal(t, "StreamStatus<nil>", s.Summary())
- }
|