| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 | 
/** * Module dependencies. */var debug = require('debug')('axon:req');var queue = require('../plugins/queue');var slice = require('../utils').slice;var Message = require('amp-message');var Socket = require('./sock');/** * Expose `ReqSocket`. */module.exports = ReqSocket;/** * Initialize a new `ReqSocket`. * * @api private */function ReqSocket() {  Socket.call(this);  this.n = 0;  this.ids = 0;  this.callbacks = {};  this.identity = this.get('identity');  this.use(queue());}/** * Inherits from `Socket.prototype`. */ReqSocket.prototype.__proto__ = Socket.prototype;/** * Return a message id. * * @return {String} * @api private */ReqSocket.prototype.id = function(){  return this.identity + ':' + this.ids++;};/** * Emits the "message" event with all message parts * after the null delimeter part. * * @param {net.Socket} sock * @return {Function} closure(msg, multipart) * @api private */ReqSocket.prototype.onmessage = function(){  var self = this;  return function(buf){    var msg = new Message(buf);    var id = msg.pop();    var fn = self.callbacks[id];    if (!fn) return debug('missing callback %s', id);    fn.apply(null, msg.args);    delete self.callbacks[id];  };};/** * Sends `msg` to the remote peers. Appends * the null message part prior to sending. * * @param {Mixed} msg * @api public */ReqSocket.prototype.send = function(msg){  var socks = this.socks;  var len = socks.length;  var sock = socks[this.n++ % len];  var args = slice(arguments);  if (sock) {    var hasCallback = 'function' == typeof args[args.length - 1];    if (!hasCallback) args.push(function(){});    var fn = args.pop();    fn.id = this.id();    this.callbacks[fn.id] = fn;    args.push(fn.id);  }  if (sock) {    sock.write(this.pack(args));  } else {    debug('no connected peers');    this.enqueue(args);  }};
 |