package kafka

import (
	"bytes"
	"time"
)
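
// recordBatchHeaderSize is the fixed overhead, in bytes, of a record batch
// header in the kafka message format v2; the individual field sizes are
// spelled out below.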
const recordBatchHeaderSize int32 = 0 +
	8 + // base offset
	4 + // batch length
	4 + // partition leader epoch
	1 + // magic
	4 + // crc
	2 + // attributes
	4 + // last offset delta
	8 + // first timestamp
	8 + // max timestamp
	8 + // producer id
	2 + // producer epoch
	4 + // base sequence
	4 // msg count
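
// recordBatchSize returns the serialized size of a batch holding msgs: the
// fixed header plus, for every record, the record body and the varint length
// prefix that precedes it. Timestamps and offsets are stored as deltas from
// the first message, so msgs must contain at least one element.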
func recordBatchSize(msgs ...Message) (size int32) {
	size = recordBatchHeaderSize
	baseTime := msgs[0].Time

	for i := range msgs {
		msg := &msgs[i]
		msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
		size += int32(msz + varIntLen(int64(msz)))
	}

	return
}
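
// compressRecordBatch serializes msgs through the codec into a pooled buffer
// and reports the batch attributes (carrying the codec's code) along with the
// total batch size, i.e. the fixed header plus the compressed payload. The
// buffer is released back to the pool if compression fails.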
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
	compressed = acquireBuffer()
	compressor := codec.NewWriter(compressed)
	wb := &writeBuffer{w: compressor}

	for i, msg := range msgs {
		wb.writeRecord(0, msgs[0].Time, int64(i), msg)
	}

	if err = compressor.Close(); err != nil {
		releaseBuffer(compressed)
		return
	}

	attributes = int16(codec.Code())
	size = recordBatchHeaderSize + int32(compressed.Len())
	return
}
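
// recordBatch carries everything needed to write a single record batch: the
// input messages and codec, plus the compressed payload and total size
// computed by newRecordBatch.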
type recordBatch struct {
	// required input parameters
	codec      CompressionCodec
	attributes int16
	msgs       []Message

	// parameters calculated during init
	compressed *bytes.Buffer
	size       int32
}
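
// newRecordBatch prepares a record batch for msgs. Without a codec it only
// computes the uncompressed size; with one it compresses the records up front
// so that writeTo can copy the payload straight into the output buffer.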
func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) {
	r = &recordBatch{
		codec: codec,
		msgs:  msgs,
	}

	if r.codec == nil {
		r.size = recordBatchSize(r.msgs...)
	} else {
		r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
	}

	return
}
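
// writeTo writes the batch size followed by the record batch itself. A
// pre-compressed payload is copied verbatim and its buffer released;
// otherwise each record is serialized inline with timestamp and offset
// deltas relative to the first message.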
func (r *recordBatch) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.size)

	baseTime := r.msgs[0].Time
	lastTime := r.msgs[len(r.msgs)-1].Time

	if r.compressed != nil {
		wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
			wb.Write(r.compressed.Bytes())
		})
		releaseBuffer(r.compressed)
	} else {
		wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
			for i, msg := range r.msgs {
				wb.writeRecord(0, r.msgs[0].Time, int64(i), msg)
			}
		})
	}
}
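
// recordSize returns the size of a single record body: the attributes byte,
// the varint-encoded timestamp and offset deltas, the key and value as
// length-prefixed byte sequences, and the header array.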
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
	return 1 + // attributes
		varIntLen(int64(milliseconds(timestampDelta))) +
		varIntLen(offsetDelta) +
		varBytesLen(msg.Key) +
		varBytesLen(msg.Value) +
		varArrayLen(len(msg.Headers), func(i int) int {
			h := &msg.Headers[i]
			return varStringLen(h.Key) + varBytesLen(h.Value)
		})
}
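
// A minimal usage sketch (the surrounding setup is an assumption for
// illustration; in this package the produce request writer drives these
// calls and owns the destination writeBuffer):
//
//	batch, err := newRecordBatch(nil, Message{
//		Key:   []byte("key"),
//		Value: []byte("value"),
//		Time:  time.Now(),
//	})
//	if err != nil {
//		return err
//	}
//	batch.writeTo(wb) // wb is the *writeBuffer for the outgoing request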