gzip.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package gzip
  2. import (
  3. "compress/gzip"
  4. "io"
  5. "sync"
  6. )
  // readerPool holds idle *gzip.Reader values. NewReader takes readers from it
  // and reader.Close hands them back. Being a package-level variable, it is
  // shared by every Codec.
  var readerPool sync.Pool
  // Codec is the implementation of a compress.Codec which supports creating
  // readers and writers for kafka messages compressed with gzip.
  //
  // A Codec owns a pool of writers and all of its methods use pointer
  // receivers, so it should be used through a *Codec rather than copied.
  type Codec struct {
  	// Level is the compression level to configure on writers created by
  	// this codec. Acceptable values are defined in the standard
  	// compress/gzip package.
  	//
  	// The zero value selects gzip.DefaultCompression. As a consequence,
  	// gzip.NoCompression (which is also zero) cannot be requested through
  	// this field.
  	Level int

  	// writerPool holds idle *gzip.Writer values created by this codec so
  	// that NewWriter can reuse them.
  	writerPool sync.Pool
  }
  20. // Code implements the compress.Codec interface.
  21. func (c *Codec) Code() int8 { return 1 }
  22. // Name implements the compress.Codec interface.
  23. func (c *Codec) Name() string { return "gzip" }
  24. // NewReader implements the compress.Codec interface.
  25. func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
  26. var err error
  27. z, _ := readerPool.Get().(*gzip.Reader)
  28. if z != nil {
  29. err = z.Reset(r)
  30. } else {
  31. z, err = gzip.NewReader(r)
  32. }
  33. if err != nil {
  34. if z != nil {
  35. readerPool.Put(z)
  36. }
  37. return &errorReader{err: err}
  38. }
  39. return &reader{Reader: z}
  40. }
  41. // NewWriter implements the compress.Codec interface.
  42. func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
  43. x := c.writerPool.Get()
  44. z, _ := x.(*gzip.Writer)
  45. if z == nil {
  46. x, err := gzip.NewWriterLevel(w, c.level())
  47. if err != nil {
  48. return &errorWriter{err: err}
  49. }
  50. z = x
  51. } else {
  52. z.Reset(w)
  53. }
  54. return &writer{codec: c, Writer: z}
  55. }
  56. func (c *Codec) level() int {
  57. if c.Level != 0 {
  58. return c.Level
  59. }
  60. return gzip.DefaultCompression
  61. }
  62. type reader struct{ *gzip.Reader }
  63. func (r *reader) Close() (err error) {
  64. if z := r.Reader; z != nil {
  65. r.Reader = nil
  66. err = z.Close()
  67. // Pass it an empty reader, which is a zero-size value implementing the
  68. // flate.Reader interface to avoid the construction of a bufio.Reader in
  69. // the call to Reset.
  70. //
  71. // Note: we could also not reset the reader at all, but that would cause
  72. // the underlying reader to be retained until the gzip.Reader is freed,
  73. // which may not be desirable.
  74. z.Reset(emptyReader{})
  75. readerPool.Put(z)
  76. }
  77. return
  78. }
  79. type writer struct {
  80. codec *Codec
  81. *gzip.Writer
  82. }
  83. func (w *writer) Close() (err error) {
  84. if z := w.Writer; z != nil {
  85. w.Writer = nil
  86. err = z.Close()
  87. z.Reset(nil)
  88. w.codec.writerPool.Put(z)
  89. }
  90. return
  91. }
  // emptyReader is a zero-size reader that is always at end of input. It
  // provides both Read and ReadByte.
  type emptyReader struct{}

  // ReadByte always reports io.EOF.
  func (emptyReader) ReadByte() (byte, error) {
  	var none byte
  	return none, io.EOF
  }

  // Read never fills the buffer and always reports io.EOF.
  func (emptyReader) Read(_ []byte) (int, error) {
  	return 0, io.EOF
  }
  // errorReader is a reader which carries a fixed error and reports it from
  // every Read and Close call.
  type errorReader struct{ err error }

  // Close returns the stored error.
  func (r *errorReader) Close() error {
  	return r.err
  }

  // Read reads nothing and returns the stored error.
  func (r *errorReader) Read(_ []byte) (int, error) {
  	return 0, r.err
  }
  // errorWriter is a writer which carries a fixed error and reports it from
  // every Write and Close call.
  type errorWriter struct{ err error }

  // Close returns the stored error.
  func (w *errorWriter) Close() error {
  	return w.err
  }

  // Write writes nothing and returns the stored error.
  func (w *errorWriter) Write(_ []byte) (int, error) {
  	return 0, w.err
  }