chore(framework): Move and improve subscriber loader (#8347)

**What**
Move `SubscriberLoader` and improve implementation

FIXES FRMW-2635
This commit is contained in:
Adrien de Peretti
2024-07-30 14:54:31 +02:00
committed by GitHub
parent 169953ad1e
commit a9fea986b0
17 changed files with 158 additions and 202 deletions

View File

@@ -32,7 +32,7 @@ medusaIntegrationTestRunner({
expect(testEventPayloadHandlerMock).toHaveBeenCalled()
expect(
testEventPayloadHandlerMock.mock.calls[0][0].pluginOptions
).toEqual({})
).toEqual(expect.any(Object))
expect(testEventPayloadHandlerMock.mock.calls[0][0].event).toEqual({
name: eventName,
data: {

View File

@@ -5,7 +5,7 @@
"license": "MIT",
"private": true,
"scripts": {
"test:integration": "NODE_OPTIONS=--experimental-vm-modules jest --silent=false --no-cache --maxWorkers=50% --bail --detectOpenHandles --forceExit --logHeapUsage",
"test:integration": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache --maxWorkers=50% --bail --detectOpenHandles --forceExit --logHeapUsage",
"test:integration:chunk": "NODE_OPTIONS=--experimental-vm-modules jest --silent --no-cache --bail --maxWorkers=50% --forceExit --testPathPattern=$(echo $CHUNKS | jq -r \".[${CHUNK}] | .[]\")",
"build": "tsc --allowJs --outDir ./dist"
},

View File

@@ -8,37 +8,11 @@
"dist"
],
"exports": {
".": {
"node": "./dist/index.js",
"import": "./dist/index.js",
"require": "./dist/index.js",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./config": {
"types": "./dist/config/index.d.ts",
"import": "./dist/config/index.js",
"require": "./dist/config/index.js",
"node": "./dist/config/index.js"
},
"./logger": {
"types": "./dist/logger/index.d.ts",
"import": "./dist/logger/index.js",
"require": "./dist/logger/index.js",
"node": "./dist/logger/index.js"
},
"./database": {
"types": "./dist/database/index.d.ts",
"import": "./dist/database/index.js",
"require": "./dist/database/index.js",
"node": "./dist/database/index.js"
},
"./feature-flag": {
"types": "./dist/feature-flags/index.d.ts",
"import": "./dist/feature-flags/index.js",
"require": "./dist/feature-flags/index.js",
"node": "./dist/feature-flags/index.js"
}
".": "./dist/index.js",
"./config": "./dist/config/index.js",
"./logger": "./dist/logger/index.js",
"./database": "./dist/database/index.js",
"./subscribers": "./dist/subscribers/index.js"
},
"engines": {
"node": ">=20"

View File

@@ -3,4 +3,5 @@ export * from "./logger"
export * from "./http"
export * from "./database"
export * from "./container"
export * from "./subscribers"
export * from "./feature-flags"

View File

@@ -0,0 +1,10 @@
import { SubscriberArgs, SubscriberConfig } from "../../types"
export default async function orderNotifier(_: SubscriberArgs) {
return await Promise.resolve()
}
export const config: SubscriberConfig = {
event: ["order.placed", "order.canceled", "order.completed"],
context: { subscriberId: "order-notifier" },
}

View File

@@ -0,0 +1,12 @@
import { SubscriberArgs, SubscriberConfig } from "../../types"
export default async function productUpdater(_: SubscriberArgs) {
return await Promise.resolve()
}
export const config: SubscriberConfig = {
event: "product.updated",
context: {
subscriberId: "product-updater",
},
}

View File

@@ -0,0 +1,9 @@
import { SubscriberArgs, SubscriberConfig } from "../../types"
export default async function (_: SubscriberArgs) {
return await Promise.resolve()
}
export const config: SubscriberConfig = {
event: "variant.created",
}

View File

@@ -0,0 +1,5 @@
export const eventBusServiceMock = {
subscribe: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}

View File

@@ -1,7 +1,9 @@
import { MedusaContainer } from "@medusajs/types"
import { join } from "path"
import { containerMock, eventBusServiceMock } from "../__mocks__"
import { SubscriberLoader } from "../index"
import { eventBusServiceMock } from "../__mocks__"
import { SubscriberLoader } from "../subscriber-loader"
import { container } from "../../container"
import { ModuleRegistrationName } from "@medusajs/utils"
import { asValue } from "awilix"
describe("SubscriberLoader", () => {
const rootDir = join(__dirname, "../__fixtures__", "subscribers")
@@ -15,13 +17,12 @@ describe("SubscriberLoader", () => {
let registeredPaths: string[] = []
beforeAll(async () => {
jest.clearAllMocks()
container.register(
ModuleRegistrationName.EVENT_BUS,
asValue(eventBusServiceMock)
)
const paths = await new SubscriberLoader(
rootDir,
containerMock as unknown as MedusaContainer,
pluginOptions
).load()
const paths = await new SubscriberLoader(rootDir, pluginOptions).load()
if (paths) {
registeredPaths = [...registeredPaths, ...paths]

View File

@@ -0,0 +1,2 @@
export * from "./subscriber-loader"
export * from "./types"

View File

@@ -1,15 +1,12 @@
import {
Event,
IEventBusModuleService,
MedusaContainer,
Subscriber,
} from "@medusajs/types"
import { kebabCase, ModuleRegistrationName } from "@medusajs/utils"
import { readdir } from "fs/promises"
import { extname, join, sep } from "path"
import { Event, IEventBusModuleService, Subscriber } from "@medusajs/types"
import { kebabCase, ModuleRegistrationName, promiseAll } from "@medusajs/utils"
import { access, readdir } from "fs/promises"
import { join, parse } from "path"
import { SubscriberArgs, SubscriberConfig } from "../../../types/subscribers"
import { logger } from "@medusajs/framework"
import { configManager } from "../config"
import { container } from "../container"
import { SubscriberArgs, SubscriberConfig } from "./types"
import { logger } from "../logger"
type SubscriberHandler<T> = (args: SubscriberArgs<T>) => Promise<void>
@@ -19,26 +16,37 @@ type SubscriberModule<T> = {
}
export class SubscriberLoader {
protected container_: MedusaContainer
protected pluginOptions_: Record<string, unknown>
protected rootDir_: string
protected excludes: RegExp[] = [
/**
* The options of the plugin from which the subscribers are being loaded
* @private
*/
#pluginOptions: Record<string, unknown>
/**
* The base directory from which to scan for the subscribers
* @private
*/
#sourceDir: string
/**
* The list of file names to exclude from the subscriber scan
* @private
*/
#excludes: RegExp[] = [
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts|\.md)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]
protected subscriberDescriptors_: Map<string, SubscriberModule<any>> =
new Map()
/**
* Map of subscribers descriptors to consume in the loader
* @private
*/
#subscriberDescriptors: Map<string, SubscriberModule<any>> = new Map()
constructor(
rootDir: string,
container: MedusaContainer,
options: Record<string, unknown> = {}
) {
this.rootDir_ = rootDir
this.pluginOptions_ = options
this.container_ = container
constructor(sourceDir: string, options: Record<string, unknown> = {}) {
this.#sourceDir = sourceDir
this.#pluginOptions = options
}
private validateSubscriber(
@@ -54,7 +62,7 @@ export class SubscriberLoader {
/**
* If the handler is not a function, we can't use it
*/
logger.warn(`The subscriber in ${path} is not a function.`)
logger.warn(`The subscriber in ${path} is not a function. skipped.`)
return false
}
@@ -64,7 +72,7 @@ export class SubscriberLoader {
/**
* If the subscriber is missing a config, we can't use it
*/
logger.warn(`The subscriber in ${path} is missing a config.`)
logger.warn(`The subscriber in ${path} is missing a config. skipped.`)
return false
}
@@ -73,25 +81,27 @@ export class SubscriberLoader {
* If the subscriber is missing an event, we can't use it.
* In production we throw an error, else we log a warning
*/
if (process.env.NODE_ENV === "production") {
throw new Error(`The subscriber in ${path} is missing an event.`)
if (configManager.isProduction) {
throw new Error(
`The subscriber in ${path} is missing an event in the config.`
)
} else {
logger.warn(`The subscriber in ${path} is missing an event.`)
logger.warn(
`The subscriber in ${path} is missing an event in the config. skipped.`
)
}
return false
}
if (
typeof config.event !== "string" &&
!Array.isArray(config.event) &&
!config.event.every((e: unknown) => typeof e === "string")
) {
const events = Array.isArray(config.event) ? config.event : [config.event]
if (events.some((e: unknown) => !(typeof e === "string"))) {
/**
* If the subscribers event is not a string or an array of strings, we can't use it
*/
logger.warn(
`The subscriber in ${path} has an invalid event. The event must be a string or an array of strings.`
`The subscriber in ${path} has an invalid event config. The event must be a string or an array of strings. skipped.`
)
return false
}
@@ -99,7 +109,7 @@ export class SubscriberLoader {
return true
}
private async createDescriptor(absolutePath: string, entry: string) {
private async createDescriptor(absolutePath: string) {
return await import(absolutePath).then((module_) => {
const isValid = this.validateSubscriber(module_, absolutePath)
@@ -107,7 +117,7 @@ export class SubscriberLoader {
return
}
this.subscriberDescriptors_.set(absolutePath, {
this.#subscriberDescriptors.set(absolutePath, {
config: module_.config,
handler: module_.default,
})
@@ -115,39 +125,36 @@ export class SubscriberLoader {
}
private async createMap(dirPath: string) {
await Promise.all(
await readdir(dirPath, { withFileTypes: true }).then(async (entries) => {
return entries
.filter((entry) => {
if (
this.excludes.length &&
this.excludes.some((exclude) => exclude.test(entry.name))
) {
return false
}
const promises = await readdir(dirPath, {
recursive: true,
withFileTypes: true,
}).then(async (entries) => {
return entries.flatMap(async (entry) => {
if (
this.#excludes.length &&
this.#excludes.some((exclude) => exclude.test(entry.name))
) {
return
}
return true
})
.map(async (entry) => {
const fullPath = join(dirPath, entry.name)
const fullPath = join(dirPath, entry.name)
if (entry.isDirectory()) {
return await this.createMap(fullPath)
}
if (entry.isDirectory()) {
return await this.createMap(fullPath)
}
return await this.createDescriptor(fullPath, entry.name)
})
return await this.createDescriptor(fullPath)
})
)
})
await promiseAll(promises)
}
private inferIdentifier<T>(
fileName: string,
config: SubscriberConfig,
{ context }: SubscriberConfig,
handler: SubscriberHandler<T>
) {
const { context } = config
/**
* If subscriberId is provided, use that
*/
@@ -167,9 +174,7 @@ export class SubscriberLoader {
/**
* If the handler is anonymous, use the file name
*/
const idFromFile =
fileName.split(sep).pop()?.replace(extname(fileName), "") ?? ""
const idFromFile = parse(fileName).name
return kebabCase(idFromFile)
}
@@ -182,7 +187,7 @@ export class SubscriberLoader {
config: SubscriberConfig
handler: SubscriberHandler<T>
}) {
const eventBusService: IEventBusModuleService = this.container_.resolve(
const eventBusService: IEventBusModuleService = container.resolve(
ModuleRegistrationName.EVENT_BUS
)
@@ -196,13 +201,13 @@ export class SubscriberLoader {
const subscriber = async (data: T) => {
return await handler({
event: { name: e, ...data } as unknown as Event<T>,
container: this.container_,
pluginOptions: this.pluginOptions_,
container,
pluginOptions: this.#pluginOptions,
})
}
eventBusService.subscribe(e, subscriber as Subscriber, {
...(config.context ?? {}),
...config.context,
subscriberId,
})
}
@@ -212,22 +217,22 @@ export class SubscriberLoader {
let hasSubscriberDir = false
try {
await readdir(this.rootDir_)
await access(this.#sourceDir)
hasSubscriberDir = true
} catch (err) {
logger.debug(`No subscriber directory found in ${this.rootDir_}`)
hasSubscriberDir = false
logger.debug(`No subscriber directory found in ${this.#sourceDir}`)
}
if (!hasSubscriberDir) {
return
}
await this.createMap(this.rootDir_)
await this.createMap(this.#sourceDir)
const map = this.subscriberDescriptors_
for (const [fileName, { config, handler }] of map.entries()) {
for (const [
fileName,
{ config, handler },
] of this.#subscriberDescriptors.entries()) {
this.createSubscriber({
fileName,
config,
@@ -239,6 +244,6 @@ export class SubscriberLoader {
* Return the file paths of the registered subscribers, to prevent the
* backwards compatible loader from trying to register them.
*/
return [...map.keys()]
return [...this.#subscriberDescriptors.keys()]
}
}

View File

@@ -0,0 +1,16 @@
import { Event, MedusaContainer } from "@medusajs/types"
interface SubscriberContext extends Record<string, unknown> {
subscriberId?: string
}
export type SubscriberConfig = {
event: string | string[]
context?: SubscriberContext
}
export type SubscriberArgs<T = unknown> = {
event: Event<T>
container: MedusaContainer
pluginOptions: Record<string, unknown>
}

View File

@@ -1,18 +0,0 @@
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function orderNotifier({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: ["order.placed", "order.canceled", "order.completed"],
context: { subscriberId: "order-notifier" },
}

View File

@@ -1,20 +0,0 @@
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function productUpdater({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: "product.updated",
context: {
subscriberId: "product-updater",
},
}

View File

@@ -1,17 +0,0 @@
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function ({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: "variant.created",
}

View File

@@ -1,18 +0,0 @@
import { ModuleRegistrationName } from "@medusajs/utils"
export const eventBusServiceMock = {
subscribe: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}
export const containerMock = {
// mock .resolve method so if its called with "eventBusService" it returns the mock
resolve: jest.fn().mockImplementation((name: string) => {
if (name === ModuleRegistrationName.EVENT_BUS) {
return eventBusServiceMock
} else {
return {}
}
}),
}

View File

@@ -15,12 +15,12 @@ import {
featureFlagsLoader,
logger,
pgConnectionLoader,
SubscriberLoader,
} from "@medusajs/framework"
import { registerJobs } from "./helpers/register-jobs"
import { registerWorkflows } from "./helpers/register-workflows"
import { getResolvedPlugins } from "./helpers/resolve-plugins"
import { resolvePluginsLinks } from "./helpers/resolve-plugins-links"
import { SubscriberLoader } from "./helpers/subscribers"
import loadMedusaApp from "./medusa-app"
type Options = {
@@ -39,17 +39,11 @@ const shouldLoadBackgroundProcessors = (configModule) => {
)
}
async function subscribersLoader(
plugins: PluginDetails[],
container: MedusaContainer
) {
async function subscribersLoader(plugins: PluginDetails[]) {
/**
* Load subscribers from the medusa/medusa package
*/
await new SubscriberLoader(
path.join(__dirname, "../subscribers"),
container
).load()
await new SubscriberLoader(path.join(__dirname, "../subscribers")).load()
/**
* Load subscribers from all the plugins.
@@ -58,7 +52,7 @@ async function subscribersLoader(
plugins.map(async (pluginDetails) => {
await new SubscriberLoader(
path.join(pluginDetails.resolve, "subscribers"),
container
pluginDetails.options
).load()
})
)
@@ -83,7 +77,7 @@ async function loadEntrypoints(
)
if (shouldLoadBackgroundProcessors(configModule)) {
await subscribersLoader(plugins, container)
await subscribersLoader(plugins)
await jobsLoader(plugins)
}