// channel.js
  "use strict";

  // The module's sole export is the channel factory (a hoisted function
  // declaration, so it can be referenced before its definition).
  module.exports = makeChannel;
  /**
   * Create a callback-based channel that hands items from `put` to `take`.
   *
   * Items that arrive while no reader is waiting are queued; readers that
   * arrive while no item is queued are parked until the next `put`. All
   * callbacks are invoked synchronously.
   *
   * @param {number} [bufferSize] - Number of queued items tolerated before
   *   `put` reports back-pressure. Coerced to a 32-bit integer; missing,
   *   non-numeric and negative values are treated as 0.
   * @param {string|function} [monitor] - Optional observer called as
   *   `monitor("put" | "take", item)`. A string is turned into a logger via
   *   `log(name)`.
   * @returns {{drain: function, put: function, take: function}} The channel.
   */
  function makeChannel(bufferSize, monitor) {
    // `|0` maps undefined/NaN to 0. Negative limits are clamped as well:
    // `length <= bufferSize` could otherwise never hold, so every drain
    // callback would be parked forever.
    bufferSize = Math.max(0, bufferSize | 0);

    var dataQueue = []; // items put while no reader was waiting
    var readQueue = []; // take callbacks waiting for an item
    var drainList = []; // drain callbacks waiting for the queue to shrink

    if (typeof monitor === "string") {
      monitor = log(monitor);
    }

    return {
      drain: drain,
      put: put,
      take: take,
    };

    /**
     * Run `callback` once the queue holds at most `bufferSize` items:
     * immediately if that is already true, otherwise after a later `take`
     * brings the queue back under the limit.
     *
     * @param {function} callback - Invoked with no arguments.
     * @throws {TypeError} If `callback` is not a function.
     */
    function drain(callback) {
      if (typeof callback !== "function") {
        throw new TypeError("callback must be function");
      }
      if (dataQueue.length <= bufferSize) return callback();
      drainList.push(callback);
    }

    /**
     * Offer an item to the channel. If a reader is waiting, the oldest one
     * receives the item right away; otherwise the item is queued.
     *
     * @param {*} item - Value to deliver.
     * @returns {boolean} true when it's safe to continue without draining.
     */
    function put(item) {
      if (monitor) monitor("put", item);
      if (readQueue.length) {
        // Direct hand-off: the item never enters dataQueue.
        if (monitor) monitor("take", item);
        readQueue.shift()(null, item);
      }
      else {
        dataQueue.push(item);
      }
      return dataQueue.length <= bufferSize;
    }

    /**
     * Request the next item. The callback is called as `callback(null, item)`
     * immediately if an item is queued, otherwise on a later `put`.
     *
     * @param {function} callback - Receives `(null, item)`.
     * @throws {TypeError} If `callback` is not a function.
     */
    function take(callback) {
      if (typeof callback !== "function") {
        throw new TypeError("callback must be function");
      }
      if (dataQueue.length) {
        var item = dataQueue.shift();
        if (monitor) monitor("take", item);
        callback(null, item);
        if (dataQueue.length <= bufferSize && drainList.length) {
          // Swap the list out before notifying so that drain() calls made
          // from inside a callback land on the fresh list, not this pass.
          var list = drainList;
          drainList = [];
          for (var i = 0; i < list.length; i++) {
            list[i]();
          }
        }
        return;
      }
      readQueue.push(callback);
    }
  }
  /**
   * Build a monitor function that reports channel events on the console.
   *
   * @param {string} name - Label printed in front of every event.
   * @returns {function} Function `(type, value)` that calls
   *   `console.info(name, type, value)`.
   */
  function log(name) {
    return function (type, value) {
      console.info(name, type, value);
    };
  }