feat(orchestration,workflows-sdk): Skip step (#8334)

This commit is contained in:
Carlos R. L. Rodrigues
2024-07-29 14:26:47 -03:00
committed by GitHub
parent e98012a858
commit 24c105f288
17 changed files with 272 additions and 33 deletions

View File

@@ -2063,6 +2063,8 @@
"runningState": "Running...",
"awaitingState": "Awaiting",
"failedState": "Failed",
"skippedState": "Skipped",
"skippedFailureState": "Skipped (Failure)",
"definitionLabel": "Definition",
"outputLabel": "Output",
"compensateInputLabel": "Compensate input",
@@ -2085,6 +2087,7 @@
"step": {
"state": {
"skipped": "Skipped",
"skippedFailure": "Skipped (Failure)",
"dormant": "Dormant",
"timeout": "Timeout"
}

View File

@@ -4,13 +4,18 @@ export const STEP_IN_PROGRESS_STATES = [
TransactionStepState.COMPENSATING,
TransactionStepState.INVOKING,
]
export const STEP_SKIPPED_STATES = [
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
]
export const STEP_OK_STATES = [TransactionStepState.DONE]
export const STEP_ERROR_STATES = [
TransactionStepState.FAILED,
TransactionStepState.REVERTED,
TransactionStepState.TIMEOUT,
TransactionStepState.DORMANT,
TransactionStepState.SKIPPED,
]
export const STEP_INACTIVE_STATES = [TransactionStepState.NOT_STARTED]

View File

@@ -91,5 +91,6 @@ export enum TransactionStepState {
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
SKIPPED_FAILURE = "skipped_failure",
TIMEOUT = "timeout",
}

View File

@@ -93,6 +93,8 @@ export const getStepState = (
return t("workflowExecutions.state.notStarted")
case TransactionStepState.SKIPPED:
return t("workflowExecutions.step.state.skipped")
case TransactionStepState.SKIPPED_FAILURE:
return t("workflowExecutions.step.state.skippedFailure")
case TransactionStepState.DORMANT:
return t("workflowExecutions.step.state.dormant")
case TransactionStepState.TIMEOUT:

View File

@@ -17,6 +17,7 @@ import {
STEP_INACTIVE_STATES,
STEP_IN_PROGRESS_STATES,
STEP_OK_STATES,
STEP_SKIPPED_STATES,
} from "../../../constants"
import {
StepError,
@@ -132,6 +133,9 @@ const Event = ({
<div className="bg-ui-bg-base shadow-borders-base flex size-2.5 items-center justify-center rounded-full">
<div
className={clx("size-1.5 rounded-full", {
"bg-ui-tag-neutral-bg": STEP_SKIPPED_STATES.includes(
step.invoke.state
),
"bg-ui-tag-green-icon": STEP_OK_STATES.includes(
step.invoke.state
),
@@ -204,7 +208,8 @@ const Event = ({
snippets={[
{
code: JSON.stringify(
stepInvokeContext.output.output,
// TODO: Apply resolve value: packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts
stepInvokeContext?.output?.output ?? {},
null,
2
),
@@ -227,8 +232,9 @@ const Event = ({
<CodeBlock
snippets={[
{
// TODO: Apply resolve value: packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts
code: JSON.stringify(
stepInvokeContext.output.compensateInput,
stepInvokeContext?.output?.compensateInput ?? {},
null,
2
),
@@ -290,6 +296,8 @@ const StepState = ({
const isFailed = state === TransactionStepState.FAILED
const isRunning = state === TransactionStepState.INVOKING
const isSkipped = state === TransactionStepState.SKIPPED
const isSkippedFailure = state === TransactionStepState.SKIPPED_FAILURE
if (isUnreachable) {
return null
@@ -306,10 +314,20 @@ const StepState = ({
)
}
if (isFailed) {
let stateText: string | undefined
if (isSkipped) {
stateText = t("workflowExecutions.history.skippedState")
} else if (isSkippedFailure) {
stateText = t("workflowExecutions.history.skippedFailureState")
} else if (isFailed) {
stateText = t("workflowExecutions.history.failedState")
}
if (stateText !== null) {
return (
<Text size="small" leading="compact" className="text-ui-fg-subtle">
{t("workflowExecutions.history.failedState")}
{stateText}
</Text>
)
}

View File

@@ -15,6 +15,7 @@ import {
STEP_INACTIVE_STATES,
STEP_IN_PROGRESS_STATES,
STEP_OK_STATES,
STEP_SKIPPED_STATES,
} from "../../../constants"
import { WorkflowExecutionDTO, WorkflowExecutionStep } from "../../../types"
@@ -405,6 +406,9 @@ const Node = ({ step }: { step: WorkflowExecutionStep }) => {
className={clx(
"size-2 rounded-sm shadow-[inset_0_0_0_1px_rgba(0,0,0,0.12)]",
{
"bg-ui-tag-neutral-bg": STEP_SKIPPED_STATES.includes(
step.invoke.state
),
"bg-ui-tag-green-icon": STEP_OK_STATES.includes(
step.invoke.state
),

View File

@@ -1207,7 +1207,7 @@ describe("Transaction Orchestrator", () => {
expect(
transaction.getFlow().steps["_root.action1.action2.action4"].invoke
.state
).toBe(TransactionStepState.SKIPPED)
).toBe(TransactionStepState.SKIPPED_FAILURE)
expect(
transaction.getFlow().steps["_root.action1.action2.action4"].invoke
.status

View File

@@ -14,6 +14,19 @@ export class PermanentStepFailureError extends Error {
}
}
export class SkipStepResponse extends Error {
static isSkipStepResponse(error: Error): error is SkipStepResponse {
return (
error instanceof SkipStepResponse || error?.name === "SkipStepResponse"
)
}
constructor(message?: string) {
super(message)
this.name = "SkipStepResponse"
}
}
export class TransactionStepTimeoutError extends Error {
static isTransactionStepTimeoutError(
error: Error

View File

@@ -21,6 +21,7 @@ import {
isErrorLike,
PermanentStepFailureError,
serializeError,
SkipStepResponse,
TransactionStepTimeoutError,
TransactionTimeoutError,
} from "./errors"
@@ -36,6 +37,7 @@ export type TransactionFlow = {
}
hasAsyncSteps: boolean
hasFailedSteps: boolean
hasSkippedOnFailureSteps: boolean
hasWaitingSteps: boolean
hasSkippedSteps: boolean
hasRevertedSteps: boolean
@@ -125,6 +127,7 @@ export class TransactionOrchestrator extends EventEmitter {
TransactionStepState.FAILED,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
]
const siblings = this.getPreviousStep(flow, previousStep).next.map(
@@ -143,6 +146,7 @@ export class TransactionOrchestrator extends EventEmitter {
TransactionStepState.REVERTED,
TransactionStepState.FAILED,
TransactionStepState.DORMANT,
TransactionStepState.SKIPPED,
]
const siblings = step.next.map((sib) => flow.steps[sib])
return (
@@ -253,6 +257,7 @@ export class TransactionOrchestrator extends EventEmitter {
completed: number
}> {
let hasSkipped = false
let hasSkippedOnFailure = false
let hasIgnoredFailure = false
let hasFailed = false
let hasWaiting = false
@@ -328,7 +333,9 @@ export class TransactionOrchestrator extends EventEmitter {
} else {
completedSteps++
if (curState.state === TransactionStepState.SKIPPED) {
if (curState.state === TransactionStepState.SKIPPED_FAILURE) {
hasSkippedOnFailure = true
} else if (curState.state === TransactionStepState.SKIPPED) {
hasSkipped = true
} else if (curState.state === TransactionStepState.REVERTED) {
hasReverted = true
@@ -358,6 +365,9 @@ export class TransactionOrchestrator extends EventEmitter {
return await this.checkAllSteps(transaction)
} else if (completedSteps === totalSteps) {
if (hasSkippedOnFailure) {
flow.hasSkippedOnFailureSteps = true
}
if (hasSkipped) {
flow.hasSkippedSteps = true
}
@@ -453,6 +463,39 @@ export class TransactionOrchestrator extends EventEmitter {
transaction.emit(eventName, { step, transaction })
}
private static async skipStep(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void> {
const hasStepTimedOut =
step.getStates().state === TransactionStepState.TIMEOUT
const flow = transaction.getFlow()
const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId)
if (!hasStepTimedOut) {
step.changeStatus(TransactionStepStatus.OK)
step.changeState(TransactionStepState.SKIPPED)
}
if (step.definition.async || options?.storeExecution) {
await transaction.saveCheckpoint()
}
const cleaningUp: Promise<unknown>[] = []
if (step.hasRetryScheduled()) {
cleaningUp.push(transaction.clearRetry(step))
}
if (step.hasTimeout()) {
cleaningUp.push(transaction.clearStepTimeout(step))
}
await promiseAll(cleaningUp)
const eventName = DistributedTransactionEvent.STEP_SKIPPED
transaction.emit(eventName, { step, transaction })
}
private static async setStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep,
@@ -539,7 +582,7 @@ export class TransactionOrchestrator extends EventEmitter {
) {
for (const childStep of step.next) {
const child = flow.steps[childStep]
child.changeState(TransactionStepState.SKIPPED)
child.changeState(TransactionStepState.SKIPPED_FAILURE)
}
} else {
flow.state = TransactionState.WAITING_TO_COMPENSATE
@@ -701,6 +744,12 @@ export class TransactionOrchestrator extends EventEmitter {
)
}
const output = response?.__type ? response.output : response
if (SkipStepResponse.isSkipStepResponse(output)) {
await TransactionOrchestrator.skipStep(transaction, step)
return
}
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
@@ -754,11 +803,19 @@ export class TransactionOrchestrator extends EventEmitter {
)
}
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
let setResponse = true
if (SkipStepResponse.isSkipStepResponse(response)) {
await TransactionOrchestrator.skipStep(transaction, step)
setResponse = false
}
if (setResponse) {
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
}
await transaction.scheduleRetry(
step,
@@ -912,6 +969,7 @@ export class TransactionOrchestrator extends EventEmitter {
metadata: flowMetadata,
hasAsyncSteps: features.hasAsyncSteps,
hasFailedSteps: false,
hasSkippedOnFailureSteps: false,
hasSkippedSteps: false,
hasWaitingSteps: false,
hasRevertedSteps: false,
@@ -1176,6 +1234,41 @@ export class TransactionOrchestrator extends EventEmitter {
return [transaction, step]
}
/** Skip the execution of a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param handler - The handler function to execute the step
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
*/
public async skipStep(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
this.emit(DistributedTransactionEvent.RESUME, {
transaction: curTransaction,
})
await TransactionOrchestrator.skipStep(curTransaction, step)
await this.executeNext(curTransaction)
} else {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Cannot skip a step when status is ${step.getStates().status}`
)
}
return curTransaction
}
/** Register a step success for a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param handler - The handler function to execute the step

View File

@@ -93,11 +93,13 @@ export class TransactionStep {
TransactionStepState.COMPENSATING,
TransactionStepState.FAILED,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
],
[TransactionStepState.INVOKING]: [
TransactionStepState.FAILED,
TransactionStepState.DONE,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
],
[TransactionStepState.COMPENSATING]: [
TransactionStepState.REVERTED,

View File

@@ -24,7 +24,7 @@ export type TransactionStepsDefinition = {
/**
* Indicates whether the workflow should continue even if there is a permanent failure in this step.
* In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED.
* In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE.
*/
continueOnPermanentFailure?: boolean
@@ -164,6 +164,7 @@ export enum DistributedTransactionEvent {
TIMEOUT = "timeout",
STEP_BEGIN = "stepBegin",
STEP_SUCCESS = "stepSuccess",
STEP_SKIPPED = "stepSkipped",
STEP_FAILURE = "stepFailure",
STEP_AWAITING = "stepAwaiting",
COMPENSATE_STEP_SUCCESS = "compensateStepSuccess",
@@ -211,6 +212,11 @@ export type DistributedTransactionEvents = {
step: TransactionStep
transaction: DistributedTransaction
}) => void
onStepSkipped?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
}
export type StepFeatures = {

View File

@@ -1,12 +1,12 @@
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import {
createMedusaContainer,
isDefined,
isString,
MedusaContext,
MedusaContextType,
MedusaError,
MedusaModuleType,
createMedusaContainer,
isDefined,
isString,
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
@@ -281,6 +281,13 @@ export class LocalWorkflow {
eventWrapperMap.get("onCompensateStepFailure")
)
}
if (subscribe?.onStepSkipped) {
transaction.on(
DistributedTransactionEvent.STEP_SKIPPED,
eventWrapperMap.get("onStepSkipped")
)
}
}
if (transaction) {

View File

@@ -3,14 +3,6 @@ export enum TransactionHandlerType {
COMPENSATE = "compensate",
}
export enum TransactionStepStatus {
IDLE = "idle",
OK = "ok",
WAITING = "waiting_response",
TEMPORARY_FAILURE = "temp_failure",
PERMANENT_FAILURE = "permanent_failure",
}
export enum TransactionState {
NOT_STARTED = "not_started",
INVOKING = "invoking",
@@ -30,5 +22,14 @@ export enum TransactionStepState {
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
SKIPPED_FAILURE = "skipped_failure",
TIMEOUT = "timeout",
}
export enum TransactionStepStatus {
IDLE = "idle",
OK = "ok",
WAITING = "waiting_response",
TEMPORARY_FAILURE = "temp_failure",
PERMANENT_FAILURE = "permanent_failure",
}

View File

@@ -1,20 +1,20 @@
import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration"
import {
ModuleRegistrationName,
composeMessage,
createMedusaContainer,
ModuleRegistrationName,
promiseAll,
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
StepResponse,
createStep,
createWorkflow,
hook,
parallelize,
StepResponse,
transform,
} from ".."
import { MedusaWorkflow } from "../../../medusa-workflow"
import { asValue } from "awilix"
import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist"
jest.setTimeout(30000)
@@ -1113,6 +1113,82 @@ describe("Workflow composer", function () {
})
})
it("should compose a new workflow and skip steps depending on the input", async () => {
const mockStep1Fn = jest.fn().mockImplementation((input) => {
if (input === 1) {
return StepResponse.skip()
} else {
return new StepResponse({ obj: "return from 1" })
}
})
const mockStep2Fn = jest.fn().mockImplementation((input) => {
if (!input) {
return StepResponse.skip()
}
})
const mockStep3Fn = jest.fn().mockImplementation((inputs) => {
return new StepResponse({
inputs,
obj: "return from 3",
})
})
const step1 = createStep("step1", mockStep1Fn)
const step2 = createStep("step2", mockStep2Fn)
const step3 = createStep("step3", mockStep3Fn)
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2, input })
})
const { result: workflowResult, transaction } = await workflow().run({
input: 1,
})
expect(transaction.getFlow().hasSkippedSteps).toBe(true)
expect(mockStep3Fn.mock.calls[0][0]).toEqual({
input: 1,
})
expect(workflowResult).toEqual({
inputs: {
input: 1,
},
obj: "return from 3",
})
const { result: workflowResultTwo, transaction: transactionTwo } =
await workflow().run({
input: "none",
})
expect(transactionTwo.getFlow().hasSkippedSteps).toBe(false)
expect(mockStep3Fn.mock.calls[1][0]).toEqual({
one: {
obj: "return from 1",
},
two: {
__type: "Symbol(WorkflowWorkflowData)",
},
input: "none",
})
expect(workflowResultTwo).toEqual({
inputs: {
one: {
obj: "return from 1",
},
two: {
__type: "Symbol(WorkflowWorkflowData)",
},
input: "none",
},
obj: "return from 3",
})
})
it("should compose two new workflows sequentially and execute them sequentially", async () => {
const mockStep1Fn = jest.fn().mockImplementation((input, context) => {
return new StepResponse({ inputs: [input], obj: "return from 1" })

View File

@@ -4,9 +4,9 @@ import {
WorkflowStepHandler,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { deepCopy, isString, OrchestrationUtils } from "@medusajs/utils"
import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils"
import { ulid } from "ulid"
import { resolveValue, StepResponse } from "./helpers"
import { StepResponse, resolveValue } from "./helpers"
import { proxify } from "./helpers/proxy"
import {
CreateWorkflowComposerContext,
@@ -353,7 +353,7 @@ function wrapConditionalStep(
}
if (!canContinue) {
return
return StepResponse.skip()
}
return await originalInvoke(stepArguments)

View File

@@ -1,5 +1,8 @@
import { PermanentStepFailureError } from "@medusajs/orchestration"
import { isDefined, OrchestrationUtils } from "@medusajs/utils"
import {
PermanentStepFailureError,
SkipStepResponse,
} from "@medusajs/orchestration"
import { OrchestrationUtils, isDefined } from "@medusajs/utils"
/**
* This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`.
@@ -116,6 +119,10 @@ export class StepResponse<TOutput, TCompensateInput = TOutput> {
throw new PermanentStepFailureError(message)
}
static skip(): SkipStepResponse {
return new SkipStepResponse()
}
/**
* @internal
*/

View File

@@ -50,6 +50,7 @@ export function when(input, condition) {
}
delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]
return ret
},
}