123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- /**
- * Module dependencies.
- */
- var Emitter = require('events').EventEmitter;
- var Configurable = require('../configurable');
- var debug = require('debug')('axon:sock');
- var Message = require('amp-message');
- var Parser = require('amp').Stream;
- var url = require('url');
- var net = require('net');
- var fs = require('fs');
/**
 * Error codes that are reported as ignorable.
 *
 * `handleErrors()` looks an error's `code` up in this list; a match is
 * surfaced through the `ignored error` event instead of `error`.
 */

var ignore = [
  // connection refused / reset / timed out
  'ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT',
  // host or network unreachable / down
  'EHOSTUNREACH', 'ENETUNREACH', 'ENETDOWN',
  // broken pipe, missing file
  'EPIPE', 'ENOENT'
];
- /**
- * Expose `Socket`.
- */
- module.exports = Socket;
/**
 * Initialize a new `Socket`.
 *
 * A "Socket" can act as the "client" or as the "server",
 * depending on whether `connect()` or `bind()` is called on it.
 *
 * Initial state: empty `opts`, no `server`, no peer sockets and an
 * empty `settings` bag, followed by the default settings below
 * (applied through `.set()` in the listed order).
 *
 * @api private
 */

function Socket() {
  this.opts = {};
  this.server = null;
  this.socks = [];
  this.settings = {};

  // [name, value] pairs, applied in order
  var defaults = [
    ['hwm', Infinity],
    ['identity', String(process.pid)],
    ['retry timeout', 100],
    ['retry max timeout', 5000]
  ];

  for (var i = 0; i < defaults.length; i++) {
    this.set(defaults[i][0], defaults[i][1]);
  }
}
/**
 * Inherit from `Emitter.prototype`.
 *
 * `Object.setPrototypeOf()` is used rather than assigning to the
 * legacy `__proto__` accessor, which is unavailable when the runtime
 * is started with `__proto__` disabled.
 */

Object.setPrototypeOf(Socket.prototype, Emitter.prototype);

/**
 * Make it configurable `.set()` etc.
 */

Configurable(Socket.prototype);
/**
 * Apply `plugin` to this socket.
 *
 * The plugin is called with the socket as its only argument.
 *
 * @param {Function} plugin
 * @return {Socket} this socket, for chaining
 * @api private
 */

Socket.prototype.use = function(plugin){
  var sock = this;
  plugin(sock);
  return sock;
};
/**
 * Build a `Message` from `args` and return its buffer form
 * (the result of the message's `toBuffer()`).
 *
 * @param {Array} args
 * @return {Buffer}
 * @api private
 */

Socket.prototype.pack = function(args){
  return new Message(args).toBuffer();
};
/**
 * Destroy every underlying socket currently tracked in `this.socks`.
 *
 * The list itself is not modified here.
 *
 * @api private
 */

Socket.prototype.closeSockets = function(){
  var open = this.socks;
  debug('closing %d connections', open.length);

  // the length is captured up front and vanished indexes are skipped
  for (var i = 0, total = open.length; i < total; i++) {
    if (i in open) open[i].destroy();
  }
};
/**
 * Close the socket.
 *
 * Marks the socket as `closing`, destroys all underlying sockets and,
 * when a server exists, delegates to `closeServer(fn)`.
 *
 * When there is no server (e.g. a client socket) `fn` is invoked
 * directly, so that a supplied callback is never silently dropped.
 *
 * @param {Function} [fn]
 * @api public
 */

Socket.prototype.close = function(fn){
  debug('closing');
  this.closing = true;
  this.closeSockets();

  if (this.server) {
    this.closeServer(fn);
  } else if ('function' === typeof fn) {
    fn();
  }
};
/**
 * Close the server.
 *
 * Re-emits the server's `close` event on this socket and only then
 * invokes `fn`, so the callback signals that the server has actually
 * closed rather than that closing was merely requested.
 *
 * The listener is registered with `once()` so repeated calls do not
 * accumulate listeners and emit `close` several times.
 *
 * @param {Function} [fn]
 * @api public
 */

Socket.prototype.closeServer = function(fn){
  var self = this;
  debug('closing server');

  this.server.once('close', function(){
    self.emit('close');
    if ('function' === typeof fn) fn();
  });

  this.server.close();
};
/**
 * Return the server address.
 *
 * Returns `undefined` when there is no server. Otherwise returns the
 * value of `server.address()`; when that value is an object it is
 * augmented with a `string` property of the form `tcp://host:port`
 * (IPv6 hosts are wrapped in brackets so the string stays parseable).
 *
 * Non-object results (`null` while the server is not listening, or a
 * plain string such as a unix socket path) are returned unchanged.
 *
 * @return {Object|String|null|undefined}
 * @api public
 */

Socket.prototype.address = function(){
  if (!this.server) return;

  var addr = this.server.address();
  if (!addr || 'object' !== typeof addr) return addr;

  var host = addr.address;
  // a literal IPv6 address must be bracketed inside a URL
  if ('string' === typeof host && -1 !== host.indexOf(':')) {
    host = '[' + host + ']';
  }

  addr.string = 'tcp://' + host + ':' + addr.port;
  return addr;
};
/**
 * Stop tracking `sock`.
 *
 * Does nothing when `sock` is not in `this.socks`.
 *
 * @param {Socket} sock
 * @api private
 */

Socket.prototype.removeSocket = function(sock){
  var index = this.socks.indexOf(sock);

  if (index >= 0) {
    debug('remove socket %d', index);
    this.socks.splice(index, 1);
  }
};
/**
 * Start tracking `sock`.
 *
 * The socket is appended to `this.socks` and piped into a fresh
 * `Parser`; each `data` event from the parser is handed to the
 * handler produced by `this.onmessage(sock)`.
 *
 * @param {Socket} sock
 * @api private
 */

Socket.prototype.addSocket = function(sock){
  var framer = new Parser();
  var index = this.socks.length;

  this.socks.push(sock);
  debug('add socket %d', index);

  sock.pipe(framer);
  framer.on('data', this.onmessage(sock));
};
/**
 * Attach an `error` listener to `sock`.
 *
 * For every error the socket is removed from the tracked list and
 * the following events are emitted on this socket:
 *
 *  - `socket error` (err) always, first
 *  - `ignored error` (err) when `err.code` is in the ignore list
 *  - `error` (err) otherwise
 *
 * @param {Socket} sock
 * @api private
 */

Socket.prototype.handleErrors = function(sock){
  var self = this;

  function onerror(err){
    debug('error %s', err.code || err.message);
    self.emit('socket error', err);
    self.removeSocket(sock);

    var ignorable = ignore.indexOf(err.code) !== -1;
    if (ignorable) {
      debug('ignored %s', err.code);
      self.emit('ignored error', err);
      return;
    }

    return self.emit('error', err);
  }

  sock.on('error', onerror);
};
/**
 * Build the handler for framed messages emitted by the parser.
 *
 * By default the handler decodes the buffer into a `Message` and
 * emits `message` on this socket with the decoded arguments. A
 * "higher level" socket that needs to hook into messages before they
 * are emitted should override this method and take care of
 * everything itself, including emitting the `message` event.
 *
 * `sock` is not used by this default implementation; it is part of
 * the signature so overriding implementations can use it. (It was
 * previously passed as a third argument to `Function.prototype.apply`,
 * which ignores it, so listeners never received it.)
 *
 * @param {net.Socket} sock
 * @return {Function} closure(buf)
 * @api private
 */

Socket.prototype.onmessage = function(sock){
  var self = this;

  return function(buf){
    var msg = new Message(buf);
    self.emit.apply(self, ['message'].concat(msg.args));
  };
};
/**
 * Connect to `port` at `host` and invoke `fn()` once connected.
 *
 * `port` may be a number, or a string that is parsed as a URL:
 *
 *  - when the URL has a pathname it is treated as a unix domain
 *    socket path and `host` is ignored
 *  - otherwise the URL's hostname and port are used
 *
 * `host` defaults to `0.0.0.0`.
 *
 * When the connection closes and the socket is not `closing`, a
 * reconnect is scheduled after the current retry delay; the delay
 * grows by 1.5x per attempt up to the `retry max timeout` setting
 * and is reset to `retry timeout` after a successful connect. A
 * `retry timeout` of 0 disables reconnection. `fn` is only used for
 * the initial connection, not for reconnects.
 *
 * Emits:
 *
 *  - `connect` when connected
 *  - `reconnect attempt` before each reconnect
 *  - `close` when closed and no reconnect will follow
 *
 * TODO: needs big cleanup
 *
 * @param {Number|String} port
 * @param {String} [host]
 * @param {Function} [fn]
 * @return {Socket}
 * @throws {Error} when called after `bind()`
 * @api public
 */

Socket.prototype.connect = function(port, host, fn){
  var self = this;
  if ('server' == this.type) throw new Error('cannot connect() after bind()');
  if ('function' == typeof host) fn = host, host = undefined;

  if ('string' == typeof port) {
    port = url.parse(port);

    if (port.pathname) {
      // unix domain socket: the path is the target, there is no host.
      // `fn` is left alone so the callback also fires for this case.
      host = null;
      port = port.pathname;
    } else {
      host = port.hostname || '0.0.0.0';
      port = parseInt(port.port, 10);
    }
  } else {
    host = host || '0.0.0.0';
  }

  var max = self.get('retry max timeout');
  var sock = new net.Socket;
  sock.setNoDelay();
  this.type = 'client';

  this.handleErrors(sock);

  sock.on('close', function(){
    self.connected = false;
    self.removeSocket(sock);
    if (self.closing) return self.emit('close');
    var retry = self.retry || self.get('retry timeout');
    if (retry === 0) return self.emit('close');

    setTimeout(function(){
      // close() may have been called while this timer was pending;
      // do not open a new connection for a socket that is closing
      if (self.closing) return self.emit('close');
      debug('attempting reconnect');
      self.emit('reconnect attempt');
      sock.destroy();
      self.connect(port, host);
      self.retry = Math.round(Math.min(max, retry * 1.5));
    }, retry);
  });

  sock.on('connect', function(){
    debug('connect');
    self.connected = true;
    self.addSocket(sock);
    self.retry = self.get('retry timeout');
    self.emit('connect');
    fn && fn();
  });

  debug('connect attempt %s:%s', host, port);
  sock.connect(port, host);
  return this;
};
/**
 * Handle a connection accepted by the server.
 *
 * The socket is tracked, gets the shared error handling and is
 * announced with a `connect` (sock) event. When it later closes, a
 * `disconnect` (sock) event is emitted and it is untracked again.
 *
 * @param {Socket} sock
 * @api private
 */

Socket.prototype.onconnect = function(sock){
  var self = this;

  // label used for logging only: "address:port", the pipe name, or null
  function peerName(){
    if (sock.remoteAddress && sock.remotePort) {
      return sock.remoteAddress + ':' + sock.remotePort;
    }
    if (sock.server && sock.server._pipeName) {
      return sock.server._pipeName;
    }
    return null;
  }

  var addr = peerName();
  debug('accept %s', addr);

  this.addSocket(sock);
  this.handleErrors(sock);
  this.emit('connect', sock);

  sock.on('close', function(){
    debug('disconnect %s', addr);
    self.emit('disconnect', sock);
    self.removeSocket(sock);
  });
};
/**
 * Bind to `port` at `host` and invoke `fn()`.
 *
 * `port` may be a number, or a string that is parsed as a URL:
 *
 *  - when the URL has a pathname it is treated as a unix domain
 *    socket path and `host` is ignored
 *  - otherwise the URL's hostname and port are used
 *
 * Defaults `host` to INADDR_ANY (`0.0.0.0`).
 *
 * `fn` is passed to `server.listen()`. For unix sockets it is also
 * called with an `Error` when another process is found listening on
 * the same socket file.
 *
 * Emits:
 *
 *  - `connect` when a client connects
 *  - `disconnect` when a client disconnects
 *  - `bind` when bound and listening
 *
 * @param {Number|String} port
 * @param {String} [host]
 * @param {Function} [fn]
 * @return {Socket}
 * @throws {Error} when called after `connect()`
 * @api public
 */

Socket.prototype.bind = function(port, host, fn){
  var self = this;
  if ('client' == this.type) throw new Error('cannot bind() after connect()');
  if ('function' == typeof host) fn = host, host = undefined;

  var unixSocket = false;

  if ('string' == typeof port) {
    port = url.parse(port);

    if (port.pathname) {
      // unix domain socket: the path is the target, there is no host.
      // `fn` is left alone; it was already resolved above.
      host = null;
      port = port.pathname;
      unixSocket = true;
    } else {
      host = port.hostname || '0.0.0.0';
      port = parseInt(port.port, 10);
    }
  } else {
    host = host || '0.0.0.0';
  }

  this.type = 'server';
  this.server = net.createServer(this.onconnect.bind(this));

  debug('bind %s:%s', host, port);
  this.server.on('listening', this.emit.bind(this, 'bind'));

  if (unixSocket) {
    // Best-effort removal of the socket file, followed by another
    // listen attempt. `fn` is not passed again: the initial listen()
    // call below already received it.
    var unlinkAndListen = function(){
      try {
        fs.unlinkSync(port);
      } catch (unlinkErr) {
        // deliberate best effort: the file may already be gone
        debug('could not unlink %s: %s', port, unlinkErr.code || unlinkErr.message);
      }
      self.server.listen(port, host);
    };

    // TODO: move out
    this.server.on('error', function(e){
      debug('Got error while trying to bind', e.stack || e);

      if (e.code == 'EADDRINUSE') {
        // Unix file socket and error EADDRINUSE is the case if
        // the file socket exists. We check if other processes
        // listen on file socket, otherwise it is a stale socket
        // that we could reopen.
        // We try to connect to socket via plain network socket
        var clientSocket = new net.Socket();

        clientSocket.on('error', function(e2){
          debug('Got sub-error', e2);
          if (e2.code == 'ECONNREFUSED' || e2.code == 'ENOENT') {
            // No other server listening, so we can delete stale
            // socket file and reopen server socket
            unlinkAndListen();
          }
        });

        clientSocket.connect({path: port}, function(){
          // Connection is possible, so other server is listening
          // on this file socket. The probe has served its purpose.
          clientSocket.destroy();
          if (fn) return fn(new Error('Process already listening on socket ' + port));
        });
      } else {
        // NOTE(review): any other listen error also unlinks and
        // retries, with no bound on the number of attempts.
        unlinkAndListen();
      }
    });
  }

  this.server.listen(port, host, fn);
  return this;
};
|