stream.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. /**
  2. * Module dependencies.
  3. */
  4. var Stream = require('stream').Writable;
  5. var encode = require('./encode');
  6. /**
  7. * Expose parser.
  8. */
  9. module.exports = Parser;
  /**
   * Initialize parser.
   *
   * A `Writable` stream that reassembles framed messages from arbitrary
   * chunk boundaries and emits each complete frame (header bytes included)
   * as a single Buffer through a `data` event.
   *
   * Frame layout, as consumed by `_write`:
   *
   *   - 1 meta byte: the high nibble is stored as `version`, the low nibble
   *     as `argv` (the number of arguments that follow);
   *   - per argument: a 4-byte big-endian unsigned length followed by that
   *     many payload bytes.
   *
   * @param {Options} [opts] passed through unchanged to `stream.Writable`
   * @api public
   */

  function Parser(opts) {
    // Tolerate `Parser(opts)` without `new`, as `stream.Writable` itself does.
    if (!(this instanceof Parser)) return new Parser(opts);
    Stream.call(this, opts);
    this.state = 'message';
    // Scratch space for the 4 length bytes; reused for every argument.
    this._lenbuf = Buffer.alloc(4);
  }

  /**
   * Inherit from `Stream.prototype`.
   */

  Object.setPrototypeOf(Parser.prototype, Stream.prototype);

  /**
   * Begin a new frame from its meta byte.
   *
   * A frame that declares zero arguments consists of the meta byte alone,
   * so it is emitted right away instead of waiting for length bytes that
   * belong to the next frame.
   *
   * @param {Parser} parser
   * @param {Number} meta
   * @api private
   */

  function startMessage(parser, meta) {
    parser.version = meta >> 4;
    parser.argv = meta & 0xf;
    parser._bufs = [Buffer.from([meta])];
    parser._nargs = 0;
    parser._leni = 0;

    if (parser.argv === 0) {
      parser.state = 'message';
      parser.emit('data', Buffer.concat(parser._bufs));
      return;
    }

    parser.state = 'arglen';
  }

  /**
   * Called once all 4 length bytes of an argument have been collected.
   *
   * @param {Parser} parser
   * @api private
   */

  function startArg(parser) {
    // NOTE(review): the length comes straight off the wire and is not
    // bounded here, so a peer can make the parser buffer up to 2^32-1 bytes
    // per argument. Confirm the input is trusted or cap it upstream.
    parser._arglen = parser._lenbuf.readUInt32BE(0);
    // Copy: `_lenbuf` is overwritten by the next argument's length.
    parser._bufs.push(Buffer.from(parser._lenbuf));
    parser._argcur = 0;

    // An empty argument has no payload byte to trigger completion later,
    // so complete it now.
    if (parser._arglen === 0) {
      finishArg(parser);
      return;
    }

    parser.state = 'arg';
  }

  /**
   * Account for a fully received argument and either emit the finished
   * frame or move on to the next argument's length.
   *
   * @param {Parser} parser
   * @api private
   */

  function finishArg(parser) {
    parser._nargs++;
    parser._leni = 0;

    if (parser._nargs === parser.argv) {
      // Reset the state before emitting so listeners observe a parser that
      // is ready for the next frame.
      parser.state = 'message';
      parser.emit('data', Buffer.concat(parser._bufs));
      return;
    }

    parser.state = 'arglen';
  }

  /**
   * Write implementation.
   *
   * Consumes `chunk` byte-wise for headers and slice-wise for payloads,
   * carrying partial frames over to the next call.
   *
   * @param {Buffer} chunk assumed to be a Buffer (the `Writable` default
   *   unless `decodeStrings` was disabled through `opts`)
   * @param {String} encoding unused
   * @param {Function} fn write callback; receives an Error if `state` holds
   *   an unknown value
   * @api private
   */

  Parser.prototype._write = function(chunk, encoding, fn){
    var i = 0;

    while (i < chunk.length) {
      switch (this.state) {
        case 'message':
          startMessage(this, chunk[i++]);
          break;

        case 'arglen':
          this._lenbuf[this._leni++] = chunk[i++];
          if (this._leni === 4) startArg(this);
          break;

        case 'arg':
          // Take what is still missing from the argument, or the rest of
          // the chunk when the argument spans several chunks.
          var end = Math.min(i + (this._arglen - this._argcur), chunk.length);
          // A view, not a copy; `Buffer.concat` copies when the frame is
          // emitted.
          this._bufs.push(chunk.subarray(i, end));
          this._argcur += end - i;
          i = end;
          if (this._argcur === this._arglen) finishArg(this);
          break;

        default:
          // `state` is a public property; refuse to guess if it was
          // tampered with rather than spin or drop bytes silently.
          return fn(new Error('invalid parser state: ' + this.state));
      }
    }

    fn();
  };