// sock.js
  1. /**
  2. * Module dependencies.
  3. */
  4. var Emitter = require('events').EventEmitter;
  5. var Configurable = require('../configurable');
  6. var debug = require('debug')('axon:sock');
  7. var Message = require('amp-message');
  8. var Parser = require('amp').Stream;
  9. var url = require('url');
  10. var net = require('net');
  11. var fs = require('fs');
  /**
   * Error codes that are treated as non-fatal.
   *
   * `handleErrors()` reports a socket error whose `code` appears in
   * this list as an `ignored error` event instead of an `error` event.
   */

  var ignore = [
    // connection could not be established or was dropped
    'ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT',
    // host or network not reachable
    'EHOSTUNREACH', 'ENETUNREACH', 'ENETDOWN',
    // write on a closed pipe, missing path
    'EPIPE', 'ENOENT'
  ];
  25. /**
  26. * Expose `Socket`.
  27. */
  28. module.exports = Socket;
  /**
   * Initialize a new `Socket`.
   *
   * A "Socket" encapsulates the ability of being
   * the "client" or the "server" depending on
   * whether `connect()` or `bind()` was called.
   *
   * Instance state created here:
   *
   *  - `opts`      empty object
   *  - `server`    `null` until `bind()` creates one
   *  - `socks`     list of currently tracked underlying sockets
   *  - `settings`  empty object (presumably the store behind
   *                `.set()` / `.get()` - confirm in `../configurable`)
   *
   * Default settings applied through `.set()`:
   *
   *  - `hwm`                `Infinity`
   *  - `identity`           the process id as a string
   *  - `retry timeout`      `100`
   *  - `retry max timeout`  `5000`
   *
   * @api private
   */

  function Socket() {
    // Run the emitter constructor so listener state is set up per instance.
    Emitter.call(this);

    this.opts = {};
    this.server = null;
    this.socks = [];
    this.settings = {};

    this.set('hwm', Infinity);
    this.set('identity', String(process.pid));
    this.set('retry timeout', 100);
    this.set('retry max timeout', 5000);
  }
  /**
   * Inherit from `Emitter.prototype`.
   *
   * Uses `Object.setPrototypeOf()` rather than assigning to the legacy
   * `__proto__` accessor, which is deprecated and can be disabled at
   * runtime.
   */

  Object.setPrototypeOf(Socket.prototype, Emitter.prototype);

  /**
   * Make it configurable `.set()` etc.
   */

  Configurable(Socket.prototype);
  /**
   * Apply `plugin` to this socket.
   *
   * The plugin is called once with the socket as its only argument;
   * whatever it returns is discarded.
   *
   * @param {Function} plugin
   * @return {Socket} this socket, for chaining
   * @api private
   */

  Socket.prototype.use = function(plugin){
    var socket = this;
    plugin(socket);
    return socket;
  };
  /**
   * Serialize `args` by wrapping them in a new `Message`
   * and returning the result of its `toBuffer()`.
   *
   * @param {Array} args
   * @return {Buffer}
   * @api private
   */

  Socket.prototype.pack = function(args){
    return new Message(args).toBuffer();
  };
  /**
   * Destroy every underlying socket currently tracked in `this.socks`.
   *
   * The list itself is not modified here.
   *
   * @api private
   */

  Socket.prototype.closeSockets = function(){
    var tracked = this.socks;
    var destroy = function(sock){
      sock.destroy();
    };

    debug('closing %d connections', tracked.length);
    tracked.forEach(destroy);
  };
  /**
   * Close the socket.
   *
   * Marks the socket as `closing`, destroys all tracked underlying
   * sockets and, when a server exists, delegates to `closeServer(fn)`.
   *
   * When there is no server (a client socket, or a socket that was
   * never bound) `fn` is invoked directly so the callback is never
   * silently dropped.
   *
   * @param {Function} [fn]
   * @api public
   */

  Socket.prototype.close = function(fn){
    debug('closing');
    this.closing = true;
    this.closeSockets();

    if (this.server) {
      this.closeServer(fn);
      return;
    }

    // No server to hand the callback to: honour it here.
    if ('function' == typeof fn) fn();
  };
  /**
   * Close the server.
   *
   * Re-emits the server's next `close` event on this socket, asks the
   * server to close, then invokes `fn` synchronously (it does not wait
   * for the server's `close` event).
   *
   * The forwarding listener is attached with `once()` so repeated calls
   * do not pile up permanent listeners on the server. When no server
   * exists, only `fn` is invoked.
   *
   * @param {Function} [fn]
   * @api public
   */

  Socket.prototype.closeServer = function(fn){
    debug('closing server');

    if (this.server) {
      this.server.once('close', this.emit.bind(this, 'close'));
      this.server.close();
    }

    if (fn) fn();
  };
  /**
   * Return the server address.
   *
   * - no server, or a server whose `address()` yields nothing
   *   (e.g. not listening): returns `undefined`
   * - `address()` yields a string (pipe / unix socket path):
   *   returns that string unchanged
   * - otherwise: returns the address object with an added `string`
   *   property of the form `tcp://<address>:<port>`; an address that
   *   contains `:` (IPv6 literal) is wrapped in `[...]`
   *
   * @return {Object|String|undefined}
   * @api public
   */

  Socket.prototype.address = function(){
    if (!this.server) return;

    var addr = this.server.address();

    // Nothing bound: there is no address to describe.
    if (!addr) return;

    // A path is a primitive string; a property cannot be attached to it.
    if ('string' == typeof addr) return addr;

    var host = String(addr.address);
    if (~host.indexOf(':')) host = '[' + host + ']';

    addr.string = 'tcp://' + host + ':' + addr.port;
    return addr;
  };
  /**
   * Stop tracking `sock`.
   *
   * Removes the first occurrence of `sock` from `this.socks`;
   * does nothing when `sock` is not in the list.
   *
   * @param {Socket} sock
   * @api private
   */

  Socket.prototype.removeSocket = function(sock){
    var tracked = this.socks;
    var index = tracked.indexOf(sock);

    if (index !== -1) {
      debug('remove socket %d', index);
      tracked.splice(index, 1);
    }
  };
  /**
   * Start tracking `sock`.
   *
   * Appends `sock` to `this.socks`, pipes it into a fresh `Parser`
   * and routes the parser's `data` events to the handler returned
   * by `this.onmessage(sock)`.
   *
   * @param {Socket} sock
   * @api private
   */

  Socket.prototype.addSocket = function(sock){
    var framer = new Parser();

    this.socks.push(sock);
    debug('add socket %d', this.socks.length - 1);

    sock.pipe(framer);
    framer.on('data', this.onmessage(sock));
  };
  /**
   * Attach an `error` listener to `sock`.
   *
   * For every error the socket is removed from the tracked list and
   * the following is emitted on this socket:
   *
   *  - `socket error` (err) always, first
   *  - `ignored error` (err) when `err.code` is in the `ignore` list
   *  - `error` (err) otherwise
   *
   * @param {Socket} sock
   * @api private
   */

  Socket.prototype.handleErrors = function(sock){
    var owner = this;

    function onerror(err){
      debug('error %s', err.code || err.message);
      owner.emit('socket error', err);
      owner.removeSocket(sock);

      var ignorable = ignore.indexOf(err.code) !== -1;
      if (ignorable) {
        debug('ignored %s', err.code);
        owner.emit('ignored error', err);
      } else {
        owner.emit('error', err);
      }
    }

    sock.on('error', onerror);
  };
  /**
   * Handles framed messages emitted from the parser. By default
   * it emits a "message" event on the socket whose arguments are
   * the decoded message's `args`. If the "higher level" socket
   * needs to hook into the messages before they are emitted, it
   * should override this method and take care of everything
   * itself, including emitting the "message" event.
   *
   * Note: `sock` is accepted so overrides can use it; this default
   * implementation does not forward it to listeners. (A previous
   * revision passed it as a third argument to `apply()`, which
   * ignores anything after the argument array.)
   *
   * @param {net.Socket} sock
   * @return {Function} closure(buf)
   * @api private
   */

  Socket.prototype.onmessage = function(sock){
    var self = this;

    return function(buf){
      var msg = new Message(buf);
      self.emit.apply(self, ['message'].concat(msg.args));
    };
  };
  /**
   * Connect to `port` at `host` and invoke `fn()` once connected.
   *
   * `port` may be:
   *
   *  - a number: TCP port, `host` defaults to `0.0.0.0`
   *  - a string that `url.parse()` gives a pathname for
   *    (e.g. a socket file path): connects to that path, `host`
   *    is ignored
   *  - any other string (e.g. `tcp://host:port`): hostname and
   *    port are taken from the parsed URL, hostname defaulting
   *    to `0.0.0.0`
   *
   * `host` may be omitted, in which case the second argument is
   * taken as `fn`. The callback is kept for every address form.
   *
   * Emits:
   *
   *  - `connect` when the underlying socket connects
   *  - `reconnect attempt` before each retry
   *  - `close` when the connection closes and no retry follows
   *    (socket is closing, or the retry timeout is `0`)
   *
   * After an unexpected close a new connection is attempted after
   * the current retry delay, which then grows by a factor of 1.5 up
   * to `retry max timeout` and is reset on a successful connect.
   *
   * TODO: needs big cleanup
   *
   * @param {Number|String} port
   * @param {String} host
   * @param {Function} fn
   * @return {Socket}
   * @throws {Error} when called after `bind()`
   * @api public
   */

  Socket.prototype.connect = function(port, host, fn){
    var self = this;
    if ('server' == this.type) throw new Error('cannot connect() after bind()');

    // connect(port, fn)
    if ('function' == typeof host) {
      fn = host;
      host = undefined;
    }

    if ('string' == typeof port) {
      port = url.parse(port);

      if (port.pathname) {
        // Path-based endpoint: there is no host. `fn` is left as-is so
        // the caller's callback still fires on connect.
        host = null;
        port = port.pathname;
      } else {
        host = port.hostname || '0.0.0.0';
        port = parseInt(port.port, 10);
      }
    } else {
      host = host || '0.0.0.0';
    }

    var max = self.get('retry max timeout');
    var sock = new net.Socket();
    sock.setNoDelay();
    this.type = 'client';

    this.handleErrors(sock);

    sock.on('close', function(){
      self.connected = false;
      self.removeSocket(sock);
      if (self.closing) return self.emit('close');

      var retry = self.retry || self.get('retry timeout');
      if (retry === 0) return self.emit('close');

      setTimeout(function(){
        debug('attempting reconnect');
        self.emit('reconnect attempt');
        sock.destroy();
        // Reconnect without `fn`: the callback only fires for the
        // first successful connect.
        self.connect(port, host);
        self.retry = Math.round(Math.min(max, retry * 1.5));
      }, retry);
    });

    sock.on('connect', function(){
      debug('connect');
      self.connected = true;
      self.addSocket(sock);
      self.retry = self.get('retry timeout');
      self.emit('connect');
      fn && fn();
    });

    debug('connect attempt %s:%s', host, port);
    sock.connect(port, host);
    return this;
  };
  /**
   * Handle a connection accepted by the server.
   *
   * Tracks `sock`, installs error handling, emits `connect` (sock)
   * and, once `sock` closes, emits `disconnect` (sock) and stops
   * tracking it.
   *
   * The peer label used in debug output is `remoteAddress:remotePort`
   * when both are set, else the server's `_pipeName` when present,
   * else `null`.
   *
   * @param {Socket} sock
   * @api private
   */

  Socket.prototype.onconnect = function(sock){
    var owner = this;
    var peer = null;

    if (sock.remoteAddress && sock.remotePort) {
      peer = sock.remoteAddress + ':' + sock.remotePort;
    } else if (sock.server && sock.server._pipeName) {
      peer = sock.server._pipeName;
    }

    debug('accept %s', peer);

    owner.addSocket(sock);
    owner.handleErrors(sock);
    owner.emit('connect', sock);

    function ondisconnect(){
      debug('disconnect %s', peer);
      owner.emit('disconnect', sock);
      owner.removeSocket(sock);
    }

    sock.on('close', ondisconnect);
  };
  /**
   * Bind to `port` at `host` and invoke `fn()` once listening.
   *
   * `port` may be:
   *
   *  - a number: TCP port, `host` defaults to `0.0.0.0` (INADDR_ANY)
   *  - a string that `url.parse()` gives a pathname for
   *    (e.g. a socket file path): listens on that path, `host`
   *    is ignored
   *  - any other string (e.g. `tcp://host:port`): hostname and
   *    port are taken from the parsed URL, hostname defaulting
   *    to `0.0.0.0`
   *
   * `host` may be omitted, in which case the second argument is
   * taken as `fn`. The callback is kept for every address form.
   *
   * For path-based binds a server `error` listener is installed:
   *
   *  - `EADDRINUSE`: the path is probed with a client connection.
   *    If the probe is refused or the path is missing, the stale
   *    file is removed and listening is retried. If the probe
   *    connects, another process owns the socket and `fn` (when
   *    given) is called with an `Error`.
   *  - any other error: the path is unlinked and listening is
   *    retried.
   *
   * Emits:
   *
   *  - `connect` when a client connects
   *  - `disconnect` when a client disconnects
   *  - `bind` when bound and listening
   *
   * @param {Number|String} port
   * @param {String} [host]
   * @param {Function} [fn]
   * @return {Socket}
   * @throws {Error} when called after `connect()`
   * @api public
   */

  Socket.prototype.bind = function(port, host, fn){
    var self = this;
    if ('client' == this.type) throw new Error('cannot bind() after connect()');

    // bind(port, fn)
    if ('function' == typeof host) {
      fn = host;
      host = undefined;
    }

    var unixSocket = false;

    if ('string' == typeof port) {
      port = url.parse(port);

      if (port.pathname) {
        // Path-based endpoint: there is no host. `fn` is left as-is so
        // the caller's callback is still honoured.
        host = null;
        port = port.pathname;
        unixSocket = true;
      } else {
        host = port.hostname || '0.0.0.0';
        port = parseInt(port.port, 10);
      }
    } else {
      host = host || '0.0.0.0';
    }

    this.type = 'server';
    this.server = net.createServer(this.onconnect.bind(this));

    debug('bind %s:%s', host, port);
    this.server.on('listening', this.emit.bind(this, 'bind'));

    if (unixSocket) {
      // TODO: move out
      this.server.on('error', function(e) {
        debug('Got error while trying to bind', e.stack || e);

        // Remove the socket file (best effort) and listen again. `fn`
        // is not passed again: the initial listen() call below already
        // queued it for the next `listening` event.
        var relisten = function(){
          try {
            fs.unlinkSync(port);
          } catch (err) {
            debug('unlink %s failed: %s', port, err.code || err.message);
          }
          self.server.listen(port);
        };

        // NOTE(review): a persistent non-EADDRINUSE error retries
        // without bound - consider surfacing it instead.
        if (e.code != 'EADDRINUSE') return relisten();

        // Unix file socket and error EADDRINUSE is the case if
        // the file socket exists. We check if other processes
        // listen on file socket, otherwise it is a stale socket
        // that we could reopen.
        // We try to connect to socket via plain network socket.
        var clientSocket = new net.Socket();

        clientSocket.on('error', function(e2) {
          debug('Got sub-error', e2);
          // No other server listening, so we can delete the stale
          // socket file and reopen the server socket.
          if (e2.code == 'ECONNREFUSED' || e2.code == 'ENOENT') relisten();
        });

        clientSocket.connect({path: port}, function() {
          // Connection is possible, so another server is listening
          // on this file socket. The probe has served its purpose.
          clientSocket.destroy();

          if ('function' == typeof fn) {
            // This server will not start listening; drop the queued callback.
            self.server.removeListener('listening', fn);
            return fn(new Error('Process already listening on socket ' + port));
          }
        });
      });
    }

    this.server.listen(port, host, fn);
    return this;
  };