rep.js 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. /**
  2. * Module dependencies.
  3. */
  4. var slice = require('../utils').slice;
  5. var debug = require('debug')('axon:rep');
  6. var Message = require('amp-message');
  7. var Socket = require('./sock');
  8. /**
  9. * Expose `RepSocket`.
  10. */
  11. module.exports = RepSocket;
  /**
   * Construct a `RepSocket`.
   *
   * Runs the base `Socket` constructor against the new instance and
   * performs no additional setup of its own.
   *
   * @api private
   */

  function RepSocket() {
    Socket.call(this);
  }
  /**
   * Inherits from `Socket.prototype`.
   *
   * Uses `Object.setPrototypeOf` rather than assigning through the legacy
   * `__proto__` accessor; the resulting prototype chain is the same and
   * `RepSocket.prototype.constructor` is left untouched.
   */

  Object.setPrototypeOf(RepSocket.prototype, Socket.prototype);
  /**
   * Incoming.
   *
   * Builds the handler for raw buffers arriving on `sock`. Each buffer is
   * decoded with `Message`; the last decoded argument is removed and kept
   * as `id` (presumably the request id appended by the requesting peer --
   * confirm against the req socket). The remaining arguments are emitted
   * as a "message" event followed by a `reply` function.
   *
   * `reply([err], [...values], [fn])` packs its arguments plus `id` and
   * writes them back to `sock`. It returns `true` when the write was
   * attempted and `false` when the socket is no longer writable. The
   * optional trailing callback `fn` receives `true` when the write
   * completed without error, and `false` otherwise.
   *
   * @param {net.Socket} sock
   * @return {Function} closure(buf)
   * @api private
   */

  RepSocket.prototype.onmessage = function(sock){
    var self = this;

    return function (buf){
      var msg = new Message(buf);
      var args = msg.args;

      // the trailing argument is echoed back with the reply
      var id = args.pop();

      args.unshift('message');
      args.push(reply);
      self.emit.apply(self, args);

      function reply() {
        var fn = function(){};
        var args = slice(arguments);

        // Pull the optional callback off first so that `reply(fn)` is
        // treated like `reply()` and still gets a leading error slot.
        var hasCallback = 'function' === typeof args[args.length - 1];
        if (hasCallback) fn = args.pop();

        // the first slot is the error; falsy values are normalised to null
        args[0] = args[0] || null;
        args.push(id);

        if (sock.writable) {
          // the write callback may receive an error; report it as failure
          sock.write(self.pack(args), function(err){ fn(!err); });
          return true;
        }

        debug('peer went away');
        // stay asynchronous so `fn` never fires before `reply` returns
        process.nextTick(function(){ fn(false); });
        return false;
      }
    };
  };