Chore/framework 6/n (#8356)

**What**
Refactor and improve job loader as well as move it to the framework

FIXES FRMW-2626
This commit is contained in:
Adrien de Peretti
2024-07-31 11:17:17 +02:00
committed by GitHub
parent 838eb0e34a
commit 6ccf83128c
13 changed files with 215 additions and 126 deletions

View File

@@ -12,7 +12,8 @@
"./config": "./dist/config/index.js",
"./logger": "./dist/logger/index.js",
"./database": "./dist/database/index.js",
"./subscribers": "./dist/subscribers/index.js"
"./subscribers": "./dist/subscribers/index.js",
"./jobs": "./dist/jobs/index.js"
},
"engines": {
"node": ">=20"
@@ -30,8 +31,8 @@
"scripts": {
"watch": "tsc --watch -p ./tsconfig.build.json",
"watch:test": "tsc --build tsconfig.spec.json --watch",
"prepublishOnly": "cross-env NODE_ENV=production tsc -p ./tsconfig.build.json && tsc-alias -p ./tsconfig.build.json",
"build": "rimraf dist && tsc --build && tsc-alias",
"prepublishOnly": "tsc -p ./tsconfig.build.json && tsc-alias -p ./tsconfig.build.json",
"build": "rimraf dist && tsc --noEmit && yarn prepublishOnly",
"test": "jest --runInBand --bail --passWithNoTests --forceExit -- src",
"test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts"
},
@@ -47,6 +48,7 @@
"dependencies": {
"@medusajs/medusa-cli": "^1.3.22",
"@medusajs/utils": "^1.11.9",
"@medusajs/workflows-sdk": "^0.1.6",
"awilix": "^8.0.0",
"cookie-parser": "^1.4.6",
"express": "^4.18.2",

View File

@@ -4,4 +4,5 @@ export * from "./http"
export * from "./database"
export * from "./container"
export * from "./subscribers"
export * from "./jobs"
export * from "./feature-flags"

View File

@@ -1,14 +1,19 @@
import path from "path"
import { registerJobs } from "../helpers/register-jobs"
import { join } from "path"
import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration"
import { MockSchedulerStorage } from "../__fixtures__/mock-scheduler-storage"
import { JobLoader } from "../job-loader"
describe("register jobs", () => {
WorkflowScheduler.setStorage(new MockSchedulerStorage())
let jobLoader!: JobLoader
beforeAll(() => {
jobLoader = new JobLoader(join(__dirname, "../__fixtures__/plugin/jobs"))
})
it("registers jobs from plugins", async () => {
await registerJobs([
{ resolve: path.join(__dirname, "../__fixtures__/plugin") },
])
await jobLoader.load()
const workflow = WorkflowManager.getWorkflow("job-summarize-orders")
expect(workflow).toBeDefined()
expect(workflow?.options.schedule).toEqual({

View File

@@ -0,0 +1 @@
export * from './job-loader'

View File

@@ -0,0 +1,180 @@
import {
createStep,
createWorkflow,
StepResponse,
} from "@medusajs/workflows-sdk"
import { isObject, MedusaError, promiseAll } from "@medusajs/utils"
import { SchedulerOptions } from "@medusajs/orchestration"
import { MedusaContainer } from "@medusajs/types"
import { logger } from "../logger"
import { access, readdir } from "fs/promises"
import { join } from "path"
type CronJobConfig = {
name: string
schedule: string
numberOfExecutions?: SchedulerOptions["numberOfExecutions"]
}
type CronJobHandler = (container: MedusaContainer) => Promise<any>
export class JobLoader {
/**
* The directory from which to load the jobs
* @private
*/
#sourceDir: string | string[]
/**
* The list of file names to exclude from the subscriber scan
* @private
*/
#excludes: RegExp[] = [
/index\.js/,
/index\.ts/,
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts|\.md)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]
constructor(sourceDir: string | string[]) {
this.#sourceDir = sourceDir
}
/**
* Validate cron job configuration
* @param config
* @protected
*/
protected validateConfig(config: {
schedule: string | SchedulerOptions
name: string
}) {
if (!config) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Config is required for scheduled jobs."
)
}
if (!config.schedule) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Cron schedule definition is required for scheduled jobs."
)
}
if (!config.name) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Job name is required for scheduled jobs."
)
}
}
/**
* Create a workflow to register a new cron job
* @param config
* @param handler
* @protected
*/
protected registerJob({
config,
handler,
}: {
config: CronJobConfig
handler: CronJobHandler
}) {
const workflowName = `job-${config.name}`
const step = createStep(
`${config.name}-as-step`,
async (_, stepContext) => {
const { container } = stepContext
try {
const res = await handler(container)
return new StepResponse(res, res)
} catch (error) {
logger.error(
`Scheduled job ${config.name} failed with error: ${error.message}`
)
throw error
}
}
)
const workflowConfig = {
name: workflowName,
schedule: isObject(config.schedule)
? config.schedule
: {
cron: config.schedule,
numberOfExecutions: config.numberOfExecutions,
},
}
createWorkflow(workflowConfig, () => {
step()
})
}
/**
* Load cron jobs from one or multiple source paths
*/
async load() {
const normalizedSourcePath = Array.isArray(this.#sourceDir)
? this.#sourceDir
: [this.#sourceDir]
const promises = normalizedSourcePath.map(async (sourcePath) => {
try {
await access(sourcePath)
} catch {
return
}
return await readdir(sourcePath, {
recursive: true,
withFileTypes: true,
}).then(async (entries) => {
const fileEntries = entries.filter((entry) => {
return (
!entry.isDirectory() &&
!this.#excludes.some((exclude) => exclude.test(entry.name))
)
})
logger.debug(`Registering jobs from ${sourcePath}.`)
return await promiseAll(
fileEntries.map(async (entry) => {
const fullPath = join(entry.path, entry.name)
const module_ = await import(fullPath)
const input = {
config: module_.config,
handler: module_.default,
}
this.validateConfig(input.config)
return input
})
)
})
})
const jobsInputs = await promiseAll(promises)
const flatJobsInput = jobsInputs.flat(1).filter(
(
job
): job is {
config: CronJobConfig
handler: CronJobHandler
} => !!job
)
flatJobsInput.map(this.registerJob)
logger.debug(`Job registered.`)
}
}

View File

@@ -130,10 +130,7 @@ export class SubscriberLoader {
withFileTypes: true,
}).then(async (entries) => {
return entries.flatMap(async (entry) => {
if (
this.#excludes.length &&
this.#excludes.some((exclude) => exclude.test(entry.name))
) {
if (this.#excludes.some((exclude) => exclude.test(entry.name))) {
return
}

View File

@@ -3,9 +3,9 @@
"include": ["src"],
"exclude": [
"dist",
"./src/**/__tests__",
"./src/**/__mocks__",
"./src/**/__fixtures__",
"src/**/__tests__",
"src/**/__mocks__",
"src/**/__fixtures__",
"node_modules"
],
}

View File

@@ -23,7 +23,6 @@
"paths": {
},
},
"include": ["src"],
"exclude": [
"dist",

View File

@@ -1,101 +0,0 @@
import {
createStep,
createWorkflow,
StepResponse,
} from "@medusajs/workflows-sdk"
import { glob } from "glob"
import { logger } from "@medusajs/framework"
import { ContainerRegistrationKeys, MedusaError } from "@medusajs/utils"
export const registerJobs = async (plugins) => {
await Promise.all(
plugins.map(async (pluginDetails) => {
const files = glob.sync(
`${pluginDetails.resolve}/jobs/*.{ts,js,mjs,mts}`,
{
ignore: ["**/*.d.ts", "**/*.map"],
}
)
logger.debug(
`Registering ${files.length} jobs from ${pluginDetails.resolve}`
)
const jobs = await Promise.all(
files.map(async (file) => {
const module_ = await import(file)
const input = {
config: module_.config,
handler: module_.default,
}
validateConfig(input.config)
return input
})
)
const res = await Promise.all(jobs.map(createJob))
logger.debug(
`Registered ${res.length} jobs from ${pluginDetails.resolve}`
)
return res
})
)
}
const createJob = async ({ config, handler }) => {
const workflowName = `job-${config.name}`
const step = createStep(
`${config.name}-as-step`,
async (stepInput, stepContext) => {
const { container } = stepContext
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
try {
const res = await handler(container)
return new StepResponse(res, res)
} catch (error) {
logger.error(
`Scheduled job ${config.name} failed with error: ${error.message}`
)
throw error
}
}
)
createWorkflow(
{
name: workflowName,
schedule: {
cron: config.schedule,
numberOfExecutions: config.numberOfExecutions,
},
},
() => {
return step()
}
)
}
const validateConfig = (config) => {
if (!config) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Config is required for scheduled"
)
}
if (!config.schedule) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Cron schedule definition is required for scheduled jobs"
)
}
if (!config.name) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Job name is required for scheduled jobs"
)
}
}

View File

@@ -3,7 +3,7 @@ import { ConfigModule, MedusaContainer, PluginDetails } from "@medusajs/types"
import { ContainerRegistrationKeys, promiseAll } from "@medusajs/utils"
import { asValue } from "awilix"
import { Express, NextFunction, Request, Response } from "express"
import path, { join } from "path"
import { join } from "path"
import requestIp from "request-ip"
import { v4 } from "uuid"
import adminLoader from "./admin"
@@ -13,11 +13,11 @@ import {
container,
expressLoader,
featureFlagsLoader,
JobLoader,
logger,
pgConnectionLoader,
SubscriberLoader,
} from "@medusajs/framework"
import { registerJobs } from "./helpers/register-jobs"
import { registerWorkflows } from "./helpers/register-workflows"
import { getResolvedPlugins } from "./helpers/resolve-plugins"
import { resolvePluginsLinks } from "./helpers/resolve-plugins-links"
@@ -43,7 +43,7 @@ async function subscribersLoader(plugins: PluginDetails[]) {
/**
* Load subscribers from the medusa/medusa package
*/
await new SubscriberLoader(path.join(__dirname, "../subscribers")).load()
await new SubscriberLoader(join(__dirname, "../subscribers")).load()
/**
* Load subscribers from all the plugins.
@@ -51,7 +51,7 @@ async function subscribersLoader(plugins: PluginDetails[]) {
await Promise.all(
plugins.map(async (pluginDetails) => {
await new SubscriberLoader(
path.join(pluginDetails.resolve, "subscribers"),
join(pluginDetails.resolve, "subscribers"),
pluginDetails.options
).load()
})
@@ -59,11 +59,15 @@ async function subscribersLoader(plugins: PluginDetails[]) {
}
async function jobsLoader(plugins: PluginDetails[]) {
/**
* Load jobs from the medusa/medusa package. Remove once the medusa core is converted to a plugin
*/
await registerJobs([{ resolve: path.join(__dirname, "../") }])
await registerJobs(plugins)
const pluginJobSourcePaths = [
/**
* Load jobs from the medusa/medusa package. Remove once the medusa core is converted to a plugin
*/
join(__dirname, "../jobs"),
].concat(plugins.map((plugin) => join(plugin.resolve, "jobs")))
const jobLoader = new JobLoader(pluginJobSourcePaths)
await jobLoader.load()
}
async function loadEntrypoints(

View File

@@ -4612,6 +4612,7 @@ __metadata:
"@medusajs/medusa-cli": ^1.3.22
"@medusajs/types": ^1.11.16
"@medusajs/utils": ^1.11.9
"@medusajs/workflows-sdk": ^0.1.6
"@types/express": ^4.17.17
awilix: ^8.0.0
cookie-parser: ^1.4.6