fix(): Workflow engine redis worker instance in worker mode (#14099)

* fix(): Workflow engine redis worker instance in worker mode

* Create wicked-tips-buy.md
This commit is contained in:
Adrien de Peretti
2025-11-24 09:31:36 +01:00
committed by GitHub
parent f67bfb9f92
commit 6c3ec528f1
2 changed files with 32 additions and 27 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/workflow-engine-redis": patch
---
fix(): Workflow engine redis worker instance in worker mode

View File

@@ -145,37 +145,37 @@ export class RedisDistributedTransactionStorage
// Remove all repeatable jobs from the old queue since now we have a queue dedicated to scheduled jobs
await this.removeAllRepeatableJobs(this.queue)
this.worker = new Worker(
this.queueName,
async (job) => {
this.logger_.debug(
`executing job ${job.name} from queue ${
this.queueName
} with the following data: ${JSON.stringify(job.data)}`
)
if (allowedJobs.includes(job.name as JobType)) {
try {
await this.executeTransaction(
job.data.workflowId,
job.data.transactionId,
job.data.transactionMetadata
)
} catch (error) {
if (!SkipExecutionError.isSkipExecutionError(error)) {
throw error
if (this.#isWorkerMode) {
this.worker = new Worker(
this.queueName,
async (job) => {
this.logger_.debug(
`executing job ${job.name} from queue ${
this.queueName
} with the following data: ${JSON.stringify(job.data)}`
)
if (allowedJobs.includes(job.name as JobType)) {
try {
await this.executeTransaction(
job.data.workflowId,
job.data.transactionId,
job.data.transactionMetadata
)
} catch (error) {
if (!SkipExecutionError.isSkipExecutionError(error)) {
throw error
}
}
}
}
if (job.name === JobType.SCHEDULE) {
// Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs
await this.remove(job.data.jobId)
}
},
workerOptions
)
if (job.name === JobType.SCHEDULE) {
// Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs
await this.remove(job.data.jobId)
}
},
workerOptions
)
if (this.#isWorkerMode) {
this.jobWorker = new Worker(
this.jobQueueName,
async (job) => {