diff --git a/.changeset/thick-steaks-wait.md b/.changeset/thick-steaks-wait.md new file mode 100644 index 0000000000..8fb363377d --- /dev/null +++ b/.changeset/thick-steaks-wait.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflow-engine-redis": patch +--- + +fix(workflow-engine-redis): Split the queues and respect worker mode for job executions diff --git a/packages/modules/workflow-engine-redis/src/loaders/redis.ts b/packages/modules/workflow-engine-redis/src/loaders/redis.ts index 75cfd33266..2da145c109 100644 --- a/packages/modules/workflow-engine-redis/src/loaders/redis.ts +++ b/packages/modules/workflow-engine-redis/src/loaders/redis.ts @@ -1,14 +1,15 @@ -import { LoaderOptions } from "@medusajs/framework/types" +import { + InternalModuleDeclaration, + LoaderOptions, +} from "@medusajs/framework/types" import { asValue } from "awilix" import Redis from "ioredis" import { RedisWorkflowsOptions } from "../types" -export default async ({ - container, - logger, - options, - dataLoaderOnly, -}: LoaderOptions): Promise => { +export default async ( + { container, logger, options, dataLoaderOnly }: LoaderOptions, + moduleDeclaration: InternalModuleDeclaration +): Promise => { const { url, options: redisOptions, @@ -25,6 +26,7 @@ export default async ({ const cnnPubSub = pubsub ?? { url, options: redisOptions } const queueName = options?.queueName ?? "medusa-workflows" + const jobQueueName = options?.jobQueueName ?? "medusa-workflows-jobs" let connection let redisPublisher @@ -59,12 +61,14 @@ export default async ({ } container.register({ + isWorkerMode: asValue(moduleDeclaration.worker_mode !== "server"), partialLoading: asValue(true), redisConnection: asValue(connection), redisWorkerConnection: asValue(workerConnection), redisPublisher: asValue(redisPublisher), redisSubscriber: asValue(redisSubscriber), redisQueueName: asValue(queueName), + redisJobQueueName: asValue(jobQueueName), redisDisconnectHandler: asValue(async () => { connection.disconnect() workerConnection.disconnect() diff --git a/packages/modules/workflow-engine-redis/src/types/index.ts b/packages/modules/workflow-engine-redis/src/types/index.ts index 6d7f6d106a..1ea5613d9f 100644 --- a/packages/modules/workflow-engine-redis/src/types/index.ts +++ b/packages/modules/workflow-engine-redis/src/types/index.ts @@ -19,6 +19,11 @@ export type RedisWorkflowsOptions = { */ queueName?: string + /** + * Queue name used for job execution + */ + jobQueueName?: string + /** * Redis client options */ diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index c97a751eab..8365160509 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -36,37 +36,55 @@ export class RedisDistributedTransactionStorage private redisClient: Redis private redisWorkerConnection: Redis private queueName: string + private jobQueueName: string private queue: Queue + private jobQueue?: Queue private worker: Worker + private jobWorker?: Worker + + #isWorkerMode: boolean = false constructor({ workflowExecutionService, redisConnection, redisWorkerConnection, redisQueueName, + redisJobQueueName, logger, + isWorkerMode, }: { workflowExecutionService: ModulesSdkTypes.IMedusaInternalService redisConnection: Redis redisWorkerConnection: Redis redisQueueName: string + redisJobQueueName: string logger: Logger + isWorkerMode: boolean }) { this.workflowExecutionService_ = workflowExecutionService this.logger_ = logger this.redisClient = redisConnection this.redisWorkerConnection = redisWorkerConnection this.queueName = redisQueueName + this.jobQueueName = redisJobQueueName this.queue = new Queue(redisQueueName, { connection: this.redisClient }) + this.jobQueue = isWorkerMode + ? new Queue(redisJobQueueName, { + connection: this.redisClient, + }) + : undefined + this.#isWorkerMode = isWorkerMode } async onApplicationPrepareShutdown() { // Close worker gracefully, i.e. wait for the current jobs to finish await this.worker?.close() + await this.jobWorker?.close() } async onApplicationShutdown() { await this.queue?.close() + await this.jobQueue?.close() } async onApplicationStart() { @@ -76,6 +94,11 @@ export class RedisDistributedTransactionStorage JobType.TRANSACTION_TIMEOUT, ] + const workerOptions = { + connection: + this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */, + } + this.worker = new Worker( this.queueName, async (job) => { @@ -86,19 +109,26 @@ export class RedisDistributedTransactionStorage ) } - // Note: We might even want a separate worker with different concurrency settings in the future, but for now we keep it simple if (job.name === JobType.SCHEDULE) { + // Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs + await this.remove(job.data.jobId) + } + }, + workerOptions + ) + + if (this.#isWorkerMode) { + this.jobWorker = new Worker( + this.jobQueueName, + async (job) => { await this.executeScheduledJob( job.data.jobId, job.data.schedulerOptions ) - } - }, - { - connection: - this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */, - } - ) + }, + workerOptions + ) + } } setWorkflowOrchestratorService(workflowOrchestratorService) { @@ -369,10 +399,18 @@ export class RedisDistributedTransactionStorage step?: TransactionStep ) { const jobId = this.getJobId(type, transaction, step) - const job = await this.queue.getJob(jobId) - if (job && job.attemptsStarted === 0) { - await job.remove() + if (type === JobType.SCHEDULE) { + const job = await this.jobQueue?.getJob(jobId) + if (job) { + await job.remove() + } + } else { + const job = await this.queue.getJob(jobId) + + if (job && job.attemptsStarted === 0) { + await job.remove() + } } } @@ -385,7 +423,7 @@ export class RedisDistributedTransactionStorage typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId // If it is the same key (eg. the same workflow name), the old one will get overridden. - await this.queue.add( + await this.jobQueue?.add( JobType.SCHEDULE, { jobId, @@ -410,13 +448,13 @@ export class RedisDistributedTransactionStorage } async remove(jobId: string): Promise { - await this.queue.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`) + await this.jobQueue?.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`) } async removeAll(): Promise { - const repeatableJobs = await this.queue.getRepeatableJobs() + const repeatableJobs = (await this.jobQueue?.getRepeatableJobs()) ?? [] await promiseAll( - repeatableJobs.map((job) => this.queue.removeRepeatableByKey(job.key)) + repeatableJobs.map((job) => this.jobQueue?.removeRepeatableByKey(job.key)) ) } }