From 6053ec3976d5ebd18680eba5eff3c71038a108cb Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Wed, 12 Mar 2025 11:54:10 -0300 Subject: [PATCH] chore(workflow-engine-redis): remove repeatable jobs from old queue (#11822) What: * Old deployments have repeatable jobs registered in a wrong queue. When the `server` instance picks that job, the workflow doesn't exist, it calls to remove the job, which then removes the job from the new queue. * This PR cleans up any repeatable job from the queue that is exclusive to handle workflows. --- .changeset/selfish-singers-hammer.md | 5 +++++ .../src/utils/workflow-orchestrator-storage.ts | 12 ++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 .changeset/selfish-singers-hammer.md diff --git a/.changeset/selfish-singers-hammer.md b/.changeset/selfish-singers-hammer.md new file mode 100644 index 0000000000..ff517ba115 --- /dev/null +++ b/.changeset/selfish-singers-hammer.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflow-engine-redis": patch +--- + +chore(workflow-engine-redis): remove repeatable jobs from old queue diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 80b1c5350b..0e75da75c6 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -102,6 +102,10 @@ export class RedisDistributedTransactionStorage this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */, } + // TODO: Remove this once we have released to all clients (Added: v2.6+) + // Remove all repeatable jobs from the old queue since now we have a queue dedicated to scheduled jobs + await this.removeAllRepeatableJobs(this.queue) + this.worker = new Worker( this.queueName, async (job) => { @@ -456,9 +460,13 @@ export class RedisDistributedTransactionStorage } async removeAll(): Promise { - const repeatableJobs = (await this.jobQueue?.getRepeatableJobs()) ?? [] + return await this.removeAllRepeatableJobs(this.jobQueue!) + } + + private async removeAllRepeatableJobs(queue: Queue): Promise { + const repeatableJobs = (await queue.getRepeatableJobs()) ?? [] await promiseAll( - repeatableJobs.map((job) => this.jobQueue?.removeRepeatableByKey(job.key)) + repeatableJobs.map((job) => queue.removeRepeatableByKey(job.key)) ) }