From d7692100e7a2b2f078756cac9ca2b33784d3d1ff Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 8 Sep 2025 14:46:30 +0200 Subject: [PATCH] chore(orchestration): add support for autoRetry, maxAwaitingRetries, retryStep (#13391) RESOLVES CORE-1163 RESOLVES CORE-1164 **What** ### Add support for non auto retryable steps. When marking a step with `maxRetries`, when it will fail it will be marked as temporary failure and then retry itself automatically. Thats the default behaviour, if you now add `autoRetry: false`, when the step will fail it will be marked as temporary failure but not retry automatically. you can now call the workflow engine run to resume the workflow from the failing step to be retried. ### Add support for `maxAwaitingRetries` When setting `retyIntervalAwaiting` a step that is awaiting will be retried after the specified interval without maximun retry. Now you can set `maxAwaitingRetries` to force a maximum awaiting retry number ### Add support to manually retry an awaiting step In some scenario, either a machine dies while a step is executing or a step is taking longer than expected, you can now call `retryStep` on the workflow engine to force a retry of the step that is supposedly stucked --- .changeset/many-paws-tease.md | 8 + .../transaction/transaction-orchestrator.ts | 110 +++++++++++ .../orchestration/src/transaction/errors.ts | 16 ++ .../transaction/transaction-orchestrator.ts | 117 ++++++++++-- .../src/transaction/transaction-step.ts | 8 +- .../orchestration/src/transaction/types.ts | 12 ++ .../src/workflow/local-workflow.ts | 27 +++ .../http/workflow-execution/admin/entities.ts | 9 + .../core/types/src/workflows-sdk/service.ts | 11 ++ .../core/workflows-sdk/src/helper/type.ts | 5 + .../src/helper/workflow-export.ts | 76 +++++++- .../workflows-sdk/src/utils/composer/type.ts | 6 +- .../__fixtures__/workflow_1_auto_retries.ts | 56 ++++++ .../workflow_1_auto_retries_false.ts | 57 ++++++ .../workflow_1_manual_retry_step.ts | 60 ++++++ .../integration-tests/__tests__/index.spec.ts | 177 ++++++++++++++++-- .../integration-tests/__tests__/race.spec.ts | 16 +- .../src/services/workflow-orchestrator.ts | 71 +++++++ .../src/services/workflows-module.ts | 24 +++ .../utils/workflow-orchestrator-storage.ts | 55 +++++- .../__fixtures__/workflow_1_auto_retries.ts | 56 ++++++ .../workflow_1_auto_retries_false.ts | 57 ++++++ .../workflow_1_manual_retry_step.ts | 60 ++++++ .../integration-tests/__tests__/index.spec.ts | 171 ++++++++++++++++- .../integration-tests/__tests__/race.spec.ts | 14 +- .../src/services/workflow-orchestrator.ts | 72 +++++++ .../src/services/workflows-module.ts | 24 +++ .../utils/workflow-orchestrator-storage.ts | 55 +++++- 28 files changed, 1366 insertions(+), 64 deletions(-) create mode 100644 .changeset/many-paws-tease.md create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts diff --git a/.changeset/many-paws-tease.md b/.changeset/many-paws-tease.md new file mode 100644 index 0000000000..eb78a116d8 --- /dev/null +++ b/.changeset/many-paws-tease.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/types": patch +--- + +chore(orchestration): add support for autoRetry step configuration diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 2c1ace528b..547c93ad23 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -648,6 +648,116 @@ describe("Transaction Orchestrator", () => { ) }) + it("Should not retry steps X times automatically when flag 'autoRetry' is set to false and then compensate steps afterward", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return payload + }), + compensateOne: jest.fn().mockImplementation((payload) => { + return payload + }), + two: jest.fn().mockImplementation((payload) => { + throw new Error() + }), + compensateTwo: jest.fn().mockImplementation((payload) => { + return payload + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateOne(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateTwo(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + maxRetries: 3, + autoRetry: false, + next: { + action: "secondMethod", + maxRetries: 3, + autoRetry: false, + }, + }, + } + + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) + + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) + + await strategy.resume(transaction) + + expect(transaction.transactionId).toBe("transaction_id_123") + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(1) + + await strategy.resume(transaction) + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(2) + + await strategy.resume(transaction) + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(3) + + await strategy.resume(transaction) + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(4) + + expect(transaction.getState()).toBe(TransactionState.REVERTED) + expect(mocks.compensateOne).toHaveBeenCalledTimes(1) + + expect(mocks.two).nthCalledWith( + 1, + expect.objectContaining({ + metadata: expect.objectContaining({ + attempt: 1, + }), + }) + ) + + expect(mocks.two).nthCalledWith( + 4, + expect.objectContaining({ + metadata: expect.objectContaining({ + attempt: 4, + }), + }) + ) + }) + it("Should fail a transaction if any step fails after retrying X time to compensate it", async () => { const mocks = { one: jest.fn().mockImplementation((payload) => { diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index 627df65aa3..83aba31727 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -99,6 +99,22 @@ export class SkipExecutionError extends Error { } } +export class SkipStepAlreadyFinishedError extends Error { + static isSkipStepAlreadyFinishedError( + error: Error + ): error is SkipStepAlreadyFinishedError { + return ( + error instanceof SkipStepAlreadyFinishedError || + error?.name === "SkipStepAlreadyFinishedError" + ) + } + + constructor(message?: string) { + super(message) + this.name = "SkipStepAlreadyFinishedError" + } +} + export class SkipCancelledExecutionError extends Error { static isSkipCancelledExecutionError( error: Error diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 1efad116eb..c1c39b6c72 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -33,6 +33,7 @@ import { PermanentStepFailureError, SkipCancelledExecutionError, SkipExecutionError, + SkipStepAlreadyFinishedError, SkipStepResponse, TransactionStepTimeoutError, TransactionTimeoutError, @@ -117,7 +118,8 @@ export class TransactionOrchestrator extends EventEmitter { private static isExpectedError(error: Error): boolean { return ( SkipCancelledExecutionError.isSkipCancelledExecutionError(error) || - SkipExecutionError.isSkipExecutionError(error) + SkipExecutionError.isSkipExecutionError(error) || + SkipStepAlreadyFinishedError.isSkipStepAlreadyFinishedError(error) ) } @@ -415,10 +417,24 @@ export class TransactionOrchestrator extends EventEmitter { stepDef.definition.retryIntervalAwaiting! ) } + } else if (stepDef.retryRescheduledAt) { + // The step is not configured for awaiting retry but is manually force to retry + stepDef.retryRescheduledAt = null + nextSteps.push(stepDef) } continue } else if (curState.status === TransactionStepStatus.TEMPORARY_FAILURE) { + if ( + !stepDef.temporaryFailedAt && + stepDef.definition.autoRetry === false + ) { + stepDef.temporaryFailedAt = Date.now() + continue + } + + stepDef.temporaryFailedAt = null + currentSteps.push(stepDef) if (!stepDef.canRetry()) { @@ -565,6 +581,27 @@ export class TransactionOrchestrator extends EventEmitter { } } + private static async retryStep( + transaction: DistributedTransactionType, + step: TransactionStep + ): Promise { + if (!step.retryRescheduledAt) { + step.hasScheduledRetry = true + step.retryRescheduledAt = Date.now() + } + + transaction.getFlow().hasWaitingSteps = true + + try { + await transaction.saveCheckpoint() + await transaction.scheduleRetry(step, 0) + } catch (error) { + if (!TransactionOrchestrator.isExpectedError(error)) { + throw error + } + } + } + private static async skipStep({ transaction, step, @@ -677,10 +714,13 @@ export class TransactionOrchestrator extends EventEmitter { stopExecution: boolean transactionIsCancelling?: boolean }> { + const result = { + stopExecution: false, + transactionIsCancelling: false, + } + if (SkipExecutionError.isSkipExecutionError(error)) { - return { - stopExecution: false, - } + return result } step.failures++ @@ -773,10 +813,21 @@ export class TransactionOrchestrator extends EventEmitter { if (step.hasTimeout()) { cleaningUp.push(transaction.clearStepTimeout(step)) } + } else { + const isAsync = step.isCompensating() + ? step.definition.compensateAsync + : step.definition.async + + if ( + step.getStates().status === TransactionStepStatus.TEMPORARY_FAILURE && + step.definition.autoRetry === false && + isAsync + ) { + step.temporaryFailedAt = Date.now() + result.stopExecution = true + } } - let transactionIsCancelling = false - let shouldEmit = true try { await transaction.saveCheckpoint() } catch (error) { @@ -784,11 +835,11 @@ export class TransactionOrchestrator extends EventEmitter { throw error } - transactionIsCancelling = + result.transactionIsCancelling = SkipCancelledExecutionError.isSkipCancelledExecutionError(error) if (SkipExecutionError.isSkipExecutionError(error)) { - shouldEmit = false + result.stopExecution = true } } @@ -798,7 +849,7 @@ export class TransactionOrchestrator extends EventEmitter { await promiseAll(cleaningUp) - if (shouldEmit) { + if (!result.stopExecution) { const eventName = step.isCompensating() ? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE : DistributedTransactionEvent.STEP_FAILURE @@ -806,8 +857,8 @@ export class TransactionOrchestrator extends EventEmitter { } return { - stopExecution: !shouldEmit, - transactionIsCancelling, + stopExecution: result.stopExecution, + transactionIsCancelling: result.transactionIsCancelling, } } @@ -1732,6 +1783,50 @@ export class TransactionOrchestrator extends EventEmitter { return curTransaction } + /** + * Manually force a step to retry even if it is still in awaiting status + * @param responseIdempotencyKey - The idempotency key for the step + * @param handler - The handler function to execute the step + * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey + */ + public async retryStep({ + responseIdempotencyKey, + handler, + transaction, + onLoad, + }: { + responseIdempotencyKey: string + handler?: TransactionStepHandler + transaction?: DistributedTransactionType + onLoad?: (transaction: DistributedTransactionType) => Promise | void + }): Promise { + const [curTransaction, step] = + await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( + responseIdempotencyKey, + handler, + transaction + ) + + if (onLoad) { + await onLoad(curTransaction) + } + + if (step.getStates().status === TransactionStepStatus.WAITING) { + this.emit(DistributedTransactionEvent.RESUME, { + transaction: curTransaction, + }) + + await TransactionOrchestrator.retryStep(curTransaction, step) + } else { + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, + `Cannot retry step when status is ${step.getStates().status}` + ) + } + + return curTransaction + } + /** Register a step success for a specific transaction and step * @param responseIdempotencyKey - The idempotency key for the step * @param handler - The handler function to execute the step diff --git a/packages/core/orchestration/src/transaction/transaction-step.ts b/packages/core/orchestration/src/transaction/transaction-step.ts index e1157abf90..0f1984a2f0 100644 --- a/packages/core/orchestration/src/transaction/transaction-step.ts +++ b/packages/core/orchestration/src/transaction/transaction-step.ts @@ -54,6 +54,7 @@ export class TransactionStep { } attempts: number failures: number + temporaryFailedAt: number | null lastAttempt: number | null retryRescheduledAt: number | null hasScheduledRetry: boolean @@ -189,7 +190,9 @@ export class TransactionStep { this.hasAwaitingRetry() && this.lastAttempt && Date.now() - this.lastAttempt > - this.definition.retryIntervalAwaiting! * 1e3 + this.definition.retryIntervalAwaiting! * 1e3 && + (!("maxAwaitingRetries" in this.definition) || + this.attempts < this.definition.maxAwaitingRetries!) ) } @@ -199,7 +202,8 @@ export class TransactionStep { (!this.isCompensating() && state === TransactionStepState.NOT_STARTED && flowState === TransactionState.INVOKING) || - status === TransactionStepStatus.TEMPORARY_FAILURE + (status === TransactionStepStatus.TEMPORARY_FAILURE && + !this.temporaryFailedAt) ) } diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index e50fd3c827..831305c3a3 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -47,6 +47,12 @@ export type TransactionStepsDefinition = { */ maxRetries?: number + /** + * If true, the step will be retried automatically in case of a temporary failure. + * The default is true. + */ + autoRetry?: boolean + /** * The interval (in seconds) between retry attempts after a temporary failure. * The default is to retry immediately. @@ -58,6 +64,11 @@ export type TransactionStepsDefinition = { */ retryIntervalAwaiting?: number + /** + * The maximum number of times to retry a step even if its status is "TransactionStepStatus.WAITING". + */ + maxAwaitingRetries?: number + /** * The maximum amount of time (in seconds) to wait for this step to complete. * This is NOT an execution timeout, the step will always be executed and wait for its response. @@ -279,6 +290,7 @@ export type TransactionFlow = { timedOutAt: number | null startedAt?: number cancelledAt?: number + temporaryFailedAt?: number | null state: TransactionState steps: { [key: string]: TransactionStep diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index cdcb1d7b62..74e2cad8fa 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -435,6 +435,33 @@ export class LocalWorkflow { } } + async retryStep( + idempotencyKey: string, + context?: Context, + subscribe?: DistributedTransactionEvents + ): Promise { + this.medusaContext = context + const { handler, orchestrator } = this.workflow + + const { cleanUpEventListeners } = this.registerEventCallbacks({ + orchestrator, + idempotencyKey, + subscribe, + }) + + const transaction = await orchestrator.retryStep({ + responseIdempotencyKey: idempotencyKey, + handler: handler(this.container_, context), + onLoad: this.onLoad.bind(this), + }) + + try { + return transaction + } finally { + cleanUpEventListeners() + } + } + async registerStepSuccess( idempotencyKey: string, response?: unknown, diff --git a/packages/core/types/src/http/workflow-execution/admin/entities.ts b/packages/core/types/src/http/workflow-execution/admin/entities.ts index df84614402..cddd015240 100644 --- a/packages/core/types/src/http/workflow-execution/admin/entities.ts +++ b/packages/core/types/src/http/workflow-execution/admin/entities.ts @@ -129,6 +129,11 @@ export interface WorkflowExecutionDefinition { * The default is 0 (no retries). */ maxRetries?: number + /** + * If true, the step will be retried automatically in case of a temporary failure. + * The default is true. + */ + autoRetry?: boolean /** * If true, the workflow will not wait for their sibling steps to complete before moving to the next step. */ @@ -142,6 +147,10 @@ export interface WorkflowExecutionDefinition { * The interval (in seconds) to retry a step even if its status is `TransactionStepStatus.WAITING`. */ retryIntervalAwaiting?: number + /** + * The maximum number of times to retry a step even if its status is `TransactionStepStatus.WAITING`. + */ + maxAwaitingRetries?: number /** * If true, the response of this step will be stored. * Default is true. diff --git a/packages/core/types/src/workflows-sdk/service.ts b/packages/core/types/src/workflows-sdk/service.ts index a2a95ab6de..1488525508 100644 --- a/packages/core/types/src/workflows-sdk/service.ts +++ b/packages/core/types/src/workflows-sdk/service.ts @@ -106,6 +106,17 @@ export interface IWorkflowEngineService extends IModuleService { sharedContext?: Context ) + retryStep( + { + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: Record + }, + sharedContext?: Context + ) + subscribe( args: { workflowId: string diff --git a/packages/core/workflows-sdk/src/helper/type.ts b/packages/core/workflows-sdk/src/helper/type.ts index 33c3adcc78..4bf6039c54 100644 --- a/packages/core/workflows-sdk/src/helper/type.ts +++ b/packages/core/workflows-sdk/src/helper/type.ts @@ -25,6 +25,10 @@ export type FlowRegisterStepSuccessOptions = response?: TData } +export type FlowRetryStepOptions = Omit & { + idempotencyKey: string +} + export type FlowRegisterStepFailureOptions = BaseFlowRunOptions & { idempotencyKey: string @@ -93,6 +97,7 @@ export type ExportedWorkflow< TResultOverride extends undefined ? TResult : TResultOverride > > + retryStep: (args?: FlowRetryStepOptions) => Promise cancel: (args?: FlowCancelOptions) => Promise } diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index b5aef4c482..555dad0e66 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -28,6 +28,7 @@ import { FlowCancelOptions, FlowRegisterStepFailureOptions, FlowRegisterStepSuccessOptions, + FlowRetryStepOptions, FlowRunOptions, MainExportedWorkflow, WorkflowResult, @@ -53,7 +54,7 @@ function createContextualWorkflowRunner< container?: LoadedModule[] | MedusaContainer }): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" | "retryStep" > & ExportedWorkflow { const flow = new LocalWorkflow(workflowId, container!) @@ -62,6 +63,7 @@ function createContextualWorkflowRunner< const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) const originalCancel = flow.cancel.bind(flow) + const originalRetryStep = flow.retryStep.bind(flow) const originalExecution = async ( method, @@ -310,6 +312,46 @@ function createContextualWorkflowRunner< } flow.registerStepFailure = newRegisterStepFailure as any + const newRetryStep = async ( + { + idempotencyKey, + context: outerContext, + throwOnError, + logOnError, + events, + container, + }: FlowRetryStepOptions = { + idempotencyKey: "", + } + ) => { + idempotencyKey ??= "" + throwOnError ??= true + logOnError ??= false + + const [, transactionId] = idempotencyKey.split(":") + const context = { + ...outerContext, + transactionId, + __type: MedusaContextType as Context["__type"], + } + + context.eventGroupId ??= ulid() + + return await originalExecution( + originalRetryStep, + { + throwOnError, + container, + logOnError, + }, + idempotencyKey, + undefined, + context, + events + ) + } + flow.retryStep = newRetryStep as any + const newCancel = async ({ transaction, transactionId, @@ -367,7 +409,11 @@ export const exportWorkflow = ( container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel" + | "retryStep" > & ExportedWorkflow { return createContextualWorkflowRunner< @@ -388,11 +434,17 @@ export const exportWorkflow = ( | "run" | "registerStepSuccess" | "registerStepFailure" - | "cancel", + | "cancel" + | "retryStep", TDataOverride, TResultOverride >( - action: "run" | "registerStepSuccess" | "registerStepFailure" | "cancel", + action: + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel" + | "retryStep", container?: LoadedModule[] | MedusaContainer ) => { const contextualRunner = createContextualWorkflowRunner< @@ -495,6 +547,22 @@ export const exportWorkflow = ( )(inputArgs) } + exportedWorkflow.retryStep = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRetryStepOptions + ): Promise => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRetryStepOptions + + return await buildRunnerFn<"retryStep", TDataOverride, TResultOverride>( + "retryStep", + container + )(inputArgs) + } + exportedWorkflow.cancel = async ( args?: FlowCancelOptions ): Promise => { diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index afb0c14b36..24ad3af83b 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -268,7 +268,11 @@ export type ReturnWorkflow = { container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel" + | "retryStep" > & ExportedWorkflow } & { diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts new file mode 100644 index 0000000000..7cb1e43ce9 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts @@ -0,0 +1,56 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts new file mode 100644 index 0000000000..acf2a2ac51 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts @@ -0,0 +1,57 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries_false", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + autoRetry: false, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts new file mode 100644 index 0000000000..7cae96694a --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts @@ -0,0 +1,60 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn(async (input, context) => { + if (context.metadata.attempt === 1) { + await new Promise((resolve) => setTimeout(resolve, 100000)) + return new StepResponse("success after 100 seconds") + } + + return new StepResponse("success") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_manual_retry_step", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index d85bc7f072..32e7d22c76 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -43,18 +43,34 @@ import { workflowEventGroupIdStep1Mock, workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" +import { + step1InvokeMock as step1InvokeMockAutoRetries, + step2InvokeMock as step2InvokeMockAutoRetries, + step1CompensateMock as step1CompensateMockAutoRetries, + step2CompensateMock as step2CompensateMockAutoRetries, +} from "../__fixtures__/workflow_1_auto_retries" +import { + step1InvokeMock as step1InvokeMockAutoRetriesFalse, + step2InvokeMock as step2InvokeMockAutoRetriesFalse, + step1CompensateMock as step1CompensateMockAutoRetriesFalse, + step2CompensateMock as step2CompensateMockAutoRetriesFalse, +} from "../__fixtures__/workflow_1_auto_retries_false" +import { + step1InvokeMock as step1InvokeMockManualRetry, + step2InvokeMock as step2InvokeMockManualRetry, +} from "../__fixtures__/workflow_1_manual_retry_step" import { createScheduled } from "../__fixtures__/workflow_scheduled" jest.setTimeout(60000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } function times(num) { @@ -195,12 +211,13 @@ moduleIntegrationTestRunner({ TransactionState.REVERTED ) done() + clearTimeout(timeout) } }, }) }) - failTrap( + const timeout = failTrap( done, "should cancel an ongoing execution with async unfinished yet step" ) @@ -358,6 +375,137 @@ moduleIntegrationTestRunner({ }) }) + it("should manually retry a step that is taking too long to finish", (done) => { + const transactionId = "transaction-manual-retry" + ulid() + const workflowId = "workflow_1_manual_retry_step" + + void workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .then(() => { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(1) + + void workflowOrcModule.retryStep({ + idempotencyKey: { + workflowId, + transactionId, + stepId: "step_2", + action: "invoke", + }, + }) + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(2) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should manually retry a step that is taking too long to finish" + ) + }) + + it("should retry steps X times automatically when maxRetries is set", (done) => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries" + + void workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetries).toHaveBeenCalledTimes(3) + expect(step1CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should retry steps X times automatically when maxRetries is set" + ) + }) + + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + let lastExepectHaveBeenCalledTimes = 0 + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + lastExepectHaveBeenCalledTimes = 1 + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes( + 1 + ) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes( + 1 + ) + } + }, + }) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeoutPromise(1000) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeoutPromise(1000) + + expect(lastExepectHaveBeenCalledTimes).toEqual(1) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { const transactionId = "transaction_id" + ulid() const workflowId = "workflow_id" + ulid() @@ -535,12 +683,13 @@ moduleIntegrationTestRunner({ { obj: "return from 3" }, ]) done() + clearTimeout(timeout) } }, }) }) - failTrap( + const timeout = failTrap( done, "should subscribe to a async workflow and receive the response when it finishes" ) @@ -729,6 +878,7 @@ moduleIntegrationTestRunner({ const onFinish = jest.fn(() => { done() + clearTimeout(timeout) }) void workflowOrcModule.subscribe({ @@ -750,7 +900,7 @@ moduleIntegrationTestRunner({ }) expect(onFinish).toHaveBeenCalledTimes(0) - failTrap( + const timeout = failTrap( done, "should subscribe to a async workflow and receive the response when it finishes" ) @@ -812,9 +962,10 @@ moduleIntegrationTestRunner({ workflowId: "workflow_conditional_step", subscriber: (event) => { if (event.eventType === "onFinish") { - done() expect(conditionalStep2Invoke).toHaveBeenCalledTimes(2) expect(conditionalStep3Invoke).toHaveBeenCalledTimes(1) + done() + clearTimeout(timeout) } }, }) @@ -826,7 +977,7 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - failTrap( + const timeout = failTrap( done, "should not run conditional steps if condition is false" ) @@ -837,9 +988,10 @@ moduleIntegrationTestRunner({ workflowId: "workflow_conditional_step", subscriber: (event) => { if (event.eventType === "onFinish") { - done() expect(conditionalStep2Invoke).toHaveBeenCalledTimes(1) expect(conditionalStep3Invoke).toHaveBeenCalledTimes(0) + done() + clearTimeout(timeout) } }, }) @@ -851,7 +1003,7 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - failTrap( + const timeout = failTrap( done, "should not run conditional steps if condition is false" ) @@ -987,7 +1139,6 @@ moduleIntegrationTestRunner({ workflowId: "workflow_parallel_async", subscriber: (event) => { if (event.eventType === "onFinish") { - done() expect(event.errors).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -999,11 +1150,13 @@ moduleIntegrationTestRunner({ }), ]) ) + done() + clearTimeout(timeout) } }, }) - failTrap( + const timeout = failTrap( done, "should display error when multple async steps are running in parallel" ) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts index 1dc8308483..5ac2787d54 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts @@ -14,14 +14,14 @@ import "../__fixtures__" jest.setTimeout(300000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } moduleIntegrationTestRunner({ @@ -87,7 +87,8 @@ moduleIntegrationTestRunner({ expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) expect(step2InvokeMock).toHaveBeenCalledTimes(1) expect(transformMock).toHaveBeenCalledTimes(1) - setTimeoutSync(done, 500) + done() + clearTimeout(timeout) } }, }) @@ -99,7 +100,7 @@ moduleIntegrationTestRunner({ }) .catch((e) => e) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow during retryIntervalAwaiting in background execution" ) @@ -179,7 +180,8 @@ moduleIntegrationTestRunner({ expect(step1CompensateMock).toHaveBeenCalledTimes(1) expect(step2InvokeMock).toHaveBeenCalledTimes(0) expect(transformMock).toHaveBeenCalledTimes(0) - setTimeoutSync(done, 500) + done() + clearTimeout(timeout) } }, }) @@ -193,7 +195,7 @@ moduleIntegrationTestRunner({ }) .catch((e) => e) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution" ) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 7b4d71e8bd..ff67c1cb22 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -47,6 +47,11 @@ type RegisterStepFailureOptions = Omit< forcePermanentFailure?: boolean } +type RetryStepOptions = Omit< + WorkflowOrchestratorRunOptions, + "transactionId" | "input" | "resultFrom" +> + type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -379,6 +384,72 @@ export class WorkflowOrchestratorService { return transaction } + async retryStep({ + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: RetryStepOptions + }) { + const { + context, + logOnError, + container, + events: eventHandlers, + } = options ?? {} + + let { throwOnError } = options ?? {} + throwOnError ??= true + + const [idempotencyKey_, { workflowId, transactionId }] = + this.buildIdempotencyKeyAndParts(idempotencyKey) + + const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId) + if (!exportedWorkflow) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const events = this.buildWorkflowEvents({ + customEventHandlers: eventHandlers, + transactionId, + workflowId, + }) + + const ret = await exportedWorkflow.retryStep({ + idempotencyKey: idempotencyKey_, + context, + throwOnError: false, + logOnError, + events, + container: container ?? this.container_, + }) + + if (ret.transaction.hasFinished()) { + const { result, errors } = ret + + this.notify({ + eventType: "onFinish", + workflowId, + transactionId, + state: ret.transaction.getFlow().state as TransactionState, + result, + errors, + }) + + await this.triggerParentStep(ret.transaction, result) + } + + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error + } + + return ret + } + async setStepSuccess({ idempotencyKey, stepResponse, diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 1df1e024fe..930780e2a2 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -3,6 +3,7 @@ import { DAL, FilterableWorkflowExecutionProps, FindConfig, + IdempotencyKeyParts, InferEntityType, InternalModuleDeclaration, ModulesSdkTypes, @@ -206,6 +207,29 @@ export class WorkflowsModuleService< ) } + @InjectSharedContext() + async retryStep( + { + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: Record + }, + @MedusaContext() context: Context = {} + ) { + const options_ = JSON.parse(JSON.stringify(options ?? {})) + + const { manager, transactionManager, ...restContext } = context + + options_.context ??= restContext + + return await this.workflowOrchestratorService_.retryStep({ + idempotencyKey, + options: options_, + }) + } + @InjectSharedContext() async setStepSuccess( { diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 19c8a63767..02db64216a 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -5,6 +5,7 @@ import { SchedulerOptions, SkipCancelledExecutionError, SkipExecutionError, + SkipStepAlreadyFinishedError, TransactionCheckpoint, TransactionContext, TransactionFlow, @@ -392,6 +393,53 @@ export class InMemoryDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + let currentFlowLatestExecutedStep: TransactionStep | undefined + const currentFlowSteps = Object.values(currentFlow.steps || {}) + for (let i = currentFlowSteps.length - 1; i >= 0; i--) { + if (currentFlowSteps[i].lastAttempt) { + currentFlowLatestExecutedStep = currentFlowSteps[i] + break + } + } + + let latestUpdatedFlowLatestExecutedStep: TransactionStep | undefined + const latestUpdatedFlowSteps = Object.values(latestUpdatedFlow.steps || {}) + for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) { + if (latestUpdatedFlowSteps[i].lastAttempt) { + latestUpdatedFlowLatestExecutedStep = latestUpdatedFlowSteps[i] + break + } + } + + /** + * The current flow and the latest updated flow have the same latest executed step. + */ + const isSameLatestExecutedStep = + currentFlowLatestExecutedStep && + latestUpdatedFlowLatestExecutedStep && + currentFlowLatestExecutedStep?.id === + latestUpdatedFlowLatestExecutedStep?.id + + /** + * The current flow's latest executed step has a last attempt ahead of the latest updated + * flow's latest executed step. Therefor it is fine, otherwise another execution has already + * finished the step. + */ + const isCurrentLatestExecutedStepLastAttemptAhead = + currentFlowLatestExecutedStep?.lastAttempt && + latestUpdatedFlowLatestExecutedStep?.lastAttempt && + currentFlowLatestExecutedStep.lastAttempt >= + latestUpdatedFlowLatestExecutedStep.lastAttempt + + if ( + isSameLatestExecutedStep && + !isCurrentLatestExecutedStepLastAttemptAhead + ) { + throw new SkipStepAlreadyFinishedError( + "Step already in execution ahead of the current one" + ) + } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt const currentTransactionCancelledAt = currentFlow.cancelledAt @@ -405,13 +453,6 @@ export class InMemoryDistributedTransactionStorage ) } - const currentFlowSteps = Object.values(currentFlow.steps || {}) - const latestUpdatedFlowSteps = latestUpdatedFlow.steps - ? Object.values( - latestUpdatedFlow.steps as Record - ) - : [] - // Predefined states for quick lookup const invokingStates = [ TransactionStepState.INVOKING, diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts new file mode 100644 index 0000000000..7cb1e43ce9 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts @@ -0,0 +1,56 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts new file mode 100644 index 0000000000..acf2a2ac51 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts @@ -0,0 +1,57 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries_false", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + autoRetry: false, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts new file mode 100644 index 0000000000..7cae96694a --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts @@ -0,0 +1,60 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn(async (input, context) => { + if (context.metadata.attempt === 1) { + await new Promise((resolve) => setTimeout(resolve, 100000)) + return new StepResponse("success after 100 seconds") + } + + return new StepResponse("success") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_manual_retry_step", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 6ef64f77f7..2b17d5fb60 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -38,19 +38,38 @@ import { workflowNotIdempotentWithRetentionStep3Invoke, } from "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { + step1InvokeMock as step1InvokeMockAutoRetries, + step2InvokeMock as step2InvokeMockAutoRetries, + step1CompensateMock as step1CompensateMockAutoRetries, + step2CompensateMock as step2CompensateMockAutoRetries, +} from "../__fixtures__/workflow_1_auto_retries" +import { + step1InvokeMock as step1InvokeMockAutoRetriesFalse, + step2InvokeMock as step2InvokeMockAutoRetriesFalse, + step1CompensateMock as step1CompensateMockAutoRetriesFalse, + step2CompensateMock as step2CompensateMockAutoRetriesFalse, +} from "../__fixtures__/workflow_1_auto_retries_false" + +import { + step1InvokeMock as step1InvokeMockManualRetry, + step2InvokeMock as step2InvokeMockManualRetry, + step1CompensateMock as step1CompensateMockManualRetry, + step2CompensateMock as step2CompensateMockManualRetry, +} from "../__fixtures__/workflow_1_manual_retry_step" import { TestDatabase } from "../utils" import { Redis } from "ioredis" jest.setTimeout(300000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } function times(num) { @@ -221,12 +240,13 @@ moduleIntegrationTestRunner({ TransactionState.REVERTED ) done() + clearTimeout(timeout) } }, }) }) - failTrap( + const timeout = failTrap( done, "should cancel an ongoing execution with async unfinished yet step" ) @@ -393,6 +413,137 @@ moduleIntegrationTestRunner({ }) }) + it("should manually retry a step that is taking too long to finish", (done) => { + const transactionId = "transaction-manual-retry" + ulid() + const workflowId = "workflow_1_manual_retry_step" + + void workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .then(() => { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(1) + + void workflowOrcModule.retryStep({ + idempotencyKey: { + workflowId, + transactionId, + stepId: "step_2", + action: "invoke", + }, + }) + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(2) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should manually retry a step that is taking too long to finish" + ) + }) + + it("should retry steps X times automatically when maxRetries is set", (done) => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries" + + void workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetries).toHaveBeenCalledTimes(3) + expect(step1CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should retry steps X times automatically when maxRetries is set" + ) + }) + + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + let lastExepectHaveBeenCalledTimes = 0 + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + lastExepectHaveBeenCalledTimes = 1 + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) + expect( + step1CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + expect( + step2CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + } + }, + }) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeout(1000) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeout(1000) + + expect(lastExepectHaveBeenCalledTimes).toEqual(1) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { const transactionId = "concurrency_transaction_id" + ulid() const workflowId = "concurrency_workflow_id" + ulid() @@ -813,11 +964,12 @@ moduleIntegrationTestRunner({ subscriber: (event) => { if (event.eventType === "onFinish") { done() + clearTimeout(timeout) } }, }) - failTrap(done, "workflow_async_background") + const timeout = failTrap(done, "workflow_async_background") }) it("should subscribe to a async workflow and receive the response when it finishes", (done) => { @@ -840,13 +992,14 @@ moduleIntegrationTestRunner({ if (event.eventType === "onFinish") { onFinish() done() + clearTimeout(timeout) } }, }) expect(onFinish).toHaveBeenCalledTimes(0) - failTrap(done, "workflow_async_background") + const timeout = failTrap(done, "workflow_async_background") }) it("should not skip step if condition is true", function (done) { @@ -866,11 +1019,12 @@ moduleIntegrationTestRunner({ subscriber: (event) => { if (event.eventType === "onFinish") { done() + clearTimeout(timeout) } }, }) - failTrap(done, "wf-when") + const timeout = failTrap(done, "wf-when") }) it("should cancel an async sub workflow when compensating", (done) => { @@ -904,11 +1058,12 @@ moduleIntegrationTestRunner({ }) done() + clearTimeout(timeout) } }, }) - failTrap(done, "workflow_async_background_fail") + const timeout = failTrap(done, "workflow_async_background_fail") }) it("should cancel and revert a completed workflow", async () => { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index 3e75c500e8..d84158f021 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -15,14 +15,14 @@ import "../__fixtures__" jest.setTimeout(300000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending @@ -107,6 +107,8 @@ moduleIntegrationTestRunner({ done() } catch (e) { return done(e) + } finally { + clearTimeout(timeout) } } }, @@ -118,7 +120,7 @@ moduleIntegrationTestRunner({ expect(result).toBe("result from step 0") }) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow during retryIntervalAwaiting in background execution" ) @@ -206,6 +208,8 @@ moduleIntegrationTestRunner({ done() } catch (e) { return done(e) + } finally { + clearTimeout(timeout) } } }, @@ -217,7 +221,7 @@ moduleIntegrationTestRunner({ expect(result).toBe("result from step 0") }) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution" ) diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 04520fcfe9..d058c432ef 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -55,6 +55,11 @@ type RegisterStepFailureOptions = Omit< forcePermanentFailure?: boolean } +type RetryStepOptions = Omit< + WorkflowOrchestratorRunOptions, + "transactionId" | "input" | "resultFrom" +> + type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -424,6 +429,73 @@ export class WorkflowOrchestratorService { return transaction } + async retryStep({ + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: RetryStepOptions + }) { + const { + context, + logOnError, + container, + events: eventHandlers, + } = options ?? {} + + let { throwOnError } = options ?? {} + throwOnError ??= true + + const [idempotencyKey_, { workflowId, transactionId }] = + this.buildIdempotencyKeyAndParts(idempotencyKey) + + const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId) + if (!exportedWorkflow) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const events = this.buildWorkflowEvents({ + customEventHandlers: eventHandlers, + transactionId, + workflowId, + }) + + const ret = await exportedWorkflow.retryStep({ + idempotencyKey: idempotencyKey_, + context, + throwOnError: false, + logOnError, + events, + container: container ?? this.container_, + }) + + if (ret.transaction.hasFinished()) { + const { result, errors } = ret + + this.notify({ + isFlowAsync: ret.transaction.getFlow().hasAsyncSteps, + eventType: "onFinish", + workflowId, + transactionId, + state: ret.transaction.getFlow().state as TransactionState, + result, + errors, + }) + + await this.triggerParentStep(ret.transaction, result) + } + + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error + } + + return ret + } + async setStepSuccess({ idempotencyKey, stepResponse, diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 27ca9a9ab5..ed085588dd 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -3,6 +3,7 @@ import { DAL, FilterableWorkflowExecutionProps, FindConfig, + IdempotencyKeyParts, InferEntityType, InternalModuleDeclaration, ModulesSdkTypes, @@ -269,6 +270,29 @@ export class WorkflowsModuleService< } as any) } + @InjectSharedContext() + async retryStep( + { + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: Record + }, + @MedusaContext() context: Context = {} + ) { + const options_ = JSON.parse(JSON.stringify(options ?? {})) + + const { manager, transactionManager, ...restContext } = context + + options_.context ??= restContext + + return await this.workflowOrchestratorService_.retryStep({ + idempotencyKey, + options: options_, + }) + } + @InjectSharedContext() async subscribe( args: { diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 8789aa174f..dbe27321eb 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -5,6 +5,7 @@ import { SchedulerOptions, SkipCancelledExecutionError, SkipExecutionError, + SkipStepAlreadyFinishedError, TransactionCheckpoint, TransactionContext, TransactionFlow, @@ -719,6 +720,53 @@ export class RedisDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + let currentFlowLatestExecutedStep: TransactionStep | undefined + const currentFlowSteps = Object.values(currentFlow.steps || {}) + for (let i = currentFlowSteps.length - 1; i >= 0; i--) { + if (currentFlowSteps[i].lastAttempt) { + currentFlowLatestExecutedStep = currentFlowSteps[i] + break + } + } + + let latestUpdatedFlowLatestExecutedStep: TransactionStep | undefined + const latestUpdatedFlowSteps = Object.values(latestUpdatedFlow.steps || {}) + for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) { + if (latestUpdatedFlowSteps[i].lastAttempt) { + latestUpdatedFlowLatestExecutedStep = latestUpdatedFlowSteps[i] + break + } + } + + /** + * The current flow and the latest updated flow have the same latest executed step. + */ + const isSameLatestExecutedStep = + currentFlowLatestExecutedStep && + latestUpdatedFlowLatestExecutedStep && + currentFlowLatestExecutedStep?.id === + latestUpdatedFlowLatestExecutedStep?.id + + /** + * The current flow's latest executed step has a last attempt ahead of the latest updated + * flow's latest executed step. Therefor it is fine, otherwise another execution has already + * finished the step. + */ + const isCurrentLatestExecutedStepLastAttemptAhead = + currentFlowLatestExecutedStep?.lastAttempt && + latestUpdatedFlowLatestExecutedStep?.lastAttempt && + currentFlowLatestExecutedStep.lastAttempt >= + latestUpdatedFlowLatestExecutedStep.lastAttempt + + if ( + isSameLatestExecutedStep && + !isCurrentLatestExecutedStepLastAttemptAhead + ) { + throw new SkipStepAlreadyFinishedError( + "Step already finished by another execution" + ) + } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt const currentTransactionCancelledAt = currentFlow.cancelledAt @@ -732,13 +780,6 @@ export class RedisDistributedTransactionStorage ) } - const currentFlowSteps = Object.values(currentFlow.steps || {}) - const latestUpdatedFlowSteps = latestUpdatedFlow.steps - ? Object.values( - latestUpdatedFlow.steps as Record - ) - : [] - // Predefined states for quick lookup const invokingStates = [ TransactionStepState.INVOKING,