// sub.js (2.6 KB)

  1. /**
  2. * Module dependencies.
  3. */
  4. var debug = require('debug')('axon:sub');
  5. var escape = require('escape-string-regexp');
  6. var Message = require('amp-message');
  7. var Socket = require('./sock');
  8. /**
  9. * Expose `SubSocket`.
  10. */
  11. module.exports = SubSocket;
  /**
   * Create a subscriber socket that starts with no subscriptions.
   *
   * The base `Socket` constructor is applied to the new instance first;
   * afterwards `subscriptions` is set to an empty array that will hold
   * the `RegExp` topic filters.
   *
   * @api private
   */

  function SubSocket() {
    Socket.call(this);
    this.subscriptions = [];
  }

  /**
   * Link `SubSocket.prototype` to `Socket.prototype` so that instances
   * inherit the base socket methods.
   */

  SubSocket.prototype.__proto__ = Socket.prototype;
  /**
   * Report whether at least one subscription is registered.
   *
   * @return {Boolean} `true` when `subscriptions` is non-empty
   * @api public
   */

  SubSocket.prototype.hasSubscriptions = function(){
    var count = this.subscriptions.length;
    return Boolean(count);
  };
  /**
   * Check whether any registered subscription matches `topic`.
   *
   * Subscriptions are tested in registration order and the search stops
   * at the first match.
   *
   * @param {String} topic
   * @return {Boolean} `true` if at least one subscription matches
   * @api public
   */

  SubSocket.prototype.matches = function(topic){
    for (var i = 0; i < this.subscriptions.length; ++i) {
      var re = this.subscriptions[i];
      // `test()` on a global or sticky pattern resumes from `lastIndex`,
      // so a caller-supplied /g or /y RegExp would otherwise give
      // alternating results for the very same topic.
      if (re.global || re.sticky) re.lastIndex = 0;
      if (re.test(topic)) {
        return true;
      }
    }
    return false;
  };
  /**
   * Build the message handler for `sock`.
   *
   * The returned closure decodes `buf` into a `Message`. When the socket
   * currently has subscriptions, the first message argument is treated as
   * the topic and the message is dropped (with a debug log) unless a
   * subscription matches it. Otherwise a "message" event is emitted with
   * the message arguments followed by `sock`.
   *
   * @param {net.Socket} sock
   * @return {Function} closure(buf)
   * @api private
   */

  SubSocket.prototype.onmessage = function(sock){
    var self = this;
    return function(buf){
      var msg = new Message(buf);
      // Check on every message rather than once at handler creation, so
      // that subscriptions added or cleared after the handler was built
      // are honoured.
      if (self.hasSubscriptions()) {
        var topic = msg.args[0];
        if (!self.matches(topic)) return debug('not subscribed to "%s"', topic);
      }
      self.emit.apply(self, ['message'].concat(msg.args).concat(sock));
    };
  };
  /**
   * Register a subscription for `re`.
   *
   * The argument is converted with `toRegExp()` and the resulting
   * pattern is appended to `subscriptions`.
   *
   * @param {RegExp|String} re
   * @return {RegExp} the pattern that was stored
   * @api public
   */

  SubSocket.prototype.subscribe = function(re){
    debug('subscribe to "%s"', re);
    var pattern = toRegExp(re);
    this.subscriptions.push(pattern);
    return pattern;
  };
  /**
   * Remove every subscription equivalent to `re`.
   *
   * `re` is converted with `toRegExp()`; stored patterns are compared by
   * their string form, and all matching entries are spliced out of the
   * existing `subscriptions` array in place.
   *
   * @param {RegExp|String} re
   * @api public
   */

  SubSocket.prototype.unsubscribe = function(re){
    debug('unsubscribe from "%s"', re);
    var target = toRegExp(re).toString();
    var subs = this.subscriptions;
    // Walk backwards so removals do not shift entries still to be visited.
    for (var idx = subs.length - 1; idx >= 0; --idx) {
      if (subs[idx].toString() === target) subs.splice(idx, 1);
    }
  };
  /**
   * Drop all subscriptions.
   *
   * `subscriptions` is pointed at a fresh empty array; the previous
   * array object is left unmodified.
   *
   * @api public
   */

  SubSocket.prototype.clearSubscriptions = function(){
    var empty = [];
    this.subscriptions = empty;
  };
  /**
   * Sending is not supported on a subscriber socket; calling this
   * always throws.
   *
   * @throws {Error} on every call
   */

  SubSocket.prototype.send = function(){
    var err = new Error('subscribers cannot send messages');
    throw err;
  };
  /**
   * Convert `str` to a `RegExp`.
   *
   * A `RegExp` argument is returned unchanged. Any other value is passed
   * through `escape()`, each escaped `*` is turned into the capture group
   * `(.+)`, and the result is anchored with `^` and `$`.
   *
   * @param {RegExp|String} str
   * @return {RegExp}
   * @api private
   */

  function toRegExp(str) {
    if (str instanceof RegExp) return str;
    // After escaping, a wildcard appears as the two characters `\*`.
    var body = escape(str).split('\\*').join('(.+)');
    return new RegExp('^' + body + '$');
  }