123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- package datahub
- import (
- "bytes"
- "compress/zlib"
- "errors"
- "github.com/pierrec/lz4"
- "io"
- )
- // compress type
- type CompressorType string
- const (
- NOCOMPRESS CompressorType = ""
- LZ4 CompressorType = "lz4"
- DEFLATE CompressorType = "deflate"
- ZLIB CompressorType = "zlib"
- )
- // validate that the type is valid
- func validateCompressorType(ct CompressorType) bool {
- switch ct {
- case NOCOMPRESS, LZ4, DEFLATE, ZLIB:
- return true
- }
- return false
- }
- func getCompressTypeFromValue(value int) CompressorType {
- switch value {
- case 0:
- return NOCOMPRESS
- case 1:
- return DEFLATE
- case 2:
- return LZ4
- case 3:
- return ZLIB
- default:
- return NOCOMPRESS
- }
- }
- func (ct *CompressorType) String() string {
- return string(*ct)
- }
- func (ct *CompressorType) toValue() int {
- switch *ct {
- case NOCOMPRESS:
- return 0
- case DEFLATE:
- return 1
- case LZ4:
- return 2
- case ZLIB:
- return 3
- default:
- return 0
- }
- }
- // Compressor is a interface for the compress
- type compressor interface {
- Compress(data []byte) ([]byte, error)
- DeCompress(data []byte, rawSize int64) ([]byte, error)
- }
- type lz4Compressor struct {
- }
- func (lc *lz4Compressor) Compress(data []byte) ([]byte, error) {
- if len(data) == 0 {
- return nil, nil
- }
- buf := make([]byte, len(data))
- ht := make([]int, 64<<10) // buffer for the compression table
- n, err := lz4.CompressBlock(data, buf, ht)
- if err != nil {
- return nil, err
- }
- if n >= len(data) || n == 0 {
- return nil, errors.New("data is not compressible")
- }
- buf = buf[:n] // compressed data
- return buf, nil
- }
- /*func (lc *Lz4Compressor) Compress(data []byte) ([]byte, error) {
- buf := bytes.NewBuffer(nil)
- writer := lz4.NewWriter(buf)
- defer writer.Close()
- // 写入待压缩内容
- if _, err := writer.Write(data); err != nil {
- return nil, err
- }
- if err := writer.Flush(); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }*/
- /*func (lc *Lz4Compressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
- //get the maximum size of data when not compressible.
- buffer := bytes.NewBuffer(data)
- reader := lz4.NewReader(buffer)
- buf, err := ioutil.ReadAll(reader)
- if err != nil {
- return nil, err
- }
- return buf, nil
- }*/
- func (lc *lz4Compressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
- // Allocated a very large buffer for decompression.
- buf := make([]byte, rawSize)
- _, err := lz4.UncompressBlock(data, buf)
- if err != nil {
- return nil, err
- }
- return buf, nil
- }
- type deflateCompressor struct {
- }
- /*func (dc *DeflateCompressor) Compress(data []byte) ([]byte, error) {
- // 一个缓存区压缩的内容
- buf := bytes.NewBuffer(nil)
- // 创建一个flate.Writer
- deflateWriter, err := flate.NewWriter(buf, flate.DefaultCompression)
- if err != nil {
- return nil, err
- }
- defer deflateWriter.Close()
- // 写入待压缩内容
- if _, err := deflateWriter.Write(data); err != nil {
- return nil, err
- }
- if err := deflateWriter.Flush(); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }*/
- /*func (dc *DeflateCompressor)Compress(data []byte) ([]byte, error) {
- var bufs bytes.Buffer
- w, _ := flate.NewWriter(&bufs, flate.DefaultCompression)
- if _, err := w.Write([]byte(data)); err != nil {
- return nil, err
- }
- if err := w.Flush(); err != nil {
- return nil, err
- }
- defer w.Close()
- return bufs.Bytes(), nil
- }*/
- func (dc *deflateCompressor) Compress(data []byte) ([]byte, error) {
- var buf bytes.Buffer
- w := zlib.NewWriter(&buf)
- if _, err := w.Write(data); err != nil {
- return nil, err
- }
- if err := w.Close(); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- /*func (dc *DeflateCompressor)DeCompress(data []byte,rawSize int64)([]byte,error) {
- r :=flate.NewReader(bytes.NewReader(data))
- defer r.Close()
- out, err := ioutil.ReadAll(r)
- if err !=nil {
- return nil,err
- }
- return out,nil
- //fmt.Println(out)
- }*/
- /*func (dc *DeflateCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
- buffer := bytes.NewBuffer(data)
- deflateReader := flate.NewReader(buffer)
- defer deflateReader.Close()
- buf := bytes.NewBuffer(nil)
- if _, err := io.CopyN(buf, deflateReader, rawSize); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }*/
- func (dc *deflateCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
- b := bytes.NewReader(data)
- var buf bytes.Buffer
- r, _ := zlib.NewReader(b)
- if _, err := io.Copy(&buf, r); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- type zlibCompressor struct {
- }
- func (zc *zlibCompressor) Compress(data []byte) ([]byte, error) {
- var buf bytes.Buffer
- w := zlib.NewWriter(&buf)
- if _, err := w.Write(data); err != nil {
- return nil, err
- }
- if err := w.Close(); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- func (zc *zlibCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
- b := bytes.NewReader(data)
- var buf bytes.Buffer
- r, _ := zlib.NewReader(b)
- if _, err := io.Copy(&buf, r); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- var compressorMap map[CompressorType]compressor = map[CompressorType]compressor{
- LZ4: &lz4Compressor{},
- DEFLATE: &deflateCompressor{},
- ZLIB: &zlibCompressor{},
- }
- func newCompressor(c CompressorType) compressor {
- switch CompressorType(c) {
- case LZ4:
- return &lz4Compressor{}
- case DEFLATE:
- return &deflateCompressor{}
- case ZLIB:
- return &zlibCompressor{}
- default:
- return nil
- }
- }
- func getCompressor(c CompressorType) compressor {
- if c == NOCOMPRESS {
- return nil
- }
- ret, ok := compressorMap[c]
- if !ok {
- com := newCompressor(c)
- compressorMap[c] = com
- }
- return ret
- }
|