123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- var EventEmitter = require('events').EventEmitter;
- var util = require('util');
- var stream = require('stream');
/**
 * Lazy list driven by events.
 *
 * A Lazy is an EventEmitter (the prototype chain is linked elsewhere in this
 * file through util.inherits) that receives items as "data" events and signals
 * completion with a "pipe" event. Every combinator (`map`, `filter`, ...)
 * returns a new Lazy subscribed to this one.
 *
 * @param {EventEmitter|Stream} [em] Optional upstream source. Its data/end
 *     events are forwarded into this Lazy, and this Lazy's pipe event is
 *     forwarded back to it.
 * @param {Object} [opts] Event-name overrides.
 * @param {string} [opts.data='data'] Name of the event carrying one item.
 * @param {string} [opts.pipe='pipe'] Name of the "no more items wanted/coming" event.
 * @param {string} [opts.end='end'] Name of the upstream end-of-input event.
 * @returns {Lazy} A new instance (also when called without `new`).
 */
function Lazy(em, opts) {
    if (!(this instanceof Lazy)) return new Lazy(em, opts);
    EventEmitter.call(this);
    var self = this;

    // Instance-level replacement for EventEmitter#once: the wrapper removes
    // itself before invoking `f`, so `f` runs at most once per registration.
    self.once = function (name, f) {
        self.on(name, function g () {
            self.removeListener(name, g);
            f.apply(this, arguments);
        });
    };

    if (!opts) opts = {};
    var dataName = opts.data || 'data';
    var pipeName = opts.pipe || 'pipe';
    // Fix: this used to read opts.pipe, so opts.end was ignored and setting
    // opts.pipe made the end and pipe events collide.
    var endName = opts.end || 'end';

    if (pipeName !== endName) {
        // An end without a preceding pipe is turned into a pipe so that
        // consumers waiting on pipe (join, foldr, ...) always complete.
        var piped = false;
        self.once(pipeName, function () { piped = true; });
        self.once(endName, function () {
            if (!piped) self.emit(pipeName);
        });
    }

    // Manually feed one item into the list.
    self.push = function (x) {
        self.emit(dataName, x);
    };

    // Manually signal end of input.
    self.end = function () {
        self.emit(endName);
    };

    if (em && em.on) {
        em.on(endName, function () {
            self.emit(endName);
        });
        // Tell the source when downstream no longer needs items.
        self.on(pipeName, function () {
            em.emit(pipeName);
        });
        // Check for v0.10 or Greater (Stream2 has Duplex type)
        if (stream.Duplex && em instanceof(stream)) {
            em.on('readable', function () {
                // Drain everything that is buffered: a single read() per
                // 'readable' event can leave chunks behind, and the null
                // returned by read() means "nothing to read", not an item.
                var x;
                while ((x = em.read()) !== null) {
                    self.emit(dataName, x);
                }
            });
        } else {
            // Old Stream1 or Event support
            em.on(dataName, function (x) {
                self.emit(dataName, x);
            });
        }
    }

    /**
     * Build a downstream Lazy fed by this one.
     *
     * @param {Function} [g] Predicate called with `this` set to the new Lazy;
     *     an item is forwarded only when it returns truthy. Defaults to
     *     accept-all.
     * @param {Function} [h] Transform applied to forwarded items. Defaults to
     *     identity.
     * @param {*} [l] Passed through to the Lazy constructor, which ignores it.
     * @returns {Lazy} The downstream list.
     */
    function newLazy (g, h, l) {
        if (!g) {
            g = function () {
                return true;
            };
        }
        if (!h) {
            h = function (x) {
                return x;
            };
        }
        var lazy = new Lazy(null, opts, l);
        self.on(dataName, function (x, y) {
            if (g.call(lazy, x)) {
                lazy.emit(dataName, h(x), y);
            }
        });
        self.once(pipeName, function () {
            lazy.emit(pipeName);
        });
        return lazy;
    }

    // Keep only the items for which f(x) is truthy.
    self.filter = function (f) {
        return newLazy(function (x) {
            return f(x);
        });
    };

    // Call f on every item; the returned list passes all items through.
    self.forEach = function (f) {
        return newLazy(function (x) {
            f(x);
            return true;
        });
    };

    // Replace every item x with f(x).
    self.map = function (f) {
        return newLazy(
            function () { return true; },
            function (x) { return f(x); }
        );
    };

    // Call f with the first item only. Returns nothing.
    self.head = function (f) {
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            f(x);
            lazy.removeListener(dataName, g);
        });
    };

    // Drop the first item, keep the rest.
    self.tail = function () {
        var skip = true;
        return newLazy(function () {
            if (skip) {
                skip = false;
                return false;
            }
            return true;
        });
    };

    // Drop the first n items, keep the rest.
    self.skip = function (n) {
        return newLazy(function () {
            if (n > 0) {
                n--;
                return false;
            }
            return true;
        });
    };

    // Keep the first n items. The pipe event is emitted on this list when the
    // item following the n-th arrives (that is when the counter reads 0).
    self.take = function (n) {
        return newLazy(function () {
            if (n == 0) self.emit(pipeName);
            return n-- > 0;
        });
    };

    // Keep items until f(x) first returns falsy; from then on every incoming
    // item is dropped and pipe is emitted on this list.
    self.takeWhile = function (f) {
        var cond = true;
        return newLazy(function (x) {
            if (cond && f(x)) return true;
            cond = false;
            self.emit(pipeName);
            return false;
        });
    };

    // Fold the items in arrival order with acc = op(x, acc), starting from i,
    // and hand the final accumulator to f on pipe. Returns nothing.
    self.foldr = function (op, i, f) {
        var acc = i;
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            acc = op(x, acc);
        });
        lazy.once(pipeName, function () {
            f(acc);
        });
    };

    // Pass the sum of all items to f.
    self.sum = function (f) {
        return self.foldr(function (x, acc) { return x + acc; }, 0, f);
    };

    // Pass the product of all items to f.
    self.product = function (f) {
        return self.foldr(function (x, acc) { return x * acc; }, 1, f);
    };

    // Collect all items into an array and pass it to f on pipe.
    // Returns this list (not a new one).
    self.join = function (f) {
        var data = [];
        var lazy = newLazy(function (x) {
            data.push(x);
            return true;
        });
        lazy.once(pipeName, function () { f(data); });
        return self;
    };

    /**
     * Stateful regrouping of items.
     *
     * @param {*} init Initial accumulator.
     * @param {Function} f Called as f(acc, x) with `this` bound to a function
     *     that emits one item on the returned list; its return value becomes
     *     the next accumulator.
     * @returns {Lazy} List receiving whatever f emits.
     */
    self.bucket = function (init, f) {
        var lazy = new Lazy(null, opts);
        var yieldTo = function (x) {
            lazy.emit(dataName, x);
        };
        var acc = init;
        self.on(dataName, function (x) {
            acc = f.call(yieldTo, acc, x);
        });
        self.once(pipeName, function () {
            lazy.emit(pipeName);
        });
        // flush on end event
        self.once(endName, function () {
            var finalBuffer = mergeBuffers(acc);
            if (finalBuffer) {
                yieldTo(finalBuffer);
            }
        });
        return lazy;
    };

    // Streams that use this should emit strings or buffers only
    // `lines` is a getter: every access builds a fresh bucket that splits the
    // incoming chunks on '\n' and emits one Buffer per line (newline excluded).
    Object.defineProperty(self, 'lines', {
        enumerable: true,
        configurable: true,
        get: function () {
            return self.bucket([], function (chunkArray, chunk) {
                var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0;
                if (typeof chunk === 'string') {
                    // Buffer.from replaces the deprecated Buffer constructor;
                    // fall back to the constructor on runtimes that predate it.
                    chunk = (Buffer.from && Buffer.alloc) ? Buffer.from(chunk) : new Buffer(chunk);
                }
                if (chunk) {
                    for (var i = 0; i < chunk.length; i++) {
                        if (chunk[i] === newline) {
                            // If we have content from the current chunk to append to our buffers, do it.
                            if (i > 0) {
                                chunkArray.push(chunk.slice(lastNewLineIndex, i));
                            }
                            // Wrap all our buffers and emit it.
                            // mergeBuffers empties chunkArray as a side effect.
                            // NOTE(review): when the newline is the first byte
                            // of the chunk and nothing is buffered, this emits
                            // undefined (mergeBuffers returns nothing for an
                            // empty array).
                            this(mergeBuffers(chunkArray));
                            lastNewLineIndex = i + 1;
                        }
                    }
                }
                if (lastNewLineIndex > 0) {
                    // New line found in the chunk, push the remaining part of the buffer.
                    if (lastNewLineIndex < chunk.length) {
                        chunkArray.push(chunk.slice(lastNewLineIndex));
                    }
                } else {
                    // No new line found, push the whole buffer.
                    if (chunk && chunk.length) {
                        chunkArray.push(chunk);
                    }
                }
                return chunkArray;
            });
        }
    });
}
/**
 * Create a Lazy that emits a sequence of integers.
 *
 * Accepted call forms:
 *   range(end)                 0 up (or down) to end, end excluded
 *   range(start, end[, step])  start to end, end excluded
 *   range('start..')           infinite, counting up by 1
 *   range('start..end')        start to end
 *   range('start,next..end')   step is |next - start|
 * In the string form a leading '(' excludes the start, a leading '[' is
 * accepted and ignored, and a trailing ']' includes the end.
 *
 * The direction is derived from start/end, so the sign of the step is
 * irrelevant; only its magnitude is used.
 *
 * Items are emitted from process.nextTick, so listeners can be attached
 * right after the call. Finite ranges finish with an 'end' event; infinite
 * ranges stop once a 'pipe' event is seen on the returned list.
 *
 * @returns {Lazy} The list that will emit the numbers.
 * @throws {Error} On a wrong argument count, a malformed range string, or a
 *     step that is zero or not a number.
 */
Lazy.range = function () {
    var args = arguments;
    var step = 1;
    var infinite = false;
    var i, j;

    if (args.length === 1 && typeof args[0] === 'number') {
        i = 0;
        j = args[0];
    }
    else if (args.length === 1 && typeof args[0] === 'string') { // 'start[,next]..[end]'
        var arg = args[0];
        var startOpen = false, endClosed = false;
        if (arg[0] === '(' || arg[0] === '[') {
            if (arg[0] === '(') startOpen = true;
            arg = arg.slice(1);
        }
        // The trailing ']' is left in place; parseInt below ignores it.
        if (arg.slice(-1) === ']') endClosed = true;

        var parts = arg.split('..');
        if (parts.length !== 2)
            throw new Error("single argument range takes 'start..' or 'start..end' or 'start,next..end'");

        if (parts[1] === '') { // 'start..'
            i = parts[0];
            infinite = true;
        }
        else { // 'start[,next]..end'
            var progression = parts[0].split(',');
            j = parts[1];
            if (progression.length === 1) { // start..end
                i = parts[0];
            }
            else { // 'start,next..end'
                i = progression[0];
                step = Math.abs(progression[1] - i);
            }
        }

        i = parseInt(i, 10);
        j = parseInt(j, 10);

        // Shift the bounds inward/outward by one to honour '(' and ']'.
        if (startOpen) {
            if (infinite || i < j) i++;
            else i--;
        }
        if (endClosed) {
            if (i < j) j++;
            else j--;
        }
    }
    else if (args.length === 2 || args.length === 3) { // start, end[, step]
        i = args[0];
        j = args[1];
        // An explicit undefined/null step falls back to the default of 1.
        if (args.length === 3 && args[2] != null) {
            step = args[2];
        }
    }
    else {
        throw new Error("range takes 1, 2 or 3 arguments");
    }

    // A zero step would loop forever below and a negative one would walk away
    // from the end bound, so use the magnitude and reject zero/NaN up front.
    step = Math.abs(Number(step));
    if (!(step > 0)) {
        throw new Error("range step must be a non-zero number");
    }

    var lazy = new Lazy;
    var stopInfinite = false;
    lazy.on('pipe', function () {
        stopInfinite = true;
    });

    if (infinite) {
        // One item per tick, so a consumer's pipe event can stop the sequence.
        process.nextTick(function g () {
            if (stopInfinite) return;
            lazy.emit('data', i++);
            process.nextTick(g);
        });
    }
    else {
        process.nextTick(function () {
            if (i < j) {
                for (; i < j; i += step) {
                    lazy.emit('data', i);
                }
            }
            else {
                for (; i > j; i -= step) {
                    lazy.emit('data', i);
                }
            }
            lazy.emit('end');
        });
    }
    return lazy;
};
/**
 * Concatenate a list of Buffers into one Buffer and empty the list.
 *
 * The input array is drained in place: callers reuse the same array as an
 * accumulator and expect it to be empty after the call.
 *
 * @param {Buffer[]} buffers Non-empty array of Buffers.
 * @returns {Buffer|undefined} The concatenation, or undefined when `buffers`
 *     is not an array or is empty.
 */
var mergeBuffers = function mergeBuffers(buffers) {
    // We expect buffers to be a non-empty Array
    if (!Array.isArray(buffers) || !buffers.length) return;

    // splice() removes every element from the caller's array and hands them
    // over as a new list in the original order.
    var drained = buffers.splice(0, buffers.length);

    // Buffer.concat sums the lengths itself. The previous hand-rolled sum
    // used `length || value`, which turned a zero-length Buffer into a string
    // operand and produced a bogus one-byte "0" result.
    return Buffer.concat(drained);
};
// Lazy instances are EventEmitters: link the prototype chain so that
// on/emit/removeListener are available on every Lazy.
util.inherits(Lazy, EventEmitter);

// The Lazy constructor (with Lazy.range attached) is what this module exports.
module.exports = Lazy;
|