From ecc8efcb049f5c77cb7e38c2b6a273860590a5f4 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Wed, 22 Jan 2025 08:40:31 +0100 Subject: [PATCH] fix(event): Subscriber ID miss usage (#11047) PARTIALLY RESOLVES FRMW-2876 **What** Fix wrong usage of the `subscriberId` in the event bus. It happens that the subscriber id coming from the context was not used at all. This issue lead to duplicated event subscriber with the same subscriber id, it also prevent unsubscribing from event since rand id will be assigned. **NOTE** This PR does not handle overide strategy for subscribers with the same id. this still needs to be discussed --- .changeset/green-poets-teach.md | 6 +++ .../abstract-event-bus-module.spec.ts | 52 +++++++++++++++++++ packages/core/utils/src/event-bus/index.ts | 16 +++--- .../src/services/event-bus-local.ts | 32 ++++-------- 4 files changed, 75 insertions(+), 31 deletions(-) create mode 100644 .changeset/green-poets-teach.md create mode 100644 packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts diff --git a/.changeset/green-poets-teach.md b/.changeset/green-poets-teach.md new file mode 100644 index 0000000000..80f4ccff26 --- /dev/null +++ b/.changeset/green-poets-teach.md @@ -0,0 +1,6 @@ +--- +"@medusajs/event-bus-local": patch +"@medusajs/utils": patch +--- + +fix(event): Subscriber ID missusage diff --git a/packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts b/packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts new file mode 100644 index 0000000000..44db3c2c2f --- /dev/null +++ b/packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts @@ -0,0 +1,52 @@ +import { EventBusTypes } from "@medusajs/types" +import { AbstractEventBusModuleService } from ".." + +class MockEventBusModuleService extends AbstractEventBusModuleService { + constructor() { + super({}, {}, {} as any) + } + + async emit( + data: EventBusTypes.Message | EventBusTypes.Message[], + options: Record + ): Promise { + return Promise.resolve() + } + + async releaseGroupedEvents(eventGroupId: string): Promise { + return Promise.resolve() + } + + async clearGroupedEvents(eventGroupId: string): Promise { + return Promise.resolve() + } +} + +describe("AbstractEventBusModuleService", () => { + it("should be able to subscribe to an event", () => { + const eventBus = new MockEventBusModuleService() + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + expect(eventBus.eventToSubscribersMap.get("test")).toEqual([ + { id: (subscriber as any).subscriberId, subscriber }, + ]) + }) + + it("should throw an error if a subscriber with the same id is already subscribed to an event", () => { + const eventBus = new MockEventBusModuleService() + const subscriber = jest.fn() + const subscriberId = "test" + eventBus.subscribe("test", subscriber, { subscriberId }) + expect(() => + eventBus.subscribe("test", subscriber, { subscriberId }) + ).toThrow() + }) + + it("should be able to unsubscribe from an event", () => { + const eventBus = new MockEventBusModuleService() + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + eventBus.unsubscribe("test", subscriber) + expect(eventBus.eventToSubscribersMap.get("test")).toEqual([]) + }) +}) diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index c10a4de9d0..63a9974497 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -85,12 +85,14 @@ export abstract class AbstractEventBusModuleService * otherwise we generate a random using a ulid */ - const randId = ulid() const event = eventName.toString() + const subscriberId = context?.subscriberId ?? `${event}-${ulid()}` + + ;(subscriber as any).subscriberId = subscriberId this.storeSubscribers({ event, - subscriberId: context?.subscriberId ?? `${event}-${randId}`, + subscriberId, subscriber, }) @@ -100,21 +102,19 @@ export abstract class AbstractEventBusModuleService unsubscribe( eventName: string | symbol, subscriber: EventBusTypes.Subscriber, - context: EventBusTypes.SubscriberContext + context?: EventBusTypes.SubscriberContext ): this { if (!this.isWorkerMode) { return this } - if (typeof subscriber !== `function`) { - throw new Error("Subscriber must be a function") - } - const existingSubscribers = this.eventToSubscribersMap_.get(eventName) + const subscriberId = + context?.subscriberId ?? (subscriber as any).subscriberId if (existingSubscribers?.length) { const subIndex = existingSubscribers?.findIndex( - (sub) => sub.id === context?.subscriberId + (sub) => sub.id === subscriberId ) if (subIndex !== -1) { diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 1bc97179e3..eb537c672b 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -10,7 +10,6 @@ import { import { AbstractEventBusModuleService } from "@medusajs/framework/utils" import { EventEmitter } from "events" import { setTimeout } from "timers/promises" -import { ulid } from "ulid" type InjectedDependencies = { logger: Logger @@ -133,13 +132,13 @@ export default class LocalEventBusService extends AbstractEventBusModuleService this.groupedEventsMap_.delete(eventGroupId) } - subscribe(event: string | symbol, subscriber: Subscriber): this { - if (!this.isWorkerMode) { - return this - } + subscribe( + event: string | symbol, + subscriber: Subscriber, + context?: EventBusTypes.SubscriberContext + ): this { + super.subscribe(event, subscriber, context) - const randId = ulid() - this.storeSubscribers({ event, subscriberId: randId, subscriber }) this.eventEmitter_.on(event, async (data: Event) => { try { await subscriber(data) @@ -150,29 +149,16 @@ export default class LocalEventBusService extends AbstractEventBusModuleService this.logger_?.error(err) } }) + return this } unsubscribe( event: string | symbol, subscriber: Subscriber, - context?: EventBusTypes.SubscriberContext + context: EventBusTypes.SubscriberContext ): this { - if (!this.isWorkerMode) { - return this - } - - const existingSubscribers = this.eventToSubscribersMap_.get(event) - - if (existingSubscribers?.length) { - const subIndex = existingSubscribers?.findIndex( - (sub) => sub.id === context?.subscriberId - ) - - if (subIndex !== -1) { - this.eventToSubscribersMap_.get(event)?.splice(subIndex as number, 1) - } - } + super.unsubscribe(event, subscriber, context) this.eventEmitter_.off(event, subscriber) return this