123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- /*
- * nssocket.js - Wraps a TLS/TCP socket to emit namespace events also auto-buffers.
- *
- * (C) 2011, Charlie Robbins, Paolo Fragomeni, & the Contributors.
- *
- */
- var net = require('net'),
- tls = require('tls'),
- util = require('util'),
- events2 = require('eventemitter2'),
- Lazy = require('lazy'),
- common = require('./common');
- //
- // ### function NsSocket (socket, options)
- // #### @socket {Object} TCP or TLS 'socket' either from a 'connect' 'new' or from a server
- // #### @options {Object} Options for this NsSocket
- // NameSpace Socket, NsSocket, is a thin wrapper above TLS/TCP.
- // It provides automatic buffering and name space based data emits.
- //
- var NsSocket = exports.NsSocket = function (socket, options) {
- if (!(this instanceof NsSocket)) {
- return new NsSocket(socket, options);
- }
- //
- // If there is no Socket instnace to wrap,
- // create one.
- //
- if (!options) {
- options = socket;
- socket = common.createSocket(options);
- }
- //
- // Options should be
- //
- // {
- // type : 'tcp' or 'tls',
- // delimiter : '::', delimiter that separates between segments
- // msgLength : 3 //number of segments in a complete message
- // }
- //
- options = options || {};
- var self = this,
- startName;
- //
- // Setup underlying socket state.
- //
- this.socket = socket;
- this.connected = options.connected || socket.writable && socket.readable || false;
- //
- // Setup reconnect options.
- //
- this._reconnect = options.reconnect || false;
- this.retry = {
- retries: 0,
- max: options.maxRetries || 10,
- interval: options.retryInterval || 5000,
- wait: options.retryInterval || 5000
- };
- //
- // Setup default instance variables.
- //
- this._options = options;
- this._type = options.type || 'tcp4',
- this._delimiter = options.delimiter || '::';
- //
- // Setup encode format.
- //
- this._encode = options.encode || JSON.stringify;
- this._decode = options.decode || JSON.parse;
- events2.EventEmitter2.call(this, {
- delimiter: this._delimiter,
- wildcard: true,
- maxListeners: options.maxListeners || 10
- });
- this._setup();
- };
- //
- // Inherit from `events2.EventEmitter2`.
- //
- util.inherits(NsSocket, events2.EventEmitter2);
- //
- // ### function createServer (options, connectionListener)
- // #### @options {Object} **Optional**
- // Creates a new TCP/TLS server which wraps every incoming connection
- // in an instance of `NsSocket`.
- //
- exports.createServer = function createServer(options, connectionListener) {
- if (!connectionListener && typeof options === 'function') {
- connectionListener = options;
- options = {};
- }
- options.type = options.type || 'tcp4';
- options.delimiter = options.delimiter || '::';
- function onConnection (socket) {
- //
- // Incoming socket connections cannot reconnect
- // by definition.
- //
- options.reconnect = false;
- connectionListener(new NsSocket(socket, options));
- }
- return options.type === 'tls'
- ? tls.createServer(options, onConnection)
- : net.createServer(options, onConnection);
- };
- //
- // ### function send (data, callback)
- // #### @event {Array|string} The array (or string) that holds the event name
- // #### @data {Literal|Object} The data to be sent with the event.
- // #### @callback {Function} the callback function when send is done sending
- // The send function follows write/send rules for TCP/TLS/UDP
- // in that the callback is called when sending is complete, not when delivered
- //
- NsSocket.prototype.send = function send(event, data, callback) {
- var dataType = typeof data,
- message;
- // rebinds
- if (typeof event === 'string') {
- event = event.split(this._delimiter);
- }
- if (dataType === 'undefined' || dataType === 'function') {
- callback = data;
- data = null;
- }
- // if we aren't connected/socketed, then error
- if (!this.socket || !this.connected) {
- return this.emit('error', new Error('NsSocket: sending on a bad socket'));
- }
- message = Buffer(this._encode(event.concat(data)) + '\n');
- if (this.socket.cleartext) {
- this.socket.cleartext.write(message, callback);
- }
- else {
- // now actually write to the socket
- this.socket.write(message, callback);
- }
- };
- //
- // ### function data (event, callback)
- // #### @event {Array|string} Namespaced `data` event to listen to.
- // #### @callback {function} Continuation to call when the event is raised.
- // Shorthand function for listening to `['data', '*']` events.
- //
- NsSocket.prototype.data = function (event, callback) {
- if (typeof event === 'string') {
- event = event.split(this._delimiter);
- }
- this.on(['data'].concat(event), callback);
- };
- NsSocket.prototype.undata = function (event, listener) {
- this.off(['data'].concat(event), listener);
- };
- //
- // ### function data (event, callback)
- // #### @event {Array|string} Namespaced `data` event to listen to once.
- // #### @callback {function} Continuation to call when the event is raised.
- // Shorthand function for listening to `['data', '*']` events once.
- //
- NsSocket.prototype.dataOnce = function (event, callback) {
- if (typeof event === 'string') {
- event = event.split(this._delimiter);
- }
- this.once(['data'].concat(event), callback);
- };
- //
- // ### function setIdle (time, callback)
- // #### @time {Integer} how often to emit idle
- // Set the idle/timeout timer
- //
- NsSocket.prototype.setIdle = function setIdle(time) {
- this.socket.setTimeout(time);
- this._timeout = time;
- };
- //
- // ### function destroy (void)
- // #### forcibly destroys this nsSocket, unregister socket, remove all callbacks
- //
- NsSocket.prototype.destroy = function destroy() {
- if (this.socket) {
- try {
- this.socket.end(); // send FIN
- this.socket.destroy(); // make sure fd's are gone
- }
- catch (ex) {
- // do nothing on errors
- }
- }
- // clear buffer
- this.data = '';
- this.emit('destroy');
- // this should forcibly remove EVERY listener
- this.removeAllListeners();
- };
- //
- // ### function end (void)
- // #### closes the underlying socket, recommend you call destroy after
- //
- NsSocket.prototype.end = function end() {
- this.connected = false;
- if (this.socket) {
- try {
- this.socket.end();
- }
- catch (ex) {
- this.emit('error', ex);
- return;
- }
- this.socket = null;
- }
- return;
- };
- //
- // ### function connect (port[, host, callback])
- // A passthrough to the underlying socket's connect function
- //
- NsSocket.prototype.connect = function connect(/*port, host, callback*/) {
- var args = Array.prototype.slice.call(arguments),
- self = this,
- callback,
- host,
- port;
- args.forEach(function handle(arg) {
- var type = typeof arg;
- switch (type) {
- case 'number':
- port = arg;
- break;
- case 'string':
- host = arg;
- break;
- case 'function':
- callback = arg;
- break;
- default:
- self.emit('error', new Error('bad argument to connect'));
- break;
- }
- });
- this.port = port || this.port;
- this.host = host || this.host;
- this.host = this.host || '127.0.0.1';
- args = this.port ? [this.port, this.host] : [this.host];
- if (callback) {
- args.push(callback);
- }
- if (['tcp4', 'tls'].indexOf(this._type) === -1) {
- return this.emit('error', new Error('Unknown Socket Type'));
- }
- var errHandlers = self.listeners('error');
- if (errHandlers.length > 0) {
- //
- // copy the last error from nssocker onto the error event.
- //
- self.socket._events.error = errHandlers[errHandlers.length-1];
- }
- this.connected = true;
- this.socket.connect.apply(this.socket, args);
- };
- //
- // ### function reconnect ()
- // Attempts to reconnect the current socket on `close` or `error`.
- // This instance will attempt to reconnect until `this.retry.max` is reached,
- // with an interval increasing by powers of 10.
- //
- NsSocket.prototype.reconnect = function reconnect() {
- var self = this;
- //
- // Helper function containing the core reconnect logic
- //
- function doReconnect() {
- //
- // Cleanup and recreate the socket associated
- // with this instance.
- //
- self.retry.waiting = true;
- self.socket.removeAllListeners();
- self.socket = common.createSocket(self._options);
- //
- // Cleanup reconnect logic once the socket connects
- //
- self.socket.once('connect', function () {
- self.retry.waiting = false;
- self.retry.retries = 0;
- });
- //
- // Attempt to reconnect the socket
- //
- self._setup();
- self.connect();
- }
- //
- // Helper function which attempts to retry if
- // it is less than the maximum
- //
- function tryReconnect() {
- self.retry.retries++;
- if (self.retry.retries >= self.retry.max) {
- return self.emit('error', new Error('Did not reconnect after maximum retries: ' + self.retry.max));
- }
- doReconnect();
- }
- this.retry.wait = this.retry.interval * Math.pow(10, this.retry.retries);
- setTimeout(tryReconnect, this.retry.wait);
- };
- //
- // ### @private function _setup ()
- // Sets up the underlying socket associate with this instance.
- //
- NsSocket.prototype._setup = function () {
- var self = this,
- startName;
- function bindData(sock) {
- Lazy(sock)
- .lines
- .map(String)
- .forEach(self._onData.bind(self));
- }
- //
- // Because of how the code node.js `tls` module works, we have
- // to separate some bindings. The main difference is on
- // connection, some socket activities.
- //
- if (this._type === 'tcp4') {
- startName = 'connect';
- bindData(this.socket);
- // create a stub for the setKeepAlive functionality
- this.setKeepAlive = function () {
- self.socket.setKeepAlive.apply(self.socket, arguments);
- };
- }
- else if (this._type === 'tls') {
- startName = 'secureConnection';
- if (this.connected) {
- bindData(self.socket);
- } else {
- this.socket.once('connect', function () {
- bindData(self.socket.cleartext);
- });
- }
- // create a stub for the setKeepAlive functionality
- this.setKeepAlive = function () {
- self.socket.socket.setKeepAlive.apply(self.socket.socket, arguments);
- };
- }
- else {
- // bad arguments, so throw an error
- this.emit('error', new Error('Bad Option Argument [type]'));
- return null;
- }
- // make sure we listen to the underlying socket
- this.socket.on(startName, this._onStart.bind(this));
- this.socket.on('close', this._onClose.bind(this));
- if (this.socket.socket) {
- //
- // otherwise we get a error passed from net.js
- // they need to backport the fix from v5 to v4
- //
- this.socket.socket.on('error', this._onError.bind(this));
- }
- this.socket.on('error', this._onError.bind(this));
- this.socket.on('timeout', this._onIdle.bind(this));
- };
- //
- // ### @private function _onStart ()
- // Emits a start event when the underlying socket finish connecting
- // might be used to do other activities.
- //
- NsSocket.prototype._onStart = function _onStart() {
- this.emit('start');
- };
- //
- // ### @private function _onData (message)
- // #### @message {String} literal message from the data event of the socket
- // Messages are assumed to be delimited properly (if using nssocket to send)
- // otherwise the delimiter should exist at the end of every message
- // We assume messages arrive in order.
- //
- NsSocket.prototype._onData = function _onData(message) {
- var parsed,
- data;
- try {
- parsed = this._decode(message);
- data = parsed.pop();
- }
- catch (err) {
- //
- // Don't do anything, assume that the message is only partially
- // received.
- //
- }
- this.emit(['data'].concat(parsed), data);
- };
- //
- // ### @private function _onClose (hadError)
- // #### @hadError {Boolean} true if there was an error, which then include the
- // actual error included by the underlying socket
- //
- NsSocket.prototype._onClose = function _onClose(hadError) {
- this.connected = false;
- if (hadError) {
- this.emit('close', hadError, arguments[1]);
- }
- else {
- this.emit('close');
- }
- if (this._reconnect) {
- this.reconnect();
- }
- };
- //
- // ### @private function _onError (error)
- // #### @error {Error} emits and error event in place of the socket
- // Error event is raise with an error if there was one
- //
- NsSocket.prototype._onError = function _onError(error) {
- this.connected = false;
- if (!this._reconnect) {
- return this.emit('error', error || new Error('An Unknown Error occured'));
- }
- this.reconnect();
- };
- //
- // ### @private function _onIdle ()
- // #### Emits the idle event (based on timeout)
- //
- NsSocket.prototype._onIdle = function _onIdle() {
- this.emit('idle');
- if (this._timeout) {
- this.socket.setTimeout(this._timeout);
- }
- };
|