123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package kafka
- import (
- "bufio"
- "io"
- "sync"
- "time"
- )
- type Batch struct {
- mutex sync.Mutex
- conn *Conn
- lock *sync.Mutex
- msgs *messageSetReader
- deadline time.Time
- throttle time.Duration
- topic string
- partition int
- offset int64
- highWaterMark int64
- err error
- }
- func (batch *Batch) Throttle() time.Duration {
- return batch.throttle
- }
- func (batch *Batch) HighWaterMark() int64 {
- return batch.highWaterMark
- }
- func (batch *Batch) Partition() int {
- return batch.partition
- }
- func (batch *Batch) Offset() int64 {
- batch.mutex.Lock()
- offset := batch.offset
- batch.mutex.Unlock()
- return offset
- }
- func (batch *Batch) Close() error {
- batch.mutex.Lock()
- err := batch.close()
- batch.mutex.Unlock()
- return err
- }
- func (batch *Batch) close() (err error) {
- conn := batch.conn
- lock := batch.lock
- batch.conn = nil
- batch.lock = nil
- if batch.msgs != nil {
- batch.msgs.discard()
- }
- if err = batch.err; err == io.EOF {
- err = nil
- }
- if conn != nil {
- conn.rdeadline.unsetConnReadDeadline()
- conn.mutex.Lock()
- conn.offset = batch.offset
- conn.mutex.Unlock()
- if err != nil {
- if _, ok := err.(Error); !ok && err != io.ErrShortBuffer {
- conn.Close()
- }
- }
- }
- if lock != nil {
- lock.Unlock()
- }
- return
- }
- func (batch *Batch) Err() error { return batch.err }
- func (batch *Batch) Read(b []byte) (int, error) {
- n := 0
- batch.mutex.Lock()
- offset := batch.offset
- _, _, _, err := batch.readMessage(
- func(r *bufio.Reader, size int, nbytes int) (int, error) {
- if nbytes < 0 {
- return size, nil
- }
- return discardN(r, size, nbytes)
- },
- func(r *bufio.Reader, size int, nbytes int) (int, error) {
- if nbytes < 0 {
- return size, nil
- }
-
-
- if nbytes > size {
- return size, errShortRead
- }
- n = nbytes
- if nbytes > cap(b) {
- nbytes = cap(b)
- }
- if nbytes > len(b) {
- b = b[:nbytes]
- }
- nbytes, err := io.ReadFull(r, b[:nbytes])
- if err != nil {
- return size - nbytes, err
- }
- return discardN(r, size-nbytes, n-nbytes)
- },
- )
- if err == nil && n > len(b) {
- n, err = len(b), io.ErrShortBuffer
- batch.err = io.ErrShortBuffer
- batch.offset = offset
- }
- batch.mutex.Unlock()
- return n, err
- }
- func (batch *Batch) ReadMessage() (Message, error) {
- msg := Message{}
- batch.mutex.Lock()
- var offset, timestamp int64
- var headers []Header
- var err error
- offset, timestamp, headers, err = batch.readMessage(
- func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
- msg.Key, remain, err = readNewBytes(r, size, nbytes)
- return
- },
- func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
- msg.Value, remain, err = readNewBytes(r, size, nbytes)
- return
- },
- )
- for batch.conn != nil && offset < batch.conn.offset {
- if err != nil {
- break
- }
- offset, timestamp, headers, err = batch.readMessage(
- func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
- msg.Key, remain, err = readNewBytes(r, size, nbytes)
- return
- },
- func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
- msg.Value, remain, err = readNewBytes(r, size, nbytes)
- return
- },
- )
- }
- batch.mutex.Unlock()
- msg.Topic = batch.topic
- msg.Partition = batch.partition
- msg.Offset = offset
- msg.Time = makeTime(timestamp)
- msg.Headers = headers
- return msg, err
- }
- func (batch *Batch) readMessage(
- key func(*bufio.Reader, int, int) (int, error),
- val func(*bufio.Reader, int, int) (int, error),
- ) (offset int64, timestamp int64, headers []Header, err error) {
- if err = batch.err; err != nil {
- return
- }
- offset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
- switch err {
- case nil:
- batch.offset = offset + 1
- case errShortRead:
-
-
-
- err = batch.msgs.discard()
- switch {
- case err != nil:
-
-
-
-
-
- batch.err = dontExpectEOF(err)
- case batch.msgs.remaining() == 0:
-
-
-
-
-
-
-
- err = checkTimeoutErr(batch.deadline)
- batch.err = err
- }
- default:
-
-
-
-
-
- batch.err = dontExpectEOF(err)
- }
- return
- }
- func checkTimeoutErr(deadline time.Time) (err error) {
- if !deadline.IsZero() && time.Now().After(deadline) {
- err = RequestTimedOut
- } else {
- err = io.EOF
- }
- return
- }
|