diff --git a/.changeset/twelve-bears-wait.md b/.changeset/twelve-bears-wait.md new file mode 100644 index 0000000000..e89a292879 --- /dev/null +++ b/.changeset/twelve-bears-wait.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/workflows-sdk": patch +--- + +fix(workflow-sdk): Async propagation diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index 9bb8cb816c..0760b830f3 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -9,12 +9,143 @@ import { transform } from "../transform" import { WorkflowData } from "../type" import { when } from "../when" import { createHook } from "../create-hook" +import { TransactionStepsDefinition } from "@medusajs/orchestration" let count = 1 const getNewWorkflowId = () => `workflow-${count++}` describe("Workflow composer", () => { - describe("running sub workflows", () => { + describe("when running workflows as sub-workflows", () => { + describe("handling of async and nested workflow configurations", () => { + it("should set the runAsStep as nested and async when parent workflow is async", async () => { + const subworkflowStep1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "sub workflow step1" }) + }) + + const subWorkflowId = getNewWorkflowId() + const subWorkflow = createWorkflow( + subWorkflowId, + function (input: WorkflowData) { + subworkflowStep1() + return new WorkflowResponse(void 0) + } + ) + + const step1 = createStep( + { name: "step1", async: true }, + async (_, context) => { + return new StepResponse({ result: "step1" }) + } + ) + + const workflowId = getNewWorkflowId() + const workflow = createWorkflow(workflowId, function () { + step1() + + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + + return new WorkflowResponse(subWorkflowRes) + }) + + expect(workflow().getFlow().async).toBe(true) + expect(subWorkflow().getFlow().async).toBeUndefined() + + const runAsStep = workflow().getFlow() + .next! as TransactionStepsDefinition + + expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`) + expect(runAsStep.async).toBe(true) + expect(runAsStep.nested).toBe(true) + }) + + it("should set the runAsStep as nested and async when parent workflow is sync but sub workflow is async", async () => { + const subworkflowStep1 = createStep( + { name: "step1", async: true }, + async (_, context) => { + return new StepResponse({ result: "sub workflow step1" }) + } + ) + + const subWorkflowId = getNewWorkflowId() + const subWorkflow = createWorkflow( + subWorkflowId, + function (input: WorkflowData) { + subworkflowStep1() + return new WorkflowResponse(void 0) + } + ) + + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflowId = getNewWorkflowId() + const workflow = createWorkflow(workflowId, function () { + step1() + + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + + return new WorkflowResponse(subWorkflowRes) + }) + + expect(workflow().getFlow().async).toBeUndefined() + expect(subWorkflow().getFlow().async).toBe(true) + + const runAsStep = workflow().getFlow() + .next! as TransactionStepsDefinition + + expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`) + expect(runAsStep.async).toBe(true) + expect(runAsStep.nested).toBe(true) + }) + + it("should set the runAsStep as nested and async when parent workflow is sync as well as sub workflow but the step is configured as async", async () => { + const subworkflowStep1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "sub workflow step1" }) + }) + + const subWorkflowId = getNewWorkflowId() + const subWorkflow = createWorkflow( + subWorkflowId, + function (input: WorkflowData) { + subworkflowStep1() + return new WorkflowResponse({}) + } + ) + + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflowId = getNewWorkflowId() + const workflow = createWorkflow(workflowId, function () { + step1() + + const subWorkflowRes = subWorkflow + .runAsStep({ + input: "hi from outside", + }) + .config({ async: true }) + + return new WorkflowResponse(subWorkflowRes) + }) + + expect(workflow().getFlow().async).toBeUndefined() + expect(subWorkflow().getFlow().async).toBeUndefined() + + const runAsStep = workflow().getFlow() + .next! as TransactionStepsDefinition + + expect(runAsStep.action).toBe(`${subWorkflowId}-as-step`) + expect(runAsStep.async).toBe(true) + expect(runAsStep.nested).toBe(true) + }) + }) + it("should succeed", async function () { const step1 = createStep("step1", async (_, context) => { return new StepResponse({ result: "step1" }) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index be00f6fc40..1b8020ff87 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,7 +4,7 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { isString, OrchestrationUtils } from "@medusajs/utils" +import { isDefined, isString, OrchestrationUtils } from "@medusajs/utils" import { ulid } from "ulid" import { resolveValue, StepResponse } from "./helpers" import { createStepHandler } from "./helpers/create-step-handler" @@ -173,6 +173,10 @@ export function applyStep< ...localConfig, } + if (isDefined(newConfig.nested)) { + newConfig.nested ||= newConfig.async + } + delete localConfig.name const handler = createStepHandler.bind(this)({ diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index c4138a4893..8a512f5d1a 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -198,6 +198,7 @@ export function createWorkflow( }, async (stepInput: TData, stepContext) => { const { container, ...sharedContext } = stepContext + const isAsync = stepContext[" stepDefinition"]?.async const workflowEngine = container.resolve(Modules.WORKFLOW_ENGINE, { allowUnregistered: true, @@ -212,7 +213,7 @@ export function createWorkflow( } let transaction - if (workflowEngine && runAsAsync) { + if (workflowEngine && isAsync) { transaction = await workflowEngine.run(name, { input: stepInput as any, context: executionContext, @@ -227,7 +228,7 @@ export function createWorkflow( return new StepResponse( transaction.result, - runAsAsync ? stepContext.transactionId : transaction + isAsync ? stepContext.transactionId : transaction ) }, async (transaction, stepContext) => { @@ -237,6 +238,7 @@ export function createWorkflow( } const { container, ...sharedContext } = stepContext + const isAsync = stepContext[" stepDefinition"]?.async const workflowEngine = container.resolve(Modules.WORKFLOW_ENGINE, { allowUnregistered: true, @@ -252,7 +254,7 @@ export function createWorkflow( const transactionId = step.__step__ + "-" + stepContext.transactionId - if (workflowEngine && runAsAsync) { + if (workflowEngine && isAsync) { await workflowEngine.cancel(name, { transactionId: transactionId, context: executionContext, diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index 165bf11f33..f8b74dedd4 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -21,7 +21,10 @@ function buildStepContext({ stepArguments.context!.idempotencyKey = idempotencyKey - const flowMetadata = stepArguments.transaction.getFlow()?.metadata + const flow = stepArguments.transaction.getFlow() + const flowMetadata = flow?.metadata + const stepDefinition = stepArguments.step.definition + const executionContext: StepExecutionContext = { workflowId: metadata.model_id, stepName: metadata.action, @@ -36,6 +39,7 @@ function buildStepContext({ preventReleaseEvents: flowMetadata?.preventReleaseEvents ?? false, transactionId: stepArguments.context!.transactionId, context: stepArguments.context!, + " stepDefinition": stepDefinition, " getStepResult"( stepId: string, action: "invoke" | "compensate" = "invoke" diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 38ffbd97f3..c4a51a0d4c 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -198,6 +198,11 @@ export interface StepExecutionContext { * Adding a space hides the method from the autocomplete */ " getStepResult"(stepId: string, action?: "invoke" | "compensate"): any + + /** + * Get access to the definition of the step. + */ + " stepDefinition": TransactionStepsDefinition } export type WorkflowTransactionContext = StepExecutionContext & diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 1f16c71332..cd179cdb39 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -208,8 +208,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -317,8 +321,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -411,8 +419,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret @@ -477,8 +489,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts index 06850027af..676034e039 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async_compensate.ts @@ -1,7 +1,6 @@ import { createStep, createWorkflow, - parallelize, StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 0a586220e5..b656404b2b 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -495,6 +495,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }, stepResponse: { uhuuuu: "yeaah!" }, + options: { + throwOnError: false, + }, }) ;({ data: executionsList } = await query.graph({ entity: "workflow_executions", diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index db4a6b9280..f4b9c5b588 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -268,8 +268,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -373,8 +377,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return { acknowledgement, ...ret } @@ -467,8 +475,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret @@ -534,8 +546,12 @@ export class WorkflowOrchestratorService { await this.triggerParentStep(ret.transaction, result) } - if (throwOnError && ret.thrownError) { - throw ret.thrownError + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error } return ret