stats.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package kafka
  2. import (
  3. "sync/atomic"
  4. "time"
  5. )
  6. // SummaryStats is a data structure that carries a summary of observed values.
  7. // The average, minimum, and maximum are reported.
  8. type SummaryStats struct {
  9. Avg int64 `metric:"avg" type:"gauge"`
  10. Min int64 `metric:"min" type:"gauge"`
  11. Max int64 `metric:"max" type:"gauge"`
  12. }
  13. // DurationStats is a data structure that carries a summary of observed duration
  14. // values. The average, minimum, and maximum are reported.
  15. type DurationStats struct {
  16. Avg time.Duration `metric:"avg" type:"gauge"`
  17. Min time.Duration `metric:"min" type:"gauge"`
  18. Max time.Duration `metric:"max" type:"gauge"`
  19. }
  20. // counter is an atomic incrementing counter which gets reset on snapshot.
  21. //
  22. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  23. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  24. type counter int64
  25. func (c *counter) ptr() *int64 {
  26. return (*int64)(c)
  27. }
  28. func (c *counter) observe(v int64) {
  29. atomic.AddInt64(c.ptr(), v)
  30. }
  31. func (c *counter) snapshot() int64 {
  32. return atomic.SwapInt64(c.ptr(), 0)
  33. }
  34. // gauge is an atomic integer that may be set to any arbitrary value, the value
  35. // does not change after a snapshot.
  36. //
  37. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  38. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  39. type gauge int64
  40. func (g *gauge) ptr() *int64 {
  41. return (*int64)(g)
  42. }
  43. func (g *gauge) observe(v int64) {
  44. atomic.StoreInt64(g.ptr(), v)
  45. }
  46. func (g *gauge) snapshot() int64 {
  47. return atomic.LoadInt64(g.ptr())
  48. }
  49. // minimum is an atomic integral type that keeps track of the minimum of all
  50. // values that it observed between snapshots.
  51. //
  52. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  53. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  54. type minimum int64
  55. func (m *minimum) ptr() *int64 {
  56. return (*int64)(m)
  57. }
  58. func (m *minimum) observe(v int64) {
  59. for {
  60. ptr := m.ptr()
  61. min := atomic.LoadInt64(ptr)
  62. if min >= 0 && min <= v {
  63. break
  64. }
  65. if atomic.CompareAndSwapInt64(ptr, min, v) {
  66. break
  67. }
  68. }
  69. }
  70. func (m *minimum) snapshot() int64 {
  71. p := m.ptr()
  72. v := atomic.LoadInt64(p)
  73. atomic.CompareAndSwapInt64(p, v, -1)
  74. if v < 0 {
  75. v = 0
  76. }
  77. return v
  78. }
  79. // maximum is an atomic integral type that keeps track of the maximum of all
  80. // values that it observed between snapshots.
  81. //
  82. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  83. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  84. type maximum int64
  85. func (m *maximum) ptr() *int64 {
  86. return (*int64)(m)
  87. }
  88. func (m *maximum) observe(v int64) {
  89. for {
  90. ptr := m.ptr()
  91. max := atomic.LoadInt64(ptr)
  92. if max >= 0 && max >= v {
  93. break
  94. }
  95. if atomic.CompareAndSwapInt64(ptr, max, v) {
  96. break
  97. }
  98. }
  99. }
  100. func (m *maximum) snapshot() int64 {
  101. p := m.ptr()
  102. v := atomic.LoadInt64(p)
  103. atomic.CompareAndSwapInt64(p, v, -1)
  104. if v < 0 {
  105. v = 0
  106. }
  107. return v
  108. }
  109. type summary struct {
  110. min minimum
  111. max maximum
  112. sum counter
  113. count counter
  114. }
  115. func makeSummary() summary {
  116. return summary{
  117. min: -1,
  118. max: -1,
  119. }
  120. }
  121. func (s *summary) observe(v int64) {
  122. s.min.observe(v)
  123. s.max.observe(v)
  124. s.sum.observe(v)
  125. s.count.observe(1)
  126. }
  127. func (s *summary) observeDuration(v time.Duration) {
  128. s.observe(int64(v))
  129. }
  130. func (s *summary) snapshot() SummaryStats {
  131. avg := int64(0)
  132. min := s.min.snapshot()
  133. max := s.max.snapshot()
  134. sum := s.sum.snapshot()
  135. count := s.count.snapshot()
  136. if count != 0 {
  137. avg = int64(float64(sum) / float64(count))
  138. }
  139. return SummaryStats{
  140. Avg: avg,
  141. Min: min,
  142. Max: max,
  143. }
  144. }
  145. func (s *summary) snapshotDuration() DurationStats {
  146. summary := s.snapshot()
  147. return DurationStats{
  148. Avg: time.Duration(summary.Avg),
  149. Min: time.Duration(summary.Min),
  150. Max: time.Duration(summary.Max),
  151. }
  152. }