From 1a7847660890ae84648123567ce8dc4c9a0eca03 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Tue, 10 Jun 2025 09:23:12 +0200 Subject: [PATCH] fix(workflow-sdk): Async/nested runAsStep propagation (#12675) FIXES CLO-524 **What** Add hidden stepDefinition object as part of the step argument and ensure the runAsStep handlers rely on the latest definition when config is being used on the returned step in order to ensure async configuration propagation and nested configuration --- .changeset/twelve-bears-wait.md | 7 + .../utils/composer/__tests__/index.spec.ts | 133 +++++++++++++++++- .../src/utils/composer/create-step.ts | 6 +- .../src/utils/composer/create-workflow.ts | 8 +- .../composer/helpers/create-step-handler.ts | 6 +- .../workflows-sdk/src/utils/composer/type.ts | 5 + .../src/services/workflow-orchestrator.ts | 32 +++-- .../__fixtures__/workflow_async_compensate.ts | 1 - .../integration-tests/__tests__/index.spec.ts | 3 + .../src/services/workflow-orchestrator.ts | 32 +++-- 10 files changed, 210 insertions(+), 23 deletions(-) create mode 100644 .changeset/twelve-bears-wait.md diff --git a/.changeset/twelve-bears-wait.md b/.changeset/twelve-bears-wait.md new file mode 100644 index 0000000000..e89a292879 --- /dev/null +++ b/.changeset/twelve-bears-wait.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/workflows-sdk": patch +--- + +fix(workflow-sdk): Async propagation diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index 9bb8cb816c..0760b830f3 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -9,12 +9,143 @@ import { transform } from "../transform" import { WorkflowData } from "../type" import { when } from "../when" import { createHook } from "../create-hook" +import { TransactionStepsDefinition } from "@medusajs/orchestration" let count = 1 const getNewWorkflowId = () => `workflow-${count++}` describe("Workflow composer", () => { - describe("running sub workflows", () => { + describe("when running workflows as sub-workflows", () => { + describe("handling of async and nested workflow configurations", () => { + it("should set the runAsStep as nested and async when parent workflow is async", async () => { + const subworkflowStep1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "sub workflow step1" }) + }) + + const subWorkflowId = getNewWorkflowId() + const subWorkflow = createWorkflow( + subWorkflowId, + function (input: WorkflowData) { + subworkflowStep1() + return new WorkflowResponse(void 0) + } + ) + + const step1 = createStep( + { name: "step1", async: true }, + async (_, context) => { + return new StepResponse({ result: "step1" }) + } + ) + + const workflowId = getNewWorkflowId() + const workflow = createWorkflow(workflowId, function () { + step1() + + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + + return new WorkflowResponse(subWorkflowRes) + }) + + expect(workflow().getFlow().async).toBe(true) + expect(subWorkflow().getFlow().async).toBeUndefined() + + const runAsStep = workflow().getFlow() + .next! as TransactionStepsDefinition + + expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`) + expect(runAsStep.async).toBe(true) + expect(runAsStep.nested).toBe(true) + }) + + it("should set the runAsStep as nested and async when parent workflow is sync but sub workflow is async", async () => { + const subworkflowStep1 = createStep( + { name: "step1", async: true }, + async (_, context) => { + return new StepResponse({ result: "sub workflow step1" }) + } + ) + + const subWorkflowId = getNewWorkflowId() + const subWorkflow = createWorkflow( + subWorkflowId, + function (input: WorkflowData) { + subworkflowStep1() + return new WorkflowResponse(void 0) + } + ) + + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflowId = getNewWorkflowId() + const workflow = createWorkflow(workflowId, function () { + step1() + + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + + return new WorkflowResponse(subWorkflowRes) + }) + + expect(workflow().getFlow().async).toBeUndefined() + expect(subWorkflow().getFlow().async).toBe(true) + + const runAsStep = workflow().getFlow() + .next! as TransactionStepsDefinition + + expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`) + expect(runAsStep.async).toBe(true) + expect(runAsStep.nested).toBe(true) + }) + + it("should set the runAsStep as nested and async when parent workflow is sync as well as sub workflow but the step is configured as async", async () => { + const subworkflowStep1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "sub workflow step1" }) + }) + + const subWorkflowId = getNewWorkflowId() + const subWorkflow = createWorkflow( + subWorkflowId, + function (input: WorkflowData) { + subworkflowStep1() + return new WorkflowResponse({}) + } + ) + + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflowId = getNewWorkflowId() + const workflow = createWorkflow(workflowId, function () { + step1() + + const subWorkflowRes = subWorkflow + .runAsStep({ + input: "hi from outside", + }) + .config({ async: true }) + + return new WorkflowResponse(subWorkflowRes) + }) + + expect(workflow().getFlow().async).toBeUndefined() + expect(subWorkflow().getFlow().async).toBeUndefined() + + const runAsStep = workflow().getFlow() + .next! as TransactionStepsDefinition + + expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`) + expect(runAsStep.async).toBe(true) + expect(runAsStep.nested).toBe(true) + }) + }) + it("should succeed", async function () { const step1 = createStep("step1", async (_, context) => { return new StepResponse({ result: "step1" }) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index be00f6fc40..1b8020ff87 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,7 +4,7 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { isString, OrchestrationUtils } from "@medusajs/utils" +import { isDefined, isString, OrchestrationUtils } from "@medusajs/utils" import { ulid } from "ulid" import { resolveValue, StepResponse } from "./helpers" import { createStepHandler } from "./helpers/create-step-handler" @@ -173,6 +173,10 @@ export function applyStep< ...localConfig, } + if (isDefined(newConfig.nested)) { + newConfig.nested ||= newConfig.async + } + delete localConfig.name const handler = createStepHandler.bind(this)({ diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index c4138a4893..8a512f5d1a 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -198,6 +198,7 @@ export function createWorkflow( }, async (stepInput: TData, stepContext) => { const { container, ...sharedContext } = stepContext + const isAsync = stepContext[" stepDefinition"]?.async const workflowEngine = container.resolve(Modules.WORKFLOW_ENGINE, { allowUnregistered: true, @@ -212,7 +213,7 @@ export function createWorkflow( } let transaction - if (workflowEngine && runAsAsync) { + if (workflowEngine && isAsync) { transaction = await workflowEngine.run(name, { input: stepInput as any, context: executionContext, @@ -227,7 +228,7 @@ export function createWorkflow( return new StepResponse( transaction.result, - runAsAsync ? stepContext.transactionId : transaction + isAsync ? stepContext.transactionId : transaction ) }, async (transaction, stepContext) => { @@ -237,6 +238,7 @@ export function createWorkflow( } const { container, ...sharedContext } = stepContext + const isAsync = stepContext[" stepDefinition"]?.async const workflowEngine = container.resolve(Modules.WORKFLOW_ENGINE, { allowUnregistered: true, @@ -252,7 +254,7 @@ export function createWorkflow( const transactionId = step.__step__ + "-" + stepContext.transactionId - if (workflowEngine && runAsAsync) { + if (workflowEngine && isAsync) { await workflowEngine.cancel(name, { transactionId: transactionId, context: executionContext, diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index 165bf11f33..f8b74dedd4 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -21,7 +21,10 @@ function buildStepContext({ stepArguments.context!.idempotencyKey = idempotencyKey - const flowMetadata = stepArguments.transaction.getFlow()?.metadata + const flow = stepArguments.transaction.getFlow() + const flowMetadata = flow?.metadata + const stepDefinition = stepArguments.step.definition + const executionContext: StepExecutionContext = { workflowId: metadata.model_id, stepName: metadata.action, @@ -36,6 +39,7 @@ function buildStepContext({ preventReleaseEvents: flowMetadata?.preventReleaseEvents ?? false, transactionId: stepArguments.context!.transactionId, context: stepArguments.context!, + " stepDefinition": stepDefinition, " getStepResult"( stepId: string, action: "invoke" | "compensate" = "invoke" diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 38ffbd97f3..c4a51a0d4c 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -198,6 +198,11 @@ export interface StepExecutionContext { * Adding a space hides the method from the autocomplete */ " getStepResult"(stepId: string, action?: "invoke" | "compensate"): any + + /** + * Get access to the definition of the step. + */ + " stepDefinition": TransactionStepsDefinition } export type WorkflowTransactionContext = StepExecutionContext & diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 1f16c71332..cd179cdb39 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -208,8 +208,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -317,8 +321,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -411,8 +419,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret @@ -477,8 +489,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts index 06850027af..676034e039 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts @@ -1,7 +1,6 @@ import { createStep, createWorkflow, - parallelize, StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 0a586220e5..b656404b2b 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -495,6 +495,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }, stepResponse: { uhuuuu: "yeaah!" }, + options: { + throwOnError: false, + }, }) ;({ data: executionsList } = await query.graph({ entity: "workflow_executions", diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index db4a6b9280..f4b9c5b588 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -268,8 +268,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -373,8 +377,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -467,8 +475,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret @@ -534,8 +546,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret