index.js 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const commands_1 = require("@ioredis/commands");
  4. const events_1 = require("events");
  5. const redis_errors_1 = require("redis-errors");
  6. const standard_as_callback_1 = require("standard-as-callback");
  7. const Command_1 = require("../Command");
  8. const ClusterAllFailedError_1 = require("../errors/ClusterAllFailedError");
  9. const Redis_1 = require("../Redis");
  10. const ScanStream_1 = require("../ScanStream");
  11. const transaction_1 = require("../transaction");
  12. const utils_1 = require("../utils");
  13. const applyMixin_1 = require("../utils/applyMixin");
  14. const Commander_1 = require("../utils/Commander");
  15. const ClusterOptions_1 = require("./ClusterOptions");
  16. const ClusterSubscriber_1 = require("./ClusterSubscriber");
  17. const ConnectionPool_1 = require("./ConnectionPool");
  18. const DelayQueue_1 = require("./DelayQueue");
  19. const util_1 = require("./util");
  20. const Deque = require("denque");
  21. const debug = (0, utils_1.Debug)("cluster");
  22. const REJECT_OVERWRITTEN_COMMANDS = new WeakSet();
  23. /**
  24. * Client for the official Redis Cluster
  25. */
  26. class Cluster extends Commander_1.default {
  27. /**
  28. * Creates an instance of Cluster.
  29. */
  30. constructor(startupNodes, options = {}) {
  31. super();
  32. this.slots = [];
  33. /**
  34. * @ignore
  35. */
  36. this._groupsIds = {};
  37. /**
  38. * @ignore
  39. */
  40. this._groupsBySlot = Array(16384);
  41. /**
  42. * @ignore
  43. */
  44. this.isCluster = true;
  45. this.retryAttempts = 0;
  46. this.delayQueue = new DelayQueue_1.default();
  47. this.offlineQueue = new Deque();
  48. this.isRefreshing = false;
  49. this._autoPipelines = new Map();
  50. this._runningAutoPipelines = new Set();
  51. this._readyDelayedCallbacks = [];
  52. /**
  53. * Every time Cluster#connect() is called, this value will be
  54. * auto-incrementing. The purpose of this value is used for
  55. * discarding previous connect attampts when creating a new
  56. * connection.
  57. */
  58. this.connectionEpoch = 0;
  59. events_1.EventEmitter.call(this);
  60. this.startupNodes = startupNodes;
  61. this.options = (0, utils_1.defaults)({}, options, ClusterOptions_1.DEFAULT_CLUSTER_OPTIONS, this.options);
  62. if (this.options.redisOptions &&
  63. this.options.redisOptions.keyPrefix &&
  64. !this.options.keyPrefix) {
  65. this.options.keyPrefix = this.options.redisOptions.keyPrefix;
  66. }
  67. // validate options
  68. if (typeof this.options.scaleReads !== "function" &&
  69. ["all", "master", "slave"].indexOf(this.options.scaleReads) === -1) {
  70. throw new Error('Invalid option scaleReads "' +
  71. this.options.scaleReads +
  72. '". Expected "all", "master", "slave" or a custom function');
  73. }
  74. this.connectionPool = new ConnectionPool_1.default(this.options.redisOptions);
  75. this.connectionPool.on("-node", (redis, key) => {
  76. this.emit("-node", redis);
  77. });
  78. this.connectionPool.on("+node", (redis) => {
  79. this.emit("+node", redis);
  80. });
  81. this.connectionPool.on("drain", () => {
  82. this.setStatus("close");
  83. });
  84. this.connectionPool.on("nodeError", (error, key) => {
  85. this.emit("node error", error, key);
  86. });
  87. this.subscriber = new ClusterSubscriber_1.default(this.connectionPool, this);
  88. if (this.options.scripts) {
  89. Object.entries(this.options.scripts).forEach(([name, definition]) => {
  90. this.defineCommand(name, definition);
  91. });
  92. }
  93. if (this.options.lazyConnect) {
  94. this.setStatus("wait");
  95. }
  96. else {
  97. this.connect().catch((err) => {
  98. debug("connecting failed: %s", err);
  99. });
  100. }
  101. }
  102. /**
  103. * Connect to a cluster
  104. */
  105. connect() {
  106. return new Promise((resolve, reject) => {
  107. if (this.status === "connecting" ||
  108. this.status === "connect" ||
  109. this.status === "ready") {
  110. reject(new Error("Redis is already connecting/connected"));
  111. return;
  112. }
  113. const epoch = ++this.connectionEpoch;
  114. this.setStatus("connecting");
  115. this.resolveStartupNodeHostnames()
  116. .then((nodes) => {
  117. if (this.connectionEpoch !== epoch) {
  118. debug("discard connecting after resolving startup nodes because epoch not match: %d != %d", epoch, this.connectionEpoch);
  119. reject(new redis_errors_1.RedisError("Connection is discarded because a new connection is made"));
  120. return;
  121. }
  122. if (this.status !== "connecting") {
  123. debug("discard connecting after resolving startup nodes because the status changed to %s", this.status);
  124. reject(new redis_errors_1.RedisError("Connection is aborted"));
  125. return;
  126. }
  127. this.connectionPool.reset(nodes);
  128. const readyHandler = () => {
  129. this.setStatus("ready");
  130. this.retryAttempts = 0;
  131. this.executeOfflineCommands();
  132. this.resetNodesRefreshInterval();
  133. resolve();
  134. };
  135. let closeListener = undefined;
  136. const refreshListener = () => {
  137. this.invokeReadyDelayedCallbacks(undefined);
  138. this.removeListener("close", closeListener);
  139. this.manuallyClosing = false;
  140. this.setStatus("connect");
  141. if (this.options.enableReadyCheck) {
  142. this.readyCheck((err, fail) => {
  143. if (err || fail) {
  144. debug("Ready check failed (%s). Reconnecting...", err || fail);
  145. if (this.status === "connect") {
  146. this.disconnect(true);
  147. }
  148. }
  149. else {
  150. readyHandler();
  151. }
  152. });
  153. }
  154. else {
  155. readyHandler();
  156. }
  157. };
  158. closeListener = () => {
  159. const error = new Error("None of startup nodes is available");
  160. this.removeListener("refresh", refreshListener);
  161. this.invokeReadyDelayedCallbacks(error);
  162. reject(error);
  163. };
  164. this.once("refresh", refreshListener);
  165. this.once("close", closeListener);
  166. this.once("close", this.handleCloseEvent.bind(this));
  167. this.refreshSlotsCache((err) => {
  168. if (err && err.message === ClusterAllFailedError_1.default.defaultMessage) {
  169. Redis_1.default.prototype.silentEmit.call(this, "error", err);
  170. this.connectionPool.reset([]);
  171. }
  172. });
  173. this.subscriber.start();
  174. })
  175. .catch((err) => {
  176. this.setStatus("close");
  177. this.handleCloseEvent(err);
  178. this.invokeReadyDelayedCallbacks(err);
  179. reject(err);
  180. });
  181. });
  182. }
  183. /**
  184. * Disconnect from every node in the cluster.
  185. */
  186. disconnect(reconnect = false) {
  187. const status = this.status;
  188. this.setStatus("disconnecting");
  189. if (!reconnect) {
  190. this.manuallyClosing = true;
  191. }
  192. if (this.reconnectTimeout && !reconnect) {
  193. clearTimeout(this.reconnectTimeout);
  194. this.reconnectTimeout = null;
  195. debug("Canceled reconnecting attempts");
  196. }
  197. this.clearNodesRefreshInterval();
  198. this.subscriber.stop();
  199. if (status === "wait") {
  200. this.setStatus("close");
  201. this.handleCloseEvent();
  202. }
  203. else {
  204. this.connectionPool.reset([]);
  205. }
  206. }
  207. /**
  208. * Quit the cluster gracefully.
  209. */
  210. quit(callback) {
  211. const status = this.status;
  212. this.setStatus("disconnecting");
  213. this.manuallyClosing = true;
  214. if (this.reconnectTimeout) {
  215. clearTimeout(this.reconnectTimeout);
  216. this.reconnectTimeout = null;
  217. }
  218. this.clearNodesRefreshInterval();
  219. this.subscriber.stop();
  220. if (status === "wait") {
  221. const ret = (0, standard_as_callback_1.default)(Promise.resolve("OK"), callback);
  222. // use setImmediate to make sure "close" event
  223. // being emitted after quit() is returned
  224. setImmediate(function () {
  225. this.setStatus("close");
  226. this.handleCloseEvent();
  227. }.bind(this));
  228. return ret;
  229. }
  230. return (0, standard_as_callback_1.default)(Promise.all(this.nodes().map((node) => node.quit().catch((err) => {
  231. // Ignore the error caused by disconnecting since
  232. // we're disconnecting...
  233. if (err.message === utils_1.CONNECTION_CLOSED_ERROR_MSG) {
  234. return "OK";
  235. }
  236. throw err;
  237. }))).then(() => "OK"), callback);
  238. }
  239. /**
  240. * Create a new instance with the same startup nodes and options as the current one.
  241. *
  242. * @example
  243. * ```js
  244. * var cluster = new Redis.Cluster([{ host: "127.0.0.1", port: "30001" }]);
  245. * var anotherCluster = cluster.duplicate();
  246. * ```
  247. */
  248. duplicate(overrideStartupNodes = [], overrideOptions = {}) {
  249. const startupNodes = overrideStartupNodes.length > 0
  250. ? overrideStartupNodes
  251. : this.startupNodes.slice(0);
  252. const options = Object.assign({}, this.options, overrideOptions);
  253. return new Cluster(startupNodes, options);
  254. }
  255. /**
  256. * Get nodes with the specified role
  257. */
  258. nodes(role = "all") {
  259. if (role !== "all" && role !== "master" && role !== "slave") {
  260. throw new Error('Invalid role "' + role + '". Expected "all", "master" or "slave"');
  261. }
  262. return this.connectionPool.getNodes(role);
  263. }
  264. /**
  265. * This is needed in order not to install a listener for each auto pipeline
  266. *
  267. * @ignore
  268. */
  269. delayUntilReady(callback) {
  270. this._readyDelayedCallbacks.push(callback);
  271. }
  272. /**
  273. * Get the number of commands queued in automatic pipelines.
  274. *
  275. * This is not available (and returns 0) until the cluster is connected and slots information have been received.
  276. */
  277. get autoPipelineQueueSize() {
  278. let queued = 0;
  279. for (const pipeline of this._autoPipelines.values()) {
  280. queued += pipeline.length;
  281. }
  282. return queued;
  283. }
  284. /**
  285. * Refresh the slot cache
  286. *
  287. * @ignore
  288. */
  289. refreshSlotsCache(callback) {
  290. if (this.isRefreshing) {
  291. if (callback) {
  292. process.nextTick(callback);
  293. }
  294. return;
  295. }
  296. this.isRefreshing = true;
  297. const _this = this;
  298. const wrapper = (error) => {
  299. this.isRefreshing = false;
  300. if (callback) {
  301. callback(error);
  302. }
  303. };
  304. const nodes = (0, utils_1.shuffle)(this.connectionPool.getNodes());
  305. let lastNodeError = null;
  306. function tryNode(index) {
  307. if (index === nodes.length) {
  308. const error = new ClusterAllFailedError_1.default(ClusterAllFailedError_1.default.defaultMessage, lastNodeError);
  309. return wrapper(error);
  310. }
  311. const node = nodes[index];
  312. const key = `${node.options.host}:${node.options.port}`;
  313. debug("getting slot cache from %s", key);
  314. _this.getInfoFromNode(node, function (err) {
  315. switch (_this.status) {
  316. case "close":
  317. case "end":
  318. return wrapper(new Error("Cluster is disconnected."));
  319. case "disconnecting":
  320. return wrapper(new Error("Cluster is disconnecting."));
  321. }
  322. if (err) {
  323. _this.emit("node error", err, key);
  324. lastNodeError = err;
  325. tryNode(index + 1);
  326. }
  327. else {
  328. _this.emit("refresh");
  329. wrapper();
  330. }
  331. });
  332. }
  333. tryNode(0);
  334. }
  335. /**
  336. * @ignore
  337. */
  338. sendCommand(command, stream, node) {
  339. if (this.status === "wait") {
  340. this.connect().catch(utils_1.noop);
  341. }
  342. if (this.status === "end") {
  343. command.reject(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  344. return command.promise;
  345. }
  346. let to = this.options.scaleReads;
  347. if (to !== "master") {
  348. const isCommandReadOnly = command.isReadOnly ||
  349. ((0, commands_1.exists)(command.name) && (0, commands_1.hasFlag)(command.name, "readonly"));
  350. if (!isCommandReadOnly) {
  351. to = "master";
  352. }
  353. }
  354. let targetSlot = node ? node.slot : command.getSlot();
  355. const ttl = {};
  356. const _this = this;
  357. if (!node && !REJECT_OVERWRITTEN_COMMANDS.has(command)) {
  358. REJECT_OVERWRITTEN_COMMANDS.add(command);
  359. const reject = command.reject;
  360. command.reject = function (err) {
  361. const partialTry = tryConnection.bind(null, true);
  362. _this.handleError(err, ttl, {
  363. moved: function (slot, key) {
  364. debug("command %s is moved to %s", command.name, key);
  365. targetSlot = Number(slot);
  366. if (_this.slots[slot]) {
  367. _this.slots[slot][0] = key;
  368. }
  369. else {
  370. _this.slots[slot] = [key];
  371. }
  372. _this._groupsBySlot[slot] =
  373. _this._groupsIds[_this.slots[slot].join(";")];
  374. _this.connectionPool.findOrCreate(_this.natMapper(key));
  375. tryConnection();
  376. debug("refreshing slot caches... (triggered by MOVED error)");
  377. _this.refreshSlotsCache();
  378. },
  379. ask: function (slot, key) {
  380. debug("command %s is required to ask %s:%s", command.name, key);
  381. const mapped = _this.natMapper(key);
  382. _this.connectionPool.findOrCreate(mapped);
  383. tryConnection(false, `${mapped.host}:${mapped.port}`);
  384. },
  385. tryagain: partialTry,
  386. clusterDown: partialTry,
  387. connectionClosed: partialTry,
  388. maxRedirections: function (redirectionError) {
  389. reject.call(command, redirectionError);
  390. },
  391. defaults: function () {
  392. reject.call(command, err);
  393. },
  394. });
  395. };
  396. }
  397. tryConnection();
  398. function tryConnection(random, asking) {
  399. if (_this.status === "end") {
  400. command.reject(new redis_errors_1.AbortError("Cluster is ended."));
  401. return;
  402. }
  403. let redis;
  404. if (_this.status === "ready" || command.name === "cluster") {
  405. if (node && node.redis) {
  406. redis = node.redis;
  407. }
  408. else if (Command_1.default.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
  409. Command_1.default.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)) {
  410. redis = _this.subscriber.getInstance();
  411. if (!redis) {
  412. command.reject(new redis_errors_1.AbortError("No subscriber for the cluster"));
  413. return;
  414. }
  415. }
  416. else {
  417. if (!random) {
  418. if (typeof targetSlot === "number" && _this.slots[targetSlot]) {
  419. const nodeKeys = _this.slots[targetSlot];
  420. if (typeof to === "function") {
  421. const nodes = nodeKeys.map(function (key) {
  422. return _this.connectionPool.getInstanceByKey(key);
  423. });
  424. redis = to(nodes, command);
  425. if (Array.isArray(redis)) {
  426. redis = (0, utils_1.sample)(redis);
  427. }
  428. if (!redis) {
  429. redis = nodes[0];
  430. }
  431. }
  432. else {
  433. let key;
  434. if (to === "all") {
  435. key = (0, utils_1.sample)(nodeKeys);
  436. }
  437. else if (to === "slave" && nodeKeys.length > 1) {
  438. key = (0, utils_1.sample)(nodeKeys, 1);
  439. }
  440. else {
  441. key = nodeKeys[0];
  442. }
  443. redis = _this.connectionPool.getInstanceByKey(key);
  444. }
  445. }
  446. if (asking) {
  447. redis = _this.connectionPool.getInstanceByKey(asking);
  448. redis.asking();
  449. }
  450. }
  451. if (!redis) {
  452. redis =
  453. (typeof to === "function"
  454. ? null
  455. : _this.connectionPool.getSampleInstance(to)) ||
  456. _this.connectionPool.getSampleInstance("all");
  457. }
  458. }
  459. if (node && !node.redis) {
  460. node.redis = redis;
  461. }
  462. }
  463. if (redis) {
  464. redis.sendCommand(command, stream);
  465. }
  466. else if (_this.options.enableOfflineQueue) {
  467. _this.offlineQueue.push({
  468. command: command,
  469. stream: stream,
  470. node: node,
  471. });
  472. }
  473. else {
  474. command.reject(new Error("Cluster isn't ready and enableOfflineQueue options is false"));
  475. }
  476. }
  477. return command.promise;
  478. }
  479. sscanStream(key, options) {
  480. return this.createScanStream("sscan", { key, options });
  481. }
  482. sscanBufferStream(key, options) {
  483. return this.createScanStream("sscanBuffer", { key, options });
  484. }
  485. hscanStream(key, options) {
  486. return this.createScanStream("hscan", { key, options });
  487. }
  488. hscanBufferStream(key, options) {
  489. return this.createScanStream("hscanBuffer", { key, options });
  490. }
  491. zscanStream(key, options) {
  492. return this.createScanStream("zscan", { key, options });
  493. }
  494. zscanBufferStream(key, options) {
  495. return this.createScanStream("zscanBuffer", { key, options });
  496. }
  497. /**
  498. * @ignore
  499. */
  500. handleError(error, ttl, handlers) {
  501. if (typeof ttl.value === "undefined") {
  502. ttl.value = this.options.maxRedirections;
  503. }
  504. else {
  505. ttl.value -= 1;
  506. }
  507. if (ttl.value <= 0) {
  508. handlers.maxRedirections(new Error("Too many Cluster redirections. Last error: " + error));
  509. return;
  510. }
  511. const errv = error.message.split(" ");
  512. if (errv[0] === "MOVED") {
  513. const timeout = this.options.retryDelayOnMoved;
  514. if (timeout && typeof timeout === "number") {
  515. this.delayQueue.push("moved", handlers.moved.bind(null, errv[1], errv[2]), { timeout });
  516. }
  517. else {
  518. handlers.moved(errv[1], errv[2]);
  519. }
  520. }
  521. else if (errv[0] === "ASK") {
  522. handlers.ask(errv[1], errv[2]);
  523. }
  524. else if (errv[0] === "TRYAGAIN") {
  525. this.delayQueue.push("tryagain", handlers.tryagain, {
  526. timeout: this.options.retryDelayOnTryAgain,
  527. });
  528. }
  529. else if (errv[0] === "CLUSTERDOWN" &&
  530. this.options.retryDelayOnClusterDown > 0) {
  531. this.delayQueue.push("clusterdown", handlers.connectionClosed, {
  532. timeout: this.options.retryDelayOnClusterDown,
  533. callback: this.refreshSlotsCache.bind(this),
  534. });
  535. }
  536. else if (error.message === utils_1.CONNECTION_CLOSED_ERROR_MSG &&
  537. this.options.retryDelayOnFailover > 0 &&
  538. this.status === "ready") {
  539. this.delayQueue.push("failover", handlers.connectionClosed, {
  540. timeout: this.options.retryDelayOnFailover,
  541. callback: this.refreshSlotsCache.bind(this),
  542. });
  543. }
  544. else {
  545. handlers.defaults();
  546. }
  547. }
  548. resetOfflineQueue() {
  549. this.offlineQueue = new Deque();
  550. }
  551. clearNodesRefreshInterval() {
  552. if (this.slotsTimer) {
  553. clearTimeout(this.slotsTimer);
  554. this.slotsTimer = null;
  555. }
  556. }
  557. resetNodesRefreshInterval() {
  558. if (this.slotsTimer || !this.options.slotsRefreshInterval) {
  559. return;
  560. }
  561. const nextRound = () => {
  562. this.slotsTimer = setTimeout(() => {
  563. debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)');
  564. this.refreshSlotsCache(() => {
  565. nextRound();
  566. });
  567. }, this.options.slotsRefreshInterval);
  568. };
  569. nextRound();
  570. }
  571. /**
  572. * Change cluster instance's status
  573. */
  574. setStatus(status) {
  575. debug("status: %s -> %s", this.status || "[empty]", status);
  576. this.status = status;
  577. process.nextTick(() => {
  578. this.emit(status);
  579. });
  580. }
  581. /**
  582. * Called when closed to check whether a reconnection should be made
  583. */
  584. handleCloseEvent(reason) {
  585. if (reason) {
  586. debug("closed because %s", reason);
  587. }
  588. let retryDelay;
  589. if (!this.manuallyClosing &&
  590. typeof this.options.clusterRetryStrategy === "function") {
  591. retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts, reason);
  592. }
  593. if (typeof retryDelay === "number") {
  594. this.setStatus("reconnecting");
  595. this.reconnectTimeout = setTimeout(() => {
  596. this.reconnectTimeout = null;
  597. debug("Cluster is disconnected. Retrying after %dms", retryDelay);
  598. this.connect().catch(function (err) {
  599. debug("Got error %s when reconnecting. Ignoring...", err);
  600. });
  601. }, retryDelay);
  602. }
  603. else {
  604. this.setStatus("end");
  605. this.flushQueue(new Error("None of startup nodes is available"));
  606. }
  607. }
  608. /**
  609. * Flush offline queue with error.
  610. */
  611. flushQueue(error) {
  612. let item;
  613. while ((item = this.offlineQueue.shift())) {
  614. item.command.reject(error);
  615. }
  616. }
  617. executeOfflineCommands() {
  618. if (this.offlineQueue.length) {
  619. debug("send %d commands in offline queue", this.offlineQueue.length);
  620. const offlineQueue = this.offlineQueue;
  621. this.resetOfflineQueue();
  622. let item;
  623. while ((item = offlineQueue.shift())) {
  624. this.sendCommand(item.command, item.stream, item.node);
  625. }
  626. }
  627. }
  628. natMapper(nodeKey) {
  629. if (this.options.natMap && typeof this.options.natMap === "object") {
  630. const key = typeof nodeKey === "string"
  631. ? nodeKey
  632. : `${nodeKey.host}:${nodeKey.port}`;
  633. const mapped = this.options.natMap[key];
  634. if (mapped) {
  635. debug("NAT mapping %s -> %O", key, mapped);
  636. return Object.assign({}, mapped);
  637. }
  638. }
  639. return typeof nodeKey === "string"
  640. ? (0, util_1.nodeKeyToRedisOptions)(nodeKey)
  641. : nodeKey;
  642. }
  643. getInfoFromNode(redis, callback) {
  644. if (!redis) {
  645. return callback(new Error("Node is disconnected"));
  646. }
  647. // Use a duplication of the connection to avoid
  648. // timeouts when the connection is in the blocking
  649. // mode (e.g. waiting for BLPOP).
  650. const duplicatedConnection = redis.duplicate({
  651. enableOfflineQueue: true,
  652. enableReadyCheck: false,
  653. retryStrategy: null,
  654. connectionName: (0, util_1.getConnectionName)("refresher", this.options.redisOptions && this.options.redisOptions.connectionName),
  655. });
  656. // Ignore error events since we will handle
  657. // exceptions for the CLUSTER SLOTS command.
  658. duplicatedConnection.on("error", utils_1.noop);
  659. duplicatedConnection.cluster("SLOTS", (0, utils_1.timeout)((err, result) => {
  660. duplicatedConnection.disconnect();
  661. if (err) {
  662. return callback(err);
  663. }
  664. if (this.status === "disconnecting" ||
  665. this.status === "close" ||
  666. this.status === "end") {
  667. debug("ignore CLUSTER.SLOTS results (count: %d) since cluster status is %s", result.length, this.status);
  668. callback();
  669. return;
  670. }
  671. const nodes = [];
  672. debug("cluster slots result count: %d", result.length);
  673. for (let i = 0; i < result.length; ++i) {
  674. const items = result[i];
  675. const slotRangeStart = items[0];
  676. const slotRangeEnd = items[1];
  677. const keys = [];
  678. for (let j = 2; j < items.length; j++) {
  679. if (!items[j][0]) {
  680. continue;
  681. }
  682. const node = this.natMapper({
  683. host: items[j][0],
  684. port: items[j][1],
  685. });
  686. node.readOnly = j !== 2;
  687. nodes.push(node);
  688. keys.push(node.host + ":" + node.port);
  689. }
  690. debug("cluster slots result [%d]: slots %d~%d served by %s", i, slotRangeStart, slotRangeEnd, keys);
  691. for (let slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
  692. this.slots[slot] = keys;
  693. }
  694. }
  695. // Assign to each node keys a numeric value to make autopipeline comparison faster.
  696. this._groupsIds = Object.create(null);
  697. let j = 0;
  698. for (let i = 0; i < 16384; i++) {
  699. const target = (this.slots[i] || []).join(";");
  700. if (!target.length) {
  701. this._groupsBySlot[i] = undefined;
  702. continue;
  703. }
  704. if (!this._groupsIds[target]) {
  705. this._groupsIds[target] = ++j;
  706. }
  707. this._groupsBySlot[i] = this._groupsIds[target];
  708. }
  709. this.connectionPool.reset(nodes);
  710. callback();
  711. }, this.options.slotsRefreshTimeout));
  712. }
  713. invokeReadyDelayedCallbacks(err) {
  714. for (const c of this._readyDelayedCallbacks) {
  715. process.nextTick(c, err);
  716. }
  717. this._readyDelayedCallbacks = [];
  718. }
  719. /**
  720. * Check whether Cluster is able to process commands
  721. */
  722. readyCheck(callback) {
  723. this.cluster("INFO", (err, res) => {
  724. if (err) {
  725. return callback(err);
  726. }
  727. if (typeof res !== "string") {
  728. return callback();
  729. }
  730. let state;
  731. const lines = res.split("\r\n");
  732. for (let i = 0; i < lines.length; ++i) {
  733. const parts = lines[i].split(":");
  734. if (parts[0] === "cluster_state") {
  735. state = parts[1];
  736. break;
  737. }
  738. }
  739. if (state === "fail") {
  740. debug("cluster state not ok (%s)", state);
  741. callback(null, state);
  742. }
  743. else {
  744. callback();
  745. }
  746. });
  747. }
  748. resolveSrv(hostname) {
  749. return new Promise((resolve, reject) => {
  750. this.options.resolveSrv(hostname, (err, records) => {
  751. if (err) {
  752. return reject(err);
  753. }
  754. const self = this, groupedRecords = (0, util_1.groupSrvRecords)(records), sortedKeys = Object.keys(groupedRecords).sort((a, b) => parseInt(a) - parseInt(b));
  755. function tryFirstOne(err) {
  756. if (!sortedKeys.length) {
  757. return reject(err);
  758. }
  759. const key = sortedKeys[0], group = groupedRecords[key], record = (0, util_1.weightSrvRecords)(group);
  760. if (!group.records.length) {
  761. sortedKeys.shift();
  762. }
  763. self.dnsLookup(record.name).then((host) => resolve({
  764. host,
  765. port: record.port,
  766. }), tryFirstOne);
  767. }
  768. tryFirstOne();
  769. });
  770. });
  771. }
  772. dnsLookup(hostname) {
  773. return new Promise((resolve, reject) => {
  774. this.options.dnsLookup(hostname, (err, address) => {
  775. if (err) {
  776. debug("failed to resolve hostname %s to IP: %s", hostname, err.message);
  777. reject(err);
  778. }
  779. else {
  780. debug("resolved hostname %s to IP %s", hostname, address);
  781. resolve(address);
  782. }
  783. });
  784. });
  785. }
  786. /**
  787. * Normalize startup nodes, and resolving hostnames to IPs.
  788. *
  789. * This process happens every time when #connect() is called since
  790. * #startupNodes and DNS records may chanage.
  791. */
  792. async resolveStartupNodeHostnames() {
  793. if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
  794. throw new Error("`startupNodes` should contain at least one node.");
  795. }
  796. const startupNodes = (0, util_1.normalizeNodeOptions)(this.startupNodes);
  797. const hostnames = (0, util_1.getUniqueHostnamesFromOptions)(startupNodes);
  798. if (hostnames.length === 0) {
  799. return startupNodes;
  800. }
  801. const configs = await Promise.all(hostnames.map((this.options.useSRVRecords ? this.resolveSrv : this.dnsLookup).bind(this)));
  802. const hostnameToConfig = (0, utils_1.zipMap)(hostnames, configs);
  803. return startupNodes.map((node) => {
  804. const config = hostnameToConfig.get(node.host);
  805. if (!config) {
  806. return node;
  807. }
  808. if (this.options.useSRVRecords) {
  809. return Object.assign({}, node, config);
  810. }
  811. return Object.assign({}, node, { host: config });
  812. });
  813. }
  814. createScanStream(command, { key, options = {} }) {
  815. return new ScanStream_1.default({
  816. objectMode: true,
  817. key: key,
  818. redis: this,
  819. command: command,
  820. ...options,
  821. });
  822. }
  823. }
  824. (0, applyMixin_1.default)(Cluster, events_1.EventEmitter);
  825. (0, transaction_1.addTransactionSupport)(Cluster.prototype);
  826. exports.default = Cluster;