snappy.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package snappy
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/golang/snappy"
  6. )
  7. // Framing is an enumeration type used to enable or disable xerial framing of
  8. // snappy messages.
  9. type Framing int
  10. const (
  11. Framed Framing = iota
  12. Unframed
  13. )
  14. var (
  15. readerPool sync.Pool
  16. writerPool sync.Pool
  17. )
  18. // Codec is the implementation of a compress.Codec which supports creating
  19. // readers and writers for kafka messages compressed with snappy.
  20. type Codec struct {
  21. // An optional framing to apply to snappy compression.
  22. //
  23. // Default to Framed.
  24. Framing Framing
  25. }
  26. // Code implements the compress.Codec interface.
  27. func (c *Codec) Code() int8 { return 2 }
  28. // Name implements the compress.Codec interface.
  29. func (c *Codec) Name() string { return "snappy" }
  30. // NewReader implements the compress.Codec interface.
  31. func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
  32. x, _ := readerPool.Get().(*xerialReader)
  33. if x != nil {
  34. x.Reset(r)
  35. } else {
  36. x = &xerialReader{
  37. reader: r,
  38. decode: snappy.Decode,
  39. }
  40. }
  41. return &reader{xerialReader: x}
  42. }
  43. // NewWriter implements the compress.Codec interface.
  44. func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
  45. x, _ := writerPool.Get().(*xerialWriter)
  46. if x != nil {
  47. x.Reset(w)
  48. } else {
  49. x = &xerialWriter{
  50. writer: w,
  51. encode: snappy.Encode,
  52. }
  53. }
  54. x.framed = c.Framing == Framed
  55. return &writer{xerialWriter: x}
  56. }
  57. type reader struct{ *xerialReader }
  58. func (r *reader) Close() (err error) {
  59. if x := r.xerialReader; x != nil {
  60. r.xerialReader = nil
  61. x.Reset(nil)
  62. readerPool.Put(x)
  63. }
  64. return
  65. }
  66. type writer struct{ *xerialWriter }
  67. func (w *writer) Close() (err error) {
  68. if x := w.xerialWriter; x != nil {
  69. w.xerialWriter = nil
  70. err = x.Flush()
  71. x.Reset(nil)
  72. writerPool.Put(x)
  73. }
  74. return
  75. }