123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591 |
- package kafka
- import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "time"
- )
- // Message is a data structure representing kafka messages.
- type Message struct {
- // Topic indicates which topic this message was consumed from via Reader.
- //
- // When being used with Writer, this can be used to configured the topic if
- // not already specified on the writer itself.
- Topic string
- // Partition is read-only and MUST NOT be set when writing messages
- Partition int
- Offset int64
- Key []byte
- Value []byte
- Headers []Header
- // If not set at the creation, Time will be automatically set when
- // writing the message.
- Time time.Time
- }
- func (msg Message) message(cw *crc32Writer) message {
- m := message{
- MagicByte: 1,
- Key: msg.Key,
- Value: msg.Value,
- Timestamp: timestamp(msg.Time),
- }
- if cw != nil {
- m.CRC = m.crc32(cw)
- }
- return m
- }
- const timestampSize = 8
- func (msg *Message) size() int32 {
- return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
- }
- type message struct {
- CRC int32
- MagicByte int8
- Attributes int8
- Timestamp int64
- Key []byte
- Value []byte
- }
- func (m message) crc32(cw *crc32Writer) int32 {
- cw.crc32 = 0
- cw.writeInt8(m.MagicByte)
- cw.writeInt8(m.Attributes)
- if m.MagicByte != 0 {
- cw.writeInt64(m.Timestamp)
- }
- cw.writeBytes(m.Key)
- cw.writeBytes(m.Value)
- return int32(cw.crc32)
- }
- func (m message) size() int32 {
- size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value)
- if m.MagicByte != 0 {
- size += timestampSize
- }
- return size
- }
- func (m message) writeTo(wb *writeBuffer) {
- wb.writeInt32(m.CRC)
- wb.writeInt8(m.MagicByte)
- wb.writeInt8(m.Attributes)
- if m.MagicByte != 0 {
- wb.writeInt64(m.Timestamp)
- }
- wb.writeBytes(m.Key)
- wb.writeBytes(m.Value)
- }
- type messageSetItem struct {
- Offset int64
- MessageSize int32
- Message message
- }
- func (m messageSetItem) size() int32 {
- return 8 + 4 + m.Message.size()
- }
- func (m messageSetItem) writeTo(wb *writeBuffer) {
- wb.writeInt64(m.Offset)
- wb.writeInt32(m.MessageSize)
- m.Message.writeTo(wb)
- }
- type messageSet []messageSetItem
- func (s messageSet) size() (size int32) {
- for _, m := range s {
- size += m.size()
- }
- return
- }
- func (s messageSet) writeTo(wb *writeBuffer) {
- for _, m := range s {
- m.writeTo(wb)
- }
- }
- type messageSetReader struct {
- empty bool
- version int
- v1 messageSetReaderV1
- v2 messageSetReaderV2
- }
- func (r *messageSetReader) readMessage(min int64,
- key func(*bufio.Reader, int, int) (int, error),
- val func(*bufio.Reader, int, int) (int, error),
- ) (offset int64, timestamp int64, headers []Header, err error) {
- if r.empty {
- return 0, 0, nil, RequestTimedOut
- }
- switch r.version {
- case 1:
- return r.v1.readMessage(min, key, val)
- case 2:
- return r.v2.readMessage(min, key, val)
- default:
- panic("Invalid messageSetReader - unknown message reader version")
- }
- }
- func (r *messageSetReader) remaining() (remain int) {
- if r.empty {
- return 0
- }
- switch r.version {
- case 1:
- return r.v1.remaining()
- case 2:
- return r.v2.remaining()
- default:
- panic("Invalid messageSetReader - unknown message reader version")
- }
- }
- func (r *messageSetReader) discard() (err error) {
- if r.empty {
- return nil
- }
- switch r.version {
- case 1:
- return r.v1.discard()
- case 2:
- return r.v2.discard()
- default:
- panic("Invalid messageSetReader - unknown message reader version")
- }
- }
- type messageSetReaderV1 struct {
- *readerStack
- }
- type readerStack struct {
- reader *bufio.Reader
- remain int
- base int64
- parent *readerStack
- }
- func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, error) {
- headerLength := 8 + 4 + 4 + 1 // offset + messageSize + crc + magicByte
- if headerLength > remain {
- return nil, errShortRead
- }
- b, err := reader.Peek(headerLength)
- if err != nil {
- return nil, err
- }
- var version int8 = int8(b[headerLength-1])
- switch version {
- case 0, 1:
- return &messageSetReader{
- version: 1,
- v1: messageSetReaderV1{&readerStack{
- reader: reader,
- remain: remain,
- }}}, nil
- case 2:
- mr := &messageSetReader{
- version: 2,
- v2: messageSetReaderV2{
- readerStack: &readerStack{
- reader: reader,
- remain: remain,
- },
- messageCount: 0,
- }}
- return mr, nil
- default:
- return nil, fmt.Errorf("unsupported message version %d found in fetch response", version)
- }
- }
- func (r *messageSetReaderV1) readMessage(min int64,
- key func(*bufio.Reader, int, int) (int, error),
- val func(*bufio.Reader, int, int) (int, error),
- ) (offset int64, timestamp int64, headers []Header, err error) {
- for r.readerStack != nil {
- if r.remain == 0 {
- r.readerStack = r.parent
- continue
- }
- var attributes int8
- if offset, attributes, timestamp, r.remain, err = readMessageHeader(r.reader, r.remain); err != nil {
- return
- }
- // if the message is compressed, decompress it and push a new reader
- // onto the stack.
- code := attributes & compressionCodecMask
- if code != 0 {
- var codec CompressionCodec
- if codec, err = resolveCodec(code); err != nil {
- return
- }
- // discard next four bytes...will be -1 to indicate null key
- if r.remain, err = discardN(r.reader, r.remain, 4); err != nil {
- return
- }
- // read and decompress the contained message set.
- var decompressed bytes.Buffer
- if r.remain, err = readBytesWith(r.reader, r.remain, func(r *bufio.Reader, sz, n int) (remain int, err error) {
- // x4 as a guess that the average compression ratio is near 75%
- decompressed.Grow(4 * n)
- l := io.LimitedReader{R: r, N: int64(n)}
- d := codec.NewReader(&l)
- _, err = decompressed.ReadFrom(d)
- remain = sz - (n - int(l.N))
- d.Close()
- return
- }); err != nil {
- return
- }
- // the compressed message's offset will be equal to the offset of
- // the last message in the set. within the compressed set, the
- // offsets will be relative, so we have to scan through them to
- // get the base offset. for example, if there are four compressed
- // messages at offsets 10-13, then the container message will have
- // offset 13 and the contained messages will be 0,1,2,3. the base
- // offset for the container, then is 13-3=10.
- if offset, err = extractOffset(offset, decompressed.Bytes()); err != nil {
- return
- }
- r.readerStack = &readerStack{
- // Allocate a buffer of size 0, which gets capped at 16 bytes
- // by the bufio package. We are already reading buffered data
- // here, no need to reserve another 4KB buffer.
- reader: bufio.NewReaderSize(&decompressed, 0),
- remain: decompressed.Len(),
- base: offset,
- parent: r.readerStack,
- }
- continue
- }
- // adjust the offset in case we're reading compressed messages. the
- // base will be zero otherwise.
- offset += r.base
- // When the messages are compressed kafka may return messages at an
- // earlier offset than the one that was requested, it's the client's
- // responsibility to ignore those.
- if offset < min {
- if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
- return
- }
- if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
- return
- }
- continue
- }
- if r.remain, err = readBytesWith(r.reader, r.remain, key); err != nil {
- return
- }
- r.remain, err = readBytesWith(r.reader, r.remain, val)
- return
- }
- err = errShortRead
- return
- }
- func (r *messageSetReaderV1) remaining() (remain int) {
- for s := r.readerStack; s != nil; s = s.parent {
- remain += s.remain
- }
- return
- }
- func (r *messageSetReaderV1) discard() (err error) {
- if r.readerStack == nil {
- return
- }
- // rewind up to the top-most reader b/c it's the only one that's doing
- // actual i/o. the rest are byte buffers that have been pushed on the stack
- // while reading compressed message sets.
- for r.parent != nil {
- r.readerStack = r.parent
- }
- r.remain, err = discardN(r.reader, r.remain, r.remain)
- return
- }
- func extractOffset(base int64, msgSet []byte) (offset int64, err error) {
- r, remain := bufio.NewReader(bytes.NewReader(msgSet)), len(msgSet)
- for remain > 0 {
- if remain, err = readInt64(r, remain, &offset); err != nil {
- return
- }
- var sz int32
- if remain, err = readInt32(r, remain, &sz); err != nil {
- return
- }
- if remain, err = discardN(r, remain, int(sz)); err != nil {
- return
- }
- }
- offset = base - offset
- return
- }
- type messageSetHeaderV2 struct {
- firstOffset int64
- length int32
- partitionLeaderEpoch int32
- magic int8
- crc int32
- batchAttributes int16
- lastOffsetDelta int32
- firstTimestamp int64
- maxTimestamp int64
- producerId int64
- producerEpoch int16
- firstSequence int32
- }
- type timestampType int8
- const (
- createTime timestampType = 0
- logAppendTime timestampType = 1
- )
- type transactionType int8
- const (
- nonTransactional transactionType = 0
- transactional transactionType = 1
- )
- type controlType int8
- const (
- nonControlMessage controlType = 0
- controlMessage controlType = 1
- )
- func (h *messageSetHeaderV2) compression() int8 {
- return int8(h.batchAttributes & 7)
- }
- func (h *messageSetHeaderV2) timestampType() timestampType {
- return timestampType((h.batchAttributes & (1 << 3)) >> 3)
- }
- func (h *messageSetHeaderV2) transactionType() transactionType {
- return transactionType((h.batchAttributes & (1 << 4)) >> 4)
- }
- func (h *messageSetHeaderV2) controlType() controlType {
- return controlType((h.batchAttributes & (1 << 5)) >> 5)
- }
- type messageSetReaderV2 struct {
- *readerStack
- messageCount int
- header messageSetHeaderV2
- }
- func (r *messageSetReaderV2) readHeader() (err error) {
- h := &r.header
- if r.remain, err = readInt64(r.reader, r.remain, &h.firstOffset); err != nil {
- return
- }
- if r.remain, err = readInt32(r.reader, r.remain, &h.length); err != nil {
- return
- }
- if r.remain, err = readInt32(r.reader, r.remain, &h.partitionLeaderEpoch); err != nil {
- return
- }
- if r.remain, err = readInt8(r.reader, r.remain, &h.magic); err != nil {
- return
- }
- if r.remain, err = readInt32(r.reader, r.remain, &h.crc); err != nil {
- return
- }
- if r.remain, err = readInt16(r.reader, r.remain, &h.batchAttributes); err != nil {
- return
- }
- if r.remain, err = readInt32(r.reader, r.remain, &h.lastOffsetDelta); err != nil {
- return
- }
- if r.remain, err = readInt64(r.reader, r.remain, &h.firstTimestamp); err != nil {
- return
- }
- if r.remain, err = readInt64(r.reader, r.remain, &h.maxTimestamp); err != nil {
- return
- }
- if r.remain, err = readInt64(r.reader, r.remain, &h.producerId); err != nil {
- return
- }
- if r.remain, err = readInt16(r.reader, r.remain, &h.producerEpoch); err != nil {
- return
- }
- if r.remain, err = readInt32(r.reader, r.remain, &h.firstSequence); err != nil {
- return
- }
- var messageCount int32
- if r.remain, err = readInt32(r.reader, r.remain, &messageCount); err != nil {
- return
- }
- r.messageCount = int(messageCount)
- return nil
- }
- func (r *messageSetReaderV2) readMessage(min int64,
- key func(*bufio.Reader, int, int) (int, error),
- val func(*bufio.Reader, int, int) (int, error),
- ) (offset int64, timestamp int64, headers []Header, err error) {
- if r.messageCount == 0 {
- if r.remain == 0 {
- if r.parent != nil {
- r.readerStack = r.parent
- }
- }
- if err = r.readHeader(); err != nil {
- return
- }
- if code := r.header.compression(); code != 0 {
- var codec CompressionCodec
- if codec, err = resolveCodec(code); err != nil {
- return
- }
- var batchRemain = int(r.header.length - 49)
- if batchRemain > r.remain {
- err = errShortRead
- return
- }
- var decompressed bytes.Buffer
- decompressed.Grow(4 * batchRemain)
- l := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
- d := codec.NewReader(&l)
- _, err = decompressed.ReadFrom(d)
- r.remain = r.remain - (batchRemain - int(l.N))
- d.Close()
- if err != nil {
- return
- }
- r.readerStack = &readerStack{
- reader: bufio.NewReaderSize(&decompressed, 0),
- remain: decompressed.Len(),
- base: -1, // base is unused here
- parent: r.readerStack,
- }
- }
- }
- var length int64
- if r.remain, err = readVarInt(r.reader, r.remain, &length); err != nil {
- return
- }
- var attrs int8
- if r.remain, err = readInt8(r.reader, r.remain, &attrs); err != nil {
- return
- }
- var timestampDelta int64
- if r.remain, err = readVarInt(r.reader, r.remain, ×tampDelta); err != nil {
- return
- }
- var offsetDelta int64
- if r.remain, err = readVarInt(r.reader, r.remain, &offsetDelta); err != nil {
- return
- }
- var keyLen int64
- if r.remain, err = readVarInt(r.reader, r.remain, &keyLen); err != nil {
- return
- }
- if r.remain, err = key(r.reader, r.remain, int(keyLen)); err != nil {
- return
- }
- var valueLen int64
- if r.remain, err = readVarInt(r.reader, r.remain, &valueLen); err != nil {
- return
- }
- if r.remain, err = val(r.reader, r.remain, int(valueLen)); err != nil {
- return
- }
- var headerCount int64
- if r.remain, err = readVarInt(r.reader, r.remain, &headerCount); err != nil {
- return
- }
- headers = make([]Header, headerCount)
- for i := 0; i < int(headerCount); i++ {
- if err = r.readMessageHeader(&headers[i]); err != nil {
- return
- }
- }
- r.messageCount--
- return r.header.firstOffset + offsetDelta, r.header.firstTimestamp + timestampDelta, headers, nil
- }
- func (r *messageSetReaderV2) readMessageHeader(header *Header) (err error) {
- var keyLen int64
- if r.remain, err = readVarInt(r.reader, r.remain, &keyLen); err != nil {
- return
- }
- if header.Key, r.remain, err = readNewString(r.reader, r.remain, int(keyLen)); err != nil {
- return
- }
- var valLen int64
- if r.remain, err = readVarInt(r.reader, r.remain, &valLen); err != nil {
- return
- }
- if header.Value, r.remain, err = readNewBytes(r.reader, r.remain, int(valLen)); err != nil {
- return
- }
- return nil
- }
- func (r *messageSetReaderV2) remaining() (remain int) {
- return r.remain
- }
- func (r *messageSetReaderV2) discard() (err error) {
- r.remain, err = discardN(r.reader, r.remain, r.remain)
- return
- }
|