message.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "time"
  8. )
  9. // Message is a data structure representing kafka messages.
  10. type Message struct {
  11. // Topic indicates which topic this message was consumed from via Reader.
  12. //
  13. // When being used with Writer, this can be used to configured the topic if
  14. // not already specified on the writer itself.
  15. Topic string
  16. // Partition is read-only and MUST NOT be set when writing messages
  17. Partition int
  18. Offset int64
  19. Key []byte
  20. Value []byte
  21. Headers []Header
  22. // If not set at the creation, Time will be automatically set when
  23. // writing the message.
  24. Time time.Time
  25. }
  26. func (msg Message) message(cw *crc32Writer) message {
  27. m := message{
  28. MagicByte: 1,
  29. Key: msg.Key,
  30. Value: msg.Value,
  31. Timestamp: timestamp(msg.Time),
  32. }
  33. if cw != nil {
  34. m.CRC = m.crc32(cw)
  35. }
  36. return m
  37. }
  38. const timestampSize = 8
  39. func (msg *Message) size() int32 {
  40. return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
  41. }
  // message is the internal representation of a single message as it is
  // written by writeTo: CRC, magic byte, attributes, an optional timestamp
  // (only when MagicByte is non-zero), then key and value.
  type message struct {
  	CRC        int32
  	MagicByte  int8
  	Attributes int8
  	Timestamp  int64 // only written and counted when MagicByte != 0
  	Key, Value []byte
  }
  50. func (m message) crc32(cw *crc32Writer) int32 {
  51. cw.crc32 = 0
  52. cw.writeInt8(m.MagicByte)
  53. cw.writeInt8(m.Attributes)
  54. if m.MagicByte != 0 {
  55. cw.writeInt64(m.Timestamp)
  56. }
  57. cw.writeBytes(m.Key)
  58. cw.writeBytes(m.Value)
  59. return int32(cw.crc32)
  60. }
  61. func (m message) size() int32 {
  62. size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value)
  63. if m.MagicByte != 0 {
  64. size += timestampSize
  65. }
  66. return size
  67. }
  68. func (m message) writeTo(wb *writeBuffer) {
  69. wb.writeInt32(m.CRC)
  70. wb.writeInt8(m.MagicByte)
  71. wb.writeInt8(m.Attributes)
  72. if m.MagicByte != 0 {
  73. wb.writeInt64(m.Timestamp)
  74. }
  75. wb.writeBytes(m.Key)
  76. wb.writeBytes(m.Value)
  77. }
  78. type messageSetItem struct {
  79. Offset int64
  80. MessageSize int32
  81. Message message
  82. }
  83. func (m messageSetItem) size() int32 {
  84. return 8 + 4 + m.Message.size()
  85. }
  86. func (m messageSetItem) writeTo(wb *writeBuffer) {
  87. wb.writeInt64(m.Offset)
  88. wb.writeInt32(m.MessageSize)
  89. m.Message.writeTo(wb)
  90. }
  91. type messageSet []messageSetItem
  92. func (s messageSet) size() (size int32) {
  93. for _, m := range s {
  94. size += m.size()
  95. }
  96. return
  97. }
  98. func (s messageSet) writeTo(wb *writeBuffer) {
  99. for _, m := range s {
  100. m.writeTo(wb)
  101. }
  102. }
  103. type messageSetReader struct {
  104. empty bool
  105. version int
  106. v1 messageSetReaderV1
  107. v2 messageSetReaderV2
  108. }
  109. func (r *messageSetReader) readMessage(min int64,
  110. key func(*bufio.Reader, int, int) (int, error),
  111. val func(*bufio.Reader, int, int) (int, error),
  112. ) (offset int64, timestamp int64, headers []Header, err error) {
  113. if r.empty {
  114. return 0, 0, nil, RequestTimedOut
  115. }
  116. switch r.version {
  117. case 1:
  118. return r.v1.readMessage(min, key, val)
  119. case 2:
  120. return r.v2.readMessage(min, key, val)
  121. default:
  122. panic("Invalid messageSetReader - unknown message reader version")
  123. }
  124. }
  125. func (r *messageSetReader) remaining() (remain int) {
  126. if r.empty {
  127. return 0
  128. }
  129. switch r.version {
  130. case 1:
  131. return r.v1.remaining()
  132. case 2:
  133. return r.v2.remaining()
  134. default:
  135. panic("Invalid messageSetReader - unknown message reader version")
  136. }
  137. }
  138. func (r *messageSetReader) discard() (err error) {
  139. if r.empty {
  140. return nil
  141. }
  142. switch r.version {
  143. case 1:
  144. return r.v1.discard()
  145. case 2:
  146. return r.v2.discard()
  147. default:
  148. panic("Invalid messageSetReader - unknown message reader version")
  149. }
  150. }
  151. type messageSetReaderV1 struct {
  152. *readerStack
  153. }
  // readerStack is one level of a stack of readers. A new level is pushed
  // each time a compressed message set is decompressed, and popped when its
  // content has been fully consumed.
  type readerStack struct {
  	reader *bufio.Reader // source of bytes for this level
  	remain int           // number of bytes left to consume from reader
  	base   int64         // added to the offsets read at this level (v1 only)
  	parent *readerStack  // level to return to once remain reaches zero
  }
  160. func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, error) {
  161. headerLength := 8 + 4 + 4 + 1 // offset + messageSize + crc + magicByte
  162. if headerLength > remain {
  163. return nil, errShortRead
  164. }
  165. b, err := reader.Peek(headerLength)
  166. if err != nil {
  167. return nil, err
  168. }
  169. var version int8 = int8(b[headerLength-1])
  170. switch version {
  171. case 0, 1:
  172. return &messageSetReader{
  173. version: 1,
  174. v1: messageSetReaderV1{&readerStack{
  175. reader: reader,
  176. remain: remain,
  177. }}}, nil
  178. case 2:
  179. mr := &messageSetReader{
  180. version: 2,
  181. v2: messageSetReaderV2{
  182. readerStack: &readerStack{
  183. reader: reader,
  184. remain: remain,
  185. },
  186. messageCount: 0,
  187. }}
  188. return mr, nil
  189. default:
  190. return nil, fmt.Errorf("unsupported message version %d found in fetch response", version)
  191. }
  192. }
  193. func (r *messageSetReaderV1) readMessage(min int64,
  194. key func(*bufio.Reader, int, int) (int, error),
  195. val func(*bufio.Reader, int, int) (int, error),
  196. ) (offset int64, timestamp int64, headers []Header, err error) {
  197. for r.readerStack != nil {
  198. if r.remain == 0 {
  199. r.readerStack = r.parent
  200. continue
  201. }
  202. var attributes int8
  203. if offset, attributes, timestamp, r.remain, err = readMessageHeader(r.reader, r.remain); err != nil {
  204. return
  205. }
  206. // if the message is compressed, decompress it and push a new reader
  207. // onto the stack.
  208. code := attributes & compressionCodecMask
  209. if code != 0 {
  210. var codec CompressionCodec
  211. if codec, err = resolveCodec(code); err != nil {
  212. return
  213. }
  214. // discard next four bytes...will be -1 to indicate null key
  215. if r.remain, err = discardN(r.reader, r.remain, 4); err != nil {
  216. return
  217. }
  218. // read and decompress the contained message set.
  219. var decompressed bytes.Buffer
  220. if r.remain, err = readBytesWith(r.reader, r.remain, func(r *bufio.Reader, sz, n int) (remain int, err error) {
  221. // x4 as a guess that the average compression ratio is near 75%
  222. decompressed.Grow(4 * n)
  223. l := io.LimitedReader{R: r, N: int64(n)}
  224. d := codec.NewReader(&l)
  225. _, err = decompressed.ReadFrom(d)
  226. remain = sz - (n - int(l.N))
  227. d.Close()
  228. return
  229. }); err != nil {
  230. return
  231. }
  232. // the compressed message's offset will be equal to the offset of
  233. // the last message in the set. within the compressed set, the
  234. // offsets will be relative, so we have to scan through them to
  235. // get the base offset. for example, if there are four compressed
  236. // messages at offsets 10-13, then the container message will have
  237. // offset 13 and the contained messages will be 0,1,2,3. the base
  238. // offset for the container, then is 13-3=10.
  239. if offset, err = extractOffset(offset, decompressed.Bytes()); err != nil {
  240. return
  241. }
  242. r.readerStack = &readerStack{
  243. // Allocate a buffer of size 0, which gets capped at 16 bytes
  244. // by the bufio package. We are already reading buffered data
  245. // here, no need to reserve another 4KB buffer.
  246. reader: bufio.NewReaderSize(&decompressed, 0),
  247. remain: decompressed.Len(),
  248. base: offset,
  249. parent: r.readerStack,
  250. }
  251. continue
  252. }
  253. // adjust the offset in case we're reading compressed messages. the
  254. // base will be zero otherwise.
  255. offset += r.base
  256. // When the messages are compressed kafka may return messages at an
  257. // earlier offset than the one that was requested, it's the client's
  258. // responsibility to ignore those.
  259. if offset < min {
  260. if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
  261. return
  262. }
  263. if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
  264. return
  265. }
  266. continue
  267. }
  268. if r.remain, err = readBytesWith(r.reader, r.remain, key); err != nil {
  269. return
  270. }
  271. r.remain, err = readBytesWith(r.reader, r.remain, val)
  272. return
  273. }
  274. err = errShortRead
  275. return
  276. }
  277. func (r *messageSetReaderV1) remaining() (remain int) {
  278. for s := r.readerStack; s != nil; s = s.parent {
  279. remain += s.remain
  280. }
  281. return
  282. }
  283. func (r *messageSetReaderV1) discard() (err error) {
  284. if r.readerStack == nil {
  285. return
  286. }
  287. // rewind up to the top-most reader b/c it's the only one that's doing
  288. // actual i/o. the rest are byte buffers that have been pushed on the stack
  289. // while reading compressed message sets.
  290. for r.parent != nil {
  291. r.readerStack = r.parent
  292. }
  293. r.remain, err = discardN(r.reader, r.remain, r.remain)
  294. return
  295. }
  296. func extractOffset(base int64, msgSet []byte) (offset int64, err error) {
  297. r, remain := bufio.NewReader(bytes.NewReader(msgSet)), len(msgSet)
  298. for remain > 0 {
  299. if remain, err = readInt64(r, remain, &offset); err != nil {
  300. return
  301. }
  302. var sz int32
  303. if remain, err = readInt32(r, remain, &sz); err != nil {
  304. return
  305. }
  306. if remain, err = discardN(r, remain, int(sz)); err != nil {
  307. return
  308. }
  309. }
  310. offset = base - offset
  311. return
  312. }
  // messageSetHeaderV2 holds the batch-level header fields of a v2 message
  // set. The accessor methods below decode the flags packed into
  // batchAttributes.
  type messageSetHeaderV2 struct {
  	firstOffset          int64
  	length               int32
  	partitionLeaderEpoch int32
  	magic                int8
  	crc                  int32
  	batchAttributes      int16
  	lastOffsetDelta      int32
  	firstTimestamp       int64
  	maxTimestamp         int64
  	producerId           int64
  	producerEpoch        int16
  	firstSequence        int32
  }

  // timestampType is the value of bit 3 of the batch attributes.
  type timestampType int8

  const (
  	createTime timestampType = iota
  	logAppendTime
  )

  // transactionType is the value of bit 4 of the batch attributes.
  type transactionType int8

  const (
  	nonTransactional transactionType = iota
  	transactional
  )

  // controlType is the value of bit 5 of the batch attributes.
  type controlType int8

  const (
  	nonControlMessage controlType = iota
  	controlMessage
  )

  // compression returns the compression code stored in the three lowest bits
  // of the batch attributes.
  func (h *messageSetHeaderV2) compression() int8 {
  	return int8(h.batchAttributes & 0x07)
  }

  // timestampType returns bit 3 of the batch attributes.
  func (h *messageSetHeaderV2) timestampType() timestampType {
  	return timestampType((h.batchAttributes >> 3) & 1)
  }

  // transactionType returns bit 4 of the batch attributes.
  func (h *messageSetHeaderV2) transactionType() transactionType {
  	return transactionType((h.batchAttributes >> 4) & 1)
  }

  // controlType returns bit 5 of the batch attributes.
  func (h *messageSetHeaderV2) controlType() controlType {
  	return controlType((h.batchAttributes >> 5) & 1)
  }
  354. type messageSetReaderV2 struct {
  355. *readerStack
  356. messageCount int
  357. header messageSetHeaderV2
  358. }
  359. func (r *messageSetReaderV2) readHeader() (err error) {
  360. h := &r.header
  361. if r.remain, err = readInt64(r.reader, r.remain, &h.firstOffset); err != nil {
  362. return
  363. }
  364. if r.remain, err = readInt32(r.reader, r.remain, &h.length); err != nil {
  365. return
  366. }
  367. if r.remain, err = readInt32(r.reader, r.remain, &h.partitionLeaderEpoch); err != nil {
  368. return
  369. }
  370. if r.remain, err = readInt8(r.reader, r.remain, &h.magic); err != nil {
  371. return
  372. }
  373. if r.remain, err = readInt32(r.reader, r.remain, &h.crc); err != nil {
  374. return
  375. }
  376. if r.remain, err = readInt16(r.reader, r.remain, &h.batchAttributes); err != nil {
  377. return
  378. }
  379. if r.remain, err = readInt32(r.reader, r.remain, &h.lastOffsetDelta); err != nil {
  380. return
  381. }
  382. if r.remain, err = readInt64(r.reader, r.remain, &h.firstTimestamp); err != nil {
  383. return
  384. }
  385. if r.remain, err = readInt64(r.reader, r.remain, &h.maxTimestamp); err != nil {
  386. return
  387. }
  388. if r.remain, err = readInt64(r.reader, r.remain, &h.producerId); err != nil {
  389. return
  390. }
  391. if r.remain, err = readInt16(r.reader, r.remain, &h.producerEpoch); err != nil {
  392. return
  393. }
  394. if r.remain, err = readInt32(r.reader, r.remain, &h.firstSequence); err != nil {
  395. return
  396. }
  397. var messageCount int32
  398. if r.remain, err = readInt32(r.reader, r.remain, &messageCount); err != nil {
  399. return
  400. }
  401. r.messageCount = int(messageCount)
  402. return nil
  403. }
  404. func (r *messageSetReaderV2) readMessage(min int64,
  405. key func(*bufio.Reader, int, int) (int, error),
  406. val func(*bufio.Reader, int, int) (int, error),
  407. ) (offset int64, timestamp int64, headers []Header, err error) {
  408. if r.messageCount == 0 {
  409. if r.remain == 0 {
  410. if r.parent != nil {
  411. r.readerStack = r.parent
  412. }
  413. }
  414. if err = r.readHeader(); err != nil {
  415. return
  416. }
  417. if code := r.header.compression(); code != 0 {
  418. var codec CompressionCodec
  419. if codec, err = resolveCodec(code); err != nil {
  420. return
  421. }
  422. var batchRemain = int(r.header.length - 49)
  423. if batchRemain > r.remain {
  424. err = errShortRead
  425. return
  426. }
  427. var decompressed bytes.Buffer
  428. decompressed.Grow(4 * batchRemain)
  429. l := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
  430. d := codec.NewReader(&l)
  431. _, err = decompressed.ReadFrom(d)
  432. r.remain = r.remain - (batchRemain - int(l.N))
  433. d.Close()
  434. if err != nil {
  435. return
  436. }
  437. r.readerStack = &readerStack{
  438. reader: bufio.NewReaderSize(&decompressed, 0),
  439. remain: decompressed.Len(),
  440. base: -1, // base is unused here
  441. parent: r.readerStack,
  442. }
  443. }
  444. }
  445. var length int64
  446. if r.remain, err = readVarInt(r.reader, r.remain, &length); err != nil {
  447. return
  448. }
  449. var attrs int8
  450. if r.remain, err = readInt8(r.reader, r.remain, &attrs); err != nil {
  451. return
  452. }
  453. var timestampDelta int64
  454. if r.remain, err = readVarInt(r.reader, r.remain, &timestampDelta); err != nil {
  455. return
  456. }
  457. var offsetDelta int64
  458. if r.remain, err = readVarInt(r.reader, r.remain, &offsetDelta); err != nil {
  459. return
  460. }
  461. var keyLen int64
  462. if r.remain, err = readVarInt(r.reader, r.remain, &keyLen); err != nil {
  463. return
  464. }
  465. if r.remain, err = key(r.reader, r.remain, int(keyLen)); err != nil {
  466. return
  467. }
  468. var valueLen int64
  469. if r.remain, err = readVarInt(r.reader, r.remain, &valueLen); err != nil {
  470. return
  471. }
  472. if r.remain, err = val(r.reader, r.remain, int(valueLen)); err != nil {
  473. return
  474. }
  475. var headerCount int64
  476. if r.remain, err = readVarInt(r.reader, r.remain, &headerCount); err != nil {
  477. return
  478. }
  479. headers = make([]Header, headerCount)
  480. for i := 0; i < int(headerCount); i++ {
  481. if err = r.readMessageHeader(&headers[i]); err != nil {
  482. return
  483. }
  484. }
  485. r.messageCount--
  486. return r.header.firstOffset + offsetDelta, r.header.firstTimestamp + timestampDelta, headers, nil
  487. }
  488. func (r *messageSetReaderV2) readMessageHeader(header *Header) (err error) {
  489. var keyLen int64
  490. if r.remain, err = readVarInt(r.reader, r.remain, &keyLen); err != nil {
  491. return
  492. }
  493. if header.Key, r.remain, err = readNewString(r.reader, r.remain, int(keyLen)); err != nil {
  494. return
  495. }
  496. var valLen int64
  497. if r.remain, err = readVarInt(r.reader, r.remain, &valLen); err != nil {
  498. return
  499. }
  500. if header.Value, r.remain, err = readNewBytes(r.reader, r.remain, int(valLen)); err != nil {
  501. return
  502. }
  503. return nil
  504. }
  505. func (r *messageSetReaderV2) remaining() (remain int) {
  506. return r.remain
  507. }
  508. func (r *messageSetReaderV2) discard() (err error) {
  509. r.remain, err = discardN(r.reader, r.remain, r.remain)
  510. return
  511. }