feat: Add support for scheduled jobs (#7674)

This commit is contained in:
Stevche Radevski
2024-06-11 13:35:56 +02:00
committed by GitHub
parent afae395fea
commit bef5941714
6 changed files with 154 additions and 1 deletions

View File

View File

@@ -0,0 +1,21 @@
import {
IDistributedSchedulerStorage,
SchedulerOptions,
} from "@medusajs/orchestration"
export class MockSchedulerStorage implements IDistributedSchedulerStorage {
async schedule(
jobDefinition: string | { jobId: string },
schedulerOptions: SchedulerOptions
): Promise<void> {
return Promise.resolve()
}
async remove(jobId: string): Promise<void> {
return Promise.resolve()
}
async removeAll(): Promise<void> {
return Promise.resolve()
}
}

View File

@@ -0,0 +1,11 @@
import { MedusaContainer } from "@medusajs/types"
export default async function handler(container: MedusaContainer) {
console.log(`You have received 5 orders today`)
}
export const config = {
name: "summarize-orders-job",
schedule: "* * * * * *",
numberOfExecutions: 2,
}

View File

@@ -0,0 +1,19 @@
import path from "path"
import { registerJobs } from "../helpers/register-jobs"
import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration"
import { MockSchedulerStorage } from "../__fixtures__/mock-scheduler-storage"
describe("register jobs", () => {
WorkflowScheduler.setStorage(new MockSchedulerStorage())
it("registers jobs from plugins", async () => {
await registerJobs([
{ resolve: path.join(__dirname, "../__fixtures__/plugin") },
])
const workflow = WorkflowManager.getWorkflow("summarize-orders-job")
expect(workflow).toBeDefined()
expect(workflow?.options.schedule).toEqual({
cron: "* * * * * *",
numberOfExecutions: 2,
})
})
})

View File

@@ -0,0 +1,92 @@
import {
StepResponse,
createStep,
createWorkflow,
} from "@medusajs/workflows-sdk"
import { glob } from "glob"
import logger from "../logger"
import { MedusaError } from "@medusajs/utils"
export const registerJobs = async (plugins) => {
await Promise.all(
plugins.map(async (pluginDetails) => {
const files = glob.sync(
`${pluginDetails.resolve}/jobs/*.{ts,js,mjs,mts}`,
{
ignore: ["**/*.d.ts", "**/*.map"],
}
)
logger.debug(
`Registering ${files.length} jobs from ${pluginDetails.resolve}`
)
const jobs = await Promise.all(
files.map(async (file) => {
const module_ = await import(file)
const input = {
config: module_.config,
handler: module_.default,
}
validateConfig(input.config)
return input
})
)
const res = await Promise.all(jobs.map(createJob))
logger.debug(
`Registered ${res.length} jobs from ${pluginDetails.resolve}`
)
return res
})
)
}
const createJob = async ({ config, handler }) => {
const step = createStep(
`${config.name}-as-step`,
async (stepInput, stepContext) => {
const { container } = stepContext
const res = await handler(container)
return new StepResponse(res, res)
}
)
createWorkflow(
{
name: config.name,
schedule: {
cron: config.schedule,
numberOfExecutions: config.numberOfExecutions,
},
},
() => {
return step()
}
)
}
const validateConfig = (config) => {
if (!config) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Config is required for scheduled"
)
}
if (!config.schedule) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Cron schedule definition is required for scheduled jobs"
)
}
if (!config.name) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Job name is required for scheduled jobs"
)
}
}

View File

@@ -22,6 +22,7 @@ import { SubscriberLoader } from "./helpers/subscribers"
import Logger from "./logger"
import loadMedusaApp from "./medusa-app"
import registerPgConnection from "./pg-connection"
import { registerJobs } from "./helpers/register-jobs"
type Options = {
directory: string
@@ -57,6 +58,14 @@ async function subscribersLoader(
)
}
async function jobsLoader(plugins: PluginDetails[]) {
/**
* Load jobs from the medusa/medusa package. Remove once the medusa core is converted to a plugin
*/
await registerJobs([{ resolve: path.join(__dirname, "../") }])
await registerJobs(plugins)
}
async function loadEntrypoints(
plugins: PluginDetails[],
container: MedusaContainer,
@@ -89,6 +98,7 @@ async function loadEntrypoints(
await adminLoader({ app: expressApp, configModule, rootDirectory })
await subscribersLoader(plugins, container)
await jobsLoader(plugins)
await apiLoader({
container,
plugins,
@@ -128,9 +138,9 @@ export default async ({
)
const plugins = getResolvedPlugins(rootDirectory, configModule, true) || []
const pluginLinks = await resolvePluginsLinks(plugins, container)
await registerWorkflows(plugins)
const pluginLinks = await resolvePluginsLinks(plugins, container)
const { onApplicationShutdown, onApplicationPrepareShutdown } =
await loadMedusaApp({
container,