chore(event-bus, workflow-engine): Enable more granualar queues configuration (#14201)

Summary

  This PR adds BullMQ queue and worker configuration options to the workflow-engine-redis module, bringing feature parity with the event-bus-redis module. It also introduces per-queue
  configuration options for fine-grained control over the three internal queues (main, job, and cleaner).

  Key changes:
  - Added per-queue BullMQ configuration options (mainQueueOptions, jobQueueOptions, cleanerQueueOptions and their worker counterparts) with shared defaults
  - Unified Redis option naming across modules: deprecated url → redisUrl, options → redisOptions (with backward compatibility)
  - Moved configuration resolution to the loader and registered options in the DI container
  - Added comprehensive JSDoc documentation for all configuration options
  - Added unit tests for option merging and queue/worker configuration

  Configuration Example

```ts
  // Simple configuration - same options for all queues
  {
    redisUrl: "redis://localhost:6379",
    queueOptions: { defaultJobOptions: { removeOnComplete: 1000 } },
    workerOptions: { concurrency: 10 }
  }
```

```ts
  // Advanced configuration - per-queue overrides
  {
    redisUrl: "redis://localhost:6379",
    workerOptions: { concurrency: 10 },        // shared default
    jobWorkerOptions: { concurrency: 5 },      // override for scheduled workflows
    cleanerWorkerOptions: { concurrency: 1 }   // override for cleanup (low priority)
  }
```
This commit is contained in:
Adrien de Peretti
2025-12-05 13:03:12 +01:00
committed by GitHub
parent 3e3e6c37bd
commit 144f0f4e2e
10 changed files with 1013 additions and 74 deletions

View File

@@ -1,5 +1,5 @@
import { LoaderOptions } from "@medusajs/framework/types"
import { asValue } from "@medusajs/framework/awilix"
import { LoaderOptions } from "@medusajs/framework/types"
import Redis from "ioredis"
import { EOL } from "os"
import { EventBusRedisModuleOptions } from "../types"
@@ -9,7 +9,14 @@ export default async ({
logger,
options,
}: LoaderOptions): Promise<void> => {
const { redisUrl, redisOptions } = options as EventBusRedisModuleOptions
const {
redisUrl,
redisOptions,
queueName,
queueOptions,
workerOptions,
jobOptions,
} = options as EventBusRedisModuleOptions
if (!redisUrl) {
throw Error(
@@ -39,5 +46,9 @@ export default async ({
container.register({
eventBusRedisConnection: asValue(connection),
eventBusRedisQueueName: asValue(queueName ?? "events-queue"),
eventBusRedisQueueOptions: asValue(queueOptions ?? {}),
eventBusRedisWorkerOptions: asValue(workerOptions ?? {}),
eventBusRedisJobOptions: asValue(jobOptions ?? {}),
})
}

View File

@@ -21,12 +21,17 @@ const redisMock = {
unlink: () => jest.fn(),
} as unknown as Redis
const simpleModuleOptions = { redisUrl: "test-url" }
const moduleDeps = {
logger: loggerMock,
eventBusRedisConnection: redisMock,
eventBusRedisQueueName: "events-queue",
eventBusRedisQueueOptions: {},
eventBusRedisWorkerOptions: {},
eventBusRedisJobOptions: {},
}
const moduleDeclaration = { scope: "internal" } as any
describe("RedisEventBusService", () => {
let eventBus: RedisEventBusService
let queue
@@ -36,9 +41,7 @@ describe("RedisEventBusService", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
})
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
})
it("Creates a queue + worker", () => {
@@ -62,9 +65,7 @@ describe("RedisEventBusService", () => {
it("Throws on isolated module declaration", () => {
try {
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
})
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
} catch (error) {
expect(error.message).toEqual(
"At the moment this module can only be used with shared resources"
@@ -78,9 +79,7 @@ describe("RedisEventBusService", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
})
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
@@ -139,17 +138,15 @@ describe("RedisEventBusService", () => {
it("should add job to queue with module job options", async () => {
eventBus = new RedisEventBusService(
moduleDeps,
{
...simpleModuleOptions,
jobOptions: {
...moduleDeps,
eventBusRedisJobOptions: {
removeOnComplete: { age: 5 },
attempts: 7,
},
},
{
scope: "internal",
}
{},
moduleDeclaration
)
queue = (eventBus as any).queue_
@@ -186,16 +183,14 @@ describe("RedisEventBusService", () => {
it("should add job to queue with default, local, and global options merged", async () => {
eventBus = new RedisEventBusService(
moduleDeps,
{
...simpleModuleOptions,
jobOptions: {
...moduleDeps,
eventBusRedisJobOptions: {
removeOnComplete: 5,
},
},
{
scope: "internal",
}
{},
moduleDeclaration
)
queue = (eventBus as any).queue_
@@ -340,9 +335,7 @@ describe("RedisEventBusService", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
})
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
@@ -485,9 +478,7 @@ describe("RedisEventBusService", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
})
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
})
it("should process a simple event with no options", async () => {

View File

@@ -9,13 +9,28 @@ import {
isPresent,
promiseAll,
} from "@medusajs/framework/utils"
import { BulkJobOptions, Queue, Worker } from "bullmq"
import {
BulkJobOptions,
Queue,
QueueOptions,
Worker,
WorkerOptions,
} from "bullmq"
import { Redis } from "ioredis"
import { BullJob, EventBusRedisModuleOptions, Options } from "../types"
import {
BullJob,
EmitOptions,
EventBusRedisModuleOptions,
Options,
} from "../types"
type InjectedDependencies = {
logger: Logger
eventBusRedisConnection: Redis
eventBusRedisQueueName: string
eventBusRedisQueueOptions: Omit<QueueOptions, "connection">
eventBusRedisWorkerOptions: Omit<WorkerOptions, "connection">
eventBusRedisJobOptions: EmitOptions
}
type IORedisEventType<T = unknown> = {
@@ -31,46 +46,53 @@ type IORedisEventType<T = unknown> = {
// eslint-disable-next-line max-len
export default class RedisEventBusService extends AbstractEventBusModuleService {
protected readonly logger_: Logger
protected readonly moduleOptions_: EventBusRedisModuleOptions
// eslint-disable-next-line max-len
protected readonly moduleDeclaration_: InternalModuleDeclaration
protected readonly eventBusRedisConnection_: Redis
protected readonly queueName_: string
protected readonly queueOptions_: Omit<QueueOptions, "connection">
protected readonly workerOptions_: Omit<WorkerOptions, "connection">
protected readonly jobOptions_: EmitOptions
protected queue_: Queue
protected bullWorker_: Worker
constructor(
{ logger, eventBusRedisConnection }: InjectedDependencies,
moduleOptions: EventBusRedisModuleOptions = {},
moduleDeclaration: InternalModuleDeclaration
{
logger,
eventBusRedisConnection,
eventBusRedisQueueName,
eventBusRedisQueueOptions,
eventBusRedisWorkerOptions,
eventBusRedisJobOptions,
}: InjectedDependencies,
_moduleOptions: EventBusRedisModuleOptions = {},
_moduleDeclaration: InternalModuleDeclaration
) {
// @ts-ignore
// eslint-disable-next-line prefer-rest-params
super(...arguments)
this.eventBusRedisConnection_ = eventBusRedisConnection
this.moduleOptions_ = moduleOptions
this.logger_ = logger
this.queue_ = new Queue(moduleOptions.queueName ?? `events-queue`, {
this.queueName_ = eventBusRedisQueueName ?? "events-queue"
this.queueOptions_ = eventBusRedisQueueOptions ?? {}
this.workerOptions_ = eventBusRedisWorkerOptions ?? {}
this.jobOptions_ = eventBusRedisJobOptions ?? {}
this.queue_ = new Queue(this.queueName_, {
prefix: `${this.constructor.name}`,
...(moduleOptions.queueOptions ?? {}),
...this.queueOptions_,
connection: eventBusRedisConnection,
})
// Register our worker to handle emit calls
if (this.isWorkerMode) {
this.bullWorker_ = new Worker(
moduleOptions.queueName ?? "events-queue",
this.worker_,
{
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
autorun: false,
}
)
this.bullWorker_ = new Worker(this.queueName_, this.worker_, {
prefix: `${this.constructor.name}`,
...this.workerOptions_,
connection: eventBusRedisConnection,
autorun: false,
})
}
}
@@ -97,7 +119,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
removeOnComplete: true,
attempts: 1,
// global options
...(this.moduleOptions_.jobOptions ?? {}),
...this.jobOptions_,
...options,
}

View File

@@ -26,12 +26,31 @@ export type BullJob<T> = {
export type EmitOptions = JobsOptions
export type EventBusRedisModuleOptions = {
/**
* Queue name for the event bus
*/
queueName?: string
queueOptions?: QueueOptions
workerOptions?: WorkerOptions
/**
* Options for BullMQ Queue instance
* @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
*/
queueOptions?: Omit<QueueOptions, "connection">
/**
* Options for BullMQ Worker instance
* @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html
*/
workerOptions?: Omit<WorkerOptions, "connection">
/**
* Redis connection string
*/
redisUrl?: string
/**
* Redis client options
*/
redisOptions?: RedisOptions
/**

View File

@@ -0,0 +1,323 @@
import { Logger } from "@medusajs/framework/types"
import redisLoader from "../redis"
jest.mock("ioredis", () => {
return jest.fn().mockImplementation(() => ({
connect: jest.fn((callback) => {
if (callback) callback()
return Promise.resolve()
}),
disconnect: jest.fn(),
}))
})
const loggerMock = {
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
debug: jest.fn(),
} as unknown as Logger
describe("Redis Loader", () => {
let containerMock: { register: jest.Mock }
beforeEach(() => {
jest.clearAllMocks()
containerMock = {
register: jest.fn(),
}
})
describe("Option merging", () => {
it("should use shared queueOptions as default for all queues", async () => {
const sharedQueueOptions = {
defaultJobOptions: { removeOnComplete: 1000 },
}
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
queueOptions: sharedQueueOptions,
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisMainQueueOptions.resolve()).toEqual(
sharedQueueOptions
)
expect(registerCall.redisJobQueueOptions.resolve()).toEqual(
sharedQueueOptions
)
expect(registerCall.redisCleanerQueueOptions.resolve()).toEqual(
sharedQueueOptions
)
})
it("should use shared workerOptions as default for all workers", async () => {
const sharedWorkerOptions = { concurrency: 10 }
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
workerOptions: sharedWorkerOptions,
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisMainWorkerOptions.resolve()).toEqual(
sharedWorkerOptions
)
expect(registerCall.redisJobWorkerOptions.resolve()).toEqual(
sharedWorkerOptions
)
expect(registerCall.redisCleanerWorkerOptions.resolve()).toEqual(
sharedWorkerOptions
)
})
it("should override shared options with per-queue options", async () => {
const sharedQueueOptions = {
defaultJobOptions: { removeOnComplete: 1000 },
}
const mainQueueOptions = {
defaultJobOptions: { removeOnComplete: 500, attempts: 3 },
}
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
queueOptions: sharedQueueOptions,
mainQueueOptions: mainQueueOptions,
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisMainQueueOptions.resolve()).toEqual({
defaultJobOptions: { removeOnComplete: 500, attempts: 3 },
})
expect(registerCall.redisJobQueueOptions.resolve()).toEqual(
sharedQueueOptions
)
expect(registerCall.redisCleanerQueueOptions.resolve()).toEqual(
sharedQueueOptions
)
})
it("should override shared worker options with per-worker options", async () => {
const sharedWorkerOptions = { concurrency: 10 }
const jobWorkerOptions = { concurrency: 5 }
const cleanerWorkerOptions = { concurrency: 1 }
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
workerOptions: sharedWorkerOptions,
jobWorkerOptions: jobWorkerOptions,
cleanerWorkerOptions: cleanerWorkerOptions,
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisMainWorkerOptions.resolve()).toEqual(
sharedWorkerOptions
)
expect(registerCall.redisJobWorkerOptions.resolve()).toEqual(
jobWorkerOptions
)
expect(registerCall.redisCleanerWorkerOptions.resolve()).toEqual(
cleanerWorkerOptions
)
})
it("should merge nested options correctly", async () => {
const sharedWorkerOptions = {
concurrency: 10,
limiter: { max: 100, duration: 1000 },
}
const mainWorkerOptions = {
concurrency: 20,
}
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
workerOptions: sharedWorkerOptions,
mainWorkerOptions: mainWorkerOptions,
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisMainWorkerOptions.resolve()).toEqual({
concurrency: 20,
limiter: { max: 100, duration: 1000 },
})
})
})
describe("Deprecation warnings", () => {
it("should log warning when using deprecated 'url' option", async () => {
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
url: "redis://localhost:6379",
},
},
} as any,
{} as any
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"[Workflow-engine-redis] The `url` option is deprecated. Please use `redisUrl` instead for consistency with other modules."
)
})
it("should log warning when using deprecated 'options' option", async () => {
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
options: { maxRetriesPerRequest: 3 },
},
},
} as any,
{} as any
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"[Workflow-engine-redis] The `options` option is deprecated. Please use `redisOptions` instead for consistency with other modules."
)
})
it("should not log warning when using new option names", async () => {
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
redisOptions: { maxRetriesPerRequest: 3 },
},
},
} as any,
{} as any
)
expect(loggerMock.warn).not.toHaveBeenCalled()
})
})
describe("Queue names", () => {
it("should use default queue names when not provided", async () => {
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisQueueName.resolve()).toEqual("medusa-workflows")
expect(registerCall.redisJobQueueName.resolve()).toEqual(
"medusa-workflows-jobs"
)
})
it("should use custom queue names when provided", async () => {
await redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {
redisUrl: "redis://localhost:6379",
queueName: "custom-workflows",
jobQueueName: "custom-jobs",
},
},
} as any,
{} as any
)
const registerCall = containerMock.register.mock.calls[0][0]
expect(registerCall.redisQueueName.resolve()).toEqual("custom-workflows")
expect(registerCall.redisJobQueueName.resolve()).toEqual("custom-jobs")
})
})
describe("Error handling", () => {
it("should throw error when redisUrl is not provided", async () => {
await expect(
redisLoader(
{
container: containerMock as any,
logger: loggerMock,
options: {
redis: {},
},
} as any,
{} as any
)
).rejects.toThrow(
"No `redis.redisUrl` (or deprecated `redis.url`) provided"
)
})
})
})

View File

@@ -12,32 +12,86 @@ export default async (
): Promise<void> => {
const {
url,
options: redisOptions,
redisUrl,
options: deprecatedRedisOptions,
redisOptions: newRedisOptions,
jobQueueName,
queueName,
// Shared options
queueOptions,
workerOptions,
// Per-queue options
mainQueueOptions,
mainWorkerOptions,
jobQueueOptions,
jobWorkerOptions,
cleanerQueueOptions,
cleanerWorkerOptions,
pubsub,
} = options?.redis as RedisWorkflowsOptions
// TODO: get default from ENV VAR
if (!url) {
throw Error(
"No `redis.url` provided in `workflowOrchestrator` module options. It is required for the Workflow Orchestrator Redis."
// Handle backward compatibility for deprecated options
const resolvedUrl = redisUrl ?? url
const redisOptions = newRedisOptions ?? deprecatedRedisOptions
// Log deprecation warnings
if (url && !redisUrl) {
logger?.warn(
"[Workflow-engine-redis] The `url` option is deprecated. Please use `redisUrl` instead for consistency with other modules."
)
}
if (deprecatedRedisOptions && !newRedisOptions) {
logger?.warn(
"[Workflow-engine-redis] The `options` option is deprecated. Please use `redisOptions` instead for consistency with other modules."
)
}
const cnnPubSub = pubsub ?? { url, options: redisOptions }
// TODO: get default from ENV VAR
if (!resolvedUrl) {
throw Error(
"No `redis.redisUrl` (or deprecated `redis.url`) provided in `workflowOrchestrator` module options. It is required for the Workflow Orchestrator Redis."
)
}
const cnnPubSub = pubsub ?? { url: resolvedUrl, options: redisOptions }
const queueName_ = queueName ?? "medusa-workflows"
const jobQueueName_ = jobQueueName ?? "medusa-workflows-jobs"
// Resolve per-queue options by merging shared defaults with per-queue overrides
const resolvedMainQueueOptions = {
...(queueOptions ?? {}),
...(mainQueueOptions ?? {}),
}
const resolvedMainWorkerOptions = {
...(workerOptions ?? {}),
...(mainWorkerOptions ?? {}),
}
const resolvedJobQueueOptions = {
...(queueOptions ?? {}),
...(jobQueueOptions ?? {}),
}
const resolvedJobWorkerOptions = {
...(workerOptions ?? {}),
...(jobWorkerOptions ?? {}),
}
const resolvedCleanerQueueOptions = {
...(queueOptions ?? {}),
...(cleanerQueueOptions ?? {}),
}
const resolvedCleanerWorkerOptions = {
...(workerOptions ?? {}),
...(cleanerWorkerOptions ?? {}),
}
let connection
let redisPublisher
let redisSubscriber
let workerConnection
try {
connection = await getConnection(url, redisOptions)
workerConnection = await getConnection(url, {
connection = await getConnection(resolvedUrl, redisOptions)
workerConnection = await getConnection(resolvedUrl, {
...(redisOptions ?? {}),
maxRetriesPerRequest: null,
})
@@ -71,6 +125,13 @@ export default async (
redisSubscriber: asValue(redisSubscriber),
redisQueueName: asValue(queueName_),
redisJobQueueName: asValue(jobQueueName_),
// Per-queue resolved options
redisMainQueueOptions: asValue(resolvedMainQueueOptions),
redisMainWorkerOptions: asValue(resolvedMainWorkerOptions),
redisJobQueueOptions: asValue(resolvedJobQueueOptions),
redisJobWorkerOptions: asValue(resolvedJobWorkerOptions),
redisCleanerQueueOptions: asValue(resolvedCleanerQueueOptions),
redisCleanerWorkerOptions: asValue(resolvedCleanerWorkerOptions),
redisDisconnectHandler: asValue(async () => {
connection.disconnect()
workerConnection.disconnect()

View File

@@ -1,4 +1,5 @@
import { Logger } from "@medusajs/framework/types"
import { QueueOptions, WorkerOptions } from "bullmq"
import { RedisOptions } from "ioredis"
export type InitializeModuleInjectableDependencies = {
@@ -6,31 +7,170 @@ export type InitializeModuleInjectableDependencies = {
}
/**
* Module config type
* Module config type for the Redis Workflow Engine.
*
* The workflow engine uses three queues internally:
* - **Main queue**: Handles workflow retries, step timeouts, and transaction timeouts
* - **Job queue**: Handles scheduled workflow job execution
* - **Cleaner queue**: Handles periodic cleanup of expired workflow executions
*
* You can configure shared options that apply to all queues/workers, or provide
* per-queue overrides for fine-grained control.
*
* @example
* ```ts
* // Simple configuration - same options for all queues
* {
* redisUrl: "redis://localhost:6379",
* queueOptions: { defaultJobOptions: { removeOnComplete: 1000 } },
* workerOptions: { concurrency: 10 }
* }
* ```
*
* @example
* ```ts
* // Advanced configuration - per-queue overrides
* {
* redisUrl: "redis://localhost:6379",
* // Shared defaults
* workerOptions: { concurrency: 10 },
* // Override for job queue (scheduled workflows)
* jobWorkerOptions: { concurrency: 5 },
* // Override for cleaner (low priority)
* cleanerWorkerOptions: { concurrency: 1 }
* }
* ```
*/
export type RedisWorkflowsOptions = {
/**
* Redis connection string
* @deprecated Use `redisUrl` instead for consistency with other modules
*/
url?: string
/**
* Queue name used for retries and timeouts
* Redis connection string
*/
redisUrl?: string
/**
* Name for the main workflow queue that handles retries, step timeouts,
* and transaction timeouts.
* @default "medusa-workflows"
*/
queueName?: string
/**
* Queue name used for job execution
* Name for the job queue that handles scheduled workflow execution.
* @default "medusa-workflows-jobs"
*/
jobQueueName?: string
/**
* Redis client options
* @deprecated Use `redisOptions` instead for consistency with other modules
*/
options?: RedisOptions
/**
* Optiona connection string and options to pub/sub
* Redis client options passed to ioredis
* @see https://github.com/redis/ioredis#connect-to-redis
*/
redisOptions?: RedisOptions
/*
* =========================================================================
* Shared Queue/Worker Options
* =========================================================================
* These options apply to all queues and workers unless overridden by
* per-queue specific options below.
*/
/**
* Default options for all BullMQ Queue instances.
* Can be overridden per-queue using `mainQueueOptions`, `jobQueueOptions`,
* or `cleanerQueueOptions`.
* @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
*/
queueOptions?: Omit<QueueOptions, "connection">
/**
* Default options for all BullMQ Worker instances.
* Can be overridden per-worker using `mainWorkerOptions`, `jobWorkerOptions`,
* or `cleanerWorkerOptions`.
* @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html
*/
workerOptions?: Omit<WorkerOptions, "connection">
/*
* =========================================================================
* Per-Queue Options (Main Queue)
* =========================================================================
* The main queue handles workflow retries, step timeouts, and transaction
* timeouts. These are critical real-time operations.
*/
/**
* Options specific to the main workflow queue.
* Overrides `queueOptions` for this queue only.
* @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
*/
mainQueueOptions?: Omit<QueueOptions, "connection">
/**
* Options specific to the main workflow worker.
* Overrides `workerOptions` for this worker only.
* @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html
*/
mainWorkerOptions?: Omit<WorkerOptions, "connection">
/*
* =========================================================================
* Per-Queue Options (Job Queue)
* =========================================================================
* The job queue handles scheduled workflow execution. You may want different
* concurrency settings for scheduled vs real-time workflows.
*/
/**
* Options specific to the job queue (scheduled workflows).
* Overrides `queueOptions` for this queue only.
* @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
*/
jobQueueOptions?: Omit<QueueOptions, "connection">
/**
* Options specific to the job worker (scheduled workflows).
* Overrides `workerOptions` for this worker only.
* @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html
*/
jobWorkerOptions?: Omit<WorkerOptions, "connection">
/*
* =========================================================================
* Per-Queue Options (Cleaner Queue)
* =========================================================================
* The cleaner queue runs periodically (every 30 minutes) to remove expired
* workflow executions. This is a low-priority background task.
*/
/**
* Options specific to the cleaner queue.
* Overrides `queueOptions` for this queue only.
* @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
*/
cleanerQueueOptions?: Omit<QueueOptions, "connection">
/**
* Options specific to the cleaner worker.
* Overrides `workerOptions` for this worker only.
* @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html
*/
cleanerWorkerOptions?: Omit<WorkerOptions, "connection">
/**
* Optional separate connection string and options for pub/sub.
* If not provided, uses the main Redis connection.
*/
pubsub?: {
url: string

View File

@@ -0,0 +1,321 @@
import { Logger, ModulesSdkTypes } from "@medusajs/framework/types"
import { Queue, Worker } from "bullmq"
import Redis from "ioredis"
import { RedisDistributedTransactionStorage } from "../workflow-orchestrator-storage"
jest.mock("bullmq")
jest.mock("ioredis")
const loggerMock = {
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
debug: jest.fn(),
} as unknown as Logger
const redisMock = {
status: "ready",
disconnect: jest.fn(),
get: jest.fn(),
set: jest.fn(),
del: jest.fn(),
pipeline: jest.fn(() => ({
exec: jest.fn(),
})),
} as unknown as Redis
const workflowExecutionServiceMock = {
list: jest.fn(),
upsert: jest.fn(),
delete: jest.fn(),
} as unknown as ModulesSdkTypes.IMedusaInternalService<any>
const baseModuleDeps = {
workflowExecutionService: workflowExecutionServiceMock,
redisConnection: redisMock,
redisWorkerConnection: redisMock,
redisQueueName: "medusa-workflows",
redisJobQueueName: "medusa-workflows-jobs",
logger: loggerMock,
isWorkerMode: true,
}
describe("RedisDistributedTransactionStorage", () => {
beforeEach(() => {
jest.clearAllMocks()
})
describe("constructor - Queue configuration", () => {
it("should create queues with default empty options when no options provided", () => {
new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: {},
redisMainWorkerOptions: {},
redisJobQueueOptions: {},
redisJobWorkerOptions: {},
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: {},
})
expect(Queue).toHaveBeenCalledWith("medusa-workflows", {
connection: redisMock,
})
expect(Queue).toHaveBeenCalledWith("medusa-workflows-jobs", {
connection: redisMock,
})
expect(Queue).toHaveBeenCalledWith("workflows-cleaner", {
connection: redisMock,
})
expect(Queue).toHaveBeenCalledTimes(3)
})
it("should create main queue with custom options", () => {
const mainQueueOptions = {
defaultJobOptions: {
removeOnComplete: 1000,
removeOnFail: 5000,
},
}
new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: mainQueueOptions,
redisMainWorkerOptions: {},
redisJobQueueOptions: {},
redisJobWorkerOptions: {},
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: {},
})
expect(Queue).toHaveBeenCalledWith("medusa-workflows", {
...mainQueueOptions,
connection: redisMock,
})
})
it("should create job queue with custom options", () => {
const jobQueueOptions = {
defaultJobOptions: {
attempts: 5,
backoff: { type: "exponential", delay: 1000 },
},
}
new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: {},
redisMainWorkerOptions: {},
redisJobQueueOptions: jobQueueOptions,
redisJobWorkerOptions: {},
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: {},
})
expect(Queue).toHaveBeenCalledWith("medusa-workflows-jobs", {
...jobQueueOptions,
connection: redisMock,
})
})
it("should create cleaner queue with custom options", () => {
const cleanerQueueOptions = {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
}
new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: {},
redisMainWorkerOptions: {},
redisJobQueueOptions: {},
redisJobWorkerOptions: {},
redisCleanerQueueOptions: cleanerQueueOptions,
redisCleanerWorkerOptions: {},
})
expect(Queue).toHaveBeenCalledWith("workflows-cleaner", {
...cleanerQueueOptions,
connection: redisMock,
})
})
it("should create each queue with different options", () => {
const mainQueueOptions = { defaultJobOptions: { removeOnComplete: 100 } }
const jobQueueOptions = { defaultJobOptions: { removeOnComplete: 200 } }
const cleanerQueueOptions = {
defaultJobOptions: { removeOnComplete: 300 },
}
new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: mainQueueOptions,
redisMainWorkerOptions: {},
redisJobQueueOptions: jobQueueOptions,
redisJobWorkerOptions: {},
redisCleanerQueueOptions: cleanerQueueOptions,
redisCleanerWorkerOptions: {},
})
expect(Queue).toHaveBeenNthCalledWith(1, "medusa-workflows", {
...mainQueueOptions,
connection: redisMock,
})
expect(Queue).toHaveBeenNthCalledWith(2, "medusa-workflows-jobs", {
...jobQueueOptions,
connection: redisMock,
})
expect(Queue).toHaveBeenNthCalledWith(3, "workflows-cleaner", {
...cleanerQueueOptions,
connection: redisMock,
})
})
it("should not create job and cleaner queues when not in worker mode", () => {
new RedisDistributedTransactionStorage({
...baseModuleDeps,
isWorkerMode: false,
redisMainQueueOptions: {},
redisMainWorkerOptions: {},
redisJobQueueOptions: {},
redisJobWorkerOptions: {},
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: {},
})
expect(Queue).toHaveBeenCalledTimes(1)
expect(Queue).toHaveBeenCalledWith("medusa-workflows", {
connection: redisMock,
})
})
})
describe("onApplicationStart - Worker configuration", () => {
it("should create workers with custom options", async () => {
const mainWorkerOptions = {
concurrency: 10,
limiter: { max: 100, duration: 1000 },
}
const jobWorkerOptions = { concurrency: 5 }
const cleanerWorkerOptions = { concurrency: 1 }
const storage = new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: {},
redisMainWorkerOptions: mainWorkerOptions,
redisJobQueueOptions: {},
redisJobWorkerOptions: jobWorkerOptions,
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: cleanerWorkerOptions,
})
const mockQueue = {
getRepeatableJobs: jest.fn().mockResolvedValue([]),
add: jest.fn().mockResolvedValue({}),
}
;(storage as any).queue = mockQueue
;(storage as any).cleanerQueue_ = mockQueue
await storage.onApplicationStart()
expect(Worker).toHaveBeenCalledWith(
"medusa-workflows",
expect.any(Function),
{
...mainWorkerOptions,
connection: redisMock,
}
)
expect(Worker).toHaveBeenCalledWith(
"medusa-workflows-jobs",
expect.any(Function),
{
...jobWorkerOptions,
connection: redisMock,
}
)
expect(Worker).toHaveBeenCalledWith(
"workflows-cleaner",
expect.any(Function),
{
...cleanerWorkerOptions,
connection: redisMock,
}
)
expect(Worker).toHaveBeenCalledTimes(3)
})
it("should create each worker with different concurrency settings", async () => {
const storage = new RedisDistributedTransactionStorage({
...baseModuleDeps,
redisMainQueueOptions: {},
redisMainWorkerOptions: { concurrency: 20 },
redisJobQueueOptions: {},
redisJobWorkerOptions: { concurrency: 10 },
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: { concurrency: 1 },
})
const mockQueue = {
getRepeatableJobs: jest.fn().mockResolvedValue([]),
add: jest.fn().mockResolvedValue({}),
}
;(storage as any).queue = mockQueue
;(storage as any).cleanerQueue_ = mockQueue
await storage.onApplicationStart()
expect(Worker).toHaveBeenNthCalledWith(
1,
"medusa-workflows",
expect.any(Function),
expect.objectContaining({ concurrency: 20 })
)
expect(Worker).toHaveBeenNthCalledWith(
2,
"medusa-workflows-jobs",
expect.any(Function),
expect.objectContaining({ concurrency: 10 })
)
expect(Worker).toHaveBeenNthCalledWith(
3,
"workflows-cleaner",
expect.any(Function),
expect.objectContaining({ concurrency: 1 })
)
})
it("should not create workers when not in worker mode", async () => {
const storage = new RedisDistributedTransactionStorage({
...baseModuleDeps,
isWorkerMode: false,
redisMainQueueOptions: {},
redisMainWorkerOptions: {},
redisJobQueueOptions: {},
redisJobWorkerOptions: {},
redisCleanerQueueOptions: {},
redisCleanerWorkerOptions: {},
})
const mockQueue = {
getRepeatableJobs: jest.fn().mockResolvedValue([]),
}
;(storage as any).queue = mockQueue
await storage.onApplicationStart()
expect(Worker).not.toHaveBeenCalled()
})
})
})

View File

@@ -24,7 +24,7 @@ import {
TransactionStepState,
} from "@medusajs/framework/utils"
import { WorkflowOrchestratorService } from "@services"
import { Queue, RepeatOptions, Worker } from "bullmq"
import { Queue, QueueOptions, RepeatOptions, Worker, WorkerOptions } from "bullmq"
import Redis from "ioredis"
enum JobType {
@@ -75,6 +75,14 @@ export class RedisDistributedTransactionStorage
private cleanerWorker_: Worker
private cleanerQueue_?: Queue
// Per-queue options
private mainQueueOptions_: Omit<QueueOptions, "connection">
private mainWorkerOptions_: Omit<WorkerOptions, "connection">
private jobQueueOptions_: Omit<QueueOptions, "connection">
private jobWorkerOptions_: Omit<WorkerOptions, "connection">
private cleanerQueueOptions_: Omit<QueueOptions, "connection">
private cleanerWorkerOptions_: Omit<WorkerOptions, "connection">
#isWorkerMode: boolean = false
constructor({
@@ -83,6 +91,12 @@ export class RedisDistributedTransactionStorage
redisWorkerConnection,
redisQueueName,
redisJobQueueName,
redisMainQueueOptions,
redisMainWorkerOptions,
redisJobQueueOptions,
redisJobWorkerOptions,
redisCleanerQueueOptions,
redisCleanerWorkerOptions,
logger,
isWorkerMode,
}: {
@@ -91,6 +105,12 @@ export class RedisDistributedTransactionStorage
redisWorkerConnection: Redis
redisQueueName: string
redisJobQueueName: string
redisMainQueueOptions: Omit<QueueOptions, "connection">
redisMainWorkerOptions: Omit<WorkerOptions, "connection">
redisJobQueueOptions: Omit<QueueOptions, "connection">
redisJobWorkerOptions: Omit<WorkerOptions, "connection">
redisCleanerQueueOptions: Omit<QueueOptions, "connection">
redisCleanerWorkerOptions: Omit<WorkerOptions, "connection">
logger: Logger
isWorkerMode: boolean
}) {
@@ -101,14 +121,29 @@ export class RedisDistributedTransactionStorage
this.cleanerQueueName = "workflows-cleaner"
this.queueName = redisQueueName
this.jobQueueName = redisJobQueueName
this.queue = new Queue(redisQueueName, { connection: this.redisClient })
// Store per-queue options
this.mainQueueOptions_ = redisMainQueueOptions ?? {}
this.mainWorkerOptions_ = redisMainWorkerOptions ?? {}
this.jobQueueOptions_ = redisJobQueueOptions ?? {}
this.jobWorkerOptions_ = redisJobWorkerOptions ?? {}
this.cleanerQueueOptions_ = redisCleanerQueueOptions ?? {}
this.cleanerWorkerOptions_ = redisCleanerWorkerOptions ?? {}
// Create queues with their respective options
this.queue = new Queue(redisQueueName, {
...this.mainQueueOptions_,
connection: this.redisClient,
})
this.jobQueue = isWorkerMode
? new Queue(redisJobQueueName, {
...this.jobQueueOptions_,
connection: this.redisClient,
})
: undefined
this.cleanerQueue_ = isWorkerMode
? new Queue(this.cleanerQueueName, {
...this.cleanerQueueOptions_,
connection: this.redisClient,
})
: undefined
@@ -137,7 +172,17 @@ export class RedisDistributedTransactionStorage
JobType.TRANSACTION_TIMEOUT,
]
const workerOptions = {
// Per-worker options with their respective configurations
const mainWorkerOptions: WorkerOptions = {
...this.mainWorkerOptions_,
connection: this.redisWorkerConnection,
}
const jobWorkerOptions: WorkerOptions = {
...this.jobWorkerOptions_,
connection: this.redisWorkerConnection,
}
const cleanerWorkerOptions: WorkerOptions = {
...this.cleanerWorkerOptions_,
connection: this.redisWorkerConnection,
}
@@ -173,7 +218,7 @@ export class RedisDistributedTransactionStorage
await this.remove(job.data.jobId)
}
},
workerOptions
mainWorkerOptions
)
this.jobWorker = new Worker(
@@ -191,7 +236,7 @@ export class RedisDistributedTransactionStorage
job.data.schedulerOptions
)
},
workerOptions
jobWorkerOptions
)
this.cleanerWorker_ = new Worker(
@@ -199,7 +244,7 @@ export class RedisDistributedTransactionStorage
async () => {
await this.clearExpiredExecutions()
},
workerOptions
cleanerWorkerOptions
)
await this.cleanerQueue_?.add(