From bad0858348dca100533bb6fcdcdefe90b0668407 Mon Sep 17 00:00:00 2001 From: Sebastian Rindom Date: Tue, 21 Oct 2025 20:27:21 +0200 Subject: [PATCH] fix: prevent jobId collisions on workflow step retries (#13786) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary **What** — What changes are introduced in this PR? This PR fixes a bug where async workflow steps with retry intervals would get stuck after the first retry attempt due to Bull queue jobId collisions preventing retry jobs from executing. **Why** — Why are these changes relevant or necessary? Workflows using async steps with retry configurations (e.g., `retryInterval: 1`, `maxRetries: 5`) would fail once, schedule a retry, but the retry job would never execute, causing workflows to hang indefinitely. **How** — How have these changes been implemented? **Root Cause:** Bull queue was rejecting retry jobs because they had identical jobIds to the async execution jobs that already completed. Both used the format: `retry:workflow:transaction:step_id:attempts`. **Solution:** Modified `getJobId()` in `workflow-orchestrator-storage.ts` to append a `:retry` suffix when `interval > 0`, creating unique jobIds: - Async execution (interval=0): `retry:...:step_id:1` - Retry scheduling (interval>0): `retry:...:step_id:1:retry` Updated methods: `getJobId()`, `scheduleRetry()`, `removeJob()`, and `clearRetry()` to pass and handle the interval parameter. **Testing** — How have these changes been tested, or how can the reviewer test the feature? Added integration test `retry-interval.spec.ts` that verifies: 1. Step with `retryInterval: 1` and `maxRetries: 3` executes 3 times 2. Retry intervals are approximately 1 second between attempts 3. Workflow completes successfully after retries 4. Uses proper async workflow completion pattern with `subscribe()` and `onFinish` event --- ## Examples ```ts // Example workflow step that would previously get stuck export const testRetryStep = createStep( { name: "test-retry-step", async: true, retryInterval: 1, // 1 second retry interval maxRetries: 3, }, async (input: any) => { // Simulate failure on first 2 attempts if (attempts < 3) { throw new Error("Temporary failure - will retry") } return { success: true } } ) // Before fix: Step would fail once, schedule retry, but retry job never fired (jobId collision) // After fix: Step properly retries up to 3 times with 1-second intervals ``` --- ## Checklist Please ensure the following before requesting a review: - [ ] I have added a **changeset** for this PR - Every non-breaking change should be marked as a **patch** - To add a changeset, run `yarn changeset` and follow the prompts - [ ] The changes are covered by relevant **tests** - [ ] I have verified the code works as intended locally - [ ] I have linked the related issue(s) if applicable --- ## Additional Context - Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> --- .changeset/chilled-chicken-bow.md | 6 + .../src/transaction/transaction-step.ts | 2 +- .../integration-tests/__fixtures__/index.ts | 1 + .../__fixtures__/workflow_retry_interval.ts | 76 ++++++++++++ .../__tests__/retry-interval.spec.ts | 108 ++++++++++++++++++ .../workflow-engine-redis/package.json | 2 +- .../utils/workflow-orchestrator-storage.ts | 21 +++- 7 files changed, 209 insertions(+), 7 deletions(-) create mode 100644 .changeset/chilled-chicken-bow.md create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts diff --git a/.changeset/chilled-chicken-bow.md b/.changeset/chilled-chicken-bow.md new file mode 100644 index 0000000000..02a5413f25 --- /dev/null +++ b/.changeset/chilled-chicken-bow.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +--- + +fix: avoid jobId collisions on retry diff --git a/packages/core/orchestration/src/transaction/transaction-step.ts b/packages/core/orchestration/src/transaction/transaction-step.ts index aa9feef2dc..61cda4baa2 100644 --- a/packages/core/orchestration/src/transaction/transaction-step.ts +++ b/packages/core/orchestration/src/transaction/transaction-step.ts @@ -177,7 +177,7 @@ export class TransactionStep { !!( this.lastAttempt && this.definition.retryInterval && - Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3 + Date.now() - this.lastAttempt >= this.definition.retryInterval * 1e3 ) ) } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts index c256744924..3943091cea 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts @@ -7,3 +7,4 @@ export * from "./workflow_sync" export * from "./workflow_transaction_timeout" export * from "./workflow_when" export * from "./workflow_not_idempotent_with_retention" +export * from "./workflow_retry_interval" diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts new file mode 100644 index 0000000000..12e8126224 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_retry_interval.ts @@ -0,0 +1,76 @@ +/** + * Test fixture for workflow with retry interval + * Tests that steps with retry intervals properly retry after failures + */ + +import { + createStep, + createWorkflow, + StepResponse, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" + +// Mock counters to track execution attempts +export const retryIntervalStep1InvokeMock = jest.fn() +export const retryIntervalStep2InvokeMock = jest.fn() + +// Step 1: Fails first 2 times, succeeds on 3rd attempt +const step_1_retry_interval = createStep( + { + name: "step_1_retry_interval", + async: true, + retryInterval: 1, // 1 second retry interval + maxRetries: 3, + }, + async (input: { attemptToSucceedOn: number }) => { + const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1 + retryIntervalStep1InvokeMock(input) + + // Fail until we reach the target attempt + if (attemptCount < input.attemptToSucceedOn) { + throw new Error( + `Step 1 failed on attempt ${attemptCount}, will retry` + ) + } + + return new StepResponse({ + success: true, + attempts: attemptCount, + step: "step_1", + }) + } +) + +// Step 2: Always succeeds (to verify workflow continues after retry) +const step_2_after_retry = createStep( + { + name: "step_2_after_retry", + async: true, + }, + async (input) => { + retryIntervalStep2InvokeMock(input) + + return new StepResponse({ + success: true, + step: "step_2", + }) + } +) + +export const workflowRetryIntervalId = "workflow_retry_interval_test" + +createWorkflow( + { + name: workflowRetryIntervalId, + retentionTime: 600, // Keep for 10 minutes for debugging + }, + function (input: { attemptToSucceedOn: number }) { + const step1Result = step_1_retry_interval(input) + const step2Result = step_2_after_retry({ step1Result }) + + return new WorkflowResponse({ + step1: step1Result, + step2: step2Result, + }) + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts new file mode 100644 index 0000000000..8923de4e69 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/retry-interval.spec.ts @@ -0,0 +1,108 @@ +/** + * Integration test for workflow step retry intervals + * + * This test verifies the fix for the bug where steps with retry intervals + * would get stuck after the first retry attempt due to retryRescheduledAt + * not being properly cleared. + */ + +import { IWorkflowEngineService } from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { setTimeout } from "timers/promises" +import { + retryIntervalStep1InvokeMock, + retryIntervalStep2InvokeMock, + workflowRetryIntervalId, +} from "../__fixtures__/workflow_retry_interval" +import { TestDatabase } from "../utils" + +jest.setTimeout(60000) // Increase timeout for async retries + +moduleIntegrationTestRunner({ + moduleName: Modules.WORKFLOW_ENGINE, + resolve: __dirname + "/../..", + moduleOptions: { + redis: { + url: "localhost:6379", + }, + }, + testSuite: ({ service: workflowOrcModule }) => { + describe("Workflow Retry Interval", function () { + beforeEach(async () => { + await TestDatabase.clearTables() + jest.clearAllMocks() + }) + + afterEach(async () => { + await TestDatabase.clearTables() + }) + + it("should properly retry async step with retry interval after failures", async () => { + // Configure step to succeed on 3rd attempt + const attemptToSucceedOn = 3 + + // Track when each attempt happens + const attemptTimestamps: number[] = [] + retryIntervalStep1InvokeMock.mockImplementation(() => { + attemptTimestamps.push(Date.now()) + }) + + // Create promise to wait for workflow completion + const workflowCompletion = new Promise<{ result: any; errors: any }>((resolve) => { + workflowOrcModule.subscribe({ + workflowId: workflowRetryIntervalId, + subscriber: async (data) => { + if (data.eventType === "onFinish") { + resolve({ + result: data.result, + errors: data.errors, + }) + } + }, + }) + }) + + // Execute workflow + await workflowOrcModule.run( + workflowRetryIntervalId, + { + input: { attemptToSucceedOn }, + throwOnError: false, + } + ) + + // Wait for async workflow to complete + const { result, errors } = await workflowCompletion + + // Assertions + // Step 1 should have been called 3 times (2 failures + 1 success) + expect(retryIntervalStep1InvokeMock).toHaveBeenCalledTimes(3) + + // Step 2 should have been called once (after step 1 succeeded) + expect(retryIntervalStep2InvokeMock).toHaveBeenCalledTimes(1) + + // Workflow should complete successfully + expect(errors === undefined || errors.length === 0).toBe(true) + expect(result).toBeDefined() + expect(result.step1).toBeDefined() + expect(result.step1.success).toBe(true) + expect(result.step1.attempts).toBe(3) + + // Verify retry intervals are approximately 1 second (with some tolerance) + if (attemptTimestamps.length >= 2) { + const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0] + expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms + expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s + } + + if (attemptTimestamps.length >= 3) { + const secondRetryInterval = + attemptTimestamps[2] - attemptTimestamps[1] + expect(secondRetryInterval).toBeGreaterThan(800) + expect(secondRetryInterval).toBeLessThan(2000) + } + }) + }) + }, +}) diff --git a/packages/modules/workflow-engine-redis/package.json b/packages/modules/workflow-engine-redis/package.json index 079614e6a6..1d401480dd 100644 --- a/packages/modules/workflow-engine-redis/package.json +++ b/packages/modules/workflow-engine-redis/package.json @@ -29,7 +29,7 @@ "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json", "build": "rimraf dist && tsc --build && npm run resolve:aliases", "test": "jest --passWithNoTests --bail --forceExit -- src", - "test:integration": "jest --forceExit --runInBand -- integration-tests/**/__tests__/index.spec.ts && jest --forceExit --runInBand -- integration-tests/**/__tests__/race.spec.ts", + "test:integration": "jest --forceExit -- integration-tests/**/__tests__/index.spec.ts && jest --forceExit -- integration-tests/**/__tests__/race.spec.ts && jest --forceExit -- integration-tests/**/__tests__/retry-interval.spec.ts", "migration:initial": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create --initial", "migration:create": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create", "migration:up": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:up", diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index bdb0868f47..6cf1295894 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -16,6 +16,7 @@ import { } from "@medusajs/framework/orchestration" import { Logger, ModulesSdkTypes } from "@medusajs/framework/types" import { + isDefined, isPresent, MedusaError, promiseAll, @@ -594,7 +595,7 @@ export class RedisDistributedTransactionStorage }, { delay: interval > 0 ? interval * 1000 : undefined, - jobId: this.getJobId(JobType.RETRY, transaction, step), + jobId: this.getJobId(JobType.RETRY, transaction, step, interval), removeOnComplete: true, } ) @@ -604,7 +605,9 @@ export class RedisDistributedTransactionStorage transaction: DistributedTransactionType, step: TransactionStep ): Promise { - await this.removeJob(JobType.RETRY, transaction, step) + // Pass retry interval to ensure we remove the correct job (with -retry suffix if interval > 0) + const interval = step.definition.retryInterval || 0 + await this.removeJob(JobType.RETRY, transaction, step, interval) } async scheduleTransactionTimeout( @@ -665,12 +668,19 @@ export class RedisDistributedTransactionStorage private getJobId( type: JobType, transaction: DistributedTransactionType, - step?: TransactionStep + step?: TransactionStep, + interval?: number ) { const key = [type, transaction.modelId, transaction.transactionId] if (step) { key.push(step.id, step.attempts + "") + + // Add suffix for retry scheduling (interval > 0) to avoid collision with async execution (interval = 0) + if (type === JobType.RETRY && isDefined(interval) && interval > 0) { + key.push("retry") + } + if (step.isCompensating()) { key.push("compensate") } @@ -682,9 +692,10 @@ export class RedisDistributedTransactionStorage private async removeJob( type: JobType, transaction: DistributedTransactionType, - step?: TransactionStep + step?: TransactionStep, + interval?: number ) { - const jobId = this.getJobId(type, transaction, step) + const jobId = this.getJobId(type, transaction, step, interval) if (type === JobType.SCHEDULE) { const job = await this.jobQueue?.getJob(jobId)