refactor(medusa): Refactor and fix EventBusService (#1377)

This commit is contained in:
Adrien de Peretti
2022-06-09 18:18:22 +02:00
committed by GitHub
parent 78bd61abe1
commit 5172b21d09
8 changed files with 376 additions and 353 deletions

View File

@@ -8,16 +8,16 @@
"build": "babel src -d dist --extensions \".ts,.js\""
},
"dependencies": {
"@medusajs/medusa": "1.3.0-dev-1652704115624",
"@medusajs/medusa": "1.2.1-dev-1650623081351",
"faker": "^5.5.3",
"medusa-interfaces": "1.3.0-dev-1652704115624",
"medusa-interfaces": "1.2.1-dev-1650623081351",
"typeorm": "^0.2.31"
},
"devDependencies": {
"@babel/cli": "^7.12.10",
"@babel/core": "^7.12.10",
"@babel/node": "^7.12.10",
"babel-preset-medusa-package": "1.1.19-dev-1652704115624",
"babel-preset-medusa-package": "1.1.19-dev-1650623081351",
"jest": "^26.6.3"
}
}

View File

@@ -1312,25 +1312,10 @@
"@jridgewell/resolve-uri" "^3.0.3"
"@jridgewell/sourcemap-codec" "^1.4.10"
"@mapbox/node-pre-gyp@^1.0.0":
version "1.0.9"
resolved "http://localhost:4873/@mapbox%2fnode-pre-gyp/-/node-pre-gyp-1.0.9.tgz#09a8781a3a036151cdebbe8719d6f8b25d4058bc"
integrity sha512-aDF3S3rK9Q2gey/WAttUlISduDItz5BU3306M9Eyv6/oS40aMprnopshtlKTykxRNIBEZuRMaZAnbrQ4QtKGyw==
dependencies:
detect-libc "^2.0.0"
https-proxy-agent "^5.0.0"
make-dir "^3.1.0"
node-fetch "^2.6.7"
nopt "^5.0.0"
npmlog "^5.0.1"
rimraf "^3.0.2"
semver "^7.3.5"
tar "^6.1.11"
"@medusajs/medusa-cli@1.3.0-dev-1652704115624":
version "1.3.0-dev-1652704115624"
resolved "http://localhost:4873/@medusajs%2fmedusa-cli/-/medusa-cli-1.3.0-dev-1652704115624.tgz#9841fcc6123cd9c72d544d48316d66cf49ad8dc0"
integrity sha512-Lhk6pdvgv4UrLLUY/aYuty3TKfgDlvSUfmG4cTgZXbLehnRSbLelPcX+YKCtiM4d0NYfgF364H7p0NAMuCkBIg==
"@medusajs/medusa-cli@1.2.1-dev-1650623081351":
version "1.2.1-dev-1650623081351"
resolved "http://localhost:4873/@medusajs%2fmedusa-cli/-/medusa-cli-1.2.1-dev-1650623081351.tgz#4c02a0989ecb2dda97839c5a4d9f70ce2c5fda6c"
integrity sha512-afZmMq3Z6WTO77wnbQkdeq7VUxT+6bth7n5Kld3beK+4YBY7DHMTVHs+GoHxagpTBOA6jdvqBXT1llKl/UMT6Q==
dependencies:
"@babel/polyfill" "^7.8.7"
"@babel/runtime" "^7.9.6"
@@ -1348,8 +1333,8 @@
is-valid-path "^0.1.1"
joi-objectid "^3.0.1"
meant "^1.0.1"
medusa-core-utils "1.1.31-dev-1652704115624"
medusa-telemetry "0.0.11-dev-1652704115624"
medusa-core-utils "1.1.31-dev-1650623081351"
medusa-telemetry "0.0.11-dev-1650623081351"
netrc-parser "^3.1.6"
open "^8.0.6"
ora "^5.4.1"
@@ -1363,13 +1348,13 @@
winston "^3.3.3"
yargs "^15.3.1"
"@medusajs/medusa@1.3.0-dev-1652704115624":
version "1.3.0-dev-1652704115624"
resolved "http://localhost:4873/@medusajs%2fmedusa/-/medusa-1.3.0-dev-1652704115624.tgz#040eede718d0eec6b01b4471e9af8495314c3ff0"
integrity sha512-x2Wg7lP5A25NMENqcZoC00O46Y3ojLaSnxa9L6E3u375nYWa99uxyT8ckXhklVqzhM3iBtReb/20H4r2+niwmw==
"@medusajs/medusa@1.2.1-dev-1650623081351":
version "1.2.1-dev-1650623081351"
resolved "http://localhost:4873/@medusajs%2fmedusa/-/medusa-1.2.1-dev-1650623081351.tgz#9e887b9d8c396e08d04597b3cb0faf61e702d745"
integrity sha512-1PNRQcqeKgSQ+LWRLGD9TkNX0csXKk+eKISw5MJKW4U0WB1fyi5TVAqtEA2mmWKMQDc0sq0H3DwaOBrwtTL/XA==
dependencies:
"@hapi/joi" "^16.1.8"
"@medusajs/medusa-cli" "1.3.0-dev-1652704115624"
"@medusajs/medusa-cli" "1.2.1-dev-1650623081351"
"@types/lodash" "^4.14.168"
awilix "^4.2.3"
body-parser "^1.19.0"
@@ -1392,8 +1377,8 @@
joi "^17.3.0"
joi-objectid "^3.0.1"
jsonwebtoken "^8.5.1"
medusa-core-utils "1.1.31-dev-1652704115624"
medusa-test-utils "1.1.37-dev-1652704115624"
medusa-core-utils "1.1.31-dev-1650623081351"
medusa-test-utils "1.1.37-dev-1650623081351"
morgan "^1.9.1"
multer "^1.4.2"
passport "^0.4.0"
@@ -2039,10 +2024,10 @@ babel-preset-jest@^26.6.2:
babel-plugin-jest-hoist "^26.6.2"
babel-preset-current-node-syntax "^1.0.0"
babel-preset-medusa-package@1.1.19-dev-1652704115624:
version "1.1.19-dev-1652704115624"
resolved "http://localhost:4873/babel-preset-medusa-package/-/babel-preset-medusa-package-1.1.19-dev-1652704115624.tgz#fa584e39e7c0a1b808af25953f81653252225be7"
integrity sha512-Nv8Si592nO+UZmMQuOGCs1pFs5ShIsCKclbqBybahjGZIPPD6bVNaz9dujSaZnlxvA9a3gn4dvuh3H8IRitUug==
babel-preset-medusa-package@1.1.19-dev-1650623081351:
version "1.1.19-dev-1650623081351"
resolved "http://localhost:4873/babel-preset-medusa-package/-/babel-preset-medusa-package-1.1.19-dev-1650623081351.tgz#bd46931534637e9b6a6c37a8fd740622967a7258"
integrity sha512-eL/xbx7BG1Yrx2WfvdDsZknR94okk1VPigp54HYEXI8kZu/NhzZlayMSTMFiOrF4NvnW0i/3DS9pUcxdG2gQEg==
dependencies:
"@babel/plugin-proposal-class-properties" "^7.12.1"
"@babel/plugin-proposal-decorators" "^7.12.1"
@@ -5168,23 +5153,23 @@ media-typer@0.3.0:
resolved "http://localhost:4873/media-typer/-/media-typer-0.3.0.tgz#8710d7af0aa626f8fffa1ce00168545263255748"
integrity sha1-hxDXrwqmJvj/+hzgAWhUUmMlV0g=
medusa-core-utils@1.1.31-dev-1652704115624:
version "1.1.31-dev-1652704115624"
resolved "http://localhost:4873/medusa-core-utils/-/medusa-core-utils-1.1.31-dev-1652704115624.tgz#cda24aa1a292a1bd0a770774aae9a970fc7ab963"
integrity sha512-evkWva10x6JaHBex5S8Zi9Ae/ZStG858nb18w5ITxbCHWvGYq6/VyLpo4vB7I6wuU3z6kDTMd/yyvsVXv33O2g==
medusa-core-utils@1.1.31-dev-1650623081351:
version "1.1.31-dev-1650623081351"
resolved "http://localhost:4873/medusa-core-utils/-/medusa-core-utils-1.1.31-dev-1650623081351.tgz#ac70dfe1bf7fac52fa1d08a49bba7b522f1073f6"
integrity sha512-PMqWGob/v+nQfkiUJyA8NfyosZIgbBAXwHrr61XSJWECtnwNYA/VoCXsqJwwnluE3mHbX1ePh5dfzHQ/FS03+g==
dependencies:
joi "^17.3.0"
joi-objectid "^3.0.1"
medusa-interfaces@1.3.0-dev-1652704115624:
version "1.3.0-dev-1652704115624"
resolved "http://localhost:4873/medusa-interfaces/-/medusa-interfaces-1.3.0-dev-1652704115624.tgz#02dbbda562f99e7de96e7e6657a6af2a20855e36"
integrity sha512-aSm6gYWF0gPHoIswHllB/YrGwkrYRr4ZrBmWh3QSulDYnjmtyBFHKkatrC1pscAEFG/5h4d1LEENUHYrBQ4tQg==
medusa-interfaces@1.2.1-dev-1650623081351:
version "1.2.1-dev-1650623081351"
resolved "http://localhost:4873/medusa-interfaces/-/medusa-interfaces-1.2.1-dev-1650623081351.tgz#1d9e2f924c79ff256c1e49693ac38b992415fc9e"
integrity sha512-z8ESArLGSIfOwQ1MwgG3BmFkAA0FTSZFjJ313FCbWAnxY7y9KqmMWhKGJd2keybafrm9OghTZknuQhMCbgwkcw==
medusa-telemetry@0.0.11-dev-1652704115624:
version "0.0.11-dev-1652704115624"
resolved "http://localhost:4873/medusa-telemetry/-/medusa-telemetry-0.0.11-dev-1652704115624.tgz#4de0f4a66a9f8bb4f61d8b7c24aa9cbd719039a7"
integrity sha512-sfCzUM4mlXpBJVaqAycKAmeEWalzyg18+gUo+As/mxoEUIUR5XtpZ5Bdx484/2GUvRP8OdOYC1uPpbuTZbLXcw==
medusa-telemetry@0.0.11-dev-1650623081351:
version "0.0.11-dev-1650623081351"
resolved "http://localhost:4873/medusa-telemetry/-/medusa-telemetry-0.0.11-dev-1650623081351.tgz#6c672cb98c4f24ce8d0722020aae1b2ed75d402e"
integrity sha512-d4ul1DZwPRhDk4tgODiCybPF7nms7QWfNj8g7Jx3yjFVV3o4GK4vl3ui5E21SMyqKMsGX3WtUeRInQ+qWWiOiA==
dependencies:
axios "^0.21.1"
axios-retry "^3.1.9"
@@ -5196,13 +5181,13 @@ medusa-telemetry@0.0.11-dev-1652704115624:
remove-trailing-slash "^0.1.1"
uuid "^8.3.2"
medusa-test-utils@1.1.37-dev-1652704115624:
version "1.1.37-dev-1652704115624"
resolved "http://localhost:4873/medusa-test-utils/-/medusa-test-utils-1.1.37-dev-1652704115624.tgz#7118753a4afd1c6ed6f1d1ceb75fdf9bfb65afa3"
integrity sha512-OnQEA/1jj4jnQGukMtbcqg2HWlKve0BqzeS8T5O2VDL5zldFG9SXFF3LupC51nh96u97dcKZk5A6JL8oHwonog==
medusa-test-utils@1.1.37-dev-1650623081351:
version "1.1.37-dev-1650623081351"
resolved "http://localhost:4873/medusa-test-utils/-/medusa-test-utils-1.1.37-dev-1650623081351.tgz#2bc27ea2856c355316f71ee575b277897911d151"
integrity sha512-UUfjbj+DWUozo0Q2ozJVH5Lf5EZZOnVgGetKZ35WZ/0m7E7EFB4n3fxtMu/+c2xXdOCuWrE534XtojUXQp5xFw==
dependencies:
"@babel/plugin-transform-classes" "^7.9.5"
medusa-core-utils "1.1.31-dev-1652704115624"
medusa-core-utils "1.1.31-dev-1650623081351"
randomatic "^3.1.1"
merge-descriptors@1.0.1:

View File

@@ -32,7 +32,7 @@ describe("EventBusService", () => {
})
afterAll(async () => {
await await eventBus.stopEnqueuer()
await eventBus.stopEnqueuer()
})
it("creates bull queue", () => {
@@ -64,7 +64,7 @@ describe("EventBusService", () => {
})
it("added the subscriber to the queue", () => {
expect(eventBus.observers_["eventName"].length).toEqual(1)
expect(eventBus.observers_.get("eventName").length).toEqual(1)
})
})
@@ -138,7 +138,7 @@ describe("EventBusService", () => {
manager: MockManager,
stagedJobRepository,
logger: loggerMock,
})
}, {})
eventBus.subscribe("eventName", () => Promise.resolve("hi"))
result = await eventBus.worker_({
data: { eventName: "eventName", data: {} },
@@ -191,13 +191,13 @@ describe("EventBusService", () => {
it("calls logger warn on rejections", () => {
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occured while processing eventName: fail1"
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occured while processing eventName: fail2"
"An error occurred while processing eventName: fail2"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occured while processing eventName: fail3"
"An error occurred while processing eventName: fail3"
)
})

View File

@@ -1,293 +0,0 @@
import Bull from "bull"
import Redis from "ioredis"
/**
* Can keep track of multiple subscribers to different events and run the
* subscribers when events happen. Events will run asynchronously.
* @class
*/
class EventBusService {
constructor(
{ manager, logger, stagedJobRepository, redisClient, redisSubscriber },
config,
singleton = true
) {
const opts = {
createClient: (type) => {
switch (type) {
case "client":
return redisClient
case "subscriber":
return redisSubscriber
default:
if (config.projectConfig.redis_url) {
return new Redis(config.projectConfig.redis_url)
}
return redisClient
}
},
}
this.config_ = config
/** @private {EntityManager} */
this.manager_ = manager
/** @private {logger} */
this.logger_ = logger
this.stagedJobRepository_ = stagedJobRepository
if (singleton) {
/** @private {object} */
this.observers_ = {}
/** @private {BullQueue} */
this.queue_ = new Bull(`${this.constructor.name}:queue`, opts)
/** @private {object} to handle cron jobs */
this.cronHandlers_ = {}
this.redisClient_ = redisClient
this.redisSubscriber_ = redisSubscriber
/** @private {BullQueue} used for cron jobs */
this.cronQueue_ = new Bull(`cron-jobs:queue`, opts)
// Register our worker to handle emit calls
this.queue_.process(this.worker_)
// Register cron worker
this.cronQueue_.process(this.cronWorker_)
if (process.env.NODE_ENV !== "test") {
this.startEnqueuer()
}
}
}
withTransaction(transactionManager) {
if (!transactionManager) {
return this
}
const cloned = new EventBusService(
{
manager: transactionManager,
stagedJobRepository: this.stagedJobRepository_,
logger: this.logger_,
redisClient: this.redisClient_,
redisSubscriber: this.redisSubscriber_,
},
this.config_,
false
)
cloned.transactionManager_ = transactionManager
cloned.queue_ = this.queue_
return cloned
}
/**
* Adds a function to a list of event subscribers.
* @param {string} event - the event that the subscriber will listen for.
* @param {func} subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
*/
subscribe(event, subscriber) {
if (typeof subscriber !== "function") {
throw new Error("Subscriber must be a function")
}
if (this.observers_[event]) {
this.observers_[event].push(subscriber)
} else {
this.observers_[event] = [subscriber]
}
}
/**
* Adds a function to a list of event subscribers.
* @param {string} event - the event that the subscriber will listen for.
* @param {func} subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
*/
unsubscribe(event, subscriber) {
if (typeof subscriber !== "function") {
throw new Error("Subscriber must be a function")
}
if (this.observers_[event]) {
const index = this.observers_[event].indexOf(subscriber)
if (index !== -1) {
this.observers_[event].splice(index, 1)
}
}
}
/**
* Adds a function to a list of event subscribers.
* @param {string} event - the event that the subscriber will listen for.
* @param {func} subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
*/
registerCronHandler_(event, subscriber) {
if (typeof subscriber !== "function") {
throw new Error("Handler must be a function")
}
if (this.observers_[event]) {
this.cronHandlers_[event].push(subscriber)
} else {
this.cronHandlers_[event] = [subscriber]
}
}
/**
* Calls all subscribers when an event occurs.
* @param {string} eventName - the name of the event to be process.
* @param {?any} data - the data to send to the subscriber.
* @param {?any} options - options to add the job with
* @return {BullJob} - the job from our queue
*/
async emit(eventName, data, options = {}) {
if (this.transactionManager_) {
const stagedJobRepository = this.transactionManager_.getCustomRepository(
this.stagedJobRepository_
)
const created = await stagedJobRepository.create({
event_name: eventName,
data,
})
return stagedJobRepository.save(created)
} else {
const opts = { removeOnComplete: true }
if (typeof options.delay === "number") {
opts.delay = options.delay
}
this.queue_.add({ eventName, data }, opts)
}
}
async sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}
async startEnqueuer() {
this.enRun_ = true
this.enqueue_ = this.enqueuer_()
}
async stopEnqueuer() {
this.enRun_ = false
await this.enqueue_
}
async enqueuer_() {
while (this.enRun_) {
const listConfig = {
relations: [],
skip: 0,
take: 1000,
}
const sjRepo = this.manager_.getCustomRepository(
this.stagedJobRepository_
)
const jobs = await sjRepo.find({}, listConfig)
await Promise.all(
jobs.map((job) => {
this.queue_
.add(
{ eventName: job.event_name, data: job.data },
{ removeOnComplete: true }
)
.then(async () => {
await sjRepo.remove(job)
})
})
)
await this.sleep(3000)
}
}
/**
* Handles incoming jobs.
* @param {Object} job The job object
* @return {Promise} resolves to the results of the subscriber calls.
*/
worker_ = (job) => {
const { eventName, data } = job.data
const eventObservers = this.observers_[eventName] || []
const wildcardObservers = this.observers_["*"] || []
const observers = eventObservers.concat(wildcardObservers)
this.logger_.info(
`Processing ${eventName} which has ${eventObservers.length} subscribers`
)
return Promise.all(
observers.map((subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occured while processing ${eventName}: ${err}`
)
console.log(err)
return err
})
})
)
}
/**
* Handles incoming jobs.
* @param {Object} job The job object
* @return {Promise} resolves to the results of the subscriber calls.
*/
cronWorker_ = (job) => {
const { eventName, data } = job.data
const observers = this.cronHandlers_[eventName] || []
this.logger_.info(`Processing cron job: ${eventName}`)
return Promise.all(
observers.map((subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occured while processing ${eventName}: ${err}`
)
return err
})
})
)
}
/**
* Registers a cron job.
* @param {string} eventName - the name of the event
* @param {object} data - the data to be sent with the event
* @param {string} cron - the cron pattern
* @param {function} handler - the handler to call on each cron job
* @return {void}
*/
createCronJob(eventName, data, cron, handler) {
this.logger_.info(`Registering ${eventName}`)
this.registerCronHandler_(eventName, handler)
return this.cronQueue_.add(
{
eventName,
data,
},
{ repeat: { cron } }
)
}
}
export default EventBusService

View File

@@ -0,0 +1,324 @@
import Bull from "bull"
import Redis from "ioredis"
import { EntityManager } from "typeorm"
import { ConfigModule, Logger } from "../types/global"
import { StagedJobRepository } from "../repositories/staged-job"
import { StagedJob } from "../models"
import { sleep } from "../utils/sleep"
type InjectedDependencies = {
manager: EntityManager
logger: Logger
stagedJobRepository: typeof StagedJobRepository
redisClient: Redis
redisSubscriber: Redis
}
type Subscriber<T = unknown> = (data: T, eventName: string) => Promise<void>
/**
* Can keep track of multiple subscribers to different events and run the
* subscribers when events happen. Events will run asynchronously.
*/
export default class EventBusService {
protected readonly config_: ConfigModule
protected readonly manager_: EntityManager
protected readonly logger_: Logger
protected readonly stagedJobRepository_: typeof StagedJobRepository
protected readonly observers_: Map<string | symbol, Subscriber[]>
protected readonly cronHandlers_: Map<string | symbol, Subscriber[]>
protected readonly redisClient_: Redis
protected readonly redisSubscriber_: Redis
protected readonly cronQueue_: Bull
protected queue_: Bull
protected shouldEnqueuerRun: boolean
protected transactionManager_: EntityManager | undefined
protected enqueue_: Promise<void>
constructor(
{
manager,
logger,
stagedJobRepository,
redisClient,
redisSubscriber,
}: InjectedDependencies,
config: ConfigModule,
singleton = true
) {
const opts = {
createClient: (type: string): Redis => {
switch (type) {
case "client":
return redisClient
case "subscriber":
return redisSubscriber
default:
if (config.projectConfig.redis_url) {
return new Redis(config.projectConfig.redis_url)
}
return redisClient
}
},
}
this.config_ = config
this.manager_ = manager
this.logger_ = logger
this.stagedJobRepository_ = stagedJobRepository
if (singleton) {
this.observers_ = new Map()
this.queue_ = new Bull(`${this.constructor.name}:queue`, opts)
this.cronHandlers_ = new Map()
this.redisClient_ = redisClient
this.redisSubscriber_ = redisSubscriber
this.cronQueue_ = new Bull(`cron-jobs:queue`, opts)
// Register our worker to handle emit calls
this.queue_.process(this.worker_)
// Register cron worker
this.cronQueue_.process(this.cronWorker_)
if (process.env.NODE_ENV !== "test") {
this.startEnqueuer()
}
}
}
withTransaction(transactionManager): this | EventBusService {
if (!transactionManager) {
return this
}
const cloned = new EventBusService(
{
manager: transactionManager,
stagedJobRepository: this.stagedJobRepository_,
logger: this.logger_,
redisClient: this.redisClient_,
redisSubscriber: this.redisSubscriber_,
},
this.config_,
false
)
cloned.transactionManager_ = transactionManager
cloned.queue_ = this.queue_
return cloned
}
/**
* Adds a function to a list of event subscribers.
* @param event - the event that the subscriber will listen for.
* @param subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
* @return this
*/
subscribe(event: string | symbol, subscriber: Subscriber): this {
if (typeof subscriber !== "function") {
throw new Error("Subscriber must be a function")
}
const observers = this.observers_.get(event) ?? []
this.observers_.set(event, [...observers, subscriber])
return this
}
/**
* Adds a function to a list of event subscribers.
* @param event - the event that the subscriber will listen for.
* @param subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
* @return this
*/
unsubscribe(event: string | symbol, subscriber: Subscriber): this {
if (typeof subscriber !== "function") {
throw new Error("Subscriber must be a function")
}
if (this.observers_.get(event)?.length) {
const index = this.observers_.get(event)?.indexOf(subscriber)
if (index !== -1) {
this.observers_.get(event)?.splice(index as number, 1)
}
}
return this
}
/**
* Adds a function to a list of event subscribers.
* @param event - the event that the subscriber will listen for.
* @param subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
* @return this
*/
protected registerCronHandler_(
event: string | symbol,
subscriber: Subscriber
): this {
if (typeof subscriber !== "function") {
throw new Error("Handler must be a function")
}
const cronHandlers = this.cronHandlers_.get(event) ?? []
this.cronHandlers_.set(event, [...cronHandlers, subscriber])
return this
}
/**
* Calls all subscribers when an event occurs.
* @param {string} eventName - the name of the event to be process.
* @param data - the data to send to the subscriber.
* @param options - options to add the job with
* @return the job from our queue
*/
async emit<T>(
eventName: string,
data: T,
options: { delay?: number } = {}
): Promise<StagedJob | void> {
if (this.transactionManager_) {
const stagedJobRepository = this.transactionManager_.getCustomRepository(
this.stagedJobRepository_
)
const stagedJobInstance = stagedJobRepository.create({
event_name: eventName,
data,
})
return await stagedJobRepository.save(stagedJobInstance)
} else {
const opts: { removeOnComplete: boolean; delay?: number } = {
removeOnComplete: true,
}
if (typeof options.delay === "number") {
opts.delay = options.delay
}
this.queue_.add({ eventName, data }, opts)
}
}
startEnqueuer(): void {
this.shouldEnqueuerRun = true
this.enqueue_ = this.enqueuer_()
}
async stopEnqueuer(): Promise<void> {
this.shouldEnqueuerRun = false
await this.enqueue_
}
async enqueuer_(): Promise<void> {
while (this.shouldEnqueuerRun) {
const listConfig = {
relations: [],
skip: 0,
take: 1000,
}
const stagedJobRepo = this.manager_.getCustomRepository(
this.stagedJobRepository_
)
const jobs = await stagedJobRepo.find(listConfig)
await Promise.all(
jobs.map((job) => {
this.queue_
.add(
{ eventName: job.event_name, data: job.data },
{ removeOnComplete: true }
)
.then(async () => {
await stagedJobRepo.remove(job)
})
})
)
await sleep(3000)
}
}
/**
* Handles incoming jobs.
* @param job The job object
* @return resolves to the results of the subscriber calls.
*/
worker_ = async <T>(job: {
data: { eventName: string; data: T }
}): Promise<unknown[]> => {
const { eventName, data } = job.data
const eventObservers = this.observers_.get(eventName) || []
const wildcardObservers = this.observers_.get("*") || []
const observers = eventObservers.concat(wildcardObservers)
this.logger_.info(
`Processing ${eventName} which has ${eventObservers.length} subscribers`
)
return await Promise.all(
observers.map((subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occurred while processing ${eventName}: ${err}`
)
console.error(err)
return err
})
})
)
}
/**
* Handles incoming jobs.
* @param job The job object
* @return resolves to the results of the subscriber calls.
*/
cronWorker_ = async <T>(job: {
data: { eventName: string; data: T }
}): Promise<unknown[]> => {
const { eventName, data } = job.data
const observers = this.cronHandlers_.get(eventName) || []
this.logger_.info(`Processing cron job: ${eventName}`)
return await Promise.all(
observers.map((subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occured while processing ${eventName}: ${err}`
)
return err
})
})
)
}
/**
* Registers a cron job.
* @param eventName - the name of the event
* @param data - the data to be sent with the event
* @param cron - the cron pattern
* @param handler - the handler to call on each cron job
* @return void
*/
createCronJob<T>(
eventName: string,
data: T,
cron: string,
handler: Subscriber
): void {
this.logger_.info(`Registering ${eventName}`)
this.registerCronHandler_(eventName, handler)
return this.cronQueue_.add(
{
eventName,
data,
},
{ repeat: { cron } }
)
}
}

View File

@@ -17,8 +17,8 @@ import {
FindWithRelationsOptions,
ProductVariantRepository,
} from "../repositories/product-variant"
import EventBusService from "../services/event-bus"
import RegionService from "../services/region"
import EventBusService from "./event-bus"
import RegionService from "./region"
import { FindConfig } from "../types/common"
import {
CreateProductVariantInput,

View File

@@ -23,6 +23,8 @@ export type MedusaContainer = AwilixContainer & {
export type Logger = _Logger & {
progress: (activityId: string, msg: string) => void
info: (msg: string) => void
warn: (msg: string) => void
}
export type ConfigModule = {

View File

@@ -0,0 +1,5 @@
export async function sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}