feat(workflows-sdk): Configurable retries upon step creation (#5728)

**What**
- Allow to create step that can be configured to have a max retry
- Step end retry mechanism on permanent failure

Also added an API to override a step configuration from within the createWorkflow
```ts
const step = createStep({ name: "step", maxRetries: 3 }, async (_, context) => {
  return new StepResponse({ output: "output" })
})

const workflow = createWorkflow("workflow", function () {
  const res = step().config({ maxRetries: 5 }) // This will override the original maxRetries of 3
})
```

**NOTE**
We can maybe find another name than config on the step workflow data to override the step config.
This commit is contained in:
Adrien de Peretti
2023-12-19 10:38:27 +01:00
committed by GitHub
parent 1a2f513d53
commit 9cc787cac4
21 changed files with 255 additions and 92 deletions

View File

@@ -1,12 +1,4 @@
import {
resolveValue,
StepResponse,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowStep,
SymbolWorkflowStepBind,
SymbolWorkflowStepResponse,
SymbolWorkflowWorkflowData,
} from "./helpers"
import { resolveValue, StepResponse } from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
@@ -15,6 +7,8 @@ import {
WorkflowData,
} from "./type"
import { proxify } from "./helpers/proxy"
import { TransactionStepsDefinition } from "@medusajs/orchestration"
import { isString, OrchestrationUtils } from "@medusajs/utils"
/**
* The type of invocation function passed to a step.
@@ -75,6 +69,7 @@ interface ApplyStepOptions<
TInvokeResultCompensateInput
> {
stepName: string
stepConfig?: TransactionStepsDefinition
input: TStepInputs
invokeFn: InvokeFn<
TInvokeInput,
@@ -91,6 +86,7 @@ interface ApplyStepOptions<
* This is where the inputs and context are passed to the underlying invoke and compensate function.
*
* @param stepName
* @param stepConfig
* @param input
* @param invokeFn
* @param compensateFn
@@ -104,6 +100,7 @@ function applyStep<
TInvokeResultCompensateInput
>({
stepName,
stepConfig = {},
input,
invokeFn,
compensateFn,
@@ -135,12 +132,12 @@ function applyStep<
)
const stepResponseJSON =
stepResponse?.__type === SymbolWorkflowStepResponse
stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepResponse.toJSON()
: stepResponse
return {
__type: SymbolWorkflowWorkflowData,
__type: OrchestrationUtils.SymbolWorkflowWorkflowData,
output: stepResponseJSON,
}
},
@@ -154,7 +151,8 @@ function applyStep<
const stepOutput = transactionContext.invoke[stepName]?.output
const invokeResult =
stepOutput?.__type === SymbolWorkflowStepResponse
stepOutput?.__type ===
OrchestrationUtils.SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
JSON.parse(JSON.stringify(stepOutput.compensateInput))
: stepOutput && JSON.parse(JSON.stringify(stepOutput))
@@ -168,14 +166,21 @@ function applyStep<
: undefined,
}
this.flow.addAction(stepName, {
noCompensation: !compensateFn,
})
stepConfig!.noCompensation = !compensateFn
this.flow.addAction(stepName, stepConfig)
this.handlers.set(stepName, handler)
const ret = {
__type: SymbolWorkflowStep,
__type: OrchestrationUtils.SymbolWorkflowStep,
__step__: stepName,
config: (config: Pick<TransactionStepsDefinition, "maxRetries">) => {
this.flow.replaceAction(stepName, stepName, {
...stepConfig,
...config,
})
return proxify(ret)
},
}
return proxify(ret)
@@ -236,9 +241,11 @@ export function createStep<
TInvokeResultCompensateInput
>(
/**
* The name of the step.
* The name of the step or its configuration (currently support maxRetries).
*/
name: string,
nameOrConfig:
| string
| ({ name: string } & Pick<TransactionStepsDefinition, "maxRetries">),
/**
* An invocation function that will be executed when the workflow is executed. The function must return an instance of {@link StepResponse}. The constructor of {@link StepResponse}
* accepts the output of the step as a first argument, and optionally as a second argument the data to be passed to the compensation function as a parameter.
@@ -256,12 +263,14 @@ export function createStep<
*/
compensateFn?: CompensateFn<TInvokeResultCompensateInput>
): StepFunction<TInvokeInput, TInvokeResultOutput> {
const stepName = name ?? invokeFn.name
const stepName =
(isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name) ?? invokeFn.name
const config = isString(nameOrConfig) ? {} : nameOrConfig
const returnFn = function (input: {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
}): WorkflowData<TInvokeResultOutput> {
if (!global[SymbolMedusaWorkflowComposerContext]) {
if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) {
throw new Error(
"createStep must be used inside a createWorkflow definition"
)
@@ -269,7 +278,7 @@ export function createStep<
const stepBinder = (
global[
SymbolMedusaWorkflowComposerContext
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).stepBinder
@@ -281,6 +290,7 @@ export function createStep<
TInvokeResultCompensateInput
>({
stepName,
stepConfig: config,
input,
invokeFn,
compensateFn,
@@ -288,7 +298,7 @@ export function createStep<
)
}
returnFn.__type = SymbolWorkflowStepBind
returnFn.__type = OrchestrationUtils.SymbolWorkflowStepBind
returnFn.__step__ = stepName
return returnFn as unknown as StepFunction<TInvokeInput, TInvokeResultOutput>

View File

@@ -4,13 +4,9 @@ import {
WorkflowManager,
} from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { FlowRunOptions, WorkflowResult, exportWorkflow } from "../../helper"
import {
SymbolInputReference,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowStep,
resolveValue,
} from "./helpers"
import { OrchestrationUtils } from "@medusajs/utils"
import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper"
import { resolveValue } from "./helpers"
import { proxify } from "./helpers/proxy"
import {
CreateWorkflowComposerContext,
@@ -18,7 +14,7 @@ import {
WorkflowDataProperties,
} from "./type"
global[SymbolMedusaWorkflowComposerContext] = null
global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
/**
* An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create
@@ -189,16 +185,16 @@ export function createWorkflow<
},
}
global[SymbolMedusaWorkflowComposerContext] = context
global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = context
const inputPlaceHolder = proxify<WorkflowData>({
__type: SymbolInputReference,
__type: OrchestrationUtils.SymbolInputReference,
__step__: "",
})
const returnedStep = composer.apply(context, [inputPlaceHolder])
delete global[SymbolMedusaWorkflowComposerContext]
delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]
WorkflowManager.update(name, context.flow, handlers)
@@ -221,7 +217,7 @@ export function createWorkflow<
> => {
args ??= {}
args.resultFrom ??=
returnedStep?.__type === SymbolWorkflowStep
returnedStep?.__type === OrchestrationUtils.SymbolWorkflowStep
? returnedStep.__step__
: undefined

View File

@@ -1,3 +1,2 @@
export * from "./step-response"
export * from "./symbol"
export * from "./resolve-value"
export * from "./resolve-value"

View File

@@ -1,6 +1,6 @@
import { transform } from "../transform"
import { WorkflowData, WorkflowTransactionContext } from "../type"
import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol"
import { OrchestrationUtils } from "@medusajs/utils"
import { resolveValue } from "./resolve-value"
export function proxify<T>(obj: WorkflowData<any>): T {
@@ -13,8 +13,8 @@ export function proxify<T>(obj: WorkflowData<any>): T {
return transform(target[prop], async function (input, context) {
const { invoke } = context as WorkflowTransactionContext
let output =
target.__type === SymbolInputReference ||
target.__type === SymbolWorkflowStepTransformer
target.__type === OrchestrationUtils.SymbolInputReference ||
target.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
? target
: invoke?.[obj.__step__]?.output

View File

@@ -1,29 +1,26 @@
import { promiseAll } from "@medusajs/utils"
import {
SymbolInputReference,
SymbolWorkflowHook,
SymbolWorkflowStep,
SymbolWorkflowStepResponse,
SymbolWorkflowStepTransformer,
} from "./symbol"
import { OrchestrationUtils, promiseAll } from "@medusajs/utils"
async function resolveProperty(property, transactionContext) {
const { invoke: invokeRes } = transactionContext
if (property?.__type === SymbolInputReference) {
if (property?.__type === OrchestrationUtils.SymbolInputReference) {
return transactionContext.payload
} else if (property?.__type === SymbolWorkflowStepTransformer) {
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
) {
return await property.__resolver(transactionContext)
} else if (property?.__type === SymbolWorkflowHook) {
} else if (property?.__type === OrchestrationUtils.SymbolWorkflowHook) {
return await property.__value(transactionContext)
} else if (property?.__type === SymbolWorkflowStep) {
} else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) {
const output = invokeRes[property.__step__]?.output
if (output?.__type === SymbolWorkflowStepResponse) {
if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) {
return output.output
}
return output
} else if (property?.__type === SymbolWorkflowStepResponse) {
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
) {
return property.output
} else {
return property

View File

@@ -1,4 +1,5 @@
import { SymbolWorkflowStepResponse } from "./symbol"
import { OrchestrationUtils } from "@medusajs/utils"
import { PermanentStepFailureError } from "@medusajs/orchestration"
/**
* This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`.
@@ -9,7 +10,7 @@ import { SymbolWorkflowStepResponse } from "./symbol"
* as that of `TOutput`.
*/
export class StepResponse<TOutput, TCompensateInput = TOutput> {
readonly #__type = SymbolWorkflowStepResponse
readonly #__type = OrchestrationUtils.SymbolWorkflowStepResponse
readonly #output: TOutput
readonly #compensateInput?: TCompensateInput
@@ -35,6 +36,15 @@ export class StepResponse<TOutput, TCompensateInput = TOutput> {
this.#compensateInput = (compensateInput ?? output) as TCompensateInput
}
/**
* Creates a StepResponse that indicates that the step has failed and the retry mechanism should not kick in anymore.
*
* @param message - An optional message to be logged. Default to `Permanent failure`.
*/
static permanentFailure(message = "Permanent failure"): never {
throw new PermanentStepFailureError(message)
}
/**
* @internal
*/

View File

@@ -1,12 +0,0 @@
export const SymbolMedusaWorkflowComposerContext = Symbol.for(
"MedusaWorkflowComposerContext"
)
export const SymbolInputReference = Symbol.for("WorkflowInputReference")
export const SymbolWorkflowStep = Symbol.for("WorkflowStep")
export const SymbolWorkflowHook = Symbol.for("WorkflowHook")
export const SymbolWorkflowWorkflowData = Symbol.for("WorkflowWorkflowData")
export const SymbolWorkflowStepResponse = Symbol.for("WorkflowStepResponse")
export const SymbolWorkflowStepBind = Symbol.for("WorkflowStepBind")
export const SymbolWorkflowStepTransformer = Symbol.for(
"WorkflowStepTransformer"
)

View File

@@ -1,8 +1,5 @@
import {
resolveValue,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowHook,
} from "./helpers"
import { OrchestrationUtils } from "@medusajs/utils"
import { resolveValue } from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
@@ -104,7 +101,9 @@ export function hook<TOutput>(
value: any
): WorkflowData<TOutput> {
const hookBinder = (
global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext
global[
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).hookBinder
return hookBinder(name, function (context) {
@@ -130,7 +129,7 @@ export function hook<TOutput>(
}
return finalResult
},
__type: SymbolWorkflowHook,
__type: OrchestrationUtils.SymbolWorkflowHook,
}
})
}

View File

@@ -3,7 +3,6 @@ export * from "./create-workflow"
export * from "./hook"
export * from "./parallelize"
export * from "./helpers/resolve-value"
export * from "./helpers/symbol"
export * from "./helpers/step-response"
export * from "./transform"
export * from "./type"

View File

@@ -1,5 +1,5 @@
import { CreateWorkflowComposerContext, WorkflowData } from "./type"
import { SymbolMedusaWorkflowComposerContext } from "./helpers"
import { OrchestrationUtils } from "@medusajs/utils"
/**
* This function is used to run multiple steps in parallel. The result of each step will be returned as part of the result array.
@@ -43,14 +43,16 @@ import { SymbolMedusaWorkflowComposerContext } from "./helpers"
export function parallelize<TResult extends WorkflowData[]>(
...steps: TResult
): TResult {
if (!global[SymbolMedusaWorkflowComposerContext]) {
if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) {
throw new Error(
"parallelize must be used inside a createWorkflow definition"
)
}
const parallelizeBinder = (
global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext
global[
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).parallelizeBinder
const resultSteps = steps.map((step) => step)

View File

@@ -1,6 +1,7 @@
import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers"
import { resolveValue } from "./helpers"
import { StepExecutionContext, WorkflowData } from "./type"
import { proxify } from "./helpers/proxy"
import { OrchestrationUtils } from "@medusajs/utils"
type Func1<T extends object | WorkflowData, U> = (
input: T extends WorkflowData<infer U>
@@ -163,7 +164,7 @@ export function transform(
...functions: Function[]
): unknown {
const ret = {
__type: SymbolWorkflowStepTransformer,
__type: OrchestrationUtils.SymbolWorkflowStepTransformer,
__resolver: undefined,
}

View File

@@ -2,18 +2,15 @@ import {
OrchestratorBuilder,
TransactionContext as OriginalWorkflowTransactionContext,
TransactionPayload,
TransactionStepsDefinition,
WorkflowHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer } from "@medusajs/types"
export type StepFunctionResult<TOutput extends unknown | unknown[] = unknown> =
(this: CreateWorkflowComposerContext) => TOutput extends []
? [
...WorkflowData<{
[K in keyof TOutput]: TOutput[number][K]
}>[]
]
: WorkflowData<{ [K in keyof TOutput]: TOutput[K] }>
(
this: CreateWorkflowComposerContext
) => WorkflowData<{ [K in keyof TOutput]: TOutput[K] }>
/**
* A step function to be used in a workflow.
@@ -24,7 +21,13 @@ export type StepFunctionResult<TOutput extends unknown | unknown[] = unknown> =
export type StepFunction<TInput extends object = object, TOutput = unknown> = {
(input: { [K in keyof TInput]: WorkflowData<TInput[K]> }): WorkflowData<{
[K in keyof TOutput]: TOutput[K]
}>
}> & {
config(
config: Pick<TransactionStepsDefinition, "maxRetries">
): WorkflowData<{
[K in keyof TOutput]: TOutput[K]
}>
}
} & WorkflowDataProperties<{
[K in keyof TOutput]: TOutput[K]
}>