InteractorClient.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. 'use strict'
  2. const log = require('debug')('interactor:client')
  3. const fs = require('fs')
  4. const path = require('path')
  5. const rpc = require('pm2-axon-rpc')
  6. const axon = require('pm2-axon')
  7. const chalk = require('chalk')
  8. const os = require('os')
  9. const constants = require('../constants')
  10. const childProcess = require('child_process')
  11. const printError = function (msg) {
  12. if (process.env.PM2_SILENT || process.env.PM2_PROGRAMMATIC) return false
  13. if (msg instanceof Error) return console.error(msg.message)
  14. return console.error.apply(console, arguments)
  15. }
  16. const printOut = function (msg) {
  17. if (process.env.PM2_SILENT || process.env.PM2_PROGRAMMATIC) return false
  18. return console.log.apply(console, arguments)
  19. }
  20. module.exports = class InteractorDaemonizer {
  21. /**
  22. * Ping the Interactor to see if its online
  23. * @param {Object} opts global constants
  24. * @param {String} opts.INTERACTOR_RPC_PORT path used to connect to the interactor
  25. * @param {Function} cb invoked with <err, result>
  26. */
  27. static ping (opts, cb) {
  28. if (typeof cb !== 'function') {
  29. throw new Error('Missing parameters')
  30. } else if (typeof opts !== 'object' || !opts || !opts.INTERACTOR_RPC_PORT) {
  31. return cb(new Error('Missing parameters'))
  32. }
  33. const req = axon.socket('req')
  34. const client = new rpc.Client(req)
  35. log('[PING INTERACTOR] Trying to connect to Interactor daemon')
  36. client.sock.once('reconnect attempt', _ => {
  37. client.sock.close()
  38. log('Interactor Daemon not launched')
  39. return cb(null, false)
  40. })
  41. client.sock.once('connect', _ => {
  42. client.sock.once('close', _ => {
  43. return cb(null, true)
  44. })
  45. client.sock.close()
  46. log('Interactor Daemon alive')
  47. })
  48. client.sock.once('error', (e) => {
  49. if (e.code === 'EACCES') {
  50. fs.stat(opts.INTERACTOR_RPC_PORT, (e, stats) => {
  51. if (stats.uid === 0) {
  52. console.error('Permission denied, activate current user')
  53. return process.exit(1)
  54. }
  55. })
  56. } else {
  57. console.error('unexpected error')
  58. console.error(e)
  59. }
  60. })
  61. req.connect(opts.INTERACTOR_RPC_PORT)
  62. }
  63. /**
  64. * Try to kill the interactor daemon via RPC
  65. * @param {Object} conf global constants
  66. * @param {String} conf.INTERACTOR_RPC_PORT path used to connect to the interactor
  67. * @param {Function} cb invoked with <err>
  68. */
  69. static killInteractorDaemon (conf, cb) {
  70. process.env.PM2_INTERACTOR_PROCESSING = true
  71. log('Killing interactor #1 ping')
  72. this.ping(conf, (err, online) => {
  73. log(`Interactor is ${!online || err ? 'offline' : 'online'}`)
  74. if (!online || err) {
  75. return cb ? err ? cb(err) : cb(new Error('Interactor not launched')) : printError('Interactor not launched')
  76. }
  77. this.launchRPC(conf, (err, data) => {
  78. if (err) {
  79. setTimeout(_ => {
  80. this.disconnectRPC(cb)
  81. }, 100)
  82. return false
  83. }
  84. this.rpc.kill((err) => {
  85. if (err) printError(err)
  86. setTimeout(_ => {
  87. this.disconnectRPC(cb)
  88. }, 100)
  89. })
  90. return false
  91. })
  92. return false
  93. })
  94. }
  95. /**
  96. * Start a RPC client that connect to the InteractorDaemon
  97. * @param {Object} conf global constants
  98. * @param {Function} cb invoked with <err>
  99. */
  100. static launchRPC (conf, cb) {
  101. const req = axon.socket('req')
  102. this.rpc = {}
  103. this.client = new rpc.Client(req)
  104. log('Generating Interactor methods of RPC client')
  105. // attach known methods to RPC client
  106. const generateMethods = (cb) => {
  107. this.client.methods((err, methods) => {
  108. if (err) return cb(err)
  109. Object.keys(methods).forEach((key) => {
  110. let method = methods[key]
  111. log('+ Adding %s method to interactor RPC client', method.name);
  112. ((name) => {
  113. let self = this
  114. this.rpc[name] = function () {
  115. let args = Array.prototype.slice.call(arguments)
  116. args.unshift(name)
  117. self.client.call.apply(self.client, args)
  118. }
  119. })(method.name)
  120. })
  121. return cb()
  122. })
  123. }
  124. this.client.sock.once('reconnect attempt', (err) => {
  125. this.client.sock.removeAllListeners()
  126. return cb(err, { success: false, msg: 'reconnect attempt' })
  127. })
  128. this.client.sock.once('error', (err) => {
  129. log('-- Error in error catch all on Interactor --', err)
  130. return cb(err, { success: false, msg: 'reconnect attempt' })
  131. })
  132. this.client.sock.once('connect', () => {
  133. this.client.sock.removeAllListeners()
  134. generateMethods(_ => {
  135. log('Methods of RPC client for Interaction ready.')
  136. return cb(null, { success: true })
  137. })
  138. })
  139. this.client_sock = req.connect(conf.INTERACTOR_RPC_PORT)
  140. }
  141. /**
  142. * Start or Restart the Interaction Daemon depending if its online or not
  143. * @private
  144. * @param {Object} conf global constants
  145. * @param {Object} infos data used to start the interactor [can be recovered from FS]
  146. * @param {String} infos.secret_key the secret key used to cipher data
  147. * @param {String} infos.public_key the public key used identify the user
  148. * @param {String} infos.machine_name [optional] override name of the machine
  149. * @param {Function} cb invoked with <err, msg, process>
  150. */
  151. static daemonize (cst, conf, cb) {
  152. const InteractorJS = path.resolve(path.dirname(module.filename), 'InteractorDaemon.js')
  153. const PM2Path = require.main.filename
  154. // Redirect PM2 internal err and out
  155. // to STDERR STDOUT when running with Travis
  156. const testEnv = process.env.TRAVIS || (process.env.NODE_ENV && process.env.NODE_ENV.match(/test/))
  157. const out = testEnv ? 1 : fs.openSync(constants.INTERACTOR_LOG_FILE_PATH, 'a')
  158. const err = testEnv ? 2 : fs.openSync(constants.INTERACTOR_LOG_FILE_PATH, 'a')
  159. let binary = process.execPath
  160. if (binary.indexOf('node') === -1) {
  161. binary = 'node'
  162. }
  163. if (process.env.NODEJS_EXECUTABLE) {
  164. binary = process.env.NODEJS_EXECUTABLE
  165. }
  166. const child = childProcess.spawn(binary, [InteractorJS], {
  167. silent: false,
  168. detached: true,
  169. cwd: process.cwd(),
  170. env: Object.assign({
  171. PM2_HOME: cst.PM2_HOME,
  172. PM2_MACHINE_NAME: conf.machine_name,
  173. PM2_SECRET_KEY: conf.secret_key,
  174. PM2_PUBLIC_KEY: conf.public_key,
  175. PM2_REVERSE_INTERACT: conf.reverse_interact,
  176. PM2_BINARY_PATH: PM2Path,
  177. KEYMETRICS_NODE: conf.info_node,
  178. PM2_VERSION: conf.pm2_version,
  179. DEBUG: process.env.DEBUG || 'interactor:*,-interactor:axon,-interactor:websocket,-interactor:pm2:client,-interactor:push'
  180. }, process.env),
  181. stdio: ['ipc', out, err]
  182. })
  183. try {
  184. let prevPid = fs.readFileSync(constants.INTERACTOR_PID_PATH)
  185. prevPid = parseInt(prevPid)
  186. process.kill(prevPid)
  187. } catch (e) {
  188. }
  189. let pid = ''
  190. if (child.pid)
  191. pid = child.pid.toString()
  192. fs.writeFileSync(cst.INTERACTOR_PID_PATH, pid)
  193. child.on('close', (status) => {
  194. if (status === constants.ERROR_EXIT) {
  195. return cb(new Error('Agent has shutdown for unknown reason'))
  196. }
  197. return cb()
  198. })
  199. child.once('error', (err) => {
  200. log('Error when launching Interactor, please check the agent logs')
  201. return cb(err)
  202. })
  203. child.unref()
  204. const timeout = setTimeout(_ => {
  205. printOut(`${chalk.yellow('[PM2.IO][WARNING]')} Not managed to connect to PM2 Plus, retrying in background.`)
  206. child.removeAllListeners()
  207. child.disconnect()
  208. return cb(null, {}, child)
  209. }, 7000)
  210. child.once('message', (msg) => {
  211. clearTimeout(timeout)
  212. log('Interactor daemon launched :', msg)
  213. if (msg.log) {
  214. return cb(null, msg, child)
  215. }
  216. child.removeAllListeners('error')
  217. child.disconnect()
  218. // Handle and show to user the different error message that can happen
  219. if (msg.km_data && msg.km_data.error === true) {
  220. if (!process.env.PM2_SILENT) {
  221. console.log(chalk.red('[PM2.IO][ERROR]'), msg.km_data.msg)
  222. console.log(chalk.cyan('[PM2.IO]') + ' Contact support contact@keymetrics.io and send us the error message')
  223. }
  224. return cb(msg)
  225. } else if (msg.km_data && msg.km_data.disabled === true) {
  226. if (!process.env.PM2_SILENT) {
  227. console.log(chalk.cyan('[PM2.IO]') + ' Server DISABLED BY ADMINISTRATION contact support contact@keymetrics.io with reference to your public and secret keys)')
  228. }
  229. return cb(msg)
  230. } else if (msg.km_data && msg.km_data.error === true) {
  231. if (!process.env.PM2_SILENT) {
  232. console.log('%s %s (Public: %s) (Secret: %s) (Machine name: %s)', chalk.red('[PM2.IO][ERROR]'),
  233. msg.km_data.msg, msg.public_key, msg.secret_key, msg.machine_name)
  234. }
  235. return cb(msg)
  236. } else if (msg.km_data && msg.km_data.active === false && msg.km_data.pending === true) {
  237. if (!process.env.PM2_SILENT) {
  238. console.log('%s You must upgrade your bucket in order to monitor more servers.', chalk.red('[PM2.IO]'))
  239. }
  240. return cb(msg)
  241. }
  242. return cb(null, msg, child)
  243. })
  244. }
  245. /**
  246. * Start or Restart the Interaction Daemon depending if its online or not
  247. * @private
  248. * @param {Object} conf global constants
  249. * @param {Object} infos data used to start the interactor [can be recovered from FS]
  250. * @param {String} infos.secret_key the secret key used to cipher data
  251. * @param {String} infos.public_key the public key used identify the user
  252. * @param {String} infos.machine_name [optional] override name of the machine
  253. * @param {Function} cb invoked with <err, msg, process>
  254. */
  255. static launchOrAttach (conf, infos, cb) {
  256. this.ping(conf, (err, online) => {
  257. if (!err && online) {
  258. log('Interactor online, restarting it...')
  259. this.launchRPC(conf, _ => {
  260. this.rpc.kill((ignoredErr) => {
  261. this.daemonize(conf, infos, cb)
  262. })
  263. })
  264. } else {
  265. log('Interactor offline, launching it...')
  266. this.daemonize(conf, infos, cb)
  267. }
  268. })
  269. }
  270. /**
  271. * Restart the Interactor Daemon
  272. * @param {Object} conf global constants
  273. * @param {Function} cb invoked with <err, msg>
  274. */
  275. static update (conf, cb) {
  276. this.ping(conf, (err, online) => {
  277. if (err || !online) {
  278. return cb ? cb(new Error('Interactor not launched')) : printError('Interactor not launched')
  279. }
  280. this.launchRPC(conf, _ => {
  281. this.rpc.kill((err) => {
  282. if (err) {
  283. return cb ? cb(err) : printError(err)
  284. }
  285. printOut('Interactor successfully killed')
  286. setTimeout(_ => {
  287. this.launchAndInteract(conf, {}, _ => {
  288. return cb(null, { msg: 'Daemon launched' })
  289. })
  290. }, 500)
  291. })
  292. })
  293. })
  294. }
  295. /**
  296. * Retrieve Interactor configuration from env, params and filesystem.
  297. * @param {Object} cst global constants
  298. * @param {Object} infos data used to start the interactor [optional]
  299. * @param {String} infos.secret_key the secret key used to cipher data [optional]
  300. * @param {String} infos.public_key the public key used identify the user [optional]
  301. * @param {String} infos.machine_name override name of the machine [optional]
  302. * @param {Function} cb invoked with <err, configuration>
  303. */
  304. static getOrSetConf (cst, infos, cb) {
  305. infos = infos || {}
  306. let configuration = {
  307. version_management: {
  308. active: true
  309. }
  310. }
  311. let confFS = {}
  312. // Try loading configuration file on FS
  313. try {
  314. let fileContent = fs.readFileSync(cst.INTERACTION_CONF).toString()
  315. // Handle old configuration with json5
  316. fileContent = fileContent.replace(/\s(\w+):/g, '"$1":')
  317. // parse
  318. confFS = JSON.parse(fileContent)
  319. if (confFS.version_management) {
  320. configuration.version_management.active = confFS.version_management.active
  321. }
  322. } catch (e) {
  323. log('Interaction file does not exists')
  324. }
  325. // load the configration (first have priority)
  326. // -> from env variable
  327. // -> from params (eg. CLI)
  328. // -> from configuration on FS
  329. configuration.public_key = process.env.PM2_PUBLIC_KEY || process.env.KEYMETRICS_PUBLIC || infos.public_key || confFS.public_key
  330. configuration.secret_key = process.env.PM2_SECRET_KEY || process.env.KEYMETRICS_SECRET || infos.secret_key || confFS.secret_key
  331. configuration.machine_name = process.env.PM2_MACHINE_NAME || process.env.INSTANCE_NAME || infos.machine_name || confFS.machine_name || `${os.hostname()}-${require('crypto').randomBytes(2).toString('hex')}`
  332. configuration.pm2_version = process.env.PM2_VERSION || infos.pm2_version || confFS.pm2_version
  333. configuration.reverse_interact = confFS.reverse_interact || true
  334. // is setup empty ? use the one provided in env OR root OTHERWISE get the one on FS conf OR fallback on root
  335. configuration.info_node = process.env.KEYMETRICS_NODE || infos.info_node || confFS.info_node || cst.KEYMETRICS_ROOT_URL
  336. if (!configuration.secret_key) {
  337. log('Secret key is not defined in configuration', configuration)
  338. return cb(new Error('secret key is not defined'))
  339. }
  340. if (!configuration.public_key) {
  341. log('Public key is not defined in configuration', configuration)
  342. return cb(new Error('public key is not defined'))
  343. }
  344. // write configuration on FS
  345. try {
  346. fs.writeFileSync(cst.INTERACTION_CONF, JSON.stringify(configuration, null, 4))
  347. } catch (e) {
  348. console.error('Error when writting configuration file %s', cst.INTERACTION_CONF)
  349. return cb(e)
  350. }
  351. if (configuration.info_node.indexOf('http') === -1) { // handle old file
  352. configuration.info_node = `https://${configuration.info_node}`
  353. }
  354. return cb(null, configuration)
  355. }
  356. /**
  357. * Disconnect the RPC client from Interactor Daemon
  358. * @param {Function} cb invoked with <err, msg>
  359. */
  360. static disconnectRPC (cb) {
  361. log('Disconnect RPC')
  362. if (!this.client_sock || !this.client_sock.close) {
  363. log('RPC not launched')
  364. return cb(null, {
  365. success: false,
  366. msg: 'RPC connection to Interactor Daemon is not launched'
  367. })
  368. }
  369. if (this.client_sock.closing === true) {
  370. log('RPC already closed')
  371. return cb(null, {
  372. success: false,
  373. msg: 'RPC closed'
  374. })
  375. }
  376. try {
  377. let timer
  378. log('Closing RPC INTERACTOR')
  379. this.client_sock.once('close', _ => {
  380. log('RPC INTERACTOR cleanly closed')
  381. clearTimeout(timer)
  382. return cb ? cb(null, { success: true }) : false
  383. })
  384. timer = setTimeout(_ => {
  385. if (this.client_sock.destroy) {
  386. this.client_sock.destroy()
  387. }
  388. return cb ? cb(null, { success: true }) : false
  389. }, 200)
  390. this.client_sock.close()
  391. } catch (err) {
  392. log('Error while closing RPC INTERACTOR : %s', err.message || err)
  393. return cb ? cb(err) : false
  394. }
  395. }
  396. /**
  397. * Start the Interactor Daemon
  398. * @param {Object} cst global constants
  399. * @param {Object} infos data used to start the interactor [can be recovered from FS]
  400. * @param {String} infos.secret_key the secret key used to cipher data
  401. * @param {String} infos.public_key the public key used identify the user
  402. * @param {String} infos.machine_name [optional] override name of the machine
  403. * @param {Function} cb invoked with <err, msg, process>
  404. */
  405. static launchAndInteract (cst, opts, cb) {
  406. // For Watchdog
  407. if (process.env.PM2_AGENT_ONLINE) {
  408. return cb()
  409. }
  410. process.env.PM2_INTERACTOR_PROCESSING = true
  411. this.getOrSetConf(Object.assign(cst, constants), opts, (err, conf) => {
  412. if (err || !conf) return cb(err || new Error('Cant retrieve configuration'))
  413. if (!process.env.PM2_SILENT) {
  414. console.log(chalk.cyan('[PM2 I/O]') + ' Using: Public key: %s | Private key: %s | Machine name: %s', conf.public_key, conf.secret_key, conf.machine_name)
  415. }
  416. return this.launchOrAttach(cst, conf, cb)
  417. })
  418. }
  419. /**
  420. * Retrieve configuration used by the Interaction Daemon
  421. * @param {Object} cst global constants
  422. * @param {Function} cb invoked with <err, data>
  423. */
  424. static getInteractInfo (cst, cb) {
  425. log('Getting interaction info')
  426. if (process.env.PM2_NO_INTERACTION) return cb(new Error('PM2_NO_INTERACTION set'))
  427. this.ping(cst, (err, online) => {
  428. if (err || !online) return cb(new Error('Interactor is offline'))
  429. this.launchRPC(cst, _ => {
  430. this.rpc.getInfos((err, infos) => {
  431. if (err) return cb(err)
  432. // Avoid general CLI to interfere with Keymetrics CLI commands
  433. if (process.env.PM2_INTERACTOR_PROCESSING) return cb(null, infos)
  434. this.disconnectRPC(() => {
  435. return cb(null, infos)
  436. })
  437. })
  438. })
  439. })
  440. }
  441. }