xerial.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package snappy
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "io"
  6. "github.com/golang/snappy"
  7. )
  8. const defaultBufferSize = 32 * 1024
  9. // An implementation of io.Reader which consumes a stream of xerial-framed
  10. // snappy-encoeded data. The framing is optional, if no framing is detected
  11. // the reader will simply forward the bytes from its underlying stream.
  12. type xerialReader struct {
  13. reader io.Reader
  14. header [16]byte
  15. input []byte
  16. output []byte
  17. offset int64
  18. nbytes int64
  19. decode func([]byte, []byte) ([]byte, error)
  20. }
  21. func (x *xerialReader) Reset(r io.Reader) {
  22. x.reader = r
  23. x.input = x.input[:0]
  24. x.output = x.output[:0]
  25. x.header = [16]byte{}
  26. x.offset = 0
  27. x.nbytes = 0
  28. }
  29. func (x *xerialReader) Read(b []byte) (int, error) {
  30. for {
  31. if x.offset < int64(len(x.output)) {
  32. n := copy(b, x.output[x.offset:])
  33. x.offset += int64(n)
  34. return n, nil
  35. }
  36. n, err := x.readChunk(b)
  37. if err != nil {
  38. return 0, err
  39. }
  40. if n > 0 {
  41. return n, nil
  42. }
  43. }
  44. }
  45. func (x *xerialReader) WriteTo(w io.Writer) (int64, error) {
  46. wn := int64(0)
  47. for {
  48. for x.offset < int64(len(x.output)) {
  49. n, err := w.Write(x.output[x.offset:])
  50. wn += int64(n)
  51. x.offset += int64(n)
  52. if err != nil {
  53. return wn, err
  54. }
  55. }
  56. if _, err := x.readChunk(nil); err != nil {
  57. if err == io.EOF {
  58. err = nil
  59. }
  60. return wn, err
  61. }
  62. }
  63. }
  64. func (x *xerialReader) readChunk(dst []byte) (int, error) {
  65. x.output = x.output[:0]
  66. x.offset = 0
  67. prefix := 0
  68. if x.nbytes == 0 {
  69. n, err := x.readFull(x.header[:])
  70. if err != nil && n == 0 {
  71. return 0, err
  72. }
  73. prefix = n
  74. }
  75. if isXerialHeader(x.header[:]) {
  76. if cap(x.input) < 4 {
  77. x.input = make([]byte, 4, defaultBufferSize)
  78. } else {
  79. x.input = x.input[:4]
  80. }
  81. _, err := x.readFull(x.input)
  82. if err != nil {
  83. return 0, err
  84. }
  85. frame := int(binary.BigEndian.Uint32(x.input))
  86. if cap(x.input) < frame {
  87. x.input = make([]byte, frame, align(frame, defaultBufferSize))
  88. } else {
  89. x.input = x.input[:frame]
  90. }
  91. if _, err := x.readFull(x.input); err != nil {
  92. return 0, err
  93. }
  94. } else {
  95. if cap(x.input) == 0 {
  96. x.input = make([]byte, 0, defaultBufferSize)
  97. } else {
  98. x.input = x.input[:0]
  99. }
  100. if prefix > 0 {
  101. x.input = append(x.input, x.header[:prefix]...)
  102. }
  103. for {
  104. if len(x.input) == cap(x.input) {
  105. b := make([]byte, len(x.input), 2*cap(x.input))
  106. copy(b, x.input)
  107. x.input = b
  108. }
  109. n, err := x.read(x.input[len(x.input):cap(x.input)])
  110. x.input = x.input[:len(x.input)+n]
  111. if err != nil {
  112. if err == io.EOF && len(x.input) > 0 {
  113. break
  114. }
  115. return 0, err
  116. }
  117. }
  118. }
  119. var n int
  120. var err error
  121. if x.decode == nil {
  122. x.output, x.input, err = x.input, x.output, nil
  123. } else if n, err = snappy.DecodedLen(x.input); n <= len(dst) && err == nil {
  124. // If the output buffer is large enough to hold the decode value,
  125. // write it there directly instead of using the intermediary output
  126. // buffer.
  127. _, err = x.decode(dst, x.input)
  128. } else {
  129. var b []byte
  130. n = 0
  131. b, err = x.decode(x.output[:cap(x.output)], x.input)
  132. if err == nil {
  133. x.output = b
  134. }
  135. }
  136. return n, err
  137. }
  138. func (x *xerialReader) read(b []byte) (int, error) {
  139. n, err := x.reader.Read(b)
  140. x.nbytes += int64(n)
  141. return n, err
  142. }
  143. func (x *xerialReader) readFull(b []byte) (int, error) {
  144. n, err := io.ReadFull(x.reader, b)
  145. x.nbytes += int64(n)
  146. return n, err
  147. }
  148. // An implementation of a xerial-framed snappy-encoded output stream.
  149. // Each Write made to the writer is framed with a xerial header.
  150. type xerialWriter struct {
  151. writer io.Writer
  152. header [16]byte
  153. input []byte
  154. output []byte
  155. nbytes int64
  156. framed bool
  157. encode func([]byte, []byte) []byte
  158. }
  159. func (x *xerialWriter) Reset(w io.Writer) {
  160. x.writer = w
  161. x.input = x.input[:0]
  162. x.output = x.output[:0]
  163. x.nbytes = 0
  164. }
  165. func (x *xerialWriter) ReadFrom(r io.Reader) (int64, error) {
  166. wn := int64(0)
  167. if cap(x.input) == 0 {
  168. x.input = make([]byte, 0, defaultBufferSize)
  169. }
  170. for {
  171. if x.full() {
  172. x.grow()
  173. }
  174. n, err := r.Read(x.input[len(x.input):cap(x.input)])
  175. wn += int64(n)
  176. x.input = x.input[:len(x.input)+n]
  177. if x.fullEnough() {
  178. if err := x.Flush(); err != nil {
  179. return wn, err
  180. }
  181. }
  182. if err != nil {
  183. if err == io.EOF {
  184. err = nil
  185. }
  186. return wn, err
  187. }
  188. }
  189. }
  190. func (x *xerialWriter) Write(b []byte) (int, error) {
  191. wn := 0
  192. if cap(x.input) == 0 {
  193. x.input = make([]byte, 0, defaultBufferSize)
  194. }
  195. for len(b) > 0 {
  196. if x.full() {
  197. x.grow()
  198. }
  199. n := copy(x.input[len(x.input):cap(x.input)], b)
  200. b = b[n:]
  201. wn += n
  202. x.input = x.input[:len(x.input)+n]
  203. if x.fullEnough() {
  204. if err := x.Flush(); err != nil {
  205. return wn, err
  206. }
  207. }
  208. }
  209. return wn, nil
  210. }
  211. func (x *xerialWriter) Flush() error {
  212. if len(x.input) == 0 {
  213. return nil
  214. }
  215. var b []byte
  216. if x.encode == nil {
  217. b = x.input
  218. } else {
  219. x.output = x.encode(x.output[:cap(x.output)], x.input)
  220. b = x.output
  221. }
  222. x.input = x.input[:0]
  223. x.output = x.output[:0]
  224. if x.framed && x.nbytes == 0 {
  225. writeXerialHeader(x.header[:])
  226. _, err := x.write(x.header[:])
  227. if err != nil {
  228. return err
  229. }
  230. }
  231. if x.framed {
  232. writeXerialFrame(x.header[:4], len(b))
  233. _, err := x.write(x.header[:4])
  234. if err != nil {
  235. return err
  236. }
  237. }
  238. _, err := x.write(b)
  239. return err
  240. }
  241. func (x *xerialWriter) write(b []byte) (int, error) {
  242. n, err := x.writer.Write(b)
  243. x.nbytes += int64(n)
  244. return n, err
  245. }
  246. func (x *xerialWriter) full() bool {
  247. return len(x.input) == cap(x.input)
  248. }
  249. func (x *xerialWriter) fullEnough() bool {
  250. return x.framed && (cap(x.input)-len(x.input)) < 1024
  251. }
  252. func (x *xerialWriter) grow() {
  253. tmp := make([]byte, len(x.input), 2*cap(x.input))
  254. copy(tmp, x.input)
  255. x.input = tmp
  256. }
  257. func align(n, a int) int {
  258. if (n % a) == 0 {
  259. return n
  260. }
  261. return ((n / a) + 1) * a
  262. }
  263. var (
  264. xerialHeader = [...]byte{130, 83, 78, 65, 80, 80, 89, 0}
  265. xerialVersionInfo = [...]byte{0, 0, 0, 1, 0, 0, 0, 1}
  266. )
  267. func isXerialHeader(src []byte) bool {
  268. return len(src) >= 16 && bytes.Equal(src[:8], xerialHeader[:])
  269. }
  270. func writeXerialHeader(b []byte) {
  271. copy(b[:8], xerialHeader[:])
  272. copy(b[8:], xerialVersionInfo[:])
  273. }
  274. func writeXerialFrame(b []byte, n int) {
  275. binary.BigEndian.PutUint32(b, uint32(n))
  276. }