fix(orchestration): Use the step definition max retries on set step failure (#13319)
* fix(orchestration): Use the step definition max retries on set step failure * Create sweet-turkeys-wait.md * allow to force permanent failure * update changeset
This commit is contained in:
committed by
GitHub
parent
cbaa403744
commit
ff152e7ace
8
.changeset/sweet-turkeys-wait.md
Normal file
8
.changeset/sweet-turkeys-wait.md
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
"@medusajs/orchestration": patch
|
||||
"@medusajs/workflows-sdk": patch
|
||||
"@medusajs/workflow-engine-inmemory": patch
|
||||
"@medusajs/workflow-engine-redis": patch
|
||||
---
|
||||
|
||||
fix(orchestration): Use the step definition max retries on set step failure
|
||||
@@ -1795,12 +1795,14 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
handler,
|
||||
transaction,
|
||||
onLoad,
|
||||
forcePermanentFailure,
|
||||
}: {
|
||||
responseIdempotencyKey: string
|
||||
error?: Error | any
|
||||
handler?: TransactionStepHandler
|
||||
transaction?: DistributedTransactionType
|
||||
onLoad?: (transaction: DistributedTransactionType) => Promise<void> | void
|
||||
forcePermanentFailure?: boolean
|
||||
}): Promise<DistributedTransactionType> {
|
||||
const [curTransaction, step] =
|
||||
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
|
||||
@@ -1822,7 +1824,8 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
curTransaction,
|
||||
step,
|
||||
error,
|
||||
0
|
||||
// On permanent failure, the step should not consider any retries
|
||||
forcePermanentFailure ? 0 : step.definition.maxRetries
|
||||
)
|
||||
|
||||
if (ret.transactionIsCancelling) {
|
||||
|
||||
@@ -467,7 +467,8 @@ export class LocalWorkflow {
|
||||
idempotencyKey: string,
|
||||
error?: Error | any,
|
||||
context?: Context,
|
||||
subscribe?: DistributedTransactionEvents
|
||||
subscribe?: DistributedTransactionEvents,
|
||||
forcePermanentFailure?: boolean
|
||||
): Promise<DistributedTransactionType> {
|
||||
this.medusaContext = context
|
||||
const { handler, orchestrator } = this.workflow
|
||||
@@ -483,6 +484,7 @@ export class LocalWorkflow {
|
||||
error,
|
||||
handler: handler(this.container_, context),
|
||||
onLoad: this.onLoad.bind(this),
|
||||
forcePermanentFailure,
|
||||
})
|
||||
|
||||
try {
|
||||
|
||||
@@ -8,7 +8,7 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
|
||||
type BaseFlowRunOptions = {
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
resultFrom?: string | Symbol
|
||||
throwOnError?: boolean
|
||||
logOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
@@ -29,6 +29,7 @@ export type FlowRegisterStepFailureOptions<TData = unknown> =
|
||||
BaseFlowRunOptions & {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
forcePermanentFailure?: boolean
|
||||
}
|
||||
|
||||
export type FlowCancelOptions = {
|
||||
|
||||
@@ -71,6 +71,14 @@ function createContextualWorkflowRunner<
|
||||
resultFrom,
|
||||
isCancel = false,
|
||||
container: executionContainer,
|
||||
forcePermanentFailure,
|
||||
}: {
|
||||
throwOnError?: boolean
|
||||
logOnError?: boolean
|
||||
resultFrom?: string | Symbol
|
||||
isCancel?: boolean
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
forcePermanentFailure?: boolean
|
||||
},
|
||||
transactionOrIdOrIdempotencyKey: DistributedTransactionType | string,
|
||||
input: unknown,
|
||||
@@ -107,13 +115,15 @@ function createContextualWorkflowRunner<
|
||||
|
||||
context.isCancelling = isCancel
|
||||
|
||||
const args = [
|
||||
transactionOrIdOrIdempotencyKey,
|
||||
input,
|
||||
context,
|
||||
events,
|
||||
flowMetadata,
|
||||
]
|
||||
const args = [transactionOrIdOrIdempotencyKey, input, context, events]
|
||||
|
||||
if (method === originalRegisterStepFailure) {
|
||||
// Only available on registerStepFailure
|
||||
args.push(forcePermanentFailure)
|
||||
} else {
|
||||
args.push(flowMetadata)
|
||||
}
|
||||
|
||||
const transaction = (await method.apply(
|
||||
method,
|
||||
args
|
||||
@@ -161,7 +171,8 @@ function createContextualWorkflowRunner<
|
||||
})
|
||||
}
|
||||
} else {
|
||||
result = transaction.getContext().invoke?.[resultFrom]
|
||||
result =
|
||||
resultFrom && transaction.getContext().invoke?.[resultFrom.toString()]
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -263,6 +274,7 @@ function createContextualWorkflowRunner<
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
forcePermanentFailure,
|
||||
}: FlowRegisterStepFailureOptions = {
|
||||
idempotencyKey: "",
|
||||
}
|
||||
@@ -288,6 +300,7 @@ function createContextualWorkflowRunner<
|
||||
resultFrom,
|
||||
container,
|
||||
logOnError,
|
||||
forcePermanentFailure,
|
||||
},
|
||||
idempotencyKey,
|
||||
response,
|
||||
|
||||
@@ -40,6 +40,13 @@ type RegisterStepSuccessOptions<T> = Omit<
|
||||
"transactionId" | "input"
|
||||
>
|
||||
|
||||
type RegisterStepFailureOptions<T> = Omit<
|
||||
WorkflowOrchestratorRunOptions<T>,
|
||||
"transactionId" | "input"
|
||||
> & {
|
||||
forcePermanentFailure?: boolean
|
||||
}
|
||||
|
||||
type IdempotencyKeyParts = {
|
||||
workflowId: string
|
||||
transactionId: string
|
||||
@@ -450,7 +457,7 @@ export class WorkflowOrchestratorService {
|
||||
}: {
|
||||
idempotencyKey: string | IdempotencyKeyParts
|
||||
stepResponse: unknown
|
||||
options?: RegisterStepSuccessOptions<T>
|
||||
options?: RegisterStepFailureOptions<T>
|
||||
}) {
|
||||
const {
|
||||
context,
|
||||
@@ -458,6 +465,7 @@ export class WorkflowOrchestratorService {
|
||||
resultFrom,
|
||||
container,
|
||||
events: eventHandlers,
|
||||
forcePermanentFailure = false,
|
||||
} = options ?? {}
|
||||
|
||||
let { throwOnError } = options ?? {}
|
||||
@@ -484,6 +492,7 @@ export class WorkflowOrchestratorService {
|
||||
throwOnError: false,
|
||||
logOnError,
|
||||
events,
|
||||
forcePermanentFailure,
|
||||
response: stepResponse,
|
||||
container: container ?? this.container_,
|
||||
})
|
||||
|
||||
@@ -240,7 +240,9 @@ export class WorkflowsModuleService<
|
||||
}: {
|
||||
idempotencyKey: string | object
|
||||
stepResponse: unknown
|
||||
options?: Record<string, any>
|
||||
options?: Record<string, any> & {
|
||||
forcePermanentFailure?: boolean
|
||||
}
|
||||
},
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
|
||||
@@ -48,6 +48,13 @@ type RegisterStepSuccessOptions<T> = Omit<
|
||||
"transactionId" | "input"
|
||||
>
|
||||
|
||||
type RegisterStepFailureOptions<T> = Omit<
|
||||
WorkflowOrchestratorRunOptions<T>,
|
||||
"transactionId" | "input"
|
||||
> & {
|
||||
forcePermanentFailure?: boolean
|
||||
}
|
||||
|
||||
type IdempotencyKeyParts = {
|
||||
workflowId: string
|
||||
transactionId: string
|
||||
@@ -493,7 +500,7 @@ export class WorkflowOrchestratorService {
|
||||
}: {
|
||||
idempotencyKey: string | IdempotencyKeyParts
|
||||
stepResponse: unknown
|
||||
options?: RegisterStepSuccessOptions<T>
|
||||
options?: RegisterStepFailureOptions<T>
|
||||
}) {
|
||||
const {
|
||||
context,
|
||||
@@ -501,6 +508,7 @@ export class WorkflowOrchestratorService {
|
||||
resultFrom,
|
||||
container,
|
||||
events: eventHandlers,
|
||||
forcePermanentFailure,
|
||||
} = options ?? {}
|
||||
|
||||
let { throwOnError } = options ?? {}
|
||||
@@ -529,6 +537,7 @@ export class WorkflowOrchestratorService {
|
||||
events,
|
||||
response: stepResponse,
|
||||
container: container ?? this.container_,
|
||||
forcePermanentFailure,
|
||||
})
|
||||
|
||||
if (ret.transaction.hasFinished()) {
|
||||
|
||||
@@ -250,7 +250,9 @@ export class WorkflowsModuleService<
|
||||
}: {
|
||||
idempotencyKey: string | object
|
||||
stepResponse: unknown
|
||||
options?: Record<string, any>
|
||||
options?: Record<string, any> & {
|
||||
forcePermanentFailure?: boolean
|
||||
}
|
||||
},
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
|
||||
Reference in New Issue
Block a user