nssocket.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. /*
  2. * nssocket.js - Wraps a TLS/TCP socket to emit namespace events also auto-buffers.
  3. *
  4. * (C) 2011, Charlie Robbins, Paolo Fragomeni, & the Contributors.
  5. *
  6. */
  7. var net = require('net'),
  8. tls = require('tls'),
  9. util = require('util'),
  10. events2 = require('eventemitter2'),
  11. Lazy = require('lazy'),
  12. common = require('./common');
  //
  // ### function NsSocket (socket, options)
  // #### @socket {Object} TCP or TLS 'socket' either from a 'connect' 'new' or from a server
  // #### @options {Object} Options for this NsSocket
  // NameSpace Socket, NsSocket, is a thin wrapper above TLS/TCP.
  // It provides automatic buffering and name space based data emits.
  //
  // May be called with or without `new`. When only one argument is given it
  // is treated as the options and the socket is created through
  // `common.createSocket(options)`.
  //
  // Options read by the constructor:
  //
  //    {
  //      type          : 'tcp4' (default) or 'tls',
  //      delimiter     : '::', delimiter that separates event segments
  //      connected     : true to mark the wrapped socket as already connected
  //      reconnect     : true to reconnect on `close` / `error`
  //      maxRetries    : 10, maximum number of reconnect attempts
  //      retryInterval : 5000, base delay (ms) between reconnect attempts
  //      encode        : JSON.stringify, serializer for outgoing messages
  //      decode        : JSON.parse, parser for incoming messages
  //      maxListeners  : 10, forwarded to EventEmitter2
  //    }
  //
  var NsSocket = exports.NsSocket = function (socket, options) {
    if (!(this instanceof NsSocket)) {
      return new NsSocket(socket, options);
    }

    //
    // If there is no Socket instance to wrap,
    // create one.
    //
    if (!options) {
      options = socket;
      socket = common.createSocket(options);
    }

    options = options || {};

    //
    // Setup underlying socket state.
    //
    this.socket = socket;
    this.connected = options.connected || socket.writable && socket.readable || false;

    //
    // Setup reconnect options.
    //
    this._reconnect = options.reconnect || false;
    this.retry = {
      retries: 0,
      max: options.maxRetries || 10,
      interval: options.retryInterval || 5000,
      wait: options.retryInterval || 5000
    };

    //
    // Setup default instance variables.
    //
    this._options = options;
    this._type = options.type || 'tcp4';
    this._delimiter = options.delimiter || '::';

    //
    // Setup encode format.
    //
    this._encode = options.encode || JSON.stringify;
    this._decode = options.decode || JSON.parse;

    events2.EventEmitter2.call(this, {
      delimiter: this._delimiter,
      wildcard: true,
      maxListeners: options.maxListeners || 10
    });

    this._setup();
  };
  //
  // Inherit from `events2.EventEmitter2`, so `NsSocket` instances get the
  // EventEmitter2 prototype methods in addition to the per-instance
  // `events2.EventEmitter2.call(this, ...)` done by the constructor.
  //
  util.inherits(NsSocket, events2.EventEmitter2);
  81. //
  82. // ### function createServer (options, connectionListener)
  83. // #### @options {Object} **Optional**
  84. // Creates a new TCP/TLS server which wraps every incoming connection
  85. // in an instance of `NsSocket`.
  86. //
  87. exports.createServer = function createServer(options, connectionListener) {
  88. if (!connectionListener && typeof options === 'function') {
  89. connectionListener = options;
  90. options = {};
  91. }
  92. options.type = options.type || 'tcp4';
  93. options.delimiter = options.delimiter || '::';
  94. function onConnection (socket) {
  95. //
  96. // Incoming socket connections cannot reconnect
  97. // by definition.
  98. //
  99. options.reconnect = false;
  100. connectionListener(new NsSocket(socket, options));
  101. }
  102. return options.type === 'tls'
  103. ? tls.createServer(options, onConnection)
  104. : net.createServer(options, onConnection);
  105. };
  //
  // ### function send (event, data, callback)
  // #### @event {Array|string} The array (or string) that holds the event name
  // #### @data {Literal|Object} **Optional** The data to be sent with the event.
  // #### @callback {Function} the callback function when send is done sending
  // The send function follows write/send rules for TCP/TLS/UDP
  // in that the callback is called when sending is complete, not when delivered.
  // The message written is `encode([...event segments, data]) + '\n'`.
  // Emits `error` (and writes nothing) when there is no connected socket.
  //
  NsSocket.prototype.send = function send(event, data, callback) {
    var message;

    if (typeof event === 'string') {
      event = event.split(this._delimiter);
    }

    if (typeof data === 'function') {
      // `send(event, callback)`: the second argument is the callback.
      callback = data;
      data = null;
    }
    else if (typeof data === 'undefined') {
      // No payload; keep any callback passed as the third argument.
      data = null;
    }

    // if we aren't connected/socketed, then error
    if (!this.socket || !this.connected) {
      return this.emit('error', new Error('NsSocket: sending on a bad socket'));
    }

    //
    // Wrap `data` in an array so that an array payload stays the single last
    // element of the message instead of being flattened into the event name.
    //
    message = Buffer.from(this._encode(event.concat([data])) + '\n');

    if (this.socket.cleartext) {
      this.socket.cleartext.write(message, callback);
    }
    else {
      // now actually write to the socket
      this.socket.write(message, callback);
    }
  };
  //
  // ### function data (event, callback)
  // #### @event {Array|string} Namespaced `data` event to listen to.
  // #### @callback {function} Continuation to call when the event is raised.
  // Shorthand for `this.on(['data', ...event], callback)`; a string event is
  // first split on this instance's delimiter.
  //
  NsSocket.prototype.data = function (event, callback) {
    var segments = typeof event === 'string'
      ? event.split(this._delimiter)
      : event;

    this.on(['data'].concat(segments), callback);
  };
  //
  // ### function undata (event, listener)
  // #### @event {Array|string} Namespaced `data` event to stop listening to.
  // #### @listener {function} The listener to remove.
  // Shorthand for `this.off(['data', ...event], listener)`. A string event is
  // split on this instance's delimiter, exactly as `data` does, so that
  // `undata('a::b', fn)` removes what `data('a::b', fn)` registered.
  //
  NsSocket.prototype.undata = function (event, listener) {
    if (typeof event === 'string') {
      event = event.split(this._delimiter);
    }

    this.off(['data'].concat(event), listener);
  };
  //
  // ### function dataOnce (event, callback)
  // #### @event {Array|string} Namespaced `data` event to listen to once.
  // #### @callback {function} Continuation to call when the event is raised.
  // Shorthand for `this.once(['data', ...event], callback)`; a string event is
  // first split on this instance's delimiter.
  //
  NsSocket.prototype.dataOnce = function (event, callback) {
    var segments = typeof event === 'string'
      ? event.split(this._delimiter)
      : event;

    this.once(['data'].concat(segments), callback);
  };
  //
  // ### function setIdle (time)
  // #### @time {Integer} timeout handed to the underlying socket's `setTimeout`
  // Sets the idle/timeout timer on the underlying socket and remembers the
  // value in `this._timeout`.
  //
  NsSocket.prototype.setIdle = function setIdle(time) {
    var underlying = this.socket;

    underlying.setTimeout(time);
    this._timeout = time;
  };
  //
  // ### function destroy (void)
  // #### forcibly destroys this nsSocket, unregister socket, remove all callbacks
  // Ends and destroys the underlying socket (errors from either call are
  // ignored), emits `destroy`, then removes every listener on this instance.
  //
  NsSocket.prototype.destroy = function destroy() {
    //
    // The handlers bound to the underlying socket survive
    // `removeAllListeners()` on this instance, so turn reconnecting off:
    // otherwise the socket's own `close` would schedule a reconnect for an
    // instance that has just been destroyed.
    //
    this._reconnect = false;
    this.connected = false;

    if (this.socket) {
      try {
        this.socket.end(); // send FIN
        this.socket.destroy(); // make sure fd's are gone
      }
      catch (ex) {
        // best-effort teardown: do nothing on errors
      }
    }

    //
    // Note: nothing is assigned to `this.data` here; doing so would shadow
    // the `data()` method inherited from the prototype.
    //
    this.emit('destroy');

    // this should forcibly remove EVERY listener
    this.removeAllListeners();
  };
  //
  // ### function end (void)
  // #### closes the underlying socket, recommend you call destroy after
  // Marks this instance as disconnected, calls `end()` on the underlying
  // socket and drops the reference to it. If `end()` throws, the exception is
  // emitted as `error` and the socket reference is kept.
  //
  NsSocket.prototype.end = function end() {
    var underlying = this.socket;

    this.connected = false;

    if (!underlying) {
      return;
    }

    try {
      underlying.end();
    }
    catch (ex) {
      this.emit('error', ex);
      return;
    }

    this.socket = null;
  };
  //
  // ### function connect (port[, host, callback])
  // A passthrough to the underlying socket's connect function.
  // Arguments are recognised by type, in any order: a number is the port, a
  // string is the host, a function is the callback; anything else emits an
  // `error`. The port and host are remembered on the instance and reused by
  // later calls; the host defaults to '127.0.0.1'.
  //
  NsSocket.prototype.connect = function connect(/*port, host, callback*/) {
    var self = this,
        params = Array.prototype.slice.call(arguments),
        callback,
        host,
        port,
        connectArgs,
        errorListeners,
        i;

    for (i = 0; i < params.length; i++) {
      if (typeof params[i] === 'number') {
        port = params[i];
      }
      else if (typeof params[i] === 'string') {
        host = params[i];
      }
      else if (typeof params[i] === 'function') {
        callback = params[i];
      }
      else {
        self.emit('error', new Error('bad argument to connect'));
      }
    }

    this.port = port || this.port;
    this.host = host || this.host;
    this.host = this.host || '127.0.0.1';

    connectArgs = this.port ? [this.port, this.host] : [this.host];
    if (callback) {
      connectArgs.push(callback);
    }

    if (this._type !== 'tcp4' && this._type !== 'tls') {
      return this.emit('error', new Error('Unknown Socket Type'));
    }

    errorListeners = self.listeners('error');
    if (errorListeners.length > 0) {
      //
      // Install the most recently added `error` listener of this nssocket as
      // the error handler of the underlying socket (relies on the socket's
      // private `_events` map).
      //
      self.socket._events.error = errorListeners[errorListeners.length - 1];
    }

    this.connected = true;
    this.socket.connect.apply(this.socket, connectArgs);
  };
  //
  // ### function reconnect ()
  // Attempts to reconnect the current socket on `close` or `error`.
  // This instance will attempt to reconnect until `this.retry.max` is reached,
  // with an interval increasing by powers of 10
  // (`retry.interval * 10^retry.retries`), capped at the largest delay a
  // timer accepts. Only one reconnect timer is pending at any time.
  //
  NsSocket.prototype.reconnect = function reconnect() {
    var self = this,
        //
        // Timers take a signed 32-bit delay; anything larger is replaced by
        // 1ms, which would turn the longest back-off into an immediate retry.
        //
        maxDelay = 2147483647;

    //
    // Helper function containing the core reconnect logic
    //
    function doReconnect() {
      //
      // Cleanup and recreate the socket associated
      // with this instance.
      //
      self.retry.waiting = true;
      if (self.socket) {
        // `end()` may have dropped the socket already.
        self.socket.removeAllListeners();
      }
      self.socket = common.createSocket(self._options);

      //
      // Cleanup reconnect logic once the socket connects
      //
      self.socket.once('connect', function () {
        self.retry.waiting = false;
        self.retry.retries = 0;
      });

      //
      // Attempt to reconnect the socket
      //
      self._setup();
      self.connect();
    }

    //
    // Helper function which attempts to retry if
    // it is less than the maximum
    //
    function tryReconnect() {
      self.retry.timer = null;
      self.retry.retries++;

      if (self.retry.retries >= self.retry.max) {
        return self.emit('error', new Error('Did not reconnect after maximum retries: ' + self.retry.max));
      }

      doReconnect();
    }

    //
    // A failed connection raises both `error` and `close`, and both handlers
    // call `reconnect()`. Schedule a single attempt per failure.
    //
    if (this.retry.timer) {
      return;
    }

    this.retry.wait = Math.min(this.retry.interval * Math.pow(10, this.retry.retries), maxDelay);
    this.retry.timer = setTimeout(tryReconnect, this.retry.wait);
  };
  //
  // ### @private function _setup ()
  // Sets up the underlying socket associated with this instance: feeds its
  // lines to `_onData`, installs a `setKeepAlive` passthrough and subscribes
  // to the socket's start, `close`, `error` and `timeout` events.
  // Emits `error` and returns `null` when `this._type` is neither 'tcp4'
  // nor 'tls'.
  //
  NsSocket.prototype._setup = function () {
    var self = this,
        raw = this.socket,
        startName;

    // Split the stream into lines and hand each one, as a string, to `_onData`.
    function bindData(stream) {
      Lazy(stream)
        .lines
        .map(String)
        .forEach(self._onData.bind(self));
    }

    //
    // Because of how the node.js `tls` module works, we have
    // to separate some bindings. The main difference is on
    // connection, some socket activities.
    //
    switch (this._type) {
      case 'tcp4':
        startName = 'connect';
        bindData(raw);

        // create a stub for the setKeepAlive functionality
        this.setKeepAlive = function () {
          self.socket.setKeepAlive.apply(self.socket, arguments);
        };
        break;

      case 'tls':
        startName = 'secureConnection';

        if (this.connected) {
          bindData(self.socket);
        }
        else {
          // the cleartext stream is only read once the socket has connected
          raw.once('connect', function () {
            bindData(self.socket.cleartext);
          });
        }

        // create a stub for the setKeepAlive functionality
        this.setKeepAlive = function () {
          self.socket.socket.setKeepAlive.apply(self.socket.socket, arguments);
        };
        break;

      default:
        // bad arguments, so emit an error
        this.emit('error', new Error('Bad Option Argument [type]'));
        return null;
    }

    // make sure we listen to the underlying socket
    raw.on(startName, this._onStart.bind(this));
    raw.on('close', this._onClose.bind(this));

    if (raw.socket) {
      //
      // otherwise we get a error passed from net.js
      // they need to backport the fix from v5 to v4
      //
      raw.socket.on('error', this._onError.bind(this));
    }

    raw.on('error', this._onError.bind(this));
    raw.on('timeout', this._onIdle.bind(this));
  };
  //
  // ### @private function _onStart ()
  // Handler for the underlying socket's start event (registered in `_setup`);
  // re-emits it on this instance as `start`.
  //
  NsSocket.prototype._onStart = function _onStart() {
    var startEvent = 'start';

    this.emit(startEvent);
  };
  //
  // ### @private function _onData (message)
  // #### @message {String} literal message from the data event of the socket
  // Decodes one line with `this._decode` and emits it as
  // `['data', ...event segments]` with the last element of the decoded array
  // as the payload. Lines that cannot be decoded, or that do not decode to
  // an array, are dropped without emitting anything.
  // We assume messages arrive in order.
  //
  NsSocket.prototype._onData = function _onData(message) {
    var parsed,
        data;

    try {
      parsed = this._decode(message);
    }
    catch (err) {
      //
      // Don't do anything, assume that the message is only partially
      // received.
      //
      return;
    }

    if (!Array.isArray(parsed)) {
      // Not a message produced by `send`; there is no event name to emit.
      return;
    }

    data = parsed.pop();
    this.emit(['data'].concat(parsed), data);
  };
  //
  // ### @private function _onClose (hadError)
  // #### @hadError {Boolean} true if there was an error, which then include the
  // actual error included by the underlying socket
  // Marks this instance as disconnected and re-emits `close`; when `hadError`
  // is truthy it is forwarded together with the second argument received.
  // Starts a reconnect afterwards if reconnecting is enabled.
  //
  NsSocket.prototype._onClose = function _onClose(hadError) {
    var closeArgs = hadError
      ? ['close', hadError, arguments[1]]
      : ['close'];

    this.connected = false;
    this.emit.apply(this, closeArgs);

    if (this._reconnect) {
      this.reconnect();
    }
  };
  //
  // ### @private function _onError (error)
  // #### @error {Error} error raised by the underlying socket
  // Marks this instance as disconnected. When reconnecting is enabled a
  // reconnect is started instead of reporting the error; otherwise `error`
  // is emitted with the given error, or a generic one if none was given.
  //
  NsSocket.prototype._onError = function _onError(error) {
    this.connected = false;

    if (this._reconnect) {
      this.reconnect();
      return;
    }

    return this.emit('error', error || new Error('An Unknown Error occured'));
  };
  //
  // ### @private function _onIdle ()
  // #### Emits the idle event (based on timeout)
  // After emitting, re-arms the socket timeout with the value stored by
  // `setIdle`, provided there is still a socket to re-arm.
  //
  NsSocket.prototype._onIdle = function _onIdle() {
    this.emit('idle');

    //
    // An `idle` listener may call `end()`, which sets `this.socket` to null;
    // in that case there is nothing left to re-arm.
    //
    if (this._timeout && this.socket) {
      this.socket.setTimeout(this._timeout);
    }
  };
  431. };