From e180253d608103cd8dfba8fddd3af2ba6ff2455a Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Thu, 17 Apr 2025 09:49:58 -0300 Subject: [PATCH] feat(orchestration): skip on permanent failure (#12027) What: - Added step config `skipOnPermanentFailure`. Skip all the next steps when the current step fails. If a string is used, the workflow will resume from the given step. - Fix `continueOnPermanentFailure` to continue the execution of the flow when a step fails. ```ts createWorkflow("some-workflow", () => { errorStep().config({ skipOnPermanentFailure: true, }) nextStep1() // skipped nextStep2() // skipped }) createWorkflow("some-workflow", () => { errorStep().config({ skipOnPermanentFailure: "resume-from-here", }); nextStep1(); // skipped nextStep2(); // skipped nextStep3().config({ name: "resume-from-here" }); // executed nextStep4(); // executed }); ``` --- .changeset/cyan-ladybugs-heal.md | 7 + .../transaction/transaction-orchestrator.ts | 4 +- .../transaction/transaction-orchestrator.ts | 32 +++- .../orchestration/src/transaction/types.ts | 9 +- .../http/workflow-execution/admin/entities.ts | 15 +- .../src/utils/composer/__tests__/compose.ts | 159 +++++++++++++++++- .../AdminWorkflowExecutionExecution.ts | 11 +- 7 files changed, 219 insertions(+), 18 deletions(-) create mode 100644 .changeset/cyan-ladybugs-heal.md diff --git a/.changeset/cyan-ladybugs-heal.md b/.changeset/cyan-ladybugs-heal.md new file mode 100644 index 0000000000..92198b9887 --- /dev/null +++ b/.changeset/cyan-ladybugs-heal.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +"@medusajs/types": patch +--- + +feat(workflows-sdk,orchestrator): step skip on permanent failure diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 9d2620087f..d1ad9a4e71 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -1246,7 +1246,7 @@ describe("Transaction Orchestrator", () => { expect(transaction.getState()).toBe(TransactionState.REVERTED) }) - it("should continue the transaction and skip children steps when the Transaction Step Timeout is reached but the step is set to 'continueOnPermanentFailure'", async () => { + it("should continue the transaction and skip children steps when the Transaction Step Timeout is reached but the step is set to 'skipOnPermanentFailure'", async () => { const mocks = { f1: jest.fn(() => { return "content f1" @@ -1313,7 +1313,7 @@ describe("Transaction Orchestrator", () => { { timeout: 0.1, // 100ms action: "action2", - continueOnPermanentFailure: true, + skipOnPermanentFailure: true, next: { action: "action4", }, diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 82c663a429..9c0df8b0a9 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -21,6 +21,7 @@ import { isDefined, isErrorLike, isObject, + isString, MedusaError, promiseAll, serializeError, @@ -437,7 +438,10 @@ export class TransactionOrchestrator extends EventEmitter { } else if (curState.state === TransactionStepState.REVERTED) { hasReverted = true } else if (curState.state === TransactionStepState.FAILED) { - if (stepDef.definition.continueOnPermanentFailure) { + if ( + stepDef.definition.continueOnPermanentFailure || + stepDef.definition.skipOnPermanentFailure + ) { hasIgnoredFailure = true } else { hasFailed = true @@ -696,12 +700,28 @@ export class TransactionOrchestrator extends EventEmitter { if (!step.isCompensating()) { if ( - step.definition.continueOnPermanentFailure && + (step.definition.continueOnPermanentFailure || + step.definition.skipOnPermanentFailure) && !TransactionTimeoutError.isTransactionTimeoutError(timeoutError!) ) { - for (const childStep of step.next) { - const child = flow.steps[childStep] - child.changeState(TransactionStepState.SKIPPED_FAILURE) + if (step.definition.skipOnPermanentFailure) { + const until = isString(step.definition.skipOnPermanentFailure) + ? step.definition.skipOnPermanentFailure + : undefined + + let stepsToSkip: string[] = [...step.next] + while (stepsToSkip.length > 0) { + const currentStep = flow.steps[stepsToSkip.shift()!] + + if (until && currentStep.definition.action === until) { + break + } + currentStep.changeState(TransactionStepState.SKIPPED_FAILURE) + + if (currentStep.next?.length > 0) { + stepsToSkip = stepsToSkip.concat(currentStep.next) + } + } } } else { flow.state = TransactionState.WAITING_TO_COMPENSATE @@ -1351,7 +1371,7 @@ export class TransactionOrchestrator extends EventEmitter { states[parent].next?.push(id) } - const definitionCopy = { ...obj } + const definitionCopy = { ...obj } as TransactionStepsDefinition delete definitionCopy.next if (definitionCopy.async) { diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index d2153200ca..785df77842 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -25,10 +25,17 @@ export type TransactionStepsDefinition = { /** * Indicates whether the workflow should continue even if there is a permanent failure in this step. - * In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE. + * In case it is set to true, the the current step will be marked as TransactionStepState.PERMANENT_FAILURE and the next steps will be executed. */ continueOnPermanentFailure?: boolean + /** + * Indicates whether the workflow should skip all subsequent steps in case of a permanent failure in this step. + * In case it is set to true, the next steps of the workflow will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE. + * In case it is a string, the next steps until the step name provided will be skipped and the workflow will be resumed from the step provided. + */ + skipOnPermanentFailure?: boolean | string + /** * If true, no compensation action will be triggered for this step in case of a failure. */ diff --git a/packages/core/types/src/http/workflow-execution/admin/entities.ts b/packages/core/types/src/http/workflow-execution/admin/entities.ts index 0d1fa373f5..df84614402 100644 --- a/packages/core/types/src/http/workflow-execution/admin/entities.ts +++ b/packages/core/types/src/http/workflow-execution/admin/entities.ts @@ -40,7 +40,7 @@ export type StepInvokeResult = { */ output: { /** - * The output of the step. This is the first parameter + * The output of the step. This is the first parameter * passed to the returned `StepResponse` function. */ output: unknown @@ -75,7 +75,7 @@ export interface WorkflowExecutionContext { /** * The details of the invocation of the workflow execution's steps. * The key is the step's ID, and the value is the step's details. - * + * * These details are only included for steps that have their `saveResponse` property set to `true`. */ invoke: Record @@ -87,7 +87,7 @@ export interface WorkflowExecutionContext { /** * The output of the compensation function of the workflow execution. * The key is the step's ID, and the value is the compensation function's output. - * + * * These details are only included for steps that have their `saveResponse` property set to `true`. */ compensate: Record @@ -114,9 +114,16 @@ export interface WorkflowExecutionDefinition { noCompensation?: boolean /** * Indicates whether the workflow should continue even if there is a permanent failure in this step. - * In case it is set to true, the children steps of this step will not be executed and their status will be marked as `TransactionStepState`.SKIPPED_FAILURE. + * In case it is set to true, the the current step will be marked as TransactionStepState.PERMANENT_FAILURE and the next steps will be executed. */ continueOnPermanentFailure?: boolean + /** + * Indicates whether the workflow should skip all subsequent steps in case of a permanent failure in this step. + * In case it is set to true, the next steps of the workflow will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE. + * In case it is a string, the next steps until the step name provided will be skipped and the workflow will be resumed from the step provided. + */ + skipOnPermanentFailure?: boolean | string + /** * The maximum number of times this step should be retried in case of temporary failures. * The default is 0 (no retries). diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts index 09f20b6dc2..276071f2d4 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts @@ -12,6 +12,7 @@ import { promiseAll, } from "@medusajs/utils" import { asValue } from "awilix" +import { setTimeout } from "timers/promises" import { createStep, createWorkflow, @@ -24,7 +25,6 @@ import { } from ".." import { MedusaWorkflow } from "../../../medusa-workflow" import { createHook } from "../create-hook" -import { setTimeout } from "timers/promises" jest.setTimeout(30000) @@ -1297,6 +1297,69 @@ describe("Workflow composer", function () { }) }) + it("should skip all steps in case of permanent failure", async () => { + const logStepFn = jest.fn(async ({ input }: { input: object }) => { + return new StepResponse("done") + }) + + const errorStep = createStep("perma-fail-step", async () => { + return StepResponse.permanentFailure("FAIL") + }) + + const logStep = createStep("log-step", logStepFn) + + const fakeStepWorkflow = createWorkflow("fake-workflow", () => { + const result = errorStep().config({ + skipOnPermanentFailure: true, + }) + logStep({ input: { A: "123" } }) + logStep({ input: { A: "123 a" } }).config({ name: "other" }) + logStep({ input: { A: "123 b" } }).config({ name: "other_2" }) + logStep({ input: { A: "123 c" } }).config({ name: "other_3" }) + return new WorkflowResponse(result) + }) + + const { transaction } = await fakeStepWorkflow().run({ + input: 1, + }) + + expect(transaction.getState()).toEqual("done") + expect(logStepFn).toHaveBeenCalledTimes(0) + }) + + it("should skip steps until the named step in case of permanent failure", async () => { + const logStepFn = jest.fn(async ({ input }: { input: object }) => { + return new StepResponse("done and returned") + }) + + const errorStep = createStep("perma-fail-step", async () => { + return StepResponse.permanentFailure("FAIL") + }) + + const logStep = createStep("log-step", logStepFn) + + const fakeStepWorkflow = createWorkflow("fake-workflow", () => { + errorStep().config({ + skipOnPermanentFailure: "other_2", + }) + logStep({ input: { A: "123" } }) + logStep({ input: { A: "123 a" } }).config({ name: "other" }) + logStep({ input: { A: "123 b" } }).config({ name: "other_2" }) + const ret = logStep({ input: { A: "123 c" } }).config({ + name: "other_3", + }) + return new WorkflowResponse(ret) + }) + + const { result, transaction } = await fakeStepWorkflow().run({ + input: 1, + }) + + expect(transaction.getState()).toEqual("done") + expect(result).toEqual("done and returned") + expect(logStepFn).toHaveBeenCalledTimes(2) + }) + it("should compose a new workflow and skip steps depending on the input", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { if (input === 1) { @@ -2701,4 +2764,98 @@ describe("Workflow composer", function () { expect(workflowResult).toEqual("return from 1") }) + + it("should compose a new workflow that skips steps on permanent failure [1]", async () => { + const mockStep1Fn = jest.fn().mockImplementation(async () => { + throw new Error("failed") + }) as any + const mockStep2Fn = jest.fn().mockImplementation(() => { + return new StepResponse(true) + }) as any + const mockStep3Fn = jest.fn().mockImplementation(() => { + return new StepResponse(true) + }) as any + const mockStep4Fn = jest.fn().mockImplementation(() => { + return new StepResponse(true) + }) as any + + const step1 = createStep( + { + name: "step1", + skipOnPermanentFailure: "step3", + }, + mockStep1Fn + ) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + const step4 = createStep("step4", mockStep4Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const ret1 = step1() + const [ret2, ret3] = parallelize(step2(), step3()) + const ret4 = step4() + return new WorkflowResponse({ ret1, ret2, ret3, ret4 }) + }) + + const { result: workflowResult } = await workflow().run() + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn).toHaveBeenCalledTimes(0) + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep4Fn).toHaveBeenCalledTimes(1) + + expect(workflowResult).toEqual({ + ret1: undefined, + ret2: undefined, + ret3: true, + ret4: true, + }) + }) + + it("should compose a new workflow that skips steps on permanent failure [2]", async () => { + const mockStep1Fn = jest.fn().mockImplementation(async () => { + throw new Error("failed") + }) as any + const mockStep2Fn = jest.fn().mockImplementation(() => { + return new StepResponse(true) + }) as any + const mockStep3Fn = jest.fn().mockImplementation(() => { + return new StepResponse(true) + }) as any + const mockStep4Fn = jest.fn().mockImplementation(() => { + return new StepResponse(true) + }) as any + + const step1 = createStep( + { + name: "step1", + skipOnPermanentFailure: "step4", + }, + mockStep1Fn + ) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + const step4 = createStep("step4", mockStep4Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const ret1 = step1() + const [ret2, ret3] = parallelize(step2(), step3()) + const ret4 = step4() + return new WorkflowResponse({ ret1, ret2, ret3, ret4 }) + }) + + const { result: workflowResult } = await workflow().run() + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn).toHaveBeenCalledTimes(0) + expect(mockStep3Fn).toHaveBeenCalledTimes(0) + expect(mockStep4Fn).toHaveBeenCalledTimes(1) + + expect(workflowResult).toEqual({ + ret1: undefined, + ret2: undefined, + ret3: undefined, + ret4: true, + }) + }) }) diff --git a/www/utils/generated/oas-output/schemas/AdminWorkflowExecutionExecution.ts b/www/utils/generated/oas-output/schemas/AdminWorkflowExecutionExecution.ts index 65235611e7..f41a108ae8 100644 --- a/www/utils/generated/oas-output/schemas/AdminWorkflowExecutionExecution.ts +++ b/www/utils/generated/oas-output/schemas/AdminWorkflowExecutionExecution.ts @@ -74,7 +74,11 @@ * continueOnPermanentFailure: * type: boolean * title: continueOnPermanentFailure - * description: Whether the step continues executing even if its status is changed to failed. + * description: Whether the workflow should continue executing even if its status is changed to failed. + * skipOnPermanentFailure: + * type: boolean + * title: skipOnPermanentFailure + * description: Whether the workflow should skip subsequent steps in case of a permanent failure. * maxRetries: * type: number * title: maxRetries @@ -139,6 +143,5 @@ * type: number * title: startedAt * description: The timestamp the step started executing. - * -*/ - + * + */