event_handler.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.readyHandler = exports.errorHandler = exports.closeHandler = exports.connectHandler = void 0;
  4. const redis_errors_1 = require("redis-errors");
  5. const Command_1 = require("../Command");
  6. const errors_1 = require("../errors");
  7. const utils_1 = require("../utils");
  8. const DataHandler_1 = require("../DataHandler");
  9. const debug = (0, utils_1.Debug)("connection");
  10. function connectHandler(self) {
  11. return function () {
  12. self.setStatus("connect");
  13. self.resetCommandQueue();
  14. // AUTH command should be processed before any other commands
  15. let flushed = false;
  16. const { connectionEpoch } = self;
  17. if (self.condition.auth) {
  18. self.auth(self.condition.auth, function (err) {
  19. if (connectionEpoch !== self.connectionEpoch) {
  20. return;
  21. }
  22. if (err) {
  23. if (err.message.indexOf("no password is set") !== -1) {
  24. console.warn("[WARN] Redis server does not require a password, but a password was supplied.");
  25. }
  26. else if (err.message.indexOf("without any password configured for the default user") !== -1) {
  27. console.warn("[WARN] This Redis server's `default` user does not require a password, but a password was supplied");
  28. }
  29. else if (err.message.indexOf("wrong number of arguments for 'auth' command") !== -1) {
  30. console.warn(`[ERROR] The server returned "wrong number of arguments for 'auth' command". You are probably passing both username and password to Redis version 5 or below. You should only pass the 'password' option for Redis version 5 and under.`);
  31. }
  32. else {
  33. flushed = true;
  34. self.recoverFromFatalError(err, err);
  35. }
  36. }
  37. });
  38. }
  39. if (self.condition.select) {
  40. self.select(self.condition.select).catch((err) => {
  41. // If the node is in cluster mode, select is disallowed.
  42. // In this case, reconnect won't help.
  43. self.silentEmit("error", err);
  44. });
  45. }
  46. if (!self.options.enableReadyCheck) {
  47. exports.readyHandler(self)();
  48. }
  49. /*
  50. No need to keep the reference of DataHandler here
  51. because we don't need to do the cleanup.
  52. `Stream#end()` will remove all listeners for us.
  53. */
  54. new DataHandler_1.default(self, {
  55. stringNumbers: self.options.stringNumbers,
  56. });
  57. if (self.options.enableReadyCheck) {
  58. self._readyCheck(function (err, info) {
  59. if (connectionEpoch !== self.connectionEpoch) {
  60. return;
  61. }
  62. if (err) {
  63. if (!flushed) {
  64. self.recoverFromFatalError(new Error("Ready check failed: " + err.message), err);
  65. }
  66. }
  67. else {
  68. if (self.connector.check(info)) {
  69. exports.readyHandler(self)();
  70. }
  71. else {
  72. self.disconnect(true);
  73. }
  74. }
  75. });
  76. }
  77. };
  78. }
  79. exports.connectHandler = connectHandler;
  80. function abortError(command) {
  81. const err = new redis_errors_1.AbortError("Command aborted due to connection close");
  82. err.command = {
  83. name: command.name,
  84. args: command.args,
  85. };
  86. return err;
  87. }
  88. // If a contiguous set of pipeline commands starts from index zero then they
  89. // can be safely reattempted. If however we have a chain of pipelined commands
  90. // starting at index 1 or more it means we received a partial response before
  91. // the connection close and those pipelined commands must be aborted. For
  92. // example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
  93. // aborting and purging we'll have a queue that looks like this: [0, 1, 2]
  94. function abortIncompletePipelines(commandQueue) {
  95. let expectedIndex = 0;
  96. for (let i = 0; i < commandQueue.length;) {
  97. const command = commandQueue.peekAt(i).command;
  98. const pipelineIndex = command.pipelineIndex;
  99. if (pipelineIndex === undefined || pipelineIndex === 0) {
  100. expectedIndex = 0;
  101. }
  102. if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
  103. commandQueue.remove(i, 1);
  104. command.reject(abortError(command));
  105. continue;
  106. }
  107. i++;
  108. }
  109. }
  110. // If only a partial transaction result was received before connection close,
  111. // we have to abort any transaction fragments that may have ended up in the
  112. // offline queue
  113. function abortTransactionFragments(commandQueue) {
  114. for (let i = 0; i < commandQueue.length;) {
  115. const command = commandQueue.peekAt(i).command;
  116. if (command.name === "multi") {
  117. break;
  118. }
  119. if (command.name === "exec") {
  120. commandQueue.remove(i, 1);
  121. command.reject(abortError(command));
  122. break;
  123. }
  124. if (command.inTransaction) {
  125. commandQueue.remove(i, 1);
  126. command.reject(abortError(command));
  127. }
  128. else {
  129. i++;
  130. }
  131. }
  132. }
  133. function closeHandler(self) {
  134. return function () {
  135. self.setStatus("close");
  136. if (!self.prevCondition) {
  137. self.prevCondition = self.condition;
  138. }
  139. if (self.commandQueue.length) {
  140. abortIncompletePipelines(self.commandQueue);
  141. self.prevCommandQueue = self.commandQueue;
  142. }
  143. if (self.offlineQueue.length) {
  144. abortTransactionFragments(self.offlineQueue);
  145. }
  146. if (self.manuallyClosing) {
  147. self.manuallyClosing = false;
  148. debug("skip reconnecting since the connection is manually closed.");
  149. return close();
  150. }
  151. if (typeof self.options.retryStrategy !== "function") {
  152. debug("skip reconnecting because `retryStrategy` is not a function");
  153. return close();
  154. }
  155. const retryDelay = self.options.retryStrategy(++self.retryAttempts);
  156. if (typeof retryDelay !== "number") {
  157. debug("skip reconnecting because `retryStrategy` doesn't return a number");
  158. return close();
  159. }
  160. debug("reconnect in %sms", retryDelay);
  161. self.setStatus("reconnecting", retryDelay);
  162. self.reconnectTimeout = setTimeout(function () {
  163. self.reconnectTimeout = null;
  164. self.connect().catch(utils_1.noop);
  165. }, retryDelay);
  166. const { maxRetriesPerRequest } = self.options;
  167. if (typeof maxRetriesPerRequest === "number") {
  168. if (maxRetriesPerRequest < 0) {
  169. debug("maxRetriesPerRequest is negative, ignoring...");
  170. }
  171. else {
  172. const remainder = self.retryAttempts % (maxRetriesPerRequest + 1);
  173. if (remainder === 0) {
  174. debug("reach maxRetriesPerRequest limitation, flushing command queue...");
  175. self.flushQueue(new errors_1.MaxRetriesPerRequestError(maxRetriesPerRequest));
  176. }
  177. }
  178. }
  179. };
  180. function close() {
  181. self.setStatus("end");
  182. self.flushQueue(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  183. }
  184. }
  185. exports.closeHandler = closeHandler;
  186. function errorHandler(self) {
  187. return function (error) {
  188. debug("error: %s", error);
  189. self.silentEmit("error", error);
  190. };
  191. }
  192. exports.errorHandler = errorHandler;
  193. function readyHandler(self) {
  194. return function () {
  195. self.setStatus("ready");
  196. self.retryAttempts = 0;
  197. if (self.options.monitor) {
  198. self.call("monitor").then(() => self.setStatus("monitoring"), (error) => self.emit("error", error));
  199. const { sendCommand } = self;
  200. self.sendCommand = function (command) {
  201. if (Command_1.default.checkFlag("VALID_IN_MONITOR_MODE", command.name)) {
  202. return sendCommand.call(self, command);
  203. }
  204. command.reject(new Error("Connection is in monitoring mode, can't process commands."));
  205. return command.promise;
  206. };
  207. self.once("close", function () {
  208. delete self.sendCommand;
  209. });
  210. return;
  211. }
  212. const finalSelect = self.prevCondition
  213. ? self.prevCondition.select
  214. : self.condition.select;
  215. if (self.options.connectionName) {
  216. debug("set the connection name [%s]", self.options.connectionName);
  217. self.client("setname", self.options.connectionName).catch(utils_1.noop);
  218. }
  219. if (self.options.readOnly) {
  220. debug("set the connection to readonly mode");
  221. self.readonly().catch(utils_1.noop);
  222. }
  223. if (self.prevCondition) {
  224. const condition = self.prevCondition;
  225. self.prevCondition = null;
  226. if (condition.subscriber && self.options.autoResubscribe) {
  227. // We re-select the previous db first since
  228. // `SELECT` command is not valid in sub mode.
  229. if (self.condition.select !== finalSelect) {
  230. debug("connect to db [%d]", finalSelect);
  231. self.select(finalSelect);
  232. }
  233. const subscribeChannels = condition.subscriber.channels("subscribe");
  234. if (subscribeChannels.length) {
  235. debug("subscribe %d channels", subscribeChannels.length);
  236. self.subscribe(subscribeChannels);
  237. }
  238. const psubscribeChannels = condition.subscriber.channels("psubscribe");
  239. if (psubscribeChannels.length) {
  240. debug("psubscribe %d channels", psubscribeChannels.length);
  241. self.psubscribe(psubscribeChannels);
  242. }
  243. }
  244. }
  245. if (self.prevCommandQueue) {
  246. if (self.options.autoResendUnfulfilledCommands) {
  247. debug("resend %d unfulfilled commands", self.prevCommandQueue.length);
  248. while (self.prevCommandQueue.length > 0) {
  249. const item = self.prevCommandQueue.shift();
  250. if (item.select !== self.condition.select &&
  251. item.command.name !== "select") {
  252. self.select(item.select);
  253. }
  254. self.sendCommand(item.command, item.stream);
  255. }
  256. }
  257. else {
  258. self.prevCommandQueue = null;
  259. }
  260. }
  261. if (self.offlineQueue.length) {
  262. debug("send %d commands in offline queue", self.offlineQueue.length);
  263. const offlineQueue = self.offlineQueue;
  264. self.resetOfflineQueue();
  265. while (offlineQueue.length > 0) {
  266. const item = offlineQueue.shift();
  267. if (item.select !== self.condition.select &&
  268. item.command.name !== "select") {
  269. self.select(item.select);
  270. }
  271. self.sendCommand(item.command, item.stream);
  272. }
  273. }
  274. if (self.condition.select !== finalSelect) {
  275. debug("connect to db [%d]", finalSelect);
  276. self.select(finalSelect);
  277. }
  278. };
  279. }
  280. exports.readyHandler = readyHandler;