diff --git a/packages/core/core-flows/src/common/steps/release-event.ts b/packages/core/core-flows/src/common/steps/release-event.ts index 2cf872d124..d566ed0ff9 100644 --- a/packages/core/core-flows/src/common/steps/release-event.ts +++ b/packages/core/core-flows/src/common/steps/release-event.ts @@ -4,17 +4,16 @@ import { createStep } from "@medusajs/workflows-sdk" export const releaseEventsStepId = "release-events-step" export const releaseEventsStep = createStep( releaseEventsStepId, - async ( - input: void, - { - container, - metadata: { - /* eventGroupId */ - }, + async (input: void, { container, eventGroupId }) => { + const eventBusService = container.resolve( + ModuleRegistrationName.EVENT_BUS, + { allowUnregistered: true } + ) + if (!eventBusService || !eventGroupId) { + return } - ) => { - const eventBus = container.resolve(ModuleRegistrationName.EVENT_BUS) - // await eventBus.release + + await eventBusService.releaseGroupedEvents(eventGroupId) }, async (data: void) => {} ) diff --git a/packages/core/medusa-test-utils/src/mock-event-bus-service.ts b/packages/core/medusa-test-utils/src/mock-event-bus-service.ts index 40dd380c59..51e6b9e021 100644 --- a/packages/core/medusa-test-utils/src/mock-event-bus-service.ts +++ b/packages/core/medusa-test-utils/src/mock-event-bus-service.ts @@ -22,4 +22,12 @@ export default class EventBusService implements IEventBusModuleService { ): this { return this } + + releaseGroupedEvents(eventGroupId: string): Promise { + return Promise.resolve() + } + + clearGroupedEvents(eventGroupId: string): Promise { + return Promise.resolve() + } } diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 239aca105d..6e0aeefed6 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -39,7 +39,7 @@ export class LocalWorkflow { protected handlers: Map protected medusaContext?: Context - get container() { + get container(): MedusaContainer { return this.container_ } diff --git a/packages/core/types/src/event-bus/event-bus-module.ts b/packages/core/types/src/event-bus/event-bus-module.ts index 199733ef35..1ff808002c 100644 --- a/packages/core/types/src/event-bus/event-bus-module.ts +++ b/packages/core/types/src/event-bus/event-bus-module.ts @@ -17,4 +17,7 @@ export interface IEventBusModuleService { subscriber: Subscriber, context?: SubscriberContext ): this + + releaseGroupedEvents(eventGroupId: string): Promise + clearGroupedEvents(eventGroupId: string): Promise } diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 0c8872d36e..a9b2d653ef 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -7,7 +7,11 @@ import { TransactionState, } from "@medusajs/orchestration" import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" -import { isPresent, MedusaContextType } from "@medusajs/utils" +import { + isPresent, + MedusaContextType, + ModuleRegistrationName, +} from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" import { MedusaWorkflow } from "../medusa-workflow" @@ -21,6 +25,7 @@ import { MainExportedWorkflow, WorkflowResult, } from "./type" +import { ContainerRegistrationKeys } from "@medusajs/utils/dist" function createContextualWorkflowRunner< TData = unknown, @@ -80,6 +85,9 @@ function createContextualWorkflowRunner< } const { eventGroupId } = context + + attachOnFinishReleaseEvents(events, eventGroupId!, flow) + const flowMetadata = { eventGroupId, } @@ -466,3 +474,58 @@ export const exportWorkflow = ( MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow) return exportedWorkflow as MainExportedWorkflow } + +function attachOnFinishReleaseEvents( + events: DistributedTransactionEvents = {}, + eventGroupId: string, + flow: LocalWorkflow +) { + const onFinish = events.onFinish + + const wrappedOnFinish = async (args: { + transaction: DistributedTransaction + result?: unknown + errors?: unknown[] + }) => { + await onFinish?.(args) + + const eventBusService = (flow.container as MedusaContainer).resolve( + ModuleRegistrationName.EVENT_BUS, + { allowUnregistered: true } + ) + + if (!eventBusService || !eventGroupId) { + return + } + + const logger = + (flow.container as MedusaContainer).resolve( + ContainerRegistrationKeys.LOGGER, + { allowUnregistered: true } + ) || console + + const { transaction } = args + const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] + + if (failedStatus.includes(transaction.getState())) { + return await eventBusService + .clearGroupedEvents(eventGroupId) + .catch(() => { + logger.warn( + `Failed to clear events for eventGroupId - ${eventGroupId}` + ) + }) + } + + await eventBusService.releaseGroupedEvents(eventGroupId).catch((e) => { + logger.error( + `Failed to release grouped events for eventGroupId: ${eventGroupId}`, + e + ) + + return flow.cancel(transaction) + }) + } + + events.onFinish = wrappedOnFinish +} diff --git a/packages/core/workflows-sdk/src/helper/__tests__/compose.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts similarity index 94% rename from packages/core/workflows-sdk/src/helper/__tests__/compose.ts rename to packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts index 9b5f672b7f..e494de1e58 100644 --- a/packages/core/workflows-sdk/src/helper/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts @@ -1,17 +1,23 @@ import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration" -import { promiseAll } from "@medusajs/utils" +import { + composeMessage, + createMedusaContainer, + ModuleRegistrationName, + promiseAll, +} from "@medusajs/utils" import { createStep, createWorkflow, hook, - MedusaWorkflow, parallelize, StepResponse, transform, -} from "../.." +} from ".." +import { MedusaWorkflow } from "../../../medusa-workflow" +import { asValue } from "awilix" +import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist" jest.setTimeout(30000) -import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist" class MockSchedulerStorage implements IDistributedSchedulerStorage { async schedule( @@ -2086,4 +2092,125 @@ describe("Workflow composer", function () { return [{ step1_nested_obj: obj.nested }, s2] }) }) + + it("should emit grouped events once the workflow is executed and finished", async () => { + const container = createMedusaContainer() + container.register({ + [ModuleRegistrationName.EVENT_BUS]: asValue({ + releaseGroupedEvents: jest + .fn() + .mockImplementation(() => Promise.resolve()), + emit: jest.fn(), + }), + }) + + const mockStep1Fn = jest + .fn() + .mockImplementation( + async (input, { context: stepContext, container }) => { + const eventBusService = container.resolve( + ModuleRegistrationName.EVENT_BUS + ) + + await eventBusService.emit( + "event1", + composeMessage("event1", { + data: { eventGroupId: stepContext.eventGroupId }, + context: stepContext, + object: "object", + source: "service", + action: "action", + }) + ) + } + ) + + const step1 = createStep("step1", mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + step1(input) + }) + + await workflow(container).run({ + context: { + eventGroupId: "event-group-id", + }, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + + const eventBusMock = container.resolve(ModuleRegistrationName.EVENT_BUS) + expect(eventBusMock.emit).toHaveBeenCalledTimes(1) + expect(eventBusMock.emit.mock.calls[0][0]).toEqual("event1") + + expect(eventBusMock.releaseGroupedEvents).toHaveBeenCalledTimes(1) + expect(eventBusMock.releaseGroupedEvents.mock.calls[0][0]).toEqual( + "event-group-id" + ) + }) + + it("should clear grouped events on fail state", async () => { + const container = createMedusaContainer() + container.register({ + [ModuleRegistrationName.EVENT_BUS]: asValue({ + releaseGroupedEvents: jest + .fn() + .mockImplementation(() => Promise.resolve()), + clearGroupedEvents: jest + .fn() + .mockImplementation(() => Promise.resolve()), + emit: jest.fn(), + }), + }) + + const mockStep1Fn = jest + .fn() + .mockImplementation( + async (input, { context: stepContext, container }) => { + const eventBusService = container.resolve( + ModuleRegistrationName.EVENT_BUS + ) + + await eventBusService.emit( + "event1", + composeMessage("event1", { + data: { eventGroupId: stepContext.eventGroupId }, + context: stepContext, + object: "object", + source: "service", + action: "action", + }) + ) + } + ) + + const mockStep2Fn = jest.fn().mockImplementation(() => { + throw new Error("invoke fail") + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + + const workflow = createWorkflow("workflow1", function (input) { + step1(input) + step2() + }) + + await workflow(container).run({ + context: { + eventGroupId: "event-group-id", + }, + throwOnError: false, + }) + + const eventBusMock = container.resolve(ModuleRegistrationName.EVENT_BUS) + + expect(eventBusMock.emit).toHaveBeenCalledTimes(1) + expect(eventBusMock.releaseGroupedEvents).toHaveBeenCalledTimes(0) + expect(eventBusMock.clearGroupedEvents).toHaveBeenCalledTimes(1) + expect(eventBusMock.clearGroupedEvents).toHaveBeenCalledWith( + "event-group-id" + ) + }) })