compress.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package datahub
  2. import (
  3. "bytes"
  4. "compress/zlib"
  5. "errors"
  6. "github.com/pierrec/lz4"
  7. "io"
  8. )
  9. // compress type
  10. type CompressorType string
  11. const (
  12. NOCOMPRESS CompressorType = ""
  13. LZ4 CompressorType = "lz4"
  14. DEFLATE CompressorType = "deflate"
  15. ZLIB CompressorType = "zlib"
  16. )
  17. // validate that the type is valid
  18. func validateCompressorType(ct CompressorType) bool {
  19. switch ct {
  20. case NOCOMPRESS, LZ4, DEFLATE, ZLIB:
  21. return true
  22. }
  23. return false
  24. }
  25. func getCompressTypeFromValue(value int) CompressorType {
  26. switch value {
  27. case 0:
  28. return NOCOMPRESS
  29. case 1:
  30. return DEFLATE
  31. case 2:
  32. return LZ4
  33. case 3:
  34. return ZLIB
  35. default:
  36. return NOCOMPRESS
  37. }
  38. }
  39. func (ct *CompressorType) String() string {
  40. return string(*ct)
  41. }
  42. func (ct *CompressorType) toValue() int {
  43. switch *ct {
  44. case NOCOMPRESS:
  45. return 0
  46. case DEFLATE:
  47. return 1
  48. case LZ4:
  49. return 2
  50. case ZLIB:
  51. return 3
  52. default:
  53. return 0
  54. }
  55. }
// compressor is the interface implemented by the compression codecs in this
// file.
type compressor interface {
	// Compress returns the compressed form of data.
	Compress(data []byte) ([]byte, error)
	// DeCompress returns the decompressed form of data. rawSize is the size
	// of the uncompressed payload; the lz4 implementation uses it to size
	// its output buffer, while the zlib-based implementations ignore it.
	DeCompress(data []byte, rawSize int64) ([]byte, error)
}
  61. type lz4Compressor struct {
  62. }
  63. func (lc *lz4Compressor) Compress(data []byte) ([]byte, error) {
  64. if len(data) == 0 {
  65. return nil, nil
  66. }
  67. buf := make([]byte, len(data))
  68. ht := make([]int, 64<<10) // buffer for the compression table
  69. n, err := lz4.CompressBlock(data, buf, ht)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if n >= len(data) || n == 0 {
  74. return nil, errors.New("data is not compressible")
  75. }
  76. buf = buf[:n] // compressed data
  77. return buf, nil
  78. }
  79. /*func (lc *Lz4Compressor) Compress(data []byte) ([]byte, error) {
  80. buf := bytes.NewBuffer(nil)
  81. writer := lz4.NewWriter(buf)
  82. defer writer.Close()
  83. // write the content to be compressed
  84. if _, err := writer.Write(data); err != nil {
  85. return nil, err
  86. }
  87. if err := writer.Flush(); err != nil {
  88. return nil, err
  89. }
  90. return buf.Bytes(), nil
  91. }*/
  92. /*func (lc *Lz4Compressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
  93. //get the maximum size of data when not compressible.
  94. buffer := bytes.NewBuffer(data)
  95. reader := lz4.NewReader(buffer)
  96. buf, err := ioutil.ReadAll(reader)
  97. if err != nil {
  98. return nil, err
  99. }
  100. return buf, nil
  101. }*/
  102. func (lc *lz4Compressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
  103. // Allocated a very large buffer for decompression.
  104. buf := make([]byte, rawSize)
  105. _, err := lz4.UncompressBlock(data, buf)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return buf, nil
  110. }
  111. type deflateCompressor struct {
  112. }
  113. /*func (dc *DeflateCompressor) Compress(data []byte) ([]byte, error) {
  114. // a buffer that holds the compressed content
  115. buf := bytes.NewBuffer(nil)
  116. // create a flate.Writer
  117. deflateWriter, err := flate.NewWriter(buf, flate.DefaultCompression)
  118. if err != nil {
  119. return nil, err
  120. }
  121. defer deflateWriter.Close()
  122. // write the content to be compressed
  123. if _, err := deflateWriter.Write(data); err != nil {
  124. return nil, err
  125. }
  126. if err := deflateWriter.Flush(); err != nil {
  127. return nil, err
  128. }
  129. return buf.Bytes(), nil
  130. }*/
  131. /*func (dc *DeflateCompressor)Compress(data []byte) ([]byte, error) {
  132. var bufs bytes.Buffer
  133. w, _ := flate.NewWriter(&bufs, flate.DefaultCompression)
  134. if _, err := w.Write([]byte(data)); err != nil {
  135. return nil, err
  136. }
  137. if err := w.Flush(); err != nil {
  138. return nil, err
  139. }
  140. defer w.Close()
  141. return bufs.Bytes(), nil
  142. }*/
  143. func (dc *deflateCompressor) Compress(data []byte) ([]byte, error) {
  144. var buf bytes.Buffer
  145. w := zlib.NewWriter(&buf)
  146. if _, err := w.Write(data); err != nil {
  147. return nil, err
  148. }
  149. if err := w.Close(); err != nil {
  150. return nil, err
  151. }
  152. return buf.Bytes(), nil
  153. }
  154. /*func (dc *DeflateCompressor)DeCompress(data []byte,rawSize int64)([]byte,error) {
  155. r :=flate.NewReader(bytes.NewReader(data))
  156. defer r.Close()
  157. out, err := ioutil.ReadAll(r)
  158. if err !=nil {
  159. return nil,err
  160. }
  161. return out,nil
  162. //fmt.Println(out)
  163. }*/
  164. /*func (dc *DeflateCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
  165. buffer := bytes.NewBuffer(data)
  166. deflateReader := flate.NewReader(buffer)
  167. defer deflateReader.Close()
  168. buf := bytes.NewBuffer(nil)
  169. if _, err := io.CopyN(buf, deflateReader, rawSize); err != nil {
  170. return nil, err
  171. }
  172. return buf.Bytes(), nil
  173. }*/
  174. func (dc *deflateCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
  175. b := bytes.NewReader(data)
  176. var buf bytes.Buffer
  177. r, _ := zlib.NewReader(b)
  178. if _, err := io.Copy(&buf, r); err != nil {
  179. return nil, err
  180. }
  181. return buf.Bytes(), nil
  182. }
  183. type zlibCompressor struct {
  184. }
  185. func (zc *zlibCompressor) Compress(data []byte) ([]byte, error) {
  186. var buf bytes.Buffer
  187. w := zlib.NewWriter(&buf)
  188. if _, err := w.Write(data); err != nil {
  189. return nil, err
  190. }
  191. if err := w.Close(); err != nil {
  192. return nil, err
  193. }
  194. return buf.Bytes(), nil
  195. }
  196. func (zc *zlibCompressor) DeCompress(data []byte, rawSize int64) ([]byte, error) {
  197. b := bytes.NewReader(data)
  198. var buf bytes.Buffer
  199. r, _ := zlib.NewReader(b)
  200. if _, err := io.Copy(&buf, r); err != nil {
  201. return nil, err
  202. }
  203. return buf.Bytes(), nil
  204. }
  205. var compressorMap map[CompressorType]compressor = map[CompressorType]compressor{
  206. LZ4: &lz4Compressor{},
  207. DEFLATE: &deflateCompressor{},
  208. ZLIB: &zlibCompressor{},
  209. }
  210. func newCompressor(c CompressorType) compressor {
  211. switch CompressorType(c) {
  212. case LZ4:
  213. return &lz4Compressor{}
  214. case DEFLATE:
  215. return &deflateCompressor{}
  216. case ZLIB:
  217. return &zlibCompressor{}
  218. default:
  219. return nil
  220. }
  221. }
  222. func getCompressor(c CompressorType) compressor {
  223. if c == NOCOMPRESS {
  224. return nil
  225. }
  226. ret, ok := compressorMap[c]
  227. if !ok {
  228. com := newCompressor(c)
  229. compressorMap[c] = com
  230. }
  231. return ret
  232. }