From 7307a5e63fdf8ed6f0ff201a0383f0a0bbfc7002 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 12 Jan 2026 16:05:42 +0100 Subject: [PATCH] feat(events): Implement default priority-based event processing (#14476) * feat(events): Set internal events default priority to lowest, default events to 100 and order placed to 10 * Create swift-months-rush.md * improvements * improvements * improvements * fix condition * doc * fix tests * fix tests --------- Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com> --- .changeset/swift-months-rush.md | 11 + .../event-bus/subscriber-registration.spec.ts | 13 +- .../src/cart/workflows/complete-cart.ts | 4 + .../workflows/convert-draft-order.ts | 4 + packages/core/utils/src/event-bus/utils.ts | 12 + .../src/services/__tests__/event-bus.ts | 542 +++++++++++++++++- .../src/services/event-bus-redis.ts | 48 +- 7 files changed, 617 insertions(+), 17 deletions(-) create mode 100644 .changeset/swift-months-rush.md diff --git a/.changeset/swift-months-rush.md b/.changeset/swift-months-rush.md new file mode 100644 index 0000000000..064bbc12b4 --- /dev/null +++ b/.changeset/swift-months-rush.md @@ -0,0 +1,11 @@ +--- +"@medusajs/event-bus-redis": patch +"@medusajs/core-flows": patch +--- + +feat(events): Implement priority-based event processing + +- Internal events default to lowest priority (2,097,152) to prevent queue overload +- Normal events default to priority 100 +- Order placed events explicitly set to priority 10 for immediate processing +- Support for priority overrides at message, emit, and module levels diff --git a/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts b/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts index e5852e9a3f..dd7576425c 100644 --- a/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts +++ b/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts @@ -5,6 +5,10 @@ import path from "path" jest.setTimeout(100000) +function getJobCounts(queue) { + return queue.waiting + queue.delayed + (queue.prioritized || 0) +} + medusaIntegrationTestRunner({ medusaConfigFile: path.join( __dirname, @@ -42,8 +46,7 @@ medusaIntegrationTestRunner({ const queue = (eventBus as any).queue_ const jobCountsBefore = await queue.getJobCounts() - const totalJobsBefore = - jobCountsBefore.waiting + jobCountsBefore.delayed + const totalJobsBefore = getJobCounts(jobCountsBefore) await eventBus.emit( composeMessage(testEventName, { @@ -55,9 +58,9 @@ medusaIntegrationTestRunner({ ) const jobCountsAfterWithSubscriber = await queue.getJobCounts() - const totalJobsAfterWithSubscriber = - jobCountsAfterWithSubscriber.waiting + - jobCountsAfterWithSubscriber.delayed + const totalJobsAfterWithSubscriber = getJobCounts( + jobCountsAfterWithSubscriber + ) expect(totalJobsAfterWithSubscriber).toBeGreaterThan(totalJobsBefore) diff --git a/packages/core/core-flows/src/cart/workflows/complete-cart.ts b/packages/core/core-flows/src/cart/workflows/complete-cart.ts index 22bbf2702a..0c60ac9024 100644 --- a/packages/core/core-flows/src/cart/workflows/complete-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/complete-cart.ts @@ -6,6 +6,7 @@ import { UsageComputedActions, } from "@medusajs/framework/types" import { + EventPriority, isDefined, Modules, OrderStatus, @@ -598,6 +599,9 @@ export const completeCartWorkflow = createWorkflow( emitEventStep({ eventName: OrderWorkflowEvents.PLACED, data: { id: createdOrder.id }, + options: { + priority: EventPriority.CRITICAL, + }, }) ) diff --git a/packages/core/core-flows/src/draft-order/workflows/convert-draft-order.ts b/packages/core/core-flows/src/draft-order/workflows/convert-draft-order.ts index 8170c23b49..b809f2d03a 100644 --- a/packages/core/core-flows/src/draft-order/workflows/convert-draft-order.ts +++ b/packages/core/core-flows/src/draft-order/workflows/convert-draft-order.ts @@ -1,4 +1,5 @@ import { + EventPriority, Modules, OrderStatus, OrderWorkflowEvents, @@ -180,6 +181,9 @@ export const convertDraftOrderWorkflow = createWorkflow( emitEventStep({ eventName: OrderWorkflowEvents.PLACED, data: { id: updatedOrder.id }, + options: { + priority: EventPriority.CRITICAL, + }, }) ) diff --git a/packages/core/utils/src/event-bus/utils.ts b/packages/core/utils/src/event-bus/utils.ts index 92b40046dd..be32b232f4 100644 --- a/packages/core/utils/src/event-bus/utils.ts +++ b/packages/core/utils/src/event-bus/utils.ts @@ -87,3 +87,15 @@ export function buildEventNamesFromEntityName( return events as ReturnType } + +export const EventPriority = { + CRITICAL: 10, + HIGH: 50, + DEFAULT: 100, + LOW: 500, + /** + * Lowest priority value supported by BullMQ (2^21) + * Internal events use this priority to ensure they don't block critical business events + */ + LOWEST: 2_097_152, +} as const diff --git a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts index 81b4313bcc..a364c9778b 100644 --- a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts +++ b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts @@ -103,10 +103,11 @@ describe("RedisEventBusService", () => { expect(queue.addBulk).toHaveBeenCalledWith([ { name: "eventName", - data: { data: { hi: "1234" } }, + data: { data: { hi: "1234" }, metadata: undefined }, opts: { attempts: 1, removeOnComplete: true, + priority: 100, }, }, ]) @@ -125,12 +126,13 @@ describe("RedisEventBusService", () => { expect(queue.addBulk).toHaveBeenCalledWith([ { name: "eventName", - data: { data: { hi: "1234" } }, + data: { data: { hi: "1234" }, metadata: undefined }, opts: { attempts: 3, backoff: 5000, delay: 1000, removeOnComplete: true, + priority: 100, }, }, ]) @@ -168,7 +170,7 @@ describe("RedisEventBusService", () => { expect(queue.addBulk).toHaveBeenCalledWith([ { name: "eventName", - data: { data: { hi: "1234" } }, + data: { data: { hi: "1234" }, metadata: undefined }, opts: { attempts: 3, backoff: 5000, @@ -176,6 +178,7 @@ describe("RedisEventBusService", () => { removeOnComplete: { age: 5, }, + priority: 100, }, }, ]) @@ -210,11 +213,12 @@ describe("RedisEventBusService", () => { expect(queue.addBulk).toHaveBeenCalledWith([ { name: "eventName", - data: { data: { hi: "1234" } }, + data: { data: { hi: "1234" }, metadata: undefined }, opts: { attempts: 1, removeOnComplete: 5, delay: 1000, + priority: 100, }, }, ]) @@ -331,6 +335,536 @@ describe("RedisEventBusService", () => { }) }) + describe("Priority levels", () => { + beforeEach(async () => { + jest.clearAllMocks() + + eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + redis = (eventBus as any).eventBusRedisConnection_ + redis.rpush = jest.fn() + }) + + it("should add job to queue with default priority (100) for normal events", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit([ + { + name: "eventName", + data: { hi: "1234" }, + }, + ]) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 100, + }, + }, + ]) + }) + + it("should add job to queue with lowest priority (2097152) for internal events", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + }, + ], + { internal: true } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 2097152, + internal: true, + }, + }, + ]) + }) + + it("should add job to queue with custom priority override at emit time", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + }, + ], + { priority: 50 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 50, + }, + }, + ]) + }) + + it("should add job to queue with custom priority via module job options", async () => { + eventBus = new RedisEventBusService( + { + ...moduleDeps, + eventBusRedisJobOptions: { + priority: 200, + }, + }, + {}, + moduleDeclaration + ) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit([ + { + name: "eventName", + data: { hi: "1234" }, + }, + ]) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 200, + }, + }, + ]) + }) + + it("should override module priority with emit options priority", async () => { + eventBus = new RedisEventBusService( + { + ...moduleDeps, + eventBusRedisJobOptions: { + priority: 200, + }, + }, + {}, + moduleDeclaration + ) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + }, + ], + { priority: 25 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 25, + }, + }, + ]) + }) + + it("should allow module priority to override internal flag default", async () => { + eventBus = new RedisEventBusService( + { + ...moduleDeps, + eventBusRedisJobOptions: { + priority: 200, + }, + }, + {}, + moduleDeclaration + ) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + }, + ], + { internal: true } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 200, + internal: true, + }, + }, + ]) + }) + + it("should allow explicit priority to override internal flag default", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + }, + ], + { internal: true, priority: 50 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 50, + internal: true, + }, + }, + ]) + }) + + describe("Message-level options", () => { + it("should allow message-level priority to override emit-level priority", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + options: { priority: 10 }, + }, + ], + { priority: 100 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 10, + }, + }, + ]) + }) + + it("should allow different priorities per message in same emit call", async () => { + eventBus.subscribe("eventName1", () => Promise.resolve()) + eventBus.subscribe("eventName2", () => Promise.resolve()) + eventBus.subscribe("eventName3", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName1", + data: { id: "1" }, + options: { priority: 10 }, + }, + { + name: "eventName2", + data: { id: "2" }, + options: { priority: 50 }, + }, + { + name: "eventName3", + data: { id: "3" }, + // No options - should use emit-level priority + }, + ], + { priority: 200 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName1", + data: { data: { id: "1" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 10, + }, + }, + { + name: "eventName2", + data: { data: { id: "2" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 50, + }, + }, + { + name: "eventName3", + data: { data: { id: "3" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 200, + }, + }, + ]) + }) + + it("should allow message-level priority to override module-level priority", async () => { + eventBus = new RedisEventBusService( + { + ...moduleDeps, + eventBusRedisJobOptions: { + priority: 300, + }, + }, + {}, + moduleDeclaration + ) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit([ + { + name: "eventName", + data: { hi: "1234" }, + options: { priority: 5 }, + }, + ]) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 5, + }, + }, + ]) + }) + + it("should allow message-level priority to override internal flag", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + + await eventBus.emit( + [ + { + name: "eventName", + data: { hi: "1234" }, + options: { priority: 15 }, + }, + ], + { internal: true } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { data: { hi: "1234" }, metadata: undefined }, + opts: { + attempts: 1, + removeOnComplete: true, + priority: 15, + internal: true, + }, + }, + ]) + }) + }) + + describe("Grouped events priority", () => { + it("should stage grouped events with default priority (100)", async () => { + const event = { + name: "grouped-event", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-priority" }, + } + + await eventBus.emit(event) + + expect(redis.rpush).toHaveBeenCalledTimes(1) + const calledWith = redis.rpush.mock.calls[0] + const stagedEvent = JSON.parse(calledWith[1]) + + expect(stagedEvent.opts.priority).toBe(100) + }) + + it("should stage grouped events with lowest priority (2097152) for internal events", async () => { + const event = { + name: "grouped-event", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-priority" }, + } + + await eventBus.emit(event, { internal: true }) + + expect(redis.rpush).toHaveBeenCalledTimes(1) + const calledWith = redis.rpush.mock.calls[0] + const stagedEvent = JSON.parse(calledWith[1]) + + expect(stagedEvent.opts.priority).toBe(2097152) + expect(stagedEvent.opts.internal).toBe(true) + }) + + it("should stage grouped events with custom priority", async () => { + const event = { + name: "grouped-event", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-priority" }, + } + + await eventBus.emit(event, { priority: 50 }) + + expect(redis.rpush).toHaveBeenCalledTimes(1) + const calledWith = redis.rpush.mock.calls[0] + const stagedEvent = JSON.parse(calledWith[1]) + + expect(stagedEvent.opts.priority).toBe(50) + }) + + it("should preserve priority when releasing grouped events", async () => { + const event = { + name: "grouped-event", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-priority" }, + } + + const [builtEvent] = (eventBus as any).buildEvents([event], { + priority: 75, + }) + + eventBus.subscribe("grouped-event", () => Promise.resolve()) + + redis.lrange = jest.fn((key) => { + if (key === "staging:test-group-priority") { + return Promise.resolve([JSON.stringify(builtEvent)]) + } + return Promise.resolve([]) + }) + redis.unlink = jest.fn() + + await eventBus.releaseGroupedEvents("test-group-priority") + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([builtEvent]) + expect(builtEvent.opts.priority).toBe(75) + }) + + it("should stage grouped events with message-level priority overriding emit-level priority", async () => { + const event = { + name: "grouped-event", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-priority" }, + options: { priority: 20 }, + } + + await eventBus.emit(event, { priority: 100 }) + + expect(redis.rpush).toHaveBeenCalledTimes(1) + const calledWith = redis.rpush.mock.calls[0] + const stagedEvent = JSON.parse(calledWith[1]) + + expect(stagedEvent.opts.priority).toBe(20) + }) + + it("should allow different priorities per grouped message in same emit call", async () => { + const events = [ + { + name: "grouped-event-1", + data: { id: "1" }, + metadata: { eventGroupId: "test-group-multi" }, + options: { priority: 10 }, + }, + { + name: "grouped-event-2", + data: { id: "2" }, + metadata: { eventGroupId: "test-group-multi" }, + options: { priority: 50 }, + }, + { + name: "grouped-event-3", + data: { id: "3" }, + metadata: { eventGroupId: "test-group-multi" }, + // No options - should use emit-level priority + }, + ] + + await eventBus.emit(events, { priority: 200 }) + + expect(redis.rpush).toHaveBeenCalledTimes(1) + const calledWith = redis.rpush.mock.calls[0] + + // First argument is the key, rest are the events + const stagedEvent1 = JSON.parse(calledWith[1]) + const stagedEvent2 = JSON.parse(calledWith[2]) + const stagedEvent3 = JSON.parse(calledWith[3]) + + expect(stagedEvent1.opts.priority).toBe(10) + expect(stagedEvent2.opts.priority).toBe(50) + expect(stagedEvent3.opts.priority).toBe(200) + }) + }) + }) + describe("Events without subscribers", () => { beforeEach(async () => { jest.clearAllMocks() diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index 3857c31cd4..f6a56d3dfe 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -6,6 +6,7 @@ import { } from "@medusajs/framework/types" import { AbstractEventBusModuleService, + EventPriority, isPresent, promiseAll, } from "@medusajs/framework/utils" @@ -110,6 +111,20 @@ export default class RedisEventBusService extends AbstractEventBusModuleService }, } + /** + * Build events for queue processing with priority handling. + * + * Priority levels (lower number = higher priority): + * - 10: Critical business events (e.g., order placed) + * - 100: Default priority for normal events (default) + * - 2,097,152: Lowest priority for internal events + * + * Priority override hierarchy (highest to lowest precedence): + * 1. Message-level options (eventData.options.priority) + * 2. Emit-level options (options.priority) + * 3. Module-level job options (this.jobOptions_.priority) + * 4. Internal flag default (options.internal ? EventPriority.LOWEST : EventPriority.DEFAULT) + */ private buildEvents( eventsData: Message[], options: Options = {} @@ -118,6 +133,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService // default options removeOnComplete: true, attempts: 1, + priority: options.internal ? EventPriority.LOWEST : EventPriority.DEFAULT, // global options ...this.jobOptions_, ...options, @@ -132,16 +148,30 @@ export default class RedisEventBusService extends AbstractEventBusModuleService metadata: eventData.metadata, } + const finalOptions: IORedisEventType["opts"] = { + ...opts, + ...eventData.options, + } + + if ( + finalOptions.priority != undefined && + (finalOptions.priority < 1 || + finalOptions.priority > EventPriority.LOWEST) + ) { + this.logger_.warn( + `Invalid priority value: ${finalOptions.priority} for event ${eventData.name}. Must be between 1 and ${EventPriority.LOWEST}` + ) + finalOptions.priority = EventPriority.DEFAULT + this.logger_.warn( + `Setting priority to default value: ${EventPriority.DEFAULT} for event ${eventData.name}` + ) + } + return { data: event, name: eventData.name, - opts: { - // options for event group - ...opts, - // options for a particular event - ...eventData.options, - }, - } as any + opts: finalOptions, + } as IORedisEventType }) } @@ -358,8 +388,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService `Retrying ${name} which has ${eventSubscribers.length} subscribers (${subscribersInCurrentAttempt.length} of them failed)` ) } else { + const prioirityInfo = + opts.priority != undefined ? ` (priority: ${opts.priority})` : "" this.logger_.info( - `Processing ${name} which has ${eventSubscribers.length} subscribers` + `Processing ${name}${prioirityInfo} which has ${eventSubscribers.length} subscribers` ) } }