// req.js
  1. /**
  2. * Module dependencies.
  3. */
  4. var debug = require('debug')('axon:req');
  5. var queue = require('../plugins/queue');
  6. var slice = require('../utils').slice;
  7. var Message = require('amp-message');
  8. var Socket = require('./sock');
  9. /**
  10. * Expose `ReqSocket`.
  11. */
  12. module.exports = ReqSocket;
  /**
   * Initialize a new `ReqSocket`.
   *
   * Runs the `Socket` constructor on the instance and then sets up the
   * request bookkeeping:
   *
   *  - `n`          counter `send()` uses to choose the next peer
   *  - `ids`        counter `id()` uses to build message ids
   *  - `callbacks`  pending reply callbacks, keyed by message id
   *  - `identity`   value of the "identity" setting, the message id prefix
   *
   * Finally installs the `queue` plugin via `use()`.
   * NOTE(review): presumably this plugin supplies the `enqueue()` that
   * `send()` falls back to when no peer is connected - confirm in
   * ../plugins/queue.
   *
   * @api private
   */
  function ReqSocket() {
    Socket.call(this);
    this.n = 0;
    this.ids = 0;
    this.callbacks = {};
    this.identity = this.get('identity');
    this.use(queue());
  }
  /**
   * Inherits from `Socket.prototype`.
   *
   * Re-parents the existing prototype object (rather than replacing it),
   * so `ReqSocket.prototype.constructor` stays intact. Uses the standard
   * `Object.setPrototypeOf` in place of the legacy `__proto__` accessor.
   */
  Object.setPrototypeOf(ReqSocket.prototype, Socket.prototype);
  /**
   * Return a fresh message id.
   *
   * The id is `<identity>:<counter>`; the counter (`this.ids`) is
   * incremented on every call, so ids from one socket do not repeat.
   *
   * @return {String}
   * @api private
   */
  ReqSocket.prototype.id = function(){
    return this.identity + ':' + this.ids++;
  };
  /**
   * Build the handler for incoming replies.
   *
   * The returned closure decodes `buf` into a `Message`, pops the last
   * part as the message id and invokes the callback registered under that
   * id (by `send()`) with the remaining parts as arguments. A reply whose
   * id has no registered callback is only logged via `debug`.
   *
   * @return {Function} closure(buf)
   * @api private
   */
  ReqSocket.prototype.onmessage = function(){
    var self = this;
    return function(buf){
      var msg = new Message(buf);
      var id = msg.pop();
      var pending = self.callbacks;

      // The id comes off the wire, so only honour entries we stored
      // ourselves: a plain lookup would let "__proto__" or "toString"
      // resolve to something inherited from Object.prototype.
      var fn = Object.prototype.hasOwnProperty.call(pending, id)
        ? pending[id]
        : undefined;
      if ('function' !== typeof fn) return debug('missing callback %s', id);

      // Unregister before invoking so the entry is released even when the
      // callback throws.
      delete pending[id];
      fn.apply(null, msg.args);
    };
  };
  /**
   * Send a request to one of the connected peers.
   *
   * Peers are used round-robin: `this.n` is incremented on every call and
   * indexes into `this.socks` modulo its length.
   *
   * When a peer is available, a trailing function argument is taken as the
   * reply callback (a no-op is used when none is given). The callback is
   * tagged with a fresh message id, stored in `this.callbacks` under that
   * id, and the id is appended as the last message part before the
   * arguments are packed and written to the peer.
   *
   * When no peer is available the untouched arguments (callback included)
   * are handed to `this.enqueue()`; nothing is registered at that point.
   *
   * @param {Mixed} msg  message parts; optionally followed by a callback
   * @api public
   */
  ReqSocket.prototype.send = function(msg){
    var socks = this.socks;
    // With an empty `socks` the index is NaN, which yields `undefined`.
    var sock = socks[this.n++ % socks.length];
    var args = slice(arguments);

    if (!sock) {
      debug('no connected peers');
      this.enqueue(args);
      return;
    }

    var last = args[args.length - 1];
    var fn = 'function' === typeof last ? args.pop() : function(){};
    fn.id = this.id();
    this.callbacks[fn.id] = fn;
    args.push(fn.id);

    sock.write(this.pack(args));
  };