diff --git a/.changeset/fuzzy-onions-impress.md b/.changeset/fuzzy-onions-impress.md new file mode 100644 index 0000000000..3cf1dbcee1 --- /dev/null +++ b/.changeset/fuzzy-onions-impress.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflow-engine-redis": patch +--- + +fix(workflow-engine-redis): scheduled jobs 2 diff --git a/packages/core/framework/src/jobs/job-loader.ts b/packages/core/framework/src/jobs/job-loader.ts index ce984f89b8..77fdedd536 100644 --- a/packages/core/framework/src/jobs/job-loader.ts +++ b/packages/core/framework/src/jobs/job-loader.ts @@ -11,7 +11,7 @@ import { ResourceLoader } from "../utils/resource-loader" type CronJobConfig = { name: string - schedule: string + schedule: string | SchedulerOptions numberOfExecutions?: SchedulerOptions["numberOfExecutions"] } diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index 66c49b5434..26655db4f6 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -144,10 +144,6 @@ export type TransactionModelOptions = { } export type SchedulerOptions = { - /** - * The cron expression to schedule the workflow execution. - */ - cron: string /** * Setting whether to allow concurrent executions (eg. if the previous execution is still running, should the new one be allowed to run or not) * By default concurrent executions are not allowed. @@ -158,7 +154,20 @@ export type SchedulerOptions = { * Optionally limit the number of executions for the scheduled workflow. If not set, the workflow will run indefinitely. */ numberOfExecutions?: number -} +} & ( + | { + /** + * The cron expression to schedule the workflow execution. + */ + cron: string + } + | { + /** + * The interval (in ms) to schedule the workflow execution. + */ + interval: number + } +) export type TransactionModel = { id: string diff --git a/packages/core/orchestration/src/workflow/scheduler.ts b/packages/core/orchestration/src/workflow/scheduler.ts index 5e2b07621e..4fa3a9914f 100644 --- a/packages/core/orchestration/src/workflow/scheduler.ts +++ b/packages/core/orchestration/src/workflow/scheduler.ts @@ -24,9 +24,8 @@ class WorkflowScheduler { concurrency: "forbid", } : { - cron: schedule.cron, - concurrency: schedule.concurrency || "forbid", - numberOfExecutions: schedule.numberOfExecutions, + concurrency: "forbid", + ...schedule, } await WorkflowScheduler.storage.schedule(workflow.id, normalizedSchedule) diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 2e8f81d0cf..4cf6d475d3 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -17,7 +17,33 @@ import { isPresent, } from "@medusajs/framework/utils" import { WorkflowOrchestratorService } from "@services" -import { CronExpression, parseExpression } from "cron-parser" +import { type CronExpression, parseExpression } from "cron-parser" + +function parseNextExecution( + optionsOrExpression: SchedulerOptions | CronExpression | string | number +) { + if (typeof optionsOrExpression === "object") { + if ("cron" in optionsOrExpression) { + const expression = parseExpression(optionsOrExpression.cron) + return expression.next().getTime() - Date.now() + } + + if ("interval" in optionsOrExpression) { + return optionsOrExpression.interval + } + + return optionsOrExpression.next().getTime() - Date.now() + } + + const result = parseInt(`${optionsOrExpression}`) + + if (isNaN(result)) { + const expression = parseExpression(`${optionsOrExpression}`) + return expression.next().getTime() - Date.now() + } + + return result +} export class InMemoryDistributedTransactionStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage @@ -31,7 +57,7 @@ export class InMemoryDistributedTransactionStorage string, { timer: NodeJS.Timeout - expression: CronExpression + expression: CronExpression | number numberOfExecutions: number config: SchedulerOptions } @@ -383,8 +409,19 @@ export class InMemoryDistributedTransactionStorage // In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one // any only then we add the new one. await this.remove(jobId) - const expression = parseExpression(schedulerOptions.cron) - const nextExecution = expression.next().getTime() - Date.now() + let expression: CronExpression | number + let nextExecution = parseNextExecution(schedulerOptions) + + if ("cron" in schedulerOptions) { + expression = parseExpression(schedulerOptions.cron) + } else if ("interval" in schedulerOptions) { + expression = schedulerOptions.interval + } else { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Schedule cron or interval definition is required for scheduled jobs." + ) + } const timer = setTimeout(async () => { this.jobHandler(jobId) @@ -428,7 +465,8 @@ export class InMemoryDistributedTransactionStorage return } - const nextExecution = job.expression.next().getTime() - Date.now() + const nextExecution = parseNextExecution(job.expression) + const timer = setTimeout(async () => { this.jobHandler(jobId) }, nextExecution) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 0e75da75c6..481fdbcb50 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -19,7 +19,7 @@ import { TransactionStepState, } from "@medusajs/framework/utils" import { WorkflowOrchestratorService } from "@services" -import { Queue, Worker } from "bullmq" +import { Queue, RepeatOptions, Worker } from "bullmq" import Redis from "ioredis" enum JobType { @@ -109,6 +109,11 @@ export class RedisDistributedTransactionStorage this.worker = new Worker( this.queueName, async (job) => { + this.logger_.debug( + `executing job ${job.name} from queue ${ + this.queueName + } with the following data: ${JSON.stringify(job.data)}` + ) if (allowedJobs.includes(job.name as JobType)) { await this.executeTransaction( job.data.workflowId, @@ -128,7 +133,14 @@ export class RedisDistributedTransactionStorage this.jobWorker = new Worker( this.jobQueueName, async (job) => { - await this.executeScheduledJob( + this.logger_.debug( + `executing scheduled job ${job.data.jobId} from queue ${ + this.jobQueueName + } with the following options: ${JSON.stringify( + job.data.schedulerOptions + )}` + ) + return await this.executeScheduledJob( job.data.jobId, job.data.schedulerOptions ) @@ -182,9 +194,8 @@ export class RedisDistributedTransactionStorage try { // TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency // of the transaction to ensure that the transaction is only executed once. - return await this.workflowOrchestratorService_.run(jobId, { + await this.workflowOrchestratorService_.run(jobId, { logOnError: true, - throwOnError: false, }) } catch (e) { if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { @@ -430,6 +441,23 @@ export class RedisDistributedTransactionStorage const jobId = typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId + if ("cron" in schedulerOptions && "interval" in schedulerOptions) { + throw new Error( + `Unable to register a job with both scheduler options interval and cron.` + ) + } + + const repeatOptions: RepeatOptions = { + limit: schedulerOptions.numberOfExecutions, + key: `${JobType.SCHEDULE}_${jobId}`, + } + + if ("cron" in schedulerOptions) { + repeatOptions.pattern = schedulerOptions.cron + } else { + repeatOptions.every = schedulerOptions.interval + } + // If it is the same key (eg. the same workflow name), the old one will get overridden. await this.jobQueue?.add( JobType.SCHEDULE, @@ -438,11 +466,7 @@ export class RedisDistributedTransactionStorage schedulerOptions, }, { - repeat: { - pattern: schedulerOptions.cron, - limit: schedulerOptions.numberOfExecutions, - key: `${JobType.SCHEDULE}_${jobId}`, - }, + repeat: repeatOptions, removeOnComplete: { age: 86400, count: 1000,