1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package snappy
- import (
- "io"
- "sync"
- "github.com/golang/snappy"
- )
// Framing selects whether xerial framing is applied to snappy messages.
type Framing int

const (
	// Framed enables xerial framing. It is the zero value of Framing.
	Framed Framing = 0
	// Unframed disables xerial framing.
	Unframed Framing = 1
)
// readerPool and writerPool hold *xerialReader and *xerialWriter values that
// were handed back by Close, so that NewReader and NewWriter can reuse them
// instead of allocating new ones. Neither pool sets a New function, which is
// why callers must handle a nil result from Get.
var readerPool, writerPool sync.Pool
- // Codec is the implementation of a compress.Codec which supports creating
- // readers and writers for kafka messages compressed with snappy.
- type Codec struct {
- // An optional framing to apply to snappy compression.
- //
- // Default to Framed.
- Framing Framing
- }
- // Code implements the compress.Codec interface.
- func (c *Codec) Code() int8 { return 2 }
- // Name implements the compress.Codec interface.
- func (c *Codec) Name() string { return "snappy" }
- // NewReader implements the compress.Codec interface.
- func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
- x, _ := readerPool.Get().(*xerialReader)
- if x != nil {
- x.Reset(r)
- } else {
- x = &xerialReader{
- reader: r,
- decode: snappy.Decode,
- }
- }
- return &reader{xerialReader: x}
- }
- // NewWriter implements the compress.Codec interface.
- func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
- x, _ := writerPool.Get().(*xerialWriter)
- if x != nil {
- x.Reset(w)
- } else {
- x = &xerialWriter{
- writer: w,
- encode: snappy.Encode,
- }
- }
- x.framed = c.Framing == Framed
- return &writer{xerialWriter: x}
- }
- type reader struct{ *xerialReader }
- func (r *reader) Close() (err error) {
- if x := r.xerialReader; x != nil {
- r.xerialReader = nil
- x.Reset(nil)
- readerPool.Put(x)
- }
- return
- }
- type writer struct{ *xerialWriter }
- func (w *writer) Close() (err error) {
- if x := w.xerialWriter; x != nil {
- w.xerialWriter = nil
- err = x.Flush()
- x.Reset(nil)
- writerPool.Put(x)
- }
- return
- }
|