feat: Add a simple configurable notifications subscriber (#7331)

* feat: Add a simple configurable notifications subscriber that is configurable

* Proposal on awaiting all subscribers to run

* fix: Clean up wait subscribers util and notifications test

---------

Co-authored-by: adrien2p <adrien.deperetti@gmail.com>
This commit is contained in:
Stevche Radevski
2024-05-16 13:36:09 +02:00
committed by GitHub
parent 2e42e053d4
commit ee924b1b28
27 changed files with 295 additions and 740 deletions

View File

@@ -1,11 +1,12 @@
import { ModuleRegistrationName } from "@medusajs/modules-sdk"
import {
CreateNotificationDTO,
IEventBusModuleService,
INotificationModuleService,
Logger,
} from "@medusajs/types"
import { ContainerRegistrationKeys } from "@medusajs/utils"
import { medusaIntegrationTestRunner } from "medusa-test-utils"
import { medusaIntegrationTestRunner, TestEventUtils } from "medusa-test-utils"
jest.setTimeout(50000)
@@ -13,7 +14,7 @@ const env = { MEDUSA_FF_MEDUSA_V2: true }
medusaIntegrationTestRunner({
env,
testSuite: ({ getContainer }) => {
describe("Notification module", () => {
describe("Notifications", () => {
let service: INotificationModuleService
let logger: Logger
@@ -26,102 +27,168 @@ medusaIntegrationTestRunner({
jest.restoreAllMocks()
})
it("should successfully send a notification for an available channel", async () => {
const logSpy = jest.spyOn(logger, "info")
const notification = {
to: "test@medusajs.com",
channel: "email",
template: "order-created",
data: { username: "john-doe" },
trigger_type: "order-created",
resource_id: "order-id",
resource_type: "order",
} as CreateNotificationDTO
const result = await service.create(notification)
const fromDB = await service.retrieve(result.id)
expect(result).toEqual(
expect.objectContaining({
id: expect.any(String),
to: "test@medusajs.com",
provider_id: "local-notification-provider",
})
)
delete fromDB.original_notification_id
delete fromDB.external_id
delete fromDB.receiver_id
delete (fromDB as any).idempotency_key
delete (fromDB as any).provider
expect(result).toEqual(fromDB)
expect(logSpy).toHaveBeenCalledWith(
'Attempting to send a notification to: test@medusajs.com on the channel: email with template: order-created and data: {"username":"john-doe"}'
)
})
it("should throw an exception if there is no provider for the channel", async () => {
const notification = {
to: "test@medusajs.com",
channel: "sms",
} as CreateNotificationDTO
const error = await service.create(notification).catch((e) => e)
expect(error.message).toEqual(
"Could not find a notification provider for channel: sms"
)
})
it("should allow listing all notifications with filters", async () => {
const notification1 = {
to: "test@medusajs.com",
channel: "email",
template: "order-created",
} as CreateNotificationDTO
const notification2 = {
to: "test@medusajs.com",
channel: "log",
template: "product-created",
} as CreateNotificationDTO
await service.create([notification1, notification2])
const notifications = await service.list({ channel: "log" })
expect(notifications).toHaveLength(1)
expect(notifications[0]).toEqual(
expect.objectContaining({
to: "test@medusajs.com",
channel: "log",
template: "product-created",
})
)
})
it("should allow retrieving a notification", async () => {
const notification1 = {
to: "test@medusajs.com",
channel: "email",
template: "order-created",
} as CreateNotificationDTO
const notification2 = {
to: "test@medusajs.com",
channel: "log",
template: "product-created",
} as CreateNotificationDTO
const [first] = await service.create([notification1, notification2])
const notification = await service.retrieve(first.id)
expect(notification).toEqual(
expect.objectContaining({
describe("Notifications module", () => {
it("should successfully send a notification for an available channel", async () => {
const logSpy = jest.spyOn(logger, "info")
const notification = {
to: "test@medusajs.com",
channel: "email",
template: "order-created",
data: { username: "john-doe" },
trigger_type: "order-created",
resource_id: "order-id",
resource_type: "order",
} as CreateNotificationDTO
const result = await service.create(notification)
const fromDB = await service.retrieve(result.id)
expect(result).toEqual(
expect.objectContaining({
id: expect.any(String),
to: "test@medusajs.com",
provider_id: "local-notification-provider",
})
)
expect(result).toEqual(
expect.objectContaining({
to: "test@medusajs.com",
channel: "email",
data: {
username: "john-doe",
},
id: expect.any(String),
provider_id: "local-notification-provider",
resource_id: "order-id",
resource_type: "order",
template: "order-created",
trigger_type: "order-created",
})
)
expect(fromDB).toEqual(
expect.objectContaining({
to: "test@medusajs.com",
channel: "email",
data: {
username: "john-doe",
},
id: expect.any(String),
provider_id: "local-notification-provider",
resource_id: "order-id",
resource_type: "order",
template: "order-created",
trigger_type: "order-created",
})
)
expect(logSpy).toHaveBeenCalledWith(
`Attempting to send a notification to: 'test@medusajs.com' on the channel: 'email' with template: 'order-created' and data: '{\"username\":\"john-doe\"}'`
)
})
it("should throw an exception if there is no provider for the channel", async () => {
const notification = {
to: "test@medusajs.com",
channel: "sms",
} as CreateNotificationDTO
const error = await service.create(notification).catch((e) => e)
expect(error.message).toEqual(
"Could not find a notification provider for channel: sms"
)
})
it("should allow listing all notifications with filters", async () => {
const notification1 = {
to: "test@medusajs.com",
channel: "email",
template: "order-created",
} as CreateNotificationDTO
const notification2 = {
to: "test@medusajs.com",
channel: "log",
template: "product-created",
} as CreateNotificationDTO
await service.create([notification1, notification2])
const notifications = await service.list({ channel: "log" })
expect(notifications).toHaveLength(1)
expect(notifications[0]).toEqual(
expect.objectContaining({
to: "test@medusajs.com",
channel: "log",
template: "product-created",
})
)
})
it("should allow retrieving a notification", async () => {
const notification1 = {
to: "test@medusajs.com",
channel: "email",
template: "order-created",
} as CreateNotificationDTO
const notification2 = {
to: "test@medusajs.com",
channel: "log",
template: "product-created",
} as CreateNotificationDTO
const [first] = await service.create([notification1, notification2])
const notification = await service.retrieve(first.id)
expect(notification).toEqual(
expect.objectContaining({
to: "test@medusajs.com",
channel: "email",
template: "order-created",
})
)
})
})
describe("Configurable notification subscriber", () => {
let eventBus: IEventBusModuleService
beforeAll(async () => {
eventBus = getContainer().resolve(ModuleRegistrationName.EVENT_BUS)
})
it("should successfully sent a notification when an order is created (based on configuration)", async () => {
const subscriberExecution = TestEventUtils.waitSubscribersExecution(
"order.created",
eventBus
)
const logSpy = jest.spyOn(logger, "info")
await eventBus.emit("order.created", {
data: {
order: {
id: "1234",
email: "test@medusajs.com",
},
},
})
)
await subscriberExecution
const notifications = await service.list()
expect(logSpy).toHaveBeenLastCalledWith(
`Attempting to send a notification to: 'test@medusajs.com' on the channel: 'email' with template: 'order-created-template' and data: '{\"order_id\":\"1234\"}'`
)
expect(notifications).toHaveLength(1)
expect(notifications[0]).toEqual(
expect.objectContaining({
to: "test@medusajs.com",
channel: "email",
template: "order-created-template",
})
)
})
})
})
},

View File

@@ -0,0 +1,28 @@
import { IEventBusModuleService } from "@medusajs/types"
// Allows you to wait for all subscribers to execute for a given event. Only works with the local event bus.
export const waitSubscribersExecution = (
eventName: string,
eventBus: IEventBusModuleService
) => {
const subscriberPromises: Promise<any>[] = []
;(eventBus as any).eventEmitter_.listeners(eventName).forEach((listener) => {
;(eventBus as any).eventEmitter_.removeListener("order.created", listener)
let ok, nok
const promise = new Promise((resolve, reject) => {
ok = resolve
nok = reject
})
subscriberPromises.push(promise)
const newListener = async (...args2) => {
return await listener.apply(eventBus, args2).then(ok).catch(nok)
}
;(eventBus as any).eventEmitter_.on("order.created", newListener)
})
return Promise.all(subscriberPromises)
}

View File

@@ -1,6 +1,7 @@
export * as TestDatabaseUtils from "./database"
export { default as IdMap } from "./id-map"
export * as TestEventUtils from "./events"
export * as JestUtils from "./jest"
export { default as IdMap } from "./id-map"
export { default as MockManager } from "./mock-manager"
export { default as MockRepository } from "./mock-repository"
export * from "./init-modules"

View File

@@ -78,7 +78,7 @@ export function moduleIntegrationTestRunner({
const moduleOptions_: InitModulesOptions = {
injectedDependencies: {
[ContainerRegistrationKeys.PG_CONNECTION]: connection,
eventBusService: new MockEventBusService(),
["eventBusModuleService"]: new MockEventBusService(),
[ContainerRegistrationKeys.LOGGER]: console,
...injectedDependencies,
},

View File

@@ -6,7 +6,7 @@ export const StockLocationModule = {
label: "StockLocationService",
isRequired: false,
isQueryable: true,
dependencies: ["eventBusService"],
dependencies: ["eventBusModuleService"],
defaultModuleDeclaration: {
scope: "internal",
resources: "shared",

View File

@@ -113,7 +113,7 @@ export const ModulesDefinition: { [key: string | Modules]: ModuleDefinition } =
label: upperCaseFirst(ModuleRegistrationName.STOCK_LOCATION),
isRequired: false,
isQueryable: true,
dependencies: ["eventBusService"],
dependencies: [ModuleRegistrationName.EVENT_BUS],
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,
@@ -126,7 +126,7 @@ export const ModulesDefinition: { [key: string | Modules]: ModuleDefinition } =
label: upperCaseFirst(ModuleRegistrationName.INVENTORY),
isRequired: false,
isQueryable: true,
dependencies: ["eventBusService"],
dependencies: [ModuleRegistrationName.EVENT_BUS],
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,
@@ -228,7 +228,7 @@ export const ModulesDefinition: { [key: string | Modules]: ModuleDefinition } =
label: upperCaseFirst(ModuleRegistrationName.FULFILLMENT),
isRequired: false,
isQueryable: true,
dependencies: ["logger", "eventBusService"],
dependencies: ["logger", ModuleRegistrationName.EVENT_BUS],
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,
@@ -306,7 +306,7 @@ export const ModulesDefinition: { [key: string | Modules]: ModuleDefinition } =
label: upperCaseFirst(ModuleRegistrationName.ORDER),
isRequired: false,
isQueryable: true,
dependencies: ["logger", "eventBusService"],
dependencies: ["logger", ModuleRegistrationName.EVENT_BUS],
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,
@@ -319,7 +319,7 @@ export const ModulesDefinition: { [key: string | Modules]: ModuleDefinition } =
label: upperCaseFirst(ModuleRegistrationName.TAX),
isRequired: false,
isQueryable: true,
dependencies: ["logger", "eventBusService"],
dependencies: ["logger", ModuleRegistrationName.EVENT_BUS],
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,

View File

@@ -571,13 +571,8 @@ export function abstractModuleServiceFactory<
// TODO: Should use ModuleRegistrationName.EVENT_BUS but it would require to move it to the utils package to prevent circular dependencies
(key) => key === "eventBusModuleService"
)
const hasEventBusService = Object.keys(this.__container__).find(
(key) => key === "eventBusService"
)
this.eventBusModuleService_ = hasEventBusService
? this.__container__.eventBusService
: hasEventBusModuleService
this.eventBusModuleService_ = hasEventBusModuleService
? this.__container__.eventBusModuleService
: undefined
}

View File

@@ -16,7 +16,7 @@ export const POST = async (req: MedusaRequest, res: MedusaResponse) => {
payload: { data: req.body, rawData: req.rawBody, headers: req.headers },
}
const eventBus = req.scope.resolve("eventBusService")
const eventBus = req.scope.resolve(ModuleRegistrationName.EVENT_BUS)
// we delay the processing of the event to avoid a conflict caused by a race condition
await eventBus.emit(PaymentWebhookEvents.WebhookReceived, event, {

View File

@@ -1,17 +0,0 @@
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function orderNotifier({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: ["order.placed", "order.canceled", "order.completed"],
}

View File

@@ -1,20 +0,0 @@
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function productUpdater({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: "product.updated",
context: {
subscriberId: "product-updater",
},
}

View File

@@ -1,17 +0,0 @@
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function ({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: "variant.created",
}

View File

@@ -1,16 +0,0 @@
export const eventBusServiceMock = {
subscribe: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}
export const containerMock = {
// mock .resolve method so if its called with "eventBusService" it returns the mock
resolve: jest.fn().mockImplementation((name: string) => {
if (name === "eventBusModuleService") {
return eventBusServiceMock
} else {
return {}
}
}),
}

View File

@@ -1,115 +0,0 @@
import { MedusaContainer } from "@medusajs/types"
import { join } from "path"
import { containerMock, eventBusServiceMock } from "../__mocks__"
import { SubscriberLoader } from "../index"
describe("SubscriberLoader", () => {
const rootDir = join(__dirname, "../__fixtures__", "subscribers")
const pluginOptions = {
important_data: {
enabled: true,
},
}
let registeredPaths: string[] = []
beforeAll(async () => {
jest.clearAllMocks()
const paths = await new SubscriberLoader(
rootDir,
containerMock as unknown as MedusaContainer,
pluginOptions,
"id-load-subscribers"
).load()
if (paths) {
registeredPaths = [...registeredPaths, ...paths]
}
})
it("should register each subscriber in the '/subscribers' folder", async () => {
// As '/subscribers' contains 3 subscribers, we expect the number of registered paths to be 3
expect(registeredPaths.length).toEqual(3)
})
it("should have registered subscribers for 5 events", async () => {
/**
* The 'product-updater.ts' subscriber is registered for the following events:
* - "product.created"
* The 'order-updater.ts' subscriber is registered for the following events:
* - "order.placed"
* - "order.canceled"
* - "order.completed"
* The 'variant-created.ts' subscriber is registered for the following events:
* - "variant.created"
*
* This means that we expect the eventBusServiceMock.subscribe method to have
* been called times, once for 'product-updater.ts', once for 'variant-created.ts',
* and 3 times for 'order-updater.ts'.
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledTimes(5)
})
it("should have registered subscribers with the correct props", async () => {
/**
* The 'product-updater.ts' subscriber is registered
* with a explicit subscriberId of "product-updater".
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
"product.updated",
expect.any(Function),
{
subscriberId: "product-updater",
}
)
/**
* The 'order-updater.ts' subscriber is registered
* without an explicit subscriberId, which means that
* the loader tries to infer one from either the handler
* functions name or the file name. In this case, the
* handler function is named 'orderUpdater' and is used
* to infer the subscriberId.
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
"order.placed",
expect.any(Function),
{
subscriberId: "order-notifier",
}
)
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
"order.canceled",
expect.any(Function),
{
subscriberId: "order-notifier",
}
)
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
"order.completed",
expect.any(Function),
{
subscriberId: "order-notifier",
}
)
/**
* The 'variant-created.ts' subscriber is registered
* without an explicit subscriberId, and with an anonymous
* handler function. This means that the loader tries to
* infer the subscriberId from the file name, which in this
* case is 'variant-created.ts'.
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
"variant.created",
expect.any(Function),
{
subscriberId: "variant-created",
}
)
})
})

View File

@@ -1,248 +0,0 @@
import { ModuleRegistrationName } from "@medusajs/modules-sdk"
import { MedusaContainer, Subscriber } from "@medusajs/types"
import { kebabCase } from "@medusajs/utils"
import { readdir } from "fs/promises"
import { extname, join, sep } from "path"
import { SubscriberArgs, SubscriberConfig } from "../../../types/subscribers"
import logger from "../../logger"
import { IEventBusModuleService } from "@medusajs/types"
type SubscriberHandler<T> = (args: SubscriberArgs<T>) => Promise<void>
type SubscriberModule<T> = {
config: SubscriberConfig
handler: SubscriberHandler<T>
}
export class SubscriberLoader {
protected container_: MedusaContainer
protected pluginOptions_: Record<string, unknown>
protected activityId_: string
protected rootDir_: string
protected excludes: RegExp[] = [
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]
protected subscriberDescriptors_: Map<string, SubscriberModule<any>> =
new Map()
constructor(
rootDir: string,
container: MedusaContainer,
options: Record<string, unknown> = {},
activityId: string
) {
this.rootDir_ = rootDir
this.pluginOptions_ = options
this.container_ = container
this.activityId_ = activityId
}
private validateSubscriber(
subscriber: any,
path: string
): subscriber is {
default: SubscriberHandler<unknown>
config: SubscriberConfig
} {
const handler = subscriber.default
if (!handler || typeof handler !== "function") {
/**
* If the handler is not a function, we can't use it
*/
logger.warn(`The subscriber in ${path} is not a function.`)
return false
}
const config = subscriber.config
if (!config) {
/**
* If the subscriber is missing a config, we can't use it
*/
logger.warn(`The subscriber in ${path} is missing a config.`)
return false
}
if (!config.event) {
/**
* If the subscriber is missing an event, we can't use it.
* In production we throw an error, else we log a warning
*/
if (process.env.NODE_ENV === "production") {
throw new Error(`The subscriber in ${path} is missing an event.`)
} else {
logger.warn(`The subscriber in ${path} is missing an event.`)
}
return false
}
if (
typeof config.event !== "string" &&
!Array.isArray(config.event) &&
!config.event.every((e: unknown) => typeof e === "string")
) {
/**
* If the subscribers event is not a string or an array of strings, we can't use it
*/
logger.warn(
`The subscriber in ${path} has an invalid event. The event must be a string or an array of strings.`
)
return false
}
return true
}
private async createDescriptor(absolutePath: string, entry: string) {
return await import(absolutePath).then((module_) => {
const isValid = this.validateSubscriber(module_, absolutePath)
if (!isValid) {
return
}
this.subscriberDescriptors_.set(absolutePath, {
config: module_.config,
handler: module_.default,
})
})
}
private async createMap(dirPath: string) {
await Promise.all(
await readdir(dirPath, { withFileTypes: true }).then(async (entries) => {
return entries
.filter((entry) => {
if (
this.excludes.length &&
this.excludes.some((exclude) => exclude.test(entry.name))
) {
return false
}
return true
})
.map(async (entry) => {
const fullPath = join(dirPath, entry.name)
if (entry.isDirectory()) {
return this.createMap(fullPath)
}
return await this.createDescriptor(fullPath, entry.name)
})
})
)
}
private inferIdentifier<T>(
fileName: string,
config: SubscriberConfig,
handler: SubscriberHandler<T>
) {
const { context } = config
/**
* If subscriberId is provided, use that
*/
if (context?.subscriberId) {
return context.subscriberId
}
const handlerName = handler.name
/**
* If the handler is not anonymous, use the name
*/
if (
handlerName &&
!(handlerName.startsWith("default") || handlerName.startsWith("_default"))
) {
return kebabCase(handlerName)
}
/**
* If the handler is anonymous, use the file name
*/
const idFromFile =
fileName.split(sep).pop()?.replace(extname(fileName), "") ?? ""
return kebabCase(idFromFile)
}
private createSubscriber<T>({
fileName,
config,
handler,
}: {
fileName: string
config: SubscriberConfig
handler: SubscriberHandler<T>
}) {
const eventBusService: IEventBusModuleService = this.container_.resolve(
ModuleRegistrationName.EVENT_BUS
)
const { event } = config
const events = Array.isArray(event) ? event : [event]
const subscriber = async (data: T, eventName: string) => {
return handler({
eventName,
data,
container: this.container_,
pluginOptions: this.pluginOptions_,
})
}
const subscriberId = this.inferIdentifier(fileName, config, handler)
for (const e of events) {
const obj = {
...(config.context ?? {}),
subscriberId,
}
eventBusService.subscribe(e, subscriber as Subscriber, obj)
}
}
async load() {
let hasSubscriberDir = false
try {
await readdir(this.rootDir_)
hasSubscriberDir = true
} catch (err) {
hasSubscriberDir = false
}
if (!hasSubscriberDir) {
return
}
await this.createMap(this.rootDir_)
const map = this.subscriberDescriptors_
for (const [fileName, { config, handler }] of map.entries()) {
this.createSubscriber({
fileName,
config,
handler,
})
}
/**
* Return the file paths of the registered subscribers, to prevent the
* backwards compatible loader from trying to register them.
*/
return [...map.keys()]
}
}

View File

@@ -13,11 +13,10 @@ import loadConfig from "./config"
import expressLoader from "./express"
import featureFlagsLoader from "./feature-flags"
import { registerProjectWorkflows } from "./helpers/register-workflows"
import medusaProjectApisLoader from "./load-medusa-project-apis"
import Logger from "./logger"
import loadMedusaApp from "./medusa-app"
import pgConnectionLoader from "./pg-connection"
// import subscribersLoader from "./subscribers"
import subscribersLoader from "./subscribers"
type Options = {
directory: string
@@ -57,7 +56,7 @@ async function loadEntrypoints(
await adminLoader({ app: expressApp, configModule })
// subscribersLoader({ container })
subscribersLoader({ container })
await apiLoader({
container,
@@ -110,14 +109,6 @@ export default async ({
featureFlagRouter
)
await medusaProjectApisLoader({
rootDirectory,
container,
app: expressApp,
configModule,
activityId: "medusa-project-apis",
})
await createDefaultsWorkflow(container).run()
const shutdown = async () => {

View File

@@ -1,164 +0,0 @@
import { promiseAll } from "@medusajs/utils"
import { Express } from "express"
import glob from "glob"
import { trackInstallation } from "medusa-telemetry"
import { EOL } from "os"
import path from "path"
import { Logger, MedusaContainer } from "../types/global"
import { getResolvedPlugins } from "./helpers/resolve-plugins"
import { RoutesLoader } from "./helpers/routing"
import { SubscriberLoader } from "./helpers/subscribers"
import logger from "./logger"
import { ConfigModule } from "@medusajs/types"
type Options = {
rootDirectory: string
container: MedusaContainer
configModule: ConfigModule
app: Express
activityId: string
}
type PluginDetails = {
resolve: string
name: string
id: string
options: Record<string, unknown>
version: string
}
export const MEDUSA_PROJECT_NAME = "project-plugin"
/**
* Registers all services in the services directory
*/
export default async ({
rootDirectory,
container,
app,
configModule,
activityId,
}: Options): Promise<void> => {
const resolved = getResolvedPlugins(rootDirectory, configModule) || []
const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker"
await promiseAll(
resolved.map(async (pluginDetails) => {
if (shouldStartAPI) {
await registerApi(
pluginDetails,
app,
container,
configModule,
activityId
)
}
await registerSubscribers(pluginDetails, container, activityId)
})
)
await promiseAll(
resolved.map(async (pluginDetails) => runLoaders(pluginDetails, container))
)
if (configModule.projectConfig.redis_url) {
await Promise.all(
resolved.map(async (pluginDetails) => {
// await registerScheduledJobs(pluginDetails, container)
// TODO: Decide how scheduled jobs will be loaded and handled
})
)
} else {
logger.warn(
"You don't have Redis configured. Scheduled jobs will not be enabled."
)
}
resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin"))
}
async function runLoaders(
pluginDetails: PluginDetails,
container: MedusaContainer
): Promise<void> {
const loaderFiles = glob.sync(
`${pluginDetails.resolve}/loaders/[!__]*.js`,
{}
)
await promiseAll(
loaderFiles.map(async (loader) => {
try {
const module = require(loader).default
if (typeof module === "function") {
await module(container, pluginDetails.options)
}
} catch (err) {
const logger = container.resolve<Logger>("logger")
logger.warn(`Running loader failed: ${err.message}`)
return Promise.resolve()
}
})
)
}
/**
* Registers the plugin's api routes.
*/
async function registerApi(
pluginDetails: PluginDetails,
app: Express,
container: MedusaContainer,
configmodule: ConfigModule,
activityId: string
): Promise<Express> {
const logger = container.resolve<Logger>("logger")
const projectName =
pluginDetails.name === MEDUSA_PROJECT_NAME
? "your Medusa project"
: `${pluginDetails.name}`
logger.progress(activityId, `Registering custom endpoints for ${projectName}`)
try {
/**
* Register the plugin's API routes using the file based routing.
*/
await new RoutesLoader({
app,
rootDir: path.join(pluginDetails.resolve, "api"),
activityId: activityId,
configModule: configmodule,
}).load()
} catch (err) {
logger.warn(
`An error occurred while registering API Routes in ${projectName}${
err.stack ? EOL + err.stack : ""
}`
)
}
return app
}
/**
* Registers a plugin's subscribers at the right location in our container.
* Subscribers are registered directly in the container.
* @param {object} pluginDetails - the plugin details including plugin options,
* version, id, resolved path, etc. See resolvePlugin
* @param {object} container - the container where the services will be
* registered
* @return {void}
*/
async function registerSubscribers(
pluginDetails: PluginDetails,
container: MedusaContainer,
activityId: string
): Promise<void> {
await new SubscriberLoader(
path.join(pluginDetails.resolve, "subscribers"),
container,
pluginDetails.options,
activityId
).load()
}

View File

@@ -2,21 +2,24 @@ import glob from "glob"
import path from "path"
import { asFunction } from "awilix"
import { MedusaContainer } from "../types/global"
import { MedusaError } from "@medusajs/utils"
/**
* Registers all subscribers in the subscribers directory
*/
export default ({ container }: { container: MedusaContainer }) => {
const isTest = process.env.NODE_ENV === "test"
const corePath = isTest
? "../subscribers/__mocks__/*.js"
: "../subscribers/*.js"
const corePath = "../subscribers/*.js"
const coreFull = path.join(__dirname, corePath)
const core = glob.sync(coreFull, { cwd: __dirname })
core.forEach((fn) => {
const loaded = require(fn).default
if (!loaded) {
throw new MedusaError(
MedusaError.Types.UNEXPECTED_STATE,
`Subscriber ${fn} does not have a default export`
)
}
container.build(asFunction((cradle) => new loaded(cradle)).singleton())
})

View File

@@ -0,0 +1,64 @@
import { IEventBusService, INotificationModuleService } from "@medusajs/types"
import { get } from "lodash"
type InjectedDependencies = {
notificationModuleService: INotificationModuleService
eventBusModuleService: IEventBusService
}
// TODO: The config should be loaded dynamically from medusa-config.js
// TODO: We can use a more powerful templating syntax to allow for eg. combining fields.
const config = [
{
event: "order.created",
template: "order-created-template",
channel: "email",
to: "order.email",
resource_id: "order.id",
data: {
order_id: "order.id",
},
},
]
class ConfigurableNotificationsSubscriber {
private readonly eventBusModuleService_: IEventBusService
private readonly notificationModuleService_: INotificationModuleService
constructor({
eventBusModuleService,
notificationModuleService,
}: InjectedDependencies) {
this.eventBusModuleService_ = eventBusModuleService
this.notificationModuleService_ = notificationModuleService
config.forEach((eventHandler) => {
this.eventBusModuleService_.subscribe(
eventHandler.event,
async (data: any) => {
const payload = data.data
const notificationData = {
template: eventHandler.template,
channel: eventHandler.channel,
to: get(payload, eventHandler.to),
trigger_type: eventHandler.event,
resource_id: get(payload, eventHandler.resource_id),
data: Object.entries(eventHandler.data).reduce(
(acc, [key, value]) => {
acc[key] = get(payload, value)
return acc
},
{}
),
}
await this.notificationModuleService_.create(notificationData)
return
}
)
})
}
}
export default ConfigurableNotificationsSubscriber

View File

@@ -14,18 +14,21 @@ type SerializedBuffer = {
type InjectedDependencies = {
paymentModuleService: IPaymentModuleService
eventBusService: IEventBusService
eventBusModuleService: IEventBusService
}
class PaymentWebhookSubscriber {
private readonly eventBusService_: IEventBusService
private readonly eventBusModuleService_: IEventBusService
private readonly paymentModuleService_: IPaymentModuleService
constructor({ eventBusService, paymentModuleService }: InjectedDependencies) {
this.eventBusService_ = eventBusService
constructor({
eventBusModuleService,
paymentModuleService,
}: InjectedDependencies) {
this.eventBusModuleService_ = eventBusModuleService
this.paymentModuleService_ = paymentModuleService
this.eventBusService_.subscribe(
this.eventBusModuleService_.subscribe(
PaymentWebhookEvents.WebhookReceived,
this.processEvent as Subscriber
)

View File

@@ -3,7 +3,7 @@ import { IEventBusModuleService, Logger } from "@medusajs/types"
export type InitializeModuleInjectableDependencies = {
logger?: Logger
eventBusService?: IEventBusModuleService
eventBusModuleService?: IEventBusModuleService
}
export type CreateApiKeyDTO = {

View File

@@ -2,5 +2,5 @@ import { IEventBusModuleService, Logger } from "@medusajs/types"
export type InitializeModuleInjectableDependencies = {
logger?: Logger
eventBusService?: IEventBusModuleService
eventBusModuleService?: IEventBusModuleService
}

View File

@@ -7,7 +7,7 @@ import {
export type InitializeModuleInjectableDependencies = {
logger?: Logger
eventBusService?: IEventBusModuleService
eventBusModuleService?: IEventBusModuleService
}
export const FulfillmentIdentifiersRegistrationName =

View File

@@ -14,5 +14,5 @@ export * from "./utils"
export type InitializeModuleInjectableDependencies = {
logger?: Logger
eventBusService?: IEventBusModuleService
eventBusModuleService?: IEventBusModuleService
}

View File

@@ -30,7 +30,7 @@ describe("Local notification provider", () => {
expect(logSpy).toHaveBeenCalled()
expect(logSpy).toHaveBeenCalledWith(
'Attempting to send a notification to: test@medusajs.com on the channel: email with template: some-template and data: {"username":"john-doe"}'
`Attempting to send a notification to: 'test@medusajs.com' on the channel: 'email' with template: 'some-template' and data: '{\"username\":\"john-doe\"}'`
)
})
})

View File

@@ -38,9 +38,9 @@ export class LocalNotificationService extends AbstractNotificationProviderServic
}
const message =
`Attempting to send a notification to: ${notification.to}` +
` on the channel: ${notification.channel} with template: ${notification.template}` +
` and data: ${JSON.stringify(notification.data)}`
`Attempting to send a notification to: '${notification.to}'` +
` on the channel: '${notification.channel}' with template: '${notification.template}'` +
` and data: '${JSON.stringify(notification.data)}'`
this.logger_.info(message)
return {}

View File

@@ -26,7 +26,7 @@ import { UpsertStockLocationInput } from "@medusajs/types"
import { promiseAll } from "@medusajs/utils"
type InjectedDependencies = {
eventBusService: IEventBusService
eventBusModuleService: IEventBusService
baseRepository: DAL.RepositoryService
stockLocationService: ModulesSdkTypes.InternalModuleService<any>
stockLocationAddressService: ModulesSdkTypes.InternalModuleService<any>
@@ -52,14 +52,14 @@ export default class StockLocationModuleService<
>(StockLocation, generateMethodForModels, entityNameToLinkableKeysMap)
implements IStockLocationServiceNext
{
protected readonly eventBusService_: IEventBusService
protected readonly eventBusModuleService_: IEventBusService
protected baseRepository_: DAL.RepositoryService
protected readonly stockLocationService_: ModulesSdkTypes.InternalModuleService<TEntity>
protected readonly stockLocationAddressService_: ModulesSdkTypes.InternalModuleService<TStockLocationAddress>
constructor(
{
eventBusService,
eventBusModuleService,
baseRepository,
stockLocationService,
stockLocationAddressService,
@@ -72,7 +72,7 @@ export default class StockLocationModuleService<
this.baseRepository_ = baseRepository
this.stockLocationService_ = stockLocationService
this.stockLocationAddressService_ = stockLocationAddressService
this.eventBusService_ = eventBusService
this.eventBusModuleService_ = eventBusModuleService
}
__joinerConfig(): ModuleJoinerConfig {

View File

@@ -3,7 +3,7 @@ import { IEventBusModuleService, Logger } from "@medusajs/types"
export type InitializeModuleInjectableDependencies = {
logger?: Logger
eventBusService?: IEventBusModuleService
eventBusModuleService?: IEventBusModuleService
}
export type UpdateStoreInput = StoreTypes.UpdateStoreDTO & { id: string }