buffer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645
  1. package protocol
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sync"
  8. "sync/atomic"
  9. )
// Bytes is an interface implemented by types that represent immutable
// sequences of bytes.
//
// Bytes values are used to abstract the location where record keys and
// values are read from (e.g. in-memory buffers, network sockets, files).
//
// The Close method should be called to release resources held by the object
// when the program is done with it.
//
// Bytes values are generally not safe to use concurrently from multiple
// goroutines.
type Bytes interface {
	io.ReadCloser

	// Len returns the number of bytes remaining to be read from the payload.
	Len() int
}
  26. // NewBytes constructs a Bytes value from b.
  27. //
  28. // The returned value references b, it does not make a copy of the backing
  29. // array.
  30. //
  31. // If b is nil, nil is returned to represent a null BYTES value in the kafka
  32. // protocol.
  33. func NewBytes(b []byte) Bytes {
  34. if b == nil {
  35. return nil
  36. }
  37. r := new(bytesReader)
  38. r.Reset(b)
  39. return r
  40. }
  41. // ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the
  42. // length of b to minimize the memory footprint.
  43. //
  44. // The function returns a nil slice if b is nil.
  45. func ReadAll(b Bytes) ([]byte, error) {
  46. if b == nil {
  47. return nil, nil
  48. }
  49. s := make([]byte, b.Len())
  50. _, err := io.ReadFull(b, s)
  51. return s, err
  52. }
// bytesReader adapts bytes.Reader to the Bytes interface by adding a no-op
// Close method.
type bytesReader struct{ bytes.Reader }

// Close implements io.Closer; the in-memory reader holds no resources.
func (*bytesReader) Close() error { return nil }

// refCount is an atomic reference counter. The zero value is a count of
// zero; calls to ref and unref must be balanced by the caller.
type refCount uintptr

// ref atomically increments the counter.
func (rc *refCount) ref() { atomic.AddUintptr((*uintptr)(rc), 1) }

// unref atomically decrements the counter and invokes onZero when the
// count reaches zero. Adding ^uintptr(0) is equivalent to subtracting 1.
func (rc *refCount) unref(onZero func()) {
	if atomic.AddUintptr((*uintptr)(rc), ^uintptr(0)) == 0 {
		onZero()
	}
}
const (
	// Size of the memory buffer for a single page. We use a fairly
	// large size here (64 KiB) because batches exchanged with kafka
	// tend to be multiple kilobytes in size, sometimes hundreds.
	// Using large pages amortizes the overhead of the page metadata
	// and algorithms to manage the pages.
	pageSize = 65536
)

// page is a fixed-size, reference-counted buffer holding a pageSize-byte
// window of a larger logical stream starting at offset.
type page struct {
	refc   refCount        // number of live references to the page
	offset int64           // absolute offset of the first byte of buffer
	length int             // number of valid bytes written into buffer
	buffer *[pageSize]byte // backing storage, shared by all references
}
  76. func newPage(offset int64) *page {
  77. p, _ := pagePool.Get().(*page)
  78. if p != nil {
  79. p.offset = offset
  80. p.length = 0
  81. p.ref()
  82. } else {
  83. p = &page{
  84. refc: 1,
  85. offset: offset,
  86. buffer: &[pageSize]byte{},
  87. }
  88. }
  89. return p
  90. }
// ref acquires an additional reference to the page.
func (p *page) ref() { p.refc.ref() }

// unref releases a reference; the last release returns the page to the pool.
func (p *page) unref() { p.refc.unref(func() { pagePool.Put(p) }) }
  93. func (p *page) slice(begin, end int64) []byte {
  94. i, j := begin-p.offset, end-p.offset
  95. if i < 0 {
  96. i = 0
  97. } else if i > pageSize {
  98. i = pageSize
  99. }
  100. if j < 0 {
  101. j = 0
  102. } else if j > pageSize {
  103. j = pageSize
  104. }
  105. if i < j {
  106. return p.buffer[i:j]
  107. }
  108. return nil
  109. }
// Cap returns the fixed capacity of the page.
func (p *page) Cap() int { return pageSize }

// Len returns the number of valid bytes currently held by the page.
func (p *page) Len() int { return p.length }

// Size returns Len as an int64.
func (p *page) Size() int64 { return int64(p.length) }

// Truncate shrinks the page to n bytes; it never grows the page.
func (p *page) Truncate(n int) {
	if n < p.length {
		p.length = n
	}
}

// ReadAt copies bytes starting at the absolute offset off into b. It
// panics when off does not fall within the page, and returns 0 (with no
// error) when off is past the valid length.
func (p *page) ReadAt(b []byte, off int64) (int, error) {
	if off -= p.offset; off < 0 || off > pageSize {
		panic("offset out of range")
	}
	if off > int64(p.length) {
		return 0, nil
	}
	return copy(b, p.buffer[off:p.length]), nil
}
// ReadFrom fills the remainder of the page from r. io.EOF and
// io.ErrUnexpectedEOF are swallowed because a short read simply leaves the
// page partially filled; callers detect end of input from a short count.
func (p *page) ReadFrom(r io.Reader) (int64, error) {
	n, err := io.ReadFull(r, p.buffer[p.length:])
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		err = nil
	}
	p.length += n
	return int64(n), err
}

// WriteAt copies b into the page at the absolute offset off, growing the
// valid length when the write extends past it. It panics when off does not
// fall within the page. The returned count may be short when b extends
// beyond the end of the page.
func (p *page) WriteAt(b []byte, off int64) (int, error) {
	if off -= p.offset; off < 0 || off > pageSize {
		panic("offset out of range")
	}
	n := copy(p.buffer[off:], b)
	if end := int(off) + n; end > p.length {
		p.length = end
	}
	return n, nil
}

// Write appends b at the page's current end.
func (p *page) Write(b []byte) (int, error) {
	return p.WriteAt(b, p.offset+int64(p.length))
}
// Compile-time checks that *page implements the io interfaces it is used as.
var (
	_ io.ReaderAt   = (*page)(nil)
	_ io.ReaderFrom = (*page)(nil)
	_ io.Writer     = (*page)(nil)
	_ io.WriterAt   = (*page)(nil)
)

// pageBuffer is a reference-counted, append-only buffer made of a list of
// contiguous pages, with a read cursor for the io.Reader side.
type pageBuffer struct {
	refc   refCount        // number of live references to the buffer
	pages  contiguousPages // ordered pages backing the buffer
	length int             // total number of bytes written
	cursor int             // read position within [0, length]
}
  160. func newPageBuffer() *pageBuffer {
  161. b, _ := pageBufferPool.Get().(*pageBuffer)
  162. if b != nil {
  163. b.cursor = 0
  164. b.refc.ref()
  165. } else {
  166. b = &pageBuffer{
  167. refc: 1,
  168. pages: make(contiguousPages, 0, 16),
  169. }
  170. }
  171. return b
  172. }
// refTo initializes ref to reference the bytes of pb in the absolute range
// [begin, end), acquiring a reference on every page the range overlaps.
func (pb *pageBuffer) refTo(ref *pageRef, begin, end int64) {
	length := end - begin
	// ref.length is a uint32, so larger ranges cannot be represented.
	if length > math.MaxUint32 {
		panic("reference to contiguous buffer pages exceeds the maximum size of 4 GB")
	}
	ref.pages = append(ref.buffer[:0], pb.pages.slice(begin, end)...)
	ref.pages.ref()
	ref.offset = begin
	ref.length = uint32(length)
}

// ref allocates and returns a new reference to the range [begin, end).
func (pb *pageBuffer) ref(begin, end int64) *pageRef {
	ref := new(pageRef)
	pb.refTo(ref, begin, end)
	return ref
}

// unref drops the buffer's reference count; when it reaches zero the pages
// are released and the buffer is returned to the pool for reuse.
func (pb *pageBuffer) unref() {
	pb.refc.unref(func() {
		pb.pages.unref()
		pb.pages.clear()
		pb.pages = pb.pages[:0]
		pb.length = 0
		pageBufferPool.Put(pb)
	})
}

// newPage allocates a page positioned at the current end of the buffer.
func (pb *pageBuffer) newPage() *page {
	return newPage(int64(pb.length))
}

// Close implements io.Closer; it does not release the buffer (see unref).
func (pb *pageBuffer) Close() error {
	return nil
}
  203. func (pb *pageBuffer) Len() int {
  204. return pb.length - pb.cursor
  205. }
  206. func (pb *pageBuffer) Size() int64 {
  207. return int64(pb.length)
  208. }
  209. func (pb *pageBuffer) Discard(n int) (int, error) {
  210. remain := pb.length - pb.cursor
  211. if remain < n {
  212. n = remain
  213. }
  214. pb.cursor += n
  215. return n, nil
  216. }
// Truncate shrinks the buffer to n bytes, releasing any pages that fall
// entirely past the new length and trimming the page that straddles it.
// The read cursor is clamped to the new length. Growing is not supported;
// n >= pb.length is a no-op.
func (pb *pageBuffer) Truncate(n int) {
	if n < pb.length {
		pb.length = n
		if n < pb.cursor {
			pb.cursor = n
		}
		// Walk the pages, consuming n until we reach the first page that
		// extends past the new length.
		for i := range pb.pages {
			if p := pb.pages[i]; p.length <= n {
				n -= p.length
			} else {
				if n > 0 {
					// This page straddles the boundary: keep its first n
					// bytes and drop the pages after it.
					pb.pages[i].Truncate(n)
					i++
				}
				pb.pages[i:].unref()
				pb.pages[i:].clear()
				pb.pages = pb.pages[:i]
				break
			}
		}
	}
}
// Seek repositions the read cursor according to whence, clamping the
// result into [0, length]. See the seek helper for the exact semantics.
func (pb *pageBuffer) Seek(offset int64, whence int) (int64, error) {
	c, err := seek(int64(pb.cursor), int64(pb.length), offset, whence)
	if err != nil {
		return -1, err
	}
	pb.cursor = int(c)
	return c, nil
}

// ReadByte reads and returns the next byte, or io.EOF when the cursor is
// at the end of the buffer.
func (pb *pageBuffer) ReadByte() (byte, error) {
	b := [1]byte{}
	_, err := pb.Read(b[:])
	return b[0], err
}

// Read copies unread bytes into b, advancing the cursor. It returns io.EOF
// once all written bytes have been consumed.
func (pb *pageBuffer) Read(b []byte) (int, error) {
	if pb.cursor >= pb.length {
		return 0, io.EOF
	}
	n, err := pb.ReadAt(b, int64(pb.cursor))
	pb.cursor += n
	return n, err
}

// ReadAt copies bytes at the absolute offset off without moving the cursor.
func (pb *pageBuffer) ReadAt(b []byte, off int64) (int, error) {
	return pb.pages.ReadAt(b, off)
}
// ReadFrom appends the full contents of r to the buffer, allocating new
// pages as each tail page fills up. It returns the number of bytes read
// and the first error other than end-of-input (page.ReadFrom swallows
// io.EOF / io.ErrUnexpectedEOF).
func (pb *pageBuffer) ReadFrom(r io.Reader) (int64, error) {
	if len(pb.pages) == 0 {
		pb.pages = append(pb.pages, pb.newPage())
	}
	rn := int64(0)
	for {
		tail := pb.pages[len(pb.pages)-1]
		free := tail.Cap() - tail.Len()
		if free == 0 {
			tail = pb.newPage()
			free = pageSize
			pb.pages = append(pb.pages, tail)
		}
		n, err := tail.ReadFrom(r)
		pb.length += int(n)
		rn += n
		// A short read means the source was exhausted (or failed); a full
		// read means the page filled and we should continue with a new one.
		if n < int64(free) {
			return rn, err
		}
	}
}
  284. func (pb *pageBuffer) WriteString(s string) (int, error) {
  285. return pb.Write([]byte(s))
  286. }
  287. func (pb *pageBuffer) Write(b []byte) (int, error) {
  288. wn := len(b)
  289. if wn == 0 {
  290. return 0, nil
  291. }
  292. if len(pb.pages) == 0 {
  293. pb.pages = append(pb.pages, pb.newPage())
  294. }
  295. for len(b) != 0 {
  296. tail := pb.pages[len(pb.pages)-1]
  297. free := tail.Cap() - tail.Len()
  298. if len(b) <= free {
  299. tail.Write(b)
  300. pb.length += len(b)
  301. break
  302. }
  303. tail.Write(b[:free])
  304. b = b[free:]
  305. pb.length += free
  306. pb.pages = append(pb.pages, pb.newPage())
  307. }
  308. return wn, nil
  309. }
// WriteAt overwrites bytes at the absolute offset off, appending through
// Write when the range extends past the current end of the buffer. The
// returned count is always len(b).
func (pb *pageBuffer) WriteAt(b []byte, off int64) (int, error) {
	n, err := pb.pages.WriteAt(b, off)
	if err != nil {
		return n, err
	}
	if n < len(b) {
		pb.Write(b[n:])
	}
	return len(b), nil
}

// WriteTo copies the unread portion of the buffer to w, advancing the
// cursor by the number of bytes successfully written.
func (pb *pageBuffer) WriteTo(w io.Writer) (int64, error) {
	var wn int
	var err error
	pb.pages.scan(int64(pb.cursor), int64(pb.length), func(b []byte) bool {
		var n int
		n, err = w.Write(b)
		wn += n
		return err == nil
	})
	pb.cursor += wn
	return int64(wn), err
}
var (
	// Compile-time checks that *pageBuffer implements the io interfaces it
	// is used as.
	_ io.ReaderAt     = (*pageBuffer)(nil)
	_ io.ReaderFrom   = (*pageBuffer)(nil)
	_ io.StringWriter = (*pageBuffer)(nil)
	_ io.Writer       = (*pageBuffer)(nil)
	_ io.WriterAt     = (*pageBuffer)(nil)
	_ io.WriterTo     = (*pageBuffer)(nil)

	// Pools recycling pages and page buffers to reduce allocations.
	pagePool       sync.Pool
	pageBufferPool sync.Pool
)
  342. type contiguousPages []*page
  343. func (pages contiguousPages) ref() {
  344. for _, p := range pages {
  345. p.ref()
  346. }
  347. }
  348. func (pages contiguousPages) unref() {
  349. for _, p := range pages {
  350. p.unref()
  351. }
  352. }
  353. func (pages contiguousPages) clear() {
  354. for i := range pages {
  355. pages[i] = nil
  356. }
  357. }
  358. func (pages contiguousPages) ReadAt(b []byte, off int64) (int, error) {
  359. rn := 0
  360. for _, p := range pages.slice(off, off+int64(len(b))) {
  361. n, _ := p.ReadAt(b, off)
  362. b = b[n:]
  363. rn += n
  364. off += int64(n)
  365. }
  366. return rn, nil
  367. }
  368. func (pages contiguousPages) WriteAt(b []byte, off int64) (int, error) {
  369. wn := 0
  370. for _, p := range pages.slice(off, off+int64(len(b))) {
  371. n, _ := p.WriteAt(b, off)
  372. b = b[n:]
  373. wn += n
  374. off += int64(n)
  375. }
  376. return wn, nil
  377. }
// slice returns the sub-list of pages overlapping the absolute range
// [begin, end).
func (pages contiguousPages) slice(begin, end int64) contiguousPages {
	i := pages.indexOf(begin)
	j := pages.indexOf(end)
	if j < len(pages) {
		j++
	}
	return pages[i:j]
}

// indexOf maps an absolute offset to a page index. The arithmetic assumes
// every page except the last is completely filled (pageSize bytes) —
// presumably guaranteed by how pageBuffer appends pages; verify before
// reusing with partially filled interior pages.
func (pages contiguousPages) indexOf(offset int64) int {
	if len(pages) == 0 {
		return 0
	}
	return int((offset - pages[0].offset) / pageSize)
}

// scan invokes f with the chunk of each page overlapping [begin, end),
// stopping early when f returns false. Note that f may receive a nil
// slice when a page's intersection with the range is empty.
func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) {
	for _, p := range pages.slice(begin, end) {
		if !f(p.slice(begin, end)) {
			break
		}
	}
}
// Compile-time checks that contiguousPages implements io.ReaderAt/WriterAt.
var (
	_ io.ReaderAt = contiguousPages{}
	_ io.WriterAt = contiguousPages{}
)

// pageRef is a reference-counted view of a byte range of a pageBuffer,
// implementing the Bytes interface.
type pageRef struct {
	buffer [2]*page        // inline storage backing pages in the common small case
	pages  contiguousPages // pages overlapping the referenced range
	offset int64           // absolute offset of the first referenced byte
	cursor int64           // read position relative to offset
	length uint32          // number of referenced bytes
	once   uint32          // set to 1 by unref to make release idempotent
}
// unref releases the pages held by the reference. The compare-and-swap on
// once makes the release idempotent, so Close may be called multiple times.
func (ref *pageRef) unref() {
	if atomic.CompareAndSwapUint32(&ref.once, 0, 1) {
		ref.pages.unref()
		ref.pages.clear()
		ref.pages = nil
		ref.offset = 0
		ref.cursor = 0
		ref.length = 0
	}
}

// Len returns the number of unread bytes remaining in the reference.
func (ref *pageRef) Len() int { return int(ref.Size() - ref.cursor) }

// Size returns the total number of bytes referenced.
func (ref *pageRef) Size() int64 { return int64(ref.length) }

// Close releases the reference; it implements io.Closer.
func (ref *pageRef) Close() error { ref.unref(); return nil }

// String returns a human-readable description of the reference.
func (ref *pageRef) String() string {
	return fmt.Sprintf("[offset=%d cursor=%d length=%d]", ref.offset, ref.cursor, ref.length)
}
// Seek repositions the read cursor according to whence, clamping the
// result into [0, length]. See the seek helper for the exact semantics.
func (ref *pageRef) Seek(offset int64, whence int) (int64, error) {
	c, err := seek(ref.cursor, int64(ref.length), offset, whence)
	if err != nil {
		return -1, err
	}
	ref.cursor = c
	return c, nil
}
  435. func (ref *pageRef) ReadByte() (byte, error) {
  436. var c byte
  437. var ok bool
  438. ref.scan(ref.cursor, func(b []byte) bool {
  439. c, ok = b[0], true
  440. return false
  441. })
  442. if ok {
  443. ref.cursor++
  444. } else {
  445. return 0, io.EOF
  446. }
  447. return c, nil
  448. }
// Read copies unread bytes into b, advancing the cursor. It returns io.EOF
// once the whole reference has been consumed.
func (ref *pageRef) Read(b []byte) (int, error) {
	if ref.cursor >= int64(ref.length) {
		return 0, io.EOF
	}
	n, err := ref.ReadAt(b, ref.cursor)
	ref.cursor += int64(n)
	return n, err
}

// ReadAt copies bytes at offset off (relative to the start of the
// reference) into b without moving the cursor. Reads past the end return
// io.EOF; reads crossing the end are truncated.
func (ref *pageRef) ReadAt(b []byte, off int64) (int, error) {
	limit := ref.offset + int64(ref.length)
	off += ref.offset
	if off >= limit {
		return 0, io.EOF
	}
	if off+int64(len(b)) > limit {
		b = b[:limit-off]
	}
	if len(b) == 0 {
		return 0, nil
	}
	n, err := ref.pages.ReadAt(b, off)
	if n == 0 && err == nil {
		// The pages had no data at off; report end-of-data rather than a
		// silent zero-byte read.
		err = io.EOF
	}
	return n, err
}
// WriteTo copies the unread portion of the reference to w, advancing the
// cursor by the number of bytes successfully written.
func (ref *pageRef) WriteTo(w io.Writer) (wn int64, err error) {
	ref.scan(ref.cursor, func(b []byte) bool {
		var n int
		n, err = w.Write(b)
		wn += int64(n)
		return err == nil
	})
	ref.cursor += wn
	return
}

// scan invokes f with each chunk of the reference from the relative offset
// off to the end of the referenced range.
func (ref *pageRef) scan(off int64, f func([]byte) bool) {
	begin := ref.offset + off
	end := ref.offset + int64(ref.length)
	ref.pages.scan(begin, end, f)
}

// Compile-time checks that *pageRef implements the io interfaces it is
// used as.
var (
	_ io.Closer   = (*pageRef)(nil)
	_ io.Seeker   = (*pageRef)(nil)
	_ io.Reader   = (*pageRef)(nil)
	_ io.ReaderAt = (*pageRef)(nil)
	_ io.WriterTo = (*pageRef)(nil)
)
// pageRefAllocator hands out pageRef values from pre-allocated arenas of
// size elements, amortizing allocations when many references are created.
type pageRefAllocator struct {
	refs []pageRef // current arena
	head int       // index of the next free element in refs
	size int       // number of elements per arena
}

// newPageRef returns a pointer into the current arena, allocating a fresh
// arena when the current one is exhausted. An exhausted arena is simply
// abandoned; live refs keep it reachable until they are released.
func (a *pageRefAllocator) newPageRef() *pageRef {
	if a.head == len(a.refs) {
		a.refs = make([]pageRef, a.size)
		a.head = 0
	}
	ref := &a.refs[a.head]
	a.head++
	return ref
}

// unref releases x if it implements the unexported unref method; values of
// other types are ignored.
func unref(x interface{}) {
	if r, _ := x.(interface{ unref() }); r != nil {
		r.unref()
	}
}
  516. func seek(cursor, limit, offset int64, whence int) (int64, error) {
  517. switch whence {
  518. case io.SeekStart:
  519. // absolute offset
  520. case io.SeekCurrent:
  521. offset = cursor + offset
  522. case io.SeekEnd:
  523. offset = limit - offset
  524. default:
  525. return -1, fmt.Errorf("seek: invalid whence value: %d", whence)
  526. }
  527. if offset < 0 {
  528. offset = 0
  529. }
  530. if offset > limit {
  531. offset = limit
  532. }
  533. return offset, nil
  534. }
  535. func closeBytes(b Bytes) {
  536. if b != nil {
  537. b.Close()
  538. }
  539. }
  540. func resetBytes(b Bytes) {
  541. if r, _ := b.(interface{ Reset() }); r != nil {
  542. r.Reset()
  543. }
  544. }