diff --git a/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/order-notifier.ts b/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/order-notifier.ts new file mode 100644 index 0000000000..9c5f6531b3 --- /dev/null +++ b/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/order-notifier.ts @@ -0,0 +1,18 @@ +import { + SubscriberArgs, + SubscriberConfig, +} from "../../../../../types/subscribers" + +export default async function orderNotifier({ + data, + eventName, + container, + pluginOptions, +}: SubscriberArgs) { + return Promise.resolve() +} + +export const config: SubscriberConfig = { + event: ["order.placed", "order.cancelled", "order.completed"], + context: { subscriberId: "order-notifier" }, +} diff --git a/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/product-updater.ts b/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/product-updater.ts new file mode 100644 index 0000000000..67e536783b --- /dev/null +++ b/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/product-updater.ts @@ -0,0 +1,20 @@ +import { + SubscriberArgs, + SubscriberConfig, +} from "../../../../../types/subscribers" + +export default async function productUpdater({ + data, + eventName, + container, + pluginOptions, +}: SubscriberArgs) { + return Promise.resolve() +} + +export const config: SubscriberConfig = { + event: "product.updated", + context: { + subscriberId: "product-updater", + }, +} diff --git a/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/variant-created.ts b/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/variant-created.ts new file mode 100644 index 0000000000..874290d550 --- /dev/null +++ b/packages/medusa/src/loaders/helpers/subscribers/__fixtures__/subscribers/variant-created.ts @@ -0,0 +1,17 @@ +import { + SubscriberArgs, + SubscriberConfig, +} from "../../../../../types/subscribers" + +export default async function ({ + data, + eventName, + container, + pluginOptions, +}: SubscriberArgs) { + return Promise.resolve() +} + +export const config: SubscriberConfig = { + event: "variant.created", +} diff --git a/packages/medusa/src/loaders/helpers/subscribers/__mocks__/index.ts b/packages/medusa/src/loaders/helpers/subscribers/__mocks__/index.ts new file mode 100644 index 0000000000..c526507eeb --- /dev/null +++ b/packages/medusa/src/loaders/helpers/subscribers/__mocks__/index.ts @@ -0,0 +1,18 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" + +export const eventBusServiceMock = { + subscribe: jest.fn().mockImplementation((...args) => { + return Promise.resolve(args) + }), +} + +export const containerMock = { + // mock .resolve method so if its called with "eventBusService" it returns the mock + resolve: jest.fn().mockImplementation((name: string) => { + if (name === ModuleRegistrationName.EVENT_BUS) { + return eventBusServiceMock + } else { + return {} + } + }), +} diff --git a/packages/medusa/src/loaders/helpers/subscribers/__tests__/index.spec.ts b/packages/medusa/src/loaders/helpers/subscribers/__tests__/index.spec.ts new file mode 100644 index 0000000000..b05600568e --- /dev/null +++ b/packages/medusa/src/loaders/helpers/subscribers/__tests__/index.spec.ts @@ -0,0 +1,114 @@ +import { MedusaContainer } from "@medusajs/types" +import { join } from "path" +import { containerMock, eventBusServiceMock } from "../__mocks__" +import { SubscriberLoader } from "../index" + +describe("SubscriberLoader", () => { + const rootDir = join(__dirname, "../__fixtures__", "subscribers") + + const pluginOptions = { + important_data: { + enabled: true, + }, + } + + let registeredPaths: string[] = [] + + beforeAll(async () => { + jest.clearAllMocks() + + const paths = await new SubscriberLoader( + rootDir, + containerMock as unknown as MedusaContainer, + pluginOptions + ).load() + + if (paths) { + registeredPaths = [...registeredPaths, ...paths] + } + }) + + it("should register each subscriber in the '/subscribers' folder", async () => { + // As '/subscribers' contains 3 subscribers, we expect the number of registered paths to be 3 + expect(registeredPaths.length).toEqual(3) + }) + + it("should have registered subscribers for 5 events", async () => { + /** + * The 'product-updater.ts' subscriber is registered for the following events: + * - "product.created" + * The 'order-updater.ts' subscriber is registered for the following events: + * - "order.placed" + * - "order.canceled" + * - "order.completed" + * The 'variant-created.ts' subscriber is registered for the following events: + * - "variant.created" + * + * This means that we expect the eventBusServiceMock.subscribe method to have + * been called times, once for 'product-updater.ts', once for 'variant-created.ts', + * and 3 times for 'order-updater.ts'. + */ + expect(eventBusServiceMock.subscribe).toHaveBeenCalledTimes(5) + }) + + it("should have registered subscribers with the correct props", async () => { + /** + * The 'product-updater.ts' subscriber is registered + * with a explicit subscriberId of "product-updater". + */ + expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith( + "product.updated", + expect.any(Function), + { + subscriberId: "product-updater", + } + ) + + /** + * The 'order-updater.ts' subscriber is registered + * without an explicit subscriberId, which means that + * the loader tries to infer one from either the handler + * functions name or the file name. In this case, the + * handler function is named 'orderUpdater' and is used + * to infer the subscriberId. + */ + expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith( + "order.placed", + expect.any(Function), + { + subscriberId: "order-notifier", + } + ) + + expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith( + "order.cancelled", + expect.any(Function), + { + subscriberId: "order-notifier", + } + ) + + expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith( + "order.completed", + expect.any(Function), + { + subscriberId: "order-notifier", + } + ) + + /** + * The 'variant-created.ts' subscriber is registered + * without an explicit subscriberId, and with an anonymous + * handler function. This means that the loader tries to + * infer the subscriberId from the file name, which in this + * case is 'variant-created.ts'. + */ + expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith( + "variant.created", + expect.any(Function), + { + subscriberId: "variant-created", + } + ) + }) +}) diff --git a/packages/medusa/src/loaders/helpers/subscribers/index.ts b/packages/medusa/src/loaders/helpers/subscribers/index.ts new file mode 100644 index 0000000000..dea12a2b32 --- /dev/null +++ b/packages/medusa/src/loaders/helpers/subscribers/index.ts @@ -0,0 +1,244 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { + IEventBusModuleService, + MedusaContainer, + Subscriber, +} from "@medusajs/types" +import { kebabCase } from "@medusajs/utils" +import { readdir } from "fs/promises" +import { extname, join, sep } from "path" + +import { SubscriberArgs, SubscriberConfig } from "../../../types/subscribers" +import logger from "../../logger" + +type SubscriberHandler = (args: SubscriberArgs) => Promise + +type SubscriberModule = { + config: SubscriberConfig + handler: SubscriberHandler +} + +export class SubscriberLoader { + protected container_: MedusaContainer + protected pluginOptions_: Record + protected rootDir_: string + protected excludes: RegExp[] = [ + /\.DS_Store/, + /(\.ts\.map|\.js\.map|\.d\.ts)/, + /^_[^/\\]*(\.[^/\\]+)?$/, + ] + + protected subscriberDescriptors_: Map> = + new Map() + + constructor( + rootDir: string, + container: MedusaContainer, + options: Record = {} + ) { + this.rootDir_ = rootDir + this.pluginOptions_ = options + this.container_ = container + } + + private validateSubscriber( + subscriber: any, + path: string + ): subscriber is { + default: SubscriberHandler + config: SubscriberConfig + } { + const handler = subscriber.default + + if (!handler || typeof handler !== "function") { + /** + * If the handler is not a function, we can't use it + */ + logger.warn(`The subscriber in ${path} is not a function.`) + return false + } + + const config = subscriber.config + + if (!config) { + /** + * If the subscriber is missing a config, we can't use it + */ + logger.warn(`The subscriber in ${path} is missing a config.`) + return false + } + + if (!config.event) { + /** + * If the subscriber is missing an event, we can't use it. + * In production we throw an error, else we log a warning + */ + if (process.env.NODE_ENV === "production") { + throw new Error(`The subscriber in ${path} is missing an event.`) + } else { + logger.warn(`The subscriber in ${path} is missing an event.`) + } + + return false + } + + if ( + typeof config.event !== "string" && + !Array.isArray(config.event) && + !config.event.every((e: unknown) => typeof e === "string") + ) { + /** + * If the subscribers event is not a string or an array of strings, we can't use it + */ + logger.warn( + `The subscriber in ${path} has an invalid event. The event must be a string or an array of strings.` + ) + return false + } + + return true + } + + private async createDescriptor(absolutePath: string, entry: string) { + return await import(absolutePath).then((module_) => { + const isValid = this.validateSubscriber(module_, absolutePath) + + if (!isValid) { + return + } + + this.subscriberDescriptors_.set(absolutePath, { + config: module_.config, + handler: module_.default, + }) + }) + } + + private async createMap(dirPath: string) { + await Promise.all( + await readdir(dirPath, { withFileTypes: true }).then(async (entries) => { + return entries + .filter((entry) => { + if ( + this.excludes.length && + this.excludes.some((exclude) => exclude.test(entry.name)) + ) { + return false + } + + return true + }) + .map(async (entry) => { + const fullPath = join(dirPath, entry.name) + + if (entry.isDirectory()) { + return this.createMap(fullPath) + } + + return await this.createDescriptor(fullPath, entry.name) + }) + }) + ) + } + + private inferIdentifier( + fileName: string, + config: SubscriberConfig, + handler: SubscriberHandler + ) { + const { context } = config + + /** + * If subscriberId is provided, use that + */ + if (context?.subscriberId) { + return context.subscriberId + } + + const handlerName = handler.name + + /** + * If the handler is not anonymous, use the name + */ + if (handlerName && !handlerName.startsWith("_default")) { + return kebabCase(handlerName) + } + + /** + * If the handler is anonymous, use the file name + */ + const idFromFile = + fileName.split(sep).pop()?.replace(extname(fileName), "") ?? "" + + return kebabCase(idFromFile) + } + + private createSubscriber({ + fileName, + config, + handler, + }: { + fileName: string + config: SubscriberConfig + handler: SubscriberHandler + }) { + const eventBusService: IEventBusModuleService = this.container_.resolve( + ModuleRegistrationName.EVENT_BUS + ) + + const { event } = config + + const events = Array.isArray(event) ? event : [event] + + const subscriberId = this.inferIdentifier(fileName, config, handler) + + for (const e of events) { + const subscriber = async (data: T) => { + return handler({ + eventName: e, + data, + container: this.container_, + pluginOptions: this.pluginOptions_, + }) + } + + eventBusService.subscribe(e, subscriber as Subscriber, { + ...(config.context ?? {}), + subscriberId, + }) + } + } + + async load() { + let hasSubscriberDir = false + + try { + await readdir(this.rootDir_) + hasSubscriberDir = true + } catch (err) { + hasSubscriberDir = false + } + + if (!hasSubscriberDir) { + return + } + + await this.createMap(this.rootDir_) + + const map = this.subscriberDescriptors_ + + for (const [fileName, { config, handler }] of map.entries()) { + this.createSubscriber({ + fileName, + config, + handler, + }) + } + + /** + * Return the file paths of the registered subscribers, to prevent the + * backwards compatible loader from trying to register them. + */ + return [...map.keys()] + } +} diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index 62c58ca10c..43077df4d1 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -6,6 +6,7 @@ import { Express, NextFunction, Request, Response } from "express" import { createMedusaContainer } from "medusa-core-utils" import requestIp from "request-ip" import { v4 } from "uuid" +import path from "path" import { MedusaContainer } from "../types/global" import adminLoader from "./admin" import apiLoader from "./api" @@ -16,7 +17,7 @@ import { registerProjectWorkflows } from "./helpers/register-workflows" import Logger from "./logger" import loadMedusaApp from "./medusa-app" import pgConnectionLoader from "./pg-connection" -import subscribersLoader from "./subscribers" +import { SubscriberLoader } from "./helpers/subscribers" type Options = { directory: string @@ -28,6 +29,11 @@ const isWorkerMode = (configModule) => { return configModule.projectConfig.worker_mode === "worker" } +async function subscribersLoader(container) { + const subscribersPath = path.join(__dirname, "../subscribers") + await new SubscriberLoader(subscribersPath, container).load() +} + async function loadEntrypoints( configModule, container, @@ -55,9 +61,7 @@ async function loadEntrypoints( }) await adminLoader({ app: expressApp, configModule }) - - subscribersLoader({ container }) - + await subscribersLoader(container) await apiLoader({ container, app: expressApp, diff --git a/packages/medusa/src/loaders/subscribers.ts b/packages/medusa/src/loaders/subscribers.ts deleted file mode 100644 index 88350475ad..0000000000 --- a/packages/medusa/src/loaders/subscribers.ts +++ /dev/null @@ -1,26 +0,0 @@ -import glob from "glob" -import path from "path" -import { asFunction } from "awilix" -import { MedusaContainer } from "../types/global" -import { MedusaError } from "@medusajs/utils" - -/** - * Registers all subscribers in the subscribers directory - */ -export default ({ container }: { container: MedusaContainer }) => { - const corePath = "../subscribers/*.js" - const coreFull = path.join(__dirname, corePath) - - const core = glob.sync(coreFull, { cwd: __dirname }) - core.forEach((fn) => { - const loaded = require(fn).default - if (!loaded) { - throw new MedusaError( - MedusaError.Types.UNEXPECTED_STATE, - `Subscriber ${fn} does not have a default export` - ) - } - - container.build(asFunction((cradle) => new loaded(cradle)).singleton()) - }) -} diff --git a/packages/medusa/src/subscribers/configurable-notifications.ts b/packages/medusa/src/subscribers/configurable-notifications.ts index 95e1663a8e..cc622134d7 100644 --- a/packages/medusa/src/subscribers/configurable-notifications.ts +++ b/packages/medusa/src/subscribers/configurable-notifications.ts @@ -1,14 +1,21 @@ -import { IEventBusService, INotificationModuleService } from "@medusajs/types" +import { INotificationModuleService } from "@medusajs/types" import { get } from "lodash" +import { SubscriberArgs, SubscriberConfig } from "../types/subscribers" +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { ContainerRegistrationKeys, promiseAll } from "@medusajs/utils" -type InjectedDependencies = { - notificationModuleService: INotificationModuleService - eventBusModuleService: IEventBusService +type HandlerConfig = { + event: string + template: string + channel: string + to: string + resource_id: string + data: Record } // TODO: The config should be loaded dynamically from medusa-config.js // TODO: We can use a more powerful templating syntax to allow for eg. combining fields. -const config = [ +const handlerConfig: HandlerConfig[] = [ { event: "order.created", template: "order-created-template", @@ -21,44 +28,61 @@ const config = [ }, ] -class ConfigurableNotificationsSubscriber { - private readonly eventBusModuleService_: IEventBusService - private readonly notificationModuleService_: INotificationModuleService +const configAsMap = handlerConfig.reduce( + (acc: Record, h) => { + if (!acc[h.event]) { + acc[h.event] = [] + } - constructor({ - eventBusModuleService, - notificationModuleService, - }: InjectedDependencies) { - this.eventBusModuleService_ = eventBusModuleService - this.notificationModuleService_ = notificationModuleService + acc[h.event].push(h) + return acc + }, + {} +) - config.forEach((eventHandler) => { - this.eventBusModuleService_.subscribe( - eventHandler.event, - async (data: any) => { - const payload = data.data +export default async function configurableNotifications({ + data, + eventName, + container, +}: SubscriberArgs) { + const logger = container.resolve(ContainerRegistrationKeys.LOGGER) + const notificationService: INotificationModuleService = container.resolve( + ModuleRegistrationName.NOTIFICATION + ) - const notificationData = { - template: eventHandler.template, - channel: eventHandler.channel, - to: get(payload, eventHandler.to), - trigger_type: eventHandler.event, - resource_id: get(payload, eventHandler.resource_id), - data: Object.entries(eventHandler.data).reduce( - (acc, [key, value]) => { - acc[key] = get(payload, value) - return acc - }, - {} - ), - } + const handlers = configAsMap[eventName] ?? [] + const payload = data.data - await this.notificationModuleService_.create(notificationData) - return - } - ) + await promiseAll( + handlers.map(async (handler) => { + const notificationData = { + template: handler.template, + channel: handler.channel, + to: get(payload, handler.to), + trigger_type: handler.event, + resource_id: get(payload, handler.resource_id), + data: Object.entries(handler.data).reduce((acc, [key, value]) => { + acc[key] = get(payload, value) + return acc + }, {}), + } + + // We don't want to fail all handlers, so we catch and log errors only + try { + await notificationService.create(notificationData) + } catch (err) { + logger.error( + `Failed to send notification for ${eventName}`, + err.message + ) + } }) - } + ) } -export default ConfigurableNotificationsSubscriber +export const config: SubscriberConfig = { + event: handlerConfig.map((h) => h.event), + context: { + subscriberId: "configurable-notifications-handler", + }, +} diff --git a/packages/medusa/src/subscribers/payment-webhook.ts b/packages/medusa/src/subscribers/payment-webhook.ts index 1e4a7c995f..8527529336 100644 --- a/packages/medusa/src/subscribers/payment-webhook.ts +++ b/packages/medusa/src/subscribers/payment-webhook.ts @@ -1,52 +1,36 @@ import { PaymentWebhookEvents } from "@medusajs/utils" - -import { - IEventBusService, - IPaymentModuleService, - ProviderWebhookPayload, - Subscriber, -} from "@medusajs/types" +import { IPaymentModuleService, ProviderWebhookPayload } from "@medusajs/types" +import { SubscriberArgs, SubscriberConfig } from "../types/subscribers" +import { ModuleRegistrationName } from "@medusajs/modules-sdk" type SerializedBuffer = { data: ArrayBuffer type: "Buffer" } -type InjectedDependencies = { - paymentModuleService: IPaymentModuleService - eventBusModuleService: IEventBusService -} +export default async function paymentWebhookhandler({ + data, + container, +}: SubscriberArgs) { + const paymentService: IPaymentModuleService = container.resolve( + ModuleRegistrationName.PAYMENT + ) -class PaymentWebhookSubscriber { - private readonly eventBusModuleService_: IEventBusService - private readonly paymentModuleService_: IPaymentModuleService + const input = "data" in data ? data.data : data - constructor({ - eventBusModuleService, - paymentModuleService, - }: InjectedDependencies) { - this.eventBusModuleService_ = eventBusModuleService - this.paymentModuleService_ = paymentModuleService - - this.eventBusModuleService_.subscribe( - PaymentWebhookEvents.WebhookReceived, - this.processEvent as Subscriber + if ( + (input.payload.rawData as unknown as SerializedBuffer).type === "Buffer" + ) { + input.payload.rawData = Buffer.from( + (input.payload.rawData as unknown as SerializedBuffer).data ) } - - /** - * TODO: consider moving this to a workflow - */ - processEvent = async (data: ProviderWebhookPayload): Promise => { - if ( - (data.payload.rawData as unknown as SerializedBuffer).type === "Buffer" - ) { - data.payload.rawData = Buffer.from( - (data.payload.rawData as unknown as SerializedBuffer).data - ) - } - await this.paymentModuleService_.processEvent(data) - } + await paymentService.processEvent(input) } -export default PaymentWebhookSubscriber +export const config: SubscriberConfig = { + event: PaymentWebhookEvents.WebhookReceived, + context: { + subscriberId: "payment-webhook-handler", + }, +}