diff --git a/packages/admin-next/dashboard/src/i18n/translations/en.json b/packages/admin-next/dashboard/src/i18n/translations/en.json index fcf93644d0..2fa8c59d47 100644 --- a/packages/admin-next/dashboard/src/i18n/translations/en.json +++ b/packages/admin-next/dashboard/src/i18n/translations/en.json @@ -2063,6 +2063,8 @@ "runningState": "Running...", "awaitingState": "Awaiting", "failedState": "Failed", + "skippedState": "Skipped", + "skippedFailureState": "Skipped (Failure)", "definitionLabel": "Definition", "outputLabel": "Output", "compensateInputLabel": "Compensate input", @@ -2085,6 +2087,7 @@ "step": { "state": { "skipped": "Skipped", + "skippedFailure": "Skipped (Failure)", "dormant": "Dormant", "timeout": "Timeout" } diff --git a/packages/admin-next/dashboard/src/routes/workflow-executions/constants.ts b/packages/admin-next/dashboard/src/routes/workflow-executions/constants.ts index c02f3cf0ef..f84c286d6a 100644 --- a/packages/admin-next/dashboard/src/routes/workflow-executions/constants.ts +++ b/packages/admin-next/dashboard/src/routes/workflow-executions/constants.ts @@ -4,13 +4,18 @@ export const STEP_IN_PROGRESS_STATES = [ TransactionStepState.COMPENSATING, TransactionStepState.INVOKING, ] + +export const STEP_SKIPPED_STATES = [ + TransactionStepState.SKIPPED, + TransactionStepState.SKIPPED_FAILURE, +] export const STEP_OK_STATES = [TransactionStepState.DONE] + export const STEP_ERROR_STATES = [ TransactionStepState.FAILED, TransactionStepState.REVERTED, TransactionStepState.TIMEOUT, TransactionStepState.DORMANT, - TransactionStepState.SKIPPED, ] export const STEP_INACTIVE_STATES = [TransactionStepState.NOT_STARTED] diff --git a/packages/admin-next/dashboard/src/routes/workflow-executions/types.ts b/packages/admin-next/dashboard/src/routes/workflow-executions/types.ts index eacd3d964f..200bb29b73 100644 --- a/packages/admin-next/dashboard/src/routes/workflow-executions/types.ts +++ b/packages/admin-next/dashboard/src/routes/workflow-executions/types.ts @@ -91,5 +91,6 @@ export enum TransactionStepState { FAILED = "failed", DORMANT = "dormant", SKIPPED = "skipped", + SKIPPED_FAILURE = "skipped_failure", TIMEOUT = "timeout", } diff --git a/packages/admin-next/dashboard/src/routes/workflow-executions/utils.ts b/packages/admin-next/dashboard/src/routes/workflow-executions/utils.ts index 58addd0b74..d168007d54 100644 --- a/packages/admin-next/dashboard/src/routes/workflow-executions/utils.ts +++ b/packages/admin-next/dashboard/src/routes/workflow-executions/utils.ts @@ -93,6 +93,8 @@ export const getStepState = ( return t("workflowExecutions.state.notStarted") case TransactionStepState.SKIPPED: return t("workflowExecutions.step.state.skipped") + case TransactionStepState.SKIPPED_FAILURE: + return t("workflowExecutions.step.state.skippedFailure") case TransactionStepState.DORMANT: return t("workflowExecutions.step.state.dormant") case TransactionStepState.TIMEOUT: diff --git a/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-history-section/workflow-execution-history-section.tsx b/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-history-section/workflow-execution-history-section.tsx index b49390fccd..acfaba9424 100644 --- a/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-history-section/workflow-execution-history-section.tsx +++ b/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-history-section/workflow-execution-history-section.tsx @@ -17,6 +17,7 @@ import { STEP_INACTIVE_STATES, STEP_IN_PROGRESS_STATES, STEP_OK_STATES, + STEP_SKIPPED_STATES, } from "../../../constants" import { StepError, @@ -132,6 +133,9 @@ const Event = ({
- {t("workflowExecutions.history.failedState")} + {stateText} ) } diff --git a/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-timeline-section/workflow-execution-timeline-section.tsx b/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-timeline-section/workflow-execution-timeline-section.tsx index aacc9dd85f..69c0107ee0 100644 --- a/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-timeline-section/workflow-execution-timeline-section.tsx +++ b/packages/admin-next/dashboard/src/routes/workflow-executions/workflow-execution-detail/components/workflow-execution-timeline-section/workflow-execution-timeline-section.tsx @@ -15,6 +15,7 @@ import { STEP_INACTIVE_STATES, STEP_IN_PROGRESS_STATES, STEP_OK_STATES, + STEP_SKIPPED_STATES, } from "../../../constants" import { WorkflowExecutionDTO, WorkflowExecutionStep } from "../../../types" @@ -405,6 +406,9 @@ const Node = ({ step }: { step: WorkflowExecutionStep }) => { className={clx( "size-2 rounded-sm shadow-[inset_0_0_0_1px_rgba(0,0,0,0.12)]", { + "bg-ui-tag-neutral-bg": STEP_SKIPPED_STATES.includes( + step.invoke.state + ), "bg-ui-tag-green-icon": STEP_OK_STATES.includes( step.invoke.state ), diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 5b5048909c..eee8c8b92c 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -1207,7 +1207,7 @@ describe("Transaction Orchestrator", () => { expect( transaction.getFlow().steps["_root.action1.action2.action4"].invoke .state - ).toBe(TransactionStepState.SKIPPED) + ).toBe(TransactionStepState.SKIPPED_FAILURE) expect( transaction.getFlow().steps["_root.action1.action2.action4"].invoke .status diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index 615188d9a8..3473bd45f4 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -14,6 +14,19 @@ export class PermanentStepFailureError extends Error { } } +export class SkipStepResponse extends Error { + static isSkipStepResponse(error: Error): error is SkipStepResponse { + return ( + error instanceof SkipStepResponse || error?.name === "SkipStepResponse" + ) + } + + constructor(message?: string) { + super(message) + this.name = "SkipStepResponse" + } +} + export class TransactionStepTimeoutError extends Error { static isTransactionStepTimeoutError( error: Error diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 71ab04ac3b..ce40e6b25d 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -21,6 +21,7 @@ import { isErrorLike, PermanentStepFailureError, serializeError, + SkipStepResponse, TransactionStepTimeoutError, TransactionTimeoutError, } from "./errors" @@ -36,6 +37,7 @@ export type TransactionFlow = { } hasAsyncSteps: boolean hasFailedSteps: boolean + hasSkippedOnFailureSteps: boolean hasWaitingSteps: boolean hasSkippedSteps: boolean hasRevertedSteps: boolean @@ -125,6 +127,7 @@ export class TransactionOrchestrator extends EventEmitter { TransactionStepState.FAILED, TransactionStepState.TIMEOUT, TransactionStepState.SKIPPED, + TransactionStepState.SKIPPED_FAILURE, ] const siblings = this.getPreviousStep(flow, previousStep).next.map( @@ -143,6 +146,7 @@ export class TransactionOrchestrator extends EventEmitter { TransactionStepState.REVERTED, TransactionStepState.FAILED, TransactionStepState.DORMANT, + TransactionStepState.SKIPPED, ] const siblings = step.next.map((sib) => flow.steps[sib]) return ( @@ -253,6 +257,7 @@ export class TransactionOrchestrator extends EventEmitter { completed: number }> { let hasSkipped = false + let hasSkippedOnFailure = false let hasIgnoredFailure = false let hasFailed = false let hasWaiting = false @@ -328,7 +333,9 @@ export class TransactionOrchestrator extends EventEmitter { } else { completedSteps++ - if (curState.state === TransactionStepState.SKIPPED) { + if (curState.state === TransactionStepState.SKIPPED_FAILURE) { + hasSkippedOnFailure = true + } else if (curState.state === TransactionStepState.SKIPPED) { hasSkipped = true } else if (curState.state === TransactionStepState.REVERTED) { hasReverted = true @@ -358,6 +365,9 @@ export class TransactionOrchestrator extends EventEmitter { return await this.checkAllSteps(transaction) } else if (completedSteps === totalSteps) { + if (hasSkippedOnFailure) { + flow.hasSkippedOnFailureSteps = true + } if (hasSkipped) { flow.hasSkippedSteps = true } @@ -453,6 +463,39 @@ export class TransactionOrchestrator extends EventEmitter { transaction.emit(eventName, { step, transaction }) } + private static async skipStep( + transaction: DistributedTransaction, + step: TransactionStep + ): Promise { + const hasStepTimedOut = + step.getStates().state === TransactionStepState.TIMEOUT + + const flow = transaction.getFlow() + const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) + + if (!hasStepTimedOut) { + step.changeStatus(TransactionStepStatus.OK) + step.changeState(TransactionStepState.SKIPPED) + } + + if (step.definition.async || options?.storeExecution) { + await transaction.saveCheckpoint() + } + + const cleaningUp: Promise[] = [] + if (step.hasRetryScheduled()) { + cleaningUp.push(transaction.clearRetry(step)) + } + if (step.hasTimeout()) { + cleaningUp.push(transaction.clearStepTimeout(step)) + } + + await promiseAll(cleaningUp) + + const eventName = DistributedTransactionEvent.STEP_SKIPPED + transaction.emit(eventName, { step, transaction }) + } + private static async setStepTimeout( transaction: DistributedTransaction, step: TransactionStep, @@ -539,7 +582,7 @@ export class TransactionOrchestrator extends EventEmitter { ) { for (const childStep of step.next) { const child = flow.steps[childStep] - child.changeState(TransactionStepState.SKIPPED) + child.changeState(TransactionStepState.SKIPPED_FAILURE) } } else { flow.state = TransactionState.WAITING_TO_COMPENSATE @@ -701,6 +744,12 @@ export class TransactionOrchestrator extends EventEmitter { ) } + const output = response?.__type ? response.output : response + if (SkipStepResponse.isSkipStepResponse(output)) { + await TransactionOrchestrator.skipStep(transaction, step) + return + } + await TransactionOrchestrator.setStepSuccess( transaction, step, @@ -754,11 +803,19 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await TransactionOrchestrator.setStepSuccess( - transaction, - step, - response - ) + let setResponse = true + if (SkipStepResponse.isSkipStepResponse(response)) { + await TransactionOrchestrator.skipStep(transaction, step) + setResponse = false + } + + if (setResponse) { + await TransactionOrchestrator.setStepSuccess( + transaction, + step, + response + ) + } await transaction.scheduleRetry( step, @@ -912,6 +969,7 @@ export class TransactionOrchestrator extends EventEmitter { metadata: flowMetadata, hasAsyncSteps: features.hasAsyncSteps, hasFailedSteps: false, + hasSkippedOnFailureSteps: false, hasSkippedSteps: false, hasWaitingSteps: false, hasRevertedSteps: false, @@ -1176,6 +1234,41 @@ export class TransactionOrchestrator extends EventEmitter { return [transaction, step] } + /** Skip the execution of a specific transaction and step + * @param responseIdempotencyKey - The idempotency key for the step + * @param handler - The handler function to execute the step + * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey + */ + public async skipStep( + responseIdempotencyKey: string, + handler?: TransactionStepHandler, + transaction?: DistributedTransaction + ): Promise { + const [curTransaction, step] = + await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( + responseIdempotencyKey, + handler, + transaction + ) + + if (step.getStates().status === TransactionStepStatus.WAITING) { + this.emit(DistributedTransactionEvent.RESUME, { + transaction: curTransaction, + }) + + await TransactionOrchestrator.skipStep(curTransaction, step) + + await this.executeNext(curTransaction) + } else { + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, + `Cannot skip a step when status is ${step.getStates().status}` + ) + } + + return curTransaction + } + /** Register a step success for a specific transaction and step * @param responseIdempotencyKey - The idempotency key for the step * @param handler - The handler function to execute the step diff --git a/packages/core/orchestration/src/transaction/transaction-step.ts b/packages/core/orchestration/src/transaction/transaction-step.ts index c5c8016282..abc0437000 100644 --- a/packages/core/orchestration/src/transaction/transaction-step.ts +++ b/packages/core/orchestration/src/transaction/transaction-step.ts @@ -93,11 +93,13 @@ export class TransactionStep { TransactionStepState.COMPENSATING, TransactionStepState.FAILED, TransactionStepState.SKIPPED, + TransactionStepState.SKIPPED_FAILURE, ], [TransactionStepState.INVOKING]: [ TransactionStepState.FAILED, TransactionStepState.DONE, TransactionStepState.TIMEOUT, + TransactionStepState.SKIPPED, ], [TransactionStepState.COMPENSATING]: [ TransactionStepState.REVERTED, diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index 7fae4466e1..fee68169d4 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -24,7 +24,7 @@ export type TransactionStepsDefinition = { /** * Indicates whether the workflow should continue even if there is a permanent failure in this step. - * In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED. + * In case it is set to true, the children steps of this step will not be executed and their status will be marked as TransactionStepState.SKIPPED_FAILURE. */ continueOnPermanentFailure?: boolean @@ -164,6 +164,7 @@ export enum DistributedTransactionEvent { TIMEOUT = "timeout", STEP_BEGIN = "stepBegin", STEP_SUCCESS = "stepSuccess", + STEP_SKIPPED = "stepSkipped", STEP_FAILURE = "stepFailure", STEP_AWAITING = "stepAwaiting", COMPENSATE_STEP_SUCCESS = "compensateStepSuccess", @@ -211,6 +212,11 @@ export type DistributedTransactionEvents = { step: TransactionStep transaction: DistributedTransaction }) => void + + onStepSkipped?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void } export type StepFeatures = { diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index c7ff5641c9..e72157d73a 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -1,12 +1,12 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { - createMedusaContainer, - isDefined, - isString, MedusaContext, MedusaContextType, MedusaError, MedusaModuleType, + createMedusaContainer, + isDefined, + isString, } from "@medusajs/utils" import { asValue } from "awilix" import { @@ -281,6 +281,13 @@ export class LocalWorkflow { eventWrapperMap.get("onCompensateStepFailure") ) } + + if (subscribe?.onStepSkipped) { + transaction.on( + DistributedTransactionEvent.STEP_SKIPPED, + eventWrapperMap.get("onStepSkipped") + ) + } } if (transaction) { diff --git a/packages/core/utils/src/orchestration/types.ts b/packages/core/utils/src/orchestration/types.ts index 26cba445d6..f3be5930e3 100644 --- a/packages/core/utils/src/orchestration/types.ts +++ b/packages/core/utils/src/orchestration/types.ts @@ -3,14 +3,6 @@ export enum TransactionHandlerType { COMPENSATE = "compensate", } -export enum TransactionStepStatus { - IDLE = "idle", - OK = "ok", - WAITING = "waiting_response", - TEMPORARY_FAILURE = "temp_failure", - PERMANENT_FAILURE = "permanent_failure", -} - export enum TransactionState { NOT_STARTED = "not_started", INVOKING = "invoking", @@ -30,5 +22,14 @@ export enum TransactionStepState { FAILED = "failed", DORMANT = "dormant", SKIPPED = "skipped", + SKIPPED_FAILURE = "skipped_failure", TIMEOUT = "timeout", } + +export enum TransactionStepStatus { + IDLE = "idle", + OK = "ok", + WAITING = "waiting_response", + TEMPORARY_FAILURE = "temp_failure", + PERMANENT_FAILURE = "permanent_failure", +} diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts index e494de1e58..f876c218d4 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts @@ -1,20 +1,20 @@ import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration" import { + ModuleRegistrationName, composeMessage, createMedusaContainer, - ModuleRegistrationName, promiseAll, } from "@medusajs/utils" +import { asValue } from "awilix" import { + StepResponse, createStep, createWorkflow, hook, parallelize, - StepResponse, transform, } from ".." import { MedusaWorkflow } from "../../../medusa-workflow" -import { asValue } from "awilix" import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist" jest.setTimeout(30000) @@ -1113,6 +1113,82 @@ describe("Workflow composer", function () { }) }) + it("should compose a new workflow and skip steps depending on the input", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input) => { + if (input === 1) { + return StepResponse.skip() + } else { + return new StepResponse({ obj: "return from 1" }) + } + }) + const mockStep2Fn = jest.fn().mockImplementation((input) => { + if (!input) { + return StepResponse.skip() + } + }) + const mockStep3Fn = jest.fn().mockImplementation((inputs) => { + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2, input }) + }) + + const { result: workflowResult, transaction } = await workflow().run({ + input: 1, + }) + + expect(transaction.getFlow().hasSkippedSteps).toBe(true) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + input: 1, + }) + + expect(workflowResult).toEqual({ + inputs: { + input: 1, + }, + obj: "return from 3", + }) + + const { result: workflowResultTwo, transaction: transactionTwo } = + await workflow().run({ + input: "none", + }) + + expect(transactionTwo.getFlow().hasSkippedSteps).toBe(false) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { + obj: "return from 1", + }, + two: { + __type: "Symbol(WorkflowWorkflowData)", + }, + input: "none", + }) + + expect(workflowResultTwo).toEqual({ + inputs: { + one: { + obj: "return from 1", + }, + two: { + __type: "Symbol(WorkflowWorkflowData)", + }, + input: "none", + }, + obj: "return from 3", + }) + }) + it("should compose two new workflows sequentially and execute them sequentially", async () => { const mockStep1Fn = jest.fn().mockImplementation((input, context) => { return new StepResponse({ inputs: [input], obj: "return from 1" }) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 11e593332b..81686f8d5b 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,9 +4,9 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { deepCopy, isString, OrchestrationUtils } from "@medusajs/utils" +import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils" import { ulid } from "ulid" -import { resolveValue, StepResponse } from "./helpers" +import { StepResponse, resolveValue } from "./helpers" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -353,7 +353,7 @@ function wrapConditionalStep( } if (!canContinue) { - return + return StepResponse.skip() } return await originalInvoke(stepArguments) diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts index e0ba54ef28..88f2becc0e 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -1,5 +1,8 @@ -import { PermanentStepFailureError } from "@medusajs/orchestration" -import { isDefined, OrchestrationUtils } from "@medusajs/utils" +import { + PermanentStepFailureError, + SkipStepResponse, +} from "@medusajs/orchestration" +import { OrchestrationUtils, isDefined } from "@medusajs/utils" /** * This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`. @@ -116,6 +119,10 @@ export class StepResponse { throw new PermanentStepFailureError(message) } + static skip(): SkipStepResponse { + return new SkipStepResponse() + } + /** * @internal */ diff --git a/packages/core/workflows-sdk/src/utils/composer/when.ts b/packages/core/workflows-sdk/src/utils/composer/when.ts index 771f73f99d..ebbfe0dbe7 100644 --- a/packages/core/workflows-sdk/src/utils/composer/when.ts +++ b/packages/core/workflows-sdk/src/utils/composer/when.ts @@ -50,6 +50,7 @@ export function when(input, condition) { } delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition] + return ret }, }