ClusterSubscriber.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const util_1 = require("./util");
  4. const utils_1 = require("../utils");
  5. const Redis_1 = require("../Redis");
  6. const debug = (0, utils_1.Debug)("cluster:subscriber");
  /**
   * Keeps one dedicated Redis connection for pub/sub on behalf of a cluster
   * and forwards the messages it receives to an emitter.
   *
   * A node is sampled from the connection pool and a separate connection is
   * opened with that node's options. When the node backing the current
   * subscriber leaves the pool, or a node appears while there is no subscriber,
   * a new one is selected and the channels recorded on the last active
   * subscriber are subscribed again.
   */
  class ClusterSubscriber {
    /**
     * @param connectionPool Pool that emits "+node" / "-node" and exposes
     *   `getNodes()`; each node is expected to carry an `options` object.
     * @param emitter Object whose `emit()` receives the forwarded
     *   "message", "messageBuffer", "pmessage" and "pmessageBuffer" events.
     */
    constructor(connectionPool, emitter) {
      this.connectionPool = connectionPool;
      this.emitter = emitter;
      this.started = false;
      // Connection currently used for subscriptions, or null when there is none.
      this.subscriber = null;
      // Most recent subscriber known to hold the full set of subscriptions;
      // its recorded channels are the source for re-subscription.
      this.lastActiveSubscriber = null;

      this.connectionPool.on("-node", (_, key) => {
        if (!this.started || !this.subscriber) {
          return;
        }
        // Only react when the removed node is the one backing the subscriber.
        if ((0, util_1.getNodeKey)(this.subscriber.options) === key) {
          debug("subscriber has left, selecting a new one...");
          this.selectSubscriber();
        }
      });

      this.connectionPool.on("+node", () => {
        if (!this.started || this.subscriber) {
          return;
        }
        debug("a new node is discovered and there is no subscriber, selecting a new one...");
        this.selectSubscriber();
      });
    }

    /**
     * @returns The current subscriber connection, or null if none is selected.
     */
    getInstance() {
      return this.subscriber;
    }

    /**
     * Marks the subscriber as started and selects a subscriber connection.
     */
    start() {
      this.started = true;
      this.selectSubscriber();
      debug("started");
    }

    /**
     * Marks the subscriber as stopped and disconnects the current connection.
     * `lastActiveSubscriber` is kept so that a later `start()` can restore the
     * channels recorded on it.
     */
    stop() {
      this.started = false;
      if (this.subscriber) {
        this.subscriber.disconnect();
        this.subscriber = null;
      }
      debug("stopped");
    }

    /**
     * Replaces the current subscriber with a connection to a sampled node,
     * re-subscribes the channels recorded on the last active subscriber and
     * wires message forwarding to the emitter.
     *
     * When the pool has no nodes, the previous connections are still
     * disconnected and `subscriber` is set to null.
     */
    selectSubscriber() {
      const lastActiveSubscriber = this.lastActiveSubscriber;
      // Disconnect the previous subscriber even if there
      // will not be a new one.
      if (lastActiveSubscriber) {
        lastActiveSubscriber.disconnect();
      }
      if (this.subscriber) {
        this.subscriber.disconnect();
      }

      const sampleNode = (0, utils_1.sample)(this.connectionPool.getNodes());
      if (!sampleNode) {
        debug("selecting subscriber failed since there is no node discovered in the cluster yet");
        this.subscriber = null;
        return;
      }

      const { options } = sampleNode;
      debug("selected a subscriber %s:%s", options.host, options.port);

      /*
       * Create a specialized Redis connection for the subscription.
       * Note that auto reconnection is enabled here.
       *
       * `enableReadyCheck` is also enabled because although subscription is allowed
       * while redis is loading data from the disk, we can check if the password
       * provided for the subscriber is correct, and if not, the current subscriber
       * will be disconnected and a new subscriber will be selected.
       */
      const subscriber = new Redis_1.default({
        port: options.port,
        host: options.host,
        username: options.username,
        password: options.password,
        enableReadyCheck: true,
        connectionName: (0, util_1.getConnectionName)("subscriber", options.connectionName),
        lazyConnect: true,
        tls: options.tls,
      });
      this.subscriber = subscriber;

      // Ignore the errors since they're handled in the connection pool.
      subscriber.on("error", utils_1.noop);

      // Re-subscribe previous channels
      const previousChannels = { subscribe: [], psubscribe: [] };
      if (lastActiveSubscriber) {
        const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
        if (condition && condition.subscriber) {
          previousChannels.subscribe = condition.subscriber.channels("subscribe");
          previousChannels.psubscribe = condition.subscriber.channels("psubscribe");
        }
      }

      if (previousChannels.subscribe.length || previousChannels.psubscribe.length) {
        // Number of re-subscription commands still in flight for this connection.
        let pending = 0;
        for (const type of ["subscribe", "psubscribe"]) {
          const channels = previousChannels[type];
          if (channels.length) {
            pending += 1;
            debug("%s %d channels", type, channels.length);
            subscriber[type](channels)
              .then(() => {
                pending -= 1;
                // Promote the connection that issued these commands, and only
                // if it is still the current one. Reading `this.subscriber`
                // here instead could pick up a newer, not yet re-subscribed
                // connection, or null after stop(), and lose the record of
                // channels to restore.
                if (pending === 0 && this.subscriber === subscriber) {
                  this.lastActiveSubscriber = subscriber;
                }
              })
              .catch(() => {
                // TODO: should probably disconnect the subscriber and try again.
                debug("failed to %s %d channels", type, channels.length);
              });
          }
        }
      } else {
        // Nothing to restore, so the new connection is immediately the active one.
        this.lastActiveSubscriber = subscriber;
      }

      // Forward channel messages: (channel, message).
      for (const event of ["message", "messageBuffer"]) {
        subscriber.on(event, (arg1, arg2) => {
          this.emitter.emit(event, arg1, arg2);
        });
      }
      // Forward pattern messages: (pattern, channel, message).
      for (const event of ["pmessage", "pmessageBuffer"]) {
        subscriber.on(event, (arg1, arg2, arg3) => {
          this.emitter.emit(event, arg1, arg2, arg3);
        });
      }
    }
  }
  131. exports.default = ClusterSubscriber;