fix(medusa, types, utils, event-bus-local): Revert retrieveSubscribers (#4002)

* fix(medusa, event-bus-redis, event-bus-local): Revert retrieveSubscribers as the wildcard prevent us from filtering

* Create calm-eggs-collect.md

---------

Co-authored-by: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2023-05-03 18:55:45 +02:00
committed by GitHub
parent bcaaa0288b
commit 0e488e71b1
5 changed files with 28 additions and 42 deletions

View File

@@ -0,0 +1,8 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/medusa": patch
"@medusajs/types": patch
"@medusajs/utils": patch
---
fix(medusa, event-bus-redis, event-bus-local): Revert retrieveSubscribers as the wildcard prevent us from filtering

View File

@@ -86,7 +86,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
subscriber: Subscriber,
context?: EventBusTypes.SubscriberContext
): this {
const existingSubscribers = this.retrieveSubscribers(event)
const existingSubscribers = this.eventToSubscribersMap_.get(event)
if (existingSubscribers?.length) {
const subIndex = existingSubscribers?.findIndex(

View File

@@ -137,7 +137,7 @@ export default class EventBusService
): Promise<TResult | void> {
const manager = this.activeManager_
const isBulkEmit = !isString(eventNameOrData)
let events: EventBusTypes.EmitData[] = isBulkEmit
const events: EventBusTypes.EmitData[] = isBulkEmit
? eventNameOrData.map((event) => ({
eventName: event.eventName,
data: event.data,
@@ -151,32 +151,23 @@ export default class EventBusService
},
]
events = events.filter(
(event) =>
this.eventBusModuleService_?.retrieveSubscribers(event.eventName)
?.length ?? false
)
/**
* We store events in the database when in an ongoing transaction.
*
* If we are in a long-running transaction, the ACID properties of a
* transaction ensure, that events are kept invisible to the enqueuer
* until the transaction has committed.
*
* This patterns also gives us at-least-once delivery of events, as events
* are only removed from the database, if they are successfully delivered.
*
* In case of a failing transaction, jobs stored in the database are removed
* as part of the rollback.
*/
let stagedJobs: StagedJob[] = []
if (events.length) {
/**
* We store events in the database when in an ongoing transaction.
*
* If we are in a long-running transaction, the ACID properties of a
* transaction ensure, that events are kept invisible to the enqueuer
* until the transaction has committed.
*
* This patterns also gives us at-least-once delivery of events, as events
* are only removed from the database, if they are successfully delivered.
*
* In case of a failing transaction, jobs stored in the database are removed
* as part of the rollback.
*/
stagedJobs = await this.stagedJobService_
.withTransaction(manager)
.create(events)
}
const stagedJobs = await this.stagedJobService_
.withTransaction(manager)
.create(events)
return (!isBulkEmit ? stagedJobs[0] : stagedJobs) as unknown as TResult
}

View File

@@ -1,15 +1,6 @@
import {
EmitData,
Subscriber,
SubscriberContext,
SubscriberDescriptor,
} from "./common"
import { EmitData, Subscriber, SubscriberContext } from "./common"
export interface IEventBusModuleService {
retrieveSubscribers(
event: string | symbol
): SubscriberDescriptor[] | undefined
emit<T>(
eventName: string,
data: T,

View File

@@ -50,10 +50,6 @@ export abstract class AbstractEventBusModuleService
])
}
public retrieveSubscribers(event: string | symbol) {
return this.eventToSubscribersMap_.get(event)
}
public subscribe(
eventName: string | symbol,
subscriber: EventBusTypes.Subscriber,
@@ -88,7 +84,7 @@ export abstract class AbstractEventBusModuleService
throw new Error("Subscriber must be a function")
}
const existingSubscribers = this.retrieveSubscribers(eventName)
const existingSubscribers = this.eventToSubscribersMap_.get(eventName)
if (existingSubscribers?.length) {
const subIndex = existingSubscribers?.findIndex(