From 1ea932a56f1d85fd1ebbe4a77f3619fe58d2947a Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 17 Nov 2025 12:47:57 +0100 Subject: [PATCH] fix(): Identify step that should require a save on checkpoint (#14037) * fix(): Identify step that force save checkpoint * Create sour-peas-provide.md --- .changeset/sour-peas-provide.md | 7 + .../transaction/transaction-orchestrator.ts | 27 ++- .../orchestration/src/transaction/types.ts | 6 + .../__fixtures__/workflow_retry_interval.ts | 74 +++++++ .../workflow_sync_retry_interval.ts | 84 ++++++++ .../__tests__/retry-interval.spec.ts | 180 ++++++++++++++++++ .../workflow-engine-inmemory/package.json | 2 +- .../utils/workflow-orchestrator-storage.ts | 8 +- .../__fixtures__/workflow_retry_interval.ts | 6 +- .../workflow_sync_retry_interval.ts | 84 ++++++++ .../__tests__/retry-interval.spec.ts | 124 ++++++++++-- .../utils/workflow-orchestrator-storage.ts | 8 +- 12 files changed, 571 insertions(+), 39 deletions(-) create mode 100644 .changeset/sour-peas-provide.md create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_retry_interval.ts create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync_retry_interval.ts create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__tests__/retry-interval.spec.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync_retry_interval.ts diff --git a/.changeset/sour-peas-provide.md b/.changeset/sour-peas-provide.md new file mode 100644 index 0000000000..d3b9d8968e --- /dev/null +++ b/.changeset/sour-peas-provide.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +--- + +fix(): Identify step that force save checkpoint diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 7f032e0744..6d4fc3ab38 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -1504,16 +1504,13 @@ export class TransactionOrchestrator extends EventEmitter { const hasTransactionTimeout = !!this.options.timeout const isIdempotent = !!this.options.idempotent - if (hasAsyncSteps) { - this.options.store = true - } - if ( hasStepTimeouts || hasRetriesTimeout || hasTransactionTimeout || isIdempotent || - this.options.retentionTime + this.options.retentionTime || + hasAsyncSteps ) { this.options.store = true } @@ -1628,6 +1625,12 @@ export class TransactionOrchestrator extends EventEmitter { const definitionCopy = { ...obj } as TransactionStepsDefinition delete definitionCopy.next + const isAsync = !!definitionCopy.async + const hasRetryInterval = !!( + definitionCopy.retryInterval || definitionCopy.retryIntervalAwaiting + ) + const hasTimeout = !!definitionCopy.timeout + if (definitionCopy.async) { features.hasAsyncSteps = true } @@ -1647,6 +1650,20 @@ export class TransactionOrchestrator extends EventEmitter { features.hasNestedTransactions = true } + /** + * Force the checkpoint to save even for sync step when they have specific configurations. + */ + definitionCopy.store = !!( + definitionCopy.store || + isAsync || + hasRetryInterval || + hasTimeout + ) + + if (existingSteps?.[id]) { + existingSteps[id].definition.store = definitionCopy.store + } + states[id] = Object.assign( new TransactionStep(), existingSteps?.[id] || { diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index 1da51fcbdc..94f6e05f9a 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -115,6 +115,12 @@ export type TransactionStepsDefinition = { */ next?: TransactionStepsDefinition | TransactionStepsDefinition[] + /** + * @private + * Whether we need to store checkpoint at this step. + */ + store?: boolean + // TODO: add metadata field for customizations } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_retry_interval.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_retry_interval.ts new file mode 100644 index 0000000000..40a443087d --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_retry_interval.ts @@ -0,0 +1,74 @@ +/** + * Test fixture for workflow with retry interval + * Tests that steps with retry intervals properly retry after failures + */ + +import { + createStep, + createWorkflow, + StepResponse, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" + +// Mock counters to track execution attempts +export const retryIntervalStep1InvokeMock = jest.fn() +export const retryIntervalStep2InvokeMock = jest.fn() + +// Step 1: Fails first 2 times, succeeds on 3rd attempt +const step_1_retry_interval = createStep( + { + name: "step_1_retry_interval", + async: true, + retryInterval: 1, // 1 second retry interval + maxRetries: 3, + }, + async (input: { attemptToSucceedOn: number }) => { + const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1 + retryIntervalStep1InvokeMock(input) + + // Fail until we reach the target attempt + if (attemptCount < input.attemptToSucceedOn) { + throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`) + } + + return new StepResponse({ + success: true, + attempts: attemptCount, + step: "step_1", + }) + } +) + +// Step 2: Always succeeds (to verify workflow continues after retry) +const step_2_after_retry = createStep( + { + name: "step_2_after_retry", + async: true, + }, + async (input: any) => { + retryIntervalStep2InvokeMock(input) + + return new StepResponse({ + success: true, + step: "step_2", + }) + } +) + +export const workflowRetryIntervalId = "workflow_retry_interval_test" + +createWorkflow( + { + name: workflowRetryIntervalId, + retentionTime: 600, // Keep for 10 minutes for debugging + }, + function (input: { attemptToSucceedOn: number }) { + const step1Result = step_1_retry_interval(input) + const step2Result = step_2_after_retry({ step1Result }) + + return new WorkflowResponse({ + step1: step1Result, + step2: step2Result, + }) + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync_retry_interval.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync_retry_interval.ts new file mode 100644 index 0000000000..2498ca5b6a --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync_retry_interval.ts @@ -0,0 +1,84 @@ +/** + * Test fixture for workflow with retry interval + * Tests that steps with retry intervals properly retry after failures + */ + +import { + createStep, + createWorkflow, + StepResponse, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" + +// Mock counters to track execution attempts +export const retryIntervalStep1InvokeMock = jest.fn() +export const retryIntervalStep2InvokeMock = jest.fn() +export const retryIntervalStep0InvokeMock = jest.fn() + +const step_0_retry_interval = createStep( + { + name: "step_0_sync_retry_interval", + }, + async (input: any) => { + retryIntervalStep0InvokeMock(input) + return new StepResponse(input) + } +) + +// Step 1: Fails first 2 times, succeeds on 3rd attempt +const step_1_retry_interval = createStep( + { + name: "step_1_sync_retry_interval", + retryInterval: 1, // 1 second retry interval + maxRetries: 3, + }, + async (input: { attemptToSucceedOn: number }) => { + const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1 + retryIntervalStep1InvokeMock(input) + + // Fail until we reach the target attempt + if (attemptCount < input.attemptToSucceedOn) { + throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`) + } + + return new StepResponse({ + success: true, + attempts: attemptCount, + step: "step_1", + }) + } +) + +// Step 2: Always succeeds (to verify workflow continues after retry) +const step_2_after_retry = createStep( + { + name: "step_2_sync_after_retry", + }, + async (input: any) => { + retryIntervalStep2InvokeMock(input) + + return new StepResponse({ + success: true, + step: "step_2", + }) + } +) + +export const workflowRetryIntervalId = "workflow_sync_retry_interval_test" + +createWorkflow( + { + name: workflowRetryIntervalId, + retentionTime: 600, // Keep for 10 minutes for debugging + }, + function (input: { attemptToSucceedOn: number }) { + const step0Result = step_0_retry_interval(input) + const step1Result = step_1_retry_interval(step0Result) + const step2Result = step_2_after_retry({ step1Result }) + + return new WorkflowResponse({ + step1: step1Result, + step2: step2Result, + }) + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/retry-interval.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/retry-interval.spec.ts new file mode 100644 index 0000000000..e85f57faef --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/retry-interval.spec.ts @@ -0,0 +1,180 @@ +/** + * Integration test for workflow step retry intervals + * + * This test verifies the fix for the bug where steps with retry intervals + * would get stuck after the first retry attempt due to retryRescheduledAt + * not being properly cleared. + */ + +import { IWorkflowEngineService } from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { + retryIntervalStep1InvokeMock, + retryIntervalStep2InvokeMock, + workflowRetryIntervalId, +} from "../__fixtures__/workflow_retry_interval" +import { + retryIntervalStep0InvokeMock as retryIntervalStep0InvokeMockSync, + retryIntervalStep1InvokeMock as retryIntervalStep1InvokeMockSync, + retryIntervalStep2InvokeMock as retryIntervalStep2InvokeMockSync, + workflowRetryIntervalId as workflowRetryIntervalIdSync, +} from "../__fixtures__/workflow_sync_retry_interval" + +jest.setTimeout(60000) // Increase timeout for async retries + +moduleIntegrationTestRunner({ + moduleName: Modules.WORKFLOW_ENGINE, + resolve: __dirname + "/../..", + moduleOptions: { + redis: { + url: "localhost:6379", + }, + }, + testSuite: ({ service: workflowOrcModule }) => { + describe("Workflow Retry Interval", function () { + it("should properly retry sync step with retry interval after failures", async () => { + // Configure step to succeed on 3rd attempt + const attemptToSucceedOn = 3 + + // Track when each attempt happens + const attemptTimestamps: number[] = [] + retryIntervalStep1InvokeMockSync.mockImplementation(() => { + attemptTimestamps.push(Date.now()) + }) + + // Create promise to wait for workflow completion + const workflowCompletion = new Promise<{ result: any; errors: any }>( + (resolve) => { + workflowOrcModule.subscribe({ + workflowId: workflowRetryIntervalIdSync, + subscriber: async (data) => { + if (data.eventType === "onFinish") { + resolve({ + result: data.result, + errors: data.errors, + }) + } + }, + }) + } + ) + + // Execute workflow + await workflowOrcModule.run(workflowRetryIntervalIdSync, { + input: { attemptToSucceedOn }, + throwOnError: false, + }) + + // Wait for async workflow to complete + const { result, errors } = await workflowCompletion + + // Assertions + + // Step 0 should have been called once + expect(retryIntervalStep0InvokeMockSync).toHaveBeenCalledTimes(1) + + // Step 1 should have been called 3 times (2 failures + 1 success) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenCalledTimes(3) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(1, { + attemptToSucceedOn, + }) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(2, { + attemptToSucceedOn, + }) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(3, { + attemptToSucceedOn, + }) + + // Step 2 should have been called once (after step 1 succeeded) + expect(retryIntervalStep2InvokeMockSync).toHaveBeenCalledTimes(1) + + // Workflow should complete successfully + expect(errors === undefined || errors.length === 0).toBe(true) + expect(result).toBeDefined() + expect(result.step1).toBeDefined() + expect(result.step1.success).toBe(true) + expect(result.step1.attempts).toBe(3) + + // Verify retry intervals are approximately 1 second (with some tolerance) + if (attemptTimestamps.length >= 2) { + const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0] + expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms + expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s + } + + if (attemptTimestamps.length >= 3) { + const secondRetryInterval = + attemptTimestamps[2] - attemptTimestamps[1] + expect(secondRetryInterval).toBeGreaterThan(800) + expect(secondRetryInterval).toBeLessThan(2000) + } + }) + + it("should properly retry async step with retry interval after failures", async () => { + // Configure step to succeed on 3rd attempt + const attemptToSucceedOn = 3 + + // Track when each attempt happens + const attemptTimestamps: number[] = [] + retryIntervalStep1InvokeMock.mockImplementation(() => { + attemptTimestamps.push(Date.now()) + }) + + // Create promise to wait for workflow completion + const workflowCompletion = new Promise<{ result: any; errors: any }>( + (resolve) => { + workflowOrcModule.subscribe({ + workflowId: workflowRetryIntervalId, + subscriber: async (data) => { + if (data.eventType === "onFinish") { + resolve({ + result: data.result, + errors: data.errors, + }) + } + }, + }) + } + ) + + // Execute workflow + await workflowOrcModule.run(workflowRetryIntervalId, { + input: { attemptToSucceedOn }, + throwOnError: false, + }) + + // Wait for async workflow to complete + const { result, errors } = await workflowCompletion + + // Assertions + // Step 1 should have been called 3 times (2 failures + 1 success) + expect(retryIntervalStep1InvokeMock).toHaveBeenCalledTimes(3) + + // Step 2 should have been called once (after step 1 succeeded) + expect(retryIntervalStep2InvokeMock).toHaveBeenCalledTimes(1) + + // Workflow should complete successfully + expect(errors === undefined || errors.length === 0).toBe(true) + expect(result).toBeDefined() + expect(result.step1).toBeDefined() + expect(result.step1.success).toBe(true) + expect(result.step1.attempts).toBe(3) + + // Verify retry intervals are approximately 1 second (with some tolerance) + if (attemptTimestamps.length >= 2) { + const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0] + expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms + expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s + } + + if (attemptTimestamps.length >= 3) { + const secondRetryInterval = + attemptTimestamps[2] - attemptTimestamps[1] + expect(secondRetryInterval).toBeGreaterThan(800) + expect(secondRetryInterval).toBeLessThan(2000) + } + }) + }) + }, +}) diff --git a/packages/modules/workflow-engine-inmemory/package.json b/packages/modules/workflow-engine-inmemory/package.json index 461fe8ac08..922372922f 100644 --- a/packages/modules/workflow-engine-inmemory/package.json +++ b/packages/modules/workflow-engine-inmemory/package.json @@ -29,7 +29,7 @@ "resolve:aliases": "yarn run -T tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && yarn run -T tsc-alias -p tsconfig.resolved.json && yarn run -T rimraf tsconfig.resolved.json", "build": "yarn run -T rimraf dist && yarn run -T tsc --build && npm run resolve:aliases", "test": "../../../node_modules/.bin/jest --passWithNoTests --bail --forceExit --testPathPattern=src", - "test:integration": "../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/index\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/race\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/subscribe\\.spec\\.ts\"", + "test:integration": "../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/index\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/race\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/subscribe\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/retry-interval\\.spec\\.ts\"", "migration:initial": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create --initial", "migration:create": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create", "migration:up": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:up", diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index f9db3bd756..0f110043de 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -190,7 +190,6 @@ export class InMemoryDistributedTransactionStorage const stepsArray = Object.values(data.flow.steps) as TransactionStep[] let currentStep!: TransactionStep - let currentStepsIsAsync = false const targetStates = isFlowInvoking ? new Set([ @@ -215,6 +214,7 @@ export class InMemoryDistributedTransactionStorage } } + let shouldStoreCurrentSteps = false if (currentStep) { for (const step of stepsArray) { if (step.id === "_root") { @@ -223,9 +223,9 @@ export class InMemoryDistributedTransactionStorage if ( step.depth === currentStep.depth && - step?.definition?.async === true + step?.definition?.store === true ) { - currentStepsIsAsync = true + shouldStoreCurrentSteps = true break } } @@ -233,7 +233,7 @@ export class InMemoryDistributedTransactionStorage if ( !(isNotStarted || isFinished || isWaitingToCompensate) && - !currentStepsIsAsync && + !shouldStoreCurrentSteps && !asyncVersion ) { return diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts index 12e8126224..40a443087d 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts @@ -28,9 +28,7 @@ const step_1_retry_interval = createStep( // Fail until we reach the target attempt if (attemptCount < input.attemptToSucceedOn) { - throw new Error( - `Step 1 failed on attempt ${attemptCount}, will retry` - ) + throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`) } return new StepResponse({ @@ -47,7 +45,7 @@ const step_2_after_retry = createStep( name: "step_2_after_retry", async: true, }, - async (input) => { + async (input: any) => { retryIntervalStep2InvokeMock(input) return new StepResponse({ diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync_retry_interval.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync_retry_interval.ts new file mode 100644 index 0000000000..2498ca5b6a --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync_retry_interval.ts @@ -0,0 +1,84 @@ +/** + * Test fixture for workflow with retry interval + * Tests that steps with retry intervals properly retry after failures + */ + +import { + createStep, + createWorkflow, + StepResponse, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" + +// Mock counters to track execution attempts +export const retryIntervalStep1InvokeMock = jest.fn() +export const retryIntervalStep2InvokeMock = jest.fn() +export const retryIntervalStep0InvokeMock = jest.fn() + +const step_0_retry_interval = createStep( + { + name: "step_0_sync_retry_interval", + }, + async (input: any) => { + retryIntervalStep0InvokeMock(input) + return new StepResponse(input) + } +) + +// Step 1: Fails first 2 times, succeeds on 3rd attempt +const step_1_retry_interval = createStep( + { + name: "step_1_sync_retry_interval", + retryInterval: 1, // 1 second retry interval + maxRetries: 3, + }, + async (input: { attemptToSucceedOn: number }) => { + const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1 + retryIntervalStep1InvokeMock(input) + + // Fail until we reach the target attempt + if (attemptCount < input.attemptToSucceedOn) { + throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`) + } + + return new StepResponse({ + success: true, + attempts: attemptCount, + step: "step_1", + }) + } +) + +// Step 2: Always succeeds (to verify workflow continues after retry) +const step_2_after_retry = createStep( + { + name: "step_2_sync_after_retry", + }, + async (input: any) => { + retryIntervalStep2InvokeMock(input) + + return new StepResponse({ + success: true, + step: "step_2", + }) + } +) + +export const workflowRetryIntervalId = "workflow_sync_retry_interval_test" + +createWorkflow( + { + name: workflowRetryIntervalId, + retentionTime: 600, // Keep for 10 minutes for debugging + }, + function (input: { attemptToSucceedOn: number }) { + const step0Result = step_0_retry_interval(input) + const step1Result = step_1_retry_interval(step0Result) + const step2Result = step_2_after_retry({ step1Result }) + + return new WorkflowResponse({ + step1: step1Result, + step2: step2Result, + }) + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts index 8923de4e69..7e93d63313 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts @@ -9,12 +9,17 @@ import { IWorkflowEngineService } from "@medusajs/framework/types" import { Modules } from "@medusajs/framework/utils" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" -import { setTimeout } from "timers/promises" import { retryIntervalStep1InvokeMock, retryIntervalStep2InvokeMock, workflowRetryIntervalId, } from "../__fixtures__/workflow_retry_interval" +import { + retryIntervalStep0InvokeMock as retryIntervalStep0InvokeMockSync, + retryIntervalStep1InvokeMock as retryIntervalStep1InvokeMockSync, + retryIntervalStep2InvokeMock as retryIntervalStep2InvokeMockSync, + workflowRetryIntervalId as workflowRetryIntervalIdSync, +} from "../__fixtures__/workflow_sync_retry_interval" import { TestDatabase } from "../utils" jest.setTimeout(60000) // Increase timeout for async retries @@ -38,6 +43,84 @@ moduleIntegrationTestRunner({ await TestDatabase.clearTables() }) + it("should properly retry sync step with retry interval after failures", async () => { + // Configure step to succeed on 3rd attempt + const attemptToSucceedOn = 3 + + // Track when each attempt happens + const attemptTimestamps: number[] = [] + retryIntervalStep1InvokeMockSync.mockImplementation(() => { + attemptTimestamps.push(Date.now()) + }) + + // Create promise to wait for workflow completion + const workflowCompletion = new Promise<{ result: any; errors: any }>( + (resolve) => { + workflowOrcModule.subscribe({ + workflowId: workflowRetryIntervalIdSync, + subscriber: async (data) => { + if (data.eventType === "onFinish") { + resolve({ + result: data.result, + errors: data.errors, + }) + } + }, + }) + } + ) + + // Execute workflow + await workflowOrcModule.run(workflowRetryIntervalIdSync, { + input: { attemptToSucceedOn }, + throwOnError: false, + }) + + // Wait for async workflow to complete + const { result, errors } = await workflowCompletion + + // Assertions + + // Step 0 should have been called once + expect(retryIntervalStep0InvokeMockSync).toHaveBeenCalledTimes(1) + + // Step 1 should have been called 3 times (2 failures + 1 success) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenCalledTimes(3) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(1, { + attemptToSucceedOn, + }) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(2, { + attemptToSucceedOn, + }) + expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(3, { + attemptToSucceedOn, + }) + + // Step 2 should have been called once (after step 1 succeeded) + expect(retryIntervalStep2InvokeMockSync).toHaveBeenCalledTimes(1) + + // Workflow should complete successfully + expect(errors === undefined || errors.length === 0).toBe(true) + expect(result).toBeDefined() + expect(result.step1).toBeDefined() + expect(result.step1.success).toBe(true) + expect(result.step1.attempts).toBe(3) + + // Verify retry intervals are approximately 1 second (with some tolerance) + if (attemptTimestamps.length >= 2) { + const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0] + expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms + expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s + } + + if (attemptTimestamps.length >= 3) { + const secondRetryInterval = + attemptTimestamps[2] - attemptTimestamps[1] + expect(secondRetryInterval).toBeGreaterThan(800) + expect(secondRetryInterval).toBeLessThan(2000) + } + }) + it("should properly retry async step with retry interval after failures", async () => { // Configure step to succeed on 3rd attempt const attemptToSucceedOn = 3 @@ -49,29 +132,28 @@ moduleIntegrationTestRunner({ }) // Create promise to wait for workflow completion - const workflowCompletion = new Promise<{ result: any; errors: any }>((resolve) => { - workflowOrcModule.subscribe({ - workflowId: workflowRetryIntervalId, - subscriber: async (data) => { - if (data.eventType === "onFinish") { - resolve({ - result: data.result, - errors: data.errors, - }) - } - }, - }) - }) - - // Execute workflow - await workflowOrcModule.run( - workflowRetryIntervalId, - { - input: { attemptToSucceedOn }, - throwOnError: false, + const workflowCompletion = new Promise<{ result: any; errors: any }>( + (resolve) => { + workflowOrcModule.subscribe({ + workflowId: workflowRetryIntervalId, + subscriber: async (data) => { + if (data.eventType === "onFinish") { + resolve({ + result: data.result, + errors: data.errors, + }) + } + }, + }) } ) + // Execute workflow + await workflowOrcModule.run(workflowRetryIntervalId, { + input: { attemptToSucceedOn }, + throwOnError: false, + }) + // Wait for async workflow to complete const { result, errors } = await workflowCompletion diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 3e4584571c..cb4a3c6f65 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -291,7 +291,6 @@ export class RedisDistributedTransactionStorage const stepsArray = Object.values(data.flow.steps) as TransactionStep[] let currentStep!: TransactionStep - let currentStepsIsAsync = false const targetStates = isFlowInvoking ? new Set([ @@ -316,6 +315,7 @@ export class RedisDistributedTransactionStorage } } + let shouldStoreCurrentSteps = false if (currentStep) { for (const step of stepsArray) { if (step.id === "_root") { @@ -324,9 +324,9 @@ export class RedisDistributedTransactionStorage if ( step.depth === currentStep.depth && - step?.definition?.async === true + step?.definition?.store === true ) { - currentStepsIsAsync = true + shouldStoreCurrentSteps = true break } } @@ -334,7 +334,7 @@ export class RedisDistributedTransactionStorage if ( !(isNotStarted || isFinished || isWaitingToCompensate) && - !currentStepsIsAsync && + !shouldStoreCurrentSteps && !asyncVersion ) { return