lazy.js 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. var EventEmitter = require('events').EventEmitter;
  2. var util = require('util');
  3. var stream = require('stream');
  4. function Lazy(em, opts) {
  5. if (!(this instanceof Lazy)) return new Lazy(em, opts);
  6. EventEmitter.call(this);
  7. var self = this;
  8. self.once = function (name, f) {
  9. self.on(name, function g () {
  10. self.removeListener(name, g);
  11. f.apply(this, arguments);
  12. });
  13. }
  14. if (!opts) opts = {};
  15. var dataName = opts.data || 'data';
  16. var pipeName = opts.pipe || 'pipe';
  17. var endName = opts.pipe || 'end';
  18. if (pipeName != endName) {
  19. var piped = false;
  20. self.once(pipeName, function () { piped = true });
  21. self.once(endName, function () {
  22. if (!piped) self.emit(pipeName);
  23. });
  24. }
  25. self.push = function (x) {
  26. self.emit(dataName, x);
  27. }
  28. self.end = function () {
  29. self.emit(endName);
  30. }
  31. if (em && em.on) {
  32. em.on(endName, function () {
  33. self.emit(endName);
  34. });
  35. self.on(pipeName, function () {
  36. em.emit(pipeName);
  37. });
  38. // Check for v0.10 or Greater (Stream2 has Duplex type)
  39. if (stream.Duplex && em instanceof(stream)) {
  40. em.on('readable', function () {
  41. var x = em.read();
  42. self.emit(dataName, x);
  43. });
  44. } else {
  45. // Old Stream1 or Event support
  46. em.on(dataName, function (x) {
  47. self.emit(dataName, x);
  48. });
  49. }
  50. }
  51. function newLazy (g, h, l) {
  52. if (!g) {
  53. g = function () {
  54. return true;
  55. };
  56. }
  57. if (!h) {
  58. h = function (x) {
  59. return x;
  60. };
  61. }
  62. var lazy = new Lazy(null, opts, l);
  63. self.on(dataName, function (x, y) {
  64. if (g.call(lazy, x)) {
  65. lazy.emit(dataName, h(x), y);
  66. }
  67. });
  68. self.once(pipeName, function () {
  69. lazy.emit(pipeName);
  70. });
  71. return lazy;
  72. }
  73. self.filter = function (f) {
  74. return newLazy(function (x) {
  75. return f(x);
  76. });
  77. }
  78. self.forEach = function (f) {
  79. return newLazy(function (x) {
  80. f(x);
  81. return true;
  82. });
  83. }
  84. self.map = function (f) {
  85. return newLazy(
  86. function () { return true },
  87. function (x) { return f(x) }
  88. );
  89. }
  90. self.head = function (f) {
  91. var lazy = newLazy();
  92. lazy.on(dataName, function g (x) {
  93. f(x)
  94. lazy.removeListener(dataName, g)
  95. })
  96. }
  97. self.tail = function () {
  98. var skip = true;
  99. return newLazy(function () {
  100. if (skip) {
  101. skip = false;
  102. return false;
  103. }
  104. return true;
  105. });
  106. }
  107. self.skip = function (n) {
  108. return newLazy(function () {
  109. if (n > 0) {
  110. n--;
  111. return false;
  112. }
  113. return true;
  114. });
  115. }
  116. self.take = function (n) {
  117. return newLazy(function () {
  118. if (n == 0) self.emit(pipeName);
  119. return n-- > 0;
  120. });
  121. }
  122. self.takeWhile = function (f) {
  123. var cond = true;
  124. return newLazy(function (x) {
  125. if (cond && f(x)) return true;
  126. cond = false;
  127. self.emit(pipeName);
  128. return false;
  129. });
  130. }
  131. self.foldr = function (op, i, f) {
  132. var acc = i;
  133. var lazy = newLazy();
  134. lazy.on(dataName, function g (x) {
  135. acc = op(x, acc);
  136. });
  137. lazy.once(pipeName, function () {
  138. f(acc);
  139. });
  140. }
  141. self.sum = function (f) {
  142. return self.foldr(function (x, acc) { return x + acc }, 0, f);
  143. }
  144. self.product = function (f) {
  145. return self.foldr(function (x, acc) { return x*acc }, 1, f);
  146. }
  147. self.join = function (f) {
  148. var data = []
  149. var lazy = newLazy(function (x) {
  150. data.push(x);
  151. return true;
  152. });
  153. lazy.once(pipeName, function () { f(data) });
  154. return self;
  155. }
  156. self.bucket = function (init, f) {
  157. var lazy = new Lazy(null, opts);
  158. var yieldTo = function (x) {
  159. lazy.emit(dataName, x);
  160. };
  161. var acc = init;
  162. self.on(dataName, function (x) {
  163. acc = f.call(yieldTo, acc, x);
  164. });
  165. self.once(pipeName, function () {
  166. lazy.emit(pipeName);
  167. });
  168. // flush on end event
  169. self.once(endName, function () {
  170. var finalBuffer = mergeBuffers(acc);
  171. if (finalBuffer) {
  172. yieldTo(finalBuffer);
  173. }
  174. });
  175. return lazy;
  176. }
  177. // Streams that use this should emit strings or buffers only
  178. self.__defineGetter__('lines', function () {
  179. return self.bucket([], function (chunkArray, chunk) {
  180. var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0;
  181. if (typeof chunk === 'string') chunk = new Buffer(chunk);
  182. if (chunk){
  183. for (var i = 0; i < chunk.length; i++) {
  184. if (chunk[i] === newline) {
  185. // If we have content from the current chunk to append to our buffers, do it.
  186. if (i > 0) {
  187. chunkArray.push(chunk.slice(lastNewLineIndex, i));
  188. }
  189. // Wrap all our buffers and emit it.
  190. this(mergeBuffers(chunkArray));
  191. lastNewLineIndex = i + 1;
  192. }
  193. }
  194. }
  195. if (lastNewLineIndex > 0) {
  196. // New line found in the chunk, push the remaining part of the buffer.
  197. if (lastNewLineIndex < chunk.length) {
  198. chunkArray.push(chunk.slice(lastNewLineIndex));
  199. }
  200. } else {
  201. // No new line found, push the whole buffer.
  202. if (chunk && chunk.length) {
  203. chunkArray.push(chunk);
  204. }
  205. }
  206. return chunkArray;
  207. });
  208. });
  209. }
  210. Lazy.range = function () {
  211. var args = arguments;
  212. var step = 1;
  213. var infinite = false;
  214. if (args.length == 1 && typeof args[0] == 'number') {
  215. var i = 0, j = args[0];
  216. }
  217. else if (args.length == 1 && typeof args[0] == 'string') { // 'start[,next]..[end]'
  218. var arg = args[0];
  219. var startOpen = false, endClosed = false;
  220. if (arg[0] == '(' || arg[0] == '[') {
  221. if (arg[0] == '(') startOpen = true;
  222. arg = arg.slice(1);
  223. }
  224. if (arg.slice(-1) == ']') endClosed = true;
  225. var parts = arg.split('..');
  226. if (parts.length != 2)
  227. throw new Error("single argument range takes 'start..' or 'start..end' or 'start,next..end'");
  228. if (parts[1] == '') { // 'start..'
  229. var i = parts[0];
  230. infinite = true;
  231. }
  232. else { // 'start[,next]..end'
  233. var progression = parts[0].split(',');
  234. if (progression.length == 1) { // start..end
  235. var i = parts[0], j = parts[1];
  236. }
  237. else { // 'start,next..end'
  238. var i = progression[0], j = parts[1];
  239. step = Math.abs(progression[1]-i);
  240. }
  241. }
  242. i = parseInt(i, 10);
  243. j = parseInt(j, 10);
  244. if (startOpen) {
  245. if (infinite || i < j) i++;
  246. else i--;
  247. }
  248. if (endClosed) {
  249. if (i < j) j++;
  250. else j--;
  251. }
  252. }
  253. else if (args.length == 2 || args.length == 3) { // start, end[, step]
  254. var i = args[0], j = args[1];
  255. if (args.length == 3) {
  256. var step = args[2];
  257. }
  258. }
  259. else {
  260. throw new Error("range takes 1, 2 or 3 arguments");
  261. }
  262. var lazy = new Lazy;
  263. var stopInfinite = false;
  264. lazy.on('pipe', function () {
  265. stopInfinite = true;
  266. });
  267. if (infinite) {
  268. process.nextTick(function g () {
  269. if (stopInfinite) return;
  270. lazy.emit('data', i++);
  271. process.nextTick(g);
  272. });
  273. }
  274. else {
  275. process.nextTick(function () {
  276. if (i < j) {
  277. for (; i<j; i+=step) {
  278. lazy.emit('data', i)
  279. }
  280. }
  281. else {
  282. for (; i>j; i-=step) {
  283. lazy.emit('data', i)
  284. }
  285. }
  286. lazy.emit('end');
  287. });
  288. }
  289. return lazy;
  290. }
  291. var mergeBuffers = function mergeBuffers(buffers) {
  292. // We expect buffers to be a non-empty Array
  293. if (!buffers || !Array.isArray(buffers) || !buffers.length) return;
  294. var finalBufferLength, finalBuffer, currentBuffer, currentSize = 0;
  295. // Sum all the buffers lengths
  296. finalBufferLength = buffers.reduce(function(left, right) { return (left.length||left) + (right.length||right); }, 0);
  297. finalBuffer = new Buffer(finalBufferLength);
  298. while(buffers.length) {
  299. currentBuffer = buffers.shift();
  300. currentBuffer.copy(finalBuffer, currentSize);
  301. currentSize += currentBuffer.length;
  302. }
  303. return finalBuffer;
  304. }
  305. util.inherits(Lazy, EventEmitter);
  306. module.exports = Lazy;