diff --git a/integration-tests/api/package.json b/integration-tests/api/package.json index b33b3077b5..071024f23b 100644 --- a/integration-tests/api/package.json +++ b/integration-tests/api/package.json @@ -8,16 +8,16 @@ "build": "babel src -d dist --extensions \".ts,.js\"" }, "dependencies": { - "@medusajs/medusa": "1.3.0-dev-1652704115624", + "@medusajs/medusa": "1.2.1-dev-1650623081351", "faker": "^5.5.3", - "medusa-interfaces": "1.3.0-dev-1652704115624", + "medusa-interfaces": "1.2.1-dev-1650623081351", "typeorm": "^0.2.31" }, "devDependencies": { "@babel/cli": "^7.12.10", "@babel/core": "^7.12.10", "@babel/node": "^7.12.10", - "babel-preset-medusa-package": "1.1.19-dev-1652704115624", + "babel-preset-medusa-package": "1.1.19-dev-1650623081351", "jest": "^26.6.3" } } diff --git a/integration-tests/api/yarn.lock b/integration-tests/api/yarn.lock index 9222f91511..4e74140869 100644 --- a/integration-tests/api/yarn.lock +++ b/integration-tests/api/yarn.lock @@ -1312,25 +1312,10 @@ "@jridgewell/resolve-uri" "^3.0.3" "@jridgewell/sourcemap-codec" "^1.4.10" -"@mapbox/node-pre-gyp@^1.0.0": - version "1.0.9" - resolved "http://localhost:4873/@mapbox%2fnode-pre-gyp/-/node-pre-gyp-1.0.9.tgz#09a8781a3a036151cdebbe8719d6f8b25d4058bc" - integrity sha512-aDF3S3rK9Q2gey/WAttUlISduDItz5BU3306M9Eyv6/oS40aMprnopshtlKTykxRNIBEZuRMaZAnbrQ4QtKGyw== - dependencies: - detect-libc "^2.0.0" - https-proxy-agent "^5.0.0" - make-dir "^3.1.0" - node-fetch "^2.6.7" - nopt "^5.0.0" - npmlog "^5.0.1" - rimraf "^3.0.2" - semver "^7.3.5" - tar "^6.1.11" - -"@medusajs/medusa-cli@1.3.0-dev-1652704115624": - version "1.3.0-dev-1652704115624" - resolved "http://localhost:4873/@medusajs%2fmedusa-cli/-/medusa-cli-1.3.0-dev-1652704115624.tgz#9841fcc6123cd9c72d544d48316d66cf49ad8dc0" - integrity sha512-Lhk6pdvgv4UrLLUY/aYuty3TKfgDlvSUfmG4cTgZXbLehnRSbLelPcX+YKCtiM4d0NYfgF364H7p0NAMuCkBIg== +"@medusajs/medusa-cli@1.2.1-dev-1650623081351": + version "1.2.1-dev-1650623081351" + resolved "http://localhost:4873/@medusajs%2fmedusa-cli/-/medusa-cli-1.2.1-dev-1650623081351.tgz#4c02a0989ecb2dda97839c5a4d9f70ce2c5fda6c" + integrity sha512-afZmMq3Z6WTO77wnbQkdeq7VUxT+6bth7n5Kld3beK+4YBY7DHMTVHs+GoHxagpTBOA6jdvqBXT1llKl/UMT6Q== dependencies: "@babel/polyfill" "^7.8.7" "@babel/runtime" "^7.9.6" @@ -1348,8 +1333,8 @@ is-valid-path "^0.1.1" joi-objectid "^3.0.1" meant "^1.0.1" - medusa-core-utils "1.1.31-dev-1652704115624" - medusa-telemetry "0.0.11-dev-1652704115624" + medusa-core-utils "1.1.31-dev-1650623081351" + medusa-telemetry "0.0.11-dev-1650623081351" netrc-parser "^3.1.6" open "^8.0.6" ora "^5.4.1" @@ -1363,13 +1348,13 @@ winston "^3.3.3" yargs "^15.3.1" -"@medusajs/medusa@1.3.0-dev-1652704115624": - version "1.3.0-dev-1652704115624" - resolved "http://localhost:4873/@medusajs%2fmedusa/-/medusa-1.3.0-dev-1652704115624.tgz#040eede718d0eec6b01b4471e9af8495314c3ff0" - integrity sha512-x2Wg7lP5A25NMENqcZoC00O46Y3ojLaSnxa9L6E3u375nYWa99uxyT8ckXhklVqzhM3iBtReb/20H4r2+niwmw== +"@medusajs/medusa@1.2.1-dev-1650623081351": + version "1.2.1-dev-1650623081351" + resolved "http://localhost:4873/@medusajs%2fmedusa/-/medusa-1.2.1-dev-1650623081351.tgz#9e887b9d8c396e08d04597b3cb0faf61e702d745" + integrity sha512-1PNRQcqeKgSQ+LWRLGD9TkNX0csXKk+eKISw5MJKW4U0WB1fyi5TVAqtEA2mmWKMQDc0sq0H3DwaOBrwtTL/XA== dependencies: "@hapi/joi" "^16.1.8" - "@medusajs/medusa-cli" "1.3.0-dev-1652704115624" + "@medusajs/medusa-cli" "1.2.1-dev-1650623081351" "@types/lodash" "^4.14.168" awilix "^4.2.3" body-parser "^1.19.0" @@ -1392,8 +1377,8 @@ joi "^17.3.0" joi-objectid "^3.0.1" jsonwebtoken "^8.5.1" - medusa-core-utils "1.1.31-dev-1652704115624" - medusa-test-utils "1.1.37-dev-1652704115624" + medusa-core-utils "1.1.31-dev-1650623081351" + medusa-test-utils "1.1.37-dev-1650623081351" morgan "^1.9.1" multer "^1.4.2" passport "^0.4.0" @@ -2039,10 +2024,10 @@ babel-preset-jest@^26.6.2: babel-plugin-jest-hoist "^26.6.2" babel-preset-current-node-syntax "^1.0.0" -babel-preset-medusa-package@1.1.19-dev-1652704115624: - version "1.1.19-dev-1652704115624" - resolved "http://localhost:4873/babel-preset-medusa-package/-/babel-preset-medusa-package-1.1.19-dev-1652704115624.tgz#fa584e39e7c0a1b808af25953f81653252225be7" - integrity sha512-Nv8Si592nO+UZmMQuOGCs1pFs5ShIsCKclbqBybahjGZIPPD6bVNaz9dujSaZnlxvA9a3gn4dvuh3H8IRitUug== +babel-preset-medusa-package@1.1.19-dev-1650623081351: + version "1.1.19-dev-1650623081351" + resolved "http://localhost:4873/babel-preset-medusa-package/-/babel-preset-medusa-package-1.1.19-dev-1650623081351.tgz#bd46931534637e9b6a6c37a8fd740622967a7258" + integrity sha512-eL/xbx7BG1Yrx2WfvdDsZknR94okk1VPigp54HYEXI8kZu/NhzZlayMSTMFiOrF4NvnW0i/3DS9pUcxdG2gQEg== dependencies: "@babel/plugin-proposal-class-properties" "^7.12.1" "@babel/plugin-proposal-decorators" "^7.12.1" @@ -5168,23 +5153,23 @@ media-typer@0.3.0: resolved "http://localhost:4873/media-typer/-/media-typer-0.3.0.tgz#8710d7af0aa626f8fffa1ce00168545263255748" integrity sha1-hxDXrwqmJvj/+hzgAWhUUmMlV0g= -medusa-core-utils@1.1.31-dev-1652704115624: - version "1.1.31-dev-1652704115624" - resolved "http://localhost:4873/medusa-core-utils/-/medusa-core-utils-1.1.31-dev-1652704115624.tgz#cda24aa1a292a1bd0a770774aae9a970fc7ab963" - integrity sha512-evkWva10x6JaHBex5S8Zi9Ae/ZStG858nb18w5ITxbCHWvGYq6/VyLpo4vB7I6wuU3z6kDTMd/yyvsVXv33O2g== +medusa-core-utils@1.1.31-dev-1650623081351: + version "1.1.31-dev-1650623081351" + resolved "http://localhost:4873/medusa-core-utils/-/medusa-core-utils-1.1.31-dev-1650623081351.tgz#ac70dfe1bf7fac52fa1d08a49bba7b522f1073f6" + integrity sha512-PMqWGob/v+nQfkiUJyA8NfyosZIgbBAXwHrr61XSJWECtnwNYA/VoCXsqJwwnluE3mHbX1ePh5dfzHQ/FS03+g== dependencies: joi "^17.3.0" joi-objectid "^3.0.1" -medusa-interfaces@1.3.0-dev-1652704115624: - version "1.3.0-dev-1652704115624" - resolved "http://localhost:4873/medusa-interfaces/-/medusa-interfaces-1.3.0-dev-1652704115624.tgz#02dbbda562f99e7de96e7e6657a6af2a20855e36" - integrity sha512-aSm6gYWF0gPHoIswHllB/YrGwkrYRr4ZrBmWh3QSulDYnjmtyBFHKkatrC1pscAEFG/5h4d1LEENUHYrBQ4tQg== +medusa-interfaces@1.2.1-dev-1650623081351: + version "1.2.1-dev-1650623081351" + resolved "http://localhost:4873/medusa-interfaces/-/medusa-interfaces-1.2.1-dev-1650623081351.tgz#1d9e2f924c79ff256c1e49693ac38b992415fc9e" + integrity sha512-z8ESArLGSIfOwQ1MwgG3BmFkAA0FTSZFjJ313FCbWAnxY7y9KqmMWhKGJd2keybafrm9OghTZknuQhMCbgwkcw== -medusa-telemetry@0.0.11-dev-1652704115624: - version "0.0.11-dev-1652704115624" - resolved "http://localhost:4873/medusa-telemetry/-/medusa-telemetry-0.0.11-dev-1652704115624.tgz#4de0f4a66a9f8bb4f61d8b7c24aa9cbd719039a7" - integrity sha512-sfCzUM4mlXpBJVaqAycKAmeEWalzyg18+gUo+As/mxoEUIUR5XtpZ5Bdx484/2GUvRP8OdOYC1uPpbuTZbLXcw== +medusa-telemetry@0.0.11-dev-1650623081351: + version "0.0.11-dev-1650623081351" + resolved "http://localhost:4873/medusa-telemetry/-/medusa-telemetry-0.0.11-dev-1650623081351.tgz#6c672cb98c4f24ce8d0722020aae1b2ed75d402e" + integrity sha512-d4ul1DZwPRhDk4tgODiCybPF7nms7QWfNj8g7Jx3yjFVV3o4GK4vl3ui5E21SMyqKMsGX3WtUeRInQ+qWWiOiA== dependencies: axios "^0.21.1" axios-retry "^3.1.9" @@ -5196,13 +5181,13 @@ medusa-telemetry@0.0.11-dev-1652704115624: remove-trailing-slash "^0.1.1" uuid "^8.3.2" -medusa-test-utils@1.1.37-dev-1652704115624: - version "1.1.37-dev-1652704115624" - resolved "http://localhost:4873/medusa-test-utils/-/medusa-test-utils-1.1.37-dev-1652704115624.tgz#7118753a4afd1c6ed6f1d1ceb75fdf9bfb65afa3" - integrity sha512-OnQEA/1jj4jnQGukMtbcqg2HWlKve0BqzeS8T5O2VDL5zldFG9SXFF3LupC51nh96u97dcKZk5A6JL8oHwonog== +medusa-test-utils@1.1.37-dev-1650623081351: + version "1.1.37-dev-1650623081351" + resolved "http://localhost:4873/medusa-test-utils/-/medusa-test-utils-1.1.37-dev-1650623081351.tgz#2bc27ea2856c355316f71ee575b277897911d151" + integrity sha512-UUfjbj+DWUozo0Q2ozJVH5Lf5EZZOnVgGetKZ35WZ/0m7E7EFB4n3fxtMu/+c2xXdOCuWrE534XtojUXQp5xFw== dependencies: "@babel/plugin-transform-classes" "^7.9.5" - medusa-core-utils "1.1.31-dev-1652704115624" + medusa-core-utils "1.1.31-dev-1650623081351" randomatic "^3.1.1" merge-descriptors@1.0.1: diff --git a/packages/medusa/src/services/__tests__/event-bus.js b/packages/medusa/src/services/__tests__/event-bus.js index 9dfc2b5f81..83e59c77ce 100644 --- a/packages/medusa/src/services/__tests__/event-bus.js +++ b/packages/medusa/src/services/__tests__/event-bus.js @@ -32,7 +32,7 @@ describe("EventBusService", () => { }) afterAll(async () => { - await await eventBus.stopEnqueuer() + await eventBus.stopEnqueuer() }) it("creates bull queue", () => { @@ -64,7 +64,7 @@ describe("EventBusService", () => { }) it("added the subscriber to the queue", () => { - expect(eventBus.observers_["eventName"].length).toEqual(1) + expect(eventBus.observers_.get("eventName").length).toEqual(1) }) }) @@ -138,7 +138,7 @@ describe("EventBusService", () => { manager: MockManager, stagedJobRepository, logger: loggerMock, - }) + }, {}) eventBus.subscribe("eventName", () => Promise.resolve("hi")) result = await eventBus.worker_({ data: { eventName: "eventName", data: {} }, @@ -191,13 +191,13 @@ describe("EventBusService", () => { it("calls logger warn on rejections", () => { expect(loggerMock.warn).toHaveBeenCalledTimes(3) expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occured while processing eventName: fail1" + "An error occurred while processing eventName: fail1" ) expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occured while processing eventName: fail2" + "An error occurred while processing eventName: fail2" ) expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occured while processing eventName: fail3" + "An error occurred while processing eventName: fail3" ) }) diff --git a/packages/medusa/src/services/event-bus.js b/packages/medusa/src/services/event-bus.js deleted file mode 100644 index 10b4e306df..0000000000 --- a/packages/medusa/src/services/event-bus.js +++ /dev/null @@ -1,293 +0,0 @@ -import Bull from "bull" -import Redis from "ioredis" - -/** - * Can keep track of multiple subscribers to different events and run the - * subscribers when events happen. Events will run asynchronously. - * @class - */ -class EventBusService { - constructor( - { manager, logger, stagedJobRepository, redisClient, redisSubscriber }, - config, - singleton = true - ) { - const opts = { - createClient: (type) => { - switch (type) { - case "client": - return redisClient - case "subscriber": - return redisSubscriber - default: - if (config.projectConfig.redis_url) { - return new Redis(config.projectConfig.redis_url) - } - return redisClient - } - }, - } - - this.config_ = config - - /** @private {EntityManager} */ - this.manager_ = manager - - /** @private {logger} */ - this.logger_ = logger - - this.stagedJobRepository_ = stagedJobRepository - - if (singleton) { - /** @private {object} */ - this.observers_ = {} - - /** @private {BullQueue} */ - this.queue_ = new Bull(`${this.constructor.name}:queue`, opts) - - /** @private {object} to handle cron jobs */ - this.cronHandlers_ = {} - - this.redisClient_ = redisClient - this.redisSubscriber_ = redisSubscriber - - /** @private {BullQueue} used for cron jobs */ - this.cronQueue_ = new Bull(`cron-jobs:queue`, opts) - - // Register our worker to handle emit calls - this.queue_.process(this.worker_) - - // Register cron worker - this.cronQueue_.process(this.cronWorker_) - - if (process.env.NODE_ENV !== "test") { - this.startEnqueuer() - } - } - } - - withTransaction(transactionManager) { - if (!transactionManager) { - return this - } - - const cloned = new EventBusService( - { - manager: transactionManager, - stagedJobRepository: this.stagedJobRepository_, - logger: this.logger_, - redisClient: this.redisClient_, - redisSubscriber: this.redisSubscriber_, - }, - this.config_, - false - ) - - cloned.transactionManager_ = transactionManager - cloned.queue_ = this.queue_ - - return cloned - } - - /** - * Adds a function to a list of event subscribers. - * @param {string} event - the event that the subscriber will listen for. - * @param {func} subscriber - the function to be called when a certain event - * happens. Subscribers must return a Promise. - */ - subscribe(event, subscriber) { - if (typeof subscriber !== "function") { - throw new Error("Subscriber must be a function") - } - - if (this.observers_[event]) { - this.observers_[event].push(subscriber) - } else { - this.observers_[event] = [subscriber] - } - } - - /** - * Adds a function to a list of event subscribers. - * @param {string} event - the event that the subscriber will listen for. - * @param {func} subscriber - the function to be called when a certain event - * happens. Subscribers must return a Promise. - */ - unsubscribe(event, subscriber) { - if (typeof subscriber !== "function") { - throw new Error("Subscriber must be a function") - } - - if (this.observers_[event]) { - const index = this.observers_[event].indexOf(subscriber) - if (index !== -1) { - this.observers_[event].splice(index, 1) - } - } - } - - /** - * Adds a function to a list of event subscribers. - * @param {string} event - the event that the subscriber will listen for. - * @param {func} subscriber - the function to be called when a certain event - * happens. Subscribers must return a Promise. - */ - registerCronHandler_(event, subscriber) { - if (typeof subscriber !== "function") { - throw new Error("Handler must be a function") - } - - if (this.observers_[event]) { - this.cronHandlers_[event].push(subscriber) - } else { - this.cronHandlers_[event] = [subscriber] - } - } - - /** - * Calls all subscribers when an event occurs. - * @param {string} eventName - the name of the event to be process. - * @param {?any} data - the data to send to the subscriber. - * @param {?any} options - options to add the job with - * @return {BullJob} - the job from our queue - */ - async emit(eventName, data, options = {}) { - if (this.transactionManager_) { - const stagedJobRepository = this.transactionManager_.getCustomRepository( - this.stagedJobRepository_ - ) - - const created = await stagedJobRepository.create({ - event_name: eventName, - data, - }) - - return stagedJobRepository.save(created) - } else { - const opts = { removeOnComplete: true } - if (typeof options.delay === "number") { - opts.delay = options.delay - } - this.queue_.add({ eventName, data }, opts) - } - } - - async sleep(ms) { - return new Promise((resolve) => { - setTimeout(resolve, ms) - }) - } - - async startEnqueuer() { - this.enRun_ = true - this.enqueue_ = this.enqueuer_() - } - - async stopEnqueuer() { - this.enRun_ = false - await this.enqueue_ - } - - async enqueuer_() { - while (this.enRun_) { - const listConfig = { - relations: [], - skip: 0, - take: 1000, - } - - const sjRepo = this.manager_.getCustomRepository( - this.stagedJobRepository_ - ) - const jobs = await sjRepo.find({}, listConfig) - - await Promise.all( - jobs.map((job) => { - this.queue_ - .add( - { eventName: job.event_name, data: job.data }, - { removeOnComplete: true } - ) - .then(async () => { - await sjRepo.remove(job) - }) - }) - ) - - await this.sleep(3000) - } - } - - /** - * Handles incoming jobs. - * @param {Object} job The job object - * @return {Promise} resolves to the results of the subscriber calls. - */ - worker_ = (job) => { - const { eventName, data } = job.data - const eventObservers = this.observers_[eventName] || [] - const wildcardObservers = this.observers_["*"] || [] - - const observers = eventObservers.concat(wildcardObservers) - - this.logger_.info( - `Processing ${eventName} which has ${eventObservers.length} subscribers` - ) - - return Promise.all( - observers.map((subscriber) => { - return subscriber(data, eventName).catch((err) => { - this.logger_.warn( - `An error occured while processing ${eventName}: ${err}` - ) - console.log(err) - return err - }) - }) - ) - } - - /** - * Handles incoming jobs. - * @param {Object} job The job object - * @return {Promise} resolves to the results of the subscriber calls. - */ - cronWorker_ = (job) => { - const { eventName, data } = job.data - const observers = this.cronHandlers_[eventName] || [] - this.logger_.info(`Processing cron job: ${eventName}`) - - return Promise.all( - observers.map((subscriber) => { - return subscriber(data, eventName).catch((err) => { - this.logger_.warn( - `An error occured while processing ${eventName}: ${err}` - ) - return err - }) - }) - ) - } - - /** - * Registers a cron job. - * @param {string} eventName - the name of the event - * @param {object} data - the data to be sent with the event - * @param {string} cron - the cron pattern - * @param {function} handler - the handler to call on each cron job - * @return {void} - */ - createCronJob(eventName, data, cron, handler) { - this.logger_.info(`Registering ${eventName}`) - this.registerCronHandler_(eventName, handler) - return this.cronQueue_.add( - { - eventName, - data, - }, - { repeat: { cron } } - ) - } -} - -export default EventBusService diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts new file mode 100644 index 0000000000..4929949e7f --- /dev/null +++ b/packages/medusa/src/services/event-bus.ts @@ -0,0 +1,324 @@ +import Bull from "bull" +import Redis from "ioredis" +import { EntityManager } from "typeorm" +import { ConfigModule, Logger } from "../types/global" +import { StagedJobRepository } from "../repositories/staged-job" +import { StagedJob } from "../models" +import { sleep } from "../utils/sleep" + +type InjectedDependencies = { + manager: EntityManager + logger: Logger + stagedJobRepository: typeof StagedJobRepository + redisClient: Redis + redisSubscriber: Redis +} + +type Subscriber = (data: T, eventName: string) => Promise + +/** + * Can keep track of multiple subscribers to different events and run the + * subscribers when events happen. Events will run asynchronously. + */ +export default class EventBusService { + protected readonly config_: ConfigModule + protected readonly manager_: EntityManager + protected readonly logger_: Logger + protected readonly stagedJobRepository_: typeof StagedJobRepository + protected readonly observers_: Map + protected readonly cronHandlers_: Map + protected readonly redisClient_: Redis + protected readonly redisSubscriber_: Redis + protected readonly cronQueue_: Bull + protected queue_: Bull + protected shouldEnqueuerRun: boolean + protected transactionManager_: EntityManager | undefined + protected enqueue_: Promise + + constructor( + { + manager, + logger, + stagedJobRepository, + redisClient, + redisSubscriber, + }: InjectedDependencies, + config: ConfigModule, + singleton = true + ) { + const opts = { + createClient: (type: string): Redis => { + switch (type) { + case "client": + return redisClient + case "subscriber": + return redisSubscriber + default: + if (config.projectConfig.redis_url) { + return new Redis(config.projectConfig.redis_url) + } + return redisClient + } + }, + } + + this.config_ = config + this.manager_ = manager + this.logger_ = logger + this.stagedJobRepository_ = stagedJobRepository + + if (singleton) { + this.observers_ = new Map() + this.queue_ = new Bull(`${this.constructor.name}:queue`, opts) + this.cronHandlers_ = new Map() + this.redisClient_ = redisClient + this.redisSubscriber_ = redisSubscriber + this.cronQueue_ = new Bull(`cron-jobs:queue`, opts) + // Register our worker to handle emit calls + this.queue_.process(this.worker_) + // Register cron worker + this.cronQueue_.process(this.cronWorker_) + + if (process.env.NODE_ENV !== "test") { + this.startEnqueuer() + } + } + } + + withTransaction(transactionManager): this | EventBusService { + if (!transactionManager) { + return this + } + + const cloned = new EventBusService( + { + manager: transactionManager, + stagedJobRepository: this.stagedJobRepository_, + logger: this.logger_, + redisClient: this.redisClient_, + redisSubscriber: this.redisSubscriber_, + }, + this.config_, + false + ) + + cloned.transactionManager_ = transactionManager + cloned.queue_ = this.queue_ + + return cloned + } + + /** + * Adds a function to a list of event subscribers. + * @param event - the event that the subscriber will listen for. + * @param subscriber - the function to be called when a certain event + * happens. Subscribers must return a Promise. + * @return this + */ + subscribe(event: string | symbol, subscriber: Subscriber): this { + if (typeof subscriber !== "function") { + throw new Error("Subscriber must be a function") + } + + const observers = this.observers_.get(event) ?? [] + this.observers_.set(event, [...observers, subscriber]) + + return this + } + + /** + * Adds a function to a list of event subscribers. + * @param event - the event that the subscriber will listen for. + * @param subscriber - the function to be called when a certain event + * happens. Subscribers must return a Promise. + * @return this + */ + unsubscribe(event: string | symbol, subscriber: Subscriber): this { + if (typeof subscriber !== "function") { + throw new Error("Subscriber must be a function") + } + + if (this.observers_.get(event)?.length) { + const index = this.observers_.get(event)?.indexOf(subscriber) + if (index !== -1) { + this.observers_.get(event)?.splice(index as number, 1) + } + } + + return this + } + + /** + * Adds a function to a list of event subscribers. + * @param event - the event that the subscriber will listen for. + * @param subscriber - the function to be called when a certain event + * happens. Subscribers must return a Promise. + * @return this + */ + protected registerCronHandler_( + event: string | symbol, + subscriber: Subscriber + ): this { + if (typeof subscriber !== "function") { + throw new Error("Handler must be a function") + } + + const cronHandlers = this.cronHandlers_.get(event) ?? [] + this.cronHandlers_.set(event, [...cronHandlers, subscriber]) + + return this + } + + /** + * Calls all subscribers when an event occurs. + * @param {string} eventName - the name of the event to be process. + * @param data - the data to send to the subscriber. + * @param options - options to add the job with + * @return the job from our queue + */ + async emit( + eventName: string, + data: T, + options: { delay?: number } = {} + ): Promise { + if (this.transactionManager_) { + const stagedJobRepository = this.transactionManager_.getCustomRepository( + this.stagedJobRepository_ + ) + + const stagedJobInstance = stagedJobRepository.create({ + event_name: eventName, + data, + }) + return await stagedJobRepository.save(stagedJobInstance) + } else { + const opts: { removeOnComplete: boolean; delay?: number } = { + removeOnComplete: true, + } + if (typeof options.delay === "number") { + opts.delay = options.delay + } + this.queue_.add({ eventName, data }, opts) + } + } + + startEnqueuer(): void { + this.shouldEnqueuerRun = true + this.enqueue_ = this.enqueuer_() + } + + async stopEnqueuer(): Promise { + this.shouldEnqueuerRun = false + await this.enqueue_ + } + + async enqueuer_(): Promise { + while (this.shouldEnqueuerRun) { + const listConfig = { + relations: [], + skip: 0, + take: 1000, + } + + const stagedJobRepo = this.manager_.getCustomRepository( + this.stagedJobRepository_ + ) + const jobs = await stagedJobRepo.find(listConfig) + + await Promise.all( + jobs.map((job) => { + this.queue_ + .add( + { eventName: job.event_name, data: job.data }, + { removeOnComplete: true } + ) + .then(async () => { + await stagedJobRepo.remove(job) + }) + }) + ) + + await sleep(3000) + } + } + + /** + * Handles incoming jobs. + * @param job The job object + * @return resolves to the results of the subscriber calls. + */ + worker_ = async (job: { + data: { eventName: string; data: T } + }): Promise => { + const { eventName, data } = job.data + const eventObservers = this.observers_.get(eventName) || [] + const wildcardObservers = this.observers_.get("*") || [] + + const observers = eventObservers.concat(wildcardObservers) + + this.logger_.info( + `Processing ${eventName} which has ${eventObservers.length} subscribers` + ) + + return await Promise.all( + observers.map((subscriber) => { + return subscriber(data, eventName).catch((err) => { + this.logger_.warn( + `An error occurred while processing ${eventName}: ${err}` + ) + console.error(err) + return err + }) + }) + ) + } + + /** + * Handles incoming jobs. + * @param job The job object + * @return resolves to the results of the subscriber calls. + */ + cronWorker_ = async (job: { + data: { eventName: string; data: T } + }): Promise => { + const { eventName, data } = job.data + const observers = this.cronHandlers_.get(eventName) || [] + this.logger_.info(`Processing cron job: ${eventName}`) + + return await Promise.all( + observers.map((subscriber) => { + return subscriber(data, eventName).catch((err) => { + this.logger_.warn( + `An error occured while processing ${eventName}: ${err}` + ) + return err + }) + }) + ) + } + + /** + * Registers a cron job. + * @param eventName - the name of the event + * @param data - the data to be sent with the event + * @param cron - the cron pattern + * @param handler - the handler to call on each cron job + * @return void + */ + createCronJob( + eventName: string, + data: T, + cron: string, + handler: Subscriber + ): void { + this.logger_.info(`Registering ${eventName}`) + this.registerCronHandler_(eventName, handler) + return this.cronQueue_.add( + { + eventName, + data, + }, + { repeat: { cron } } + ) + } +} diff --git a/packages/medusa/src/services/product-variant.ts b/packages/medusa/src/services/product-variant.ts index 2d8f6cf4af..db26f22a07 100644 --- a/packages/medusa/src/services/product-variant.ts +++ b/packages/medusa/src/services/product-variant.ts @@ -17,8 +17,8 @@ import { FindWithRelationsOptions, ProductVariantRepository, } from "../repositories/product-variant" -import EventBusService from "../services/event-bus" -import RegionService from "../services/region" +import EventBusService from "./event-bus" +import RegionService from "./region" import { FindConfig } from "../types/common" import { CreateProductVariantInput, diff --git a/packages/medusa/src/types/global.ts b/packages/medusa/src/types/global.ts index 048df9f459..25b9388560 100644 --- a/packages/medusa/src/types/global.ts +++ b/packages/medusa/src/types/global.ts @@ -23,6 +23,8 @@ export type MedusaContainer = AwilixContainer & { export type Logger = _Logger & { progress: (activityId: string, msg: string) => void + info: (msg: string) => void + warn: (msg: string) => void } export type ConfigModule = { diff --git a/packages/medusa/src/utils/sleep.ts b/packages/medusa/src/utils/sleep.ts new file mode 100644 index 0000000000..f294976b85 --- /dev/null +++ b/packages/medusa/src/utils/sleep.ts @@ -0,0 +1,5 @@ +export async function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +}