fix(workflow-engine-*): scheduled jobs interval (#11800)

**What**
Currently only cron pattern are supported by scheduled jobs, this can lead to issue. for example you set the pattern to execute every hours at minute 0 and second 0 (as it is expected to execute at exactly this constraint) but due to the moment it gets executed we our out of the second 0 then the job wont get executed until the next scheduled cron table execution.

With this pr we introduce the `interval` configuration which allows you the specify a delay between execution in ms (e.g every minute -> 60 * 1000 ms) which ensure that once a job is executed another one is scheduled for a minute later.

**Usage**
```ts
// jobs/job-1.ts
const thirtySeconds = 30 * 1000

export const config = {
  name: "job-1",
  schedule: {
    interval: thirtySeconds
  },
}
```
This commit is contained in:
Adrien de Peretti
2025-03-13 16:05:13 +01:00
committed by GitHub
parent e05491c24f
commit fc652ea51e
6 changed files with 98 additions and 23 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/workflow-engine-redis": patch
---
fix(workflow-engine-redis): scheduled jobs 2

View File

@@ -11,7 +11,7 @@ import { ResourceLoader } from "../utils/resource-loader"
type CronJobConfig = { type CronJobConfig = {
name: string name: string
schedule: string schedule: string | SchedulerOptions
numberOfExecutions?: SchedulerOptions["numberOfExecutions"] numberOfExecutions?: SchedulerOptions["numberOfExecutions"]
} }

View File

@@ -144,10 +144,6 @@ export type TransactionModelOptions = {
} }
export type SchedulerOptions = { export type SchedulerOptions = {
/**
* The cron expression to schedule the workflow execution.
*/
cron: string
/** /**
* Setting whether to allow concurrent executions (eg. if the previous execution is still running, should the new one be allowed to run or not) * Setting whether to allow concurrent executions (eg. if the previous execution is still running, should the new one be allowed to run or not)
* By default concurrent executions are not allowed. * By default concurrent executions are not allowed.
@@ -158,7 +154,20 @@ export type SchedulerOptions = {
* Optionally limit the number of executions for the scheduled workflow. If not set, the workflow will run indefinitely. * Optionally limit the number of executions for the scheduled workflow. If not set, the workflow will run indefinitely.
*/ */
numberOfExecutions?: number numberOfExecutions?: number
} } & (
| {
/**
* The cron expression to schedule the workflow execution.
*/
cron: string
}
| {
/**
* The interval (in ms) to schedule the workflow execution.
*/
interval: number
}
)
export type TransactionModel = { export type TransactionModel = {
id: string id: string

View File

@@ -24,9 +24,8 @@ class WorkflowScheduler {
concurrency: "forbid", concurrency: "forbid",
} }
: { : {
cron: schedule.cron, concurrency: "forbid",
concurrency: schedule.concurrency || "forbid", ...schedule,
numberOfExecutions: schedule.numberOfExecutions,
} }
await WorkflowScheduler.storage.schedule(workflow.id, normalizedSchedule) await WorkflowScheduler.storage.schedule(workflow.id, normalizedSchedule)

View File

@@ -17,7 +17,33 @@ import {
isPresent, isPresent,
} from "@medusajs/framework/utils" } from "@medusajs/framework/utils"
import { WorkflowOrchestratorService } from "@services" import { WorkflowOrchestratorService } from "@services"
import { CronExpression, parseExpression } from "cron-parser" import { type CronExpression, parseExpression } from "cron-parser"
function parseNextExecution(
optionsOrExpression: SchedulerOptions | CronExpression | string | number
) {
if (typeof optionsOrExpression === "object") {
if ("cron" in optionsOrExpression) {
const expression = parseExpression(optionsOrExpression.cron)
return expression.next().getTime() - Date.now()
}
if ("interval" in optionsOrExpression) {
return optionsOrExpression.interval
}
return optionsOrExpression.next().getTime() - Date.now()
}
const result = parseInt(`${optionsOrExpression}`)
if (isNaN(result)) {
const expression = parseExpression(`${optionsOrExpression}`)
return expression.next().getTime() - Date.now()
}
return result
}
export class InMemoryDistributedTransactionStorage export class InMemoryDistributedTransactionStorage
implements IDistributedTransactionStorage, IDistributedSchedulerStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage
@@ -31,7 +57,7 @@ export class InMemoryDistributedTransactionStorage
string, string,
{ {
timer: NodeJS.Timeout timer: NodeJS.Timeout
expression: CronExpression expression: CronExpression | number
numberOfExecutions: number numberOfExecutions: number
config: SchedulerOptions config: SchedulerOptions
} }
@@ -383,8 +409,19 @@ export class InMemoryDistributedTransactionStorage
// In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one // In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one
// any only then we add the new one. // any only then we add the new one.
await this.remove(jobId) await this.remove(jobId)
const expression = parseExpression(schedulerOptions.cron) let expression: CronExpression | number
const nextExecution = expression.next().getTime() - Date.now() let nextExecution = parseNextExecution(schedulerOptions)
if ("cron" in schedulerOptions) {
expression = parseExpression(schedulerOptions.cron)
} else if ("interval" in schedulerOptions) {
expression = schedulerOptions.interval
} else {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Schedule cron or interval definition is required for scheduled jobs."
)
}
const timer = setTimeout(async () => { const timer = setTimeout(async () => {
this.jobHandler(jobId) this.jobHandler(jobId)
@@ -428,7 +465,8 @@ export class InMemoryDistributedTransactionStorage
return return
} }
const nextExecution = job.expression.next().getTime() - Date.now() const nextExecution = parseNextExecution(job.expression)
const timer = setTimeout(async () => { const timer = setTimeout(async () => {
this.jobHandler(jobId) this.jobHandler(jobId)
}, nextExecution) }, nextExecution)

View File

@@ -19,7 +19,7 @@ import {
TransactionStepState, TransactionStepState,
} from "@medusajs/framework/utils" } from "@medusajs/framework/utils"
import { WorkflowOrchestratorService } from "@services" import { WorkflowOrchestratorService } from "@services"
import { Queue, Worker } from "bullmq" import { Queue, RepeatOptions, Worker } from "bullmq"
import Redis from "ioredis" import Redis from "ioredis"
enum JobType { enum JobType {
@@ -109,6 +109,11 @@ export class RedisDistributedTransactionStorage
this.worker = new Worker( this.worker = new Worker(
this.queueName, this.queueName,
async (job) => { async (job) => {
this.logger_.debug(
`executing job ${job.name} from queue ${
this.queueName
} with the following data: ${JSON.stringify(job.data)}`
)
if (allowedJobs.includes(job.name as JobType)) { if (allowedJobs.includes(job.name as JobType)) {
await this.executeTransaction( await this.executeTransaction(
job.data.workflowId, job.data.workflowId,
@@ -128,7 +133,14 @@ export class RedisDistributedTransactionStorage
this.jobWorker = new Worker( this.jobWorker = new Worker(
this.jobQueueName, this.jobQueueName,
async (job) => { async (job) => {
await this.executeScheduledJob( this.logger_.debug(
`executing scheduled job ${job.data.jobId} from queue ${
this.jobQueueName
} with the following options: ${JSON.stringify(
job.data.schedulerOptions
)}`
)
return await this.executeScheduledJob(
job.data.jobId, job.data.jobId,
job.data.schedulerOptions job.data.schedulerOptions
) )
@@ -182,9 +194,8 @@ export class RedisDistributedTransactionStorage
try { try {
// TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency // TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency
// of the transaction to ensure that the transaction is only executed once. // of the transaction to ensure that the transaction is only executed once.
return await this.workflowOrchestratorService_.run(jobId, { await this.workflowOrchestratorService_.run(jobId, {
logOnError: true, logOnError: true,
throwOnError: false,
}) })
} catch (e) { } catch (e) {
if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) {
@@ -430,6 +441,23 @@ export class RedisDistributedTransactionStorage
const jobId = const jobId =
typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId
if ("cron" in schedulerOptions && "interval" in schedulerOptions) {
throw new Error(
`Unable to register a job with both scheduler options interval and cron.`
)
}
const repeatOptions: RepeatOptions = {
limit: schedulerOptions.numberOfExecutions,
key: `${JobType.SCHEDULE}_${jobId}`,
}
if ("cron" in schedulerOptions) {
repeatOptions.pattern = schedulerOptions.cron
} else {
repeatOptions.every = schedulerOptions.interval
}
// If it is the same key (eg. the same workflow name), the old one will get overridden. // If it is the same key (eg. the same workflow name), the old one will get overridden.
await this.jobQueue?.add( await this.jobQueue?.add(
JobType.SCHEDULE, JobType.SCHEDULE,
@@ -438,11 +466,7 @@ export class RedisDistributedTransactionStorage
schedulerOptions, schedulerOptions,
}, },
{ {
repeat: { repeat: repeatOptions,
pattern: schedulerOptions.cron,
limit: schedulerOptions.numberOfExecutions,
key: `${JobType.SCHEDULE}_${jobId}`,
},
removeOnComplete: { removeOnComplete: {
age: 86400, age: 86400,
count: 1000, count: 1000,