feat(medusa): Alternative Subscriber API and new ScheduledJobs API (#5624)

* workking subscribers API

* progress

* update registrar args

* cleanup

* progress

* progress

* tests

* rename to loaders

* rm build artifacts

* improve validation and change jobs args to object

* spread context on subscribe

* add changeset

* address comments

* fix spelling of warning and add warning on Redis not enabled for Scheduled Jobs

* fix tests

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Kasper Fabricius Kristensen
2023-11-16 20:36:02 +01:00
committed by GitHub
parent 4f0bea4909
commit 57573ed4d7
16 changed files with 805 additions and 17 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/medusa": patch
---
feat(medusa): Adds a new alternative syntax for creating Subscribers in Medusa, as well as adding a new API for creating scheduled jobs.

View File

@@ -1,14 +1,16 @@
export * from "./api"
export * from "./api/middlewares"
export * from "./interfaces"
export * from "./joiner-config"
export * from "./models"
export * from "./modules-config"
export * from "./services"
export * from "./types/batch-job"
export * from "./types/common"
export * from "./types/middlewares"
export * from "./types/routing"
export * from "./types/global"
export * from "./types/middlewares"
export * from "./types/price-list"
export * from "./types/routing"
export * from "./types/scheduled-jobs"
export * from "./types/subscribers"
export * from "./utils"
export * from "./joiner-config"
export * from "./modules-config"

View File

@@ -0,0 +1,11 @@
import { ScheduledJobArgs } from "../../../../../types/scheduled-jobs"
export default async function ({ container, pluginOptions }: ScheduledJobArgs) {
// noop
return {}
}
export const config = {
name: "every-hour",
schedule: "0 * * * *",
}

View File

@@ -0,0 +1,11 @@
import { ScheduledJobArgs } from "../../../../../types/scheduled-jobs"
export default async function ({ container, pluginOptions }: ScheduledJobArgs) {
// noop
return {}
}
export const config = {
name: "every-minute",
schedule: "* * * * *",
}

View File

@@ -0,0 +1,16 @@
export const jobSchedulerServiceMock = {
create: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}
export const containerMock = {
// mock .resolve method so if its called with "jobSchedulerService" it returns the mock
resolve: jest.fn().mockImplementation((name: string) => {
if (name === "jobSchedulerService") {
return jobSchedulerServiceMock
} else {
return {}
}
}),
}

View File

@@ -0,0 +1,49 @@
import { MedusaContainer } from "@medusajs/types"
import { join } from "path"
import { containerMock, jobSchedulerServiceMock } from "../__mocks__"
import ScheduledJobsLoader from "../index"
describe("ScheduledJobsLoader", () => {
const rootDir = join(__dirname, "../__fixtures__", "jobs")
const pluginOptions = {
important_data: {
enabled: true,
},
}
beforeAll(async () => {
jest.clearAllMocks()
await new ScheduledJobsLoader(
rootDir,
containerMock as unknown as MedusaContainer,
pluginOptions
).load()
})
it("should register every job in '/jobs'", async () => {
// As '/jobs' contains 2 jobs, we expect the create method to be called twice
expect(jobSchedulerServiceMock.create).toHaveBeenCalledTimes(2)
})
it("should register every job with the correct props", async () => {
// Registering every-hour.ts
expect(jobSchedulerServiceMock.create).toHaveBeenCalledWith(
"every-hour",
undefined,
"0 * * * *",
expect.any(Function),
{ keepExisting: false }
)
// Registering every-minute.ts
expect(jobSchedulerServiceMock.create).toHaveBeenCalledWith(
"every-minute",
undefined,
"* * * * *",
expect.any(Function),
{ keepExisting: false }
)
})
})

View File

@@ -0,0 +1,174 @@
import { MedusaContainer } from "@medusajs/types"
import { readdir } from "fs/promises"
import { join } from "path"
import JobSchedulerService from "../../../services/job-scheduler"
import {
ScheduledJobArgs,
ScheduledJobConfig,
} from "../../../types/scheduled-jobs"
import logger from "../../logger"
type ScheduledJobHandler = (args: ScheduledJobArgs) => Promise<void>
type ScheduledJobModule = {
config: ScheduledJobConfig
handler: ScheduledJobHandler
}
export default class ScheduledJobsLoader {
protected container_: MedusaContainer
protected pluginOptions_: Record<string, unknown>
protected rootDir_: string
protected excludes: RegExp[] = [
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]
protected jobDescriptors_: Map<string, ScheduledJobModule> = new Map()
constructor(
rootDir: string,
container: MedusaContainer,
options: Record<string, unknown> = {}
) {
this.rootDir_ = rootDir
this.pluginOptions_ = options
this.container_ = container
}
private validateJob(
job: any,
path: string
): job is {
default: ScheduledJobHandler
config: ScheduledJobConfig
} {
const handler = job.default
if (!handler || typeof handler !== "function") {
logger.warn(`The job in ${path} is not a function.`)
return false
}
const config = job.config
if (!config) {
logger.warn(`The job in ${path} is missing a config.`)
return false
}
if (!config.schedule) {
logger.warn(`The job in ${path} is missing a schedule.`)
return false
}
if (!config.name) {
logger.warn(`The job in ${path} is missing a name.`)
return false
}
if (config.data && typeof config.data !== "object") {
logger.warn(`The job data in ${path} is not an object.`)
return false
}
return true
}
private async createDescriptor(absolutePath: string, entry: string) {
return await import(absolutePath).then((module_) => {
const isValid = this.validateJob(module_, absolutePath)
if (!isValid) {
return
}
this.jobDescriptors_.set(absolutePath, {
config: module_.config,
handler: module_.default,
})
})
}
private async createMap(dirPath: string) {
await Promise.all(
await readdir(dirPath, { withFileTypes: true }).then(async (entries) => {
return entries
.filter((entry) => {
if (
this.excludes.length &&
this.excludes.some((exclude) => exclude.test(entry.name))
) {
return false
}
return true
})
.map(async (entry) => {
const fullPath = join(dirPath, entry.name)
if (entry.isDirectory()) {
return this.createMap(fullPath)
}
return await this.createDescriptor(fullPath, entry.name)
})
})
)
}
private async createScheduledJobs() {
const jobs = Array.from(this.jobDescriptors_.values())
if (!jobs.length) {
return
}
const jobSchedulerService: JobSchedulerService = this.container_.resolve(
"jobSchedulerService"
)
for (const job of jobs) {
try {
const { name, data, schedule } = job.config
const handler = async () => {
await job.handler({
container: this.container_,
data,
pluginOptions: this.pluginOptions_,
})
}
await jobSchedulerService.create(name, data, schedule, handler, {
keepExisting: false, // For now, we do not support changing this flag
})
} catch (err) {
logger.error(
`An error occurred while registering job ${job.config.name}`,
err
)
}
}
}
async load(): Promise<void> {
let hasJobsDir = false
try {
await readdir(this.rootDir_)
hasJobsDir = true
} catch (_err) {
hasJobsDir = false
}
if (!hasJobsDir) {
return
}
await this.createMap(this.rootDir_)
await this.createScheduledJobs()
}
}

View File

@@ -0,0 +1,22 @@
import { OrderService } from "../../../../../services"
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function orderNotifier({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: [
OrderService.Events.PLACED,
OrderService.Events.CANCELED,
OrderService.Events.COMPLETED,
],
}

View File

@@ -0,0 +1,21 @@
import { ProductService } from "../../../../../services"
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function productUpdater({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: ProductService.Events.UPDATED,
context: {
subscriberId: "product-updater",
},
}

View File

@@ -0,0 +1,18 @@
import { ProductVariantService } from "../../../../../services"
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"
export default async function ({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}
export const config: SubscriberConfig = {
event: ProductVariantService.Events.CREATED,
}

View File

@@ -0,0 +1,16 @@
export const eventBusServiceMock = {
subscribe: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}
export const containerMock = {
// mock .resolve method so if its called with "eventBusService" it returns the mock
resolve: jest.fn().mockImplementation((name: string) => {
if (name === "eventBusService") {
return eventBusServiceMock
} else {
return {}
}
}),
}

View File

@@ -0,0 +1,120 @@
import { MedusaContainer } from "@medusajs/types"
import { join } from "path"
import {
OrderService,
ProductService,
ProductVariantService,
} from "../../../../services"
import { containerMock, eventBusServiceMock } from "../__mocks__"
import { SubscriberLoader } from "../index"
describe("SubscriberLoader", () => {
const rootDir = join(__dirname, "../__fixtures__", "subscribers")
const pluginOptions = {
important_data: {
enabled: true,
},
}
let registeredPaths: string[] = []
beforeAll(async () => {
jest.clearAllMocks()
const paths = await new SubscriberLoader(
rootDir,
containerMock as unknown as MedusaContainer,
pluginOptions,
"id-load-subscribers"
).load()
if (paths) {
registeredPaths = [...registeredPaths, ...paths]
}
})
it("should register each subscriber in the '/subscribers' folder", async () => {
// As '/subscribers' contains 3 subscribers, we expect the number of registered paths to be 3
expect(registeredPaths.length).toEqual(3)
})
it("should have registered subscribers for 5 events", async () => {
/**
* The 'product-updater.ts' subscriber is registered for the following events:
* - "product.created"
* The 'order-updater.ts' subscriber is registered for the following events:
* - "order.placed"
* - "order.canceled"
* - "order.completed"
* The 'variant-created.ts' subscriber is registered for the following events:
* - "variant.created"
*
* This means that we expect the eventBusServiceMock.subscribe method to have
* been called times, once for 'product-updater.ts', once for 'variant-created.ts',
* and 3 times for 'order-updater.ts'.
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledTimes(5)
})
it("should have registered subscribers with the correct props", async () => {
/**
* The 'product-updater.ts' subscriber is registered
* with a explicit subscriberId of "product-updater".
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
ProductService.Events.UPDATED,
expect.any(Function),
{
subscriberId: "product-updater",
}
)
/**
* The 'order-updater.ts' subscriber is registered
* without an explicit subscriberId, which means that
* the loader tries to infer one from either the handler
* functions name or the file name. In this case, the
* handler function is named 'orderUpdater' and is used
* to infer the subscriberId.
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
OrderService.Events.PLACED,
expect.any(Function),
{
subscriberId: "order-notifier",
}
)
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
OrderService.Events.CANCELED,
expect.any(Function),
{
subscriberId: "order-notifier",
}
)
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
OrderService.Events.COMPLETED,
expect.any(Function),
{
subscriberId: "order-notifier",
}
)
/**
* The 'variant-created.ts' subscriber is registered
* without an explicit subscriberId, and with an anonymous
* handler function. This means that the loader tries to
* infer the subscriberId from the file name, which in this
* case is 'variant-created.ts'.
*/
expect(eventBusServiceMock.subscribe).toHaveBeenCalledWith(
ProductVariantService.Events.CREATED,
expect.any(Function),
{
subscriberId: "variant-created",
}
)
})
})

View File

@@ -0,0 +1,242 @@
import { MedusaContainer, Subscriber } from "@medusajs/types"
import { kebabCase } from "@medusajs/utils"
import { readdir } from "fs/promises"
import { extname, join, sep } from "path"
import { EventBusService } from "../../../services"
import { SubscriberArgs, SubscriberConfig } from "../../../types/subscribers"
import logger from "../../logger"
type SubscriberHandler<T> = (args: SubscriberArgs<T>) => Promise<void>
type SubscriberModule<T> = {
config: SubscriberConfig
handler: SubscriberHandler<T>
}
export class SubscriberLoader {
protected container_: MedusaContainer
protected pluginOptions_: Record<string, unknown>
protected activityId_: string
protected rootDir_: string
protected excludes: RegExp[] = [
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]
protected subscriberDescriptors_: Map<string, SubscriberModule<any>> =
new Map()
constructor(
rootDir: string,
container: MedusaContainer,
options: Record<string, unknown> = {},
activityId: string
) {
this.rootDir_ = rootDir
this.pluginOptions_ = options
this.container_ = container
this.activityId_ = activityId
}
private validateSubscriber(
subscriber: any,
path: string
): subscriber is {
default: SubscriberHandler<unknown>
config: SubscriberConfig
} {
const handler = subscriber.default
if (!handler || typeof handler !== "function") {
/**
* If the handler is not a function, we can't use it
*/
logger.warn(`The subscriber in ${path} is not a function.`)
return false
}
const config = subscriber.config
if (!config) {
/**
* If the subscriber is missing a config, we can't use it
*/
logger.warn(`The subscriber in ${path} is missing a config.`)
return false
}
if (!config.event) {
/**
* If the subscriber is missing an event, we can't use it.
* In production we throw an error, else we log a warning
*/
if (process.env.NODE_ENV === "production") {
throw new Error(`The subscriber in ${path} is missing an event.`)
} else {
logger.warn(`The subscriber in ${path} is missing an event.`)
}
return false
}
if (
typeof config.event !== "string" &&
!Array.isArray(config.event) &&
!config.event.every((e: unknown) => typeof e === "string")
) {
/**
* If the subscribers event is not a string or an array of strings, we can't use it
*/
logger.warn(
`The subscriber in ${path} has an invalid event. The event must be a string or an array of strings.`
)
return false
}
return true
}
private async createDescriptor(absolutePath: string, entry: string) {
return await import(absolutePath).then((module_) => {
const isValid = this.validateSubscriber(module_, absolutePath)
if (!isValid) {
return
}
this.subscriberDescriptors_.set(absolutePath, {
config: module_.config,
handler: module_.default,
})
})
}
private async createMap(dirPath: string) {
await Promise.all(
await readdir(dirPath, { withFileTypes: true }).then(async (entries) => {
return entries
.filter((entry) => {
if (
this.excludes.length &&
this.excludes.some((exclude) => exclude.test(entry.name))
) {
return false
}
return true
})
.map(async (entry) => {
const fullPath = join(dirPath, entry.name)
if (entry.isDirectory()) {
return this.createMap(fullPath)
}
return await this.createDescriptor(fullPath, entry.name)
})
})
)
}
private inferIdentifier<T>(
fileName: string,
config: SubscriberConfig,
handler: SubscriberHandler<T>
) {
const { context } = config
/**
* If subscriberId is provided, use that
*/
if (context?.subscriberId) {
return context.subscriberId
}
const handlerName = handler.name
/**
* If the handler is not anonymous, use the name
*/
if (handlerName && !handlerName.startsWith("default")) {
return kebabCase(handlerName)
}
/**
* If the handler is anonymous, use the file name
*/
const idFromFile =
fileName.split(sep).pop()?.replace(extname(fileName), "") ?? ""
return kebabCase(idFromFile)
}
private createSubscriber<T>({
fileName,
config,
handler,
}: {
fileName: string
config: SubscriberConfig
handler: SubscriberHandler<T>
}) {
const eventBusService: EventBusService =
this.container_.resolve("eventBusService")
const { event } = config
const events = Array.isArray(event) ? event : [event]
const subscriber: Subscriber<T> = async (data: T, eventName: string) => {
return handler({
eventName,
data,
container: this.container_,
pluginOptions: this.pluginOptions_,
})
}
const subscriberId = this.inferIdentifier(fileName, config, handler)
for (const e of events) {
eventBusService.subscribe(e, subscriber as Subscriber<unknown>, {
...(config.context ?? {}),
subscriberId,
})
}
}
async load() {
let hasSubscriberDir = false
try {
await readdir(this.rootDir_)
hasSubscriberDir = true
} catch (err) {
hasSubscriberDir = false
}
if (!hasSubscriberDir) {
return
}
await this.createMap(this.rootDir_)
const map = this.subscriberDescriptors_
for (const [fileName, { config, handler }] of map.entries()) {
this.createSubscriber({
fileName,
config,
handler,
})
}
/**
* Return the file paths of the registered subscribers, to prevent the
* backwards compatible loader from trying to register them.
*/
return [...map.keys()]
}
}

View File

@@ -32,6 +32,7 @@ import {
formatRegistrationNameWithoutNamespace,
} from "../utils/format-registration-name"
import { getModelExtensionsMap } from "./helpers/get-model-extension-map"
import ScheduledJobsLoader from "./helpers/jobs"
import {
registerAbstractFulfillmentServiceFromClass,
registerFulfillmentServiceFromClass,
@@ -39,6 +40,7 @@ import {
registerPaymentServiceFromClass,
} from "./helpers/plugins"
import { RoutesLoader } from "./helpers/routing"
import { SubscriberLoader } from "./helpers/subscribers"
import logger from "./logger"
type Options = {
@@ -93,7 +95,7 @@ export default async ({
activityId
)
registerCoreRouters(pluginDetails, container)
registerSubscribers(pluginDetails, container)
await registerSubscribers(pluginDetails, container, activityId)
})
)
@@ -101,6 +103,18 @@ export default async ({
resolved.map(async (pluginDetails) => runLoaders(pluginDetails, container))
)
if (configModule.projectConfig.redis_url) {
await Promise.all(
resolved.map(async (pluginDetails) => {
await registerScheduledJobs(pluginDetails, container)
})
)
} else {
logger.warn(
"You don't have Redis configured. Scheduled jobs will not be enabled."
)
}
resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin"))
}
@@ -183,6 +197,17 @@ async function runLoaders(
)
}
async function registerScheduledJobs(
pluginDetails: PluginDetails,
container: MedusaContainer
): Promise<void> {
await new ScheduledJobsLoader(
path.join(pluginDetails.resolve, "jobs"),
container,
pluginDetails.options
).load()
}
async function registerMedusaApi(
pluginDetails: PluginDetails,
container: MedusaContainer
@@ -548,20 +573,37 @@ export async function registerServices(
* registered
* @return {void}
*/
function registerSubscribers(
async function registerSubscribers(
pluginDetails: PluginDetails,
container: MedusaContainer
): void {
const files = glob.sync(`${pluginDetails.resolve}/subscribers/*.js`, {})
files.forEach((fn) => {
const loaded = require(fn).default
container: MedusaContainer,
activityId: string
): Promise<void> {
const exclude: string[] = []
container.build(
asFunction(
(cradle) => new loaded(cradle, pluginDetails.options)
).singleton()
)
})
const loadedFiles = await new SubscriberLoader(
path.join(pluginDetails.resolve, "subscribers"),
container,
pluginDetails.options,
activityId
).load()
/**
* Exclude any files that have already been loaded by the subscriber loader
*/
exclude.push(...(loadedFiles ?? []))
const files = glob.sync(`${pluginDetails.resolve}/subscribers/*.js`, {})
files
.filter((file) => !exclude.includes(file))
.forEach((fn) => {
const loaded = require(fn).default
container.build(
asFunction(
(cradle) => new loaded(cradle, pluginDetails.options)
).singleton()
)
})
}
/**

View File

@@ -0,0 +1,22 @@
import { MedusaContainer } from "@medusajs/types"
export type ScheduledJobConfig<T = unknown> = {
/**
* The name of the job
*/
name: string
/**
* The cron schedule of the job, e.g. `0 0 * * *` for running every day at midnight.
*/
schedule: string
/**
* An optional data object to pass to the job handler
*/
data?: T
}
export type ScheduledJobArgs<T = unknown> = {
container: MedusaContainer
data?: T
pluginOptions?: Record<string, unknown>
}

View File

@@ -0,0 +1,17 @@
import { MedusaContainer } from "@medusajs/types"
interface SubscriberContext extends Record<string, unknown> {
subscriberId?: string
}
export type SubscriberConfig = {
event: string | string[]
context?: SubscriberContext
}
export type SubscriberArgs<T = unknown> = {
data: T
eventName: string
container: MedusaContainer
pluginOptions: Record<string, unknown>
}