batch.go

package kafka

import (
	"bufio"
	"io"
	"sync"
	"time"
)

// A Batch is an iterator over a sequence of messages fetched from a kafka
// server.
//
// Batches are created by calling (*Conn).ReadBatch. They hold an internal lock
// on the connection, which is released when the batch is closed. Failing to
// call a batch's Close method will likely result in a deadlock when trying to
// use the connection.
//
// Batches are safe to use concurrently from multiple goroutines.
type Batch struct {
	mutex         sync.Mutex
	conn          *Conn
	lock          *sync.Mutex
	msgs          *messageSetReader
	deadline      time.Time
	throttle      time.Duration
	topic         string
	partition     int
	offset        int64
	highWaterMark int64
	err           error
}
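
// A minimal usage sketch (not part of the original file; the broker address,
// topic name, and partition are placeholders, and error handling is elided):
//
//	conn, err := DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
//	if err != nil {
//		// handle the dial error
//	}
//	defer conn.Close()
//
//	batch := conn.ReadBatch(1e3, 1e6) // fetch between 1 KB and 1 MB of data
//	defer batch.Close()               // always release the connection lock
//
//	for {
//		msg, err := batch.ReadMessage()
//		if err != nil {
//			break // io.EOF once the batch has been fully consumed
//		}
//		_ = msg.Value
//	}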

// Throttle gives the throttling duration applied by the kafka server on the
// connection.
func (batch *Batch) Throttle() time.Duration {
	return batch.throttle
}

// HighWaterMark returns the current highest watermark in a partition.
func (batch *Batch) HighWaterMark() int64 {
	return batch.highWaterMark
}

// Partition returns the batch partition.
func (batch *Batch) Partition() int {
	return batch.partition
}

// Offset returns the offset of the next message in the batch.
func (batch *Batch) Offset() int64 {
	batch.mutex.Lock()
	offset := batch.offset
	batch.mutex.Unlock()
	return offset
}

// Close closes the batch, releasing the connection lock and returning an error
// if reading the batch failed for any reason.
func (batch *Batch) Close() error {
	batch.mutex.Lock()
	err := batch.close()
	batch.mutex.Unlock()
	return err
}

func (batch *Batch) close() (err error) {
	conn := batch.conn
	lock := batch.lock
	batch.conn = nil
	batch.lock = nil

	if batch.msgs != nil {
		batch.msgs.discard()
	}

	if err = batch.err; err == io.EOF {
		err = nil
	}

	if conn != nil {
		conn.rdeadline.unsetConnReadDeadline()
		conn.mutex.Lock()
		conn.offset = batch.offset
		conn.mutex.Unlock()

		if err != nil {
			if _, ok := err.(Error); !ok && err != io.ErrShortBuffer {
				conn.Close()
			}
		}
	}

	if lock != nil {
		lock.Unlock()
	}

	return
}

// Err returns a non-nil error if the batch is broken. This is the same error
// that would be returned by Read, ReadMessage or Close (except in the case of
// io.EOF which is never returned by Close).
//
// This method is useful when building retry mechanisms for (*Conn).ReadBatch;
// the program can check whether the batch carried an error before attempting
// to read the first message.
//
// Note that checking errors on a batch is optional; calling Read or ReadMessage
// is always valid and can be used to either read a message or an error in cases
// where that's convenient.
func (batch *Batch) Err() error { return batch.err }
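
// For instance, a retry loop around (*Conn).ReadBatch could check Err before
// consuming anything (a sketch only; the backoff policy is an assumption):
//
//	batch := conn.ReadBatch(1e3, 1e6)
//	if err := batch.Err(); err != nil {
//		batch.Close()
//		time.Sleep(backoff) // backoff is a placeholder duration chosen by the caller
//		// then call ReadBatch again, or surface the error
//	}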

// Read reads the value of the next message from the batch into b, returning the
// number of bytes read, or an error if the next message couldn't be read.
//
// If an error is returned the batch cannot be used anymore and calling Read
// again will keep returning that error. All errors except io.EOF (indicating
// that the program consumed all messages from the batch) are also returned by
// Close.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
func (batch *Batch) Read(b []byte) (int, error) {
	n := 0

	batch.mutex.Lock()
	offset := batch.offset

	_, _, _, err := batch.readMessage(
		func(r *bufio.Reader, size int, nbytes int) (int, error) {
			if nbytes < 0 {
				return size, nil
			}
			return discardN(r, size, nbytes)
		},
		func(r *bufio.Reader, size int, nbytes int) (int, error) {
			if nbytes < 0 {
				return size, nil
			}
			// make sure there are enough bytes for the message value. return
			// errShortRead if the message is truncated.
			if nbytes > size {
				return size, errShortRead
			}
			n = nbytes // return value
			if nbytes > cap(b) {
				nbytes = cap(b)
			}
			if nbytes > len(b) {
				b = b[:nbytes]
			}
			nbytes, err := io.ReadFull(r, b[:nbytes])
			if err != nil {
				return size - nbytes, err
			}
			return discardN(r, size-nbytes, n-nbytes)
		},
	)

	if err == nil && n > len(b) {
		n, err = len(b), io.ErrShortBuffer
		batch.err = io.ErrShortBuffer
		batch.offset = offset // rollback
	}

	batch.mutex.Unlock()
	return n, err
}
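
// A sketch of consuming raw message values with Read and a fixed buffer (the
// 10 KB buffer size is an arbitrary choice for illustration):
//
//	buf := make([]byte, 10e3)
//	for {
//		n, err := batch.Read(buf)
//		if err != nil {
//			break // io.EOF when consumed, io.ErrShortBuffer if a value exceeds len(buf)
//		}
//		process(buf[:n]) // process is a placeholder for application code
//	}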

// ReadMessage reads and returns the next message from the batch.
//
// Because this method allocates memory buffers for the message key and value
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
func (batch *Batch) ReadMessage() (Message, error) {
	msg := Message{}
	batch.mutex.Lock()

	var offset, timestamp int64
	var headers []Header
	var err error

	offset, timestamp, headers, err = batch.readMessage(
		func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
			msg.Key, remain, err = readNewBytes(r, size, nbytes)
			return
		},
		func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
			msg.Value, remain, err = readNewBytes(r, size, nbytes)
			return
		},
	)
	// Skip any message whose offset precedes the connection's current offset
	// (this can happen, for example, when the fetch lands inside a compressed
	// message set that starts before the requested offset).
	for batch.conn != nil && offset < batch.conn.offset {
		if err != nil {
			break
		}
		offset, timestamp, headers, err = batch.readMessage(
			func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
				msg.Key, remain, err = readNewBytes(r, size, nbytes)
				return
			},
			func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
				msg.Value, remain, err = readNewBytes(r, size, nbytes)
				return
			},
		)
	}

	batch.mutex.Unlock()
	msg.Topic = batch.topic
	msg.Partition = batch.partition
	msg.Offset = offset
	msg.Time = makeTime(timestamp)
	msg.Headers = headers

	return msg, err
}

func (batch *Batch) readMessage(
	key func(*bufio.Reader, int, int) (int, error),
	val func(*bufio.Reader, int, int) (int, error),
) (offset int64, timestamp int64, headers []Header, err error) {
	if err = batch.err; err != nil {
		return
	}

	offset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
	switch err {
	case nil:
		batch.offset = offset + 1
	case errShortRead:
		// As an "optimization" kafka truncates the returned response after
		// producing MaxBytes, which could then cause the code to return
		// errShortRead.
		err = batch.msgs.discard()
		switch {
		case err != nil:
			// Since io.EOF is used by the batch to indicate that there are
			// no more messages to consume, it is crucial that any io.EOF errors
			// on the underlying connection are repackaged. Otherwise, the
			// caller can't tell the difference between a batch that was fully
			// consumed and a batch whose connection is in an error state.
			batch.err = dontExpectEOF(err)
		case batch.msgs.remaining() == 0:
			// Because we use the adjusted deadline we could end up returning
			// before the actual deadline occurred. This is necessary because
			// otherwise timing out the connection for real could end up leaving
			// it in an unpredictable state, which would require closing it.
			// This design decision was made to maximize the chances of keeping
			// the connection open, the trade-off being lost precision in the
			// read deadline management.
			err = checkTimeoutErr(batch.deadline)
			batch.err = err
		}
	default:
		// Since io.EOF is used by the batch to indicate that there are
		// no more messages to consume, it is crucial that any io.EOF errors
		// on the underlying connection are repackaged. Otherwise, the
		// caller can't tell the difference between a batch that was fully
		// consumed and a batch whose connection is in an error state.
		batch.err = dontExpectEOF(err)
	}

	return
}

func checkTimeoutErr(deadline time.Time) (err error) {
	if !deadline.IsZero() && time.Now().After(deadline) {
		err = RequestTimedOut
	} else {
		err = io.EOF
	}
	return
}