diff --git a/.changeset/curly-apricots-double.md b/.changeset/curly-apricots-double.md new file mode 100644 index 0000000000..78e7bdf6dc --- /dev/null +++ b/.changeset/curly-apricots-double.md @@ -0,0 +1,6 @@ +--- +"@medusajs/event-bus-local": patch +"@medusajs/event-bus-redis": patch +--- + +chore(event-vus-*): Do not emit if no subscribers: diff --git a/integration-tests/modules/__tests__/cart/store/cart.completion.ts b/integration-tests/modules/__tests__/cart/store/cart.completion.ts index 1f3bf784d9..3154c24383 100644 --- a/integration-tests/modules/__tests__/cart/store/cart.completion.ts +++ b/integration-tests/modules/__tests__/cart/store/cart.completion.ts @@ -1021,7 +1021,7 @@ medusaIntegrationTestRunner({ eventGroupId ) ?? [] - expect(grouppedEventBefore).toHaveLength(1) + expect(grouppedEventBefore).toHaveLength(17) expect(grouppedEventAfter).toHaveLength(0) // events have been compensated expect(errors[0].error.message).toBe( diff --git a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts index eb4ab003c6..f3271e642b 100644 --- a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts @@ -26,13 +26,21 @@ describe("LocalEventBusService", () => { }) it("should emit an event", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn((event) => + event === "eventName" ? 1 : 0 + ) await eventBus.emit({ name: "eventName", data: { hi: "1234" }, }) + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + expect(eventEmitter.emit).toHaveBeenCalledTimes(1) expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", { data: { hi: "1234" }, @@ -41,12 +49,17 @@ describe("LocalEventBusService", () => { expect(loggerMock.info).toHaveBeenCalledTimes(1) expect(loggerMock.info).toHaveBeenCalledWith( - "Processing eventName which has undefined subscribers" + "Processing eventName which has 1 subscribers" ) }) it("should emit an event but not log anything if it is internal", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn((event) => + event === "eventName" ? 1 : 0 + ) await eventBus.emit({ name: "eventName", @@ -56,6 +69,9 @@ describe("LocalEventBusService", () => { }, }) + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + expect(eventEmitter.emit).toHaveBeenCalledTimes(1) expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", { data: { hi: "1234" }, @@ -74,6 +90,9 @@ describe("LocalEventBusService", () => { } ) + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + expect(eventEmitter.emit).toHaveBeenCalledTimes(2) expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", { data: { hi: "1234" }, @@ -84,13 +103,22 @@ describe("LocalEventBusService", () => { }) it("should emit multiple events", async () => { + eventBus.subscribe("event-1", () => Promise.resolve()) + eventBus.subscribe("event-2", () => Promise.resolve()) + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn((event) => + event === "event-1" || event === "event-2" ? 1 : 0 + ) await eventBus.emit([ { name: "event-1", data: { hi: "1234" } }, { name: "event-2", data: { hi: "5678" } }, ]) + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + expect(eventEmitter.emit).toHaveBeenCalledTimes(2) expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { data: { hi: "1234" }, @@ -161,7 +189,13 @@ describe("LocalEventBusService", () => { }) it("should release events when requested with eventGroupId", async () => { + eventBus.subscribe("event-1", () => Promise.resolve()) + eventBus.subscribe("event-2", () => Promise.resolve()) + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn((event) => + event === "event-1" || event === "event-2" ? 1 : 0 + ) await eventBus.emit([ { @@ -187,6 +221,9 @@ describe("LocalEventBusService", () => { { name: "event-1", data: { test: "1" } }, ]) + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + expect(eventEmitter.emit).toHaveBeenCalledTimes(1) expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { data: { test: "1" }, @@ -204,6 +241,9 @@ describe("LocalEventBusService", () => { eventEmitter.emit = jest.fn((data) => data) await eventBus.releaseGroupedEvents("group-1") + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + expect( (eventBus as any).groupedEventsMap_.get("group-1") ).not.toBeDefined() @@ -254,5 +294,139 @@ describe("LocalEventBusService", () => { expect(getMap().get("group-2")).not.toBeDefined() }) }) + + describe("Events without subscribers", () => { + beforeEach(() => { + jest.clearAllMocks() + + eventBus = new LocalEventBusService(moduleDeps as any, {}, {} as any) + eventEmitter = (eventBus as any).eventEmitter_ + }) + + it("should not emit events when there are no subscribers", async () => { + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn(() => 0) + + await eventBus.emit({ + name: "eventWithoutSubscribers", + data: { test: "data" }, + }) + + expect(eventEmitter.emit).not.toHaveBeenCalled() + }) + + it("should still call interceptors even when there are no subscribers", async () => { + const callInterceptorsSpy = jest.spyOn( + eventBus as any, + "callInterceptors" + ) + + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn(() => 0) + + await eventBus.emit({ + name: "eventWithoutSubscribers", + data: { test: "data" }, + }) + + expect(callInterceptorsSpy).toHaveBeenCalledTimes(1) + expect(callInterceptorsSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: "eventWithoutSubscribers", + data: { test: "data" }, + }), + { isGrouped: false } + ) + + expect(eventEmitter.emit).not.toHaveBeenCalled() + + callInterceptorsSpy.mockRestore() + }) + + it("should emit events with wildcard subscriber", async () => { + eventBus.subscribe("*", () => Promise.resolve()) + + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn((event) => (event === "*" ? 1 : 0)) + + await eventBus.emit({ + name: "anyEvent", + data: { test: "data" }, + }) + + // Wait for async emission to complete + await new Promise((resolve) => setImmediate(resolve)) + + expect(eventEmitter.emit).toHaveBeenCalledTimes(1) + expect(eventEmitter.emit).toHaveBeenCalledWith("*", { + data: { test: "data" }, + name: "anyEvent", + }) + }) + + it("should not emit grouped events when releasing if there are no subscribers", async () => { + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn(() => 0) + + await eventBus.emit({ + name: "grouped-event-no-sub", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-no-sub" }, + }) + + expect( + (eventBus as any).groupedEventsMap_.get("test-group-no-sub") + ).toHaveLength(1) + expect(eventEmitter.emit).not.toHaveBeenCalled() + + jest.clearAllMocks() + eventEmitter.emit = jest.fn((data) => data) + + await eventBus.releaseGroupedEvents("test-group-no-sub") + + expect(eventEmitter.emit).not.toHaveBeenCalled() + + expect( + (eventBus as any).groupedEventsMap_.get("test-group-no-sub") + ).not.toBeDefined() + }) + + it("should still call interceptors for grouped events without subscribers", async () => { + eventEmitter.emit = jest.fn((data) => data) + eventEmitter.listenerCount = jest.fn(() => 0) + + await eventBus.emit({ + name: "grouped-event-no-sub-2", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-no-sub-2" }, + }) + + expect( + (eventBus as any).groupedEventsMap_.get("test-group-no-sub-2") + ).toHaveLength(1) + + jest.clearAllMocks() + eventEmitter.emit = jest.fn((data) => data) + + const callInterceptorsSpy = jest.spyOn( + eventBus as any, + "callInterceptors" + ) + + await eventBus.releaseGroupedEvents("test-group-no-sub-2") + + expect(callInterceptorsSpy).toHaveBeenCalledTimes(1) + expect(callInterceptorsSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: "grouped-event-no-sub-2", + }), + { isGrouped: true, eventGroupId: "test-group-no-sub-2" } + ) + + expect(eventEmitter.emit).not.toHaveBeenCalled() + + callInterceptorsSpy.mockRestore() + }) + }) }) }) diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 75364b9fa6..a61e6f78bc 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -58,11 +58,6 @@ export default class LocalEventBusService extends AbstractEventBusModuleService const eventListenersCount = this.eventEmitter_.listenerCount( eventData.name ) - const startSubscribersCount = this.eventEmitter_.listenerCount("*") - - if (eventListenersCount === 0 && startSubscribersCount === 0) { - continue - } if (!options.internal && !eventData.options?.internal) { this.logger_?.info( @@ -93,11 +88,18 @@ export default class LocalEventBusService extends AbstractEventBusModuleService const options_ = eventData.options as { delay: number } const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve()) + const eventListenersCount = this.eventEmitter_.listenerCount( + eventData.name + ) + delay(options_?.delay).then(async () => { // Call interceptors before emitting void this.callInterceptors(eventData, { isGrouped: false }) - this.eventEmitter_.emit(eventData.name, eventBody) + if (eventListenersCount) { + this.eventEmitter_.emit(eventData.name, eventBody) + } + if (hasStarSubscriber) { this.eventEmitter_.emit("*", eventBody) } @@ -120,11 +122,14 @@ export default class LocalEventBusService extends AbstractEventBusModuleService async releaseGroupedEvents(eventGroupId: string) { let groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] groupedEvents = JSON.parse(JSON.stringify(groupedEvents)) + const hasStarSubscriber = this.eventEmitter_.listenerCount("*") > 0 for (const event of groupedEvents) { const { options, ...eventBody } = event + const eventListenersCount = this.eventEmitter_.listenerCount(event.name) + const options_ = options as { delay: number } const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve()) @@ -132,7 +137,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService // Call interceptors before emitting grouped events void this.callInterceptors(event, { isGrouped: true, eventGroupId }) - this.eventEmitter_.emit(event.name, eventBody) + if (eventListenersCount) { + this.eventEmitter_.emit(event.name, eventBody) + } + if (hasStarSubscriber) { this.eventEmitter_.emit("*", eventBody) } diff --git a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts index f67acdce6b..8cdd9f2664 100644 --- a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts +++ b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts @@ -3,13 +3,6 @@ import { Queue, Worker } from "bullmq" import { Redis } from "ioredis" import RedisEventBusService from "../event-bus-redis" -// const redisURL = "redis://localhost:6379" -// const client = new Redis(6379, redisURL, { -// // Lazy connect to properly handle connection errors -// lazyConnect: true, -// maxRetriesPerRequest: 0, -// }) - jest.mock("bullmq") jest.mock("ioredis") @@ -96,6 +89,8 @@ describe("RedisEventBusService", () => { }) it("should add job to queue with default options", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + await eventBus.emit([ { name: "eventName", @@ -119,6 +114,8 @@ describe("RedisEventBusService", () => { }) it("should add job to queue with custom options passed directly upon emitting", async () => { + eventBus.subscribe("eventName", () => Promise.resolve()) + await eventBus.emit([{ name: "eventName", data: { hi: "1234" } }], { attempts: 3, backoff: 5000, @@ -158,6 +155,8 @@ describe("RedisEventBusService", () => { queue = (eventBus as any).queue_ queue.addBulk = jest.fn() + eventBus.subscribe("eventName", () => Promise.resolve()) + await eventBus.emit( [ { @@ -202,6 +201,8 @@ describe("RedisEventBusService", () => { queue = (eventBus as any).queue_ queue.addBulk = jest.fn() + eventBus.subscribe("eventName", () => Promise.resolve()) + await eventBus.emit( { name: "eventName", @@ -268,12 +269,15 @@ describe("RedisEventBusService", () => { }, ] + eventBus.subscribe("ungrouped-event-2", () => Promise.resolve()) + eventBus.subscribe("grouped-event-1", () => Promise.resolve()) + eventBus.subscribe("grouped-event-2", () => Promise.resolve()) + eventBus.subscribe("grouped-event-3", () => Promise.resolve()) + redis.unlink = jest.fn() await eventBus.emit(events, options) - // Expect 1 event to have been send - // Expect 2 pushes to redis as there are 2 groups of events to push expect(queue.addBulk).toHaveBeenCalledTimes(1) expect(redis.rpush).toHaveBeenCalledTimes(2) expect(redis.unlink).not.toHaveBeenCalled() @@ -331,6 +335,149 @@ describe("RedisEventBusService", () => { expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-2") }) }) + + describe("Events without subscribers", () => { + beforeEach(async () => { + jest.clearAllMocks() + + eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { + scope: "internal", + }) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + redis = (eventBus as any).eventBusRedisConnection_ + redis.rpush = jest.fn() + }) + + it("should not add events to queue when there are no subscribers", async () => { + await eventBus.emit([ + { + name: "eventWithoutSubscribers", + data: { test: "data" }, + }, + ]) + + expect(queue.addBulk).not.toHaveBeenCalled() + }) + + it("should still call interceptors even when there are no subscribers", async () => { + const callInterceptorsSpy = jest.spyOn( + eventBus as any, + "callInterceptors" + ) + + await eventBus.emit([ + { + name: "eventWithoutSubscribers", + data: { test: "data" }, + }, + ]) + + expect(callInterceptorsSpy).toHaveBeenCalledTimes(1) + expect(callInterceptorsSpy).toHaveBeenCalledWith( + { + name: "eventWithoutSubscribers", + data: { test: "data" }, + }, + { isGrouped: false } + ) + + expect(queue.addBulk).not.toHaveBeenCalled() + + callInterceptorsSpy.mockRestore() + }) + + it("should add events to queue only for events with subscribers using wildcard", async () => { + eventBus.subscribe("*", () => Promise.resolve()) + + await eventBus.emit([ + { + name: "anyEvent", + data: { test: "data" }, + }, + ]) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + }) + + it("should not add grouped events to queue when releasing if there are no subscribers", async () => { + const options = { delay: 1000 } + const event = { + name: "grouped-event-no-sub", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-no-sub" }, + } + + await eventBus.emit(event, options) + + expect(redis.rpush).toHaveBeenCalledTimes(1) + expect(queue.addBulk).not.toHaveBeenCalled() + + const [builtEvent] = (eventBus as any).buildEvents([event], options) + + redis.lrange = jest.fn((key) => { + if (key === "staging:test-group-no-sub") { + return Promise.resolve([JSON.stringify(builtEvent)]) + } + return Promise.resolve([]) + }) + + redis.unlink = jest.fn() + queue.addBulk = jest.fn() + + await eventBus.releaseGroupedEvents("test-group-no-sub") + + expect(queue.addBulk).not.toHaveBeenCalled() + + expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-no-sub") + }) + + it("should still call interceptors for grouped events without subscribers", async () => { + const options = { delay: 1000 } + const event = { + name: "grouped-event-no-sub-2", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-no-sub-2" }, + } + + await eventBus.emit(event, options) + + const [builtEvent] = (eventBus as any).buildEvents([event], options) + + redis.lrange = jest.fn((key) => { + if (key === "staging:test-group-no-sub-2") { + return Promise.resolve([JSON.stringify(builtEvent)]) + } + return Promise.resolve([]) + }) + + redis.unlink = jest.fn() + queue.addBulk = jest.fn() + + const callInterceptorsSpy = jest.spyOn( + eventBus as any, + "callInterceptors" + ) + + await eventBus.releaseGroupedEvents("test-group-no-sub-2") + + expect(callInterceptorsSpy).toHaveBeenCalledTimes(1) + expect(callInterceptorsSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: "grouped-event-no-sub-2", + }), + { + isGrouped: true, + eventGroupId: "test-group-no-sub-2", + } + ) + + expect(queue.addBulk).not.toHaveBeenCalled() + + callInterceptorsSpy.mockRestore() + }) + }) }) describe("worker_", () => { @@ -352,7 +499,6 @@ describe("RedisEventBusService", () => { return Promise.resolve() }) - // TODO: The typing for this is all over the place await eventBus.worker_({ name: "eventName", data: { data: { test: 1 } }, diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index 98397b72c4..6973c5e3e5 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -162,9 +162,17 @@ export default class RedisEventBusService extends AbstractEventBusModuleService this.callInterceptors(eventData, { isGrouped: false }) ) - const emitData = this.buildEvents(eventsToEmit, options) + const eventsWithSubscribers = eventsToEmit.filter((eventData) => { + const eventSubscribers = + this.eventToSubscribersMap.get(eventData.name) || [] + const wildcardSubscribers = this.eventToSubscribersMap.get("*") || [] + return eventSubscribers.length || wildcardSubscribers.length + }) - promises.push(this.queue_.addBulk(emitData)) + if (eventsWithSubscribers.length) { + const emitData = this.buildEvents(eventsWithSubscribers, options) + promises.push(this.queue_.addBulk(emitData)) + } } for (const [groupId, events] of groupEventsMap.entries()) { @@ -220,7 +228,6 @@ export default class RedisEventBusService extends AbstractEventBusModuleService // Call interceptors before emitting grouped events // Extract the original messages from the job data structure groupedEvents.map((jobData) => { - // Reconstruct the message from the job data const message = { name: jobData.name, data: jobData.data, @@ -232,7 +239,16 @@ export default class RedisEventBusService extends AbstractEventBusModuleService }) }) - await this.queue_.addBulk(groupedEvents) + const eventsWithSubscribers = groupedEvents.filter((jobData) => { + const eventSubscribers = + this.eventToSubscribersMap.get(jobData.name) || [] + const wildcardSubscribers = this.eventToSubscribersMap.get("*") || [] + return eventSubscribers.length || wildcardSubscribers.length + }) + + if (eventsWithSubscribers.length) { + await this.queue_.addBulk(eventsWithSubscribers) + } await this.clearGroupedEvents(eventGroupId) }