encode.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "reflect"
  9. "sync"
  10. "sync/atomic"
  11. )
// encoder writes encoded values to an io.Writer and, when a CRC32 table is
// installed, folds every byte that goes through Write into a running checksum.
type encoder struct {
	// writer is the destination of the encoded bytes.
	writer io.Writer
	// err is a sticky error: once it is set, Write returns it without
	// touching writer.
	err error
	// table is the CRC32 table used by update; checksumming is off when nil.
	table *crc32.Table
	// crc32 is the running checksum maintained by update.
	crc32 uint32
	// buffer is scratch space used to stage fixed-size integers, varints and
	// string chunks before they are passed to Write.
	buffer [32]byte
}
// encoderChecksum is an io.Reader wrapper that forwards the bytes it reads to
// an encoder's update method (see Read).
type encoderChecksum struct {
	// reader is the underlying source of bytes.
	reader io.Reader
	// encoder receives every chunk read from reader through update.
	encoder *encoder
}
  23. func (e *encoderChecksum) Read(b []byte) (int, error) {
  24. n, err := e.reader.Read(b)
  25. if n > 0 {
  26. e.encoder.update(b[:n])
  27. }
  28. return n, err
  29. }
  30. func (e *encoder) Reset(w io.Writer) {
  31. e.writer = w
  32. e.err = nil
  33. e.table = nil
  34. e.crc32 = 0
  35. e.buffer = [32]byte{}
  36. }
  37. func (e *encoder) ReadFrom(r io.Reader) (int64, error) {
  38. if e.table != nil {
  39. r = &encoderChecksum{
  40. reader: r,
  41. encoder: e,
  42. }
  43. }
  44. return io.Copy(e.writer, r)
  45. }
  46. func (e *encoder) Write(b []byte) (int, error) {
  47. if e.err != nil {
  48. return 0, e.err
  49. }
  50. n, err := e.writer.Write(b)
  51. if n > 0 {
  52. e.update(b[:n])
  53. }
  54. if err != nil {
  55. e.err = err
  56. }
  57. return n, err
  58. }
  59. func (e *encoder) WriteByte(b byte) error {
  60. e.buffer[0] = b
  61. _, err := e.Write(e.buffer[:1])
  62. return err
  63. }
  64. func (e *encoder) WriteString(s string) (int, error) {
  65. // This implementation is an optimization to avoid the heap allocation that
  66. // would occur when converting the string to a []byte to call crc32.Update.
  67. //
  68. // Strings are rarely long in the kafka protocol, so the use of a 32 byte
  69. // buffer is a good comprise between keeping the encoder value small and
  70. // limiting the number of calls to Write.
  71. //
  72. // We introduced this optimization because memory profiles on the benchmarks
  73. // showed that most heap allocations were caused by this code path.
  74. n := 0
  75. for len(s) != 0 {
  76. c := copy(e.buffer[:], s)
  77. w, err := e.Write(e.buffer[:c])
  78. n += w
  79. if err != nil {
  80. return n, err
  81. }
  82. s = s[c:]
  83. }
  84. return n, nil
  85. }
  86. func (e *encoder) setCRC(table *crc32.Table) {
  87. e.table, e.crc32 = table, 0
  88. }
  89. func (e *encoder) update(b []byte) {
  90. if e.table != nil {
  91. e.crc32 = crc32.Update(e.crc32, e.table, b)
  92. }
  93. }
  94. func (e *encoder) encodeBool(v value) {
  95. b := int8(0)
  96. if v.bool() {
  97. b = 1
  98. }
  99. e.writeInt8(b)
  100. }
  101. func (e *encoder) encodeInt8(v value) {
  102. e.writeInt8(v.int8())
  103. }
  104. func (e *encoder) encodeInt16(v value) {
  105. e.writeInt16(v.int16())
  106. }
  107. func (e *encoder) encodeInt32(v value) {
  108. e.writeInt32(v.int32())
  109. }
  110. func (e *encoder) encodeInt64(v value) {
  111. e.writeInt64(v.int64())
  112. }
  113. func (e *encoder) encodeString(v value) {
  114. e.writeString(v.string())
  115. }
  116. func (e *encoder) encodeVarString(v value) {
  117. e.writeVarString(v.string())
  118. }
  119. func (e *encoder) encodeCompactString(v value) {
  120. e.writeCompactString(v.string())
  121. }
  122. func (e *encoder) encodeNullString(v value) {
  123. e.writeNullString(v.string())
  124. }
  125. func (e *encoder) encodeVarNullString(v value) {
  126. e.writeVarNullString(v.string())
  127. }
  128. func (e *encoder) encodeCompactNullString(v value) {
  129. e.writeCompactNullString(v.string())
  130. }
  131. func (e *encoder) encodeBytes(v value) {
  132. e.writeBytes(v.bytes())
  133. }
  134. func (e *encoder) encodeVarBytes(v value) {
  135. e.writeVarBytes(v.bytes())
  136. }
  137. func (e *encoder) encodeCompactBytes(v value) {
  138. e.writeCompactBytes(v.bytes())
  139. }
  140. func (e *encoder) encodeNullBytes(v value) {
  141. e.writeNullBytes(v.bytes())
  142. }
  143. func (e *encoder) encodeVarNullBytes(v value) {
  144. e.writeVarNullBytes(v.bytes())
  145. }
  146. func (e *encoder) encodeCompactNullBytes(v value) {
  147. e.writeCompactNullBytes(v.bytes())
  148. }
  149. func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  150. a := v.array(elemType)
  151. n := a.length()
  152. e.writeInt32(int32(n))
  153. for i := 0; i < n; i++ {
  154. encodeElem(e, a.index(i))
  155. }
  156. }
  157. func (e *encoder) encodeCompactArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  158. a := v.array(elemType)
  159. n := a.length()
  160. e.writeUnsignedVarInt(uint64(n + 1))
  161. for i := 0; i < n; i++ {
  162. encodeElem(e, a.index(i))
  163. }
  164. }
  165. func (e *encoder) encodeNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  166. a := v.array(elemType)
  167. if a.isNil() {
  168. e.writeInt32(-1)
  169. return
  170. }
  171. n := a.length()
  172. e.writeInt32(int32(n))
  173. for i := 0; i < n; i++ {
  174. encodeElem(e, a.index(i))
  175. }
  176. }
  177. func (e *encoder) encodeCompactNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  178. a := v.array(elemType)
  179. if a.isNil() {
  180. e.writeUnsignedVarInt(0)
  181. return
  182. }
  183. n := a.length()
  184. e.writeUnsignedVarInt(uint64(n + 1))
  185. for i := 0; i < n; i++ {
  186. encodeElem(e, a.index(i))
  187. }
  188. }
  189. func (e *encoder) writeInt8(i int8) {
  190. writeInt8(e.buffer[:1], i)
  191. e.Write(e.buffer[:1])
  192. }
  193. func (e *encoder) writeInt16(i int16) {
  194. writeInt16(e.buffer[:2], i)
  195. e.Write(e.buffer[:2])
  196. }
  197. func (e *encoder) writeInt32(i int32) {
  198. writeInt32(e.buffer[:4], i)
  199. e.Write(e.buffer[:4])
  200. }
  201. func (e *encoder) writeInt64(i int64) {
  202. writeInt64(e.buffer[:8], i)
  203. e.Write(e.buffer[:8])
  204. }
  205. func (e *encoder) writeString(s string) {
  206. e.writeInt16(int16(len(s)))
  207. e.WriteString(s)
  208. }
  209. func (e *encoder) writeVarString(s string) {
  210. e.writeVarInt(int64(len(s)))
  211. e.WriteString(s)
  212. }
  213. func (e *encoder) writeCompactString(s string) {
  214. e.writeUnsignedVarInt(uint64(len(s)) + 1)
  215. e.WriteString(s)
  216. }
  217. func (e *encoder) writeNullString(s string) {
  218. if s == "" {
  219. e.writeInt16(-1)
  220. } else {
  221. e.writeInt16(int16(len(s)))
  222. e.WriteString(s)
  223. }
  224. }
  225. func (e *encoder) writeVarNullString(s string) {
  226. if s == "" {
  227. e.writeVarInt(-1)
  228. } else {
  229. e.writeVarInt(int64(len(s)))
  230. e.WriteString(s)
  231. }
  232. }
  233. func (e *encoder) writeCompactNullString(s string) {
  234. if s == "" {
  235. e.writeUnsignedVarInt(0)
  236. } else {
  237. e.writeUnsignedVarInt(uint64(len(s)) + 1)
  238. e.WriteString(s)
  239. }
  240. }
  241. func (e *encoder) writeBytes(b []byte) {
  242. e.writeInt32(int32(len(b)))
  243. e.Write(b)
  244. }
  245. func (e *encoder) writeVarBytes(b []byte) {
  246. e.writeVarInt(int64(len(b)))
  247. e.Write(b)
  248. }
  249. func (e *encoder) writeCompactBytes(b []byte) {
  250. e.writeUnsignedVarInt(uint64(len(b)) + 1)
  251. e.Write(b)
  252. }
  253. func (e *encoder) writeNullBytes(b []byte) {
  254. if b == nil {
  255. e.writeInt32(-1)
  256. } else {
  257. e.writeInt32(int32(len(b)))
  258. e.Write(b)
  259. }
  260. }
  261. func (e *encoder) writeVarNullBytes(b []byte) {
  262. if b == nil {
  263. e.writeVarInt(-1)
  264. } else {
  265. e.writeVarInt(int64(len(b)))
  266. e.Write(b)
  267. }
  268. }
  269. func (e *encoder) writeCompactNullBytes(b []byte) {
  270. if b == nil {
  271. e.writeUnsignedVarInt(0)
  272. } else {
  273. e.writeUnsignedVarInt(uint64(len(b)) + 1)
  274. e.Write(b)
  275. }
  276. }
  277. func (e *encoder) writeBytesFrom(b Bytes) error {
  278. size := int64(b.Len())
  279. e.writeInt32(int32(size))
  280. n, err := io.Copy(e, b)
  281. if err == nil && n != size {
  282. err = fmt.Errorf("size of bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
  283. }
  284. return err
  285. }
  286. func (e *encoder) writeNullBytesFrom(b Bytes) error {
  287. if b == nil {
  288. e.writeInt32(-1)
  289. return nil
  290. } else {
  291. size := int64(b.Len())
  292. e.writeInt32(int32(size))
  293. n, err := io.Copy(e, b)
  294. if err == nil && n != size {
  295. err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
  296. }
  297. return err
  298. }
  299. }
  300. func (e *encoder) writeVarNullBytesFrom(b Bytes) error {
  301. if b == nil {
  302. e.writeVarInt(-1)
  303. return nil
  304. } else {
  305. size := int64(b.Len())
  306. e.writeVarInt(size)
  307. n, err := io.Copy(e, b)
  308. if err == nil && n != size {
  309. err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
  310. }
  311. return err
  312. }
  313. }
  314. func (e *encoder) writeCompactNullBytesFrom(b Bytes) error {
  315. if b == nil {
  316. e.writeUnsignedVarInt(0)
  317. return nil
  318. } else {
  319. size := int64(b.Len())
  320. e.writeUnsignedVarInt(uint64(size + 1))
  321. n, err := io.Copy(e, b)
  322. if err == nil && n != size {
  323. err = fmt.Errorf("size of compact nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
  324. }
  325. return err
  326. }
  327. }
  328. func (e *encoder) writeVarInt(i int64) {
  329. e.writeUnsignedVarInt(uint64((i << 1) ^ (i >> 63)))
  330. }
  331. func (e *encoder) writeUnsignedVarInt(i uint64) {
  332. b := e.buffer[:]
  333. n := 0
  334. for i >= 0x80 && n < len(b) {
  335. b[n] = byte(i) | 0x80
  336. i >>= 7
  337. n++
  338. }
  339. if n < len(b) {
  340. b[n] = byte(i)
  341. n++
  342. }
  343. e.Write(b[:n])
  344. }
  345. type encodeFunc func(*encoder, value)
  346. var (
  347. _ io.ReaderFrom = (*encoder)(nil)
  348. _ io.Writer = (*encoder)(nil)
  349. _ io.ByteWriter = (*encoder)(nil)
  350. _ io.StringWriter = (*encoder)(nil)
  351. writerTo = reflect.TypeOf((*io.WriterTo)(nil)).Elem()
  352. )
  353. func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc {
  354. if reflect.PtrTo(typ).Implements(writerTo) {
  355. return writerEncodeFuncOf(typ)
  356. }
  357. switch typ.Kind() {
  358. case reflect.Bool:
  359. return (*encoder).encodeBool
  360. case reflect.Int8:
  361. return (*encoder).encodeInt8
  362. case reflect.Int16:
  363. return (*encoder).encodeInt16
  364. case reflect.Int32:
  365. return (*encoder).encodeInt32
  366. case reflect.Int64:
  367. return (*encoder).encodeInt64
  368. case reflect.String:
  369. return stringEncodeFuncOf(flexible, tag)
  370. case reflect.Struct:
  371. return structEncodeFuncOf(typ, version, flexible)
  372. case reflect.Slice:
  373. if typ.Elem().Kind() == reflect.Uint8 { // []byte
  374. return bytesEncodeFuncOf(flexible, tag)
  375. }
  376. return arrayEncodeFuncOf(typ, version, flexible, tag)
  377. default:
  378. panic("unsupported type: " + typ.String())
  379. }
  380. }
  381. func stringEncodeFuncOf(flexible bool, tag structTag) encodeFunc {
  382. switch {
  383. case flexible && tag.Nullable:
  384. // In flexible messages, all strings are compact
  385. return (*encoder).encodeCompactNullString
  386. case flexible:
  387. // In flexible messages, all strings are compact
  388. return (*encoder).encodeCompactString
  389. case tag.Nullable:
  390. return (*encoder).encodeNullString
  391. default:
  392. return (*encoder).encodeString
  393. }
  394. }
  395. func bytesEncodeFuncOf(flexible bool, tag structTag) encodeFunc {
  396. switch {
  397. case flexible && tag.Nullable:
  398. // In flexible messages, all arrays are compact
  399. return (*encoder).encodeCompactNullBytes
  400. case flexible:
  401. // In flexible messages, all arrays are compact
  402. return (*encoder).encodeCompactBytes
  403. case tag.Nullable:
  404. return (*encoder).encodeNullBytes
  405. default:
  406. return (*encoder).encodeBytes
  407. }
  408. }
  409. func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFunc {
  410. type field struct {
  411. encode encodeFunc
  412. index index
  413. tagID int
  414. }
  415. var fields []field
  416. var taggedFields []field
  417. forEachStructField(typ, func(typ reflect.Type, index index, tag string) {
  418. if typ.Size() != 0 { // skip struct{}
  419. forEachStructTag(tag, func(tag structTag) bool {
  420. if tag.MinVersion <= version && version <= tag.MaxVersion {
  421. f := field{
  422. encode: encodeFuncOf(typ, version, flexible, tag),
  423. index: index,
  424. tagID: tag.TagID,
  425. }
  426. if tag.TagID < -1 {
  427. // Normal required field
  428. fields = append(fields, f)
  429. } else {
  430. // Optional tagged field (flexible messages only)
  431. taggedFields = append(taggedFields, f)
  432. }
  433. return false
  434. }
  435. return true
  436. })
  437. }
  438. })
  439. return func(e *encoder, v value) {
  440. for i := range fields {
  441. f := &fields[i]
  442. f.encode(e, v.fieldByIndex(f.index))
  443. }
  444. if flexible {
  445. // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
  446. // for details of tag buffers in "flexible" messages.
  447. e.writeUnsignedVarInt(uint64(len(taggedFields)))
  448. for i := range taggedFields {
  449. f := &taggedFields[i]
  450. e.writeUnsignedVarInt(uint64(f.tagID))
  451. buf := &bytes.Buffer{}
  452. se := &encoder{writer: buf}
  453. f.encode(se, v.fieldByIndex(f.index))
  454. e.writeUnsignedVarInt(uint64(buf.Len()))
  455. e.Write(buf.Bytes())
  456. }
  457. }
  458. }
  459. }
  460. func arrayEncodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc {
  461. elemType := typ.Elem()
  462. elemFunc := encodeFuncOf(elemType, version, flexible, tag)
  463. switch {
  464. case flexible && tag.Nullable:
  465. // In flexible messages, all arrays are compact
  466. return func(e *encoder, v value) { e.encodeCompactNullArray(v, elemType, elemFunc) }
  467. case flexible:
  468. // In flexible messages, all arrays are compact
  469. return func(e *encoder, v value) { e.encodeCompactArray(v, elemType, elemFunc) }
  470. case tag.Nullable:
  471. return func(e *encoder, v value) { e.encodeNullArray(v, elemType, elemFunc) }
  472. default:
  473. return func(e *encoder, v value) { e.encodeArray(v, elemType, elemFunc) }
  474. }
  475. }
  476. func writerEncodeFuncOf(typ reflect.Type) encodeFunc {
  477. typ = reflect.PtrTo(typ)
  478. return func(e *encoder, v value) {
  479. // Optimization to write directly into the buffer when the encoder
  480. // does no need to compute a crc32 checksum.
  481. w := io.Writer(e)
  482. if e.table == nil {
  483. w = e.writer
  484. }
  485. _, err := v.iface(typ).(io.WriterTo).WriteTo(w)
  486. if err != nil {
  487. e.err = err
  488. }
  489. }
  490. }
  491. func writeInt8(b []byte, i int8) {
  492. b[0] = byte(i)
  493. }
  494. func writeInt16(b []byte, i int16) {
  495. binary.BigEndian.PutUint16(b, uint16(i))
  496. }
  497. func writeInt32(b []byte, i int32) {
  498. binary.BigEndian.PutUint32(b, uint32(i))
  499. }
  500. func writeInt64(b []byte, i int64) {
  501. binary.BigEndian.PutUint64(b, uint64(i))
  502. }
  503. func Marshal(version int16, value interface{}) ([]byte, error) {
  504. typ := typeOf(value)
  505. cache, _ := marshalers.Load().(map[_type]encodeFunc)
  506. encode := cache[typ]
  507. if encode == nil {
  508. encode = encodeFuncOf(reflect.TypeOf(value), version, false, structTag{
  509. MinVersion: -1,
  510. MaxVersion: -1,
  511. TagID: -2,
  512. Compact: true,
  513. Nullable: true,
  514. })
  515. newCache := make(map[_type]encodeFunc, len(cache)+1)
  516. newCache[typ] = encode
  517. for typ, fun := range cache {
  518. newCache[typ] = fun
  519. }
  520. marshalers.Store(newCache)
  521. }
  522. e, _ := encoders.Get().(*encoder)
  523. if e == nil {
  524. e = &encoder{writer: new(bytes.Buffer)}
  525. }
  526. b, _ := e.writer.(*bytes.Buffer)
  527. defer func() {
  528. b.Reset()
  529. e.Reset(b)
  530. encoders.Put(e)
  531. }()
  532. encode(e, nonAddressableValueOf(value))
  533. if e.err != nil {
  534. return nil, e.err
  535. }
  536. buf := b.Bytes()
  537. out := make([]byte, len(buf))
  538. copy(out, buf)
  539. return out, nil
  540. }
  541. var (
  542. encoders sync.Pool // *encoder
  543. marshalers atomic.Value // map[_type]encodeFunc
  544. )