From cc8422d3a163d18492e1bd6de50e0796ca34a41f Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Thu, 6 Mar 2025 12:52:52 +0100 Subject: [PATCH] fix(workflow-engine-redis): Split the queues and respect worker mode for job executions (#11740) **What** Currently, the workflow engine redis does not make any distinction between worker modes, when starting as server, the engine listen to the queue which contains everything and try to execute the corresponding workflow which does not exists since job workflows are not loaded in server mode. Now, a dedicated queue is created for jobs and the worker is only started if the instance is not in server mode. In order to clean up the old queue, if the old queue trigger a scheduled job then it gets removed from the queue since it will get re added to the new queue by the new worker instances --- .changeset/thick-steaks-wait.md | 5 ++ .../src/loaders/redis.ts | 18 +++-- .../workflow-engine-redis/src/types/index.ts | 5 ++ .../utils/workflow-orchestrator-storage.ts | 68 +++++++++++++++---- 4 files changed, 74 insertions(+), 22 deletions(-) create mode 100644 .changeset/thick-steaks-wait.md diff --git a/.changeset/thick-steaks-wait.md b/.changeset/thick-steaks-wait.md new file mode 100644 index 0000000000..8fb363377d --- /dev/null +++ b/.changeset/thick-steaks-wait.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflow-engine-redis": patch +--- + +fix(workflow-engine-redis): Split the queues and respect worker mode for job executions diff --git a/packages/modules/workflow-engine-redis/src/loaders/redis.ts b/packages/modules/workflow-engine-redis/src/loaders/redis.ts index 75cfd33266..2da145c109 100644 --- a/packages/modules/workflow-engine-redis/src/loaders/redis.ts +++ b/packages/modules/workflow-engine-redis/src/loaders/redis.ts @@ -1,14 +1,15 @@ -import { LoaderOptions } from "@medusajs/framework/types" +import { + InternalModuleDeclaration, + LoaderOptions, +} from "@medusajs/framework/types" import { asValue } from "awilix" import Redis from "ioredis" import { RedisWorkflowsOptions } from "../types" -export default async ({ - container, - logger, - options, - dataLoaderOnly, -}: LoaderOptions): Promise => { +export default async ( + { container, logger, options, dataLoaderOnly }: LoaderOptions, + moduleDeclaration: InternalModuleDeclaration +): Promise => { const { url, options: redisOptions, @@ -25,6 +26,7 @@ export default async ({ const cnnPubSub = pubsub ?? { url, options: redisOptions } const queueName = options?.queueName ?? "medusa-workflows" + const jobQueueName = options?.jobQueueName ?? "medusa-workflows-jobs" let connection let redisPublisher @@ -59,12 +61,14 @@ export default async ({ } container.register({ + isWorkerMode: asValue(moduleDeclaration.worker_mode !== "server"), partialLoading: asValue(true), redisConnection: asValue(connection), redisWorkerConnection: asValue(workerConnection), redisPublisher: asValue(redisPublisher), redisSubscriber: asValue(redisSubscriber), redisQueueName: asValue(queueName), + redisJobQueueName: asValue(jobQueueName), redisDisconnectHandler: asValue(async () => { connection.disconnect() workerConnection.disconnect() diff --git a/packages/modules/workflow-engine-redis/src/types/index.ts b/packages/modules/workflow-engine-redis/src/types/index.ts index 6d7f6d106a..1ea5613d9f 100644 --- a/packages/modules/workflow-engine-redis/src/types/index.ts +++ b/packages/modules/workflow-engine-redis/src/types/index.ts @@ -19,6 +19,11 @@ export type RedisWorkflowsOptions = { */ queueName?: string + /** + * Queue name used for job execution + */ + jobQueueName?: string + /** * Redis client options */ diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index c97a751eab..8365160509 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -36,37 +36,55 @@ export class RedisDistributedTransactionStorage private redisClient: Redis private redisWorkerConnection: Redis private queueName: string + private jobQueueName: string private queue: Queue + private jobQueue?: Queue private worker: Worker + private jobWorker?: Worker + + #isWorkerMode: boolean = false constructor({ workflowExecutionService, redisConnection, redisWorkerConnection, redisQueueName, + redisJobQueueName, logger, + isWorkerMode, }: { workflowExecutionService: ModulesSdkTypes.IMedusaInternalService redisConnection: Redis redisWorkerConnection: Redis redisQueueName: string + redisJobQueueName: string logger: Logger + isWorkerMode: boolean }) { this.workflowExecutionService_ = workflowExecutionService this.logger_ = logger this.redisClient = redisConnection this.redisWorkerConnection = redisWorkerConnection this.queueName = redisQueueName + this.jobQueueName = redisJobQueueName this.queue = new Queue(redisQueueName, { connection: this.redisClient }) + this.jobQueue = isWorkerMode + ? new Queue(redisJobQueueName, { + connection: this.redisClient, + }) + : undefined + this.#isWorkerMode = isWorkerMode } async onApplicationPrepareShutdown() { // Close worker gracefully, i.e. wait for the current jobs to finish await this.worker?.close() + await this.jobWorker?.close() } async onApplicationShutdown() { await this.queue?.close() + await this.jobQueue?.close() } async onApplicationStart() { @@ -76,6 +94,11 @@ export class RedisDistributedTransactionStorage JobType.TRANSACTION_TIMEOUT, ] + const workerOptions = { + connection: + this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */, + } + this.worker = new Worker( this.queueName, async (job) => { @@ -86,19 +109,26 @@ export class RedisDistributedTransactionStorage ) } - // Note: We might even want a separate worker with different concurrency settings in the future, but for now we keep it simple if (job.name === JobType.SCHEDULE) { + // Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs + await this.remove(job.data.jobId) + } + }, + workerOptions + ) + + if (this.#isWorkerMode) { + this.jobWorker = new Worker( + this.jobQueueName, + async (job) => { await this.executeScheduledJob( job.data.jobId, job.data.schedulerOptions ) - } - }, - { - connection: - this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */, - } - ) + }, + workerOptions + ) + } } setWorkflowOrchestratorService(workflowOrchestratorService) { @@ -369,10 +399,18 @@ export class RedisDistributedTransactionStorage step?: TransactionStep ) { const jobId = this.getJobId(type, transaction, step) - const job = await this.queue.getJob(jobId) - if (job && job.attemptsStarted === 0) { - await job.remove() + if (type === JobType.SCHEDULE) { + const job = await this.jobQueue?.getJob(jobId) + if (job) { + await job.remove() + } + } else { + const job = await this.queue.getJob(jobId) + + if (job && job.attemptsStarted === 0) { + await job.remove() + } } } @@ -385,7 +423,7 @@ export class RedisDistributedTransactionStorage typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId // If it is the same key (eg. the same workflow name), the old one will get overridden. - await this.queue.add( + await this.jobQueue?.add( JobType.SCHEDULE, { jobId, @@ -410,13 +448,13 @@ export class RedisDistributedTransactionStorage } async remove(jobId: string): Promise { - await this.queue.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`) + await this.jobQueue?.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`) } async removeAll(): Promise { - const repeatableJobs = await this.queue.getRepeatableJobs() + const repeatableJobs = (await this.jobQueue?.getRepeatableJobs()) ?? [] await promiseAll( - repeatableJobs.map((job) => this.queue.removeRepeatableByKey(job.key)) + repeatableJobs.map((job) => this.jobQueue?.removeRepeatableByKey(job.key)) ) } }