write.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. package kafka
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "time"
  9. )
  // writeBuffer serializes values onto an underlying io.Writer. It carries a
  // small scratch array so the fixed-size and varint encoders can format a
  // value without allocating.
  type writeBuffer struct {
  	// w is the destination every encoded byte is forwarded to.
  	w io.Writer
  	// b is scratch space the integer encoders fill before handing a slice of
  	// it to w.
  	b [16]byte
  }
  14. func (wb *writeBuffer) writeInt8(i int8) {
  15. wb.b[0] = byte(i)
  16. wb.Write(wb.b[:1])
  17. }
  18. func (wb *writeBuffer) writeInt16(i int16) {
  19. binary.BigEndian.PutUint16(wb.b[:2], uint16(i))
  20. wb.Write(wb.b[:2])
  21. }
  22. func (wb *writeBuffer) writeInt32(i int32) {
  23. binary.BigEndian.PutUint32(wb.b[:4], uint32(i))
  24. wb.Write(wb.b[:4])
  25. }
  26. func (wb *writeBuffer) writeInt64(i int64) {
  27. binary.BigEndian.PutUint64(wb.b[:8], uint64(i))
  28. wb.Write(wb.b[:8])
  29. }
  30. func (wb *writeBuffer) writeVarInt(i int64) {
  31. u := uint64((i << 1) ^ (i >> 63))
  32. n := 0
  33. for u >= 0x80 && n < len(wb.b) {
  34. wb.b[n] = byte(u) | 0x80
  35. u >>= 7
  36. n++
  37. }
  38. if n < len(wb.b) {
  39. wb.b[n] = byte(u)
  40. n++
  41. }
  42. wb.Write(wb.b[:n])
  43. }
  44. func (wb *writeBuffer) writeString(s string) {
  45. wb.writeInt16(int16(len(s)))
  46. wb.WriteString(s)
  47. }
  48. func (wb *writeBuffer) writeVarString(s string) {
  49. wb.writeVarInt(int64(len(s)))
  50. wb.WriteString(s)
  51. }
  52. func (wb *writeBuffer) writeNullableString(s *string) {
  53. if s == nil {
  54. wb.writeInt16(-1)
  55. } else {
  56. wb.writeString(*s)
  57. }
  58. }
  59. func (wb *writeBuffer) writeBytes(b []byte) {
  60. n := len(b)
  61. if b == nil {
  62. n = -1
  63. }
  64. wb.writeInt32(int32(n))
  65. wb.Write(b)
  66. }
  67. func (wb *writeBuffer) writeVarBytes(b []byte) {
  68. if b != nil {
  69. wb.writeVarInt(int64(len(b)))
  70. wb.Write(b)
  71. } else {
  72. //-1 is used to indicate nil key
  73. wb.writeVarInt(-1)
  74. }
  75. }
  76. func (wb *writeBuffer) writeBool(b bool) {
  77. v := int8(0)
  78. if b {
  79. v = 1
  80. }
  81. wb.writeInt8(v)
  82. }
  83. func (wb *writeBuffer) writeArrayLen(n int) {
  84. wb.writeInt32(int32(n))
  85. }
  86. func (wb *writeBuffer) writeArray(n int, f func(int)) {
  87. wb.writeArrayLen(n)
  88. for i := 0; i < n; i++ {
  89. f(i)
  90. }
  91. }
  92. func (wb *writeBuffer) writeVarArray(n int, f func(int)) {
  93. wb.writeVarInt(int64(n))
  94. for i := 0; i < n; i++ {
  95. f(i)
  96. }
  97. }
  98. func (wb *writeBuffer) writeStringArray(a []string) {
  99. wb.writeArray(len(a), func(i int) { wb.writeString(a[i]) })
  100. }
  101. func (wb *writeBuffer) writeInt32Array(a []int32) {
  102. wb.writeArray(len(a), func(i int) { wb.writeInt32(a[i]) })
  103. }
  104. func (wb *writeBuffer) write(a interface{}) {
  105. switch v := a.(type) {
  106. case int8:
  107. wb.writeInt8(v)
  108. case int16:
  109. wb.writeInt16(v)
  110. case int32:
  111. wb.writeInt32(v)
  112. case int64:
  113. wb.writeInt64(v)
  114. case string:
  115. wb.writeString(v)
  116. case []byte:
  117. wb.writeBytes(v)
  118. case bool:
  119. wb.writeBool(v)
  120. case writable:
  121. v.writeTo(wb)
  122. default:
  123. panic(fmt.Sprintf("unsupported type: %T", a))
  124. }
  125. }
  126. func (wb *writeBuffer) Write(b []byte) (int, error) {
  127. return wb.w.Write(b)
  128. }
  129. func (wb *writeBuffer) WriteString(s string) (int, error) {
  130. return io.WriteString(wb.w, s)
  131. }
  132. func (wb *writeBuffer) Flush() error {
  133. if x, ok := wb.w.(interface{ Flush() error }); ok {
  134. return x.Flush()
  135. }
  136. return nil
  137. }
  138. type writable interface {
  139. writeTo(*writeBuffer)
  140. }
  141. func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
  142. h := requestHeader{
  143. ApiKey: int16(fetch),
  144. ApiVersion: int16(v2),
  145. CorrelationID: correlationID,
  146. ClientID: clientID,
  147. }
  148. h.Size = (h.size() - 4) +
  149. 4 + // replica ID
  150. 4 + // max wait time
  151. 4 + // min bytes
  152. 4 + // topic array length
  153. sizeofString(topic) +
  154. 4 + // partition array length
  155. 4 + // partition
  156. 8 + // offset
  157. 4 // max bytes
  158. h.writeTo(wb)
  159. wb.writeInt32(-1) // replica ID
  160. wb.writeInt32(milliseconds(maxWait))
  161. wb.writeInt32(int32(minBytes))
  162. // topic array
  163. wb.writeArrayLen(1)
  164. wb.writeString(topic)
  165. // partition array
  166. wb.writeArrayLen(1)
  167. wb.writeInt32(partition)
  168. wb.writeInt64(offset)
  169. wb.writeInt32(int32(maxBytes))
  170. return wb.Flush()
  171. }
  172. func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
  173. h := requestHeader{
  174. ApiKey: int16(fetch),
  175. ApiVersion: int16(v5),
  176. CorrelationID: correlationID,
  177. ClientID: clientID,
  178. }
  179. h.Size = (h.size() - 4) +
  180. 4 + // replica ID
  181. 4 + // max wait time
  182. 4 + // min bytes
  183. 4 + // max bytes
  184. 1 + // isolation level
  185. 4 + // topic array length
  186. sizeofString(topic) +
  187. 4 + // partition array length
  188. 4 + // partition
  189. 8 + // offset
  190. 8 + // log start offset
  191. 4 // max bytes
  192. h.writeTo(wb)
  193. wb.writeInt32(-1) // replica ID
  194. wb.writeInt32(milliseconds(maxWait))
  195. wb.writeInt32(int32(minBytes))
  196. wb.writeInt32(int32(maxBytes))
  197. wb.writeInt8(isolationLevel) // isolation level 0 - read uncommitted
  198. // topic array
  199. wb.writeArrayLen(1)
  200. wb.writeString(topic)
  201. // partition array
  202. wb.writeArrayLen(1)
  203. wb.writeInt32(partition)
  204. wb.writeInt64(offset)
  205. wb.writeInt64(int64(0)) // log start offset only used when is sent by follower
  206. wb.writeInt32(int32(maxBytes))
  207. return wb.Flush()
  208. }
  209. func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
  210. h := requestHeader{
  211. ApiKey: int16(fetch),
  212. ApiVersion: int16(v10),
  213. CorrelationID: correlationID,
  214. ClientID: clientID,
  215. }
  216. h.Size = (h.size() - 4) +
  217. 4 + // replica ID
  218. 4 + // max wait time
  219. 4 + // min bytes
  220. 4 + // max bytes
  221. 1 + // isolation level
  222. 4 + // session ID
  223. 4 + // session epoch
  224. 4 + // topic array length
  225. sizeofString(topic) +
  226. 4 + // partition array length
  227. 4 + // partition
  228. 4 + // current leader epoch
  229. 8 + // fetch offset
  230. 8 + // log start offset
  231. 4 + // partition max bytes
  232. 4 // forgotten topics data
  233. h.writeTo(wb)
  234. wb.writeInt32(-1) // replica ID
  235. wb.writeInt32(milliseconds(maxWait))
  236. wb.writeInt32(int32(minBytes))
  237. wb.writeInt32(int32(maxBytes))
  238. wb.writeInt8(isolationLevel) // isolation level 0 - read uncommitted
  239. wb.writeInt32(0) //FIXME
  240. wb.writeInt32(-1) //FIXME
  241. // topic array
  242. wb.writeArrayLen(1)
  243. wb.writeString(topic)
  244. // partition array
  245. wb.writeArrayLen(1)
  246. wb.writeInt32(partition)
  247. wb.writeInt32(-1) //FIXME
  248. wb.writeInt64(offset)
  249. wb.writeInt64(int64(0)) // log start offset only used when is sent by follower
  250. wb.writeInt32(int32(maxBytes))
  251. // forgotten topics array
  252. wb.writeArrayLen(0) // forgotten topics not supported yet
  253. return wb.Flush()
  254. }
  255. func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error {
  256. h := requestHeader{
  257. ApiKey: int16(listOffsets),
  258. ApiVersion: int16(v1),
  259. CorrelationID: correlationID,
  260. ClientID: clientID,
  261. }
  262. h.Size = (h.size() - 4) +
  263. 4 + // replica ID
  264. 4 + // topic array length
  265. sizeofString(topic) + // topic
  266. 4 + // partition array length
  267. 4 + // partition
  268. 8 // time
  269. h.writeTo(wb)
  270. wb.writeInt32(-1) // replica ID
  271. // topic array
  272. wb.writeArrayLen(1)
  273. wb.writeString(topic)
  274. // partition array
  275. wb.writeArrayLen(1)
  276. wb.writeInt32(partition)
  277. wb.writeInt64(time)
  278. return wb.Flush()
  279. }
  280. func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
  281. var size int32
  282. var attributes int8
  283. var compressed *bytes.Buffer
  284. if codec == nil {
  285. size = messageSetSize(msgs...)
  286. } else {
  287. compressed, attributes, size, err = compressMessageSet(codec, msgs...)
  288. if err != nil {
  289. return
  290. }
  291. msgs = []Message{{Value: compressed.Bytes()}}
  292. }
  293. h := requestHeader{
  294. ApiKey: int16(produce),
  295. ApiVersion: int16(v2),
  296. CorrelationID: correlationID,
  297. ClientID: clientID,
  298. }
  299. h.Size = (h.size() - 4) +
  300. 2 + // required acks
  301. 4 + // timeout
  302. 4 + // topic array length
  303. sizeofString(topic) + // topic
  304. 4 + // partition array length
  305. 4 + // partition
  306. 4 + // message set size
  307. size
  308. h.writeTo(wb)
  309. wb.writeInt16(requiredAcks) // required acks
  310. wb.writeInt32(milliseconds(timeout))
  311. // topic array
  312. wb.writeArrayLen(1)
  313. wb.writeString(topic)
  314. // partition array
  315. wb.writeArrayLen(1)
  316. wb.writeInt32(partition)
  317. wb.writeInt32(size)
  318. cw := &crc32Writer{table: crc32.IEEETable}
  319. for _, msg := range msgs {
  320. wb.writeMessage(msg.Offset, attributes, msg.Time, msg.Key, msg.Value, cw)
  321. }
  322. releaseBuffer(compressed)
  323. return wb.Flush()
  324. }
  325. func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
  326. h := requestHeader{
  327. ApiKey: int16(produce),
  328. ApiVersion: int16(v3),
  329. CorrelationID: correlationID,
  330. ClientID: clientID,
  331. }
  332. h.Size = (h.size() - 4) +
  333. sizeofNullableString(transactionalID) +
  334. 2 + // required acks
  335. 4 + // timeout
  336. 4 + // topic array length
  337. sizeofString(topic) + // topic
  338. 4 + // partition array length
  339. 4 + // partition
  340. 4 + // message set size
  341. recordBatch.size
  342. h.writeTo(wb)
  343. wb.writeNullableString(transactionalID)
  344. wb.writeInt16(requiredAcks) // required acks
  345. wb.writeInt32(milliseconds(timeout))
  346. // topic array
  347. wb.writeArrayLen(1)
  348. wb.writeString(topic)
  349. // partition array
  350. wb.writeArrayLen(1)
  351. wb.writeInt32(partition)
  352. recordBatch.writeTo(wb)
  353. return wb.Flush()
  354. }
  355. func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
  356. h := requestHeader{
  357. ApiKey: int16(produce),
  358. ApiVersion: int16(v7),
  359. CorrelationID: correlationID,
  360. ClientID: clientID,
  361. }
  362. h.Size = (h.size() - 4) +
  363. sizeofNullableString(transactionalID) +
  364. 2 + // required acks
  365. 4 + // timeout
  366. 4 + // topic array length
  367. sizeofString(topic) + // topic
  368. 4 + // partition array length
  369. 4 + // partition
  370. 4 + // message set size
  371. recordBatch.size
  372. h.writeTo(wb)
  373. wb.writeNullableString(transactionalID)
  374. wb.writeInt16(requiredAcks) // required acks
  375. wb.writeInt32(milliseconds(timeout))
  376. // topic array
  377. wb.writeArrayLen(1)
  378. wb.writeString(topic)
  379. // partition array
  380. wb.writeArrayLen(1)
  381. wb.writeInt32(partition)
  382. recordBatch.writeTo(wb)
  383. return wb.Flush()
  384. }
  385. func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int, baseTime, lastTime time.Time, write func(*writeBuffer)) {
  386. var (
  387. baseTimestamp = timestamp(baseTime)
  388. lastTimestamp = timestamp(lastTime)
  389. lastOffsetDelta = int32(count - 1)
  390. producerID = int64(-1) // default producer id for now
  391. producerEpoch = int16(-1) // default producer epoch for now
  392. baseSequence = int32(-1) // default base sequence
  393. recordCount = int32(count) // record count
  394. writerBackup = wb.w
  395. )
  396. // dry run to compute the checksum
  397. cw := &crc32Writer{table: crc32.MakeTable(crc32.Castagnoli)}
  398. wb.w = cw
  399. cw.writeInt16(attributes) // attributes, timestamp type 0 - create time, not part of a transaction, no control messages
  400. cw.writeInt32(lastOffsetDelta)
  401. cw.writeInt64(baseTimestamp)
  402. cw.writeInt64(lastTimestamp)
  403. cw.writeInt64(producerID)
  404. cw.writeInt16(producerEpoch)
  405. cw.writeInt32(baseSequence)
  406. cw.writeInt32(recordCount)
  407. write(wb)
  408. wb.w = writerBackup
  409. // actual write to the output buffer
  410. wb.writeInt64(int64(0))
  411. wb.writeInt32(int32(size - 12)) // 12 = batch length + base offset sizes
  412. wb.writeInt32(-1) // partition leader epoch
  413. wb.writeInt8(2) // magic byte
  414. wb.writeInt32(int32(cw.crc32))
  415. wb.writeInt16(attributes)
  416. wb.writeInt32(lastOffsetDelta)
  417. wb.writeInt64(baseTimestamp)
  418. wb.writeInt64(lastTimestamp)
  419. wb.writeInt64(producerID)
  420. wb.writeInt16(producerEpoch)
  421. wb.writeInt32(baseSequence)
  422. wb.writeInt32(recordCount)
  423. write(wb)
  424. }
  425. func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) {
  426. compressed = acquireBuffer()
  427. compressor := codec.NewWriter(compressed)
  428. wb := &writeBuffer{w: compressor}
  429. cw := &crc32Writer{table: crc32.IEEETable}
  430. for offset, msg := range msgs {
  431. wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw)
  432. }
  433. if err = compressor.Close(); err != nil {
  434. releaseBuffer(compressed)
  435. return
  436. }
  437. attributes = codec.Code()
  438. size = messageSetSize(Message{Value: compressed.Bytes()})
  439. return
  440. }
  441. func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
  442. const magicByte = 1 // compatible with kafka 0.10.0.0+
  443. timestamp := timestamp(time)
  444. size := messageSize(key, value)
  445. // dry run to compute the checksum
  446. cw.crc32 = 0
  447. cw.writeInt8(magicByte)
  448. cw.writeInt8(attributes)
  449. cw.writeInt64(timestamp)
  450. cw.writeBytes(key)
  451. cw.writeBytes(value)
  452. // actual write to the output buffer
  453. wb.writeInt64(offset)
  454. wb.writeInt32(size)
  455. wb.writeInt32(int32(cw.crc32))
  456. wb.writeInt8(magicByte)
  457. wb.writeInt8(attributes)
  458. wb.writeInt64(timestamp)
  459. wb.writeBytes(key)
  460. wb.writeBytes(value)
  461. }
  462. // Messages with magic >2 are called records. This method writes messages using message format 2.
  463. func (wb *writeBuffer) writeRecord(attributes int8, baseTime time.Time, offset int64, msg Message) {
  464. timestampDelta := msg.Time.Sub(baseTime)
  465. offsetDelta := int64(offset)
  466. wb.writeVarInt(int64(recordSize(&msg, timestampDelta, offsetDelta)))
  467. wb.writeInt8(attributes)
  468. wb.writeVarInt(int64(milliseconds(timestampDelta)))
  469. wb.writeVarInt(offsetDelta)
  470. wb.writeVarBytes(msg.Key)
  471. wb.writeVarBytes(msg.Value)
  472. wb.writeVarArray(len(msg.Headers), func(i int) {
  473. h := &msg.Headers[i]
  474. wb.writeVarString(h.Key)
  475. wb.writeVarBytes(h.Value)
  476. })
  477. }
  // varIntLen returns the number of bytes needed to encode i as a zig-zag,
  // base-128 variable-length integer: between 1 and 10.
  func varIntLen(i int64) int {
  	length := 1
  	// Zig-zag maps small magnitudes of either sign to small unsigned values;
  	// every further 7 bits of payload cost one more byte.
  	for rest := uint64((i << 1) ^ (i >> 63)); rest >= 0x80; rest >>= 7 {
  		length++
  	}
  	return length
  }
  487. func varBytesLen(b []byte) int {
  488. return varIntLen(int64(len(b))) + len(b)
  489. }
  490. func varStringLen(s string) int {
  491. return varIntLen(int64(len(s))) + len(s)
  492. }
  493. func varArrayLen(n int, f func(int) int) int {
  494. size := varIntLen(int64(n))
  495. for i := 0; i < n; i++ {
  496. size += f(i)
  497. }
  498. return size
  499. }
  500. func messageSize(key, value []byte) int32 {
  501. return 4 + // crc
  502. 1 + // magic byte
  503. 1 + // attributes
  504. 8 + // timestamp
  505. sizeofBytes(key) +
  506. sizeofBytes(value)
  507. }
  508. func messageSetSize(msgs ...Message) (size int32) {
  509. for _, msg := range msgs {
  510. size += 8 + // offset
  511. 4 + // message size
  512. 4 + // crc
  513. 1 + // magic byte
  514. 1 + // attributes
  515. 8 + // timestamp
  516. sizeofBytes(msg.Key) +
  517. sizeofBytes(msg.Value)
  518. }
  519. return
  520. }