chore: Ensure the events are emitted with the same shape all accross (#8063)

* chore: Ensure the events are emitted with the same shape all accross

* fixes:

* rm unsues type

* types

* fix tests
This commit is contained in:
Adrien de Peretti
2024-07-10 19:34:28 +02:00
committed by GitHub
parent 95f29358d1
commit e778870c68
9 changed files with 42 additions and 30 deletions

View File

@@ -1,8 +1,7 @@
import { Context } from "../shared-context"
export type Subscriber<TData = unknown> = (
data: TData,
eventName: string
data: MessageBody<TData>
) => Promise<void>
export type SubscriberContext = {
@@ -14,11 +13,6 @@ export type SubscriberDescriptor = {
subscriber: Subscriber
}
export type EventHandler<TData = unknown> = (
data: TData,
eventName: string
) => Promise<void>
export type EventMetadata = Record<string, unknown> & {
eventGroupId?: string
}

View File

@@ -265,6 +265,7 @@ export function MedusaService<
data: isString(primaryKeyValue)
? { id: primaryKeyValue }
: primaryKeyValue,
metadata: { source: "", action: "", object: "" },
}))
)
}

View File

@@ -1,9 +1,10 @@
import {
IEventBusModuleService,
MedusaContainer,
MessageBody,
Subscriber,
} from "@medusajs/types"
import { ModuleRegistrationName, kebabCase } from "@medusajs/utils"
import { kebabCase, ModuleRegistrationName } from "@medusajs/utils"
import { readdir } from "fs/promises"
import { extname, join, sep } from "path"
@@ -131,7 +132,7 @@ export class SubscriberLoader {
const fullPath = join(dirPath, entry.name)
if (entry.isDirectory()) {
return this.createMap(fullPath)
return await this.createMap(fullPath)
}
return await this.createDescriptor(fullPath, entry.name)
@@ -172,7 +173,7 @@ export class SubscriberLoader {
return kebabCase(idFromFile)
}
private createSubscriber<T>({
private createSubscriber<T = unknown>({
fileName,
config,
handler,
@@ -192,8 +193,8 @@ export class SubscriberLoader {
const subscriberId = this.inferIdentifier(fileName, config, handler)
for (const e of events) {
const subscriber = async (data: T) => {
return handler({
const subscriber = async (data: MessageBody<T>) => {
return await handler({
eventName: e,
data,
container: this.container_,

View File

@@ -10,7 +10,7 @@ export type SubscriberConfig = {
}
export type SubscriberArgs<T = unknown> = {
data: T | MessageBody<T>
data: MessageBody<T>
eventName: string
container: MedusaContainer
pluginOptions: Record<string, unknown>

View File

@@ -37,6 +37,7 @@ describe("LocalEventBusService", () => {
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
data: { hi: "1234" },
eventName: "eventName",
})
})
@@ -51,9 +52,11 @@ describe("LocalEventBusService", () => {
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { hi: "1234" },
eventName: "event-1",
})
expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", {
data: { hi: "5678" },
eventName: "event-2",
})
})
@@ -144,6 +147,7 @@ describe("LocalEventBusService", () => {
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { test: "1" },
eventName: "event-1",
})
expect((eventBus as any).groupedEventsMap_.get("group-1")).toHaveLength(
@@ -155,7 +159,7 @@ describe("LocalEventBusService", () => {
jest.clearAllMocks()
eventEmitter.emit = jest.fn((data) => data)
eventBus.releaseGroupedEvents("group-1")
await eventBus.releaseGroupedEvents("group-1")
expect(
(eventBus as any).groupedEventsMap_.get("group-1")
@@ -167,9 +171,13 @@ describe("LocalEventBusService", () => {
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { test: "1" },
eventName: "event-1",
metadata: { eventGroupId: "group-1" },
})
expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", {
data: { test: "2" },
eventName: "event-2",
metadata: { eventGroupId: "group-1" },
})
})

View File

@@ -14,7 +14,7 @@ type InjectedDependencies = {
logger: Logger
}
type StagingQueueType = Map<string, { eventName: string; data?: unknown }[]>
type StagingQueueType = Map<string, Message[]>
const eventEmitter = new EventEmitter()
eventEmitter.setMaxListeners(Infinity)
@@ -72,16 +72,15 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
if (eventGroupId) {
await this.groupEvent(eventGroupId, eventData)
} else {
this.eventEmitter_.emit(eventData.eventName, {
data: eventData.data,
})
const { options, ...eventBody } = eventData
this.eventEmitter_.emit(eventData.eventName, eventBody)
}
}
// Groups an event to a queue to be emitted upon explicit release
private async groupEvent<T = unknown>(
eventGroupId: string,
eventData: MessageBody<T>
eventData: Message<T>
) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
@@ -94,9 +93,9 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
for (const event of groupedEvents) {
const { eventName, data } = event
const { options, ...eventBody } = event
this.eventEmitter_.emit(eventName, { data })
this.eventEmitter_.emit(event.eventName, eventBody)
}
this.clearGroupedEvents(eventGroupId)
@@ -109,10 +108,9 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
subscribe(event: string | symbol, subscriber: Subscriber): this {
const randId = ulid()
this.storeSubscribers({ event, subscriberId: randId, subscriber })
this.eventEmitter_.on(event, async (...args) => {
this.eventEmitter_.on(event, async (data: MessageBody) => {
try {
// @ts-ignore
await subscriber(...args)
await subscriber(data)
} catch (e) {
this.logger_?.error(
`An error occurred while processing ${event.toString()}: ${e}`

View File

@@ -112,8 +112,8 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
/**
* Emit a single or number of events
* @param {Message} data - the data to send to the subscriber.
* @param {BulkJobOptions} data - the options to add to bull mq
* @param eventsData
* @param options
*/
async emit<T = unknown>(
eventsData: Message<T> | Message<T>[],
@@ -136,10 +136,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
for (const event of eventsToGroup) {
const groupId = event.metadata?.eventGroupId!
const array = groupEventsMap.get(groupId) ?? []
const groupEvents = groupEventsMap.get(groupId) ?? []
array.push(event)
groupEventsMap.set(groupId, array)
groupEvents.push(event)
groupEventsMap.set(groupId, groupEvents)
}
const promises: Promise<unknown>[] = []
@@ -258,7 +258,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
const subscribersResult = await Promise.all(
subscribersInCurrentAttempt.map(async ({ id, subscriber }) => {
return await subscriber(data, eventName)
return await subscriber(data)
.then(async (data) => {
// For every subscriber that completes successfully, add their id to the list of completed subscribers
completedSubscribersInCurrentAttempt.push(id)

View File

@@ -682,6 +682,11 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect.objectContaining({
data: { id: productCategoryOne.id },
eventName: "product-category.deleted",
metadata: {
action: "",
object: "",
source: "",
},
}),
])
})

View File

@@ -282,6 +282,11 @@ moduleIntegrationTestRunner<IProductModuleService>({
{
eventName: "product-collection.deleted",
data: { id: collectionId },
metadata: {
action: "",
object: "",
source: "",
},
},
])
})