diff --git a/packages/medusa/src/jobs/.gitkeep b/packages/medusa/src/jobs/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/medusa/src/loaders/__fixtures__/mock-scheduler-storage.ts b/packages/medusa/src/loaders/__fixtures__/mock-scheduler-storage.ts new file mode 100644 index 0000000000..772da63aa3 --- /dev/null +++ b/packages/medusa/src/loaders/__fixtures__/mock-scheduler-storage.ts @@ -0,0 +1,21 @@ +import { + IDistributedSchedulerStorage, + SchedulerOptions, +} from "@medusajs/orchestration" + +export class MockSchedulerStorage implements IDistributedSchedulerStorage { + async schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise { + return Promise.resolve() + } + + async remove(jobId: string): Promise { + return Promise.resolve() + } + + async removeAll(): Promise { + return Promise.resolve() + } +} diff --git a/packages/medusa/src/loaders/__fixtures__/plugin/jobs/order-summary.ts b/packages/medusa/src/loaders/__fixtures__/plugin/jobs/order-summary.ts new file mode 100644 index 0000000000..b74b773413 --- /dev/null +++ b/packages/medusa/src/loaders/__fixtures__/plugin/jobs/order-summary.ts @@ -0,0 +1,11 @@ +import { MedusaContainer } from "@medusajs/types" + +export default async function handler(container: MedusaContainer) { + console.log(`You have received 5 orders today`) +} + +export const config = { + name: "summarize-orders-job", + schedule: "* * * * * *", + numberOfExecutions: 2, +} diff --git a/packages/medusa/src/loaders/__tests__/register-jobs.spec.ts b/packages/medusa/src/loaders/__tests__/register-jobs.spec.ts new file mode 100644 index 0000000000..9a6443a668 --- /dev/null +++ b/packages/medusa/src/loaders/__tests__/register-jobs.spec.ts @@ -0,0 +1,19 @@ +import path from "path" +import { registerJobs } from "../helpers/register-jobs" +import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration" +import { MockSchedulerStorage } from "../__fixtures__/mock-scheduler-storage" + +describe("register jobs", () => { + WorkflowScheduler.setStorage(new MockSchedulerStorage()) + it("registers jobs from plugins", async () => { + await registerJobs([ + { resolve: path.join(__dirname, "../__fixtures__/plugin") }, + ]) + const workflow = WorkflowManager.getWorkflow("summarize-orders-job") + expect(workflow).toBeDefined() + expect(workflow?.options.schedule).toEqual({ + cron: "* * * * * *", + numberOfExecutions: 2, + }) + }) +}) diff --git a/packages/medusa/src/loaders/helpers/register-jobs.ts b/packages/medusa/src/loaders/helpers/register-jobs.ts new file mode 100644 index 0000000000..4ec1c37e82 --- /dev/null +++ b/packages/medusa/src/loaders/helpers/register-jobs.ts @@ -0,0 +1,92 @@ +import { + StepResponse, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { glob } from "glob" +import logger from "../logger" +import { MedusaError } from "@medusajs/utils" + +export const registerJobs = async (plugins) => { + await Promise.all( + plugins.map(async (pluginDetails) => { + const files = glob.sync( + `${pluginDetails.resolve}/jobs/*.{ts,js,mjs,mts}`, + { + ignore: ["**/*.d.ts", "**/*.map"], + } + ) + + logger.debug( + `Registering ${files.length} jobs from ${pluginDetails.resolve}` + ) + + const jobs = await Promise.all( + files.map(async (file) => { + const module_ = await import(file) + const input = { + config: module_.config, + handler: module_.default, + } + + validateConfig(input.config) + return input + }) + ) + + const res = await Promise.all(jobs.map(createJob)) + + logger.debug( + `Registered ${res.length} jobs from ${pluginDetails.resolve}` + ) + return res + }) + ) +} + +const createJob = async ({ config, handler }) => { + const step = createStep( + `${config.name}-as-step`, + async (stepInput, stepContext) => { + const { container } = stepContext + const res = await handler(container) + return new StepResponse(res, res) + } + ) + + createWorkflow( + { + name: config.name, + schedule: { + cron: config.schedule, + numberOfExecutions: config.numberOfExecutions, + }, + }, + () => { + return step() + } + ) +} + +const validateConfig = (config) => { + if (!config) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Config is required for scheduled" + ) + } + + if (!config.schedule) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Cron schedule definition is required for scheduled jobs" + ) + } + + if (!config.name) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Job name is required for scheduled jobs" + ) + } +} diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index 841cb43f22..681db192ca 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -22,6 +22,7 @@ import { SubscriberLoader } from "./helpers/subscribers" import Logger from "./logger" import loadMedusaApp from "./medusa-app" import registerPgConnection from "./pg-connection" +import { registerJobs } from "./helpers/register-jobs" type Options = { directory: string @@ -57,6 +58,14 @@ async function subscribersLoader( ) } +async function jobsLoader(plugins: PluginDetails[]) { + /** + * Load jobs from the medusa/medusa package. Remove once the medusa core is converted to a plugin + */ + await registerJobs([{ resolve: path.join(__dirname, "../") }]) + await registerJobs(plugins) +} + async function loadEntrypoints( plugins: PluginDetails[], container: MedusaContainer, @@ -89,6 +98,7 @@ async function loadEntrypoints( await adminLoader({ app: expressApp, configModule, rootDirectory }) await subscribersLoader(plugins, container) + await jobsLoader(plugins) await apiLoader({ container, plugins, @@ -128,9 +138,9 @@ export default async ({ ) const plugins = getResolvedPlugins(rootDirectory, configModule, true) || [] + const pluginLinks = await resolvePluginsLinks(plugins, container) await registerWorkflows(plugins) - const pluginLinks = await resolvePluginsLinks(plugins, container) const { onApplicationShutdown, onApplicationPrepareShutdown } = await loadMedusaApp({ container,