diff --git a/.changeset/slimy-avocados-film.md b/.changeset/slimy-avocados-film.md new file mode 100644 index 0000000000..ef12990bed --- /dev/null +++ b/.changeset/slimy-avocados-film.md @@ -0,0 +1,6 @@ +--- +"@medusajs/event-bus-redis": patch +"@medusajs/workflow-engine-redis": patch +--- + +chore(): Enable more granualar queue configuration diff --git a/packages/modules/event-bus-redis/src/loaders/index.ts b/packages/modules/event-bus-redis/src/loaders/index.ts index 9ae66e22e5..6101a97849 100644 --- a/packages/modules/event-bus-redis/src/loaders/index.ts +++ b/packages/modules/event-bus-redis/src/loaders/index.ts @@ -1,5 +1,5 @@ -import { LoaderOptions } from "@medusajs/framework/types" import { asValue } from "@medusajs/framework/awilix" +import { LoaderOptions } from "@medusajs/framework/types" import Redis from "ioredis" import { EOL } from "os" import { EventBusRedisModuleOptions } from "../types" @@ -9,7 +9,14 @@ export default async ({ logger, options, }: LoaderOptions): Promise => { - const { redisUrl, redisOptions } = options as EventBusRedisModuleOptions + const { + redisUrl, + redisOptions, + queueName, + queueOptions, + workerOptions, + jobOptions, + } = options as EventBusRedisModuleOptions if (!redisUrl) { throw Error( @@ -39,5 +46,9 @@ export default async ({ container.register({ eventBusRedisConnection: asValue(connection), + eventBusRedisQueueName: asValue(queueName ?? "events-queue"), + eventBusRedisQueueOptions: asValue(queueOptions ?? {}), + eventBusRedisWorkerOptions: asValue(workerOptions ?? {}), + eventBusRedisJobOptions: asValue(jobOptions ?? {}), }) } diff --git a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts index 8cdd9f2664..81b4313bcc 100644 --- a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts +++ b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts @@ -21,12 +21,17 @@ const redisMock = { unlink: () => jest.fn(), } as unknown as Redis -const simpleModuleOptions = { redisUrl: "test-url" } const moduleDeps = { logger: loggerMock, eventBusRedisConnection: redisMock, + eventBusRedisQueueName: "events-queue", + eventBusRedisQueueOptions: {}, + eventBusRedisWorkerOptions: {}, + eventBusRedisJobOptions: {}, } +const moduleDeclaration = { scope: "internal" } as any + describe("RedisEventBusService", () => { let eventBus: RedisEventBusService let queue @@ -36,9 +41,7 @@ describe("RedisEventBusService", () => { beforeEach(async () => { jest.clearAllMocks() - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - scope: "internal", - }) + eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration) }) it("Creates a queue + worker", () => { @@ -62,9 +65,7 @@ describe("RedisEventBusService", () => { it("Throws on isolated module declaration", () => { try { - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - scope: "internal", - }) + eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration) } catch (error) { expect(error.message).toEqual( "At the moment this module can only be used with shared resources" @@ -78,9 +79,7 @@ describe("RedisEventBusService", () => { beforeEach(async () => { jest.clearAllMocks() - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - scope: "internal", - }) + eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration) queue = (eventBus as any).queue_ queue.addBulk = jest.fn() @@ -139,17 +138,15 @@ describe("RedisEventBusService", () => { it("should add job to queue with module job options", async () => { eventBus = new RedisEventBusService( - moduleDeps, { - ...simpleModuleOptions, - jobOptions: { + ...moduleDeps, + eventBusRedisJobOptions: { removeOnComplete: { age: 5 }, attempts: 7, }, }, - { - scope: "internal", - } + {}, + moduleDeclaration ) queue = (eventBus as any).queue_ @@ -186,16 +183,14 @@ describe("RedisEventBusService", () => { it("should add job to queue with default, local, and global options merged", async () => { eventBus = new RedisEventBusService( - moduleDeps, { - ...simpleModuleOptions, - jobOptions: { + ...moduleDeps, + eventBusRedisJobOptions: { removeOnComplete: 5, }, }, - { - scope: "internal", - } + {}, + moduleDeclaration ) queue = (eventBus as any).queue_ @@ -340,9 +335,7 @@ describe("RedisEventBusService", () => { beforeEach(async () => { jest.clearAllMocks() - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - scope: "internal", - }) + eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration) queue = (eventBus as any).queue_ queue.addBulk = jest.fn() @@ -485,9 +478,7 @@ describe("RedisEventBusService", () => { beforeEach(async () => { jest.clearAllMocks() - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - scope: "internal", - }) + eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration) }) it("should process a simple event with no options", async () => { diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index 6973c5e3e5..3857c31cd4 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -9,13 +9,28 @@ import { isPresent, promiseAll, } from "@medusajs/framework/utils" -import { BulkJobOptions, Queue, Worker } from "bullmq" +import { + BulkJobOptions, + Queue, + QueueOptions, + Worker, + WorkerOptions, +} from "bullmq" import { Redis } from "ioredis" -import { BullJob, EventBusRedisModuleOptions, Options } from "../types" +import { + BullJob, + EmitOptions, + EventBusRedisModuleOptions, + Options, +} from "../types" type InjectedDependencies = { logger: Logger eventBusRedisConnection: Redis + eventBusRedisQueueName: string + eventBusRedisQueueOptions: Omit + eventBusRedisWorkerOptions: Omit + eventBusRedisJobOptions: EmitOptions } type IORedisEventType = { @@ -31,46 +46,53 @@ type IORedisEventType = { // eslint-disable-next-line max-len export default class RedisEventBusService extends AbstractEventBusModuleService { protected readonly logger_: Logger - protected readonly moduleOptions_: EventBusRedisModuleOptions - // eslint-disable-next-line max-len - protected readonly moduleDeclaration_: InternalModuleDeclaration protected readonly eventBusRedisConnection_: Redis + protected readonly queueName_: string + protected readonly queueOptions_: Omit + protected readonly workerOptions_: Omit + protected readonly jobOptions_: EmitOptions + protected queue_: Queue protected bullWorker_: Worker constructor( - { logger, eventBusRedisConnection }: InjectedDependencies, - moduleOptions: EventBusRedisModuleOptions = {}, - moduleDeclaration: InternalModuleDeclaration + { + logger, + eventBusRedisConnection, + eventBusRedisQueueName, + eventBusRedisQueueOptions, + eventBusRedisWorkerOptions, + eventBusRedisJobOptions, + }: InjectedDependencies, + _moduleOptions: EventBusRedisModuleOptions = {}, + _moduleDeclaration: InternalModuleDeclaration ) { // @ts-ignore - // eslint-disable-next-line prefer-rest-params super(...arguments) this.eventBusRedisConnection_ = eventBusRedisConnection - - this.moduleOptions_ = moduleOptions this.logger_ = logger - this.queue_ = new Queue(moduleOptions.queueName ?? `events-queue`, { + this.queueName_ = eventBusRedisQueueName ?? "events-queue" + this.queueOptions_ = eventBusRedisQueueOptions ?? {} + this.workerOptions_ = eventBusRedisWorkerOptions ?? {} + this.jobOptions_ = eventBusRedisJobOptions ?? {} + + this.queue_ = new Queue(this.queueName_, { prefix: `${this.constructor.name}`, - ...(moduleOptions.queueOptions ?? {}), + ...this.queueOptions_, connection: eventBusRedisConnection, }) // Register our worker to handle emit calls if (this.isWorkerMode) { - this.bullWorker_ = new Worker( - moduleOptions.queueName ?? "events-queue", - this.worker_, - { - prefix: `${this.constructor.name}`, - ...(moduleOptions.workerOptions ?? {}), - connection: eventBusRedisConnection, - autorun: false, - } - ) + this.bullWorker_ = new Worker(this.queueName_, this.worker_, { + prefix: `${this.constructor.name}`, + ...this.workerOptions_, + connection: eventBusRedisConnection, + autorun: false, + }) } } @@ -97,7 +119,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService removeOnComplete: true, attempts: 1, // global options - ...(this.moduleOptions_.jobOptions ?? {}), + ...this.jobOptions_, ...options, } diff --git a/packages/modules/event-bus-redis/src/types/index.ts b/packages/modules/event-bus-redis/src/types/index.ts index f2e75a7cf7..5c918fd1bf 100644 --- a/packages/modules/event-bus-redis/src/types/index.ts +++ b/packages/modules/event-bus-redis/src/types/index.ts @@ -26,12 +26,31 @@ export type BullJob = { export type EmitOptions = JobsOptions export type EventBusRedisModuleOptions = { + /** + * Queue name for the event bus + */ queueName?: string - queueOptions?: QueueOptions - workerOptions?: WorkerOptions + /** + * Options for BullMQ Queue instance + * @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html + */ + queueOptions?: Omit + /** + * Options for BullMQ Worker instance + * @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html + */ + workerOptions?: Omit + + /** + * Redis connection string + */ redisUrl?: string + + /** + * Redis client options + */ redisOptions?: RedisOptions /** diff --git a/packages/modules/workflow-engine-redis/src/loaders/__tests__/redis.spec.ts b/packages/modules/workflow-engine-redis/src/loaders/__tests__/redis.spec.ts new file mode 100644 index 0000000000..2131e0fca8 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/loaders/__tests__/redis.spec.ts @@ -0,0 +1,323 @@ +import { Logger } from "@medusajs/framework/types" +import redisLoader from "../redis" + +jest.mock("ioredis", () => { + return jest.fn().mockImplementation(() => ({ + connect: jest.fn((callback) => { + if (callback) callback() + return Promise.resolve() + }), + disconnect: jest.fn(), + })) +}) + +const loggerMock = { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +} as unknown as Logger + +describe("Redis Loader", () => { + let containerMock: { register: jest.Mock } + + beforeEach(() => { + jest.clearAllMocks() + containerMock = { + register: jest.fn(), + } + }) + + describe("Option merging", () => { + it("should use shared queueOptions as default for all queues", async () => { + const sharedQueueOptions = { + defaultJobOptions: { removeOnComplete: 1000 }, + } + + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + queueOptions: sharedQueueOptions, + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisMainQueueOptions.resolve()).toEqual( + sharedQueueOptions + ) + expect(registerCall.redisJobQueueOptions.resolve()).toEqual( + sharedQueueOptions + ) + expect(registerCall.redisCleanerQueueOptions.resolve()).toEqual( + sharedQueueOptions + ) + }) + + it("should use shared workerOptions as default for all workers", async () => { + const sharedWorkerOptions = { concurrency: 10 } + + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + workerOptions: sharedWorkerOptions, + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisMainWorkerOptions.resolve()).toEqual( + sharedWorkerOptions + ) + expect(registerCall.redisJobWorkerOptions.resolve()).toEqual( + sharedWorkerOptions + ) + expect(registerCall.redisCleanerWorkerOptions.resolve()).toEqual( + sharedWorkerOptions + ) + }) + + it("should override shared options with per-queue options", async () => { + const sharedQueueOptions = { + defaultJobOptions: { removeOnComplete: 1000 }, + } + const mainQueueOptions = { + defaultJobOptions: { removeOnComplete: 500, attempts: 3 }, + } + + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + queueOptions: sharedQueueOptions, + mainQueueOptions: mainQueueOptions, + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisMainQueueOptions.resolve()).toEqual({ + defaultJobOptions: { removeOnComplete: 500, attempts: 3 }, + }) + + expect(registerCall.redisJobQueueOptions.resolve()).toEqual( + sharedQueueOptions + ) + expect(registerCall.redisCleanerQueueOptions.resolve()).toEqual( + sharedQueueOptions + ) + }) + + it("should override shared worker options with per-worker options", async () => { + const sharedWorkerOptions = { concurrency: 10 } + const jobWorkerOptions = { concurrency: 5 } + const cleanerWorkerOptions = { concurrency: 1 } + + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + workerOptions: sharedWorkerOptions, + jobWorkerOptions: jobWorkerOptions, + cleanerWorkerOptions: cleanerWorkerOptions, + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisMainWorkerOptions.resolve()).toEqual( + sharedWorkerOptions + ) + + expect(registerCall.redisJobWorkerOptions.resolve()).toEqual( + jobWorkerOptions + ) + + expect(registerCall.redisCleanerWorkerOptions.resolve()).toEqual( + cleanerWorkerOptions + ) + }) + + it("should merge nested options correctly", async () => { + const sharedWorkerOptions = { + concurrency: 10, + limiter: { max: 100, duration: 1000 }, + } + const mainWorkerOptions = { + concurrency: 20, + } + + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + workerOptions: sharedWorkerOptions, + mainWorkerOptions: mainWorkerOptions, + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisMainWorkerOptions.resolve()).toEqual({ + concurrency: 20, + limiter: { max: 100, duration: 1000 }, + }) + }) + }) + + describe("Deprecation warnings", () => { + it("should log warning when using deprecated 'url' option", async () => { + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + url: "redis://localhost:6379", + }, + }, + } as any, + {} as any + ) + + expect(loggerMock.warn).toHaveBeenCalledWith( + "[Workflow-engine-redis] The `url` option is deprecated. Please use `redisUrl` instead for consistency with other modules." + ) + }) + + it("should log warning when using deprecated 'options' option", async () => { + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + options: { maxRetriesPerRequest: 3 }, + }, + }, + } as any, + {} as any + ) + + expect(loggerMock.warn).toHaveBeenCalledWith( + "[Workflow-engine-redis] The `options` option is deprecated. Please use `redisOptions` instead for consistency with other modules." + ) + }) + + it("should not log warning when using new option names", async () => { + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + redisOptions: { maxRetriesPerRequest: 3 }, + }, + }, + } as any, + {} as any + ) + + expect(loggerMock.warn).not.toHaveBeenCalled() + }) + }) + + describe("Queue names", () => { + it("should use default queue names when not provided", async () => { + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisQueueName.resolve()).toEqual("medusa-workflows") + expect(registerCall.redisJobQueueName.resolve()).toEqual( + "medusa-workflows-jobs" + ) + }) + + it("should use custom queue names when provided", async () => { + await redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: { + redisUrl: "redis://localhost:6379", + queueName: "custom-workflows", + jobQueueName: "custom-jobs", + }, + }, + } as any, + {} as any + ) + + const registerCall = containerMock.register.mock.calls[0][0] + + expect(registerCall.redisQueueName.resolve()).toEqual("custom-workflows") + expect(registerCall.redisJobQueueName.resolve()).toEqual("custom-jobs") + }) + }) + + describe("Error handling", () => { + it("should throw error when redisUrl is not provided", async () => { + await expect( + redisLoader( + { + container: containerMock as any, + logger: loggerMock, + options: { + redis: {}, + }, + } as any, + {} as any + ) + ).rejects.toThrow( + "No `redis.redisUrl` (or deprecated `redis.url`) provided" + ) + }) + }) +}) diff --git a/packages/modules/workflow-engine-redis/src/loaders/redis.ts b/packages/modules/workflow-engine-redis/src/loaders/redis.ts index 0a6b38ddd6..259083d111 100644 --- a/packages/modules/workflow-engine-redis/src/loaders/redis.ts +++ b/packages/modules/workflow-engine-redis/src/loaders/redis.ts @@ -12,32 +12,86 @@ export default async ( ): Promise => { const { url, - options: redisOptions, + redisUrl, + options: deprecatedRedisOptions, + redisOptions: newRedisOptions, jobQueueName, queueName, + // Shared options + queueOptions, + workerOptions, + // Per-queue options + mainQueueOptions, + mainWorkerOptions, + jobQueueOptions, + jobWorkerOptions, + cleanerQueueOptions, + cleanerWorkerOptions, pubsub, } = options?.redis as RedisWorkflowsOptions - // TODO: get default from ENV VAR - if (!url) { - throw Error( - "No `redis.url` provided in `workflowOrchestrator` module options. It is required for the Workflow Orchestrator Redis." + // Handle backward compatibility for deprecated options + const resolvedUrl = redisUrl ?? url + const redisOptions = newRedisOptions ?? deprecatedRedisOptions + + // Log deprecation warnings + if (url && !redisUrl) { + logger?.warn( + "[Workflow-engine-redis] The `url` option is deprecated. Please use `redisUrl` instead for consistency with other modules." + ) + } + if (deprecatedRedisOptions && !newRedisOptions) { + logger?.warn( + "[Workflow-engine-redis] The `options` option is deprecated. Please use `redisOptions` instead for consistency with other modules." ) } - const cnnPubSub = pubsub ?? { url, options: redisOptions } + // TODO: get default from ENV VAR + if (!resolvedUrl) { + throw Error( + "No `redis.redisUrl` (or deprecated `redis.url`) provided in `workflowOrchestrator` module options. It is required for the Workflow Orchestrator Redis." + ) + } + + const cnnPubSub = pubsub ?? { url: resolvedUrl, options: redisOptions } const queueName_ = queueName ?? "medusa-workflows" const jobQueueName_ = jobQueueName ?? "medusa-workflows-jobs" + // Resolve per-queue options by merging shared defaults with per-queue overrides + const resolvedMainQueueOptions = { + ...(queueOptions ?? {}), + ...(mainQueueOptions ?? {}), + } + const resolvedMainWorkerOptions = { + ...(workerOptions ?? {}), + ...(mainWorkerOptions ?? {}), + } + const resolvedJobQueueOptions = { + ...(queueOptions ?? {}), + ...(jobQueueOptions ?? {}), + } + const resolvedJobWorkerOptions = { + ...(workerOptions ?? {}), + ...(jobWorkerOptions ?? {}), + } + const resolvedCleanerQueueOptions = { + ...(queueOptions ?? {}), + ...(cleanerQueueOptions ?? {}), + } + const resolvedCleanerWorkerOptions = { + ...(workerOptions ?? {}), + ...(cleanerWorkerOptions ?? {}), + } + let connection let redisPublisher let redisSubscriber let workerConnection try { - connection = await getConnection(url, redisOptions) - workerConnection = await getConnection(url, { + connection = await getConnection(resolvedUrl, redisOptions) + workerConnection = await getConnection(resolvedUrl, { ...(redisOptions ?? {}), maxRetriesPerRequest: null, }) @@ -71,6 +125,13 @@ export default async ( redisSubscriber: asValue(redisSubscriber), redisQueueName: asValue(queueName_), redisJobQueueName: asValue(jobQueueName_), + // Per-queue resolved options + redisMainQueueOptions: asValue(resolvedMainQueueOptions), + redisMainWorkerOptions: asValue(resolvedMainWorkerOptions), + redisJobQueueOptions: asValue(resolvedJobQueueOptions), + redisJobWorkerOptions: asValue(resolvedJobWorkerOptions), + redisCleanerQueueOptions: asValue(resolvedCleanerQueueOptions), + redisCleanerWorkerOptions: asValue(resolvedCleanerWorkerOptions), redisDisconnectHandler: asValue(async () => { connection.disconnect() workerConnection.disconnect() diff --git a/packages/modules/workflow-engine-redis/src/types/index.ts b/packages/modules/workflow-engine-redis/src/types/index.ts index 1ea5613d9f..c22653472e 100644 --- a/packages/modules/workflow-engine-redis/src/types/index.ts +++ b/packages/modules/workflow-engine-redis/src/types/index.ts @@ -1,4 +1,5 @@ import { Logger } from "@medusajs/framework/types" +import { QueueOptions, WorkerOptions } from "bullmq" import { RedisOptions } from "ioredis" export type InitializeModuleInjectableDependencies = { @@ -6,31 +7,170 @@ export type InitializeModuleInjectableDependencies = { } /** - * Module config type + * Module config type for the Redis Workflow Engine. + * + * The workflow engine uses three queues internally: + * - **Main queue**: Handles workflow retries, step timeouts, and transaction timeouts + * - **Job queue**: Handles scheduled workflow job execution + * - **Cleaner queue**: Handles periodic cleanup of expired workflow executions + * + * You can configure shared options that apply to all queues/workers, or provide + * per-queue overrides for fine-grained control. + * + * @example + * ```ts + * // Simple configuration - same options for all queues + * { + * redisUrl: "redis://localhost:6379", + * queueOptions: { defaultJobOptions: { removeOnComplete: 1000 } }, + * workerOptions: { concurrency: 10 } + * } + * ``` + * + * @example + * ```ts + * // Advanced configuration - per-queue overrides + * { + * redisUrl: "redis://localhost:6379", + * // Shared defaults + * workerOptions: { concurrency: 10 }, + * // Override for job queue (scheduled workflows) + * jobWorkerOptions: { concurrency: 5 }, + * // Override for cleaner (low priority) + * cleanerWorkerOptions: { concurrency: 1 } + * } + * ``` */ export type RedisWorkflowsOptions = { /** * Redis connection string + * @deprecated Use `redisUrl` instead for consistency with other modules */ url?: string /** - * Queue name used for retries and timeouts + * Redis connection string + */ + redisUrl?: string + + /** + * Name for the main workflow queue that handles retries, step timeouts, + * and transaction timeouts. + * @default "medusa-workflows" */ queueName?: string /** - * Queue name used for job execution + * Name for the job queue that handles scheduled workflow execution. + * @default "medusa-workflows-jobs" */ jobQueueName?: string /** * Redis client options + * @deprecated Use `redisOptions` instead for consistency with other modules */ options?: RedisOptions /** - * Optiona connection string and options to pub/sub + * Redis client options passed to ioredis + * @see https://github.com/redis/ioredis#connect-to-redis + */ + redisOptions?: RedisOptions + + /* + * ========================================================================= + * Shared Queue/Worker Options + * ========================================================================= + * These options apply to all queues and workers unless overridden by + * per-queue specific options below. + */ + + /** + * Default options for all BullMQ Queue instances. + * Can be overridden per-queue using `mainQueueOptions`, `jobQueueOptions`, + * or `cleanerQueueOptions`. + * @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html + */ + queueOptions?: Omit + + /** + * Default options for all BullMQ Worker instances. + * Can be overridden per-worker using `mainWorkerOptions`, `jobWorkerOptions`, + * or `cleanerWorkerOptions`. + * @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html + */ + workerOptions?: Omit + + /* + * ========================================================================= + * Per-Queue Options (Main Queue) + * ========================================================================= + * The main queue handles workflow retries, step timeouts, and transaction + * timeouts. These are critical real-time operations. + */ + + /** + * Options specific to the main workflow queue. + * Overrides `queueOptions` for this queue only. + * @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html + */ + mainQueueOptions?: Omit + + /** + * Options specific to the main workflow worker. + * Overrides `workerOptions` for this worker only. + * @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html + */ + mainWorkerOptions?: Omit + + /* + * ========================================================================= + * Per-Queue Options (Job Queue) + * ========================================================================= + * The job queue handles scheduled workflow execution. You may want different + * concurrency settings for scheduled vs real-time workflows. + */ + + /** + * Options specific to the job queue (scheduled workflows). + * Overrides `queueOptions` for this queue only. + * @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html + */ + jobQueueOptions?: Omit + + /** + * Options specific to the job worker (scheduled workflows). + * Overrides `workerOptions` for this worker only. + * @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html + */ + jobWorkerOptions?: Omit + + /* + * ========================================================================= + * Per-Queue Options (Cleaner Queue) + * ========================================================================= + * The cleaner queue runs periodically (every 30 minutes) to remove expired + * workflow executions. This is a low-priority background task. + */ + + /** + * Options specific to the cleaner queue. + * Overrides `queueOptions` for this queue only. + * @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html + */ + cleanerQueueOptions?: Omit + + /** + * Options specific to the cleaner worker. + * Overrides `workerOptions` for this worker only. + * @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html + */ + cleanerWorkerOptions?: Omit + + /** + * Optional separate connection string and options for pub/sub. + * If not provided, uses the main Redis connection. */ pubsub?: { url: string diff --git a/packages/modules/workflow-engine-redis/src/utils/__tests__/workflow-orchestrator-storage.spec.ts b/packages/modules/workflow-engine-redis/src/utils/__tests__/workflow-orchestrator-storage.spec.ts new file mode 100644 index 0000000000..001bdf430c --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/utils/__tests__/workflow-orchestrator-storage.spec.ts @@ -0,0 +1,321 @@ +import { Logger, ModulesSdkTypes } from "@medusajs/framework/types" +import { Queue, Worker } from "bullmq" +import Redis from "ioredis" +import { RedisDistributedTransactionStorage } from "../workflow-orchestrator-storage" + +jest.mock("bullmq") +jest.mock("ioredis") + +const loggerMock = { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +} as unknown as Logger + +const redisMock = { + status: "ready", + disconnect: jest.fn(), + get: jest.fn(), + set: jest.fn(), + del: jest.fn(), + pipeline: jest.fn(() => ({ + exec: jest.fn(), + })), +} as unknown as Redis + +const workflowExecutionServiceMock = { + list: jest.fn(), + upsert: jest.fn(), + delete: jest.fn(), +} as unknown as ModulesSdkTypes.IMedusaInternalService + +const baseModuleDeps = { + workflowExecutionService: workflowExecutionServiceMock, + redisConnection: redisMock, + redisWorkerConnection: redisMock, + redisQueueName: "medusa-workflows", + redisJobQueueName: "medusa-workflows-jobs", + logger: loggerMock, + isWorkerMode: true, +} + +describe("RedisDistributedTransactionStorage", () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + describe("constructor - Queue configuration", () => { + it("should create queues with default empty options when no options provided", () => { + new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: {}, + redisMainWorkerOptions: {}, + redisJobQueueOptions: {}, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: {}, + }) + + expect(Queue).toHaveBeenCalledWith("medusa-workflows", { + connection: redisMock, + }) + + expect(Queue).toHaveBeenCalledWith("medusa-workflows-jobs", { + connection: redisMock, + }) + + expect(Queue).toHaveBeenCalledWith("workflows-cleaner", { + connection: redisMock, + }) + + expect(Queue).toHaveBeenCalledTimes(3) + }) + + it("should create main queue with custom options", () => { + const mainQueueOptions = { + defaultJobOptions: { + removeOnComplete: 1000, + removeOnFail: 5000, + }, + } + + new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: mainQueueOptions, + redisMainWorkerOptions: {}, + redisJobQueueOptions: {}, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: {}, + }) + + expect(Queue).toHaveBeenCalledWith("medusa-workflows", { + ...mainQueueOptions, + connection: redisMock, + }) + }) + + it("should create job queue with custom options", () => { + const jobQueueOptions = { + defaultJobOptions: { + attempts: 5, + backoff: { type: "exponential", delay: 1000 }, + }, + } + + new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: {}, + redisMainWorkerOptions: {}, + redisJobQueueOptions: jobQueueOptions, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: {}, + }) + + expect(Queue).toHaveBeenCalledWith("medusa-workflows-jobs", { + ...jobQueueOptions, + connection: redisMock, + }) + }) + + it("should create cleaner queue with custom options", () => { + const cleanerQueueOptions = { + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: true, + }, + } + + new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: {}, + redisMainWorkerOptions: {}, + redisJobQueueOptions: {}, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: cleanerQueueOptions, + redisCleanerWorkerOptions: {}, + }) + + expect(Queue).toHaveBeenCalledWith("workflows-cleaner", { + ...cleanerQueueOptions, + connection: redisMock, + }) + }) + + it("should create each queue with different options", () => { + const mainQueueOptions = { defaultJobOptions: { removeOnComplete: 100 } } + const jobQueueOptions = { defaultJobOptions: { removeOnComplete: 200 } } + const cleanerQueueOptions = { + defaultJobOptions: { removeOnComplete: 300 }, + } + + new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: mainQueueOptions, + redisMainWorkerOptions: {}, + redisJobQueueOptions: jobQueueOptions, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: cleanerQueueOptions, + redisCleanerWorkerOptions: {}, + }) + + expect(Queue).toHaveBeenNthCalledWith(1, "medusa-workflows", { + ...mainQueueOptions, + connection: redisMock, + }) + + expect(Queue).toHaveBeenNthCalledWith(2, "medusa-workflows-jobs", { + ...jobQueueOptions, + connection: redisMock, + }) + + expect(Queue).toHaveBeenNthCalledWith(3, "workflows-cleaner", { + ...cleanerQueueOptions, + connection: redisMock, + }) + }) + + it("should not create job and cleaner queues when not in worker mode", () => { + new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + isWorkerMode: false, + redisMainQueueOptions: {}, + redisMainWorkerOptions: {}, + redisJobQueueOptions: {}, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: {}, + }) + + expect(Queue).toHaveBeenCalledTimes(1) + expect(Queue).toHaveBeenCalledWith("medusa-workflows", { + connection: redisMock, + }) + }) + }) + + describe("onApplicationStart - Worker configuration", () => { + it("should create workers with custom options", async () => { + const mainWorkerOptions = { + concurrency: 10, + limiter: { max: 100, duration: 1000 }, + } + const jobWorkerOptions = { concurrency: 5 } + const cleanerWorkerOptions = { concurrency: 1 } + + const storage = new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: {}, + redisMainWorkerOptions: mainWorkerOptions, + redisJobQueueOptions: {}, + redisJobWorkerOptions: jobWorkerOptions, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: cleanerWorkerOptions, + }) + + const mockQueue = { + getRepeatableJobs: jest.fn().mockResolvedValue([]), + add: jest.fn().mockResolvedValue({}), + } + ;(storage as any).queue = mockQueue + ;(storage as any).cleanerQueue_ = mockQueue + + await storage.onApplicationStart() + + expect(Worker).toHaveBeenCalledWith( + "medusa-workflows", + expect.any(Function), + { + ...mainWorkerOptions, + connection: redisMock, + } + ) + + expect(Worker).toHaveBeenCalledWith( + "medusa-workflows-jobs", + expect.any(Function), + { + ...jobWorkerOptions, + connection: redisMock, + } + ) + + expect(Worker).toHaveBeenCalledWith( + "workflows-cleaner", + expect.any(Function), + { + ...cleanerWorkerOptions, + connection: redisMock, + } + ) + + expect(Worker).toHaveBeenCalledTimes(3) + }) + + it("should create each worker with different concurrency settings", async () => { + const storage = new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + redisMainQueueOptions: {}, + redisMainWorkerOptions: { concurrency: 20 }, + redisJobQueueOptions: {}, + redisJobWorkerOptions: { concurrency: 10 }, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: { concurrency: 1 }, + }) + + const mockQueue = { + getRepeatableJobs: jest.fn().mockResolvedValue([]), + add: jest.fn().mockResolvedValue({}), + } + ;(storage as any).queue = mockQueue + ;(storage as any).cleanerQueue_ = mockQueue + + await storage.onApplicationStart() + + expect(Worker).toHaveBeenNthCalledWith( + 1, + "medusa-workflows", + expect.any(Function), + expect.objectContaining({ concurrency: 20 }) + ) + + expect(Worker).toHaveBeenNthCalledWith( + 2, + "medusa-workflows-jobs", + expect.any(Function), + expect.objectContaining({ concurrency: 10 }) + ) + + expect(Worker).toHaveBeenNthCalledWith( + 3, + "workflows-cleaner", + expect.any(Function), + expect.objectContaining({ concurrency: 1 }) + ) + }) + + it("should not create workers when not in worker mode", async () => { + const storage = new RedisDistributedTransactionStorage({ + ...baseModuleDeps, + isWorkerMode: false, + redisMainQueueOptions: {}, + redisMainWorkerOptions: {}, + redisJobQueueOptions: {}, + redisJobWorkerOptions: {}, + redisCleanerQueueOptions: {}, + redisCleanerWorkerOptions: {}, + }) + + const mockQueue = { + getRepeatableJobs: jest.fn().mockResolvedValue([]), + } + ;(storage as any).queue = mockQueue + + await storage.onApplicationStart() + + expect(Worker).not.toHaveBeenCalled() + }) + }) +}) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 04aae468a0..19a7293d6c 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -24,7 +24,7 @@ import { TransactionStepState, } from "@medusajs/framework/utils" import { WorkflowOrchestratorService } from "@services" -import { Queue, RepeatOptions, Worker } from "bullmq" +import { Queue, QueueOptions, RepeatOptions, Worker, WorkerOptions } from "bullmq" import Redis from "ioredis" enum JobType { @@ -75,6 +75,14 @@ export class RedisDistributedTransactionStorage private cleanerWorker_: Worker private cleanerQueue_?: Queue + // Per-queue options + private mainQueueOptions_: Omit + private mainWorkerOptions_: Omit + private jobQueueOptions_: Omit + private jobWorkerOptions_: Omit + private cleanerQueueOptions_: Omit + private cleanerWorkerOptions_: Omit + #isWorkerMode: boolean = false constructor({ @@ -83,6 +91,12 @@ export class RedisDistributedTransactionStorage redisWorkerConnection, redisQueueName, redisJobQueueName, + redisMainQueueOptions, + redisMainWorkerOptions, + redisJobQueueOptions, + redisJobWorkerOptions, + redisCleanerQueueOptions, + redisCleanerWorkerOptions, logger, isWorkerMode, }: { @@ -91,6 +105,12 @@ export class RedisDistributedTransactionStorage redisWorkerConnection: Redis redisQueueName: string redisJobQueueName: string + redisMainQueueOptions: Omit + redisMainWorkerOptions: Omit + redisJobQueueOptions: Omit + redisJobWorkerOptions: Omit + redisCleanerQueueOptions: Omit + redisCleanerWorkerOptions: Omit logger: Logger isWorkerMode: boolean }) { @@ -101,14 +121,29 @@ export class RedisDistributedTransactionStorage this.cleanerQueueName = "workflows-cleaner" this.queueName = redisQueueName this.jobQueueName = redisJobQueueName - this.queue = new Queue(redisQueueName, { connection: this.redisClient }) + + // Store per-queue options + this.mainQueueOptions_ = redisMainQueueOptions ?? {} + this.mainWorkerOptions_ = redisMainWorkerOptions ?? {} + this.jobQueueOptions_ = redisJobQueueOptions ?? {} + this.jobWorkerOptions_ = redisJobWorkerOptions ?? {} + this.cleanerQueueOptions_ = redisCleanerQueueOptions ?? {} + this.cleanerWorkerOptions_ = redisCleanerWorkerOptions ?? {} + + // Create queues with their respective options + this.queue = new Queue(redisQueueName, { + ...this.mainQueueOptions_, + connection: this.redisClient, + }) this.jobQueue = isWorkerMode ? new Queue(redisJobQueueName, { + ...this.jobQueueOptions_, connection: this.redisClient, }) : undefined this.cleanerQueue_ = isWorkerMode ? new Queue(this.cleanerQueueName, { + ...this.cleanerQueueOptions_, connection: this.redisClient, }) : undefined @@ -137,7 +172,17 @@ export class RedisDistributedTransactionStorage JobType.TRANSACTION_TIMEOUT, ] - const workerOptions = { + // Per-worker options with their respective configurations + const mainWorkerOptions: WorkerOptions = { + ...this.mainWorkerOptions_, + connection: this.redisWorkerConnection, + } + const jobWorkerOptions: WorkerOptions = { + ...this.jobWorkerOptions_, + connection: this.redisWorkerConnection, + } + const cleanerWorkerOptions: WorkerOptions = { + ...this.cleanerWorkerOptions_, connection: this.redisWorkerConnection, } @@ -173,7 +218,7 @@ export class RedisDistributedTransactionStorage await this.remove(job.data.jobId) } }, - workerOptions + mainWorkerOptions ) this.jobWorker = new Worker( @@ -191,7 +236,7 @@ export class RedisDistributedTransactionStorage job.data.schedulerOptions ) }, - workerOptions + jobWorkerOptions ) this.cleanerWorker_ = new Worker( @@ -199,7 +244,7 @@ export class RedisDistributedTransactionStorage async () => { await this.clearExpiredExecutions() }, - workerOptions + cleanerWorkerOptions ) await this.cleanerQueue_?.add(