From ff152e7ace60232c02932bbd06a759e555a823d4 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Thu, 28 Aug 2025 14:35:31 +0200 Subject: [PATCH] fix(orchestration): Use the step definition max retries on set step failure (#13319) * fix(orchestration): Use the step definition max retries on set step failure * Create sweet-turkeys-wait.md * allow to force permanent failure * update changeset --- .changeset/sweet-turkeys-wait.md | 8 +++++ .../transaction/transaction-orchestrator.ts | 5 +++- .../src/workflow/local-workflow.ts | 4 ++- .../core/workflows-sdk/src/helper/type.ts | 3 +- .../src/helper/workflow-export.ts | 29 ++++++++++++++----- .../src/services/workflow-orchestrator.ts | 11 ++++++- .../src/services/workflows-module.ts | 4 ++- .../src/services/workflow-orchestrator.ts | 11 ++++++- .../src/services/workflows-module.ts | 4 ++- 9 files changed, 64 insertions(+), 15 deletions(-) create mode 100644 .changeset/sweet-turkeys-wait.md diff --git a/.changeset/sweet-turkeys-wait.md b/.changeset/sweet-turkeys-wait.md new file mode 100644 index 0000000000..6742cadb1c --- /dev/null +++ b/.changeset/sweet-turkeys-wait.md @@ -0,0 +1,8 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +fix(orchestration): Use the step definition max retries on set step failure diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 870fae6ba5..e044e74731 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -1795,12 +1795,14 @@ export class TransactionOrchestrator extends EventEmitter { handler, transaction, onLoad, + forcePermanentFailure, }: { responseIdempotencyKey: string error?: Error | any handler?: TransactionStepHandler transaction?: DistributedTransactionType onLoad?: (transaction: DistributedTransactionType) => Promise | void + forcePermanentFailure?: boolean }): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( @@ -1822,7 +1824,8 @@ export class TransactionOrchestrator extends EventEmitter { curTransaction, step, error, - 0 + // On permanent failure, the step should not consider any retries + forcePermanentFailure ? 0 : step.definition.maxRetries ) if (ret.transactionIsCancelling) { diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 08dde264f7..ca995c725e 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -467,7 +467,8 @@ export class LocalWorkflow { idempotencyKey: string, error?: Error | any, context?: Context, - subscribe?: DistributedTransactionEvents + subscribe?: DistributedTransactionEvents, + forcePermanentFailure?: boolean ): Promise { this.medusaContext = context const { handler, orchestrator } = this.workflow @@ -483,6 +484,7 @@ export class LocalWorkflow { error, handler: handler(this.container_, context), onLoad: this.onLoad.bind(this), + forcePermanentFailure, }) try { diff --git a/packages/core/workflows-sdk/src/helper/type.ts b/packages/core/workflows-sdk/src/helper/type.ts index 8de3f53638..33c3adcc78 100644 --- a/packages/core/workflows-sdk/src/helper/type.ts +++ b/packages/core/workflows-sdk/src/helper/type.ts @@ -8,7 +8,7 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" type BaseFlowRunOptions = { context?: Context - resultFrom?: string | string[] | Symbol + resultFrom?: string | Symbol throwOnError?: boolean logOnError?: boolean events?: DistributedTransactionEvents @@ -29,6 +29,7 @@ export type FlowRegisterStepFailureOptions = BaseFlowRunOptions & { idempotencyKey: string response?: TData + forcePermanentFailure?: boolean } export type FlowCancelOptions = { diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 3d3d6122cb..b5aef4c482 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -71,6 +71,14 @@ function createContextualWorkflowRunner< resultFrom, isCancel = false, container: executionContainer, + forcePermanentFailure, + }: { + throwOnError?: boolean + logOnError?: boolean + resultFrom?: string | Symbol + isCancel?: boolean + container?: LoadedModule[] | MedusaContainer + forcePermanentFailure?: boolean }, transactionOrIdOrIdempotencyKey: DistributedTransactionType | string, input: unknown, @@ -107,13 +115,15 @@ function createContextualWorkflowRunner< context.isCancelling = isCancel - const args = [ - transactionOrIdOrIdempotencyKey, - input, - context, - events, - flowMetadata, - ] + const args = [transactionOrIdOrIdempotencyKey, input, context, events] + + if (method === originalRegisterStepFailure) { + // Only available on registerStepFailure + args.push(forcePermanentFailure) + } else { + args.push(flowMetadata) + } + const transaction = (await method.apply( method, args @@ -161,7 +171,8 @@ function createContextualWorkflowRunner< }) } } else { - result = transaction.getContext().invoke?.[resultFrom] + result = + resultFrom && transaction.getContext().invoke?.[resultFrom.toString()] } return { @@ -263,6 +274,7 @@ function createContextualWorkflowRunner< resultFrom, events, container, + forcePermanentFailure, }: FlowRegisterStepFailureOptions = { idempotencyKey: "", } @@ -288,6 +300,7 @@ function createContextualWorkflowRunner< resultFrom, container, logOnError, + forcePermanentFailure, }, idempotencyKey, response, diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index bbf3ca1565..7b4d71e8bd 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -40,6 +40,13 @@ type RegisterStepSuccessOptions = Omit< "transactionId" | "input" > +type RegisterStepFailureOptions = Omit< + WorkflowOrchestratorRunOptions, + "transactionId" | "input" +> & { + forcePermanentFailure?: boolean +} + type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -450,7 +457,7 @@ export class WorkflowOrchestratorService { }: { idempotencyKey: string | IdempotencyKeyParts stepResponse: unknown - options?: RegisterStepSuccessOptions + options?: RegisterStepFailureOptions }) { const { context, @@ -458,6 +465,7 @@ export class WorkflowOrchestratorService { resultFrom, container, events: eventHandlers, + forcePermanentFailure = false, } = options ?? {} let { throwOnError } = options ?? {} @@ -484,6 +492,7 @@ export class WorkflowOrchestratorService { throwOnError: false, logOnError, events, + forcePermanentFailure, response: stepResponse, container: container ?? this.container_, }) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 563f1d8025..1f9084a9d5 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -240,7 +240,9 @@ export class WorkflowsModuleService< }: { idempotencyKey: string | object stepResponse: unknown - options?: Record + options?: Record & { + forcePermanentFailure?: boolean + } }, @MedusaContext() context: Context = {} ) { diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 70025a67ce..2d71276cb0 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -48,6 +48,13 @@ type RegisterStepSuccessOptions = Omit< "transactionId" | "input" > +type RegisterStepFailureOptions = Omit< + WorkflowOrchestratorRunOptions, + "transactionId" | "input" +> & { + forcePermanentFailure?: boolean +} + type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -493,7 +500,7 @@ export class WorkflowOrchestratorService { }: { idempotencyKey: string | IdempotencyKeyParts stepResponse: unknown - options?: RegisterStepSuccessOptions + options?: RegisterStepFailureOptions }) { const { context, @@ -501,6 +508,7 @@ export class WorkflowOrchestratorService { resultFrom, container, events: eventHandlers, + forcePermanentFailure, } = options ?? {} let { throwOnError } = options ?? {} @@ -529,6 +537,7 @@ export class WorkflowOrchestratorService { events, response: stepResponse, container: container ?? this.container_, + forcePermanentFailure, }) if (ret.transaction.hasFinished()) { diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 445c614b4c..27ca9a9ab5 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -250,7 +250,9 @@ export class WorkflowsModuleService< }: { idempotencyKey: string | object stepResponse: unknown - options?: Record + options?: Record & { + forcePermanentFailure?: boolean + } }, @MedusaContext() context: Context = {} ) {