diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index dca3119a10..0dd34002cf 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -28,6 +28,10 @@ export type TransactionFlow = { options?: TransactionModelOptions definition: TransactionStepsDefinition transactionId: string + metadata?: { + eventGroupId?: string + [key: string]: unknown + } hasAsyncSteps: boolean hasFailedSteps: boolean hasWaitingSteps: boolean @@ -839,7 +843,7 @@ export class TransactionOrchestrator extends EventEmitter { await this.executeNext(transaction) } - private createTransactionFlow(transactionId: string): TransactionFlow { + private createTransactionFlow(transactionId: string, flowMetadata?: TransactionFlow['metadata']): TransactionFlow { const [steps, features] = TransactionOrchestrator.buildSteps( this.definition ) @@ -864,6 +868,7 @@ export class TransactionOrchestrator extends EventEmitter { modelId: this.id, options: this.options, transactionId: transactionId, + metadata: flowMetadata, hasAsyncSteps, hasFailedSteps: false, hasSkippedSteps: false, @@ -1006,11 +1011,13 @@ export class TransactionOrchestrator extends EventEmitter { * @param transactionId - unique identifier of the transaction * @param handler - function to handle action of the transaction * @param payload - payload to be passed to all the transaction steps + * @param flowMetadata - flow metadata which can include event group id for example */ public async beginTransaction( transactionId: string, handler: TransactionStepHandler, - payload?: unknown + payload?: unknown, + flowMetadata?: TransactionFlow["metadata"] ): Promise { const existingTransaction = await TransactionOrchestrator.loadTransactionById(this.id, transactionId) @@ -1018,7 +1025,7 @@ export class TransactionOrchestrator extends EventEmitter { let newTransaction = false let modelFlow: TransactionFlow if (!existingTransaction) { - modelFlow = this.createTransactionFlow(transactionId) + modelFlow = this.createTransactionFlow(transactionId, flowMetadata) newTransaction = true } else { modelFlow = existingTransaction.flow diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index dd4ea711fb..239aca105d 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -13,6 +13,7 @@ import { DistributedTransaction, DistributedTransactionEvent, DistributedTransactionEvents, + TransactionFlow, TransactionModelOptions, TransactionOrchestrator, TransactionStepsDefinition, @@ -332,7 +333,8 @@ export class LocalWorkflow { uniqueTransactionId: string, input?: unknown, context?: Context, - subscribe?: DistributedTransactionEvents + subscribe?: DistributedTransactionEvents, + flowMetadata?: TransactionFlow["metadata"] ) { if (this.flow.hasChanges) { this.commit() @@ -343,7 +345,8 @@ export class LocalWorkflow { const transaction = await orchestrator.beginTransaction( uniqueTransactionId, handler(this.container_, context), - input + input, + flowMetadata ) const { cleanUpEventListeners } = this.registerEventCallbacks({ diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 4d0d2f21e6..0c8872d36e 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -1,10 +1,12 @@ import { MedusaModule } from "@medusajs/modules-sdk" import { + DistributedTransaction, + DistributedTransactionEvents, LocalWorkflow, TransactionHandlerType, TransactionState, } from "@medusajs/orchestration" -import { LoadedModule, MedusaContainer } from "@medusajs/types" +import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { isPresent, MedusaContextType } from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" @@ -59,7 +61,10 @@ function createContextualWorkflowRunner< isCancel = false, container: executionContainer, }, - ...args + transactionOrIdOrIdempotencyKey: DistributedTransaction | string, + input: unknown, + context: Context, + events: DistributedTransactionEvents | undefined = {} ) => { if (!executionContainer) { const container_ = flow.container as MedusaContainer @@ -74,6 +79,18 @@ function createContextualWorkflowRunner< flow.container = executionContainer } + const { eventGroupId } = context + const flowMetadata = { + eventGroupId, + } + + const args = [ + transactionOrIdOrIdempotencyKey, + input, + context, + events, + flowMetadata, + ] const transaction = await method.apply(method, args) let errors = transaction.getErrors(TransactionHandlerType.INVOKE) @@ -136,7 +153,7 @@ function createContextualWorkflowRunner< const context = { ...outerContext, - __type: MedusaContextType, + __type: MedusaContextType as Context["__type"], } context.transactionId ??= ulid() @@ -191,7 +208,7 @@ function createContextualWorkflowRunner< const context = { ...outerContext, transactionId, - __type: MedusaContextType, + __type: MedusaContextType as Context["__type"], } context.eventGroupId ??= ulid() @@ -229,7 +246,7 @@ function createContextualWorkflowRunner< const context = { ...outerContext, transactionId, - __type: MedusaContextType, + __type: MedusaContextType as Context["__type"], } context.eventGroupId ??= ulid() @@ -262,7 +279,7 @@ function createContextualWorkflowRunner< const context = { ...outerContext, transactionId, - __type: MedusaContextType, + __type: MedusaContextType as Context["__type"], } context.eventGroupId ??= ulid() @@ -275,7 +292,8 @@ function createContextualWorkflowRunner< isCancel: true, container, }, - transaction ?? transactionId, + transaction ?? transactionId!, + undefined, context, events ) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 47d0a223d1..c8213d0fc1 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -135,7 +135,9 @@ function applyStep< attempt: metadata.attempt, container: stepArguments.container, metadata, - eventGroupId: stepArguments.context!.eventGroupId, + eventGroupId: + stepArguments.transaction.getFlow()?.metadata?.eventGroupId ?? + stepArguments.context!.eventGroupId, transactionId: stepArguments.context!.transactionId, context: stepArguments.context!, } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_event_group_id.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_event_group_id.ts new file mode 100644 index 0000000000..4c5d12f650 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_event_group_id.ts @@ -0,0 +1,40 @@ +import { createStep, createWorkflow } from "@medusajs/workflows-sdk" +import { setTimeout } from "timers/promises" + +export const workflowEventGroupIdStep1Mock = jest.fn(async (input) => { + await setTimeout(200) +}) + +export const workflowEventGroupIdStep2Mock = jest.fn(async (input) => { + return +}) + +const step_1_background = createStep( + { + name: "step_1_event_group_id_background", + async: true, + }, + workflowEventGroupIdStep1Mock +) + +const step_2 = createStep( + { + name: "step_2_event_group_id", + async: true, + }, + workflowEventGroupIdStep2Mock +) + +export const eventGroupWorkflowId = "workflow_event_group_id" + +createWorkflow( + { + name: eventGroupWorkflowId, + }, + function (input) { + const resp = step_1_background(input) + step_2() + + return resp + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 7ec5d5c929..a5a1b983bb 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,14 +1,22 @@ import { MedusaApp } from "@medusajs/modules-sdk" -import { RemoteJoinerQuery } from "@medusajs/types" +import { WorkflowManager } from "@medusajs/orchestration" +import { + Context, + IWorkflowEngineService, + RemoteJoinerQuery, +} from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" -import { IWorkflowEngineService, MedusaWorkflow } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__" +import { + eventGroupWorkflowId, + workflowEventGroupIdStep1Mock, + workflowEventGroupIdStep2Mock, +} from "../__fixtures__/workflow_event_group_id" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { DB_URL, TestDatabase } from "../utils" -import { WorkflowManager } from "@medusajs/orchestration" const sharedPgConnection = knex({ client: "pg", @@ -59,7 +67,68 @@ describe("Workflow Orchestrator module", function () { workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService }) - afterEach(afterEach_) + it("should execute an async workflow keeping track of the event group id provided in the context", async () => { + const eventGroupId = "event-group-id" + + await workflowOrcModule.run(eventGroupWorkflowId, { + input: {}, + context: { + eventGroupId, + transactionId: "transaction_id", + }, + throwOnError: true, + }) + + await workflowOrcModule.setStepSuccess({ + idempotencyKey: { + action: TransactionHandlerType.INVOKE, + stepId: "step_1_event_group_id_background", + workflowId: eventGroupWorkflowId, + transactionId: "transaction_id", + }, + stepResponse: { hey: "oh" }, + }) + + // Validate context event group id + expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( + expect.objectContaining({ eventGroupId }) + ) + expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( + expect.objectContaining({ eventGroupId }) + ) + }) + + it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => { + await workflowOrcModule.run(eventGroupWorkflowId, { + input: {}, + context: { + transactionId: "transaction_id_2", + }, + throwOnError: true, + }) + + await workflowOrcModule.setStepSuccess({ + idempotencyKey: { + action: TransactionHandlerType.INVOKE, + stepId: "step_1_event_group_id_background", + workflowId: eventGroupWorkflowId, + transactionId: "transaction_id_2", + }, + stepResponse: { hey: "oh" }, + }) + + const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock + .calls[0][1] as unknown as Context)!.eventGroupId + + // Validate context event group id + expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( + expect.objectContaining({ eventGroupId: generatedEventGroupId }) + ) + expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( + expect.objectContaining({ eventGroupId: generatedEventGroupId }) + ) + }) + describe("Testing basic workflow", function () { it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => { await workflowOrcModule.run("workflow_1", { @@ -174,7 +243,7 @@ describe("Workflow Orchestrator module", function () { expect(transaction.flow.state).toEqual("reverted") }) - it("should subsctibe to a async workflow and receive the response when it finishes", (done) => { + it("should subscribe to a async workflow and receive the response when it finishes", (done) => { const transactionId = "trx_123" const onFinish = jest.fn(() => {