Feat(): distributed caching (#13435)
RESOLVES CORE-1153 **What** - This pr mainly lay the foundation the caching layer. It comes with a modules (built in memory cache) and a redis provider. - Apply caching to few touch point to test Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
5b135a41fe
commit
b9d6f73320
@@ -58,8 +58,9 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
|
||||
const eventListenersCount = this.eventEmitter_.listenerCount(
|
||||
eventData.name
|
||||
)
|
||||
const startSubscribersCount = this.eventEmitter_.listenerCount("*")
|
||||
|
||||
if (eventListenersCount === 0) {
|
||||
if (eventListenersCount === 0 && startSubscribersCount === 0) {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -84,6 +85,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
|
||||
private async groupOrEmitEvent<T = unknown>(eventData: Message<T>) {
|
||||
const { options, ...eventBody } = eventData
|
||||
const eventGroupId = eventBody.metadata?.eventGroupId
|
||||
const hasStarSubscriber = this.eventEmitter_.listenerCount("*") > 0
|
||||
|
||||
if (eventGroupId) {
|
||||
await this.groupEvent(eventGroupId, eventData)
|
||||
@@ -91,9 +93,15 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
|
||||
const options_ = eventData.options as { delay: number }
|
||||
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
|
||||
|
||||
delay(options_?.delay).then(() =>
|
||||
delay(options_?.delay).then(async () => {
|
||||
// Call interceptors before emitting
|
||||
void this.callInterceptors(eventData, { isGrouped: false })
|
||||
|
||||
this.eventEmitter_.emit(eventData.name, eventBody)
|
||||
)
|
||||
if (hasStarSubscriber) {
|
||||
this.eventEmitter_.emit("*", eventBody)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,6 +120,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
|
||||
async releaseGroupedEvents(eventGroupId: string) {
|
||||
let groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
|
||||
groupedEvents = JSON.parse(JSON.stringify(groupedEvents))
|
||||
const hasStarSubscriber = this.eventEmitter_.listenerCount("*") > 0
|
||||
|
||||
for (const event of groupedEvents) {
|
||||
const { options, ...eventBody } = event
|
||||
@@ -119,9 +128,15 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
|
||||
const options_ = options as { delay: number }
|
||||
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
|
||||
|
||||
delay(options_?.delay).then(() =>
|
||||
delay(options_?.delay).then(async () => {
|
||||
// Call interceptors before emitting grouped events
|
||||
void this.callInterceptors(event, { isGrouped: true, eventGroupId })
|
||||
|
||||
this.eventEmitter_.emit(event.name, eventBody)
|
||||
)
|
||||
if (hasStarSubscriber) {
|
||||
this.eventEmitter_.emit("*", eventBody)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
await this.clearGroupedEvents(eventGroupId)
|
||||
|
||||
Reference in New Issue
Block a user