diff --git a/.changeset/rotten-beds-boil.md b/.changeset/rotten-beds-boil.md new file mode 100644 index 0000000000..3324332c11 --- /dev/null +++ b/.changeset/rotten-beds-boil.md @@ -0,0 +1,9 @@ +--- +"@medusajs/event-bus-local": patch +"@medusajs/event-bus-redis": patch +"@medusajs/core-flows": patch +"@medusajs/types": patch +"@medusajs/utils": patch +--- + +feat(core, event-bus): Compensate emit event step utility diff --git a/integration-tests/modules/__tests__/cart/store/cart.completion.ts b/integration-tests/modules/__tests__/cart/store/cart.completion.ts index 13bacf9e77..00ea50e1e1 100644 --- a/integration-tests/modules/__tests__/cart/store/cart.completion.ts +++ b/integration-tests/modules/__tests__/cart/store/cart.completion.ts @@ -12,6 +12,7 @@ import { medusaIntegrationTestRunner } from "@medusajs/test-utils" import { ICartModuleService, ICustomerModuleService, + IEventBusModuleService, IFulfillmentModuleService, IInventoryService, IPaymentModuleService, @@ -20,6 +21,7 @@ import { IRegionModuleService, ISalesChannelModuleService, IStockLocationService, + Message, } from "@medusajs/types" import { ContainerRegistrationKeys, @@ -60,6 +62,7 @@ medusaIntegrationTestRunner({ let defaultRegion let customer, storeHeadersWithCustomer let setPricingContextHook: any + let eventBus: IEventBusModuleService beforeAll(async () => { appContainer = getContainer() @@ -70,6 +73,7 @@ medusaIntegrationTestRunner({ productModule = appContainer.resolve(Modules.PRODUCT) pricingModule = appContainer.resolve(Modules.PRICING) paymentModule = appContainer.resolve(Modules.PAYMENT) + eventBus = appContainer.resolve(Modules.EVENT_BUS) fulfillmentModule = appContainer.resolve(Modules.FULFILLMENT) inventoryModule = appContainer.resolve(Modules.INVENTORY) stockLocationModule = appContainer.resolve(Modules.STOCK_LOCATION) @@ -747,6 +751,7 @@ medusaIntegrationTestRunner({ paymentSession.id ) }) + it("should complete cart when payment webhook and storefront are called in simultaneously", async () => { const salesChannel = await scModuleService.createSalesChannels({ name: "Webshop", @@ -876,6 +881,142 @@ medusaIntegrationTestRunner({ expect(fullOrder.payment_collections[0].captured_amount).toBe(3000) expect(fullOrder.payment_collections[0].status).toBe("completed") }) + + it("should clear events when complete cart fails after emitting events", async () => { + const salesChannel = await scModuleService.createSalesChannels({ + name: "Webshop", + }) + + const location = await stockLocationModule.createStockLocations({ + name: "Warehouse", + }) + + const region = await regionModuleService.createRegions({ + name: "US", + currency_code: "usd", + }) + + let cart = await cartModuleService.createCarts({ + currency_code: "usd", + sales_channel_id: salesChannel.id, + region_id: region.id, + }) + + await remoteLink.create([ + { + [Modules.SALES_CHANNEL]: { + sales_channel_id: salesChannel.id, + }, + [Modules.STOCK_LOCATION]: { + stock_location_id: location.id, + }, + }, + ]) + + cart = await cartModuleService.retrieveCart(cart.id, { + select: ["id", "region_id", "currency_code", "sales_channel_id"], + }) + + await addToCartWorkflow(appContainer).run({ + input: { + items: [ + { + title: "Test item", + subtitle: "Test subtitle", + thumbnail: "some-url", + requires_shipping: false, + is_discountable: false, + is_tax_inclusive: false, + unit_price: 3000, + metadata: { + foo: "bar", + }, + quantity: 1, + }, + { + title: "zero price item", + subtitle: "zero price item", + thumbnail: "some-url", + requires_shipping: false, + is_discountable: false, + is_tax_inclusive: false, + unit_price: 0, + quantity: 1, + }, + ], + cart_id: cart.id, + }, + }) + + cart = await cartModuleService.retrieveCart(cart.id, { + relations: ["items"], + }) + + await createPaymentCollectionForCartWorkflow(appContainer).run({ + input: { + cart_id: cart.id, + }, + }) + + const [paymentCollection] = + await paymentModule.listPaymentCollections({}) + + await createPaymentSessionsWorkflow(appContainer).run({ + input: { + payment_collection_id: paymentCollection.id, + provider_id: "pp_system_default", + context: {}, + data: {}, + }, + }) + + let grouppedEventBefore: Message[] = [] + let eventGroupId!: string + + /** + * Register order.placed listener to trigger the event group + * registration and be able to check the event group during + * the workflow execution against it after compensation + */ + + eventBus.subscribe("order.placed", async () => { + // noop + }) + + const workflow = completeCartWorkflow(appContainer) + + workflow.addAction("throw", { + invoke: async function failStep({ context }) { + eventGroupId = context!.eventGroupId! + grouppedEventBefore = ( + (eventBus as any).groupedEventsMap_ as Map + ).get(context!.eventGroupId!) + + throw new Error( + `Failed to do something before ending complete cart workflow` + ) + }, + }) + + const { errors } = await workflow.run({ + input: { + id: cart.id, + }, + throwOnError: false, + }) + + const grouppedEventAfter = + ((eventBus as any).groupedEventsMap_ as Map).get( + eventGroupId + ) ?? [] + + expect(grouppedEventBefore).toHaveLength(1) + expect(grouppedEventAfter).toHaveLength(0) // events have been compensated + + expect(errors[0].error.message).toBe( + "Failed to do something before ending complete cart workflow" + ) + }) }) }) }, diff --git a/packages/core/core-flows/src/common/steps/emit-event.ts b/packages/core/core-flows/src/common/steps/emit-event.ts index 8f4571c7dd..a049aeb137 100644 --- a/packages/core/core-flows/src/common/steps/emit-event.ts +++ b/packages/core/core-flows/src/common/steps/emit-event.ts @@ -5,6 +5,7 @@ import { import { Modules } from "@medusajs/framework/utils" import { StepExecutionContext, + StepResponse, createStep, } from "@medusajs/framework/workflows-sdk" @@ -85,6 +86,25 @@ export const emitEventStep = createStep( } await eventBus.emit(message) + + return new StepResponse({ + eventGroupId: context.eventGroupId, + eventName: input.eventName, + }) }, - async (data: void) => {} + async (data, context) => { + if (!data || !data?.eventGroupId) { + return + } + + const { container } = context + + const eventBus: IEventBusModuleService = container.resolve( + Modules.EVENT_BUS + ) + + await eventBus.clearGroupedEvents(data!.eventGroupId, { + eventNames: [data!.eventName], + }) + } ) diff --git a/packages/core/types/src/event-bus/event-bus-module.ts b/packages/core/types/src/event-bus/event-bus-module.ts index 5c09b33ac5..27604e8442 100644 --- a/packages/core/types/src/event-bus/event-bus-module.ts +++ b/packages/core/types/src/event-bus/event-bus-module.ts @@ -3,14 +3,14 @@ import { Message, Subscriber, SubscriberContext } from "./common" export interface IEventBusModuleService { /** * This method emits one or more events. Subscribers listening to the event(s) are executed asynchronously. - * + * * @param data - The details of the events to emit. * @param options - Additional options for the event. - * + * * @example - * await eventModuleService.emit({ - * name: "user.created", - * data: { + * await eventModuleService.emit({ + * name: "user.created", + * data: { * user_id: "user_123" * } * }) @@ -22,12 +22,12 @@ export interface IEventBusModuleService { /** * This method adds a subscriber to an event. It's mainly used internally to register subscribers. - * + * * @param eventName - The name of the event to subscribe to. * @param subscriber - The subscriber function to execute when the event is emitted. * @param context - The context of the subscriber. * @returns The instance of the Event Module - * + * * @example * eventModuleService.subscribe("user.created", async (data) => { * console.log("User created", data) @@ -41,12 +41,12 @@ export interface IEventBusModuleService { /** * This method removes a subscriber from an event. It's mainly used internally to unregister subscribers. - * + * * @param eventName - The name of the event to unsubscribe from. * @param subscriber - The subscriber function to remove. * @param context - The context of the subscriber. * @returns The instance of the Event Module - * + * * @example * eventModuleService.unsubscribe("user.created", async (data) => { * console.log("User created", data) @@ -61,9 +61,9 @@ export interface IEventBusModuleService { /** * This method emits all events in the specified group. Grouped events are useful when you have distributed transactions * where you need to explicitly group, release and clear events upon lifecycle events of a transaction. - * + * * @param eventGroupId - The ID of the event group. - * + * * @example * await eventModuleService.releaseGroupedEvents("group_123") */ @@ -71,11 +71,19 @@ export interface IEventBusModuleService { /** * This method removes all events in the specified group. Grouped events are useful when you have distributed transactions * where you need to explicitly group, release and clear events upon lifecycle events of a transaction. - * + * * @param eventGroupId - The ID of the event group. - * + * @param options - Additional options for the event. + * @param options.eventNames - The names of the events to clear. If not provided, The group will + * be entirely cleared. + * * @example * await eventModuleService.clearGroupedEvents("group_123") */ - clearGroupedEvents(eventGroupId: string): Promise + clearGroupedEvents( + eventGroupId: string, + options?: { + eventNames?: string[] + } + ): Promise } diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index 63a9974497..cb9b10ec4f 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -38,8 +38,16 @@ export abstract class AbstractEventBusModuleService */ // Given a eventGroupId, all the grouped events will be released abstract releaseGroupedEvents(eventGroupId: string): Promise - // Given a eventGroupId, all the grouped events will be cleared - abstract clearGroupedEvents(eventGroupId: string): Promise + + // Given a eventGroupId, all the grouped events will be cleared unless eventNames are provided + // If eventNames are provided, only the events that match the eventNames will be cleared from the + // group + abstract clearGroupedEvents( + eventGroupId: string, + options?: { + eventNames?: string[] + } + ): Promise protected storeSubscribers({ event, diff --git a/packages/modules/event-bus-local/integration-tests/__tests__/index.spec.ts b/packages/modules/event-bus-local/integration-tests/__tests__/index.spec.ts new file mode 100644 index 0000000000..dbf50270a7 --- /dev/null +++ b/packages/modules/event-bus-local/integration-tests/__tests__/index.spec.ts @@ -0,0 +1,141 @@ +import { + CommonEvents, + composeMessage, + Modules, +} from "@medusajs/framework/utils" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { IEventBusModuleService } from "@medusajs/types" + +moduleIntegrationTestRunner({ + moduleName: Modules.EVENT_BUS, + testSuite: ({ service: eventBus }) => { + describe("Event Bus Local Service", () => { + it("should emit an event", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + expect(subscriber).toHaveBeenCalledWith({ + data: { + test: "test", + }, + metadata: { + source: "test", + object: "test", + action: "created", + }, + name: "test", + }) + + eventBus.unsubscribe("test", subscriber) + }) + + it("should release grouped events", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + context: { + eventGroupId: "123", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + expect(subscriber).toHaveBeenCalledTimes(0) + + await eventBus.releaseGroupedEvents("123") + + expect(subscriber).toHaveBeenCalledTimes(1) + + expect(subscriber).toHaveBeenCalledWith({ + data: { + test: "test", + }, + metadata: { + source: "test", + eventGroupId: "123", + object: "test", + action: "created", + }, + name: "test", + }) + + eventBus.unsubscribe("test", subscriber) + }) + + it("should clear grouped events", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + context: { + eventGroupId: "123", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + expect(subscriber).toHaveBeenCalledTimes(0) + + await eventBus.clearGroupedEvents("123") + await eventBus.releaseGroupedEvents("123") + + expect(subscriber).toHaveBeenCalledTimes(0) + + eventBus.unsubscribe("test", subscriber) + }) + + it("should clear grouped events with event names", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + context: { + eventGroupId: "123", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + await eventBus.clearGroupedEvents("123", { + eventNames: ["test"], + }) + + await eventBus.releaseGroupedEvents("123") + + expect(subscriber).toHaveBeenCalledTimes(0) + + eventBus.unsubscribe("test", subscriber) + }) + }) + }, +}) diff --git a/packages/modules/event-bus-local/package.json b/packages/modules/event-bus-local/package.json index 76c3d33e5b..3957a13904 100644 --- a/packages/modules/event-bus-local/package.json +++ b/packages/modules/event-bus-local/package.json @@ -33,7 +33,8 @@ "scripts": { "watch": "tsc --build --watch", "build": "rimraf dist && tsc --build", - "test": "jest --passWithNoTests" + "test": "jest --passWithNoTests", + "test:integration": "jest --no-cache --maxWorkers=50% --bail --detectOpenHandles --forceExit --logHeapUsage -- integration-tests/__tests__/**/*.spec.ts" }, "dependencies": { "ulid": "^2.3.0" diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 8b6780852c..a7cf76c454 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -130,8 +130,19 @@ export default class LocalEventBusService extends AbstractEventBusModuleService await this.clearGroupedEvents(eventGroupId) } - async clearGroupedEvents(eventGroupId: string) { - this.groupedEventsMap_.delete(eventGroupId) + async clearGroupedEvents( + eventGroupId: string, + { eventNames }: { eventNames?: string[] } = {} + ) { + if (eventNames?.length) { + const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] + const eventsToKeep = groupedEvents.filter( + (event) => !eventNames!.includes(event.name) + ) + this.groupedEventsMap_.set(eventGroupId, eventsToKeep) + } else { + this.groupedEventsMap_.delete(eventGroupId) + } } subscribe( diff --git a/packages/modules/event-bus-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/event-bus-redis/integration-tests/__tests__/index.spec.ts new file mode 100644 index 0000000000..c82d783ba2 --- /dev/null +++ b/packages/modules/event-bus-redis/integration-tests/__tests__/index.spec.ts @@ -0,0 +1,147 @@ +import { + CommonEvents, + composeMessage, + Modules, +} from "@medusajs/framework/utils" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { IEventBusModuleService } from "@medusajs/types" + +moduleIntegrationTestRunner({ + moduleName: Modules.EVENT_BUS, + moduleOptions: { + redis: { + host: "localhost", + port: 6379, + }, + }, + testSuite: ({ service: eventBus }) => { + describe("Event Bus Redis Service", () => { + it("should emit an event", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + expect(subscriber).toHaveBeenCalledWith({ + data: { + test: "test", + }, + metadata: { + source: "test", + object: "test", + action: "created", + }, + name: "test", + }) + + eventBus.unsubscribe("test", subscriber) + }) + + it("should release grouped events", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + context: { + eventGroupId: "123", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + expect(subscriber).toHaveBeenCalledTimes(0) + + await eventBus.releaseGroupedEvents("123") + + expect(subscriber).toHaveBeenCalledTimes(1) + + expect(subscriber).toHaveBeenCalledWith({ + data: { + test: "test", + }, + metadata: { + source: "test", + eventGroupId: "123", + object: "test", + action: "created", + }, + name: "test", + }) + + eventBus.unsubscribe("test", subscriber) + }) + + it("should clear grouped events", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + context: { + eventGroupId: "123", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + expect(subscriber).toHaveBeenCalledTimes(0) + + await eventBus.clearGroupedEvents("123") + await eventBus.releaseGroupedEvents("123") + + expect(subscriber).toHaveBeenCalledTimes(0) + + eventBus.unsubscribe("test", subscriber) + }) + + it("should clear grouped events with event names", async () => { + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + + await eventBus.emit( + composeMessage("test", { + data: { + test: "test", + }, + context: { + eventGroupId: "123", + }, + action: CommonEvents.CREATED, + source: "test", + object: "test", + }) + ) + + await eventBus.clearGroupedEvents("123", { + eventNames: ["test"], + }) + + await eventBus.releaseGroupedEvents("123") + + expect(subscriber).toHaveBeenCalledTimes(0) + + eventBus.unsubscribe("test", subscriber) + }) + }) + }, +}) diff --git a/packages/modules/event-bus-redis/package.json b/packages/modules/event-bus-redis/package.json index 5221bf404d..6c3473f5e1 100644 --- a/packages/modules/event-bus-redis/package.json +++ b/packages/modules/event-bus-redis/package.json @@ -33,7 +33,8 @@ "scripts": { "watch": "tsc --build --watch", "build": "rimraf dist && tsc --build", - "test": "jest --silent --bail --maxWorkers=50% --forceExit" + "test": "jest --silent --bail --maxWorkers=50% --forceExit", + "test:integration": "jest --no-cache --maxWorkers=50% --bail --detectOpenHandles --forceExit --logHeapUsage -- integration-tests/__tests__/**/*.spec.ts" }, "dependencies": { "bullmq": "5.13.0", diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index b921ea0572..958bec654b 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -218,11 +218,49 @@ export default class RedisEventBusService extends AbstractEventBusModuleService await this.clearGroupedEvents(eventGroupId) } - async clearGroupedEvents(eventGroupId: string) { + async clearGroupedEvents( + eventGroupId: string, + { + eventNames, + }: { + eventNames?: string[] + } = {} + ) { if (!eventGroupId) { return } + if (eventNames?.length) { + /** + * If any event names are provided, we keep all events except the ones that match the event + * names. which allow to partially clear an event group. + */ + + const eventsToKeep = await this.eventBusRedisConnection_ + .lrange(`staging:${eventGroupId}`, 0, -1) + .then((result) => { + return result + .map((jsonString) => JSON.parse(jsonString)) + .filter((event) => !eventNames.includes(event.name)) + }) + + // Create a pipeline + const pipeline = this.eventBusRedisConnection_.pipeline() + + // Empty the current list + pipeline.del(`staging:${eventGroupId}`) + + // Add the remaining events to the list + pipeline.rpush( + `staging:${eventGroupId}`, + ...eventsToKeep.map((event) => JSON.stringify(event)) + ) + + await pipeline.exec() + + return + } + await this.eventBusRedisConnection_.unlink(`staging:${eventGroupId}`) }