diff --git a/packages/framework/framework/package.json b/packages/framework/framework/package.json index 0d95160d3c..42b4107491 100644 --- a/packages/framework/framework/package.json +++ b/packages/framework/framework/package.json @@ -12,7 +12,8 @@ "./config": "./dist/config/index.js", "./logger": "./dist/logger/index.js", "./database": "./dist/database/index.js", - "./subscribers": "./dist/subscribers/index.js" + "./subscribers": "./dist/subscribers/index.js", + "./jobs": "./dist/jobs/index.js" }, "engines": { "node": ">=20" @@ -30,8 +31,8 @@ "scripts": { "watch": "tsc --watch -p ./tsconfig.build.json", "watch:test": "tsc --build tsconfig.spec.json --watch", - "prepublishOnly": "cross-env NODE_ENV=production tsc -p ./tsconfig.build.json && tsc-alias -p ./tsconfig.build.json", - "build": "rimraf dist && tsc --build && tsc-alias", + "prepublishOnly": "tsc -p ./tsconfig.build.json && tsc-alias -p ./tsconfig.build.json", + "build": "rimraf dist && tsc --noEmit && yarn prepublishOnly", "test": "jest --runInBand --bail --passWithNoTests --forceExit -- src", "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts" }, @@ -47,6 +48,7 @@ "dependencies": { "@medusajs/medusa-cli": "^1.3.22", "@medusajs/utils": "^1.11.9", + "@medusajs/workflows-sdk": "^0.1.6", "awilix": "^8.0.0", "cookie-parser": "^1.4.6", "express": "^4.18.2", diff --git a/packages/framework/framework/src/index.ts b/packages/framework/framework/src/index.ts index 804b202b82..d9abdef176 100644 --- a/packages/framework/framework/src/index.ts +++ b/packages/framework/framework/src/index.ts @@ -4,4 +4,5 @@ export * from "./http" export * from "./database" export * from "./container" export * from "./subscribers" +export * from "./jobs" export * from "./feature-flags" diff --git a/packages/medusa/src/loaders/__fixtures__/mock-scheduler-storage.ts b/packages/framework/framework/src/jobs/__fixtures__/mock-scheduler-storage.ts similarity index 100% rename from packages/medusa/src/loaders/__fixtures__/mock-scheduler-storage.ts rename to packages/framework/framework/src/jobs/__fixtures__/mock-scheduler-storage.ts diff --git a/packages/medusa/src/loaders/__fixtures__/plugin/jobs/order-summary.ts b/packages/framework/framework/src/jobs/__fixtures__/plugin/jobs/order-summary.ts similarity index 100% rename from packages/medusa/src/loaders/__fixtures__/plugin/jobs/order-summary.ts rename to packages/framework/framework/src/jobs/__fixtures__/plugin/jobs/order-summary.ts diff --git a/packages/medusa/src/loaders/__tests__/register-jobs.spec.ts b/packages/framework/framework/src/jobs/__tests__/register-jobs.spec.ts similarity index 69% rename from packages/medusa/src/loaders/__tests__/register-jobs.spec.ts rename to packages/framework/framework/src/jobs/__tests__/register-jobs.spec.ts index ce4539394c..5dbc885fac 100644 --- a/packages/medusa/src/loaders/__tests__/register-jobs.spec.ts +++ b/packages/framework/framework/src/jobs/__tests__/register-jobs.spec.ts @@ -1,14 +1,19 @@ -import path from "path" -import { registerJobs } from "../helpers/register-jobs" +import { join } from "path" import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration" import { MockSchedulerStorage } from "../__fixtures__/mock-scheduler-storage" +import { JobLoader } from "../job-loader" describe("register jobs", () => { WorkflowScheduler.setStorage(new MockSchedulerStorage()) + + let jobLoader!: JobLoader + + beforeAll(() => { + jobLoader = new JobLoader(join(__dirname, "../__fixtures__/plugin/jobs")) + }) + it("registers jobs from plugins", async () => { - await registerJobs([ - { resolve: path.join(__dirname, "../__fixtures__/plugin") }, - ]) + await jobLoader.load() const workflow = WorkflowManager.getWorkflow("job-summarize-orders") expect(workflow).toBeDefined() expect(workflow?.options.schedule).toEqual({ diff --git a/packages/framework/framework/src/jobs/index.ts b/packages/framework/framework/src/jobs/index.ts new file mode 100644 index 0000000000..6cb8e40720 --- /dev/null +++ b/packages/framework/framework/src/jobs/index.ts @@ -0,0 +1 @@ +export * from './job-loader' \ No newline at end of file diff --git a/packages/framework/framework/src/jobs/job-loader.ts b/packages/framework/framework/src/jobs/job-loader.ts new file mode 100644 index 0000000000..531c0bc59b --- /dev/null +++ b/packages/framework/framework/src/jobs/job-loader.ts @@ -0,0 +1,180 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/workflows-sdk" +import { isObject, MedusaError, promiseAll } from "@medusajs/utils" +import { SchedulerOptions } from "@medusajs/orchestration" +import { MedusaContainer } from "@medusajs/types" +import { logger } from "../logger" +import { access, readdir } from "fs/promises" +import { join } from "path" + +type CronJobConfig = { + name: string + schedule: string + numberOfExecutions?: SchedulerOptions["numberOfExecutions"] +} + +type CronJobHandler = (container: MedusaContainer) => Promise + +export class JobLoader { + /** + * The directory from which to load the jobs + * @private + */ + #sourceDir: string | string[] + + /** + * The list of file names to exclude from the subscriber scan + * @private + */ + #excludes: RegExp[] = [ + /index\.js/, + /index\.ts/, + /\.DS_Store/, + /(\.ts\.map|\.js\.map|\.d\.ts|\.md)/, + /^_[^/\\]*(\.[^/\\]+)?$/, + ] + + constructor(sourceDir: string | string[]) { + this.#sourceDir = sourceDir + } + + /** + * Validate cron job configuration + * @param config + * @protected + */ + protected validateConfig(config: { + schedule: string | SchedulerOptions + name: string + }) { + if (!config) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Config is required for scheduled jobs." + ) + } + + if (!config.schedule) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Cron schedule definition is required for scheduled jobs." + ) + } + + if (!config.name) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Job name is required for scheduled jobs." + ) + } + } + + /** + * Create a workflow to register a new cron job + * @param config + * @param handler + * @protected + */ + protected registerJob({ + config, + handler, + }: { + config: CronJobConfig + handler: CronJobHandler + }) { + const workflowName = `job-${config.name}` + const step = createStep( + `${config.name}-as-step`, + async (_, stepContext) => { + const { container } = stepContext + try { + const res = await handler(container) + return new StepResponse(res, res) + } catch (error) { + logger.error( + `Scheduled job ${config.name} failed with error: ${error.message}` + ) + throw error + } + } + ) + + const workflowConfig = { + name: workflowName, + schedule: isObject(config.schedule) + ? config.schedule + : { + cron: config.schedule, + numberOfExecutions: config.numberOfExecutions, + }, + } + + createWorkflow(workflowConfig, () => { + step() + }) + } + + /** + * Load cron jobs from one or multiple source paths + */ + async load() { + const normalizedSourcePath = Array.isArray(this.#sourceDir) + ? this.#sourceDir + : [this.#sourceDir] + + const promises = normalizedSourcePath.map(async (sourcePath) => { + try { + await access(sourcePath) + } catch { + return + } + + return await readdir(sourcePath, { + recursive: true, + withFileTypes: true, + }).then(async (entries) => { + const fileEntries = entries.filter((entry) => { + return ( + !entry.isDirectory() && + !this.#excludes.some((exclude) => exclude.test(entry.name)) + ) + }) + + logger.debug(`Registering jobs from ${sourcePath}.`) + + return await promiseAll( + fileEntries.map(async (entry) => { + const fullPath = join(entry.path, entry.name) + + const module_ = await import(fullPath) + + const input = { + config: module_.config, + handler: module_.default, + } + + this.validateConfig(input.config) + return input + }) + ) + }) + }) + + const jobsInputs = await promiseAll(promises) + const flatJobsInput = jobsInputs.flat(1).filter( + ( + job + ): job is { + config: CronJobConfig + handler: CronJobHandler + } => !!job + ) + + flatJobsInput.map(this.registerJob) + + logger.debug(`Job registered.`) + } +} diff --git a/packages/framework/framework/src/subscribers/subscriber-loader.ts b/packages/framework/framework/src/subscribers/subscriber-loader.ts index 0f84b966ea..be895c26b6 100644 --- a/packages/framework/framework/src/subscribers/subscriber-loader.ts +++ b/packages/framework/framework/src/subscribers/subscriber-loader.ts @@ -130,10 +130,7 @@ export class SubscriberLoader { withFileTypes: true, }).then(async (entries) => { return entries.flatMap(async (entry) => { - if ( - this.#excludes.length && - this.#excludes.some((exclude) => exclude.test(entry.name)) - ) { + if (this.#excludes.some((exclude) => exclude.test(entry.name))) { return } diff --git a/packages/framework/framework/tsconfig.build.json b/packages/framework/framework/tsconfig.build.json index 5c0452cf63..5ea8eebe6d 100644 --- a/packages/framework/framework/tsconfig.build.json +++ b/packages/framework/framework/tsconfig.build.json @@ -3,9 +3,9 @@ "include": ["src"], "exclude": [ "dist", - "./src/**/__tests__", - "./src/**/__mocks__", - "./src/**/__fixtures__", + "src/**/__tests__", + "src/**/__mocks__", + "src/**/__fixtures__", "node_modules" ], } diff --git a/packages/framework/framework/tsconfig.json b/packages/framework/framework/tsconfig.json index 373e623fc7..93695bb174 100644 --- a/packages/framework/framework/tsconfig.json +++ b/packages/framework/framework/tsconfig.json @@ -23,7 +23,6 @@ "paths": { }, }, - "include": ["src"], "exclude": [ "dist", diff --git a/packages/medusa/src/loaders/helpers/register-jobs.ts b/packages/medusa/src/loaders/helpers/register-jobs.ts deleted file mode 100644 index 66d086e9c9..0000000000 --- a/packages/medusa/src/loaders/helpers/register-jobs.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { - createStep, - createWorkflow, - StepResponse, -} from "@medusajs/workflows-sdk" -import { glob } from "glob" -import { logger } from "@medusajs/framework" -import { ContainerRegistrationKeys, MedusaError } from "@medusajs/utils" - -export const registerJobs = async (plugins) => { - await Promise.all( - plugins.map(async (pluginDetails) => { - const files = glob.sync( - `${pluginDetails.resolve}/jobs/*.{ts,js,mjs,mts}`, - { - ignore: ["**/*.d.ts", "**/*.map"], - } - ) - - logger.debug( - `Registering ${files.length} jobs from ${pluginDetails.resolve}` - ) - - const jobs = await Promise.all( - files.map(async (file) => { - const module_ = await import(file) - const input = { - config: module_.config, - handler: module_.default, - } - - validateConfig(input.config) - return input - }) - ) - - const res = await Promise.all(jobs.map(createJob)) - - logger.debug( - `Registered ${res.length} jobs from ${pluginDetails.resolve}` - ) - return res - }) - ) -} - -const createJob = async ({ config, handler }) => { - const workflowName = `job-${config.name}` - const step = createStep( - `${config.name}-as-step`, - async (stepInput, stepContext) => { - const { container } = stepContext - const logger = container.resolve(ContainerRegistrationKeys.LOGGER) - try { - const res = await handler(container) - return new StepResponse(res, res) - } catch (error) { - logger.error( - `Scheduled job ${config.name} failed with error: ${error.message}` - ) - throw error - } - } - ) - - createWorkflow( - { - name: workflowName, - schedule: { - cron: config.schedule, - numberOfExecutions: config.numberOfExecutions, - }, - }, - () => { - return step() - } - ) -} - -const validateConfig = (config) => { - if (!config) { - throw new MedusaError( - MedusaError.Types.INVALID_ARGUMENT, - "Config is required for scheduled" - ) - } - - if (!config.schedule) { - throw new MedusaError( - MedusaError.Types.INVALID_ARGUMENT, - "Cron schedule definition is required for scheduled jobs" - ) - } - - if (!config.name) { - throw new MedusaError( - MedusaError.Types.INVALID_ARGUMENT, - "Job name is required for scheduled jobs" - ) - } -} diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index 40db6084a1..fa21c20d61 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -3,7 +3,7 @@ import { ConfigModule, MedusaContainer, PluginDetails } from "@medusajs/types" import { ContainerRegistrationKeys, promiseAll } from "@medusajs/utils" import { asValue } from "awilix" import { Express, NextFunction, Request, Response } from "express" -import path, { join } from "path" +import { join } from "path" import requestIp from "request-ip" import { v4 } from "uuid" import adminLoader from "./admin" @@ -13,11 +13,11 @@ import { container, expressLoader, featureFlagsLoader, + JobLoader, logger, pgConnectionLoader, SubscriberLoader, } from "@medusajs/framework" -import { registerJobs } from "./helpers/register-jobs" import { registerWorkflows } from "./helpers/register-workflows" import { getResolvedPlugins } from "./helpers/resolve-plugins" import { resolvePluginsLinks } from "./helpers/resolve-plugins-links" @@ -43,7 +43,7 @@ async function subscribersLoader(plugins: PluginDetails[]) { /** * Load subscribers from the medusa/medusa package */ - await new SubscriberLoader(path.join(__dirname, "../subscribers")).load() + await new SubscriberLoader(join(__dirname, "../subscribers")).load() /** * Load subscribers from all the plugins. @@ -51,7 +51,7 @@ async function subscribersLoader(plugins: PluginDetails[]) { await Promise.all( plugins.map(async (pluginDetails) => { await new SubscriberLoader( - path.join(pluginDetails.resolve, "subscribers"), + join(pluginDetails.resolve, "subscribers"), pluginDetails.options ).load() }) @@ -59,11 +59,15 @@ async function subscribersLoader(plugins: PluginDetails[]) { } async function jobsLoader(plugins: PluginDetails[]) { - /** - * Load jobs from the medusa/medusa package. Remove once the medusa core is converted to a plugin - */ - await registerJobs([{ resolve: path.join(__dirname, "../") }]) - await registerJobs(plugins) + const pluginJobSourcePaths = [ + /** + * Load jobs from the medusa/medusa package. Remove once the medusa core is converted to a plugin + */ + join(__dirname, "../jobs"), + ].concat(plugins.map((plugin) => join(plugin.resolve, "jobs"))) + + const jobLoader = new JobLoader(pluginJobSourcePaths) + await jobLoader.load() } async function loadEntrypoints( diff --git a/yarn.lock b/yarn.lock index 1b04fddc45..f12bb6d14e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4612,6 +4612,7 @@ __metadata: "@medusajs/medusa-cli": ^1.3.22 "@medusajs/types": ^1.11.16 "@medusajs/utils": ^1.11.9 + "@medusajs/workflows-sdk": ^0.1.6 "@types/express": ^4.17.17 awilix: ^8.0.0 cookie-parser: ^1.4.6