123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- package kafka
- import (
- "bytes"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- "time"
- )
// writeBuffer wraps an io.Writer and carries a small scratch array that the
// integer encoders fill in before handing the encoded bytes to w.
type writeBuffer struct {
	w io.Writer // destination of every Write / WriteString call
	b [16]byte  // scratch space reused by the fixed-size and varint encoders
}
- func (wb *writeBuffer) writeInt8(i int8) {
- wb.b[0] = byte(i)
- wb.Write(wb.b[:1])
- }
- func (wb *writeBuffer) writeInt16(i int16) {
- binary.BigEndian.PutUint16(wb.b[:2], uint16(i))
- wb.Write(wb.b[:2])
- }
- func (wb *writeBuffer) writeInt32(i int32) {
- binary.BigEndian.PutUint32(wb.b[:4], uint32(i))
- wb.Write(wb.b[:4])
- }
- func (wb *writeBuffer) writeInt64(i int64) {
- binary.BigEndian.PutUint64(wb.b[:8], uint64(i))
- wb.Write(wb.b[:8])
- }
- func (wb *writeBuffer) writeVarInt(i int64) {
- u := uint64((i << 1) ^ (i >> 63))
- n := 0
- for u >= 0x80 && n < len(wb.b) {
- wb.b[n] = byte(u) | 0x80
- u >>= 7
- n++
- }
- if n < len(wb.b) {
- wb.b[n] = byte(u)
- n++
- }
- wb.Write(wb.b[:n])
- }
- func (wb *writeBuffer) writeString(s string) {
- wb.writeInt16(int16(len(s)))
- wb.WriteString(s)
- }
- func (wb *writeBuffer) writeVarString(s string) {
- wb.writeVarInt(int64(len(s)))
- wb.WriteString(s)
- }
- func (wb *writeBuffer) writeNullableString(s *string) {
- if s == nil {
- wb.writeInt16(-1)
- } else {
- wb.writeString(*s)
- }
- }
- func (wb *writeBuffer) writeBytes(b []byte) {
- n := len(b)
- if b == nil {
- n = -1
- }
- wb.writeInt32(int32(n))
- wb.Write(b)
- }
- func (wb *writeBuffer) writeVarBytes(b []byte) {
- if b != nil {
- wb.writeVarInt(int64(len(b)))
- wb.Write(b)
- } else {
- //-1 is used to indicate nil key
- wb.writeVarInt(-1)
- }
- }
- func (wb *writeBuffer) writeBool(b bool) {
- v := int8(0)
- if b {
- v = 1
- }
- wb.writeInt8(v)
- }
- func (wb *writeBuffer) writeArrayLen(n int) {
- wb.writeInt32(int32(n))
- }
- func (wb *writeBuffer) writeArray(n int, f func(int)) {
- wb.writeArrayLen(n)
- for i := 0; i < n; i++ {
- f(i)
- }
- }
- func (wb *writeBuffer) writeVarArray(n int, f func(int)) {
- wb.writeVarInt(int64(n))
- for i := 0; i < n; i++ {
- f(i)
- }
- }
- func (wb *writeBuffer) writeStringArray(a []string) {
- wb.writeArray(len(a), func(i int) { wb.writeString(a[i]) })
- }
- func (wb *writeBuffer) writeInt32Array(a []int32) {
- wb.writeArray(len(a), func(i int) { wb.writeInt32(a[i]) })
- }
- func (wb *writeBuffer) write(a interface{}) {
- switch v := a.(type) {
- case int8:
- wb.writeInt8(v)
- case int16:
- wb.writeInt16(v)
- case int32:
- wb.writeInt32(v)
- case int64:
- wb.writeInt64(v)
- case string:
- wb.writeString(v)
- case []byte:
- wb.writeBytes(v)
- case bool:
- wb.writeBool(v)
- case writable:
- v.writeTo(wb)
- default:
- panic(fmt.Sprintf("unsupported type: %T", a))
- }
- }
- func (wb *writeBuffer) Write(b []byte) (int, error) {
- return wb.w.Write(b)
- }
- func (wb *writeBuffer) WriteString(s string) (int, error) {
- return io.WriteString(wb.w, s)
- }
- func (wb *writeBuffer) Flush() error {
- if x, ok := wb.w.(interface{ Flush() error }); ok {
- return x.Flush()
- }
- return nil
- }
// writable is implemented by values that know how to encode themselves into a
// writeBuffer; writeBuffer.write calls writeTo for such values.
type writable interface {
	writeTo(*writeBuffer)
}
- func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
- h := requestHeader{
- ApiKey: int16(fetch),
- ApiVersion: int16(v2),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- 4 + // replica ID
- 4 + // max wait time
- 4 + // min bytes
- 4 + // topic array length
- sizeofString(topic) +
- 4 + // partition array length
- 4 + // partition
- 8 + // offset
- 4 // max bytes
- h.writeTo(wb)
- wb.writeInt32(-1) // replica ID
- wb.writeInt32(milliseconds(maxWait))
- wb.writeInt32(int32(minBytes))
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- wb.writeInt64(offset)
- wb.writeInt32(int32(maxBytes))
- return wb.Flush()
- }
- func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
- h := requestHeader{
- ApiKey: int16(fetch),
- ApiVersion: int16(v5),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- 4 + // replica ID
- 4 + // max wait time
- 4 + // min bytes
- 4 + // max bytes
- 1 + // isolation level
- 4 + // topic array length
- sizeofString(topic) +
- 4 + // partition array length
- 4 + // partition
- 8 + // offset
- 8 + // log start offset
- 4 // max bytes
- h.writeTo(wb)
- wb.writeInt32(-1) // replica ID
- wb.writeInt32(milliseconds(maxWait))
- wb.writeInt32(int32(minBytes))
- wb.writeInt32(int32(maxBytes))
- wb.writeInt8(isolationLevel) // isolation level 0 - read uncommitted
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- wb.writeInt64(offset)
- wb.writeInt64(int64(0)) // log start offset only used when is sent by follower
- wb.writeInt32(int32(maxBytes))
- return wb.Flush()
- }
- func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
- h := requestHeader{
- ApiKey: int16(fetch),
- ApiVersion: int16(v10),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- 4 + // replica ID
- 4 + // max wait time
- 4 + // min bytes
- 4 + // max bytes
- 1 + // isolation level
- 4 + // session ID
- 4 + // session epoch
- 4 + // topic array length
- sizeofString(topic) +
- 4 + // partition array length
- 4 + // partition
- 4 + // current leader epoch
- 8 + // fetch offset
- 8 + // log start offset
- 4 + // partition max bytes
- 4 // forgotten topics data
- h.writeTo(wb)
- wb.writeInt32(-1) // replica ID
- wb.writeInt32(milliseconds(maxWait))
- wb.writeInt32(int32(minBytes))
- wb.writeInt32(int32(maxBytes))
- wb.writeInt8(isolationLevel) // isolation level 0 - read uncommitted
- wb.writeInt32(0) //FIXME
- wb.writeInt32(-1) //FIXME
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- wb.writeInt32(-1) //FIXME
- wb.writeInt64(offset)
- wb.writeInt64(int64(0)) // log start offset only used when is sent by follower
- wb.writeInt32(int32(maxBytes))
- // forgotten topics array
- wb.writeArrayLen(0) // forgotten topics not supported yet
- return wb.Flush()
- }
- func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error {
- h := requestHeader{
- ApiKey: int16(listOffsets),
- ApiVersion: int16(v1),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- 4 + // replica ID
- 4 + // topic array length
- sizeofString(topic) + // topic
- 4 + // partition array length
- 4 + // partition
- 8 // time
- h.writeTo(wb)
- wb.writeInt32(-1) // replica ID
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- wb.writeInt64(time)
- return wb.Flush()
- }
- func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
- var size int32
- var attributes int8
- var compressed *bytes.Buffer
- if codec == nil {
- size = messageSetSize(msgs...)
- } else {
- compressed, attributes, size, err = compressMessageSet(codec, msgs...)
- if err != nil {
- return
- }
- msgs = []Message{{Value: compressed.Bytes()}}
- }
- h := requestHeader{
- ApiKey: int16(produce),
- ApiVersion: int16(v2),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- 2 + // required acks
- 4 + // timeout
- 4 + // topic array length
- sizeofString(topic) + // topic
- 4 + // partition array length
- 4 + // partition
- 4 + // message set size
- size
- h.writeTo(wb)
- wb.writeInt16(requiredAcks) // required acks
- wb.writeInt32(milliseconds(timeout))
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- wb.writeInt32(size)
- cw := &crc32Writer{table: crc32.IEEETable}
- for _, msg := range msgs {
- wb.writeMessage(msg.Offset, attributes, msg.Time, msg.Key, msg.Value, cw)
- }
- releaseBuffer(compressed)
- return wb.Flush()
- }
- func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
- h := requestHeader{
- ApiKey: int16(produce),
- ApiVersion: int16(v3),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- sizeofNullableString(transactionalID) +
- 2 + // required acks
- 4 + // timeout
- 4 + // topic array length
- sizeofString(topic) + // topic
- 4 + // partition array length
- 4 + // partition
- 4 + // message set size
- recordBatch.size
- h.writeTo(wb)
- wb.writeNullableString(transactionalID)
- wb.writeInt16(requiredAcks) // required acks
- wb.writeInt32(milliseconds(timeout))
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- recordBatch.writeTo(wb)
- return wb.Flush()
- }
- func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
- h := requestHeader{
- ApiKey: int16(produce),
- ApiVersion: int16(v7),
- CorrelationID: correlationID,
- ClientID: clientID,
- }
- h.Size = (h.size() - 4) +
- sizeofNullableString(transactionalID) +
- 2 + // required acks
- 4 + // timeout
- 4 + // topic array length
- sizeofString(topic) + // topic
- 4 + // partition array length
- 4 + // partition
- 4 + // message set size
- recordBatch.size
- h.writeTo(wb)
- wb.writeNullableString(transactionalID)
- wb.writeInt16(requiredAcks) // required acks
- wb.writeInt32(milliseconds(timeout))
- // topic array
- wb.writeArrayLen(1)
- wb.writeString(topic)
- // partition array
- wb.writeArrayLen(1)
- wb.writeInt32(partition)
- recordBatch.writeTo(wb)
- return wb.Flush()
- }
- func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int, baseTime, lastTime time.Time, write func(*writeBuffer)) {
- var (
- baseTimestamp = timestamp(baseTime)
- lastTimestamp = timestamp(lastTime)
- lastOffsetDelta = int32(count - 1)
- producerID = int64(-1) // default producer id for now
- producerEpoch = int16(-1) // default producer epoch for now
- baseSequence = int32(-1) // default base sequence
- recordCount = int32(count) // record count
- writerBackup = wb.w
- )
- // dry run to compute the checksum
- cw := &crc32Writer{table: crc32.MakeTable(crc32.Castagnoli)}
- wb.w = cw
- cw.writeInt16(attributes) // attributes, timestamp type 0 - create time, not part of a transaction, no control messages
- cw.writeInt32(lastOffsetDelta)
- cw.writeInt64(baseTimestamp)
- cw.writeInt64(lastTimestamp)
- cw.writeInt64(producerID)
- cw.writeInt16(producerEpoch)
- cw.writeInt32(baseSequence)
- cw.writeInt32(recordCount)
- write(wb)
- wb.w = writerBackup
- // actual write to the output buffer
- wb.writeInt64(int64(0))
- wb.writeInt32(int32(size - 12)) // 12 = batch length + base offset sizes
- wb.writeInt32(-1) // partition leader epoch
- wb.writeInt8(2) // magic byte
- wb.writeInt32(int32(cw.crc32))
- wb.writeInt16(attributes)
- wb.writeInt32(lastOffsetDelta)
- wb.writeInt64(baseTimestamp)
- wb.writeInt64(lastTimestamp)
- wb.writeInt64(producerID)
- wb.writeInt16(producerEpoch)
- wb.writeInt32(baseSequence)
- wb.writeInt32(recordCount)
- write(wb)
- }
- func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) {
- compressed = acquireBuffer()
- compressor := codec.NewWriter(compressed)
- wb := &writeBuffer{w: compressor}
- cw := &crc32Writer{table: crc32.IEEETable}
- for offset, msg := range msgs {
- wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw)
- }
- if err = compressor.Close(); err != nil {
- releaseBuffer(compressed)
- return
- }
- attributes = codec.Code()
- size = messageSetSize(Message{Value: compressed.Bytes()})
- return
- }
- func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
- const magicByte = 1 // compatible with kafka 0.10.0.0+
- timestamp := timestamp(time)
- size := messageSize(key, value)
- // dry run to compute the checksum
- cw.crc32 = 0
- cw.writeInt8(magicByte)
- cw.writeInt8(attributes)
- cw.writeInt64(timestamp)
- cw.writeBytes(key)
- cw.writeBytes(value)
- // actual write to the output buffer
- wb.writeInt64(offset)
- wb.writeInt32(size)
- wb.writeInt32(int32(cw.crc32))
- wb.writeInt8(magicByte)
- wb.writeInt8(attributes)
- wb.writeInt64(timestamp)
- wb.writeBytes(key)
- wb.writeBytes(value)
- }
- // Messages with magic >2 are called records. This method writes messages using message format 2.
- func (wb *writeBuffer) writeRecord(attributes int8, baseTime time.Time, offset int64, msg Message) {
- timestampDelta := msg.Time.Sub(baseTime)
- offsetDelta := int64(offset)
- wb.writeVarInt(int64(recordSize(&msg, timestampDelta, offsetDelta)))
- wb.writeInt8(attributes)
- wb.writeVarInt(int64(milliseconds(timestampDelta)))
- wb.writeVarInt(offsetDelta)
- wb.writeVarBytes(msg.Key)
- wb.writeVarBytes(msg.Value)
- wb.writeVarArray(len(msg.Headers), func(i int) {
- h := &msg.Headers[i]
- wb.writeVarString(h.Key)
- wb.writeVarBytes(h.Value)
- })
- }
// varIntLen returns the number of bytes needed to encode i as a zig-zag
// encoded, base-128 variable-length integer (between 1 and 10).
func varIntLen(i int64) int {
	// Zig-zag encoding maps small magnitudes, positive or negative, to small
	// unsigned values.
	zz := uint64((i << 1) ^ (i >> 63))

	// One byte is always emitted; each further 7-bit group adds another.
	length := 1
	for ; zz >= 0x80; zz >>= 7 {
		length++
	}
	return length
}
- func varBytesLen(b []byte) int {
- return varIntLen(int64(len(b))) + len(b)
- }
- func varStringLen(s string) int {
- return varIntLen(int64(len(s))) + len(s)
- }
- func varArrayLen(n int, f func(int) int) int {
- size := varIntLen(int64(n))
- for i := 0; i < n; i++ {
- size += f(i)
- }
- return size
- }
- func messageSize(key, value []byte) int32 {
- return 4 + // crc
- 1 + // magic byte
- 1 + // attributes
- 8 + // timestamp
- sizeofBytes(key) +
- sizeofBytes(value)
- }
- func messageSetSize(msgs ...Message) (size int32) {
- for _, msg := range msgs {
- size += 8 + // offset
- 4 + // message size
- 4 + // crc
- 1 + // magic byte
- 1 + // attributes
- 8 + // timestamp
- sizeofBytes(msg.Key) +
- sizeofBytes(msg.Value)
- }
- return
- }
|