zstd.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Package zstd implements Zstandard compression.
  2. package zstd
  3. import (
  4. "io"
  5. "runtime"
  6. "sync"
  7. "github.com/klauspost/compress/zstd"
  8. )
  9. // Codec is the implementation of a compress.Codec which supports creating
  10. // readers and writers for kafka messages compressed with zstd.
  11. type Codec struct {
  12. // The compression level configured on writers created by the codec.
  13. //
  14. // Default to 3.
  15. Level int
  16. encoderPool sync.Pool // *encoder
  17. }
  18. // Code implements the compress.Codec interface.
  19. func (c *Codec) Code() int8 { return 4 }
  20. // Name implements the compress.Codec interface.
  21. func (c *Codec) Name() string { return "zstd" }
  22. // NewReader implements the compress.Codec interface.
  23. func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
  24. p := new(reader)
  25. if dec, _ := decoderPool.Get().(*decoder); dec == nil {
  26. z, err := zstd.NewReader(r)
  27. if err != nil {
  28. p.err = err
  29. } else {
  30. p.dec = &decoder{z}
  31. // We need a finalizer because the reader spawns goroutines
  32. // that will only be stopped if the Close method is called.
  33. runtime.SetFinalizer(p.dec, (*decoder).finalize)
  34. }
  35. } else {
  36. p.dec = dec
  37. p.err = dec.Reset(r)
  38. }
  39. return p
  40. }
  41. func (c *Codec) level() int {
  42. if c.Level != 0 {
  43. return c.Level
  44. }
  45. return 3
  46. }
  47. func (c *Codec) zstdLevel() zstd.EncoderLevel {
  48. return zstd.EncoderLevelFromZstd(c.level())
  49. }
  50. var decoderPool sync.Pool // *decoder
  51. type decoder struct {
  52. *zstd.Decoder
  53. }
  54. func (d *decoder) finalize() {
  55. d.Close()
  56. }
  57. type reader struct {
  58. dec *decoder
  59. err error
  60. }
  61. // Close implements the io.Closer interface.
  62. func (r *reader) Close() error {
  63. if r.dec != nil {
  64. r.dec.Reset(devNull{}) // don't retain the underlying reader
  65. decoderPool.Put(r.dec)
  66. r.dec = nil
  67. r.err = io.ErrClosedPipe
  68. }
  69. return nil
  70. }
  71. // Read implements the io.Reader interface.
  72. func (r *reader) Read(p []byte) (int, error) {
  73. if r.err != nil {
  74. return 0, r.err
  75. }
  76. return r.dec.Read(p)
  77. }
  78. // WriteTo implements the io.WriterTo interface.
  79. func (r *reader) WriteTo(w io.Writer) (int64, error) {
  80. if r.err != nil {
  81. return 0, r.err
  82. }
  83. return r.dec.WriteTo(w)
  84. }
  85. // NewWriter implements the compress.Codec interface.
  86. func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
  87. p := new(writer)
  88. if enc, _ := c.encoderPool.Get().(*encoder); enc == nil {
  89. z, err := zstd.NewWriter(w, zstd.WithEncoderLevel(c.zstdLevel()))
  90. if err != nil {
  91. p.err = err
  92. } else {
  93. p.enc = &encoder{z}
  94. // We need a finalizer because the writer spawns goroutines
  95. // that will only be stopped if the Close method is called.
  96. runtime.SetFinalizer(p.enc, (*encoder).finalize)
  97. }
  98. } else {
  99. p.enc = enc
  100. p.enc.Reset(w)
  101. }
  102. p.c = c
  103. return p
  104. }
  105. type encoder struct {
  106. *zstd.Encoder
  107. }
  108. func (e *encoder) finalize() {
  109. e.Close()
  110. }
  111. type writer struct {
  112. c *Codec
  113. enc *encoder
  114. err error
  115. }
  116. // Close implements the io.Closer interface.
  117. func (w *writer) Close() error {
  118. if w.enc != nil {
  119. // Close needs to be called to write the end of stream marker and flush
  120. // the buffers. The zstd package documents that the encoder is re-usable
  121. // after being closed.
  122. err := w.enc.Close()
  123. if err != nil {
  124. w.err = err
  125. }
  126. w.enc.Reset(devNull{}) // don't retain the underyling writer
  127. w.c.encoderPool.Put(w.enc)
  128. w.enc = nil
  129. return err
  130. }
  131. return nil
  132. }
  133. // WriteTo implements the io.WriterTo interface.
  134. func (w *writer) Write(p []byte) (int, error) {
  135. if w.err != nil {
  136. return 0, w.err
  137. }
  138. if w.enc == nil {
  139. return 0, io.ErrClosedPipe
  140. }
  141. return w.enc.Write(p)
  142. }
  143. // ReadFrom implements the io.ReaderFrom interface.
  144. func (w *writer) ReadFrom(r io.Reader) (int64, error) {
  145. if w.err != nil {
  146. return 0, w.err
  147. }
  148. if w.enc == nil {
  149. return 0, io.ErrClosedPipe
  150. }
  151. return w.enc.ReadFrom(r)
  152. }
  153. type devNull struct{}
  154. func (devNull) Read([]byte) (int, error) { return 0, io.EOF }
  155. func (devNull) Write([]byte) (int, error) { return 0, nil }