diff --git a/.changeset/many-paws-tease.md b/.changeset/many-paws-tease.md new file mode 100644 index 0000000000..eb78a116d8 --- /dev/null +++ b/.changeset/many-paws-tease.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/types": patch +--- + +chore(orchestration): add support for autoRetry step configuration diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 2c1ace528b..547c93ad23 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -648,6 +648,116 @@ describe("Transaction Orchestrator", () => { ) }) + it("Should not retry steps X times automatically when flag 'autoRetry' is set to false and then compensate steps afterward", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return payload + }), + compensateOne: jest.fn().mockImplementation((payload) => { + return payload + }), + two: jest.fn().mockImplementation((payload) => { + throw new Error() + }), + compensateTwo: jest.fn().mockImplementation((payload) => { + return payload + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateOne(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateTwo(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + maxRetries: 3, + autoRetry: false, + next: { + action: "secondMethod", + maxRetries: 3, + autoRetry: false, + }, + }, + } + + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) + + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) + + await strategy.resume(transaction) + + expect(transaction.transactionId).toBe("transaction_id_123") + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(1) + + await strategy.resume(transaction) + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(2) + + await strategy.resume(transaction) + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(3) + + await strategy.resume(transaction) + + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(4) + + expect(transaction.getState()).toBe(TransactionState.REVERTED) + expect(mocks.compensateOne).toHaveBeenCalledTimes(1) + + expect(mocks.two).nthCalledWith( + 1, + expect.objectContaining({ + metadata: expect.objectContaining({ + attempt: 1, + }), + }) + ) + + expect(mocks.two).nthCalledWith( + 4, + expect.objectContaining({ + metadata: expect.objectContaining({ + attempt: 4, + }), + }) + ) + }) + it("Should fail a transaction if any step fails after retrying X time to compensate it", async () => { const mocks = { one: jest.fn().mockImplementation((payload) => { diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index 627df65aa3..83aba31727 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -99,6 +99,22 @@ export class SkipExecutionError extends Error { } } +export class SkipStepAlreadyFinishedError extends Error { + static isSkipStepAlreadyFinishedError( + error: Error + ): error is SkipStepAlreadyFinishedError { + return ( + error instanceof SkipStepAlreadyFinishedError || + error?.name === "SkipStepAlreadyFinishedError" + ) + } + + constructor(message?: string) { + super(message) + this.name = "SkipStepAlreadyFinishedError" + } +} + export class SkipCancelledExecutionError extends Error { static isSkipCancelledExecutionError( error: Error diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 1efad116eb..c1c39b6c72 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -33,6 +33,7 @@ import { PermanentStepFailureError, SkipCancelledExecutionError, SkipExecutionError, + SkipStepAlreadyFinishedError, SkipStepResponse, TransactionStepTimeoutError, TransactionTimeoutError, @@ -117,7 +118,8 @@ export class TransactionOrchestrator extends EventEmitter { private static isExpectedError(error: Error): boolean { return ( SkipCancelledExecutionError.isSkipCancelledExecutionError(error) || - SkipExecutionError.isSkipExecutionError(error) + SkipExecutionError.isSkipExecutionError(error) || + SkipStepAlreadyFinishedError.isSkipStepAlreadyFinishedError(error) ) } @@ -415,10 +417,24 @@ export class TransactionOrchestrator extends EventEmitter { stepDef.definition.retryIntervalAwaiting! ) } + } else if (stepDef.retryRescheduledAt) { + // The step is not configured for awaiting retry but is manually force to retry + stepDef.retryRescheduledAt = null + nextSteps.push(stepDef) } continue } else if (curState.status === TransactionStepStatus.TEMPORARY_FAILURE) { + if ( + !stepDef.temporaryFailedAt && + stepDef.definition.autoRetry === false + ) { + stepDef.temporaryFailedAt = Date.now() + continue + } + + stepDef.temporaryFailedAt = null + currentSteps.push(stepDef) if (!stepDef.canRetry()) { @@ -565,6 +581,27 @@ export class TransactionOrchestrator extends EventEmitter { } } + private static async retryStep( + transaction: DistributedTransactionType, + step: TransactionStep + ): Promise { + if (!step.retryRescheduledAt) { + step.hasScheduledRetry = true + step.retryRescheduledAt = Date.now() + } + + transaction.getFlow().hasWaitingSteps = true + + try { + await transaction.saveCheckpoint() + await transaction.scheduleRetry(step, 0) + } catch (error) { + if (!TransactionOrchestrator.isExpectedError(error)) { + throw error + } + } + } + private static async skipStep({ transaction, step, @@ -677,10 +714,13 @@ export class TransactionOrchestrator extends EventEmitter { stopExecution: boolean transactionIsCancelling?: boolean }> { + const result = { + stopExecution: false, + transactionIsCancelling: false, + } + if (SkipExecutionError.isSkipExecutionError(error)) { - return { - stopExecution: false, - } + return result } step.failures++ @@ -773,10 +813,21 @@ export class TransactionOrchestrator extends EventEmitter { if (step.hasTimeout()) { cleaningUp.push(transaction.clearStepTimeout(step)) } + } else { + const isAsync = step.isCompensating() + ? step.definition.compensateAsync + : step.definition.async + + if ( + step.getStates().status === TransactionStepStatus.TEMPORARY_FAILURE && + step.definition.autoRetry === false && + isAsync + ) { + step.temporaryFailedAt = Date.now() + result.stopExecution = true + } } - let transactionIsCancelling = false - let shouldEmit = true try { await transaction.saveCheckpoint() } catch (error) { @@ -784,11 +835,11 @@ export class TransactionOrchestrator extends EventEmitter { throw error } - transactionIsCancelling = + result.transactionIsCancelling = SkipCancelledExecutionError.isSkipCancelledExecutionError(error) if (SkipExecutionError.isSkipExecutionError(error)) { - shouldEmit = false + result.stopExecution = true } } @@ -798,7 +849,7 @@ export class TransactionOrchestrator extends EventEmitter { await promiseAll(cleaningUp) - if (shouldEmit) { + if (!result.stopExecution) { const eventName = step.isCompensating() ? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE : DistributedTransactionEvent.STEP_FAILURE @@ -806,8 +857,8 @@ export class TransactionOrchestrator extends EventEmitter { } return { - stopExecution: !shouldEmit, - transactionIsCancelling, + stopExecution: result.stopExecution, + transactionIsCancelling: result.transactionIsCancelling, } } @@ -1732,6 +1783,50 @@ export class TransactionOrchestrator extends EventEmitter { return curTransaction } + /** + * Manually force a step to retry even if it is still in awaiting status + * @param responseIdempotencyKey - The idempotency key for the step + * @param handler - The handler function to execute the step + * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey + */ + public async retryStep({ + responseIdempotencyKey, + handler, + transaction, + onLoad, + }: { + responseIdempotencyKey: string + handler?: TransactionStepHandler + transaction?: DistributedTransactionType + onLoad?: (transaction: DistributedTransactionType) => Promise | void + }): Promise { + const [curTransaction, step] = + await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( + responseIdempotencyKey, + handler, + transaction + ) + + if (onLoad) { + await onLoad(curTransaction) + } + + if (step.getStates().status === TransactionStepStatus.WAITING) { + this.emit(DistributedTransactionEvent.RESUME, { + transaction: curTransaction, + }) + + await TransactionOrchestrator.retryStep(curTransaction, step) + } else { + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, + `Cannot retry step when status is ${step.getStates().status}` + ) + } + + return curTransaction + } + /** Register a step success for a specific transaction and step * @param responseIdempotencyKey - The idempotency key for the step * @param handler - The handler function to execute the step diff --git a/packages/core/orchestration/src/transaction/transaction-step.ts b/packages/core/orchestration/src/transaction/transaction-step.ts index e1157abf90..0f1984a2f0 100644 --- a/packages/core/orchestration/src/transaction/transaction-step.ts +++ b/packages/core/orchestration/src/transaction/transaction-step.ts @@ -54,6 +54,7 @@ export class TransactionStep { } attempts: number failures: number + temporaryFailedAt: number | null lastAttempt: number | null retryRescheduledAt: number | null hasScheduledRetry: boolean @@ -189,7 +190,9 @@ export class TransactionStep { this.hasAwaitingRetry() && this.lastAttempt && Date.now() - this.lastAttempt > - this.definition.retryIntervalAwaiting! * 1e3 + this.definition.retryIntervalAwaiting! * 1e3 && + (!("maxAwaitingRetries" in this.definition) || + this.attempts < this.definition.maxAwaitingRetries!) ) } @@ -199,7 +202,8 @@ export class TransactionStep { (!this.isCompensating() && state === TransactionStepState.NOT_STARTED && flowState === TransactionState.INVOKING) || - status === TransactionStepStatus.TEMPORARY_FAILURE + (status === TransactionStepStatus.TEMPORARY_FAILURE && + !this.temporaryFailedAt) ) } diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index e50fd3c827..831305c3a3 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -47,6 +47,12 @@ export type TransactionStepsDefinition = { */ maxRetries?: number + /** + * If true, the step will be retried automatically in case of a temporary failure. + * The default is true. + */ + autoRetry?: boolean + /** * The interval (in seconds) between retry attempts after a temporary failure. * The default is to retry immediately. @@ -58,6 +64,11 @@ export type TransactionStepsDefinition = { */ retryIntervalAwaiting?: number + /** + * The maximum number of times to retry a step even if its status is "TransactionStepStatus.WAITING". + */ + maxAwaitingRetries?: number + /** * The maximum amount of time (in seconds) to wait for this step to complete. * This is NOT an execution timeout, the step will always be executed and wait for its response. @@ -279,6 +290,7 @@ export type TransactionFlow = { timedOutAt: number | null startedAt?: number cancelledAt?: number + temporaryFailedAt?: number | null state: TransactionState steps: { [key: string]: TransactionStep diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index cdcb1d7b62..74e2cad8fa 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -435,6 +435,33 @@ export class LocalWorkflow { } } + async retryStep( + idempotencyKey: string, + context?: Context, + subscribe?: DistributedTransactionEvents + ): Promise { + this.medusaContext = context + const { handler, orchestrator } = this.workflow + + const { cleanUpEventListeners } = this.registerEventCallbacks({ + orchestrator, + idempotencyKey, + subscribe, + }) + + const transaction = await orchestrator.retryStep({ + responseIdempotencyKey: idempotencyKey, + handler: handler(this.container_, context), + onLoad: this.onLoad.bind(this), + }) + + try { + return transaction + } finally { + cleanUpEventListeners() + } + } + async registerStepSuccess( idempotencyKey: string, response?: unknown, diff --git a/packages/core/types/src/http/workflow-execution/admin/entities.ts b/packages/core/types/src/http/workflow-execution/admin/entities.ts index df84614402..cddd015240 100644 --- a/packages/core/types/src/http/workflow-execution/admin/entities.ts +++ b/packages/core/types/src/http/workflow-execution/admin/entities.ts @@ -129,6 +129,11 @@ export interface WorkflowExecutionDefinition { * The default is 0 (no retries). */ maxRetries?: number + /** + * If true, the step will be retried automatically in case of a temporary failure. + * The default is true. + */ + autoRetry?: boolean /** * If true, the workflow will not wait for their sibling steps to complete before moving to the next step. */ @@ -142,6 +147,10 @@ export interface WorkflowExecutionDefinition { * The interval (in seconds) to retry a step even if its status is `TransactionStepStatus.WAITING`. */ retryIntervalAwaiting?: number + /** + * The maximum number of times to retry a step even if its status is `TransactionStepStatus.WAITING`. + */ + maxAwaitingRetries?: number /** * If true, the response of this step will be stored. * Default is true. diff --git a/packages/core/types/src/workflows-sdk/service.ts b/packages/core/types/src/workflows-sdk/service.ts index a2a95ab6de..1488525508 100644 --- a/packages/core/types/src/workflows-sdk/service.ts +++ b/packages/core/types/src/workflows-sdk/service.ts @@ -106,6 +106,17 @@ export interface IWorkflowEngineService extends IModuleService { sharedContext?: Context ) + retryStep( + { + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: Record + }, + sharedContext?: Context + ) + subscribe( args: { workflowId: string diff --git a/packages/core/workflows-sdk/src/helper/type.ts b/packages/core/workflows-sdk/src/helper/type.ts index 33c3adcc78..4bf6039c54 100644 --- a/packages/core/workflows-sdk/src/helper/type.ts +++ b/packages/core/workflows-sdk/src/helper/type.ts @@ -25,6 +25,10 @@ export type FlowRegisterStepSuccessOptions = response?: TData } +export type FlowRetryStepOptions = Omit & { + idempotencyKey: string +} + export type FlowRegisterStepFailureOptions = BaseFlowRunOptions & { idempotencyKey: string @@ -93,6 +97,7 @@ export type ExportedWorkflow< TResultOverride extends undefined ? TResult : TResultOverride > > + retryStep: (args?: FlowRetryStepOptions) => Promise cancel: (args?: FlowCancelOptions) => Promise } diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index b5aef4c482..555dad0e66 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -28,6 +28,7 @@ import { FlowCancelOptions, FlowRegisterStepFailureOptions, FlowRegisterStepSuccessOptions, + FlowRetryStepOptions, FlowRunOptions, MainExportedWorkflow, WorkflowResult, @@ -53,7 +54,7 @@ function createContextualWorkflowRunner< container?: LoadedModule[] | MedusaContainer }): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" | "retryStep" > & ExportedWorkflow { const flow = new LocalWorkflow(workflowId, container!) @@ -62,6 +63,7 @@ function createContextualWorkflowRunner< const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) const originalCancel = flow.cancel.bind(flow) + const originalRetryStep = flow.retryStep.bind(flow) const originalExecution = async ( method, @@ -310,6 +312,46 @@ function createContextualWorkflowRunner< } flow.registerStepFailure = newRegisterStepFailure as any + const newRetryStep = async ( + { + idempotencyKey, + context: outerContext, + throwOnError, + logOnError, + events, + container, + }: FlowRetryStepOptions = { + idempotencyKey: "", + } + ) => { + idempotencyKey ??= "" + throwOnError ??= true + logOnError ??= false + + const [, transactionId] = idempotencyKey.split(":") + const context = { + ...outerContext, + transactionId, + __type: MedusaContextType as Context["__type"], + } + + context.eventGroupId ??= ulid() + + return await originalExecution( + originalRetryStep, + { + throwOnError, + container, + logOnError, + }, + idempotencyKey, + undefined, + context, + events + ) + } + flow.retryStep = newRetryStep as any + const newCancel = async ({ transaction, transactionId, @@ -367,7 +409,11 @@ export const exportWorkflow = ( container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel" + | "retryStep" > & ExportedWorkflow { return createContextualWorkflowRunner< @@ -388,11 +434,17 @@ export const exportWorkflow = ( | "run" | "registerStepSuccess" | "registerStepFailure" - | "cancel", + | "cancel" + | "retryStep", TDataOverride, TResultOverride >( - action: "run" | "registerStepSuccess" | "registerStepFailure" | "cancel", + action: + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel" + | "retryStep", container?: LoadedModule[] | MedusaContainer ) => { const contextualRunner = createContextualWorkflowRunner< @@ -495,6 +547,22 @@ export const exportWorkflow = ( )(inputArgs) } + exportedWorkflow.retryStep = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRetryStepOptions + ): Promise => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRetryStepOptions + + return await buildRunnerFn<"retryStep", TDataOverride, TResultOverride>( + "retryStep", + container + )(inputArgs) + } + exportedWorkflow.cancel = async ( args?: FlowCancelOptions ): Promise => { diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index afb0c14b36..24ad3af83b 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -268,7 +268,11 @@ export type ReturnWorkflow = { container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel" + | "retryStep" > & ExportedWorkflow } & { diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts new file mode 100644 index 0000000000..7cb1e43ce9 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries.ts @@ -0,0 +1,56 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts new file mode 100644 index 0000000000..acf2a2ac51 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts @@ -0,0 +1,57 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries_false", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + autoRetry: false, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts new file mode 100644 index 0000000000..7cae96694a --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts @@ -0,0 +1,60 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn(async (input, context) => { + if (context.metadata.attempt === 1) { + await new Promise((resolve) => setTimeout(resolve, 100000)) + return new StepResponse("success after 100 seconds") + } + + return new StepResponse("success") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_manual_retry_step", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index d85bc7f072..32e7d22c76 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -43,18 +43,34 @@ import { workflowEventGroupIdStep1Mock, workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" +import { + step1InvokeMock as step1InvokeMockAutoRetries, + step2InvokeMock as step2InvokeMockAutoRetries, + step1CompensateMock as step1CompensateMockAutoRetries, + step2CompensateMock as step2CompensateMockAutoRetries, +} from "../__fixtures__/workflow_1_auto_retries" +import { + step1InvokeMock as step1InvokeMockAutoRetriesFalse, + step2InvokeMock as step2InvokeMockAutoRetriesFalse, + step1CompensateMock as step1CompensateMockAutoRetriesFalse, + step2CompensateMock as step2CompensateMockAutoRetriesFalse, +} from "../__fixtures__/workflow_1_auto_retries_false" +import { + step1InvokeMock as step1InvokeMockManualRetry, + step2InvokeMock as step2InvokeMockManualRetry, +} from "../__fixtures__/workflow_1_manual_retry_step" import { createScheduled } from "../__fixtures__/workflow_scheduled" jest.setTimeout(60000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } function times(num) { @@ -195,12 +211,13 @@ moduleIntegrationTestRunner({ TransactionState.REVERTED ) done() + clearTimeout(timeout) } }, }) }) - failTrap( + const timeout = failTrap( done, "should cancel an ongoing execution with async unfinished yet step" ) @@ -358,6 +375,137 @@ moduleIntegrationTestRunner({ }) }) + it("should manually retry a step that is taking too long to finish", (done) => { + const transactionId = "transaction-manual-retry" + ulid() + const workflowId = "workflow_1_manual_retry_step" + + void workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .then(() => { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(1) + + void workflowOrcModule.retryStep({ + idempotencyKey: { + workflowId, + transactionId, + stepId: "step_2", + action: "invoke", + }, + }) + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(2) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should manually retry a step that is taking too long to finish" + ) + }) + + it("should retry steps X times automatically when maxRetries is set", (done) => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries" + + void workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetries).toHaveBeenCalledTimes(3) + expect(step1CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should retry steps X times automatically when maxRetries is set" + ) + }) + + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + let lastExepectHaveBeenCalledTimes = 0 + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + lastExepectHaveBeenCalledTimes = 1 + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes( + 1 + ) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes( + 1 + ) + } + }, + }) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeoutPromise(1000) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeoutPromise(1000) + + expect(lastExepectHaveBeenCalledTimes).toEqual(1) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { const transactionId = "transaction_id" + ulid() const workflowId = "workflow_id" + ulid() @@ -535,12 +683,13 @@ moduleIntegrationTestRunner({ { obj: "return from 3" }, ]) done() + clearTimeout(timeout) } }, }) }) - failTrap( + const timeout = failTrap( done, "should subscribe to a async workflow and receive the response when it finishes" ) @@ -729,6 +878,7 @@ moduleIntegrationTestRunner({ const onFinish = jest.fn(() => { done() + clearTimeout(timeout) }) void workflowOrcModule.subscribe({ @@ -750,7 +900,7 @@ moduleIntegrationTestRunner({ }) expect(onFinish).toHaveBeenCalledTimes(0) - failTrap( + const timeout = failTrap( done, "should subscribe to a async workflow and receive the response when it finishes" ) @@ -812,9 +962,10 @@ moduleIntegrationTestRunner({ workflowId: "workflow_conditional_step", subscriber: (event) => { if (event.eventType === "onFinish") { - done() expect(conditionalStep2Invoke).toHaveBeenCalledTimes(2) expect(conditionalStep3Invoke).toHaveBeenCalledTimes(1) + done() + clearTimeout(timeout) } }, }) @@ -826,7 +977,7 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - failTrap( + const timeout = failTrap( done, "should not run conditional steps if condition is false" ) @@ -837,9 +988,10 @@ moduleIntegrationTestRunner({ workflowId: "workflow_conditional_step", subscriber: (event) => { if (event.eventType === "onFinish") { - done() expect(conditionalStep2Invoke).toHaveBeenCalledTimes(1) expect(conditionalStep3Invoke).toHaveBeenCalledTimes(0) + done() + clearTimeout(timeout) } }, }) @@ -851,7 +1003,7 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - failTrap( + const timeout = failTrap( done, "should not run conditional steps if condition is false" ) @@ -987,7 +1139,6 @@ moduleIntegrationTestRunner({ workflowId: "workflow_parallel_async", subscriber: (event) => { if (event.eventType === "onFinish") { - done() expect(event.errors).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -999,11 +1150,13 @@ moduleIntegrationTestRunner({ }), ]) ) + done() + clearTimeout(timeout) } }, }) - failTrap( + const timeout = failTrap( done, "should display error when multple async steps are running in parallel" ) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts index 1dc8308483..5ac2787d54 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts @@ -14,14 +14,14 @@ import "../__fixtures__" jest.setTimeout(300000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } moduleIntegrationTestRunner({ @@ -87,7 +87,8 @@ moduleIntegrationTestRunner({ expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) expect(step2InvokeMock).toHaveBeenCalledTimes(1) expect(transformMock).toHaveBeenCalledTimes(1) - setTimeoutSync(done, 500) + done() + clearTimeout(timeout) } }, }) @@ -99,7 +100,7 @@ moduleIntegrationTestRunner({ }) .catch((e) => e) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow during retryIntervalAwaiting in background execution" ) @@ -179,7 +180,8 @@ moduleIntegrationTestRunner({ expect(step1CompensateMock).toHaveBeenCalledTimes(1) expect(step2InvokeMock).toHaveBeenCalledTimes(0) expect(transformMock).toHaveBeenCalledTimes(0) - setTimeoutSync(done, 500) + done() + clearTimeout(timeout) } }, }) @@ -193,7 +195,7 @@ moduleIntegrationTestRunner({ }) .catch((e) => e) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution" ) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 7b4d71e8bd..ff67c1cb22 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -47,6 +47,11 @@ type RegisterStepFailureOptions = Omit< forcePermanentFailure?: boolean } +type RetryStepOptions = Omit< + WorkflowOrchestratorRunOptions, + "transactionId" | "input" | "resultFrom" +> + type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -379,6 +384,72 @@ export class WorkflowOrchestratorService { return transaction } + async retryStep({ + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: RetryStepOptions + }) { + const { + context, + logOnError, + container, + events: eventHandlers, + } = options ?? {} + + let { throwOnError } = options ?? {} + throwOnError ??= true + + const [idempotencyKey_, { workflowId, transactionId }] = + this.buildIdempotencyKeyAndParts(idempotencyKey) + + const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId) + if (!exportedWorkflow) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const events = this.buildWorkflowEvents({ + customEventHandlers: eventHandlers, + transactionId, + workflowId, + }) + + const ret = await exportedWorkflow.retryStep({ + idempotencyKey: idempotencyKey_, + context, + throwOnError: false, + logOnError, + events, + container: container ?? this.container_, + }) + + if (ret.transaction.hasFinished()) { + const { result, errors } = ret + + this.notify({ + eventType: "onFinish", + workflowId, + transactionId, + state: ret.transaction.getFlow().state as TransactionState, + result, + errors, + }) + + await this.triggerParentStep(ret.transaction, result) + } + + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error + } + + return ret + } + async setStepSuccess({ idempotencyKey, stepResponse, diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 1df1e024fe..930780e2a2 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -3,6 +3,7 @@ import { DAL, FilterableWorkflowExecutionProps, FindConfig, + IdempotencyKeyParts, InferEntityType, InternalModuleDeclaration, ModulesSdkTypes, @@ -206,6 +207,29 @@ export class WorkflowsModuleService< ) } + @InjectSharedContext() + async retryStep( + { + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: Record + }, + @MedusaContext() context: Context = {} + ) { + const options_ = JSON.parse(JSON.stringify(options ?? {})) + + const { manager, transactionManager, ...restContext } = context + + options_.context ??= restContext + + return await this.workflowOrchestratorService_.retryStep({ + idempotencyKey, + options: options_, + }) + } + @InjectSharedContext() async setStepSuccess( { diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 19c8a63767..02db64216a 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -5,6 +5,7 @@ import { SchedulerOptions, SkipCancelledExecutionError, SkipExecutionError, + SkipStepAlreadyFinishedError, TransactionCheckpoint, TransactionContext, TransactionFlow, @@ -392,6 +393,53 @@ export class InMemoryDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + let currentFlowLatestExecutedStep: TransactionStep | undefined + const currentFlowSteps = Object.values(currentFlow.steps || {}) + for (let i = currentFlowSteps.length - 1; i >= 0; i--) { + if (currentFlowSteps[i].lastAttempt) { + currentFlowLatestExecutedStep = currentFlowSteps[i] + break + } + } + + let latestUpdatedFlowLatestExecutedStep: TransactionStep | undefined + const latestUpdatedFlowSteps = Object.values(latestUpdatedFlow.steps || {}) + for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) { + if (latestUpdatedFlowSteps[i].lastAttempt) { + latestUpdatedFlowLatestExecutedStep = latestUpdatedFlowSteps[i] + break + } + } + + /** + * The current flow and the latest updated flow have the same latest executed step. + */ + const isSameLatestExecutedStep = + currentFlowLatestExecutedStep && + latestUpdatedFlowLatestExecutedStep && + currentFlowLatestExecutedStep?.id === + latestUpdatedFlowLatestExecutedStep?.id + + /** + * The current flow's latest executed step has a last attempt ahead of the latest updated + * flow's latest executed step. Therefor it is fine, otherwise another execution has already + * finished the step. + */ + const isCurrentLatestExecutedStepLastAttemptAhead = + currentFlowLatestExecutedStep?.lastAttempt && + latestUpdatedFlowLatestExecutedStep?.lastAttempt && + currentFlowLatestExecutedStep.lastAttempt >= + latestUpdatedFlowLatestExecutedStep.lastAttempt + + if ( + isSameLatestExecutedStep && + !isCurrentLatestExecutedStepLastAttemptAhead + ) { + throw new SkipStepAlreadyFinishedError( + "Step already in execution ahead of the current one" + ) + } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt const currentTransactionCancelledAt = currentFlow.cancelledAt @@ -405,13 +453,6 @@ export class InMemoryDistributedTransactionStorage ) } - const currentFlowSteps = Object.values(currentFlow.steps || {}) - const latestUpdatedFlowSteps = latestUpdatedFlow.steps - ? Object.values( - latestUpdatedFlow.steps as Record - ) - : [] - // Predefined states for quick lookup const invokingStates = [ TransactionStepState.INVOKING, diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts new file mode 100644 index 0000000000..7cb1e43ce9 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries.ts @@ -0,0 +1,56 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts new file mode 100644 index 0000000000..acf2a2ac51 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_auto_retries_false.ts @@ -0,0 +1,57 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn((input) => { + throw new Error("Temporary failure") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_auto_retries_false", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + maxRetries: 2, + autoRetry: false, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts new file mode 100644 index 0000000000..7cae96694a --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1_manual_retry_step.ts @@ -0,0 +1,60 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step1InvokeMock = jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) +}) + +const step1CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +const step2InvokeMock = jest.fn(async (input, context) => { + if (context.metadata.attempt === 1) { + await new Promise((resolve) => setTimeout(resolve, 100000)) + return new StepResponse("success after 100 seconds") + } + + return new StepResponse("success") +}) + +const step2CompensateMock = jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) +}) + +export { + step1CompensateMock, + step1InvokeMock, + step2CompensateMock, + step2InvokeMock, +} + +const step_1 = createStep("step_1", step1InvokeMock, step1CompensateMock) + +const step_2 = createStep("step_2", step2InvokeMock, step2CompensateMock) + +createWorkflow("workflow_1_manual_retry_step", function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }).config({ + async: true, + }) + + return ret2 +}) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 6ef64f77f7..2b17d5fb60 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -38,19 +38,38 @@ import { workflowNotIdempotentWithRetentionStep3Invoke, } from "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { + step1InvokeMock as step1InvokeMockAutoRetries, + step2InvokeMock as step2InvokeMockAutoRetries, + step1CompensateMock as step1CompensateMockAutoRetries, + step2CompensateMock as step2CompensateMockAutoRetries, +} from "../__fixtures__/workflow_1_auto_retries" +import { + step1InvokeMock as step1InvokeMockAutoRetriesFalse, + step2InvokeMock as step2InvokeMockAutoRetriesFalse, + step1CompensateMock as step1CompensateMockAutoRetriesFalse, + step2CompensateMock as step2CompensateMockAutoRetriesFalse, +} from "../__fixtures__/workflow_1_auto_retries_false" + +import { + step1InvokeMock as step1InvokeMockManualRetry, + step2InvokeMock as step2InvokeMockManualRetry, + step1CompensateMock as step1CompensateMockManualRetry, + step2CompensateMock as step2CompensateMockManualRetry, +} from "../__fixtures__/workflow_1_manual_retry_step" import { TestDatabase } from "../utils" import { Redis } from "ioredis" jest.setTimeout(300000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } function times(num) { @@ -221,12 +240,13 @@ moduleIntegrationTestRunner({ TransactionState.REVERTED ) done() + clearTimeout(timeout) } }, }) }) - failTrap( + const timeout = failTrap( done, "should cancel an ongoing execution with async unfinished yet step" ) @@ -393,6 +413,137 @@ moduleIntegrationTestRunner({ }) }) + it("should manually retry a step that is taking too long to finish", (done) => { + const transactionId = "transaction-manual-retry" + ulid() + const workflowId = "workflow_1_manual_retry_step" + + void workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .then(() => { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(1) + + void workflowOrcModule.retryStep({ + idempotencyKey: { + workflowId, + transactionId, + stepId: "step_2", + action: "invoke", + }, + }) + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockManualRetry).toHaveBeenCalledTimes(1) + expect(step2InvokeMockManualRetry).toHaveBeenCalledTimes(2) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should manually retry a step that is taking too long to finish" + ) + }) + + it("should retry steps X times automatically when maxRetries is set", (done) => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries" + + void workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetries).toHaveBeenCalledTimes(3) + expect(step1CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + expect(step2CompensateMockAutoRetries).toHaveBeenCalledTimes(1) + done() + clearTimeout(timeout) + } + }, + }) + + const timeout = failTrap( + done, + "should retry steps X times automatically when maxRetries is set" + ) + }) + + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + let lastExepectHaveBeenCalledTimes = 0 + + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + lastExepectHaveBeenCalledTimes = 1 + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) + expect( + step1CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + expect( + step2CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + } + }, + }) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeout(1000) + + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + + await setTimeout(1000) + + expect(lastExepectHaveBeenCalledTimes).toEqual(1) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { const transactionId = "concurrency_transaction_id" + ulid() const workflowId = "concurrency_workflow_id" + ulid() @@ -813,11 +964,12 @@ moduleIntegrationTestRunner({ subscriber: (event) => { if (event.eventType === "onFinish") { done() + clearTimeout(timeout) } }, }) - failTrap(done, "workflow_async_background") + const timeout = failTrap(done, "workflow_async_background") }) it("should subscribe to a async workflow and receive the response when it finishes", (done) => { @@ -840,13 +992,14 @@ moduleIntegrationTestRunner({ if (event.eventType === "onFinish") { onFinish() done() + clearTimeout(timeout) } }, }) expect(onFinish).toHaveBeenCalledTimes(0) - failTrap(done, "workflow_async_background") + const timeout = failTrap(done, "workflow_async_background") }) it("should not skip step if condition is true", function (done) { @@ -866,11 +1019,12 @@ moduleIntegrationTestRunner({ subscriber: (event) => { if (event.eventType === "onFinish") { done() + clearTimeout(timeout) } }, }) - failTrap(done, "wf-when") + const timeout = failTrap(done, "wf-when") }) it("should cancel an async sub workflow when compensating", (done) => { @@ -904,11 +1058,12 @@ moduleIntegrationTestRunner({ }) done() + clearTimeout(timeout) } }, }) - failTrap(done, "workflow_async_background_fail") + const timeout = failTrap(done, "workflow_async_background_fail") }) it("should cancel and revert a completed workflow", async () => { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index 3e75c500e8..d84158f021 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -15,14 +15,14 @@ import "../__fixtures__" jest.setTimeout(300000) -const failTrap = (done, name) => { - setTimeoutSync(() => { +const failTrap = (done, name, timeout = 5000) => { + return setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() - }, 5000) + }, timeout) } // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending @@ -107,6 +107,8 @@ moduleIntegrationTestRunner({ done() } catch (e) { return done(e) + } finally { + clearTimeout(timeout) } } }, @@ -118,7 +120,7 @@ moduleIntegrationTestRunner({ expect(result).toBe("result from step 0") }) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow during retryIntervalAwaiting in background execution" ) @@ -206,6 +208,8 @@ moduleIntegrationTestRunner({ done() } catch (e) { return done(e) + } finally { + clearTimeout(timeout) } } }, @@ -217,7 +221,7 @@ moduleIntegrationTestRunner({ expect(result).toBe("result from step 0") }) - failTrap( + const timeout = failTrap( done, "should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution" ) diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 04520fcfe9..d058c432ef 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -55,6 +55,11 @@ type RegisterStepFailureOptions = Omit< forcePermanentFailure?: boolean } +type RetryStepOptions = Omit< + WorkflowOrchestratorRunOptions, + "transactionId" | "input" | "resultFrom" +> + type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -424,6 +429,73 @@ export class WorkflowOrchestratorService { return transaction } + async retryStep({ + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: RetryStepOptions + }) { + const { + context, + logOnError, + container, + events: eventHandlers, + } = options ?? {} + + let { throwOnError } = options ?? {} + throwOnError ??= true + + const [idempotencyKey_, { workflowId, transactionId }] = + this.buildIdempotencyKeyAndParts(idempotencyKey) + + const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId) + if (!exportedWorkflow) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const events = this.buildWorkflowEvents({ + customEventHandlers: eventHandlers, + transactionId, + workflowId, + }) + + const ret = await exportedWorkflow.retryStep({ + idempotencyKey: idempotencyKey_, + context, + throwOnError: false, + logOnError, + events, + container: container ?? this.container_, + }) + + if (ret.transaction.hasFinished()) { + const { result, errors } = ret + + this.notify({ + isFlowAsync: ret.transaction.getFlow().hasAsyncSteps, + eventType: "onFinish", + workflowId, + transactionId, + state: ret.transaction.getFlow().state as TransactionState, + result, + errors, + }) + + await this.triggerParentStep(ret.transaction, result) + } + + if (throwOnError && (ret.thrownError || ret.errors?.length)) { + if (ret.thrownError) { + throw ret.thrownError + } + + throw ret.errors[0].error + } + + return ret + } + async setStepSuccess({ idempotencyKey, stepResponse, diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 27ca9a9ab5..ed085588dd 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -3,6 +3,7 @@ import { DAL, FilterableWorkflowExecutionProps, FindConfig, + IdempotencyKeyParts, InferEntityType, InternalModuleDeclaration, ModulesSdkTypes, @@ -269,6 +270,29 @@ export class WorkflowsModuleService< } as any) } + @InjectSharedContext() + async retryStep( + { + idempotencyKey, + options, + }: { + idempotencyKey: string | IdempotencyKeyParts + options?: Record + }, + @MedusaContext() context: Context = {} + ) { + const options_ = JSON.parse(JSON.stringify(options ?? {})) + + const { manager, transactionManager, ...restContext } = context + + options_.context ??= restContext + + return await this.workflowOrchestratorService_.retryStep({ + idempotencyKey, + options: options_, + }) + } + @InjectSharedContext() async subscribe( args: { diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 8789aa174f..dbe27321eb 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -5,6 +5,7 @@ import { SchedulerOptions, SkipCancelledExecutionError, SkipExecutionError, + SkipStepAlreadyFinishedError, TransactionCheckpoint, TransactionContext, TransactionFlow, @@ -719,6 +720,53 @@ export class RedisDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + let currentFlowLatestExecutedStep: TransactionStep | undefined + const currentFlowSteps = Object.values(currentFlow.steps || {}) + for (let i = currentFlowSteps.length - 1; i >= 0; i--) { + if (currentFlowSteps[i].lastAttempt) { + currentFlowLatestExecutedStep = currentFlowSteps[i] + break + } + } + + let latestUpdatedFlowLatestExecutedStep: TransactionStep | undefined + const latestUpdatedFlowSteps = Object.values(latestUpdatedFlow.steps || {}) + for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) { + if (latestUpdatedFlowSteps[i].lastAttempt) { + latestUpdatedFlowLatestExecutedStep = latestUpdatedFlowSteps[i] + break + } + } + + /** + * The current flow and the latest updated flow have the same latest executed step. + */ + const isSameLatestExecutedStep = + currentFlowLatestExecutedStep && + latestUpdatedFlowLatestExecutedStep && + currentFlowLatestExecutedStep?.id === + latestUpdatedFlowLatestExecutedStep?.id + + /** + * The current flow's latest executed step has a last attempt ahead of the latest updated + * flow's latest executed step. Therefor it is fine, otherwise another execution has already + * finished the step. + */ + const isCurrentLatestExecutedStepLastAttemptAhead = + currentFlowLatestExecutedStep?.lastAttempt && + latestUpdatedFlowLatestExecutedStep?.lastAttempt && + currentFlowLatestExecutedStep.lastAttempt >= + latestUpdatedFlowLatestExecutedStep.lastAttempt + + if ( + isSameLatestExecutedStep && + !isCurrentLatestExecutedStepLastAttemptAhead + ) { + throw new SkipStepAlreadyFinishedError( + "Step already finished by another execution" + ) + } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt const currentTransactionCancelledAt = currentFlow.cancelledAt @@ -732,13 +780,6 @@ export class RedisDistributedTransactionStorage ) } - const currentFlowSteps = Object.values(currentFlow.steps || {}) - const latestUpdatedFlowSteps = latestUpdatedFlow.steps - ? Object.values( - latestUpdatedFlow.steps as Record - ) - : [] - // Predefined states for quick lookup const invokingStates = [ TransactionStepState.INVOKING,