chore(orchestration): add support for autoRetry, maxAwaitingRetries, retryStep (#13391)

RESOLVES CORE-1163
RESOLVES CORE-1164

**What**

### Add support for non auto retryable steps.

When a step is configured with `maxRetries` and fails, it is marked as a temporary failure and then retried automatically. That's the default behaviour. If you now add `autoRetry: false`, a failing step is still marked as a temporary failure but is not retried automatically; you can then call the workflow engine `run` to resume the workflow from the failing step so that it is retried.

### Add support for `maxAwaitingRetries`

When `retryIntervalAwaiting` is set, a step that is awaiting is retried after the specified interval, with no maximum number of retries. You can now set `maxAwaitingRetries` to enforce a maximum number of awaiting retries.

### Add support to manually retry an awaiting step

In some scenarios, either a machine dies while a step is executing or a step takes longer than expected. You can now call `retryStep` on the workflow engine to force a retry of a step that is presumably stuck.
This commit is contained in:
Adrien de Peretti
2025-09-08 14:46:30 +02:00
committed by GitHub
parent ac5e23b96c
commit d7692100e7
28 changed files with 1366 additions and 64 deletions

View File

@@ -47,6 +47,11 @@ type RegisterStepFailureOptions<T> = Omit<
forcePermanentFailure?: boolean
}
/**
 * Options accepted by `retryStep`: the orchestrator run options without
 * `transactionId`, `input` and `resultFrom`.
 */
type RetryStepOptions<T> = Omit<
  WorkflowOrchestratorRunOptions<T>,
  "input" | "resultFrom" | "transactionId"
>
type IdempotencyKeyParts = {
workflowId: string
transactionId: string
@@ -379,6 +384,72 @@ export class WorkflowOrchestratorService {
return transaction
}
/**
 * Retries a step of a registered workflow, located through its idempotency key.
 *
 * The workflow's own `retryStep` is always invoked with `throwOnError: false`;
 * any error is re-thrown from here only after the finish notification and the
 * parent step trigger have been handled.
 *
 * @param idempotencyKey - Idempotency key, either as a string or as its parts;
 *   the workflow id and transaction id are resolved from it.
 * @param options - Optional run options. `throwOnError` defaults to `true`;
 *   `container` falls back to this service's container.
 * @returns The object returned by the workflow's `retryStep`.
 * @throws Error when no workflow is registered under the resolved workflow id.
 * @throws The workflow's `thrownError`, or the first entry of its `errors`,
 *   when `throwOnError` is enabled and the retry reported a failure.
 */
async retryStep<T = unknown>({
  idempotencyKey,
  options,
}: {
  idempotencyKey: string | IdempotencyKeyParts
  options?: RetryStepOptions<T>
}) {
  const shouldThrow = options?.throwOnError ?? true

  const [resolvedKey, keyParts] =
    this.buildIdempotencyKeyAndParts(idempotencyKey)
  const { workflowId, transactionId } = keyParts

  const workflow: any = MedusaWorkflow.getWorkflow(workflowId)
  if (!workflow) {
    throw new Error(`Workflow with id "${workflowId}" not found.`)
  }

  const workflowEvents = this.buildWorkflowEvents({
    customEventHandlers: options?.events,
    transactionId,
    workflowId,
  })

  // Errors are collected rather than thrown so that the notification below
  // still runs before anything is surfaced to the caller.
  const outcome = await workflow.retryStep({
    idempotencyKey: resolvedKey,
    context: options?.context,
    throwOnError: false,
    logOnError: options?.logOnError,
    events: workflowEvents,
    container: options?.container ?? this.container_,
  })

  const { transaction } = outcome
  if (transaction.hasFinished()) {
    this.notify({
      eventType: "onFinish",
      workflowId,
      transactionId,
      state: transaction.getFlow().state as TransactionState,
      result: outcome.result,
      errors: outcome.errors,
    })

    await this.triggerParentStep(transaction, outcome.result)
  }

  if (shouldThrow) {
    // A thrown error takes precedence over the collected step errors.
    if (outcome.thrownError) {
      throw outcome.thrownError
    }

    if (outcome.errors?.length) {
      throw outcome.errors[0].error
    }
  }

  return outcome
}
async setStepSuccess<T = unknown>({
idempotencyKey,
stepResponse,

View File

@@ -3,6 +3,7 @@ import {
DAL,
FilterableWorkflowExecutionProps,
FindConfig,
IdempotencyKeyParts,
InferEntityType,
InternalModuleDeclaration,
ModulesSdkTypes,
@@ -206,6 +207,29 @@ export class WorkflowsModuleService<
)
}
/**
 * Forwards a step retry request to the workflow orchestrator service.
 *
 * `options` is copied through a JSON round-trip before use, so the caller's
 * object is not mutated and only JSON-serializable values are forwarded.
 * When the copied options carry no `context`, the shared context (without
 * `manager` and `transactionManager`) is used instead.
 *
 * @param idempotencyKey - Idempotency key of the step, as a string or as parts.
 * @param options - Optional options forwarded to the orchestrator.
 * @param context - Shared context; defaults to an empty object.
 * @returns Whatever the orchestrator service's `retryStep` resolves to.
 */
@InjectSharedContext()
async retryStep(
  {
    idempotencyKey,
    options,
  }: {
    idempotencyKey: string | IdempotencyKeyParts
    options?: Record<string, any>
  },
  @MedusaContext() context: Context = {}
) {
  const forwardedOptions = JSON.parse(JSON.stringify(options ?? {}))

  // Drop the manager fields from the context that gets forwarded.
  const { manager, transactionManager, ...sharedContext } = context

  if (forwardedOptions.context == null) {
    forwardedOptions.context = sharedContext
  }

  return await this.workflowOrchestratorService_.retryStep({
    idempotencyKey,
    options: forwardedOptions,
  })
}
@InjectSharedContext()
async setStepSuccess(
{

View File

@@ -5,6 +5,7 @@ import {
SchedulerOptions,
SkipCancelledExecutionError,
SkipExecutionError,
SkipStepAlreadyFinishedError,
TransactionCheckpoint,
TransactionContext,
TransactionFlow,
@@ -392,6 +393,53 @@ export class InMemoryDistributedTransactionStorage
throw new SkipExecutionError("Already finished by another execution")
}
let currentFlowLatestExecutedStep: TransactionStep | undefined
const currentFlowSteps = Object.values(currentFlow.steps || {})
for (let i = currentFlowSteps.length - 1; i >= 0; i--) {
if (currentFlowSteps[i].lastAttempt) {
currentFlowLatestExecutedStep = currentFlowSteps[i]
break
}
}
let latestUpdatedFlowLatestExecutedStep: TransactionStep | undefined
const latestUpdatedFlowSteps = Object.values(latestUpdatedFlow.steps || {})
for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) {
if (latestUpdatedFlowSteps[i].lastAttempt) {
latestUpdatedFlowLatestExecutedStep = latestUpdatedFlowSteps[i]
break
}
}
/**
* The current flow and the latest updated flow have the same latest executed step.
*/
const isSameLatestExecutedStep =
currentFlowLatestExecutedStep &&
latestUpdatedFlowLatestExecutedStep &&
currentFlowLatestExecutedStep?.id ===
latestUpdatedFlowLatestExecutedStep?.id
/**
* The current flow's latest executed step has a last attempt at or ahead of the latest updated
* flow's latest executed step. Therefore it is fine; otherwise another execution has already
* finished the step.
*/
const isCurrentLatestExecutedStepLastAttemptAhead =
currentFlowLatestExecutedStep?.lastAttempt &&
latestUpdatedFlowLatestExecutedStep?.lastAttempt &&
currentFlowLatestExecutedStep.lastAttempt >=
latestUpdatedFlowLatestExecutedStep.lastAttempt
if (
isSameLatestExecutedStep &&
!isCurrentLatestExecutedStepLastAttemptAhead
) {
throw new SkipStepAlreadyFinishedError(
"Step already in execution ahead of the current one"
)
}
// First ensure that the latest execution was not cancelled, otherwise we skip the execution
const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt
const currentTransactionCancelledAt = currentFlow.cancelledAt
@@ -405,13 +453,6 @@ export class InMemoryDistributedTransactionStorage
)
}
const currentFlowSteps = Object.values(currentFlow.steps || {})
const latestUpdatedFlowSteps = latestUpdatedFlow.steps
? Object.values(
latestUpdatedFlow.steps as Record<string, TransactionStep>
)
: []
// Predefined states for quick lookup
const invokingStates = [
TransactionStepState.INVOKING,