feat(events): Implement default priority-based event processing (#14476)

* feat(events): Set internal events default priority to lowest, default events to 100 and order placed to 10

* Create swift-months-rush.md

* improvements

* improvements

* improvements

* fix condition

* doc

* fix tests

* fix tests

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2026-01-12 16:05:42 +01:00
committed by GitHub
parent 43951ce60e
commit 7307a5e63f
7 changed files with 617 additions and 17 deletions

View File

@@ -0,0 +1,11 @@
---
"@medusajs/event-bus-redis": patch
"@medusajs/core-flows": patch
---
feat(events): Implement priority-based event processing
- Internal events default to lowest priority (2,097,152) to prevent queue overload
- Normal events default to priority 100
- Order placed events explicitly set to priority 10 for immediate processing
- Support for priority overrides at message, emit, and module levels

View File

@@ -5,6 +5,10 @@ import path from "path"
jest.setTimeout(100000) jest.setTimeout(100000)
function getJobCounts(queue) {
return queue.waiting + queue.delayed + (queue.prioritized || 0)
}
medusaIntegrationTestRunner({ medusaIntegrationTestRunner({
medusaConfigFile: path.join( medusaConfigFile: path.join(
__dirname, __dirname,
@@ -42,8 +46,7 @@ medusaIntegrationTestRunner({
const queue = (eventBus as any).queue_ const queue = (eventBus as any).queue_
const jobCountsBefore = await queue.getJobCounts() const jobCountsBefore = await queue.getJobCounts()
const totalJobsBefore = const totalJobsBefore = getJobCounts(jobCountsBefore)
jobCountsBefore.waiting + jobCountsBefore.delayed
await eventBus.emit( await eventBus.emit(
composeMessage(testEventName, { composeMessage(testEventName, {
@@ -55,9 +58,9 @@ medusaIntegrationTestRunner({
) )
const jobCountsAfterWithSubscriber = await queue.getJobCounts() const jobCountsAfterWithSubscriber = await queue.getJobCounts()
const totalJobsAfterWithSubscriber = const totalJobsAfterWithSubscriber = getJobCounts(
jobCountsAfterWithSubscriber.waiting + jobCountsAfterWithSubscriber
jobCountsAfterWithSubscriber.delayed )
expect(totalJobsAfterWithSubscriber).toBeGreaterThan(totalJobsBefore) expect(totalJobsAfterWithSubscriber).toBeGreaterThan(totalJobsBefore)

View File

@@ -6,6 +6,7 @@ import {
UsageComputedActions, UsageComputedActions,
} from "@medusajs/framework/types" } from "@medusajs/framework/types"
import { import {
EventPriority,
isDefined, isDefined,
Modules, Modules,
OrderStatus, OrderStatus,
@@ -598,6 +599,9 @@ export const completeCartWorkflow = createWorkflow(
emitEventStep({ emitEventStep({
eventName: OrderWorkflowEvents.PLACED, eventName: OrderWorkflowEvents.PLACED,
data: { id: createdOrder.id }, data: { id: createdOrder.id },
options: {
priority: EventPriority.CRITICAL,
},
}) })
) )

View File

@@ -1,4 +1,5 @@
import { import {
EventPriority,
Modules, Modules,
OrderStatus, OrderStatus,
OrderWorkflowEvents, OrderWorkflowEvents,
@@ -180,6 +181,9 @@ export const convertDraftOrderWorkflow = createWorkflow(
emitEventStep({ emitEventStep({
eventName: OrderWorkflowEvents.PLACED, eventName: OrderWorkflowEvents.PLACED,
data: { id: updatedOrder.id }, data: { id: updatedOrder.id },
options: {
priority: EventPriority.CRITICAL,
},
}) })
) )

View File

@@ -87,3 +87,15 @@ export function buildEventNamesFromEntityName<TNames extends string[]>(
return events as ReturnType<TNames> return events as ReturnType<TNames>
} }
export const EventPriority = {
CRITICAL: 10,
HIGH: 50,
DEFAULT: 100,
LOW: 500,
/**
* Lowest priority value supported by BullMQ (2^21)
* Internal events use this priority to ensure they don't block critical business events
*/
LOWEST: 2_097_152,
} as const

View File

@@ -103,10 +103,11 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([ expect(queue.addBulk).toHaveBeenCalledWith([
{ {
name: "eventName", name: "eventName",
data: { data: { hi: "1234" } }, data: { data: { hi: "1234" }, metadata: undefined },
opts: { opts: {
attempts: 1, attempts: 1,
removeOnComplete: true, removeOnComplete: true,
priority: 100,
}, },
}, },
]) ])
@@ -125,12 +126,13 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([ expect(queue.addBulk).toHaveBeenCalledWith([
{ {
name: "eventName", name: "eventName",
data: { data: { hi: "1234" } }, data: { data: { hi: "1234" }, metadata: undefined },
opts: { opts: {
attempts: 3, attempts: 3,
backoff: 5000, backoff: 5000,
delay: 1000, delay: 1000,
removeOnComplete: true, removeOnComplete: true,
priority: 100,
}, },
}, },
]) ])
@@ -168,7 +170,7 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([ expect(queue.addBulk).toHaveBeenCalledWith([
{ {
name: "eventName", name: "eventName",
data: { data: { hi: "1234" } }, data: { data: { hi: "1234" }, metadata: undefined },
opts: { opts: {
attempts: 3, attempts: 3,
backoff: 5000, backoff: 5000,
@@ -176,6 +178,7 @@ describe("RedisEventBusService", () => {
removeOnComplete: { removeOnComplete: {
age: 5, age: 5,
}, },
priority: 100,
}, },
}, },
]) ])
@@ -210,11 +213,12 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([ expect(queue.addBulk).toHaveBeenCalledWith([
{ {
name: "eventName", name: "eventName",
data: { data: { hi: "1234" } }, data: { data: { hi: "1234" }, metadata: undefined },
opts: { opts: {
attempts: 1, attempts: 1,
removeOnComplete: 5, removeOnComplete: 5,
delay: 1000, delay: 1000,
priority: 100,
}, },
}, },
]) ])
@@ -331,6 +335,536 @@ describe("RedisEventBusService", () => {
}) })
}) })
describe("Priority levels", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
redis = (eventBus as any).eventBusRedisConnection_
redis.rpush = jest.fn()
})
it("should add job to queue with default priority (100) for normal events", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit([
{
name: "eventName",
data: { hi: "1234" },
},
])
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 100,
},
},
])
})
it("should add job to queue with lowest priority (2097152) for internal events", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
},
],
{ internal: true }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 2097152,
internal: true,
},
},
])
})
it("should add job to queue with custom priority override at emit time", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
},
],
{ priority: 50 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 50,
},
},
])
})
it("should add job to queue with custom priority via module job options", async () => {
eventBus = new RedisEventBusService(
{
...moduleDeps,
eventBusRedisJobOptions: {
priority: 200,
},
},
{},
moduleDeclaration
)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit([
{
name: "eventName",
data: { hi: "1234" },
},
])
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 200,
},
},
])
})
it("should override module priority with emit options priority", async () => {
eventBus = new RedisEventBusService(
{
...moduleDeps,
eventBusRedisJobOptions: {
priority: 200,
},
},
{},
moduleDeclaration
)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
},
],
{ priority: 25 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 25,
},
},
])
})
it("should allow module priority to override internal flag default", async () => {
eventBus = new RedisEventBusService(
{
...moduleDeps,
eventBusRedisJobOptions: {
priority: 200,
},
},
{},
moduleDeclaration
)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
},
],
{ internal: true }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 200,
internal: true,
},
},
])
})
it("should allow explicit priority to override internal flag default", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
},
],
{ internal: true, priority: 50 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 50,
internal: true,
},
},
])
})
describe("Message-level options", () => {
it("should allow message-level priority to override emit-level priority", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
options: { priority: 10 },
},
],
{ priority: 100 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 10,
},
},
])
})
it("should allow different priorities per message in same emit call", async () => {
eventBus.subscribe("eventName1", () => Promise.resolve())
eventBus.subscribe("eventName2", () => Promise.resolve())
eventBus.subscribe("eventName3", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName1",
data: { id: "1" },
options: { priority: 10 },
},
{
name: "eventName2",
data: { id: "2" },
options: { priority: 50 },
},
{
name: "eventName3",
data: { id: "3" },
// No options - should use emit-level priority
},
],
{ priority: 200 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName1",
data: { data: { id: "1" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 10,
},
},
{
name: "eventName2",
data: { data: { id: "2" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 50,
},
},
{
name: "eventName3",
data: { data: { id: "3" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 200,
},
},
])
})
it("should allow message-level priority to override module-level priority", async () => {
eventBus = new RedisEventBusService(
{
...moduleDeps,
eventBusRedisJobOptions: {
priority: 300,
},
},
{},
moduleDeclaration
)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit([
{
name: "eventName",
data: { hi: "1234" },
options: { priority: 5 },
},
])
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 5,
},
},
])
})
it("should allow message-level priority to override internal flag", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
name: "eventName",
data: { hi: "1234" },
options: { priority: 15 },
},
],
{ internal: true }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { data: { hi: "1234" }, metadata: undefined },
opts: {
attempts: 1,
removeOnComplete: true,
priority: 15,
internal: true,
},
},
])
})
})
describe("Grouped events priority", () => {
it("should stage grouped events with default priority (100)", async () => {
const event = {
name: "grouped-event",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-priority" },
}
await eventBus.emit(event)
expect(redis.rpush).toHaveBeenCalledTimes(1)
const calledWith = redis.rpush.mock.calls[0]
const stagedEvent = JSON.parse(calledWith[1])
expect(stagedEvent.opts.priority).toBe(100)
})
it("should stage grouped events with lowest priority (2097152) for internal events", async () => {
const event = {
name: "grouped-event",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-priority" },
}
await eventBus.emit(event, { internal: true })
expect(redis.rpush).toHaveBeenCalledTimes(1)
const calledWith = redis.rpush.mock.calls[0]
const stagedEvent = JSON.parse(calledWith[1])
expect(stagedEvent.opts.priority).toBe(2097152)
expect(stagedEvent.opts.internal).toBe(true)
})
it("should stage grouped events with custom priority", async () => {
const event = {
name: "grouped-event",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-priority" },
}
await eventBus.emit(event, { priority: 50 })
expect(redis.rpush).toHaveBeenCalledTimes(1)
const calledWith = redis.rpush.mock.calls[0]
const stagedEvent = JSON.parse(calledWith[1])
expect(stagedEvent.opts.priority).toBe(50)
})
it("should preserve priority when releasing grouped events", async () => {
const event = {
name: "grouped-event",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-priority" },
}
const [builtEvent] = (eventBus as any).buildEvents([event], {
priority: 75,
})
eventBus.subscribe("grouped-event", () => Promise.resolve())
redis.lrange = jest.fn((key) => {
if (key === "staging:test-group-priority") {
return Promise.resolve([JSON.stringify(builtEvent)])
}
return Promise.resolve([])
})
redis.unlink = jest.fn()
await eventBus.releaseGroupedEvents("test-group-priority")
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([builtEvent])
expect(builtEvent.opts.priority).toBe(75)
})
it("should stage grouped events with message-level priority overriding emit-level priority", async () => {
const event = {
name: "grouped-event",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-priority" },
options: { priority: 20 },
}
await eventBus.emit(event, { priority: 100 })
expect(redis.rpush).toHaveBeenCalledTimes(1)
const calledWith = redis.rpush.mock.calls[0]
const stagedEvent = JSON.parse(calledWith[1])
expect(stagedEvent.opts.priority).toBe(20)
})
it("should allow different priorities per grouped message in same emit call", async () => {
const events = [
{
name: "grouped-event-1",
data: { id: "1" },
metadata: { eventGroupId: "test-group-multi" },
options: { priority: 10 },
},
{
name: "grouped-event-2",
data: { id: "2" },
metadata: { eventGroupId: "test-group-multi" },
options: { priority: 50 },
},
{
name: "grouped-event-3",
data: { id: "3" },
metadata: { eventGroupId: "test-group-multi" },
// No options - should use emit-level priority
},
]
await eventBus.emit(events, { priority: 200 })
expect(redis.rpush).toHaveBeenCalledTimes(1)
const calledWith = redis.rpush.mock.calls[0]
// First argument is the key, rest are the events
const stagedEvent1 = JSON.parse(calledWith[1])
const stagedEvent2 = JSON.parse(calledWith[2])
const stagedEvent3 = JSON.parse(calledWith[3])
expect(stagedEvent1.opts.priority).toBe(10)
expect(stagedEvent2.opts.priority).toBe(50)
expect(stagedEvent3.opts.priority).toBe(200)
})
})
})
describe("Events without subscribers", () => { describe("Events without subscribers", () => {
beforeEach(async () => { beforeEach(async () => {
jest.clearAllMocks() jest.clearAllMocks()

View File

@@ -6,6 +6,7 @@ import {
} from "@medusajs/framework/types" } from "@medusajs/framework/types"
import { import {
AbstractEventBusModuleService, AbstractEventBusModuleService,
EventPriority,
isPresent, isPresent,
promiseAll, promiseAll,
} from "@medusajs/framework/utils" } from "@medusajs/framework/utils"
@@ -110,6 +111,20 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
}, },
} }
/**
* Build events for queue processing with priority handling.
*
* Priority levels (lower number = higher priority):
* - 10: Critical business events (e.g., order placed)
* - 100: Default priority for normal events (default)
* - 2,097,152: Lowest priority for internal events
*
* Priority override hierarchy (highest to lowest precedence):
* 1. Message-level options (eventData.options.priority)
* 2. Emit-level options (options.priority)
* 3. Module-level job options (this.jobOptions_.priority)
* 4. Internal flag default (options.internal ? EventPriority.LOWEST : EventPriority.DEFAULT)
*/
private buildEvents<T>( private buildEvents<T>(
eventsData: Message<T>[], eventsData: Message<T>[],
options: Options = {} options: Options = {}
@@ -118,6 +133,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
// default options // default options
removeOnComplete: true, removeOnComplete: true,
attempts: 1, attempts: 1,
priority: options.internal ? EventPriority.LOWEST : EventPriority.DEFAULT,
// global options // global options
...this.jobOptions_, ...this.jobOptions_,
...options, ...options,
@@ -132,16 +148,30 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
metadata: eventData.metadata, metadata: eventData.metadata,
} }
const finalOptions: IORedisEventType<T>["opts"] = {
...opts,
...eventData.options,
}
if (
finalOptions.priority != undefined &&
(finalOptions.priority < 1 ||
finalOptions.priority > EventPriority.LOWEST)
) {
this.logger_.warn(
`Invalid priority value: ${finalOptions.priority} for event ${eventData.name}. Must be between 1 and ${EventPriority.LOWEST}`
)
finalOptions.priority = EventPriority.DEFAULT
this.logger_.warn(
`Setting priority to default value: ${EventPriority.DEFAULT} for event ${eventData.name}`
)
}
return { return {
data: event, data: event,
name: eventData.name, name: eventData.name,
opts: { opts: finalOptions,
// options for event group } as IORedisEventType<T>
...opts,
// options for a particular event
...eventData.options,
},
} as any
}) })
} }
@@ -358,8 +388,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
`Retrying ${name} which has ${eventSubscribers.length} subscribers (${subscribersInCurrentAttempt.length} of them failed)` `Retrying ${name} which has ${eventSubscribers.length} subscribers (${subscribersInCurrentAttempt.length} of them failed)`
) )
} else { } else {
const prioirityInfo =
opts.priority != undefined ? ` (priority: ${opts.priority})` : ""
this.logger_.info( this.logger_.info(
`Processing ${name} which has ${eventSubscribers.length} subscribers` `Processing ${name}${prioirityInfo} which has ${eventSubscribers.length} subscribers`
) )
} }
} }