diff --git a/.changeset/long-nails-fix.md b/.changeset/long-nails-fix.md new file mode 100644 index 0000000000..51ab3bfd2f --- /dev/null +++ b/.changeset/long-nails-fix.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-redis": patch +"@medusajs/workflow-engine-inmemory": patch +--- + +fix(engine): Always create cleaner job diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 0cc222a59e..d8018e76bf 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -29,6 +29,8 @@ import { WorkflowOrchestratorService } from "@services" import { type CronExpression, parseExpression } from "cron-parser" import { WorkflowExecution } from "../models/workflow-execution" +const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30 + function calculateDelayFromExpression(expression: CronExpression): number { const nextTime = expression.next().getTime() const now = Date.now() @@ -127,7 +129,7 @@ export class InMemoryDistributedTransactionStorage try { await this.clearExpiredExecutions() } catch {} - }, 1000 * 60 * 60) + }, THIRTY_MINUTES_IN_MS) } async onApplicationShutdown() { diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index c4cd1dde29..87191f53f7 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -33,7 +33,7 @@ enum JobType { TRANSACTION_TIMEOUT = "transaction_timeout", } -const ONE_HOUR_IN_MS = 1000 * 60 * 60 +const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30 const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions" const invokingStatesSet = new Set([ @@ -118,14 +118,6 @@ export class RedisDistributedTransactionStorage await this.worker?.close() await this.jobWorker?.close() - // Clean up repeatable jobs - const repeatableJobs = (await this.cleanerQueue_?.getRepeatableJobs()) ?? [] - for (const job of repeatableJobs) { - if (job.id === REPEATABLE_CLEARER_JOB_ID) { - await this.cleanerQueue_?.removeRepeatableByKey(job.key) - } - } - await this.cleanerWorker_?.close() } @@ -206,7 +198,7 @@ export class RedisDistributedTransactionStorage {}, { repeat: { - every: ONE_HOUR_IN_MS, + every: THIRTY_MINUTES_IN_MS, }, jobId: REPEATABLE_CLEARER_JOB_ID, removeOnComplete: true,