diff --git a/packages/core/types/src/event-bus/common.ts b/packages/core/types/src/event-bus/common.ts index d38606db2f..83046ebadd 100644 --- a/packages/core/types/src/event-bus/common.ts +++ b/packages/core/types/src/event-bus/common.ts @@ -1,8 +1,7 @@ import { Context } from "../shared-context" export type Subscriber = ( - data: TData, - eventName: string + data: MessageBody ) => Promise export type SubscriberContext = { @@ -14,11 +13,6 @@ export type SubscriberDescriptor = { subscriber: Subscriber } -export type EventHandler = ( - data: TData, - eventName: string -) => Promise - export type EventMetadata = Record & { eventGroupId?: string } diff --git a/packages/core/utils/src/modules-sdk/medusa-service.ts b/packages/core/utils/src/modules-sdk/medusa-service.ts index 448f1529da..3d863169bd 100644 --- a/packages/core/utils/src/modules-sdk/medusa-service.ts +++ b/packages/core/utils/src/modules-sdk/medusa-service.ts @@ -265,6 +265,7 @@ export function MedusaService< data: isString(primaryKeyValue) ? { id: primaryKeyValue } : primaryKeyValue, + metadata: { source: "", action: "", object: "" }, })) ) } diff --git a/packages/medusa/src/loaders/helpers/subscribers/index.ts b/packages/medusa/src/loaders/helpers/subscribers/index.ts index 59130263d1..2dbad4e28f 100644 --- a/packages/medusa/src/loaders/helpers/subscribers/index.ts +++ b/packages/medusa/src/loaders/helpers/subscribers/index.ts @@ -1,9 +1,10 @@ import { IEventBusModuleService, MedusaContainer, + MessageBody, Subscriber, } from "@medusajs/types" -import { ModuleRegistrationName, kebabCase } from "@medusajs/utils" +import { kebabCase, ModuleRegistrationName } from "@medusajs/utils" import { readdir } from "fs/promises" import { extname, join, sep } from "path" @@ -131,7 +132,7 @@ export class SubscriberLoader { const fullPath = join(dirPath, entry.name) if (entry.isDirectory()) { - return this.createMap(fullPath) + return await this.createMap(fullPath) } return await this.createDescriptor(fullPath, entry.name) @@ -172,7 +173,7 @@ export class SubscriberLoader { return kebabCase(idFromFile) } - private createSubscriber({ + private createSubscriber({ fileName, config, handler, @@ -192,8 +193,8 @@ export class SubscriberLoader { const subscriberId = this.inferIdentifier(fileName, config, handler) for (const e of events) { - const subscriber = async (data: T) => { - return handler({ + const subscriber = async (data: MessageBody) => { + return await handler({ eventName: e, data, container: this.container_, diff --git a/packages/medusa/src/types/subscribers.ts b/packages/medusa/src/types/subscribers.ts index e49f498a00..76da86ce6a 100644 --- a/packages/medusa/src/types/subscribers.ts +++ b/packages/medusa/src/types/subscribers.ts @@ -10,7 +10,7 @@ export type SubscriberConfig = { } export type SubscriberArgs = { - data: T | MessageBody + data: MessageBody eventName: string container: MedusaContainer pluginOptions: Record diff --git a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts index bcba7559b9..738614479e 100644 --- a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts @@ -37,6 +37,7 @@ describe("LocalEventBusService", () => { expect(eventEmitter.emit).toHaveBeenCalledTimes(1) expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", { data: { hi: "1234" }, + eventName: "eventName", }) }) @@ -51,9 +52,11 @@ describe("LocalEventBusService", () => { expect(eventEmitter.emit).toHaveBeenCalledTimes(2) expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { data: { hi: "1234" }, + eventName: "event-1", }) expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", { data: { hi: "5678" }, + eventName: "event-2", }) }) @@ -144,6 +147,7 @@ describe("LocalEventBusService", () => { expect(eventEmitter.emit).toHaveBeenCalledTimes(1) expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { data: { test: "1" }, + eventName: "event-1", }) expect((eventBus as any).groupedEventsMap_.get("group-1")).toHaveLength( @@ -155,7 +159,7 @@ describe("LocalEventBusService", () => { jest.clearAllMocks() eventEmitter.emit = jest.fn((data) => data) - eventBus.releaseGroupedEvents("group-1") + await eventBus.releaseGroupedEvents("group-1") expect( (eventBus as any).groupedEventsMap_.get("group-1") @@ -167,9 +171,13 @@ describe("LocalEventBusService", () => { expect(eventEmitter.emit).toHaveBeenCalledTimes(2) expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { data: { test: "1" }, + eventName: "event-1", + metadata: { eventGroupId: "group-1" }, }) expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", { data: { test: "2" }, + eventName: "event-2", + metadata: { eventGroupId: "group-1" }, }) }) diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 7230ecd89a..e8826e0724 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -14,7 +14,7 @@ type InjectedDependencies = { logger: Logger } -type StagingQueueType = Map +type StagingQueueType = Map const eventEmitter = new EventEmitter() eventEmitter.setMaxListeners(Infinity) @@ -72,16 +72,15 @@ export default class LocalEventBusService extends AbstractEventBusModuleService if (eventGroupId) { await this.groupEvent(eventGroupId, eventData) } else { - this.eventEmitter_.emit(eventData.eventName, { - data: eventData.data, - }) + const { options, ...eventBody } = eventData + this.eventEmitter_.emit(eventData.eventName, eventBody) } } // Groups an event to a queue to be emitted upon explicit release private async groupEvent( eventGroupId: string, - eventData: MessageBody + eventData: Message ) { const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] @@ -94,9 +93,9 @@ export default class LocalEventBusService extends AbstractEventBusModuleService const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] for (const event of groupedEvents) { - const { eventName, data } = event + const { options, ...eventBody } = event - this.eventEmitter_.emit(eventName, { data }) + this.eventEmitter_.emit(event.eventName, eventBody) } this.clearGroupedEvents(eventGroupId) @@ -109,10 +108,9 @@ export default class LocalEventBusService extends AbstractEventBusModuleService subscribe(event: string | symbol, subscriber: Subscriber): this { const randId = ulid() this.storeSubscribers({ event, subscriberId: randId, subscriber }) - this.eventEmitter_.on(event, async (...args) => { + this.eventEmitter_.on(event, async (data: MessageBody) => { try { - // @ts-ignore - await subscriber(...args) + await subscriber(data) } catch (e) { this.logger_?.error( `An error occurred while processing ${event.toString()}: ${e}` diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index 976cf56439..19cc78fc21 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -112,8 +112,8 @@ export default class RedisEventBusService extends AbstractEventBusModuleService /** * Emit a single or number of events - * @param {Message} data - the data to send to the subscriber. - * @param {BulkJobOptions} data - the options to add to bull mq + * @param eventsData + * @param options */ async emit( eventsData: Message | Message[], @@ -136,10 +136,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService for (const event of eventsToGroup) { const groupId = event.metadata?.eventGroupId! - const array = groupEventsMap.get(groupId) ?? [] + const groupEvents = groupEventsMap.get(groupId) ?? [] - array.push(event) - groupEventsMap.set(groupId, array) + groupEvents.push(event) + groupEventsMap.set(groupId, groupEvents) } const promises: Promise[] = [] @@ -258,7 +258,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService const subscribersResult = await Promise.all( subscribersInCurrentAttempt.map(async ({ id, subscriber }) => { - return await subscriber(data, eventName) + return await subscriber(data) .then(async (data) => { // For every subscriber that completes successfully, add their id to the list of completed subscribers completedSubscribersInCurrentAttempt.push(id) diff --git a/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts b/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts index 6c42faee1a..f561c3d03f 100644 --- a/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts +++ b/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts @@ -682,6 +682,11 @@ moduleIntegrationTestRunner({ expect.objectContaining({ data: { id: productCategoryOne.id }, eventName: "product-category.deleted", + metadata: { + action: "", + object: "", + source: "", + }, }), ]) }) diff --git a/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts b/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts index 35a7aaa8de..ae7e30d2b1 100644 --- a/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts +++ b/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts @@ -282,6 +282,11 @@ moduleIntegrationTestRunner({ { eventName: "product-collection.deleted", data: { id: collectionId }, + metadata: { + action: "", + object: "", + source: "", + }, }, ]) })