diff --git a/integration-tests/modules/__tests__/notification/admin/notification.spec.ts b/integration-tests/modules/__tests__/notification/admin/notification.spec.ts index 75940d269e..c36cfff5d0 100644 --- a/integration-tests/modules/__tests__/notification/admin/notification.spec.ts +++ b/integration-tests/modules/__tests__/notification/admin/notification.spec.ts @@ -165,7 +165,8 @@ medusaIntegrationTestRunner({ ) const logSpy = jest.spyOn(logger, "info") - await eventBus.emit("order.created", { + await eventBus.emit({ + eventName: "order.created", data: { order: { id: "1234", diff --git a/packages/core/core-flows/src/common/steps/emit-event.ts b/packages/core/core-flows/src/common/steps/emit-event.ts index 0366c7a16a..40c70d7f52 100644 --- a/packages/core/core-flows/src/common/steps/emit-event.ts +++ b/packages/core/core-flows/src/common/steps/emit-event.ts @@ -35,7 +35,7 @@ export const emitEventStep = createStep( context, }) - await eventBus.emit([message]) + await eventBus.emit(message) }, async (data: void) => {} ) diff --git a/packages/core/medusa-test-utils/src/mock-event-bus-service.ts b/packages/core/medusa-test-utils/src/mock-event-bus-service.ts index 4cc3f1c604..40dd380c59 100644 --- a/packages/core/medusa-test-utils/src/mock-event-bus-service.ts +++ b/packages/core/medusa-test-utils/src/mock-event-bus-service.ts @@ -1,5 +1,4 @@ import { - EmitData, EventBusTypes, IEventBusModuleService, Message, @@ -7,24 +6,9 @@ import { } from "@medusajs/types" export default class EventBusService implements IEventBusModuleService { - emit( - eventName: string, - data: T, - options?: Record - ): Promise - emit(data: EmitData[]): Promise - emit(data: Message[]): Promise - - async emit< - T, - TInput extends - | string - | EventBusTypes.EmitData[] - | EventBusTypes.Message[] = string - >( - eventOrData: TInput, - data?: T, - options: Record = {} + async emit( + data: Message | Message[], + options: Record ): Promise {} subscribe(event: string | symbol, subscriber: Subscriber): this { diff --git a/packages/core/medusa-test-utils/src/module-test-runner.ts b/packages/core/medusa-test-utils/src/module-test-runner.ts index 0712250e8d..5649c48137 100644 --- a/packages/core/medusa-test-utils/src/module-test-runner.ts +++ b/packages/core/medusa-test-utils/src/module-test-runner.ts @@ -1,8 +1,8 @@ -import { initModules, InitModulesOptions } from "./init-modules" import { getDatabaseURL, getMikroOrmWrapper, TestDatabase } from "./database" +import { initModules, InitModulesOptions } from "./init-modules" -import { MockEventBusService } from "." import { ContainerRegistrationKeys, ModulesSdkUtils } from "@medusajs/utils" +import { MockEventBusService } from "." export interface SuiteOptions { MikroOrmWrapper: TestDatabase @@ -75,7 +75,7 @@ export function moduleIntegrationTestRunner({ const moduleOptions_: InitModulesOptions = { injectedDependencies: { [ContainerRegistrationKeys.PG_CONNECTION]: connection, - ["eventBusModuleService"]: new MockEventBusService(), + eventBusModuleService: new MockEventBusService(), [ContainerRegistrationKeys.LOGGER]: console, ...injectedDependencies, }, diff --git a/packages/core/types/src/event-bus/common.ts b/packages/core/types/src/event-bus/common.ts index 049acf7cf1..d38606db2f 100644 --- a/packages/core/types/src/event-bus/common.ts +++ b/packages/core/types/src/event-bus/common.ts @@ -1,7 +1,7 @@ import { Context } from "../shared-context" -export type Subscriber = ( - data: T, +export type Subscriber = ( + data: TData, eventName: string ) => Promise @@ -14,36 +14,28 @@ export type SubscriberDescriptor = { subscriber: Subscriber } -export type EventHandler = ( - data: T, +export type EventHandler = ( + data: TData, eventName: string ) => Promise -export type EmitData = { +export type EventMetadata = Record & { + eventGroupId?: string +} + +export type MessageBody = { eventName: string - data: T + metadata?: EventMetadata + data: TData +} + +export type Message = MessageBody & { options?: Record } -export type MessageBody = { - metadata: { - source: string - action: string - object: string - eventGroupId?: string - } - data: T -} - -export type Message = { +export type RawMessageFormat = { eventName: string - body: MessageBody - options?: Record -} - -export type RawMessageFormat = { - eventName: string - data: T + data: TData source: string object: string action?: string diff --git a/packages/core/types/src/event-bus/event-bus-module.ts b/packages/core/types/src/event-bus/event-bus-module.ts index 73b497263a..199733ef35 100644 --- a/packages/core/types/src/event-bus/event-bus-module.ts +++ b/packages/core/types/src/event-bus/event-bus-module.ts @@ -1,13 +1,10 @@ -import { EmitData, Message, Subscriber, SubscriberContext } from "./common" +import { Message, Subscriber, SubscriberContext } from "./common" export interface IEventBusModuleService { emit( - eventName: string, - data: T, + data: Message | Message[], options?: Record ): Promise - emit(data: EmitData[]): Promise - emit(data: Message[]): Promise subscribe( eventName: string | symbol, diff --git a/packages/core/types/src/event-bus/event-bus.ts b/packages/core/types/src/event-bus/event-bus.ts index 33be2c43a2..20c1d89ce1 100644 --- a/packages/core/types/src/event-bus/event-bus.ts +++ b/packages/core/types/src/event-bus/event-bus.ts @@ -1,5 +1,5 @@ import { ITransactionBaseService } from "../transaction-base" -import { EmitData, Message, Subscriber, SubscriberContext } from "./common" +import { Message, Subscriber, SubscriberContext } from "./common" export interface IEventBusService extends ITransactionBaseService { subscribe( @@ -14,7 +14,5 @@ export interface IEventBusService extends ITransactionBaseService { context?: SubscriberContext ): this - emit(event: string, data: T, options?: unknown): Promise - emit(data: EmitData[]): Promise - emit(data: Message[]): Promise + emit(data: Message | Message[]): Promise } diff --git a/packages/core/utils/src/event-bus/__tests__/message-aggregator.spec.ts b/packages/core/utils/src/event-bus/__tests__/message-aggregator.spec.ts index 90068f0103..59c137fca1 100644 --- a/packages/core/utils/src/event-bus/__tests__/message-aggregator.spec.ts +++ b/packages/core/utils/src/event-bus/__tests__/message-aggregator.spec.ts @@ -9,70 +9,60 @@ describe("MessageAggregator", function () { const aggregator = new MessageAggregator() aggregator.save({ eventName: "ProductVariant.created", - body: { - metadata: { - source: "ProductService", - action: "created", - object: "ProductVariant", - eventGroupId: "1", - }, - data: { id: 999 }, + metadata: { + source: "ProductService", + action: "created", + object: "ProductVariant", + eventGroupId: "1", }, + data: { id: 999 }, }) aggregator.save({ eventName: "Product.created", - body: { - metadata: { - source: "ProductService", - action: "created", - object: "Product", - eventGroupId: "1", - }, - data: { id: 1 }, + metadata: { + source: "ProductService", + action: "created", + object: "Product", + eventGroupId: "1", }, + data: { id: 1 }, }) aggregator.save({ eventName: "ProductVariant.created", - body: { - metadata: { - source: "ProductService", - action: "created", - object: "ProductVariant", - eventGroupId: "1", - }, - data: { id: 222 }, + metadata: { + source: "ProductService", + action: "created", + object: "ProductVariant", + eventGroupId: "1", }, + data: { id: 222 }, }) aggregator.save({ eventName: "ProductType.detached", - body: { - metadata: { - source: "ProductService", - action: "detached", - object: "ProductType", - eventGroupId: "1", - }, - data: { id: 333 }, + metadata: { + source: "ProductService", + action: "detached", + object: "ProductType", + eventGroupId: "1", }, + data: { id: 333 }, }) aggregator.save({ eventName: "ProductVariant.updated", - body: { - metadata: { - source: "ProductService", - action: "updated", - object: "ProductVariant", - eventGroupId: "1", - }, - data: { id: 123 }, + metadata: { + source: "ProductService", + action: "updated", + object: "ProductVariant", + eventGroupId: "1", }, + data: { id: 123 }, }) const format = { - groupBy: ["eventName", "body.metadata.object", "body.metadata.action"], + groupBy: ["eventName", "metadata.object", "metadata.action"], sortBy: { - "body.metadata.object": ["ProductType", "ProductVariant", "Product"], - "body.data.id": "asc", + "metadata.object": ["ProductType", "ProductVariant", "Product"], + "data.id": "asc", }, } @@ -85,72 +75,62 @@ describe("MessageAggregator", function () { expect(allGroups[0]).toEqual([ { eventName: "ProductType.detached", - body: { - metadata: { - source: "ProductService", - action: "detached", - object: "ProductType", - eventGroupId: "1", - }, - data: { id: 333 }, + metadata: { + source: "ProductService", + action: "detached", + object: "ProductType", + eventGroupId: "1", }, + data: { id: 333 }, }, ]) expect(allGroups[1]).toEqual([ { eventName: "ProductVariant.updated", - body: { - metadata: { - source: "ProductService", - action: "updated", - object: "ProductVariant", - eventGroupId: "1", - }, - data: { id: 123 }, + metadata: { + source: "ProductService", + action: "updated", + object: "ProductVariant", + eventGroupId: "1", }, + data: { id: 123 }, }, ]) expect(allGroups[2]).toEqual([ { eventName: "ProductVariant.created", - body: { - metadata: { - source: "ProductService", - action: "created", - object: "ProductVariant", - eventGroupId: "1", - }, - data: { id: 222 }, + metadata: { + source: "ProductService", + action: "created", + object: "ProductVariant", + eventGroupId: "1", }, + data: { id: 222 }, }, { eventName: "ProductVariant.created", - body: { - metadata: { - source: "ProductService", - action: "created", - object: "ProductVariant", - eventGroupId: "1", - }, - data: { id: 999 }, + metadata: { + source: "ProductService", + action: "created", + object: "ProductVariant", + eventGroupId: "1", }, + data: { id: 999 }, }, ]) expect(allGroups[3]).toEqual([ { eventName: "Product.created", - body: { - metadata: { - source: "ProductService", - action: "created", - object: "Product", - eventGroupId: "1", - }, - data: { id: 1 }, + metadata: { + source: "ProductService", + action: "created", + object: "Product", + eventGroupId: "1", }, + data: { id: 1 }, }, ]) }) diff --git a/packages/core/utils/src/event-bus/build-event-messages.ts b/packages/core/utils/src/event-bus/build-event-messages.ts index 5eaa8ae862..66335550a9 100644 --- a/packages/core/utils/src/event-bus/build-event-messages.ts +++ b/packages/core/utils/src/event-bus/build-event-messages.ts @@ -45,10 +45,8 @@ export function composeMessage( return { eventName, - body: { - metadata, - data, - }, + metadata, + data, options, } } diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index c82217bb6e..d5548200ef 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -17,12 +17,9 @@ export abstract class AbstractEventBusModuleService } abstract emit( - eventName: string, - data: T, + data: EventBusTypes.Message | EventBusTypes.Message[], options: Record ): Promise - abstract emit(data: EventBusTypes.EmitData[]): Promise - abstract emit(data: EventBusTypes.Message[]): Promise /* Grouped events are useful when you have distributed transactions diff --git a/packages/core/utils/src/modules-sdk/abstract-module-service-factory.ts b/packages/core/utils/src/modules-sdk/abstract-module-service-factory.ts index 3273dbd085..08f1565bfd 100644 --- a/packages/core/utils/src/modules-sdk/abstract-module-service-factory.ts +++ b/packages/core/utils/src/modules-sdk/abstract-module-service-factory.ts @@ -501,6 +501,7 @@ export function abstractModuleServiceFactory< await this.eventBusModuleService_?.emit( softDeletedEntities.map(({ id }) => ({ eventName: `${kebabCase(model.name)}.deleted`, + metadata: { source: "", action: "", object: "" }, data: { id }, })) ) diff --git a/packages/medusa/src/api/hooks/payment/[provider]/route.ts b/packages/medusa/src/api/hooks/payment/[provider]/route.ts index a5a60d2914..ea3dbc034a 100644 --- a/packages/medusa/src/api/hooks/payment/[provider]/route.ts +++ b/packages/medusa/src/api/hooks/payment/[provider]/route.ts @@ -1,6 +1,6 @@ import { ModuleRegistrationName } from "@medusajs/modules-sdk" -import { PaymentWebhookEvents } from "@medusajs/utils" import { PaymentModuleOptions } from "@medusajs/types" +import { PaymentWebhookEvents } from "@medusajs/utils" import { MedusaRequest, MedusaResponse } from "../../../../types/routing" @@ -20,10 +20,16 @@ export const POST = async (req: MedusaRequest, res: MedusaResponse) => { const eventBus = req.scope.resolve(ModuleRegistrationName.EVENT_BUS) // we delay the processing of the event to avoid a conflict caused by a race condition - await eventBus.emit(PaymentWebhookEvents.WebhookReceived, event, { - delay: options.webhook_delay || 5000, - attempts: options.webhook_retries || 3, - }) + await eventBus.emit( + { + eventName: PaymentWebhookEvents.WebhookReceived, + data: event, + }, + { + delay: options.webhook_delay || 5000, + attempts: options.webhook_retries || 3, + } + ) } catch (err) { res.status(400).send(`Webhook Error: ${err.message}`) return diff --git a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts index f604f4d241..bcba7559b9 100644 --- a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts @@ -14,7 +14,8 @@ const moduleDeps = { } describe("LocalEventBusService", () => { - let eventBus + let eventBus: LocalEventBusService + let eventEmitter describe("emit", () => { describe("Successfully emits events", () => { @@ -22,148 +23,184 @@ describe("LocalEventBusService", () => { jest.clearAllMocks() eventBus = new LocalEventBusService(moduleDeps as any) + eventEmitter = (eventBus as any).eventEmitter_ }) it("should emit an event", async () => { - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventEmitter.emit = jest.fn((data) => data) - await eventBus.emit("eventName", { hi: "1234" }) + await eventBus.emit({ + eventName: "eventName", + data: { hi: "1234" }, + }) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("eventName", { - hi: "1234", + expect(eventEmitter.emit).toHaveBeenCalledTimes(1) + expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", { + data: { hi: "1234" }, }) }) it("should emit multiple events", async () => { - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventEmitter.emit = jest.fn((data) => data) await eventBus.emit([ { eventName: "event-1", data: { hi: "1234" } }, { eventName: "event-2", data: { hi: "5678" } }, ]) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { - hi: "1234", + expect(eventEmitter.emit).toHaveBeenCalledTimes(2) + expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { + data: { hi: "1234" }, }) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", { - hi: "5678", + expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", { + data: { hi: "5678" }, }) }) it("should group an event if data consists of eventGroupId", async () => { - const groupEventFn = jest.spyOn(eventBus, "groupEvent") + let groupEventFn = jest.spyOn(eventBus, "groupEvent" as any) + eventEmitter.emit = jest.fn((data) => data) - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) - - await eventBus.emit("test-event", { - test: "1234", - eventGroupId: "test", + await eventBus.emit({ + eventName: "test-event", + data: { + test: "1234", + }, + metadata: { + eventGroupId: "test", + }, }) - expect(eventBus.eventEmitter_.emit).not.toHaveBeenCalled() + expect(eventEmitter.emit).not.toHaveBeenCalled() expect(groupEventFn).toHaveBeenCalledTimes(1) - expect(groupEventFn).toHaveBeenCalledWith("test", "test-event", { - test: "1234", + expect(groupEventFn).toHaveBeenCalledWith("test", { + data: { test: "1234" }, + metadata: { eventGroupId: "test" }, + eventName: "test-event", }) jest.clearAllMocks() - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) - eventBus.emit("test-event", { test: "1234", eventGroupId: "test" }) - eventBus.emit("test-event", { test: "test-1" }) + groupEventFn = jest.spyOn(eventBus, "groupEvent" as any) + eventEmitter.emit = jest.fn((data) => data) + + eventBus.emit([ + { + eventName: "test-event", + data: { test: "1234" }, + metadata: { eventGroupId: "test" }, + }, + { + eventName: "test-event", + data: { test: "test-1" }, + }, + ]) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) expect(groupEventFn).toHaveBeenCalledTimes(1) - expect(eventBus.groupedEventsMap_.get("test")).toEqual([ + expect((eventBus as any).groupedEventsMap_.get("test")).toEqual([ expect.objectContaining({ eventName: "test-event" }), expect.objectContaining({ eventName: "test-event" }), ]) - await eventBus.emit("test-event", { - test: "1234", - eventGroupId: "test-2", + await eventBus.emit({ + eventName: "test-event", + data: { test: "1234" }, + metadata: { eventGroupId: "test-2" }, }) - expect(eventBus.groupedEventsMap_.get("test-2")).toEqual([ + expect((eventBus as any).groupedEventsMap_.get("test-2")).toEqual([ expect.objectContaining({ eventName: "test-event" }), ]) }) it("should release events when requested with eventGroupId", async () => { - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventEmitter.emit = jest.fn((data) => data) await eventBus.emit([ { eventName: "event-1", - data: { test: "1", eventGroupId: "group-1" }, + data: { test: "1" }, + metadata: { eventGroupId: "group-1" }, }, { eventName: "event-2", - data: { test: "2", eventGroupId: "group-1" }, + data: { test: "2" }, + metadata: { eventGroupId: "group-1" }, }, { eventName: "event-1", - data: { test: "1", eventGroupId: "group-2" }, + data: { test: "1" }, + metadata: { eventGroupId: "group-2" }, }, { eventName: "event-2", - data: { test: "2", eventGroupId: "group-2" }, + data: { test: "2" }, + metadata: { eventGroupId: "group-2" }, }, { eventName: "event-1", data: { test: "1" } }, ]) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { - test: "1", + expect(eventEmitter.emit).toHaveBeenCalledTimes(1) + expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { + data: { test: "1" }, }) - expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(2) - expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2) + expect((eventBus as any).groupedEventsMap_.get("group-1")).toHaveLength( + 2 + ) + expect((eventBus as any).groupedEventsMap_.get("group-2")).toHaveLength( + 2 + ) jest.clearAllMocks() - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventEmitter.emit = jest.fn((data) => data) eventBus.releaseGroupedEvents("group-1") - expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined() - expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2) + expect( + (eventBus as any).groupedEventsMap_.get("group-1") + ).not.toBeDefined() + expect((eventBus as any).groupedEventsMap_.get("group-2")).toHaveLength( + 2 + ) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { - test: "1", + expect(eventEmitter.emit).toHaveBeenCalledTimes(2) + expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", { + data: { test: "1" }, }) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", { - test: "2", + expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", { + data: { test: "2" }, }) }) it("should clear events from grouped events when requested with eventGroupId", async () => { - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventEmitter.emit = jest.fn((data) => data) + const getMap = () => (eventBus as any).groupedEventsMap_ await eventBus.emit([ { eventName: "event-1", - data: { test: "1", eventGroupId: "group-1" }, + data: { test: "1" }, + metadata: { eventGroupId: "group-1" }, }, { eventName: "event-1", - data: { test: "1", eventGroupId: "group-2" }, + data: { test: "1" }, + metadata: { eventGroupId: "group-2" }, }, ]) - expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(1) - expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1) + expect(getMap().get("group-1")).toHaveLength(1) + expect(getMap().get("group-2")).toHaveLength(1) eventBus.clearGroupedEvents("group-1") - expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined() - expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1) + expect(getMap().get("group-1")).not.toBeDefined() + expect(getMap().get("group-2")).toHaveLength(1) eventBus.clearGroupedEvents("group-2") - expect(eventBus.groupedEventsMap_.get("group-2")).not.toBeDefined() + expect(getMap().get("group-2")).not.toBeDefined() }) }) }) diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index ba11b55351..7230ecd89a 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -1,9 +1,9 @@ import { MedusaContainer } from "@medusajs/modules-sdk" import { - EmitData, EventBusTypes, Logger, Message, + MessageBody, Subscriber, } from "@medusajs/types" import { AbstractEventBusModuleService } from "@medusajs/utils" @@ -35,47 +35,28 @@ export default class LocalEventBusService extends AbstractEventBusModuleService this.groupedEventsMap_ = new Map() } - async emit( - eventName: string, - data: T, - options: Record - ): Promise - - /** - * Emit a number of events - * @param {EmitData} data - the data to send to the subscriber. - */ - async emit(data: EmitData[]): Promise - - async emit(data: Message[]): Promise - - async emit[] | Message[] = string>( - eventOrData: TInput, - data?: T, + async emit( + eventsData: Message | Message[], options: Record = {} ): Promise { - const isBulkEmit = Array.isArray(eventOrData) + const normalizedEventsData = Array.isArray(eventsData) + ? eventsData + : [eventsData] - const events: EmitData[] | Message[] = isBulkEmit - ? eventOrData - : [{ eventName: eventOrData, data }] - - for (const event of events) { + for (const eventData of normalizedEventsData) { const eventListenersCount = this.eventEmitter_.listenerCount( - event.eventName + eventData.eventName ) this.logger_?.info( - `Processing ${event.eventName} which has ${eventListenersCount} subscribers` + `Processing ${eventData.eventName} which has ${eventListenersCount} subscribers` ) if (eventListenersCount === 0) { continue } - const data = (event as EmitData).data ?? (event as Message).body - - await this.groupOrEmitEvent(event.eventName, data) + await this.groupOrEmitEvent(eventData) } } @@ -84,28 +65,27 @@ export default class LocalEventBusService extends AbstractEventBusModuleService // explicitly requested. // This is useful in the event of a distributed transaction where you'd want to emit // events only once the transaction ends. - private async groupOrEmitEvent( - eventName: string, - data: unknown & { eventGroupId?: string } - ) { - const { eventGroupId, ...eventData } = data + private async groupOrEmitEvent(eventData: Message) { + const { options, ...eventBody } = eventData + const eventGroupId = eventBody.metadata?.eventGroupId if (eventGroupId) { - await this.groupEvent(eventGroupId, eventName, eventData) + await this.groupEvent(eventGroupId, eventData) } else { - this.eventEmitter_.emit(eventName, data) + this.eventEmitter_.emit(eventData.eventName, { + data: eventData.data, + }) } } // Groups an event to a queue to be emitted upon explicit release - private async groupEvent( + private async groupEvent( eventGroupId: string, - eventName: string, - data: unknown + eventData: MessageBody ) { const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] - groupedEvents.push({ eventName, data }) + groupedEvents.push(eventData) this.groupedEventsMap_.set(eventGroupId, groupedEvents) } @@ -116,7 +96,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService for (const event of groupedEvents) { const { eventName, data } = event - this.eventEmitter_.emit(eventName, data) + this.eventEmitter_.emit(eventName, { data }) } this.clearGroupedEvents(eventGroupId) diff --git a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.js b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.js deleted file mode 100644 index 898e8f6a90..0000000000 --- a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.js +++ /dev/null @@ -1,318 +0,0 @@ -import { Queue, Worker } from "bullmq" -import RedisEventBusService from "../event-bus-redis" - -jest.genMockFromModule("bullmq") -jest.genMockFromModule("ioredis") -jest.mock("bullmq") -jest.mock("ioredis") - -const loggerMock = { - info: jest.fn().mockReturnValue(console.log), - warn: jest.fn().mockReturnValue(console.log), - error: jest.fn().mockReturnValue(console.log), -} - -const simpleModuleOptions = { redisUrl: "test-url" } -const moduleDeps = { - manager: {}, - logger: loggerMock, - eventBusRedisConnection: {}, -} - -describe("RedisEventBusService", () => { - let eventBus - - describe("constructor", () => { - beforeAll(() => { - jest.clearAllMocks() - }) - - it("Creates a queue + worker", () => { - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - resources: "shared", - }) - - expect(Queue).toHaveBeenCalledTimes(1) - expect(Queue).toHaveBeenCalledWith("events-queue", { - connection: expect.any(Object), - prefix: "RedisEventBusService", - }) - - expect(Worker).toHaveBeenCalledTimes(1) - expect(Worker).toHaveBeenCalledWith( - "events-queue", - expect.any(Function), - { - connection: expect.any(Object), - prefix: "RedisEventBusService", - } - ) - }) - - it("Throws on isolated module declaration", () => { - try { - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - resources: "isolated", - }) - } catch (error) { - expect(error.message).toEqual( - "At the moment this module can only be used with shared resources" - ) - } - }) - }) - - describe("emit", () => { - describe("Successfully emits events", () => { - beforeEach(() => { - jest.clearAllMocks() - }) - - it("Adds job to queue with default options", () => { - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - resources: "shared", - }) - - eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - eventBus.emit("eventName", { hi: "1234" }) - - expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1) - expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ - { - name: "eventName", - data: { eventName: "eventName", data: { hi: "1234" } }, - opts: { - attempts: 1, - removeOnComplete: true, - }, - }, - ]) - }) - - it("Adds job to queue with custom options passed directly upon emitting", () => { - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - resources: "shared", - }) - - eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - eventBus.emit( - "eventName", - { hi: "1234" }, - { attempts: 3, backoff: 5000, delay: 1000 } - ) - - expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1) - expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ - { - name: "eventName", - data: { eventName: "eventName", data: { hi: "1234" } }, - opts: { - attempts: 3, - backoff: 5000, - delay: 1000, - removeOnComplete: true, - }, - }, - ]) - }) - - it("Adds job to queue with module job options", () => { - eventBus = new RedisEventBusService( - moduleDeps, - { - ...simpleModuleOptions, - jobOptions: { - removeOnComplete: { - age: 5, - }, - attempts: 7, - }, - }, - { - resources: "shared", - } - ) - - eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - eventBus.emit("eventName", { hi: "1234" }) - - expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1) - expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ - { - name: "eventName", - data: { eventName: "eventName", data: { hi: "1234" } }, - opts: { - attempts: 7, - removeOnComplete: { - age: 5, - }, - }, - }, - ]) - }) - - it("Adds job to queue with default, local, and global options merged", () => { - eventBus = new RedisEventBusService( - moduleDeps, - { - ...simpleModuleOptions, - jobOptions: { - removeOnComplete: 5, - }, - }, - { - resources: "shared", - } - ) - - eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - eventBus.emit("eventName", { hi: "1234" }, { delay: 1000 }) - - expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1) - expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ - { - name: "eventName", - data: { eventName: "eventName", data: { hi: "1234" } }, - opts: { - attempts: 1, - removeOnComplete: 5, - delay: 1000, - }, - }, - ]) - }) - }) - }) - - describe("worker_", () => { - let result - - describe("Successfully processes the jobs", () => { - beforeEach(async () => { - jest.clearAllMocks() - - eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { - resources: "shared", - }) - }) - - it("Processes a simple event with no options", async () => { - eventBus.subscribe("eventName", () => Promise.resolve("hi")) - - result = await eventBus.worker_({ - data: { eventName: "eventName", data: {} }, - opts: { attempts: 1 }, - }) - - expect(loggerMock.info).toHaveBeenCalledTimes(1) - expect(loggerMock.info).toHaveBeenCalledWith( - "Processing eventName which has 1 subscribers" - ) - - expect(result).toEqual(["hi"]) - }) - - it("Processes event with failing subscribers", async () => { - eventBus.subscribe("eventName", () => Promise.resolve("hi")) - eventBus.subscribe("eventName", () => Promise.reject("fail1")) - eventBus.subscribe("eventName", () => Promise.resolve("hi2")) - eventBus.subscribe("eventName", () => Promise.reject("fail2")) - - result = await eventBus.worker_({ - data: { eventName: "eventName", data: {} }, - update: (data) => data, - opts: { attempts: 1 }, - }) - - expect(loggerMock.info).toHaveBeenCalledTimes(1) - expect(loggerMock.info).toHaveBeenCalledWith( - "Processing eventName which has 4 subscribers" - ) - - expect(loggerMock.warn).toHaveBeenCalledTimes(3) - expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occurred while processing eventName: fail1" - ) - expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occurred while processing eventName: fail2" - ) - - expect(loggerMock.warn).toHaveBeenCalledWith( - "One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events." - ) - - expect(result).toEqual(["hi", "fail1", "hi2", "fail2"]) - }) - - it("Retries processing when subcribers fail, if configured - final attempt", async () => { - eventBus.subscribe("eventName", async () => Promise.resolve("hi"), { - subscriberId: "1", - }) - eventBus.subscribe("eventName", async () => Promise.reject("fail1"), { - subscriberId: "2", - }) - - result = await eventBus - .worker_({ - data: { - eventName: "eventName", - data: {}, - completedSubscriberIds: ["1"], - }, - attemptsMade: 2, - update: (data) => data, - opts: { attempts: 2 }, - }) - .catch((error) => void 0) - - expect(loggerMock.warn).toHaveBeenCalledTimes(1) - expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occurred while processing eventName: fail1" - ) - - expect(loggerMock.info).toHaveBeenCalledTimes(2) - expect(loggerMock.info).toHaveBeenCalledWith( - "Final retry attempt for eventName" - ) - expect(loggerMock.info).toHaveBeenCalledWith( - "Retrying eventName which has 2 subscribers (1 of them failed)" - ) - }) - - it("Retries processing when subcribers fail, if configured", async () => { - eventBus.subscribe("eventName", async () => Promise.resolve("hi"), { - subscriberId: "1", - }) - eventBus.subscribe("eventName", async () => Promise.reject("fail1"), { - subscriberId: "2", - }) - - result = await eventBus - .worker_({ - data: { - eventName: "eventName", - data: {}, - completedSubscriberIds: ["1"], - }, - attemptsMade: 2, - updateData: (data) => data, - opts: { attempts: 3 }, - }) - .catch((err) => void 0) - - expect(loggerMock.warn).toHaveBeenCalledTimes(2) - expect(loggerMock.warn).toHaveBeenCalledWith( - "An error occurred while processing eventName: fail1" - ) - expect(loggerMock.warn).toHaveBeenCalledWith( - "One or more subscribers of eventName failed. Retrying..." - ) - - expect(loggerMock.info).toHaveBeenCalledTimes(1) - expect(loggerMock.info).toHaveBeenCalledWith( - "Retrying eventName which has 2 subscribers (1 of them failed)" - ) - }) - }) - }) -}) diff --git a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts new file mode 100644 index 0000000000..9bc44da642 --- /dev/null +++ b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts @@ -0,0 +1,491 @@ +import { Logger } from "@medusajs/types" +import { Queue, Worker } from "bullmq" +import { Redis } from "ioredis" +import RedisEventBusService from "../event-bus-redis" + +// const redisURL = "redis://localhost:6379" +// const client = new Redis(6379, redisURL, { +// // Lazy connect to properly handle connection errors +// lazyConnect: true, +// maxRetriesPerRequest: 0, +// }) + +jest.genMockFromModule("bullmq") +jest.genMockFromModule("ioredis") +jest.mock("bullmq") +jest.mock("ioredis") + +const loggerMock = { + info: jest.fn().mockReturnValue(console.log), + warn: jest.fn().mockReturnValue(console.log), + error: jest.fn().mockReturnValue(console.log), +} as unknown as Logger + +const redisMock = { + del: () => jest.fn(), + rpush: () => jest.fn(), + lrange: () => jest.fn(), + disconnect: () => jest.fn(), + expire: () => jest.fn(), +} as unknown as Redis + +const simpleModuleOptions = { redisUrl: "test-url" } +const moduleDeps = { + logger: loggerMock, + eventBusRedisConnection: redisMock, +} + +describe("RedisEventBusService", () => { + let eventBus: RedisEventBusService + let queue + let redis + + describe("constructor", () => { + beforeEach(async () => { + jest.clearAllMocks() + + eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { + scope: "internal", + resources: "shared", + }) + }) + + it("Creates a queue + worker", () => { + expect(Queue).toHaveBeenCalledTimes(1) + expect(Queue).toHaveBeenCalledWith("events-queue", { + connection: expect.any(Object), + prefix: "RedisEventBusService", + }) + + expect(Worker).toHaveBeenCalledTimes(1) + expect(Worker).toHaveBeenCalledWith( + "events-queue", + expect.any(Function), + { + connection: expect.any(Object), + prefix: "RedisEventBusService", + } + ) + }) + + it("Throws on isolated module declaration", () => { + try { + eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { + resources: "isolated", + scope: "internal", + }) + } catch (error) { + expect(error.message).toEqual( + "At the moment this module can only be used with shared resources" + ) + } + }) + }) + + describe("emit", () => { + describe("Successfully emits events", () => { + beforeEach(async () => { + jest.clearAllMocks() + + eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { + scope: "internal", + resources: "shared", + }) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + redis = (eventBus as any).eventBusRedisConnection_ + redis.rpush = jest.fn() + }) + + it("should add job to queue with default options", async () => { + await eventBus.emit([ + { + eventName: "eventName", + data: { + hi: "1234", + }, + }, + ]) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { eventName: "eventName", data: { hi: "1234" } }, + opts: { + attempts: 1, + removeOnComplete: true, + }, + }, + ]) + }) + + it("should add job to queue with custom options passed directly upon emitting", async () => { + await eventBus.emit( + [{ eventName: "eventName", data: { hi: "1234" } }], + { attempts: 3, backoff: 5000, delay: 1000 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { eventName: "eventName", data: { hi: "1234" } }, + opts: { + attempts: 3, + backoff: 5000, + delay: 1000, + removeOnComplete: true, + }, + }, + ]) + }) + + it("should add job to queue with module job options", async () => { + eventBus = new RedisEventBusService( + moduleDeps, + { + ...simpleModuleOptions, + jobOptions: { + removeOnComplete: { age: 5 }, + attempts: 7, + }, + }, + { + resources: "shared", + scope: "internal", + } + ) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + await eventBus.emit( + [ + { + eventName: "eventName", + data: { hi: "1234" }, + }, + ], + { attempts: 3, backoff: 5000, delay: 1000 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { eventName: "eventName", data: { hi: "1234" } }, + opts: { + attempts: 3, + backoff: 5000, + delay: 1000, + removeOnComplete: { + age: 5, + }, + }, + }, + ]) + }) + + it("should add job to queue with default, local, and global options merged", async () => { + eventBus = new RedisEventBusService( + moduleDeps, + { + ...simpleModuleOptions, + jobOptions: { + removeOnComplete: 5, + }, + }, + { + resources: "shared", + scope: "internal", + } + ) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + await eventBus.emit( + { + eventName: "eventName", + data: { hi: "1234" }, + }, + { delay: 1000 } + ) + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + { + name: "eventName", + data: { eventName: "eventName", data: { hi: "1234" } }, + opts: { + attempts: 1, + removeOnComplete: 5, + delay: 1000, + }, + }, + ]) + }) + + it("should successfully group events", async () => { + const options = { delay: 1000 } + const event = { + eventName: "eventName", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-1" }, + } + + const [builtEvent] = (eventBus as any).buildEvents([event], options) + + await eventBus.emit(event, options) + + expect(queue.addBulk).toHaveBeenCalledTimes(0) + expect(redis.rpush).toHaveBeenCalledTimes(1) + expect(redis.rpush).toHaveBeenCalledWith( + "staging:test-group-1", + JSON.stringify(builtEvent) + ) + }) + + it("should successfully group, release and clear events", async () => { + const options = { delay: 1000 } + const events = [ + { + eventName: "grouped-event-1", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-1" }, + }, + { + eventName: "ungrouped-event-2", + data: { hi: "1234" }, + }, + { + eventName: "grouped-event-2", + data: { hi: "1234" }, + metadata: { eventGroupId: "test-group-2" }, + }, + { + eventName: "grouped-event-3", + data: { hi: "1235" }, + metadata: { eventGroupId: "test-group-2" }, + }, + ] + + redis.del = jest.fn() + + await eventBus.emit(events, options) + + // Expect 1 event to have been send + // Expect 2 pushes to redis as there are 2 groups of events to push + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(redis.rpush).toHaveBeenCalledTimes(2) + expect(redis.del).not.toHaveBeenCalled() + + const [testGroup1Event] = (eventBus as any).buildEvents( + [events[0]], + options + ) + const [testGroup2Event] = (eventBus as any).buildEvents( + [events[2]], + options + ) + const [testGroup2Event2] = (eventBus as any).buildEvents( + [events[3]], + options + ) + + redis.lrange = jest.fn((key) => { + if (key === "staging:test-group-1") { + return Promise.resolve([JSON.stringify(testGroup1Event)]) + } + + if (key === "staging:test-group-2") { + return Promise.resolve([ + JSON.stringify(testGroup2Event), + JSON.stringify(testGroup2Event2), + ]) + } + }) + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + + await eventBus.releaseGroupedEvents("test-group-1") + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([testGroup1Event]) + expect(redis.del).toHaveBeenCalledTimes(1) + expect(redis.del).toHaveBeenCalledWith("staging:test-group-1") + + queue = (eventBus as any).queue_ + queue.addBulk = jest.fn() + redis.del = jest.fn() + + await eventBus.releaseGroupedEvents("test-group-2") + + expect(queue.addBulk).toHaveBeenCalledTimes(1) + expect(queue.addBulk).toHaveBeenCalledWith([ + testGroup2Event, + testGroup2Event2, + ]) + expect(redis.del).toHaveBeenCalledTimes(1) + expect(redis.del).toHaveBeenCalledWith("staging:test-group-2") + }) + }) + }) + + describe("worker_", () => { + let result + + describe("Successfully processes the jobs", () => { + beforeEach(async () => { + jest.clearAllMocks() + + eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, { + resources: "shared", + scope: "internal", + }) + }) + + it("should process a simple event with no options", async () => { + const test: string[] = [] + + eventBus.subscribe("eventName", () => { + test.push("success") + + return Promise.resolve() + }) + + // TODO: The typing for this is all over the place + await eventBus.worker_({ + data: { eventName: "eventName", data: { test: 1 } }, + opts: { attempts: 1 }, + } as any) + + expect(loggerMock.info).toHaveBeenCalledTimes(1) + expect(loggerMock.info).toHaveBeenCalledWith( + "Processing eventName which has 1 subscribers" + ) + + expect(test).toEqual(["success"]) + }) + + it("should process event with failing subscribers", async () => { + const test: string[] = [] + + eventBus.subscribe("eventName", () => { + test.push("hi") + return Promise.resolve() + }) + eventBus.subscribe("eventName", () => { + test.push("fail1") + return Promise.reject("fail1") + }) + eventBus.subscribe("eventName", () => { + test.push("hi2") + return Promise.resolve() + }) + eventBus.subscribe("eventName", () => { + test.push("fail2") + return Promise.reject("fail2") + }) + + result = await eventBus.worker_({ + data: { eventName: "eventName", data: { test: 1 } }, + opts: { attempts: 1 }, + update: (data) => data, + } as any) + + expect(loggerMock.info).toHaveBeenCalledTimes(1) + expect(loggerMock.info).toHaveBeenCalledWith( + "Processing eventName which has 4 subscribers" + ) + + expect(loggerMock.warn).toHaveBeenCalledTimes(3) + expect(loggerMock.warn).toHaveBeenCalledWith( + "An error occurred while processing eventName: fail1" + ) + expect(loggerMock.warn).toHaveBeenCalledWith( + "An error occurred while processing eventName: fail2" + ) + + expect(loggerMock.warn).toHaveBeenCalledWith( + "One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events." + ) + + expect(test.sort()).toEqual(["hi", "fail1", "hi2", "fail2"].sort()) + }) + + it("should retry processing when subcribers fail, if configured - final attempt", async () => { + eventBus.subscribe("eventName", async () => Promise.resolve(), { + subscriberId: "1", + }) + eventBus.subscribe("eventName", async () => Promise.reject("fail1"), { + subscriberId: "2", + }) + + result = await eventBus + .worker_({ + data: { + eventName: "eventName", + data: {}, + completedSubscriberIds: ["1"], + }, + attemptsMade: 2, + update: (data) => data, + opts: { attempts: 2 }, + } as any) + .catch((error) => void 0) + + expect(loggerMock.warn).toHaveBeenCalledTimes(1) + expect(loggerMock.warn).toHaveBeenCalledWith( + "An error occurred while processing eventName: fail1" + ) + + expect(loggerMock.info).toHaveBeenCalledTimes(2) + expect(loggerMock.info).toHaveBeenCalledWith( + "Final retry attempt for eventName" + ) + expect(loggerMock.info).toHaveBeenCalledWith( + "Retrying eventName which has 2 subscribers (1 of them failed)" + ) + }) + + it("should retry processing when subcribers fail, if configured", async () => { + eventBus.subscribe("eventName", async () => Promise.resolve(), { + subscriberId: "1", + }) + eventBus.subscribe("eventName", async () => Promise.reject("fail1"), { + subscriberId: "2", + }) + + result = await eventBus + .worker_({ + data: { + eventName: "eventName", + data: {}, + completedSubscriberIds: ["1"], + }, + attemptsMade: 2, + updateData: (data) => data, + opts: { attempts: 3 }, + } as any) + .catch((err) => void 0) + + expect(loggerMock.warn).toHaveBeenCalledTimes(2) + expect(loggerMock.warn).toHaveBeenCalledWith( + "An error occurred while processing eventName: fail1" + ) + expect(loggerMock.warn).toHaveBeenCalledWith( + "One or more subscribers of eventName failed. Retrying..." + ) + + expect(loggerMock.info).toHaveBeenCalledTimes(1) + expect(loggerMock.info).toHaveBeenCalledWith( + "Retrying eventName which has 2 subscribers (1 of them failed)" + ) + }) + }) + }) +}) diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index 6d76d628e1..976cf56439 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -1,15 +1,25 @@ import { InternalModuleDeclaration } from "@medusajs/modules-sdk" -import { EmitData, Logger, Message } from "@medusajs/types" -import { AbstractEventBusModuleService, isString } from "@medusajs/utils" -import { BulkJobOptions, JobsOptions, Queue, Worker } from "bullmq" +import { Logger, Message, MessageBody } from "@medusajs/types" +import { + AbstractEventBusModuleService, + isPresent, + promiseAll, +} from "@medusajs/utils" +import { BulkJobOptions, Queue, Worker } from "bullmq" import { Redis } from "ioredis" -import { BullJob, EmitOptions, EventBusRedisModuleOptions } from "../types" +import { BullJob, EventBusRedisModuleOptions } from "../types" type InjectedDependencies = { logger: Logger eventBusRedisConnection: Redis } +type IORedisEventType = { + name: string + data: MessageBody + opts: BulkJobOptions +} + /** * Can keep track of multiple subscribers to different events and run the * subscribers when events happen. Events will run asynchronously. @@ -71,78 +81,137 @@ export default class RedisEventBusService extends AbstractEventBusModuleService }, } - /** - * Emit a single event - * @param {string} eventName - the name of the event to be process. - * @param data - the data to send to the subscriber. - * @param options - options to add the job with - */ - async emit( - eventName: string, - data: T, - options: Record - ): Promise - - /** - * Emit a number of events - * @param {EmitData} data - the data to send to the subscriber. - */ - async emit(data: EmitData[]): Promise - - async emit(data: Message[]): Promise - - async emit[] | Message[] = string>( - eventNameOrData: TInput, - data?: T, - options: BulkJobOptions | JobsOptions = {} - ): Promise { - const globalJobOptions = this.moduleOptions_.jobOptions ?? {} - - const isBulkEmit = Array.isArray(eventNameOrData) - + private buildEvents( + eventsData: Message[], + options: BulkJobOptions = {} + ): IORedisEventType[] { const opts = { // default options removeOnComplete: true, attempts: 1, // global options - ...globalJobOptions, - } as EmitOptions + ...(this.moduleOptions_.jobOptions ?? {}), + ...options, + } - const dataBody = isString(eventNameOrData) - ? data ?? (data as Message).body - : undefined + return eventsData.map((eventData) => { + const { options, ...eventBody } = eventData - const events = isBulkEmit - ? eventNameOrData.map((event) => ({ - name: event.eventName, - data: { - eventName: event.eventName, - data: (event as EmitData).data ?? (event as Message).body, - }, - opts: { - ...opts, - // local options - ...event.options, - }, - })) - : [ - { - name: eventNameOrData as string, - data: { eventName: eventNameOrData, data: dataBody }, - opts: { - ...opts, - // local options - ...options, - }, - }, - ] - - await this.queue_.addBulk(events) + return { + name: eventData.eventName, + data: eventBody, + opts: { + // options for event group + ...opts, + // options for a particular event + ...options, + }, + } + }) } - // TODO: Implement redis based staging + release - async releaseGroupedEvents(eventGroupId: string) {} - async clearGroupedEvents(eventGroupId: string) {} + /** + * Emit a single or number of events + * @param {Message} data - the data to send to the subscriber. + * @param {BulkJobOptions} data - the options to add to bull mq + */ + async emit( + eventsData: Message | Message[], + options: BulkJobOptions & { groupedEventsTTL?: number } = {} + ): Promise { + let eventsDataArray = Array.isArray(eventsData) ? eventsData : [eventsData] + + const { groupedEventsTTL = 600 } = options + delete options.groupedEventsTTL + + const eventsToEmit = eventsDataArray.filter( + (eventData) => !isPresent(eventData.metadata?.eventGroupId) + ) + + const eventsToGroup = eventsDataArray.filter((eventData) => + isPresent(eventData.metadata?.eventGroupId) + ) + + const groupEventsMap = new Map[]>() + + for (const event of eventsToGroup) { + const groupId = event.metadata?.eventGroupId! + const array = groupEventsMap.get(groupId) ?? [] + + array.push(event) + groupEventsMap.set(groupId, array) + } + + const promises: Promise[] = [] + + if (eventsToEmit.length) { + const emitData = this.buildEvents(eventsToEmit, options) + + promises.push(this.queue_.addBulk(emitData)) + } + + for (const [groupId, events] of groupEventsMap.entries()) { + if (!events?.length) { + continue + } + + // Set a TTL for the key of the list that is scoped to a group + // This will be helpful in preventing stale data from staying in redis for too long + // in the event the module fails to cleanup events. For long running workflows, setting a much higher + // TTL or even skipping the TTL would be required + this.setExpire(groupId, groupedEventsTTL) + + const eventsData = this.buildEvents(events, options) + + promises.push(this.groupEvents(groupId, eventsData)) + } + + await promiseAll(promises) + } + + private async setExpire(eventGroupId: string, ttl: number) { + if (!eventGroupId) { + return + } + + await this.eventBusRedisConnection_.expire(`staging:${eventGroupId}`, ttl) + } + + private async groupEvents( + eventGroupId: string, + events: IORedisEventType[] + ) { + await this.eventBusRedisConnection_.rpush( + `staging:${eventGroupId}`, + ...events.map((event) => JSON.stringify(event)) + ) + } + + private async getGroupedEvents( + eventGroupId: string + ): Promise { + return await this.eventBusRedisConnection_ + .lrange(`staging:${eventGroupId}`, 0, -1) + .then((result) => { + return result.map((jsonString) => JSON.parse(jsonString)) + }) + } + + async releaseGroupedEvents(eventGroupId: string) { + const groupedEvents = await this.getGroupedEvents(eventGroupId) + + await this.queue_.addBulk(groupedEvents) + + await this.clearGroupedEvents(eventGroupId) + } + + async clearGroupedEvents(eventGroupId: string) { + if (!eventGroupId) { + return + } + + await this.eventBusRedisConnection_.del(`staging:${eventGroupId}`) + } /** * Handles incoming jobs. diff --git a/packages/modules/fulfillment/integration-tests/__fixtures__/events.ts b/packages/modules/fulfillment/integration-tests/__fixtures__/events.ts index 204fffda1b..53b7946a98 100644 --- a/packages/modules/fulfillment/integration-tests/__fixtures__/events.ts +++ b/packages/modules/fulfillment/integration-tests/__fixtures__/events.ts @@ -10,15 +10,13 @@ export function buildExpectedEventMessageShape(options: { }): EventBusTypes.Message { return { eventName: options.eventName, - body: { - metadata: { - action: options.action, - eventGroupId: options.eventGroupId, - source: "fulfillment", - object: options.object, - }, - data: options.data, + metadata: { + action: options.action, + eventGroupId: options.eventGroupId, + source: "fulfillment", + object: options.object, }, + data: options.data, options: options.options, } } diff --git a/packages/modules/link-modules/src/services/link-module-service.ts b/packages/modules/link-modules/src/services/link-module-service.ts index bfd90055e9..0aa2d4d468 100644 --- a/packages/modules/link-modules/src/services/link-module-service.ts +++ b/packages/modules/link-modules/src/services/link-module-service.ts @@ -208,15 +208,13 @@ export default class LinkModuleService implements ILinkModule { await this.eventBusModuleService_?.emit>( (data as { id: unknown }[]).map(({ id }) => ({ eventName: this.entityName_ + "." + CommonEvents.ATTACHED, - body: { - metadata: { - source: this.serviceName_, - action: CommonEvents.ATTACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, + metadata: { + source: this.serviceName_, + action: CommonEvents.ATTACHED, + object: this.entityName_, + eventGroupId: sharedContext.eventGroupId, }, + data: { id }, })) ) @@ -261,15 +259,13 @@ export default class LinkModuleService implements ILinkModule { await this.eventBusModuleService_?.emit>( allData.map(({ id }) => ({ eventName: this.entityName_ + "." + CommonEvents.DETACHED, - body: { - metadata: { - source: this.serviceName_, - action: CommonEvents.DETACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, + metadata: { + source: this.serviceName_, + action: CommonEvents.DETACHED, + object: this.entityName_, + eventGroupId: sharedContext.eventGroupId, }, + data: { id }, })) ) } @@ -312,15 +308,13 @@ export default class LinkModuleService implements ILinkModule { await this.eventBusModuleService_?.emit>( (deletedEntities as { id: string }[]).map(({ id }) => ({ eventName: this.entityName_ + "." + CommonEvents.DETACHED, - body: { - metadata: { - source: this.serviceName_, - action: CommonEvents.DETACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, + metadata: { + source: this.serviceName_, + action: CommonEvents.DETACHED, + object: this.entityName_, + eventGroupId: sharedContext.eventGroupId, }, + data: { id }, })) ) @@ -372,15 +366,13 @@ export default class LinkModuleService implements ILinkModule { await this.eventBusModuleService_?.emit>( (restoredEntities as { id: string }[]).map(({ id }) => ({ eventName: this.entityName_ + "." + CommonEvents.ATTACHED, - body: { - metadata: { - source: this.serviceName_, - action: CommonEvents.ATTACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, + metadata: { + source: this.serviceName_, + action: CommonEvents.ATTACHED, + object: this.entityName_, + eventGroupId: sharedContext.eventGroupId, }, + data: { id }, })) ) diff --git a/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts b/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts index a6b9b46781..57e879bd8a 100644 --- a/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts +++ b/packages/modules/product/integration-tests/__tests__/product-module-service/product-categories.spec.ts @@ -291,8 +291,9 @@ moduleIntegrationTestRunner({ }) expect(eventBusSpy).toHaveBeenCalledTimes(1) - expect(eventBusSpy).toHaveBeenCalledWith("product-category.created", { - id: category.id, + expect(eventBusSpy).toHaveBeenCalledWith({ + data: { id: category.id }, + eventName: "product-category.created", }) }) @@ -380,8 +381,9 @@ moduleIntegrationTestRunner({ }) expect(eventBusSpy).toHaveBeenCalledTimes(1) - expect(eventBusSpy).toHaveBeenCalledWith("product-category.updated", { - id: productCategoryZero.id, + expect(eventBusSpy).toHaveBeenCalledWith({ + data: { id: productCategoryZero.id }, + eventName: "product-category.updated", }) }) @@ -547,8 +549,9 @@ moduleIntegrationTestRunner({ await service.deleteCategory(productCategoryOne.id) expect(eventBusSpy).toHaveBeenCalledTimes(1) - expect(eventBusSpy).toHaveBeenCalledWith("product-category.deleted", { - id: productCategoryOne.id, + expect(eventBusSpy).toHaveBeenCalledWith({ + data: { id: productCategoryOne.id }, + eventName: "product-category.deleted", }) }) diff --git a/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts b/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts index 4b047534ce..85f18bb589 100644 --- a/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts +++ b/packages/modules/product/integration-tests/__tests__/product-module-service/product-collections.spec.ts @@ -298,8 +298,8 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ { - eventName: "product-collection.updated", data: { id: collectionId }, + eventName: "product-collection.updated", }, ]) }) @@ -488,16 +488,14 @@ moduleIntegrationTestRunner({ const eventBusSpy = jest.spyOn(MockEventBusService.prototype, "emit") const collections = await service.createCollections([ - { - title: "New Collection", - }, + { title: "New Collection" }, ]) expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ { - eventName: "product-collection.created", data: { id: collections[0].id }, + eventName: "product-collection.created", }, ]) }) diff --git a/packages/modules/product/src/services/product-module-service.ts b/packages/modules/product/src/services/product-module-service.ts index df083d8ec3..6637b71baf 100644 --- a/packages/modules/product/src/services/product-module-service.ts +++ b/packages/modules/product/src/services/product-module-service.ts @@ -51,8 +51,8 @@ import { UpdateTagInput, UpdateTypeInput, } from "../types" -import { entityNameToLinkableKeysMap, joinerConfig } from "./../joiner-config" import { eventBuilders } from "../utils" +import { entityNameToLinkableKeysMap, joinerConfig } from "./../joiner-config" type InjectedDependencies = { baseRepository: DAL.RepositoryService @@ -1134,10 +1134,10 @@ export default class ProductModuleService< sharedContext ) - await this.eventBusModuleService_?.emit( - ProductCategoryEvents.CATEGORY_CREATED, - { id: productCategory.id } - ) + await this.eventBusModuleService_?.emit({ + eventName: ProductCategoryEvents.CATEGORY_CREATED, + data: { id: productCategory.id }, + }) return productCategory } @@ -1154,10 +1154,10 @@ export default class ProductModuleService< sharedContext ) - await this.eventBusModuleService_?.emit( - ProductCategoryEvents.CATEGORY_UPDATED, - { id: productCategory.id } - ) + await this.eventBusModuleService_?.emit({ + eventName: ProductCategoryEvents.CATEGORY_UPDATED, + data: { id: productCategory.id }, + }) return await this.baseRepository_.serialize(productCategory, { populate: true, @@ -1171,10 +1171,10 @@ export default class ProductModuleService< ): Promise { await this.productCategoryService_.delete(categoryId, sharedContext) - await this.eventBusModuleService_?.emit( - ProductCategoryEvents.CATEGORY_DELETED, - { id: categoryId } - ) + await this.eventBusModuleService_?.emit({ + eventName: ProductCategoryEvents.CATEGORY_DELETED, + data: { id: categoryId }, + }) } create( diff --git a/packages/modules/user/integration-tests/__tests__/services/module/invite.spec.ts b/packages/modules/user/integration-tests/__tests__/services/module/invite.spec.ts index b58d62d481..7747455a2c 100644 --- a/packages/modules/user/integration-tests/__tests__/services/module/invite.spec.ts +++ b/packages/modules/user/integration-tests/__tests__/services/module/invite.spec.ts @@ -1,9 +1,12 @@ -import { IUserModuleService } from "@medusajs/types/dist/user" -import { MockEventBusService } from "medusa-test-utils" import { Modules } from "@medusajs/modules-sdk" +import { IUserModuleService } from "@medusajs/types/dist/user" import { UserEvents } from "@medusajs/utils" +import { + MockEventBusService, + moduleIntegrationTestRunner, + SuiteOptions, +} from "medusa-test-utils" import { createInvites } from "../../../__fixtures__/invite" -import { moduleIntegrationTestRunner, SuiteOptions } from "medusa-test-utils" jest.setTimeout(30000) @@ -178,9 +181,7 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "1" }, - }), + data: { id: "1" }, eventName: UserEvents.invite_updated, }), ]) @@ -197,9 +198,7 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "1" }, - }), + data: { id: "1" }, eventName: UserEvents.invite_token_generated, }), ]) @@ -228,27 +227,19 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "1" }, - }), + data: { id: "1" }, eventName: UserEvents.invite_created, }), expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "2" }, - }), + data: { id: "2" }, eventName: UserEvents.invite_created, }), expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "1" }, - }), + data: { id: "1" }, eventName: UserEvents.invite_token_generated, }), expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "2" }, - }), + data: { id: "2" }, eventName: UserEvents.invite_token_generated, }), ]) diff --git a/packages/modules/user/integration-tests/__tests__/services/module/user.spec.ts b/packages/modules/user/integration-tests/__tests__/services/module/user.spec.ts index 77f3bcab8f..9654dbae19 100644 --- a/packages/modules/user/integration-tests/__tests__/services/module/user.spec.ts +++ b/packages/modules/user/integration-tests/__tests__/services/module/user.spec.ts @@ -1,9 +1,12 @@ -import { IUserModuleService } from "@medusajs/types/dist/user" -import { MockEventBusService } from "medusa-test-utils" import { Modules } from "@medusajs/modules-sdk" +import { IUserModuleService } from "@medusajs/types/dist/user" import { UserEvents } from "@medusajs/utils" +import { + MockEventBusService, + moduleIntegrationTestRunner, + SuiteOptions, +} from "medusa-test-utils" import { createUsers } from "../../../__fixtures__/user" -import { moduleIntegrationTestRunner, SuiteOptions } from "medusa-test-utils" jest.setTimeout(30000) @@ -190,9 +193,7 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "1" }, - }), + data: { id: "1" }, eventName: UserEvents.updated, }), ]) @@ -222,15 +223,11 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledTimes(1) expect(eventBusSpy).toHaveBeenCalledWith([ expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "1" }, - }), + data: { id: "1" }, eventName: UserEvents.created, }), expect.objectContaining({ - body: expect.objectContaining({ - data: { id: "2" }, - }), + data: { id: "2" }, eventName: UserEvents.created, }), ])