feat(orchestration): skip on permanent failure (#12027)
What:
- Added step config `skipOnPermanentFailure`. Skip all the next steps when the current step fails. If a string is used, the workflow will resume from the given step.
- Fix `continueOnPermanentFailure` to continue the execution of the flow when a step fails.
```ts
createWorkflow("some-workflow", () => {
errorStep().config({
skipOnPermanentFailure: true,
})
nextStep1() // skipped
nextStep2() // skipped
})
createWorkflow("some-workflow", () => {
errorStep().config({
skipOnPermanentFailure: "resume-from-here",
});
nextStep1(); // skipped
nextStep2(); // skipped
nextStep3().config({ name: "resume-from-here" }); // executed
nextStep4(); // executed
});
```
This commit is contained in:
committed by
GitHub
parent
1c5e82af51
commit
e180253d60
7
.changeset/cyan-ladybugs-heal.md
Normal file
7
.changeset/cyan-ladybugs-heal.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
"@medusajs/orchestration": patch
|
||||
"@medusajs/workflows-sdk": patch
|
||||
"@medusajs/types": patch
|
||||
---
|
||||
|
||||
feat(workflows-sdk,orchestrator): step skip on permanent failure
|
||||
@@ -1246,7 +1246,7 @@ describe("Transaction Orchestrator", () => {
|
||||
expect(transaction.getState()).toBe(TransactionState.REVERTED)
|
||||
})
|
||||
|
||||
it("should continue the transaction and skip children steps when the Transaction Step Timeout is reached but the step is set to 'continueOnPermanentFailure'", async () => {
|
||||
it("should continue the transaction and skip children steps when the Transaction Step Timeout is reached but the step is set to 'skipOnPermanentFailure'", async () => {
|
||||
const mocks = {
|
||||
f1: jest.fn(() => {
|
||||
return "content f1"
|
||||
@@ -1313,7 +1313,7 @@ describe("Transaction Orchestrator", () => {
|
||||
{
|
||||
timeout: 0.1, // 100ms
|
||||
action: "action2",
|
||||
continueOnPermanentFailure: true,
|
||||
skipOnPermanentFailure: true,
|
||||
next: {
|
||||
action: "action4",
|
||||
},
|
||||
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
isDefined,
|
||||
isErrorLike,
|
||||
isObject,
|
||||
isString,
|
||||
MedusaError,
|
||||
promiseAll,
|
||||
serializeError,
|
||||
@@ -437,7 +438,10 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
} else if (curState.state === TransactionStepState.REVERTED) {
|
||||
hasReverted = true
|
||||
} else if (curState.state === TransactionStepState.FAILED) {
|
||||
if (stepDef.definition.continueOnPermanentFailure) {
|
||||
if (
|
||||
stepDef.definition.continueOnPermanentFailure ||
|
||||
stepDef.definition.skipOnPermanentFailure
|
||||
) {
|
||||
hasIgnoredFailure = true
|
||||
} else {
|
||||
hasFailed = true
|
||||
@@ -696,12 +700,28 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
if (!step.isCompensating()) {
|
||||
if (
|
||||
step.definition.continueOnPermanentFailure &&
|
||||
(step.definition.continueOnPermanentFailure ||
|
||||
step.definition.skipOnPermanentFailure) &&
|
||||
!TransactionTimeoutError.isTransactionTimeoutError(timeoutError!)
|
||||
) {
|
||||
for (const childStep of step.next) {
|
||||
const child = flow.steps[childStep]
|
||||
child.changeState(TransactionStepState.SKIPPED_FAILURE)
|
||||
if (step.definition.skipOnPermanentFailure) {
|
||||
const until = isString(step.definition.skipOnPermanentFailure)
|
||||
? step.definition.skipOnPermanentFailure
|
||||
: undefined
|
||||
|
||||
let stepsToSkip: string[] = [...step.next]
|
||||
while (stepsToSkip.length > 0) {
|
||||
const currentStep = flow.steps[stepsToSkip.shift()!]
|
||||
|
||||
if (until && currentStep.definition.action === until) {
|
||||
break
|
||||
}
|
||||
currentStep.changeState(TransactionStepState.SKIPPED_FAILURE)
|
||||
|
||||
if (currentStep.next?.length > 0) {
|
||||
stepsToSkip = stepsToSkip.concat(currentStep.next)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
flow.state = TransactionState.WAITING_TO_COMPENSATE
|
||||
@@ -1351,7 +1371,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
states[parent].next?.push(id)
|
||||
}
|
||||
|
||||
const definitionCopy = { ...obj }
|
||||
const definitionCopy = { ...obj } as TransactionStepsDefinition
|
||||
delete definitionCopy.next
|
||||
|
||||
if (definitionCopy.async) {
|
||||
|
||||
@@ -25,10 +25,17 @@ export type TransactionStepsDefinition = {
|
||||
|
||||
/**
|
||||
* Indicates whether the workflow should continue even if there is a permanent failure in this step.
|
||||
* In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE.
|
||||
* In case it is set to true, the the current step will be marked as TransactionStepState.PERMANENT_FAILURE and the next steps will be executed.
|
||||
*/
|
||||
continueOnPermanentFailure?: boolean
|
||||
|
||||
/**
|
||||
* Indicates whether the workflow should skip all subsequent steps in case of a permanent failure in this step.
|
||||
* In case it is set to true, the next steps of the workflow will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE.
|
||||
* In case it is a string, the next steps until the step name provided will be skipped and the workflow will be resumed from the step provided.
|
||||
*/
|
||||
skipOnPermanentFailure?: boolean | string
|
||||
|
||||
/**
|
||||
* If true, no compensation action will be triggered for this step in case of a failure.
|
||||
*/
|
||||
|
||||
@@ -40,7 +40,7 @@ export type StepInvokeResult = {
|
||||
*/
|
||||
output: {
|
||||
/**
|
||||
* The output of the step. This is the first parameter
|
||||
* The output of the step. This is the first parameter
|
||||
* passed to the returned `StepResponse` function.
|
||||
*/
|
||||
output: unknown
|
||||
@@ -75,7 +75,7 @@ export interface WorkflowExecutionContext {
|
||||
/**
|
||||
* The details of the invocation of the workflow execution's steps.
|
||||
* The key is the step's ID, and the value is the step's details.
|
||||
*
|
||||
*
|
||||
* These details are only included for steps that have their `saveResponse` property set to `true`.
|
||||
*/
|
||||
invoke: Record<string, StepInvokeResult>
|
||||
@@ -87,7 +87,7 @@ export interface WorkflowExecutionContext {
|
||||
/**
|
||||
* The output of the compensation function of the workflow execution.
|
||||
* The key is the step's ID, and the value is the compensation function's output.
|
||||
*
|
||||
*
|
||||
* These details are only included for steps that have their `saveResponse` property set to `true`.
|
||||
*/
|
||||
compensate: Record<string, unknown>
|
||||
@@ -114,9 +114,16 @@ export interface WorkflowExecutionDefinition {
|
||||
noCompensation?: boolean
|
||||
/**
|
||||
* Indicates whether the workflow should continue even if there is a permanent failure in this step.
|
||||
* In case it is set to true, the children steps of this step will not be executed and their status will be marked as `TransactionStepState`.SKIPPED_FAILURE.
|
||||
* In case it is set to true, the the current step will be marked as TransactionStepState.PERMANENT_FAILURE and the next steps will be executed.
|
||||
*/
|
||||
continueOnPermanentFailure?: boolean
|
||||
/**
|
||||
* Indicates whether the workflow should skip all subsequent steps in case of a permanent failure in this step.
|
||||
* In case it is set to true, the next steps of the workflow will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE.
|
||||
* In case it is a string, the next steps until the step name provided will be skipped and the workflow will be resumed from the step provided.
|
||||
*/
|
||||
skipOnPermanentFailure?: boolean | string
|
||||
|
||||
/**
|
||||
* The maximum number of times this step should be retried in case of temporary failures.
|
||||
* The default is 0 (no retries).
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
promiseAll,
|
||||
} from "@medusajs/utils"
|
||||
import { asValue } from "awilix"
|
||||
import { setTimeout } from "timers/promises"
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
@@ -24,7 +25,6 @@ import {
|
||||
} from ".."
|
||||
import { MedusaWorkflow } from "../../../medusa-workflow"
|
||||
import { createHook } from "../create-hook"
|
||||
import { setTimeout } from "timers/promises"
|
||||
|
||||
jest.setTimeout(30000)
|
||||
|
||||
@@ -1297,6 +1297,69 @@ describe("Workflow composer", function () {
|
||||
})
|
||||
})
|
||||
|
||||
it("should skip all steps in case of permanent failure", async () => {
|
||||
const logStepFn = jest.fn(async ({ input }: { input: object }) => {
|
||||
return new StepResponse("done")
|
||||
})
|
||||
|
||||
const errorStep = createStep("perma-fail-step", async () => {
|
||||
return StepResponse.permanentFailure("FAIL")
|
||||
})
|
||||
|
||||
const logStep = createStep("log-step", logStepFn)
|
||||
|
||||
const fakeStepWorkflow = createWorkflow("fake-workflow", () => {
|
||||
const result = errorStep().config({
|
||||
skipOnPermanentFailure: true,
|
||||
})
|
||||
logStep({ input: { A: "123" } })
|
||||
logStep({ input: { A: "123 a" } }).config({ name: "other" })
|
||||
logStep({ input: { A: "123 b" } }).config({ name: "other_2" })
|
||||
logStep({ input: { A: "123 c" } }).config({ name: "other_3" })
|
||||
return new WorkflowResponse(result)
|
||||
})
|
||||
|
||||
const { transaction } = await fakeStepWorkflow().run({
|
||||
input: 1,
|
||||
})
|
||||
|
||||
expect(transaction.getState()).toEqual("done")
|
||||
expect(logStepFn).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
|
||||
it("should skip steps until the named step in case of permanent failure", async () => {
|
||||
const logStepFn = jest.fn(async ({ input }: { input: object }) => {
|
||||
return new StepResponse("done and returned")
|
||||
})
|
||||
|
||||
const errorStep = createStep("perma-fail-step", async () => {
|
||||
return StepResponse.permanentFailure("FAIL")
|
||||
})
|
||||
|
||||
const logStep = createStep("log-step", logStepFn)
|
||||
|
||||
const fakeStepWorkflow = createWorkflow("fake-workflow", () => {
|
||||
errorStep().config({
|
||||
skipOnPermanentFailure: "other_2",
|
||||
})
|
||||
logStep({ input: { A: "123" } })
|
||||
logStep({ input: { A: "123 a" } }).config({ name: "other" })
|
||||
logStep({ input: { A: "123 b" } }).config({ name: "other_2" })
|
||||
const ret = logStep({ input: { A: "123 c" } }).config({
|
||||
name: "other_3",
|
||||
})
|
||||
return new WorkflowResponse(ret)
|
||||
})
|
||||
|
||||
const { result, transaction } = await fakeStepWorkflow().run({
|
||||
input: 1,
|
||||
})
|
||||
|
||||
expect(transaction.getState()).toEqual("done")
|
||||
expect(result).toEqual("done and returned")
|
||||
expect(logStepFn).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it("should compose a new workflow and skip steps depending on the input", async () => {
|
||||
const mockStep1Fn = jest.fn().mockImplementation((input) => {
|
||||
if (input === 1) {
|
||||
@@ -2701,4 +2764,98 @@ describe("Workflow composer", function () {
|
||||
|
||||
expect(workflowResult).toEqual("return from 1")
|
||||
})
|
||||
|
||||
it("should compose a new workflow that skips steps on permanent failure [1]", async () => {
|
||||
const mockStep1Fn = jest.fn().mockImplementation(async () => {
|
||||
throw new Error("failed")
|
||||
}) as any
|
||||
const mockStep2Fn = jest.fn().mockImplementation(() => {
|
||||
return new StepResponse(true)
|
||||
}) as any
|
||||
const mockStep3Fn = jest.fn().mockImplementation(() => {
|
||||
return new StepResponse(true)
|
||||
}) as any
|
||||
const mockStep4Fn = jest.fn().mockImplementation(() => {
|
||||
return new StepResponse(true)
|
||||
}) as any
|
||||
|
||||
const step1 = createStep(
|
||||
{
|
||||
name: "step1",
|
||||
skipOnPermanentFailure: "step3",
|
||||
},
|
||||
mockStep1Fn
|
||||
)
|
||||
const step2 = createStep("step2", mockStep2Fn)
|
||||
const step3 = createStep("step3", mockStep3Fn)
|
||||
const step4 = createStep("step4", mockStep4Fn)
|
||||
|
||||
const workflow = createWorkflow("workflow1", function (input) {
|
||||
const ret1 = step1()
|
||||
const [ret2, ret3] = parallelize(step2(), step3())
|
||||
const ret4 = step4()
|
||||
return new WorkflowResponse({ ret1, ret2, ret3, ret4 })
|
||||
})
|
||||
|
||||
const { result: workflowResult } = await workflow().run()
|
||||
|
||||
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
|
||||
expect(mockStep2Fn).toHaveBeenCalledTimes(0)
|
||||
expect(mockStep3Fn).toHaveBeenCalledTimes(1)
|
||||
expect(mockStep4Fn).toHaveBeenCalledTimes(1)
|
||||
|
||||
expect(workflowResult).toEqual({
|
||||
ret1: undefined,
|
||||
ret2: undefined,
|
||||
ret3: true,
|
||||
ret4: true,
|
||||
})
|
||||
})
|
||||
|
||||
it("should compose a new workflow that skips steps on permanent failure [2]", async () => {
|
||||
const mockStep1Fn = jest.fn().mockImplementation(async () => {
|
||||
throw new Error("failed")
|
||||
}) as any
|
||||
const mockStep2Fn = jest.fn().mockImplementation(() => {
|
||||
return new StepResponse(true)
|
||||
}) as any
|
||||
const mockStep3Fn = jest.fn().mockImplementation(() => {
|
||||
return new StepResponse(true)
|
||||
}) as any
|
||||
const mockStep4Fn = jest.fn().mockImplementation(() => {
|
||||
return new StepResponse(true)
|
||||
}) as any
|
||||
|
||||
const step1 = createStep(
|
||||
{
|
||||
name: "step1",
|
||||
skipOnPermanentFailure: "step4",
|
||||
},
|
||||
mockStep1Fn
|
||||
)
|
||||
const step2 = createStep("step2", mockStep2Fn)
|
||||
const step3 = createStep("step3", mockStep3Fn)
|
||||
const step4 = createStep("step4", mockStep4Fn)
|
||||
|
||||
const workflow = createWorkflow("workflow1", function (input) {
|
||||
const ret1 = step1()
|
||||
const [ret2, ret3] = parallelize(step2(), step3())
|
||||
const ret4 = step4()
|
||||
return new WorkflowResponse({ ret1, ret2, ret3, ret4 })
|
||||
})
|
||||
|
||||
const { result: workflowResult } = await workflow().run()
|
||||
|
||||
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
|
||||
expect(mockStep2Fn).toHaveBeenCalledTimes(0)
|
||||
expect(mockStep3Fn).toHaveBeenCalledTimes(0)
|
||||
expect(mockStep4Fn).toHaveBeenCalledTimes(1)
|
||||
|
||||
expect(workflowResult).toEqual({
|
||||
ret1: undefined,
|
||||
ret2: undefined,
|
||||
ret3: undefined,
|
||||
ret4: true,
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -74,7 +74,11 @@
|
||||
* continueOnPermanentFailure:
|
||||
* type: boolean
|
||||
* title: continueOnPermanentFailure
|
||||
* description: Whether the step continues executing even if its status is changed to failed.
|
||||
* description: Whether the workflow should continue executing even if its status is changed to failed.
|
||||
* skipOnPermanentFailure:
|
||||
* type: boolean
|
||||
* title: skipOnPermanentFailure
|
||||
* description: Whether the workflow should skip subsequent steps in case of a permanent failure.
|
||||
* maxRetries:
|
||||
* type: number
|
||||
* title: maxRetries
|
||||
@@ -139,6 +143,5 @@
|
||||
* type: number
|
||||
* title: startedAt
|
||||
* description: The timestamp the step started executing.
|
||||
*
|
||||
*/
|
||||
|
||||
*
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user