ConnectionPool.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const events_1 = require("events");
  4. const utils_1 = require("../utils");
  5. const util_1 = require("./util");
  6. const Redis_1 = require("../Redis");
  7. const debug = (0, utils_1.Debug)("cluster:connectionPool");
  8. class ConnectionPool extends events_1.EventEmitter {
  9. constructor(redisOptions) {
  10. super();
  11. this.redisOptions = redisOptions;
  12. // master + slave = all
  13. this.nodes = {
  14. all: {},
  15. master: {},
  16. slave: {},
  17. };
  18. this.specifiedOptions = {};
  19. }
  20. getNodes(role = "all") {
  21. const nodes = this.nodes[role];
  22. return Object.keys(nodes).map((key) => nodes[key]);
  23. }
  24. getInstanceByKey(key) {
  25. return this.nodes.all[key];
  26. }
  27. getSampleInstance(role) {
  28. const keys = Object.keys(this.nodes[role]);
  29. const sampleKey = (0, utils_1.sample)(keys);
  30. return this.nodes[role][sampleKey];
  31. }
  32. /**
  33. * Find or create a connection to the node
  34. */
  35. findOrCreate(node, readOnly = false) {
  36. const key = (0, util_1.getNodeKey)(node);
  37. readOnly = Boolean(readOnly);
  38. if (this.specifiedOptions[key]) {
  39. Object.assign(node, this.specifiedOptions[key]);
  40. }
  41. else {
  42. this.specifiedOptions[key] = node;
  43. }
  44. let redis;
  45. if (this.nodes.all[key]) {
  46. redis = this.nodes.all[key];
  47. if (redis.options.readOnly !== readOnly) {
  48. redis.options.readOnly = readOnly;
  49. debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
  50. redis[readOnly ? "readonly" : "readwrite"]().catch(utils_1.noop);
  51. if (readOnly) {
  52. delete this.nodes.master[key];
  53. this.nodes.slave[key] = redis;
  54. }
  55. else {
  56. delete this.nodes.slave[key];
  57. this.nodes.master[key] = redis;
  58. }
  59. }
  60. }
  61. else {
  62. debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
  63. redis = new Redis_1.default((0, utils_1.defaults)({
  64. // Never try to reconnect when a node is lose,
  65. // instead, waiting for a `MOVED` error and
  66. // fetch the slots again.
  67. retryStrategy: null,
  68. // Offline queue should be enabled so that
  69. // we don't need to wait for the `ready` event
  70. // before sending commands to the node.
  71. enableOfflineQueue: true,
  72. readOnly: readOnly,
  73. }, node, this.redisOptions, { lazyConnect: true }));
  74. this.nodes.all[key] = redis;
  75. this.nodes[readOnly ? "slave" : "master"][key] = redis;
  76. redis.once("end", () => {
  77. this.removeNode(key);
  78. this.emit("-node", redis, key);
  79. if (!Object.keys(this.nodes.all).length) {
  80. this.emit("drain");
  81. }
  82. });
  83. this.emit("+node", redis, key);
  84. redis.on("error", function (error) {
  85. this.emit("nodeError", error, key);
  86. });
  87. }
  88. return redis;
  89. }
  90. /**
  91. * Reset the pool with a set of nodes.
  92. * The old node will be removed.
  93. */
  94. reset(nodes) {
  95. debug("Reset with %O", nodes);
  96. const newNodes = {};
  97. nodes.forEach((node) => {
  98. const key = (0, util_1.getNodeKey)(node);
  99. // Don't override the existing (master) node
  100. // when the current one is slave.
  101. if (!(node.readOnly && newNodes[key])) {
  102. newNodes[key] = node;
  103. }
  104. });
  105. Object.keys(this.nodes.all).forEach((key) => {
  106. if (!newNodes[key]) {
  107. debug("Disconnect %s because the node does not hold any slot", key);
  108. this.nodes.all[key].disconnect();
  109. this.removeNode(key);
  110. }
  111. });
  112. Object.keys(newNodes).forEach((key) => {
  113. const node = newNodes[key];
  114. this.findOrCreate(node, node.readOnly);
  115. });
  116. }
  117. /**
  118. * Remove a node from the pool.
  119. */
  120. removeNode(key) {
  121. const { nodes } = this;
  122. if (nodes.all[key]) {
  123. debug("Remove %s from the pool", key);
  124. delete nodes.all[key];
  125. }
  126. delete nodes.master[key];
  127. delete nodes.slave[key];
  128. }
  129. }
  130. exports.default = ConnectionPool;