consume.js 329 B

123456789101112131415
  1. "use strict";
  2. module.exports = consume;
  /**
   * Build a continuable that drains `channel`, passing every item to `emit`.
   *
   * The returned function starts the drain when called. It repeatedly calls
   * `channel.take(cb)`, where `cb` receives `(err, item)`:
   *   - `item === undefined` ends the drain and calls `callback(err)`;
   *   - any other item is passed to `emit(item)` and the next item is requested.
   * If `emit` throws, the drain stops and the thrown value is passed to
   * `callback`.
   *
   * Items delivered synchronously by `channel.take` are handled in a loop
   * rather than by recursion, so a long run of already-buffered items cannot
   * overflow the call stack.
   *
   * @param {{take: function(function(*, *): void): void}} channel - source of items.
   * @param {function(*): void} emit - invoked once per item, in delivery order.
   * @returns {function(function(*): void): void} continuable taking `callback(err)`.
   */
  function consume(channel, emit) {
    return function (callback) {
      // True only while pump() is inside channel.take(); this is how onItem
      // tells a synchronous delivery from an asynchronous one.
      var taking = false;
      // Set by onItem when it ran synchronously and wants another item.
      var again = false;

      pump();

      // Request items until one is delivered asynchronously or the drain ends.
      function pump() {
        do {
          again = false;
          taking = true;
          try { channel.take(onItem); }
          finally { taking = false; }
        } while (again);
      }

      function onItem(err, item) {
        if (item === undefined) return callback(err);
        try { emit(item); }
        catch (emitErr) { return callback(emitErr); }
        // Synchronous delivery: let the enclosing pump() loop take again
        // instead of nesting another take() on the stack.
        if (taking) again = true;
        else pump();
      }
    };
  }