chore(event-bus-*): Do not emit if no subscribers: (#14084)

* chore(event-vus-*): Do not emit if no subscribers:

* Create curly-apricots-double.md

* add tests

* align tests
This commit is contained in:
Adrien de Peretti
2025-11-24 09:36:52 +01:00
committed by GitHub
parent 6c3ec528f1
commit 113f200a99
6 changed files with 373 additions and 23 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/event-bus-redis": patch
---
chore(event-vus-*): Do not emit if no subscribers:

View File

@@ -1021,7 +1021,7 @@ medusaIntegrationTestRunner({
eventGroupId
) ?? []
expect(grouppedEventBefore).toHaveLength(1)
expect(grouppedEventBefore).toHaveLength(17)
expect(grouppedEventAfter).toHaveLength(0) // events have been compensated
expect(errors[0].error.message).toBe(

View File

@@ -26,13 +26,21 @@ describe("LocalEventBusService", () => {
})
it("should emit an event", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn((event) =>
event === "eventName" ? 1 : 0
)
await eventBus.emit({
name: "eventName",
data: { hi: "1234" },
})
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
data: { hi: "1234" },
@@ -41,12 +49,17 @@ describe("LocalEventBusService", () => {
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Processing eventName which has undefined subscribers"
"Processing eventName which has 1 subscribers"
)
})
it("should emit an event but not log anything if it is internal", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn((event) =>
event === "eventName" ? 1 : 0
)
await eventBus.emit({
name: "eventName",
@@ -56,6 +69,9 @@ describe("LocalEventBusService", () => {
},
})
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
data: { hi: "1234" },
@@ -74,6 +90,9 @@ describe("LocalEventBusService", () => {
}
)
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
data: { hi: "1234" },
@@ -84,13 +103,22 @@ describe("LocalEventBusService", () => {
})
it("should emit multiple events", async () => {
eventBus.subscribe("event-1", () => Promise.resolve())
eventBus.subscribe("event-2", () => Promise.resolve())
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn((event) =>
event === "event-1" || event === "event-2" ? 1 : 0
)
await eventBus.emit([
{ name: "event-1", data: { hi: "1234" } },
{ name: "event-2", data: { hi: "5678" } },
])
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { hi: "1234" },
@@ -161,7 +189,13 @@ describe("LocalEventBusService", () => {
})
it("should release events when requested with eventGroupId", async () => {
eventBus.subscribe("event-1", () => Promise.resolve())
eventBus.subscribe("event-2", () => Promise.resolve())
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn((event) =>
event === "event-1" || event === "event-2" ? 1 : 0
)
await eventBus.emit([
{
@@ -187,6 +221,9 @@ describe("LocalEventBusService", () => {
{ name: "event-1", data: { test: "1" } },
])
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { test: "1" },
@@ -204,6 +241,9 @@ describe("LocalEventBusService", () => {
eventEmitter.emit = jest.fn((data) => data)
await eventBus.releaseGroupedEvents("group-1")
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(
(eventBus as any).groupedEventsMap_.get("group-1")
).not.toBeDefined()
@@ -254,5 +294,139 @@ describe("LocalEventBusService", () => {
expect(getMap().get("group-2")).not.toBeDefined()
})
})
describe("Events without subscribers", () => {
beforeEach(() => {
jest.clearAllMocks()
eventBus = new LocalEventBusService(moduleDeps as any, {}, {} as any)
eventEmitter = (eventBus as any).eventEmitter_
})
it("should not emit events when there are no subscribers", async () => {
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn(() => 0)
await eventBus.emit({
name: "eventWithoutSubscribers",
data: { test: "data" },
})
expect(eventEmitter.emit).not.toHaveBeenCalled()
})
it("should still call interceptors even when there are no subscribers", async () => {
const callInterceptorsSpy = jest.spyOn(
eventBus as any,
"callInterceptors"
)
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn(() => 0)
await eventBus.emit({
name: "eventWithoutSubscribers",
data: { test: "data" },
})
expect(callInterceptorsSpy).toHaveBeenCalledTimes(1)
expect(callInterceptorsSpy).toHaveBeenCalledWith(
expect.objectContaining({
name: "eventWithoutSubscribers",
data: { test: "data" },
}),
{ isGrouped: false }
)
expect(eventEmitter.emit).not.toHaveBeenCalled()
callInterceptorsSpy.mockRestore()
})
it("should emit events with wildcard subscriber", async () => {
eventBus.subscribe("*", () => Promise.resolve())
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn((event) => (event === "*" ? 1 : 0))
await eventBus.emit({
name: "anyEvent",
data: { test: "data" },
})
// Wait for async emission to complete
await new Promise((resolve) => setImmediate(resolve))
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("*", {
data: { test: "data" },
name: "anyEvent",
})
})
it("should not emit grouped events when releasing if there are no subscribers", async () => {
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn(() => 0)
await eventBus.emit({
name: "grouped-event-no-sub",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-no-sub" },
})
expect(
(eventBus as any).groupedEventsMap_.get("test-group-no-sub")
).toHaveLength(1)
expect(eventEmitter.emit).not.toHaveBeenCalled()
jest.clearAllMocks()
eventEmitter.emit = jest.fn((data) => data)
await eventBus.releaseGroupedEvents("test-group-no-sub")
expect(eventEmitter.emit).not.toHaveBeenCalled()
expect(
(eventBus as any).groupedEventsMap_.get("test-group-no-sub")
).not.toBeDefined()
})
it("should still call interceptors for grouped events without subscribers", async () => {
eventEmitter.emit = jest.fn((data) => data)
eventEmitter.listenerCount = jest.fn(() => 0)
await eventBus.emit({
name: "grouped-event-no-sub-2",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-no-sub-2" },
})
expect(
(eventBus as any).groupedEventsMap_.get("test-group-no-sub-2")
).toHaveLength(1)
jest.clearAllMocks()
eventEmitter.emit = jest.fn((data) => data)
const callInterceptorsSpy = jest.spyOn(
eventBus as any,
"callInterceptors"
)
await eventBus.releaseGroupedEvents("test-group-no-sub-2")
expect(callInterceptorsSpy).toHaveBeenCalledTimes(1)
expect(callInterceptorsSpy).toHaveBeenCalledWith(
expect.objectContaining({
name: "grouped-event-no-sub-2",
}),
{ isGrouped: true, eventGroupId: "test-group-no-sub-2" }
)
expect(eventEmitter.emit).not.toHaveBeenCalled()
callInterceptorsSpy.mockRestore()
})
})
})
})

View File

@@ -58,11 +58,6 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
const eventListenersCount = this.eventEmitter_.listenerCount(
eventData.name
)
const startSubscribersCount = this.eventEmitter_.listenerCount("*")
if (eventListenersCount === 0 && startSubscribersCount === 0) {
continue
}
if (!options.internal && !eventData.options?.internal) {
this.logger_?.info(
@@ -93,11 +88,18 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
const options_ = eventData.options as { delay: number }
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
const eventListenersCount = this.eventEmitter_.listenerCount(
eventData.name
)
delay(options_?.delay).then(async () => {
// Call interceptors before emitting
void this.callInterceptors(eventData, { isGrouped: false })
this.eventEmitter_.emit(eventData.name, eventBody)
if (eventListenersCount) {
this.eventEmitter_.emit(eventData.name, eventBody)
}
if (hasStarSubscriber) {
this.eventEmitter_.emit("*", eventBody)
}
@@ -120,11 +122,14 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
async releaseGroupedEvents(eventGroupId: string) {
let groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
groupedEvents = JSON.parse(JSON.stringify(groupedEvents))
const hasStarSubscriber = this.eventEmitter_.listenerCount("*") > 0
for (const event of groupedEvents) {
const { options, ...eventBody } = event
const eventListenersCount = this.eventEmitter_.listenerCount(event.name)
const options_ = options as { delay: number }
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
@@ -132,7 +137,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
// Call interceptors before emitting grouped events
void this.callInterceptors(event, { isGrouped: true, eventGroupId })
this.eventEmitter_.emit(event.name, eventBody)
if (eventListenersCount) {
this.eventEmitter_.emit(event.name, eventBody)
}
if (hasStarSubscriber) {
this.eventEmitter_.emit("*", eventBody)
}

View File

@@ -3,13 +3,6 @@ import { Queue, Worker } from "bullmq"
import { Redis } from "ioredis"
import RedisEventBusService from "../event-bus-redis"
// const redisURL = "redis://localhost:6379"
// const client = new Redis(6379, redisURL, {
// // Lazy connect to properly handle connection errors
// lazyConnect: true,
// maxRetriesPerRequest: 0,
// })
jest.mock("bullmq")
jest.mock("ioredis")
@@ -96,6 +89,8 @@ describe("RedisEventBusService", () => {
})
it("should add job to queue with default options", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit([
{
name: "eventName",
@@ -119,6 +114,8 @@ describe("RedisEventBusService", () => {
})
it("should add job to queue with custom options passed directly upon emitting", async () => {
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit([{ name: "eventName", data: { hi: "1234" } }], {
attempts: 3,
backoff: 5000,
@@ -158,6 +155,8 @@ describe("RedisEventBusService", () => {
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
[
{
@@ -202,6 +201,8 @@ describe("RedisEventBusService", () => {
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
eventBus.subscribe("eventName", () => Promise.resolve())
await eventBus.emit(
{
name: "eventName",
@@ -268,12 +269,15 @@ describe("RedisEventBusService", () => {
},
]
eventBus.subscribe("ungrouped-event-2", () => Promise.resolve())
eventBus.subscribe("grouped-event-1", () => Promise.resolve())
eventBus.subscribe("grouped-event-2", () => Promise.resolve())
eventBus.subscribe("grouped-event-3", () => Promise.resolve())
redis.unlink = jest.fn()
await eventBus.emit(events, options)
// Expect 1 event to have been send
// Expect 2 pushes to redis as there are 2 groups of events to push
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(redis.rpush).toHaveBeenCalledTimes(2)
expect(redis.unlink).not.toHaveBeenCalled()
@@ -331,6 +335,149 @@ describe("RedisEventBusService", () => {
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-2")
})
})
describe("Events without subscribers", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
})
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
redis = (eventBus as any).eventBusRedisConnection_
redis.rpush = jest.fn()
})
it("should not add events to queue when there are no subscribers", async () => {
await eventBus.emit([
{
name: "eventWithoutSubscribers",
data: { test: "data" },
},
])
expect(queue.addBulk).not.toHaveBeenCalled()
})
it("should still call interceptors even when there are no subscribers", async () => {
const callInterceptorsSpy = jest.spyOn(
eventBus as any,
"callInterceptors"
)
await eventBus.emit([
{
name: "eventWithoutSubscribers",
data: { test: "data" },
},
])
expect(callInterceptorsSpy).toHaveBeenCalledTimes(1)
expect(callInterceptorsSpy).toHaveBeenCalledWith(
{
name: "eventWithoutSubscribers",
data: { test: "data" },
},
{ isGrouped: false }
)
expect(queue.addBulk).not.toHaveBeenCalled()
callInterceptorsSpy.mockRestore()
})
it("should add events to queue only for events with subscribers using wildcard", async () => {
eventBus.subscribe("*", () => Promise.resolve())
await eventBus.emit([
{
name: "anyEvent",
data: { test: "data" },
},
])
expect(queue.addBulk).toHaveBeenCalledTimes(1)
})
it("should not add grouped events to queue when releasing if there are no subscribers", async () => {
const options = { delay: 1000 }
const event = {
name: "grouped-event-no-sub",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-no-sub" },
}
await eventBus.emit(event, options)
expect(redis.rpush).toHaveBeenCalledTimes(1)
expect(queue.addBulk).not.toHaveBeenCalled()
const [builtEvent] = (eventBus as any).buildEvents([event], options)
redis.lrange = jest.fn((key) => {
if (key === "staging:test-group-no-sub") {
return Promise.resolve([JSON.stringify(builtEvent)])
}
return Promise.resolve([])
})
redis.unlink = jest.fn()
queue.addBulk = jest.fn()
await eventBus.releaseGroupedEvents("test-group-no-sub")
expect(queue.addBulk).not.toHaveBeenCalled()
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-no-sub")
})
it("should still call interceptors for grouped events without subscribers", async () => {
const options = { delay: 1000 }
const event = {
name: "grouped-event-no-sub-2",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-no-sub-2" },
}
await eventBus.emit(event, options)
const [builtEvent] = (eventBus as any).buildEvents([event], options)
redis.lrange = jest.fn((key) => {
if (key === "staging:test-group-no-sub-2") {
return Promise.resolve([JSON.stringify(builtEvent)])
}
return Promise.resolve([])
})
redis.unlink = jest.fn()
queue.addBulk = jest.fn()
const callInterceptorsSpy = jest.spyOn(
eventBus as any,
"callInterceptors"
)
await eventBus.releaseGroupedEvents("test-group-no-sub-2")
expect(callInterceptorsSpy).toHaveBeenCalledTimes(1)
expect(callInterceptorsSpy).toHaveBeenCalledWith(
expect.objectContaining({
name: "grouped-event-no-sub-2",
}),
{
isGrouped: true,
eventGroupId: "test-group-no-sub-2",
}
)
expect(queue.addBulk).not.toHaveBeenCalled()
callInterceptorsSpy.mockRestore()
})
})
})
describe("worker_", () => {
@@ -352,7 +499,6 @@ describe("RedisEventBusService", () => {
return Promise.resolve()
})
// TODO: The typing for this is all over the place
await eventBus.worker_({
name: "eventName",
data: { data: { test: 1 } },

View File

@@ -162,9 +162,17 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
this.callInterceptors(eventData, { isGrouped: false })
)
const emitData = this.buildEvents(eventsToEmit, options)
const eventsWithSubscribers = eventsToEmit.filter((eventData) => {
const eventSubscribers =
this.eventToSubscribersMap.get(eventData.name) || []
const wildcardSubscribers = this.eventToSubscribersMap.get("*") || []
return eventSubscribers.length || wildcardSubscribers.length
})
promises.push(this.queue_.addBulk(emitData))
if (eventsWithSubscribers.length) {
const emitData = this.buildEvents(eventsWithSubscribers, options)
promises.push(this.queue_.addBulk(emitData))
}
}
for (const [groupId, events] of groupEventsMap.entries()) {
@@ -220,7 +228,6 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
// Call interceptors before emitting grouped events
// Extract the original messages from the job data structure
groupedEvents.map((jobData) => {
// Reconstruct the message from the job data
const message = {
name: jobData.name,
data: jobData.data,
@@ -232,7 +239,16 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
})
})
await this.queue_.addBulk(groupedEvents)
const eventsWithSubscribers = groupedEvents.filter((jobData) => {
const eventSubscribers =
this.eventToSubscribersMap.get(jobData.name) || []
const wildcardSubscribers = this.eventToSubscribersMap.get("*") || []
return eventSubscribers.length || wildcardSubscribers.length
})
if (eventsWithSubscribers.length) {
await this.queue_.addBulk(eventsWithSubscribers)
}
await this.clearGroupedEvents(eventGroupId)
}