index.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.SentinelIterator = void 0;
  4. const net_1 = require("net");
  5. const utils_1 = require("../../utils");
  6. const tls_1 = require("tls");
  7. const SentinelIterator_1 = require("./SentinelIterator");
  8. exports.SentinelIterator = SentinelIterator_1.default;
  9. const AbstractConnector_1 = require("../AbstractConnector");
  10. const Redis_1 = require("../../Redis");
  11. const FailoverDetector_1 = require("./FailoverDetector");
  12. const debug = (0, utils_1.Debug)("SentinelConnector");
  13. class SentinelConnector extends AbstractConnector_1.default {
  14. constructor(options) {
  15. super(options.disconnectTimeout);
  16. this.options = options;
  17. this.emitter = null;
  18. this.failoverDetector = null;
  19. if (!this.options.sentinels.length) {
  20. throw new Error("Requires at least one sentinel to connect to.");
  21. }
  22. if (!this.options.name) {
  23. throw new Error("Requires the name of master.");
  24. }
  25. this.sentinelIterator = new SentinelIterator_1.default(this.options.sentinels);
  26. }
  27. check(info) {
  28. const roleMatches = !info.role || this.options.role === info.role;
  29. if (!roleMatches) {
  30. debug("role invalid, expected %s, but got %s", this.options.role, info.role);
  31. // Start from the next item.
  32. // Note that `reset` will move the cursor to the previous element,
  33. // so we advance two steps here.
  34. this.sentinelIterator.next();
  35. this.sentinelIterator.next();
  36. this.sentinelIterator.reset(true);
  37. }
  38. return roleMatches;
  39. }
  40. disconnect() {
  41. super.disconnect();
  42. if (this.failoverDetector) {
  43. this.failoverDetector.cleanup();
  44. }
  45. }
  46. connect(eventEmitter) {
  47. this.connecting = true;
  48. this.retryAttempts = 0;
  49. let lastError;
  50. const connectToNext = async () => {
  51. const endpoint = this.sentinelIterator.next();
  52. if (endpoint.done) {
  53. this.sentinelIterator.reset(false);
  54. const retryDelay = typeof this.options.sentinelRetryStrategy === "function"
  55. ? this.options.sentinelRetryStrategy(++this.retryAttempts)
  56. : null;
  57. let errorMsg = typeof retryDelay !== "number"
  58. ? "All sentinels are unreachable and retry is disabled."
  59. : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.`;
  60. if (lastError) {
  61. errorMsg += ` Last error: ${lastError.message}`;
  62. }
  63. debug(errorMsg);
  64. const error = new Error(errorMsg);
  65. if (typeof retryDelay === "number") {
  66. eventEmitter("error", error);
  67. await new Promise((resolve) => setTimeout(resolve, retryDelay));
  68. return connectToNext();
  69. }
  70. else {
  71. throw error;
  72. }
  73. }
  74. let resolved = null;
  75. let err = null;
  76. try {
  77. resolved = await this.resolve(endpoint.value);
  78. }
  79. catch (error) {
  80. err = error;
  81. }
  82. if (!this.connecting) {
  83. throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
  84. }
  85. const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
  86. if (resolved) {
  87. debug("resolved: %s:%s from sentinel %s", resolved.host, resolved.port, endpointAddress);
  88. if (this.options.enableTLSForSentinelMode && this.options.tls) {
  89. Object.assign(resolved, this.options.tls);
  90. this.stream = (0, tls_1.connect)(resolved);
  91. this.stream.once("secureConnect", this.initFailoverDetector.bind(this));
  92. }
  93. else {
  94. this.stream = (0, net_1.createConnection)(resolved);
  95. this.stream.once("connect", this.initFailoverDetector.bind(this));
  96. }
  97. this.stream.once("error", (err) => {
  98. this.firstError = err;
  99. });
  100. return this.stream;
  101. }
  102. else {
  103. const errorMsg = err
  104. ? "failed to connect to sentinel " +
  105. endpointAddress +
  106. " because " +
  107. err.message
  108. : "connected to sentinel " +
  109. endpointAddress +
  110. " successfully, but got an invalid reply: " +
  111. resolved;
  112. debug(errorMsg);
  113. eventEmitter("sentinelError", new Error(errorMsg));
  114. if (err) {
  115. lastError = err;
  116. }
  117. return connectToNext();
  118. }
  119. };
  120. return connectToNext();
  121. }
  122. async updateSentinels(client) {
  123. if (!this.options.updateSentinels) {
  124. return;
  125. }
  126. const result = await client.sentinel("sentinels", this.options.name);
  127. if (!Array.isArray(result)) {
  128. return;
  129. }
  130. result
  131. .map(utils_1.packObject)
  132. .forEach((sentinel) => {
  133. const flags = sentinel.flags ? sentinel.flags.split(",") : [];
  134. if (flags.indexOf("disconnected") === -1 &&
  135. sentinel.ip &&
  136. sentinel.port) {
  137. const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel));
  138. if (this.sentinelIterator.add(endpoint)) {
  139. debug("adding sentinel %s:%s", endpoint.host, endpoint.port);
  140. }
  141. }
  142. });
  143. debug("Updated internal sentinels: %s", this.sentinelIterator);
  144. }
  145. async resolveMaster(client) {
  146. const result = await client.sentinel("get-master-addr-by-name", this.options.name);
  147. await this.updateSentinels(client);
  148. return this.sentinelNatResolve(Array.isArray(result)
  149. ? { host: result[0], port: Number(result[1]) }
  150. : null);
  151. }
  152. async resolveSlave(client) {
  153. const result = await client.sentinel("slaves", this.options.name);
  154. if (!Array.isArray(result)) {
  155. return null;
  156. }
  157. const availableSlaves = result
  158. .map(utils_1.packObject)
  159. .filter((slave) => slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/));
  160. return this.sentinelNatResolve(selectPreferredSentinel(availableSlaves, this.options.preferredSlaves));
  161. }
  162. sentinelNatResolve(item) {
  163. if (!item || !this.options.natMap)
  164. return item;
  165. return this.options.natMap[`${item.host}:${item.port}`] || item;
  166. }
  167. connectToSentinel(endpoint, options) {
  168. const redis = new Redis_1.default({
  169. port: endpoint.port || 26379,
  170. host: endpoint.host,
  171. username: this.options.sentinelUsername || null,
  172. password: this.options.sentinelPassword || null,
  173. family: endpoint.family ||
  174. // @ts-expect-error
  175. ("path" in this.options && this.options.path
  176. ? undefined
  177. : // @ts-expect-error
  178. this.options.family),
  179. tls: this.options.sentinelTLS,
  180. retryStrategy: null,
  181. enableReadyCheck: false,
  182. connectTimeout: this.options.connectTimeout,
  183. commandTimeout: this.options.sentinelCommandTimeout,
  184. ...options,
  185. });
  186. // @ts-expect-error
  187. return redis;
  188. }
  189. async resolve(endpoint) {
  190. const client = this.connectToSentinel(endpoint);
  191. // ignore the errors since resolve* methods will handle them
  192. client.on("error", noop);
  193. try {
  194. if (this.options.role === "slave") {
  195. return await this.resolveSlave(client);
  196. }
  197. else {
  198. return await this.resolveMaster(client);
  199. }
  200. }
  201. finally {
  202. client.disconnect();
  203. }
  204. }
  205. async initFailoverDetector() {
  206. var _a;
  207. if (!this.options.failoverDetector) {
  208. return;
  209. }
  210. // Move the current sentinel to the first position
  211. this.sentinelIterator.reset(true);
  212. const sentinels = [];
  213. // In case of a large amount of sentinels, limit the number of concurrent connections
  214. while (sentinels.length < this.options.sentinelMaxConnections) {
  215. const { done, value } = this.sentinelIterator.next();
  216. if (done) {
  217. break;
  218. }
  219. const client = this.connectToSentinel(value, {
  220. lazyConnect: true,
  221. retryStrategy: this.options.sentinelReconnectStrategy,
  222. });
  223. client.on("reconnecting", () => {
  224. var _a;
  225. // Tests listen to this event
  226. (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("sentinelReconnecting");
  227. });
  228. sentinels.push({ address: value, client });
  229. }
  230. this.sentinelIterator.reset(false);
  231. if (this.failoverDetector) {
  232. // Clean up previous detector
  233. this.failoverDetector.cleanup();
  234. }
  235. this.failoverDetector = new FailoverDetector_1.FailoverDetector(this, sentinels);
  236. await this.failoverDetector.subscribe();
  237. // Tests listen to this event
  238. (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("failoverSubscribed");
  239. }
  240. }
  241. exports.default = SentinelConnector;
  242. function selectPreferredSentinel(availableSlaves, preferredSlaves) {
  243. if (availableSlaves.length === 0) {
  244. return null;
  245. }
  246. let selectedSlave;
  247. if (typeof preferredSlaves === "function") {
  248. selectedSlave = preferredSlaves(availableSlaves);
  249. }
  250. else if (preferredSlaves !== null && typeof preferredSlaves === "object") {
  251. const preferredSlavesArray = Array.isArray(preferredSlaves)
  252. ? preferredSlaves
  253. : [preferredSlaves];
  254. // sort by priority
  255. preferredSlavesArray.sort((a, b) => {
  256. // default the priority to 1
  257. if (!a.prio) {
  258. a.prio = 1;
  259. }
  260. if (!b.prio) {
  261. b.prio = 1;
  262. }
  263. // lowest priority first
  264. if (a.prio < b.prio) {
  265. return -1;
  266. }
  267. if (a.prio > b.prio) {
  268. return 1;
  269. }
  270. return 0;
  271. });
  272. // loop over preferred slaves and return the first match
  273. for (let p = 0; p < preferredSlavesArray.length; p++) {
  274. for (let a = 0; a < availableSlaves.length; a++) {
  275. const slave = availableSlaves[a];
  276. if (slave.ip === preferredSlavesArray[p].ip) {
  277. if (slave.port === preferredSlavesArray[p].port) {
  278. selectedSlave = slave;
  279. break;
  280. }
  281. }
  282. }
  283. if (selectedSlave) {
  284. break;
  285. }
  286. }
  287. }
  288. // if none of the preferred slaves are available, a random available slave is returned
  289. if (!selectedSlave) {
  290. selectedSlave = (0, utils_1.sample)(availableSlaves);
  291. }
  292. return addressResponseToAddress(selectedSlave);
  293. }
  294. function addressResponseToAddress(input) {
  295. return { host: input.ip, port: Number(input.port) };
  296. }
  297. function noop() { }