// queue.js

  1. /**
  2. * Module dependencies.
  3. */
  4. var debug = require('debug')('axon:queue');
  5. /**
  6. * Queue plugin.
  7. *
  8. * Provides an `.enqueue()` method to the `sock`. Messages
  9. * passed to `enqueue` will be buffered until the next
  10. * `connect` event is emitted.
  11. *
  12. * Emits:
  13. *
  14. * - `drop` (msg) when a message is dropped
  15. * - `flush` (msgs) when the queue is flushed
  16. *
  17. * @param {Object} options
  18. * @api private
  19. */
  20. module.exports = function(options){
  21. options = options || {};
  22. return function(sock){
  23. /**
  24. * Message buffer.
  25. */
  26. sock.queue = [];
  27. /**
  28. * Flush `buf` on `connect`.
  29. */
  30. sock.on('connect', function(){
  31. var prev = sock.queue;
  32. var len = prev.length;
  33. sock.queue = [];
  34. debug('flush %d messages', len);
  35. for (var i = 0; i < len; ++i) {
  36. this.send.apply(this, prev[i]);
  37. }
  38. sock.emit('flush', prev);
  39. });
  40. /**
  41. * Pushes `msg` into `buf`.
  42. */
  43. sock.enqueue = function(msg){
  44. var hwm = sock.settings.hwm;
  45. if (sock.queue.length >= hwm) return drop(msg);
  46. sock.queue.push(msg);
  47. };
  48. /**
  49. * Drop the given `msg`.
  50. */
  51. function drop(msg) {
  52. debug('drop');
  53. sock.emit('drop', msg);
  54. }
  55. };
  56. };