feat(orchestration,workflows-sdk,core-flows): workflow cancel (#6778)

This commit is contained in:
Carlos R. L. Rodrigues
2024-03-22 11:03:06 -03:00
committed by GitHub
parent 3e85f4dd93
commit 7e93eda1a4
9 changed files with 271 additions and 22 deletions

View File

@@ -2086,4 +2086,40 @@ describe("Workflow composer", function () {
},
])
})
it("should cancel the workflow after completed", async () => {
const mockStep1Fn = jest.fn().mockImplementation(function (input) {
return new StepResponse({ obj: "return from 1" }, { data: "data" })
})
const mockCompensateSte1 = jest.fn().mockImplementation(function (input) {
return input
})
const step1 = createStep("step1", mockStep1Fn, mockCompensateSte1)
const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
})
const workflowInput = { test: "payload1" }
const { transaction } = await workflow().run({
input: workflowInput,
throwOnError: false,
})
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockCompensateSte1).toHaveBeenCalledTimes(0)
await workflow().cancel({
transaction,
throwOnError: false,
})
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockCompensateSte1).toHaveBeenCalledTimes(1)
expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput)
expect(mockCompensateSte1.mock.calls[0][0]).toEqual({ data: "data" })
})
})

View File

@@ -1,5 +1,5 @@
import { exportWorkflow } from "../workflow-export"
import { createMedusaContainer } from "@medusajs/utils"
import { exportWorkflow } from "../workflow-export"
jest.mock("@medusajs/orchestration", () => {
return {
@@ -46,6 +46,17 @@ jest.mock("@medusajs/orchestration", () => {
}),
}
}),
cancel: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "reverted"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
}
}),
}

View File

@@ -41,6 +41,14 @@ export type FlowRegisterStepFailureOptions<TData = unknown> = {
events?: DistributedTransactionEvents
}
export type FlowCancelOptions = {
transaction?: DistributedTransaction
transactionId?: string
context?: Context
throwOnError?: boolean
events?: DistributedTransactionEvents
}
export type WorkflowResult<TResult = unknown> = {
errors: TransactionStepError[]
transaction: DistributedTransaction
@@ -80,6 +88,7 @@ export type ExportedWorkflow<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
cancel: (args?: FlowCancelOptions) => Promise<WorkflowResult>
}
export type MainExportedWorkflow<TData = unknown, TResult = unknown> = {
@@ -88,12 +97,12 @@ export type MainExportedWorkflow<TData = unknown, TResult = unknown> = {
container?: LoadedModule[] | MedusaContainer
): Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure"
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
/**
* You can also directly call run, registerStepSuccess and registerStepFailure on the exported workflow
* You can also directly call run, registerStepSuccess, registerStepFailure and cancel on the exported workflow
*/
run<TDataOverride = undefined, TResultOverride = undefined>(
@@ -131,6 +140,12 @@ export type MainExportedWorkflow<TData = unknown, TResult = unknown> = {
TResultOverride extends undefined ? TResult : TResultOverride
>
>
cancel(
args?: FlowCancelOptions & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<WorkflowResult>
}
function createContextualWorkflowRunner<
@@ -152,7 +167,10 @@ function createContextualWorkflowRunner<
wrappedInput?: boolean
}
container?: LoadedModule[] | MedusaContainer
}): Omit<LocalWorkflow, "run" | "registerStepSuccess" | "registerStepFailure"> &
}): Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride> {
if (!container) {
container = MedusaModule.getLoadedModules().map(
@@ -165,10 +183,11 @@ function createContextualWorkflowRunner<
const originalRun = flow.run.bind(flow)
const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow)
const originalRegisterStepFailure = flow.registerStepFailure.bind(flow)
const originalCancel = flow.cancel.bind(flow)
const originalExecution = async (
method,
{ throwOnError, resultFrom },
{ throwOnError, resultFrom, isCancel = false },
...args
) => {
const transaction = await method.apply(method, args)
@@ -176,7 +195,14 @@ function createContextualWorkflowRunner<
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState()) && throwOnError) {
const isCancelled =
isCancel && transaction.getState() === TransactionState.REVERTED
if (
!isCancelled &&
failedStatus.includes(transaction.getState()) &&
throwOnError
) {
const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)
@@ -316,6 +342,39 @@ function createContextualWorkflowRunner<
}
flow.registerStepFailure = newRegisterStepFailure as any
const newCancel = async (
{
transaction,
transactionId,
context: outerContext,
throwOnError,
events,
}: FlowCancelOptions = {
throwOnError: true,
}
) => {
throwOnError ??= true
const context = {
...outerContext,
transactionId,
__type: MedusaContextType,
}
return await originalExecution(
originalCancel,
{
throwOnError,
resultFrom: undefined,
isCancel: true,
},
transaction ?? transactionId,
context,
events
)
}
flow.cancel = newCancel as any
return flow as unknown as LocalWorkflow &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
}
@@ -335,7 +394,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
container?: LoadedModule[] | MedusaContainer
): Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure"
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride> {
return createContextualWorkflowRunner<
@@ -353,11 +412,15 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
}
const buildRunnerFn = <
TAction extends "run" | "registerStepSuccess" | "registerStepFailure",
TAction extends
| "run"
| "registerStepSuccess"
| "registerStepFailure"
| "cancel",
TDataOverride,
TResultOverride
>(
action: "run" | "registerStepSuccess" | "registerStepFailure",
action: "run" | "registerStepSuccess" | "registerStepFailure" | "cancel",
container?: LoadedModule[] | MedusaContainer
) => {
const contextualRunner = createContextualWorkflowRunner<
@@ -467,6 +530,21 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
)(inputArgs)
}
exportedWorkflow.cancel = async (
args?: FlowCancelOptions & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<WorkflowResult> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowCancelOptions
return await buildRunnerFn<"cancel", unknown, unknown>(
"cancel",
container
)(inputArgs)
}
MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)
return exportedWorkflow
}

View File

@@ -2,7 +2,7 @@ import {
TransactionStepsDefinition,
WorkflowManager,
} from "@medusajs/orchestration"
import { OrchestrationUtils, isString } from "@medusajs/utils"
import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils"
import { ulid } from "ulid"
import { StepResponse, resolveValue } from "./helpers"
import { proxify } from "./helpers/proxy"
@@ -168,8 +168,8 @@ function applyStep<
stepOutput?.__type ===
OrchestrationUtils.SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
JSON.parse(JSON.stringify(stepOutput.compensateInput))
: stepOutput && JSON.parse(JSON.stringify(stepOutput))
deepCopy(stepOutput.compensateInput)
: stepOutput && deepCopy(stepOutput)
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)