diff --git a/.changeset/wicked-tips-buy.md b/.changeset/wicked-tips-buy.md new file mode 100644 index 0000000000..4f0fb1a21f --- /dev/null +++ b/.changeset/wicked-tips-buy.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflow-engine-redis": patch +--- + +fix(): Workflow engine redis worker instance in worker mode diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index cb4a3c6f65..04aae468a0 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -145,37 +145,37 @@ export class RedisDistributedTransactionStorage // Remove all repeatable jobs from the old queue since now we have a queue dedicated to scheduled jobs await this.removeAllRepeatableJobs(this.queue) - this.worker = new Worker( - this.queueName, - async (job) => { - this.logger_.debug( - `executing job ${job.name} from queue ${ - this.queueName - } with the following data: ${JSON.stringify(job.data)}` - ) - if (allowedJobs.includes(job.name as JobType)) { - try { - await this.executeTransaction( - job.data.workflowId, - job.data.transactionId, - job.data.transactionMetadata - ) - } catch (error) { - if (!SkipExecutionError.isSkipExecutionError(error)) { - throw error + if (this.#isWorkerMode) { + this.worker = new Worker( + this.queueName, + async (job) => { + this.logger_.debug( + `executing job ${job.name} from queue ${ + this.queueName + } with the following data: ${JSON.stringify(job.data)}` + ) + if (allowedJobs.includes(job.name as JobType)) { + try { + await this.executeTransaction( + job.data.workflowId, + job.data.transactionId, + job.data.transactionMetadata + ) + } catch (error) { + if (!SkipExecutionError.isSkipExecutionError(error)) { + throw error + } } } - } - if (job.name === JobType.SCHEDULE) { - // Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs - await this.remove(job.data.jobId) - } - }, - workerOptions - ) + if (job.name === JobType.SCHEDULE) { + // Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs + await this.remove(job.data.jobId) + } + }, + workerOptions + ) - if (this.#isWorkerMode) { this.jobWorker = new Worker( this.jobQueueName, async (job) => {