InteractorDaemon.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. 'use strict'
  2. const fs = require('fs')
  3. const rpc = require('pm2-axon-rpc')
  4. const axon = require('pm2-axon')
  5. const log = require('debug')('interactor:daemon')
  6. const os = require('os')
  7. const cst = require('../constants.js')
  8. const ReverseInteractor = require('./reverse/ReverseInteractor.js')
  9. const PushInteractor = require('./push/PushInteractor.js')
  10. const Utility = require('./Utility.js')
  11. const PM2Client = require('./PM2Client.js')
  12. const TransporterInterface = require('./TransporterInterface.js')
  13. const domain = require('domain') // eslint-disable-line
  14. const WatchDog = require('./WatchDog')
  15. const InteractorClient = require('./InteractorClient')
  16. const semver = require('semver')
  17. const path = require('path')
  18. const pkg = require('../package.json')
  // Module-level flag published on `global`; nothing in this file reads it.
  // NOTE(review): presumably consumed by other agent modules - confirm.
  global._logs = false

  /**
   * Agent daemon: reads its configuration from the environment, asks the root
   * URL which endpoints to use, connects the transport and then starts the
   * push / reverse interactors and the watchdog. It also exposes a small RPC
   * server (kill / getInfos) on a local socket.
   */
  const InteractorDaemon = module.exports = class InteractorDaemon {
    /**
     * Load the configuration, create the transport (bound to 'websocket'),
     * the HTTP client and install the profiling signal handlers.
     * Nothing is connected until start() is called.
     */
    constructor () {
      this.opts = this.retrieveConf()

      log(`MACHINE_NAME=${this.opts.MACHINE_NAME}`)
      log(`PUBLIC_KEY=${this.opts.PUBLIC_KEY}`)
      log(`ROOT_URL=${cst.KEYMETRICS_ROOT_URL}`)

      this.DAEMON_ACTIVE = false
      this.transport = new TransporterInterface(this.opts, this)
        .bind('websocket')
      this.transport.on('error', (err) => {
        // The fallback must be computed before concatenation: `+` binds
        // tighter than `||`, so without this the fallback is never used.
        const reason = (err && err.message) || err
        return console.error('[NETWORK] Error : ' + reason)
      })
      this.httpClient = new Utility.HTTPClient()
      this._online = true
      this._internalDebugger()
    }

    /**
     * Use process.send() if connected
     * @param {Object} data message forwarded to the parent process
     */
    sendToParent (data) {
      if (!process.connected || !process.send) return console.log('Could not send data to parent')

      try {
        process.send(data)
      } catch (e) {
        console.trace('Parent process disconnected')
      }
    }

    /**
     * Get an interface for communicating with PM2 daemon
     * (created on first use, then reused)
     * @private
     * @return {PM2Client}
     */
    getPM2Client () {
      if (!this._ipm2) {
        this._ipm2 = new PM2Client()
      }
      return this._ipm2
    }

    /**
     * Terminate all connections and exit the process
     * @param {Error|null} err when falsy, the RPC socket file and the PID file
     *   are removed and the exit code is cst.SUCCESS_EXIT
     * @param {Function} [cb] called just before the RPC socket is closed; it is
     *   NOT called when no RPC server is running (the process then exits
     *   immediately with cst.ERROR_EXIT)
     */
    exit (err, cb) {
      log('Exiting Interactor')

      // clear workers
      if (this._workerEndpoint) clearInterval(this._workerEndpoint)

      // stop interactors
      if (this.reverse) this.reverse.stop()
      if (this.push) this.push.stop()
      if (this._ipm2) this._ipm2.disconnect()
      if (this.watchDog) this.watchDog.stop()

      // stop transport
      if (this.transport) this.transport.disconnect()

      if (!err) {
        // Best-effort cleanup. Each file is removed independently so that a
        // missing socket file does not prevent the PID file from being removed.
        for (const file of [cst.INTERACTOR_RPC_PORT, cst.INTERACTOR_PID_PATH]) {
          try {
            fs.unlinkSync(file)
          } catch (unlinkErr) {
            log('Could not remove %s: %s', file, unlinkErr.message)
          }
        }
      }

      if (!this._rpc || !this._rpc.sock) {
        return process.exit(cst.ERROR_EXIT)
      }

      if (typeof cb === 'function') {
        cb()
      }

      // Small delay before closing the socket (lets the callback above run
      // first, e.g. the reply of the `kill` RPC method).
      setTimeout(() => {
        this._rpc.sock.close(() => {
          log('RPC server closed')
          process.exit(err ? cst.ERROR_EXIT : cst.SUCCESS_EXIT)
        })
      }, 10)
    }

    /**
     * Start a RPC server and expose it through a socket file
     * @param {Object} [opts] currently unused
     * @return {Object} the rpc server exposing `kill` and `getInfos`
     */
    startRPC (opts) {
      log('Launching Interactor RPC server (bind to %s)', cst.INTERACTOR_RPC_PORT)
      const rep = axon.socket('rep')
      const rpcServer = new rpc.Server(rep)
      const self = this
      rep.bind(cst.INTERACTOR_RPC_PORT)

      rpcServer.expose({
        kill: function (cb) {
          log('Shutdown request received via RPC')
          return self.exit(null, cb)
        },
        getInfos: function (cb) {
          if (self.opts && self.DAEMON_ACTIVE === true) {
            // km_data is overwritten by every endpoint check and may no longer
            // carry `endpoints` even though the daemon is flagged active.
            const endpoints = self.km_data && self.km_data.endpoints
            return cb(null, {
              machine_name: self.opts.MACHINE_NAME,
              public_key: self.opts.PUBLIC_KEY,
              secret_key: self.opts.SECRET_KEY,
              remote_host: endpoints ? endpoints.web : undefined,
              connected: self.transport.isConnected(),
              transporters: self.transport.getActiveTransporters(),
              socket_path: cst.INTERACTOR_RPC_PORT,
              pm2_home_monitored: cst.PM2_HOME
            })
          } else {
            return cb(null)
          }
        }
      })
      return rpcServer
    }

    /**
     * Handle specific signals to launch memory / cpu profiling
     * if available in node.
     * SIGUSR1 toggles CPU profiling, SIGUSR2 toggles heap sampling; when a
     * profile is stopped it is written as JSON into os.tmpdir().
     */
    _internalDebugger () {
      // skipped on node versions below 8
      if (semver.satisfies(process.version, '<8')) return

      const inspector = require('inspector')
      const state = {
        heap: false,
        cpu: false,
        session: null
      }
      const commands = {
        heap: {
          start: 'HeapProfiler.startSampling',
          stop: 'HeapProfiler.stopSampling'
        },
        cpu: {
          start: 'Profiler.start',
          stop: 'Profiler.stop'
        }
      }

      const handleSignal = type => {
        return _ => {
          // the inspector session is opened on the first signal and reused
          if (state.session === null) {
            state.session = new inspector.Session()
            state.session.connect()
          }

          const isAlreadyEnabled = state[type]
          const debuggerCommands = commands[type]
          const profilerDomain = type === 'cpu' ? 'Profiler' : 'HeapProfiler'
          const fileExt = type === 'heap' ? '.heapprofile' : '.cpuprofile'

          if (isAlreadyEnabled) {
            // stopping the profiling and writing it to disk if it is running
            console.log(`[DEBUG] Stopping ${type.toUpperCase()} Profiling`)
            state.session.post(debuggerCommands.stop, (err, data) => {
              // check the error first: `data` cannot be trusted when it is set
              if (err) return console.error(err)
              const profile = data.profile
              const randomId = Math.random().toString(36)
              const profilePath = path.resolve(os.tmpdir(), `${type}-${randomId}${fileExt}`)

              fs.writeFileSync(profilePath, JSON.stringify(profile))
              console.log(`[DEBUG] Writing file in ${profilePath}`)
              state[type] = false
              state.session.post(`${profilerDomain}.disable`)
            })
          } else {
            // start the profiling otherwise
            console.log(`[DEBUG] Starting ${type.toUpperCase()} Profiling`)
            state.session.post(`${profilerDomain}.enable`, _ => {
              state.session.post(debuggerCommands.start)
              state[type] = true
            })
          }
        }
      }

      // use hook
      process.on('SIGUSR1', handleSignal('cpu'))
      process.on('SIGUSR2', handleSignal('heap'))
    }

    /**
     * Retrieve metadata about the system
     * @return {Object} machine name, public key, recycle flag, PM2 version,
     *   total memory (os.totalmem() divided by 1000 twice), hostname and cpus
     */
    getSystemMetadata () {
      return {
        MACHINE_NAME: this.opts.MACHINE_NAME,
        PUBLIC_KEY: this.opts.PUBLIC_KEY,
        RECYCLE: this.opts.RECYCLE || false,
        PM2_VERSION: process.env.PM2_VERSION,
        MEMORY: os.totalmem() / 1000 / 1000,
        HOSTNAME: os.hostname(),
        CPUS: os.cpus()
      }
    }

    /**
     * Ping root url to retrieve node info
     * @private
     * @param {Function} cb invoked with <Error, Object> where Object is the response sent by the server
     */
    _pingRoot (cb) {
      const data = this.getSystemMetadata()

      this.httpClient.open({
        url: this.opts.ROOT_URL + '/api/node/verifyPM2',
        method: 'POST',
        data: {
          public_id: this.opts.PUBLIC_KEY,
          private_id: this.opts.SECRET_KEY,
          data: data
        },
        headers: {
          'User-Agent': `PM2 Agent v${pkg.version}`
        }
      }, cb)
    }

    /**
     * Ping root, validate the answer and connect the transport to the
     * endpoints it returned
     * @private
     * @param {Function} [cb] invoked with <Error> on failure, with
     *   <null, Object> (the raw answer) when the answer has `active === false`,
     *   otherwise handed over to transport.connect()
     */
    _verifyEndpoint (cb) {
      if (typeof cb !== 'function') cb = function () {}

      this._pingRoot((err, data) => {
        if (err) {
          log('Got an a error on ping root', err)
          return cb(err)
        }

        // An empty / non-object answer would otherwise throw below
        if (!data || typeof data !== 'object') {
          return cb(new Error(`Invalid response from root url (${JSON.stringify(data)})`))
        }

        this.km_data = data

        // Verify data integrity
        if (data.disabled === true || data.pending === true) {
          log('Interactor is disabled by admins')
          return cb(new Error('Connection refused, you might have hit the limit of agents you can connect (send email at contact@keymetrics.io for more infos)'))
        }
        if (data.active === false) {
          log('Interactor not active: %s', data.msg || 'no message')
          return cb(null, data)
        }
        if (!data.endpoints) {
          return cb(new Error(`Endpoints field not present (${JSON.stringify(data)})`))
        }

        this.DAEMON_ACTIVE = true
        this.transport.connect(data.endpoints, cb)
      })
    }

    /**
     * Retrieve configuration from environment.
     * Exits the process with cst.ERROR_EXIT when PM2_MACHINE_NAME,
     * PM2_PUBLIC_KEY or PM2_SECRET_KEY is missing.
     * @return {Object} the options used by the daemon
     */
    retrieveConf () {
      const opts = {}

      // KM_RECYCLE is expected to hold JSON; a malformed value must not
      // crash the daemon while it is being constructed.
      let recycle = false
      if (process.env.KM_RECYCLE) {
        try {
          recycle = JSON.parse(process.env.KM_RECYCLE)
        } catch (e) {
          console.error('Invalid KM_RECYCLE environment variable (expected JSON), ignoring it')
        }
      }

      opts.MACHINE_NAME = process.env.PM2_MACHINE_NAME
      opts.PUBLIC_KEY = process.env.PM2_PUBLIC_KEY
      opts.PM2_BINARY_PATH = process.env.PM2_BINARY_PATH
      opts.SECRET_KEY = process.env.PM2_SECRET_KEY
      opts.RECYCLE = recycle
      opts.PM2_VERSION = process.env.PM2_VERSION || '0.0.0'
      opts.AGENT_TRANSPORT_WEBSOCKET = process.env.AGENT_TRANSPORT_WEBSOCKET
      opts.internal_ip = Utility.network.v4

      opts.PM2_REMOTE_METHOD_ALLOWED = [
        'restart',
        'reload',
        'reset',
        'scale',
        'startLogging',
        'stopLogging',
        'ping',
        'launchSysMonitoring',
        'deepUpdate'
      ]

      if (!opts.MACHINE_NAME) {
        console.error('You must provide a PM2_MACHINE_NAME environment variable')
        process.exit(cst.ERROR_EXIT)
      } else if (!opts.PUBLIC_KEY) {
        console.error('You must provide a PM2_PUBLIC_KEY environment variable')
        process.exit(cst.ERROR_EXIT)
      } else if (!opts.SECRET_KEY) {
        console.error('You must provide a PM2_SECRET_KEY environment variable')
        process.exit(cst.ERROR_EXIT)
      }
      return opts
    }

    /**
     * Start the daemon: launch the RPC server, retrieve the endpoints (with
     * retries, except when NODE_ENV is 'test'), report the outcome to the
     * parent process, then start the periodic endpoint check, the watchdog
     * (after 30 seconds) and the push / reverse interactors.
     * @param {Function} [cb] invoked without argument shortly after the
     *   interactors have been started; never invoked on failure (the daemon
     *   exits instead)
     */
    start (cb) {
      let retries = 0

      this._rpc = this.startRPC()
      this.opts.ROOT_URL = cst.KEYMETRICS_ROOT_URL

      const verifyEndpointCallback = (err, result) => {
        if (err) {
          log('Error while trying to retrieve endpoints : ' + (err.message || err))
          if (retries++ < 30 && process.env.NODE_ENV !== 'test') {
            log('Retrying to retrieve endpoints...')
            // the delay grows with each attempt
            return setTimeout(_ => {
              return this._verifyEndpoint(verifyEndpointCallback)
            }, 200 * retries)
          }
          this.sendToParent({ error: true, msg: err.message || err })
          return this.exit(new Error('Error retrieving endpoints'))
        }
        if (result === false) {
          log('False returned while trying to retrieve endpoints')
          return this.exit(new Error('Error retrieving endpoints'))
        }

        // send data over IPC for CLI feedback
        this.sendToParent({
          error: false,
          km_data: this.km_data,
          online: true,
          pid: process.pid,
          machine_name: this.opts.MACHINE_NAME,
          public_key: this.opts.PUBLIC_KEY,
          secret_key: this.opts.SECRET_KEY,
          reverse_interaction: this.opts.REVERSE_INTERACT
        })

        if (result && typeof result === 'object' &&
            result.error === true && result.active === false) {
          log(`Error when connecting: ${result.msg}`)
          return this.exit(new Error(`Error when connecting: ${result.msg}`))
        }

        // start workers: re-check the endpoints every minute
        this._workerEndpoint = setInterval(this._verifyEndpoint.bind(this, (err, result) => {
          if (err) return
          // We need to exit agent if bucket is disabled (trialing)
          if (result && typeof result === 'object' && result.error === true && result.active === false) {
            log(`Error when connecting: ${result.msg}, disconnecting transporters`)
            return this.transport.disconnect()
          }
        }), 60000)

        // start interactors
        this.watchDog = WatchDog

        setTimeout(() => {
          log('>> PM2 Watchdog started')
          this.watchDog.start({
            pm2_binary_path: this.opts.PM2_BINARY_PATH,
            conf: {
              ipm2: this.getPM2Client()
            }
          })
        }, 30 * 1000)

        this.push = new PushInteractor(this.opts, this.getPM2Client(), this.transport)
        this.reverse = new ReverseInteractor(this.opts, this.getPM2Client(), this.transport)
        this.push.start()
        this.reverse.start()
        log('Interactor daemon started')
        if (cb) {
          setTimeout(cb, 20)
        }
      }
      return this._verifyEndpoint(verifyEndpointCallback)
    }
  }
  // If this file is the entry point, launch the daemon;
  // otherwise it was only required in order to use the exported class.
  if (require.main === module) {
    const d = domain.create()
    let daemon = null

    // Shut the failed agent down. `daemon` is still null when the exception
    // was thrown by the InteractorDaemon constructor itself, in which case
    // there is nothing to clean up and the process is exited directly.
    const terminate = (exitErr) => {
      if (daemon) return daemon.exit(exitErr)
      return process.exit(cst.ERROR_EXIT)
    }

    // Any exception escaping the daemon lands here: log it, try to spawn a
    // replacement agent with the stored configuration, then exit this one.
    d.on('error', function (err) {
      console.error('-- FATAL EXCEPTION happened --')
      console.error(new Date())
      // a non-Error value may have been thrown, hence the fallback
      console.error((err && err.stack) || err)
      console.log('Re-initiating Agent')

      InteractorClient.getOrSetConf(cst, null, (confErr, infos) => {
        // Without configuration there is nothing to relaunch: bail out whether
        // an error was reported or the configuration is simply missing.
        if (confErr || !infos) {
          console.error('[PM2 Agent] Failed to rescue agent :')
          console.error(confErr || new Error(`Cannot find configuration to connect to backend`))
          return process.exit(1)
        }
        console.log(`[PM2 Agent] Using (Public key: ${infos.public_key}) (Private key: ${infos.secret_key}) (Info node: ${infos.info_node})`)

        // Exit anyway the errored agent if the new one takes too long to start
        // (no error is passed, as the configuration lookup succeeded)
        const timeout = setTimeout(_ => {
          console.error('Daemonization of failsafe agent did not worked')
          terminate(null)
        }, 2000)

        InteractorClient.daemonize(cst, infos, (daemonizeErr) => {
          if (daemonizeErr) {
            log('[PM2 Agent] Failed to rescue agent :')
            log(daemonizeErr)
          } else {
            log(`Succesfully launched new agent`)
          }
          clearTimeout(timeout)
          terminate(daemonizeErr)
        })
      })
    })

    d.run(_ => {
      daemon = new InteractorDaemon()

      process.title = `PM2 Agent v${pkg.version}: (${cst.PM2_HOME})`

      log('[PM2 Agent] Launching agent')
      daemon.start()
    })
  }