Redis.js 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const commands_1 = require("@ioredis/commands");
  4. const events_1 = require("events");
  5. const standard_as_callback_1 = require("standard-as-callback");
  6. const cluster_1 = require("./cluster");
  7. const Command_1 = require("./Command");
  8. const connectors_1 = require("./connectors");
  9. const SentinelConnector_1 = require("./connectors/SentinelConnector");
  10. const eventHandler = require("./redis/event_handler");
  11. const RedisOptions_1 = require("./redis/RedisOptions");
  12. const ScanStream_1 = require("./ScanStream");
  13. const transaction_1 = require("./transaction");
  14. const utils_1 = require("./utils");
  15. const applyMixin_1 = require("./utils/applyMixin");
  16. const Commander_1 = require("./utils/Commander");
  17. const lodash_1 = require("./utils/lodash");
  18. const Deque = require("denque");
  19. const debug = (0, utils_1.Debug)("redis");
  20. /**
  21. * This is the major component of ioredis.
  22. * Use it to connect to a standalone Redis server or Sentinels.
  23. *
  24. * ```typescript
  25. * const redis = new Redis(); // Default port is 6379
  26. * async function main() {
  27. * redis.set("foo", "bar");
  28. * redis.get("foo", (err, result) => {
  29. * // `result` should be "bar"
  30. * console.log(err, result);
  31. * });
  32. * // Or use Promise
  33. * const result = await redis.get("foo");
  34. * }
  35. * ```
  36. */
  37. class Redis extends Commander_1.default {
  38. constructor(arg1, arg2, arg3) {
  39. super();
  40. this.status = "wait";
  41. /**
  42. * @ignore
  43. */
  44. this.isCluster = false;
  45. this.reconnectTimeout = null;
  46. this.connectionEpoch = 0;
  47. this.retryAttempts = 0;
  48. this.manuallyClosing = false;
  49. // Prepare autopipelines structures
  50. this._autoPipelines = new Map();
  51. this._runningAutoPipelines = new Set();
  52. this.parseOptions(arg1, arg2, arg3);
  53. events_1.EventEmitter.call(this);
  54. this.resetCommandQueue();
  55. this.resetOfflineQueue();
  56. if (this.options.Connector) {
  57. this.connector = new this.options.Connector(this.options);
  58. }
  59. else if (this.options.sentinels) {
  60. const sentinelConnector = new SentinelConnector_1.default(this.options);
  61. sentinelConnector.emitter = this;
  62. this.connector = sentinelConnector;
  63. }
  64. else {
  65. this.connector = new connectors_1.StandaloneConnector(this.options);
  66. }
  67. if (this.options.scripts) {
  68. Object.entries(this.options.scripts).forEach(([name, definition]) => {
  69. this.defineCommand(name, definition);
  70. });
  71. }
  72. // end(or wait) -> connecting -> connect -> ready -> end
  73. if (this.options.lazyConnect) {
  74. this.setStatus("wait");
  75. }
  76. else {
  77. this.connect().catch(lodash_1.noop);
  78. }
  79. }
  80. /**
  81. * Create a Redis instance.
  82. * This is the same as `new Redis()` but is included for compatibility with node-redis.
  83. */
  84. static createClient(...args) {
  85. return new Redis(...args);
  86. }
  87. get autoPipelineQueueSize() {
  88. let queued = 0;
  89. for (const pipeline of this._autoPipelines.values()) {
  90. queued += pipeline.length;
  91. }
  92. return queued;
  93. }
  94. /**
  95. * Create a connection to Redis.
  96. * This method will be invoked automatically when creating a new Redis instance
  97. * unless `lazyConnect: true` is passed.
  98. *
  99. * When calling this method manually, a Promise is returned, which will
  100. * be resolved when the connection status is ready.
  101. */
  102. connect(callback) {
  103. const promise = new Promise((resolve, reject) => {
  104. if (this.status === "connecting" ||
  105. this.status === "connect" ||
  106. this.status === "ready") {
  107. reject(new Error("Redis is already connecting/connected"));
  108. return;
  109. }
  110. this.connectionEpoch += 1;
  111. this.setStatus("connecting");
  112. const { options } = this;
  113. this.condition = {
  114. select: options.db,
  115. auth: options.username
  116. ? [options.username, options.password]
  117. : options.password,
  118. subscriber: false,
  119. };
  120. const _this = this;
  121. (0, standard_as_callback_1.default)(this.connector.connect(function (type, err) {
  122. _this.silentEmit(type, err);
  123. }), function (err, stream) {
  124. if (err) {
  125. _this.flushQueue(err);
  126. _this.silentEmit("error", err);
  127. reject(err);
  128. _this.setStatus("end");
  129. return;
  130. }
  131. let CONNECT_EVENT = options.tls ? "secureConnect" : "connect";
  132. if ("sentinels" in options &&
  133. options.sentinels &&
  134. !options.enableTLSForSentinelMode) {
  135. CONNECT_EVENT = "connect";
  136. }
  137. _this.stream = stream;
  138. if (options.noDelay) {
  139. stream.setNoDelay(true);
  140. }
  141. // Node ignores setKeepAlive before connect, therefore we wait for the event:
  142. // https://github.com/nodejs/node/issues/31663
  143. if (typeof options.keepAlive === 'number') {
  144. if (stream.connecting) {
  145. stream.once(CONNECT_EVENT, () => stream.setKeepAlive(true, options.keepAlive));
  146. }
  147. else {
  148. stream.setKeepAlive(true, options.keepAlive);
  149. }
  150. }
  151. if (stream.connecting) {
  152. stream.once(CONNECT_EVENT, eventHandler.connectHandler(_this));
  153. if (options.connectTimeout) {
  154. /*
  155. * Typically, Socket#setTimeout(0) will clear the timer
  156. * set before. However, in some platforms (Electron 3.x~4.x),
  157. * the timer will not be cleared. So we introduce a variable here.
  158. *
  159. * See https://github.com/electron/electron/issues/14915
  160. */
  161. let connectTimeoutCleared = false;
  162. stream.setTimeout(options.connectTimeout, function () {
  163. if (connectTimeoutCleared) {
  164. return;
  165. }
  166. stream.setTimeout(0);
  167. stream.destroy();
  168. const err = new Error("connect ETIMEDOUT");
  169. // @ts-expect-error
  170. err.errorno = "ETIMEDOUT";
  171. // @ts-expect-error
  172. err.code = "ETIMEDOUT";
  173. // @ts-expect-error
  174. err.syscall = "connect";
  175. eventHandler.errorHandler(_this)(err);
  176. });
  177. stream.once(CONNECT_EVENT, function () {
  178. connectTimeoutCleared = true;
  179. stream.setTimeout(0);
  180. });
  181. }
  182. }
  183. else if (stream.destroyed) {
  184. const firstError = _this.connector.firstError;
  185. if (firstError) {
  186. process.nextTick(() => {
  187. eventHandler.errorHandler(_this)(firstError);
  188. });
  189. }
  190. process.nextTick(eventHandler.closeHandler(_this));
  191. }
  192. else {
  193. process.nextTick(eventHandler.connectHandler(_this));
  194. }
  195. if (!stream.destroyed) {
  196. stream.once("error", eventHandler.errorHandler(_this));
  197. stream.once("close", eventHandler.closeHandler(_this));
  198. }
  199. const connectionReadyHandler = function () {
  200. _this.removeListener("close", connectionCloseHandler);
  201. resolve();
  202. };
  203. var connectionCloseHandler = function () {
  204. _this.removeListener("ready", connectionReadyHandler);
  205. reject(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  206. };
  207. _this.once("ready", connectionReadyHandler);
  208. _this.once("close", connectionCloseHandler);
  209. });
  210. });
  211. return (0, standard_as_callback_1.default)(promise, callback);
  212. }
  213. /**
  214. * Disconnect from Redis.
  215. *
  216. * This method closes the connection immediately,
  217. * and may lose some pending replies that haven't written to client.
  218. * If you want to wait for the pending replies, use Redis#quit instead.
  219. */
  220. disconnect(reconnect = false) {
  221. if (!reconnect) {
  222. this.manuallyClosing = true;
  223. }
  224. if (this.reconnectTimeout && !reconnect) {
  225. clearTimeout(this.reconnectTimeout);
  226. this.reconnectTimeout = null;
  227. }
  228. if (this.status === "wait") {
  229. eventHandler.closeHandler(this)();
  230. }
  231. else {
  232. this.connector.disconnect();
  233. }
  234. }
  235. /**
  236. * Disconnect from Redis.
  237. *
  238. * @deprecated
  239. */
  240. end() {
  241. this.disconnect();
  242. }
  243. /**
  244. * Create a new instance with the same options as the current one.
  245. *
  246. * @example
  247. * ```js
  248. * var redis = new Redis(6380);
  249. * var anotherRedis = redis.duplicate();
  250. * ```
  251. */
  252. duplicate(override) {
  253. return new Redis({ ...this.options, ...override });
  254. }
  255. /**
  256. * Listen for all requests received by the server in real time.
  257. *
  258. * This command will create a new connection to Redis and send a
  259. * MONITOR command via the new connection in order to avoid disturbing
  260. * the current connection.
  261. *
  262. * @param callback The callback function. If omit, a promise will be returned.
  263. * @example
  264. * ```js
  265. * var redis = new Redis();
  266. * redis.monitor(function (err, monitor) {
  267. * // Entering monitoring mode.
  268. * monitor.on('monitor', function (time, args, source, database) {
  269. * console.log(time + ": " + util.inspect(args));
  270. * });
  271. * });
  272. *
  273. * // supports promise as well as other commands
  274. * redis.monitor().then(function (monitor) {
  275. * monitor.on('monitor', function (time, args, source, database) {
  276. * console.log(time + ": " + util.inspect(args));
  277. * });
  278. * });
  279. * ```
  280. */
  281. monitor(callback) {
  282. const monitorInstance = this.duplicate({
  283. monitor: true,
  284. lazyConnect: false,
  285. });
  286. return (0, standard_as_callback_1.default)(new Promise(function (resolve, reject) {
  287. monitorInstance.once("error", reject);
  288. monitorInstance.once("monitoring", function () {
  289. resolve(monitorInstance);
  290. });
  291. }), callback);
  292. }
  293. /**
  294. * Send a command to Redis
  295. *
  296. * This method is used internally and in most cases you should not
  297. * use it directly. If you need to send a command that is not supported
  298. * by the library, you can use the `call` method:
  299. *
  300. * ```js
  301. * const redis = new Redis();
  302. *
  303. * redis.call('set', 'foo', 'bar');
  304. * // or
  305. * redis.call(['set', 'foo', 'bar']);
  306. * ```
  307. *
  308. * @ignore
  309. */
  310. sendCommand(command, stream) {
  311. if (this.status === "wait") {
  312. this.connect().catch(lodash_1.noop);
  313. }
  314. if (this.status === "end") {
  315. command.reject(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  316. return command.promise;
  317. }
  318. if (this.condition.subscriber &&
  319. !Command_1.default.checkFlag("VALID_IN_SUBSCRIBER_MODE", command.name)) {
  320. command.reject(new Error("Connection in subscriber mode, only subscriber commands may be used"));
  321. return command.promise;
  322. }
  323. if (typeof this.options.commandTimeout === "number") {
  324. command.setTimeout(this.options.commandTimeout);
  325. }
  326. let writable = this.status === "ready" ||
  327. (!stream &&
  328. this.status === "connect" &&
  329. (0, commands_1.exists)(command.name) &&
  330. (0, commands_1.hasFlag)(command.name, "loading"));
  331. if (!this.stream) {
  332. writable = false;
  333. }
  334. else if (!this.stream.writable) {
  335. writable = false;
  336. // @ts-expect-error
  337. }
  338. else if (this.stream._writableState && this.stream._writableState.ended) {
  339. // TODO: We should be able to remove this as the PR has already been merged.
  340. // https://github.com/iojs/io.js/pull/1217
  341. writable = false;
  342. }
  343. if (!writable) {
  344. if (!this.options.enableOfflineQueue) {
  345. command.reject(new Error("Stream isn't writeable and enableOfflineQueue options is false"));
  346. return command.promise;
  347. }
  348. if (command.name === "quit" && this.offlineQueue.length === 0) {
  349. this.disconnect();
  350. command.resolve(Buffer.from("OK"));
  351. return command.promise;
  352. }
  353. // @ts-expect-error
  354. if (debug.enabled) {
  355. debug("queue command[%s]: %d -> %s(%o)", this._getDescription(), this.condition.select, command.name, command.args);
  356. }
  357. this.offlineQueue.push({
  358. command: command,
  359. stream: stream,
  360. select: this.condition.select,
  361. });
  362. }
  363. else {
  364. // @ts-expect-error
  365. if (debug.enabled) {
  366. debug("write command[%s]: %d -> %s(%o)", this._getDescription(), this.condition.select, command.name, command.args);
  367. }
  368. if (stream) {
  369. if ("isPipeline" in stream && stream.isPipeline) {
  370. stream.write(command.toWritable(stream.destination.redis.stream));
  371. }
  372. else {
  373. stream.write(command.toWritable(stream));
  374. }
  375. }
  376. else {
  377. this.stream.write(command.toWritable(this.stream));
  378. }
  379. this.commandQueue.push({
  380. command: command,
  381. stream: stream,
  382. select: this.condition.select,
  383. });
  384. if (Command_1.default.checkFlag("WILL_DISCONNECT", command.name)) {
  385. this.manuallyClosing = true;
  386. }
  387. }
  388. if (command.name === "select" && (0, utils_1.isInt)(command.args[0])) {
  389. const db = parseInt(command.args[0], 10);
  390. if (this.condition.select !== db) {
  391. this.condition.select = db;
  392. this.emit("select", db);
  393. debug("switch to db [%d]", this.condition.select);
  394. }
  395. }
  396. return command.promise;
  397. }
  398. scanStream(options) {
  399. return this.createScanStream("scan", { options });
  400. }
  401. scanBufferStream(options) {
  402. return this.createScanStream("scanBuffer", { options });
  403. }
  404. sscanStream(key, options) {
  405. return this.createScanStream("sscan", { key, options });
  406. }
  407. sscanBufferStream(key, options) {
  408. return this.createScanStream("sscanBuffer", { key, options });
  409. }
  410. hscanStream(key, options) {
  411. return this.createScanStream("hscan", { key, options });
  412. }
  413. hscanBufferStream(key, options) {
  414. return this.createScanStream("hscanBuffer", { key, options });
  415. }
  416. zscanStream(key, options) {
  417. return this.createScanStream("zscan", { key, options });
  418. }
  419. zscanBufferStream(key, options) {
  420. return this.createScanStream("zscanBuffer", { key, options });
  421. }
  422. /**
  423. * Emit only when there's at least one listener.
  424. *
  425. * @ignore
  426. */
  427. silentEmit(eventName, arg) {
  428. let error;
  429. if (eventName === "error") {
  430. error = arg;
  431. if (this.status === "end") {
  432. return;
  433. }
  434. if (this.manuallyClosing) {
  435. // ignore connection related errors when manually disconnecting
  436. if (error instanceof Error &&
  437. (error.message === utils_1.CONNECTION_CLOSED_ERROR_MSG ||
  438. // @ts-expect-error
  439. error.syscall === "connect" ||
  440. // @ts-expect-error
  441. error.syscall === "read")) {
  442. return;
  443. }
  444. }
  445. }
  446. if (this.listeners(eventName).length > 0) {
  447. return this.emit.apply(this, arguments);
  448. }
  449. if (error && error instanceof Error) {
  450. console.error("[ioredis] Unhandled error event:", error.stack);
  451. }
  452. return false;
  453. }
  454. /**
  455. * Get description of the connection. Used for debugging.
  456. */
  457. _getDescription() {
  458. let description;
  459. if ("path" in this.options && this.options.path) {
  460. description = this.options.path;
  461. }
  462. else if (this.stream &&
  463. this.stream.remoteAddress &&
  464. this.stream.remotePort) {
  465. description = this.stream.remoteAddress + ":" + this.stream.remotePort;
  466. }
  467. else if ("host" in this.options && this.options.host) {
  468. description = this.options.host + ":" + this.options.port;
  469. }
  470. else {
  471. // Unexpected
  472. description = "";
  473. }
  474. if (this.options.connectionName) {
  475. description += ` (${this.options.connectionName})`;
  476. }
  477. return description;
  478. }
  479. resetCommandQueue() {
  480. this.commandQueue = new Deque();
  481. }
  482. resetOfflineQueue() {
  483. this.offlineQueue = new Deque();
  484. }
  485. recoverFromFatalError(commandError, err, options) {
  486. this.flushQueue(err, options);
  487. this.silentEmit("error", err);
  488. this.disconnect(true);
  489. }
  490. handleReconnection(err, item) {
  491. let needReconnect = false;
  492. if (this.options.reconnectOnError) {
  493. needReconnect = this.options.reconnectOnError(err);
  494. }
  495. switch (needReconnect) {
  496. case 1:
  497. case true:
  498. if (this.status !== "reconnecting") {
  499. this.disconnect(true);
  500. }
  501. item.command.reject(err);
  502. break;
  503. case 2:
  504. if (this.status !== "reconnecting") {
  505. this.disconnect(true);
  506. }
  507. if (this.condition.select !== item.select &&
  508. item.command.name !== "select") {
  509. this.select(item.select);
  510. }
  511. // TODO
  512. // @ts-expect-error
  513. this.sendCommand(item.command);
  514. break;
  515. default:
  516. item.command.reject(err);
  517. }
  518. }
  519. parseOptions(...args) {
  520. const options = {};
  521. let isTls = false;
  522. for (let i = 0; i < args.length; ++i) {
  523. const arg = args[i];
  524. if (arg === null || typeof arg === "undefined") {
  525. continue;
  526. }
  527. if (typeof arg === "object") {
  528. (0, lodash_1.defaults)(options, arg);
  529. }
  530. else if (typeof arg === "string") {
  531. (0, lodash_1.defaults)(options, (0, utils_1.parseURL)(arg));
  532. if (arg.startsWith("rediss://")) {
  533. isTls = true;
  534. }
  535. }
  536. else if (typeof arg === "number") {
  537. options.port = arg;
  538. }
  539. else {
  540. throw new Error("Invalid argument " + arg);
  541. }
  542. }
  543. if (isTls) {
  544. (0, lodash_1.defaults)(options, { tls: true });
  545. }
  546. (0, lodash_1.defaults)(options, Redis.defaultOptions);
  547. if (typeof options.port === "string") {
  548. options.port = parseInt(options.port, 10);
  549. }
  550. if (typeof options.db === "string") {
  551. options.db = parseInt(options.db, 10);
  552. }
  553. // @ts-expect-error
  554. this.options = (0, utils_1.resolveTLSProfile)(options);
  555. }
  556. /**
  557. * Change instance's status
  558. */
  559. setStatus(status, arg) {
  560. // @ts-expect-error
  561. if (debug.enabled) {
  562. debug("status[%s]: %s -> %s", this._getDescription(), this.status || "[empty]", status);
  563. }
  564. this.status = status;
  565. process.nextTick(this.emit.bind(this, status, arg));
  566. }
  567. createScanStream(command, { key, options = {} }) {
  568. return new ScanStream_1.default({
  569. objectMode: true,
  570. key: key,
  571. redis: this,
  572. command: command,
  573. ...options,
  574. });
  575. }
  576. /**
  577. * Flush offline queue and command queue with error.
  578. *
  579. * @param error The error object to send to the commands
  580. * @param options options
  581. */
  582. flushQueue(error, options) {
  583. options = (0, lodash_1.defaults)({}, options, {
  584. offlineQueue: true,
  585. commandQueue: true,
  586. });
  587. let item;
  588. if (options.offlineQueue) {
  589. while ((item = this.offlineQueue.shift())) {
  590. item.command.reject(error);
  591. }
  592. }
  593. if (options.commandQueue) {
  594. if (this.commandQueue.length > 0) {
  595. if (this.stream) {
  596. this.stream.removeAllListeners("data");
  597. }
  598. while ((item = this.commandQueue.shift())) {
  599. item.command.reject(error);
  600. }
  601. }
  602. }
  603. }
  604. /**
  605. * Check whether Redis has finished loading the persistent data and is able to
  606. * process commands.
  607. */
  608. _readyCheck(callback) {
  609. const _this = this;
  610. this.info(function (err, res) {
  611. if (err) {
  612. if (err.message && err.message.includes("NOPERM")) {
  613. console.warn(`Skipping the ready check because INFO command fails: "${err.message}". You can disable ready check with "enableReadyCheck". More: https://github.com/luin/ioredis/wiki/Disable-ready-check.`);
  614. return callback(null, {});
  615. }
  616. return callback(err);
  617. }
  618. if (typeof res !== "string") {
  619. return callback(null, res);
  620. }
  621. const info = {};
  622. const lines = res.split("\r\n");
  623. for (let i = 0; i < lines.length; ++i) {
  624. const [fieldName, ...fieldValueParts] = lines[i].split(":");
  625. const fieldValue = fieldValueParts.join(":");
  626. if (fieldValue) {
  627. info[fieldName] = fieldValue;
  628. }
  629. }
  630. if (!info.loading || info.loading === "0") {
  631. callback(null, info);
  632. }
  633. else {
  634. const loadingEtaMs = (info.loading_eta_seconds || 1) * 1000;
  635. const retryTime = _this.options.maxLoadingRetryTime &&
  636. _this.options.maxLoadingRetryTime < loadingEtaMs
  637. ? _this.options.maxLoadingRetryTime
  638. : loadingEtaMs;
  639. debug("Redis server still loading, trying again in " + retryTime + "ms");
  640. setTimeout(function () {
  641. _this._readyCheck(callback);
  642. }, retryTime);
  643. }
  644. }).catch(lodash_1.noop);
  645. }
  646. }
  647. Redis.Cluster = cluster_1.default;
  648. Redis.Command = Command_1.default;
  649. /**
  650. * Default options
  651. */
  652. Redis.defaultOptions = RedisOptions_1.DEFAULT_REDIS_OPTIONS;
  653. (0, applyMixin_1.default)(Redis, events_1.EventEmitter);
  654. (0, transaction_1.addTransactionSupport)(Redis.prototype);
  655. exports.default = Redis;