sub-emitter.js 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. /**
  2. * Module dependencies.
  3. */
  4. var Message = require('amp-message');
  5. var SubSocket = require('./sub');
  6. /**
  7. * Expose `SubEmitterSocket`.
  8. */
  9. module.exports = SubEmitterSocket;
  10. /**
  11. * Initialzie a new `SubEmitterSocket`.
  12. *
  13. * @api private
  14. */
  15. function SubEmitterSocket() {
  16. this.sock = new SubSocket;
  17. this.sock.onmessage = this.onmessage.bind(this);
  18. this.bind = this.sock.bind.bind(this.sock);
  19. this.connect = this.sock.connect.bind(this.sock);
  20. this.close = this.sock.close.bind(this.sock);
  21. this.listeners = [];
  22. }
  23. /**
  24. * Message handler.
  25. *
  26. * @param {net.Socket} sock
  27. * @return {Function} closure(msg, mulitpart)
  28. * @api private
  29. */
  30. SubEmitterSocket.prototype.onmessage = function(){
  31. var listeners = this.listeners;
  32. var self = this;
  33. return function(buf){
  34. var msg = new Message(buf);
  35. var topic = msg.shift();
  36. for (var i = 0; i < listeners.length; ++i) {
  37. var listener = listeners[i];
  38. var m = listener.re.exec(topic);
  39. if (!m) continue;
  40. listener.fn.apply(this, m.slice(1).concat(msg.args));
  41. }
  42. }
  43. };
  44. /**
  45. * Subscribe to `event` and invoke the given callback `fn`.
  46. *
  47. * @param {String} event
  48. * @param {Function} fn
  49. * @return {SubEmitterSocket} self
  50. * @api public
  51. */
  52. SubEmitterSocket.prototype.on = function(event, fn){
  53. var re = this.sock.subscribe(event);
  54. this.listeners.push({
  55. event: event,
  56. re: re,
  57. fn: fn
  58. });
  59. return this;
  60. };
  61. /**
  62. * Unsubscribe with the given `event`.
  63. *
  64. * @param {String} event
  65. * @return {SubEmitterSocket} self
  66. * @api public
  67. */
  68. SubEmitterSocket.prototype.off = function(event){
  69. for (var i = 0; i < this.listeners.length; ++i) {
  70. if (this.listeners[i].event === event) {
  71. this.sock.unsubscribe(this.listeners[i].re);
  72. this.listeners.splice(i--, 1);
  73. }
  74. }
  75. return this;
  76. };