lz4.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package lz4
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/pierrec/lz4"
  6. )
  7. var (
  8. readerPool sync.Pool
  9. writerPool sync.Pool
  10. )
  11. // Codec is the implementation of a compress.Codec which supports creating
  12. // readers and writers for kafka messages compressed with lz4.
  13. type Codec struct{}
  14. // Code implements the compress.Codec interface.
  15. func (c *Codec) Code() int8 { return 3 }
  16. // Name implements the compress.Codec interface.
  17. func (c *Codec) Name() string { return "lz4" }
  18. // NewReader implements the compress.Codec interface.
  19. func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
  20. z, _ := readerPool.Get().(*lz4.Reader)
  21. if z != nil {
  22. z.Reset(r)
  23. } else {
  24. z = lz4.NewReader(r)
  25. }
  26. return &reader{Reader: z}
  27. }
  28. // NewWriter implements the compress.Codec interface.
  29. func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
  30. z, _ := writerPool.Get().(*lz4.Writer)
  31. if z != nil {
  32. z.Reset(w)
  33. } else {
  34. z = lz4.NewWriter(w)
  35. }
  36. return &writer{Writer: z}
  37. }
  38. type reader struct{ *lz4.Reader }
  39. func (r *reader) Close() (err error) {
  40. if z := r.Reader; z != nil {
  41. r.Reader = nil
  42. z.Reset(nil)
  43. readerPool.Put(z)
  44. }
  45. return
  46. }
  47. type writer struct{ *lz4.Writer }
  48. func (w *writer) Close() (err error) {
  49. if z := w.Writer; z != nil {
  50. w.Writer = nil
  51. err = z.Close()
  52. z.Reset(nil)
  53. writerPool.Put(z)
  54. }
  55. return
  56. }