// Package compress defines the Compression codes, the Codec interface, and
// the global table of codecs used to compress and decompress record sets.
package compress

import (
	"io"

	"github.com/segmentio/kafka-go/compress/gzip"
	"github.com/segmentio/kafka-go/compress/lz4"
	"github.com/segmentio/kafka-go/compress/snappy"
	"github.com/segmentio/kafka-go/compress/zstd"
)

// Compression represents the compression applied to a record set.
//
// A Compression value is also the index of its codec in the Codecs table.
type Compression int8

// Compression codes that have a codec installed in the Codecs table.
const (
	Gzip   Compression = 1
	Snappy Compression = 2
	Lz4    Compression = 3
	Zstd   Compression = 4
)

// Codec returns the codec stored in the Codecs table at index c.
//
// It returns nil when c falls outside the bounds of the table, or when the
// table has no codec at that index (the table's initial value leaves index
// zero empty).
func (c Compression) Codec() Codec {
	// Bounds-check before indexing so negative or unknown codes yield nil
	// instead of panicking.
	if i := int(c); i >= 0 && i < len(Codecs) {
		return Codecs[i]
	}
	return nil
}

// String returns the name reported by the codec associated with c, or
// "uncompressed" when c has no codec.
func (c Compression) String() string {
	if codec := c.Codec(); codec != nil {
		return codec.Name()
	}
	return "uncompressed"
}

// Codec represents a compression codec to encode and decode the messages.
// See: https://cwiki.apache.org/confluence/display/KAFKA/Compression
//
// A Codec must be safe for concurrent access by multiple goroutines.
type Codec interface {
	// Code returns the compression codec code.
	Code() int8

	// Name returns a human-readable name for the codec.
	Name() string

	// NewReader constructs a new reader which decompresses data from r.
	NewReader(r io.Reader) io.ReadCloser

	// NewWriter constructs a new writer which writes compressed data to w.
	NewWriter(w io.Writer) io.WriteCloser
}

var (
	// GzipCodec is the global gzip codec installed on the Codecs table.
	GzipCodec gzip.Codec

	// SnappyCodec is the global snappy codec installed on the Codecs table.
	SnappyCodec snappy.Codec

	// Lz4Codec is the global lz4 codec installed on the Codecs table.
	Lz4Codec lz4.Codec

	// ZstdCodec is the global zstd codec installed on the Codecs table.
	ZstdCodec zstd.Codec

	// Codecs is the global table of compression codecs supported by the
	// kafka protocol, indexed by Compression value. The table stores
	// pointers to the global codec variables above.
	Codecs = [...]Codec{
		Gzip:   &GzipCodec,
		Snappy: &SnappyCodec,
		Lz4:    &Lz4Codec,
		Zstd:   &ZstdCodec,
	}
)