diff --git a/.changeset/calm-eggs-collect.md b/.changeset/calm-eggs-collect.md new file mode 100644 index 0000000000..9cd3103192 --- /dev/null +++ b/.changeset/calm-eggs-collect.md @@ -0,0 +1,8 @@ +--- +"@medusajs/event-bus-local": patch +"@medusajs/medusa": patch +"@medusajs/types": patch +"@medusajs/utils": patch +--- + +fix(medusa, event-bus-redis, event-bus-local): Revert retrieveSubscribers as the wildcard prevent us from filtering diff --git a/packages/event-bus-local/src/services/event-bus-local.ts b/packages/event-bus-local/src/services/event-bus-local.ts index b7c31344f7..2acd56f7f5 100644 --- a/packages/event-bus-local/src/services/event-bus-local.ts +++ b/packages/event-bus-local/src/services/event-bus-local.ts @@ -86,7 +86,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService subscriber: Subscriber, context?: EventBusTypes.SubscriberContext ): this { - const existingSubscribers = this.retrieveSubscribers(event) + const existingSubscribers = this.eventToSubscribersMap_.get(event) if (existingSubscribers?.length) { const subIndex = existingSubscribers?.findIndex( diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index 184a566086..8adc762be1 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -137,7 +137,7 @@ export default class EventBusService ): Promise { const manager = this.activeManager_ const isBulkEmit = !isString(eventNameOrData) - let events: EventBusTypes.EmitData[] = isBulkEmit + const events: EventBusTypes.EmitData[] = isBulkEmit ? eventNameOrData.map((event) => ({ eventName: event.eventName, data: event.data, @@ -151,32 +151,23 @@ export default class EventBusService }, ] - events = events.filter( - (event) => - this.eventBusModuleService_?.retrieveSubscribers(event.eventName) - ?.length ?? false - ) + /** + * We store events in the database when in an ongoing transaction. + * + * If we are in a long-running transaction, the ACID properties of a + * transaction ensure, that events are kept invisible to the enqueuer + * until the transaction has committed. + * + * This patterns also gives us at-least-once delivery of events, as events + * are only removed from the database, if they are successfully delivered. + * + * In case of a failing transaction, jobs stored in the database are removed + * as part of the rollback. + */ - let stagedJobs: StagedJob[] = [] - if (events.length) { - /** - * We store events in the database when in an ongoing transaction. - * - * If we are in a long-running transaction, the ACID properties of a - * transaction ensure, that events are kept invisible to the enqueuer - * until the transaction has committed. - * - * This patterns also gives us at-least-once delivery of events, as events - * are only removed from the database, if they are successfully delivered. - * - * In case of a failing transaction, jobs stored in the database are removed - * as part of the rollback. - */ - - stagedJobs = await this.stagedJobService_ - .withTransaction(manager) - .create(events) - } + const stagedJobs = await this.stagedJobService_ + .withTransaction(manager) + .create(events) return (!isBulkEmit ? stagedJobs[0] : stagedJobs) as unknown as TResult } diff --git a/packages/types/src/event-bus/event-bus-module.ts b/packages/types/src/event-bus/event-bus-module.ts index dad47a55b9..c6a70a258f 100644 --- a/packages/types/src/event-bus/event-bus-module.ts +++ b/packages/types/src/event-bus/event-bus-module.ts @@ -1,15 +1,6 @@ -import { - EmitData, - Subscriber, - SubscriberContext, - SubscriberDescriptor, -} from "./common" +import { EmitData, Subscriber, SubscriberContext } from "./common" export interface IEventBusModuleService { - retrieveSubscribers( - event: string | symbol - ): SubscriberDescriptor[] | undefined - emit( eventName: string, data: T, diff --git a/packages/utils/src/event-bus/index.ts b/packages/utils/src/event-bus/index.ts index 7423cbc96f..602d7ddfff 100644 --- a/packages/utils/src/event-bus/index.ts +++ b/packages/utils/src/event-bus/index.ts @@ -50,10 +50,6 @@ export abstract class AbstractEventBusModuleService ]) } - public retrieveSubscribers(event: string | symbol) { - return this.eventToSubscribersMap_.get(event) - } - public subscribe( eventName: string | symbol, subscriber: EventBusTypes.Subscriber, @@ -88,7 +84,7 @@ export abstract class AbstractEventBusModuleService throw new Error("Subscriber must be a function") } - const existingSubscribers = this.retrieveSubscribers(eventName) + const existingSubscribers = this.eventToSubscribersMap_.get(eventName) if (existingSubscribers?.length) { const subIndex = existingSubscribers?.findIndex(