DelayQueue.js 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const utils_1 = require("../utils");
  4. const Deque = require("denque");
  5. const debug = (0, utils_1.Debug)("delayqueue");
  6. /**
  7. * Queue that runs items after specified duration
  8. */
  9. class DelayQueue {
  10. constructor() {
  11. this.queues = {};
  12. this.timeouts = {};
  13. }
  14. /**
  15. * Add a new item to the queue
  16. *
  17. * @param bucket bucket name
  18. * @param item function that will run later
  19. * @param options
  20. */
  21. push(bucket, item, options) {
  22. const callback = options.callback || process.nextTick;
  23. if (!this.queues[bucket]) {
  24. this.queues[bucket] = new Deque();
  25. }
  26. const queue = this.queues[bucket];
  27. queue.push(item);
  28. if (!this.timeouts[bucket]) {
  29. this.timeouts[bucket] = setTimeout(() => {
  30. callback(() => {
  31. this.timeouts[bucket] = null;
  32. this.execute(bucket);
  33. });
  34. }, options.timeout);
  35. }
  36. }
  37. execute(bucket) {
  38. const queue = this.queues[bucket];
  39. if (!queue) {
  40. return;
  41. }
  42. const { length } = queue;
  43. if (!length) {
  44. return;
  45. }
  46. debug("send %d commands in %s queue", length, bucket);
  47. this.queues[bucket] = null;
  48. while (queue.length > 0) {
  49. queue.shift()();
  50. }
  51. }
  52. }
  53. exports.default = DelayQueue;