123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- var debug = require('debug')('axon:req');
- var queue = require('../plugins/queue');
- var slice = require('../utils').slice;
- var Message = require('amp-message');
- var Socket = require('./sock');
- module.exports = ReqSocket;
- function ReqSocket() {
- Socket.call(this);
- this.n = 0;
- this.ids = 0;
- this.callbacks = {};
- this.identity = this.get('identity');
- this.use(queue());
- }
- ReqSocket.prototype.__proto__ = Socket.prototype;
- ReqSocket.prototype.id = function(){
- return this.identity + ':' + this.ids++;
- };
- ReqSocket.prototype.onmessage = function(){
- var self = this;
- return function(buf){
- var msg = new Message(buf);
- var id = msg.pop();
- var fn = self.callbacks[id];
- if (!fn) return debug('missing callback %s', id);
- fn.apply(null, msg.args);
- delete self.callbacks[id];
- };
- };
- ReqSocket.prototype.send = function(msg){
- var socks = this.socks;
- var len = socks.length;
- var sock = socks[this.n++ % len];
- var args = slice(arguments);
- if (sock) {
- var hasCallback = 'function' == typeof args[args.length - 1];
- if (!hasCallback) args.push(function(){});
- var fn = args.pop();
- fn.id = this.id();
- this.callbacks[fn.id] = fn;
- args.push(fn.id);
- }
- if (sock) {
- sock.write(this.pack(args));
- } else {
- debug('no connected peers');
- this.enqueue(args);
- }
- };
|