chore(workflow-engine-redis): remove repeatable jobs from old queue (#11822)

What:
  * Old  deployments have repeatable jobs registered in a wrong queue. When the `server` instance picks that job, the workflow doesn't exist, it calls to remove the job, which then removes the job from the new queue.
  * This PR cleans up any repeatable job from the queue that is exclusive to handle workflows.
This commit is contained in:
Carlos R. L. Rodrigues
2025-03-12 11:54:10 -03:00
committed by GitHub
parent 76a0d8e1d8
commit 6053ec3976
2 changed files with 15 additions and 2 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/workflow-engine-redis": patch
---
chore(workflow-engine-redis): remove repeatable jobs from old queue

View File

@@ -102,6 +102,10 @@ export class RedisDistributedTransactionStorage
this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */,
}
// TODO: Remove this once we have released to all clients (Added: v2.6+)
// Remove all repeatable jobs from the old queue since now we have a queue dedicated to scheduled jobs
await this.removeAllRepeatableJobs(this.queue)
this.worker = new Worker(
this.queueName,
async (job) => {
@@ -456,9 +460,13 @@ export class RedisDistributedTransactionStorage
}
async removeAll(): Promise<void> {
const repeatableJobs = (await this.jobQueue?.getRepeatableJobs()) ?? []
return await this.removeAllRepeatableJobs(this.jobQueue!)
}
private async removeAllRepeatableJobs(queue: Queue): Promise<void> {
const repeatableJobs = (await queue.getRepeatableJobs()) ?? []
await promiseAll(
repeatableJobs.map((job) => this.jobQueue?.removeRepeatableByKey(job.key))
repeatableJobs.map((job) => queue.removeRepeatableByKey(job.key))
)
}