// recordbatch.go
package kafka

import (
	"bytes"
	"time"
)
// recordBatchHeaderSize is the size, in bytes, of the fixed-length header of
// a v2 record batch: every field that precedes the records themselves. Each
// term below is the wire size of one header field, in order of appearance.
const recordBatchHeaderSize int32 = 0 +
	8 + // base offset
	4 + // batch length
	4 + // partition leader epoch
	1 + // magic
	4 + // crc
	2 + // attributes
	4 + // last offset delta
	8 + // first timestamp
	8 + // max timestamp
	8 + // producer id
	2 + // producer epoch
	4 + // base sequence
	4 // msg count
  20. func recordBatchSize(msgs ...Message) (size int32) {
  21. size = recordBatchHeaderSize
  22. baseTime := msgs[0].Time
  23. for i := range msgs {
  24. msg := &msgs[i]
  25. msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
  26. size += int32(msz + varIntLen(int64(msz)))
  27. }
  28. return
  29. }
  30. func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
  31. compressed = acquireBuffer()
  32. compressor := codec.NewWriter(compressed)
  33. wb := &writeBuffer{w: compressor}
  34. for i, msg := range msgs {
  35. wb.writeRecord(0, msgs[0].Time, int64(i), msg)
  36. }
  37. if err = compressor.Close(); err != nil {
  38. releaseBuffer(compressed)
  39. return
  40. }
  41. attributes = int16(codec.Code())
  42. size = recordBatchHeaderSize + int32(compressed.Len())
  43. return
  44. }
// recordBatch holds everything needed to serialize a v2 record batch for a
// set of messages, optionally compressed with a CompressionCodec.
type recordBatch struct {
	// required input parameters
	codec      CompressionCodec
	attributes int16 // batch attributes; carries the codec code when compressed
	msgs       []Message
	// parameters calculated during init
	compressed *bytes.Buffer // pooled buffer of compressed records; nil when codec is nil
	size       int32         // batch size written by writeTo, excluding the size field itself
}
  54. func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) {
  55. r = &recordBatch{
  56. codec: codec,
  57. msgs: msgs,
  58. }
  59. if r.codec == nil {
  60. r.size = recordBatchSize(r.msgs...)
  61. } else {
  62. r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
  63. }
  64. return
  65. }
  66. func (r *recordBatch) writeTo(wb *writeBuffer) {
  67. wb.writeInt32(r.size)
  68. baseTime := r.msgs[0].Time
  69. lastTime := r.msgs[len(r.msgs)-1].Time
  70. if r.compressed != nil {
  71. wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
  72. wb.Write(r.compressed.Bytes())
  73. })
  74. releaseBuffer(r.compressed)
  75. } else {
  76. wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
  77. for i, msg := range r.msgs {
  78. wb.writeRecord(0, r.msgs[0].Time, int64(i), msg)
  79. }
  80. })
  81. }
  82. }
  83. func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
  84. return 1 + // attributes
  85. varIntLen(int64(milliseconds(timestampDelta))) +
  86. varIntLen(offsetDelta) +
  87. varBytesLen(msg.Key) +
  88. varBytesLen(msg.Value) +
  89. varArrayLen(len(msg.Headers), func(i int) int {
  90. h := &msg.Headers[i]
  91. return varStringLen(h.Key) + varBytesLen(h.Value)
  92. })
  93. }