fix: prevent jobId collisions on workflow step retries (#13786)

## Summary

**What** — What changes are introduced in this PR?

This PR fixes a bug where async workflow steps with retry intervals would get stuck after the first retry attempt: Bull queue jobId collisions prevented the scheduled retry jobs from ever executing.

**Why** — Why are these changes relevant or necessary?  

Workflows using async steps with retry configurations (e.g., `retryInterval: 1`, `maxRetries: 5`) would fail once, schedule a retry, but the retry job would never execute, causing workflows to hang indefinitely.

**How** — How have these changes been implemented?

**Root Cause:** The Bull queue rejected retry jobs because they used the same jobId as the async execution jobs that had already completed. Both used the format `retry:workflow:transaction:step_id:attempts`.
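To illustrate the mechanism, here is a minimal sketch of the collision, assuming BullMQ-style jobId deduplication; the queue name, connection, and payloads are placeholders rather than the module's real code:

```ts
import { Queue } from "bullmq"

async function demonstrateCollision() {
  const queue = new Queue("workflow-engine", {
    connection: { host: "localhost", port: 6379 },
  })

  // jobId format shared by async execution and retry scheduling (before the fix)
  const jobId = "retry:workflow:transaction:step_id:1"

  // 1) The async execution job for attempt 1 is added and eventually completes.
  await queue.add("step-execution", { kind: "execute" }, { jobId })

  // 2) The step fails and a delayed retry is scheduled under the *same* jobId.
  //    The queue deduplicates on jobId, so this add is effectively a no-op and
  //    the retry never fires -- the workflow hangs on attempt 1.
  await queue.add("step-retry", { kind: "retry" }, { jobId, delay: 1000 })

  await queue.close()
}
```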

**Solution:** Modified `getJobId()` in `workflow-orchestrator-storage.ts` to append a `:retry` suffix when `interval > 0`, creating unique jobIds:
- Async execution (interval=0): `retry:...:step_id:1`
- Retry scheduling (interval>0): `retry:...:step_id:1:retry`

Updated `getJobId()`, `scheduleRetry()`, `removeJob()`, and `clearRetry()` to pass and handle the interval parameter.
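As a condensed, illustrative sketch of the suffix logic (the actual method, including the existing compensation suffix, appears in the diff further down):

```ts
// Illustrative only -- simplified types and names, not the module's real signature
function getJobId(
  type: string,
  transaction: { modelId: string; transactionId: string },
  step?: { id: string; attempts: number },
  interval?: number
): string {
  const key = [type, transaction.modelId, transaction.transactionId]
  if (step) {
    key.push(step.id, String(step.attempts))
    // Retry scheduling (interval > 0) gets its own suffix so it cannot
    // collide with the async execution job for the same attempt
    if (type === "retry" && interval !== undefined && interval > 0) {
      key.push("retry")
    }
  }
  return key.join(":")
}
```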

**Testing** — How have these changes been tested, or how can the reviewer test the feature?

Added integration test `retry-interval.spec.ts` that verifies:
1. Step with `retryInterval: 1` and `maxRetries: 3` executes 3 times
2. Retry intervals are approximately 1 second between attempts
3. Workflow completes successfully after retries
4. Workflow completion is awaited via the async pattern: `subscribe()` with the `onFinish` event

---

## Examples

```ts
// Example workflow step that would previously get stuck
import { createStep, StepResponse } from "@medusajs/framework/workflows-sdk"

// Module-level counter used to simulate failures on the first two attempts
let attempts = 0

export const testRetryStep = createStep(
  {
    name: "test-retry-step",
    async: true,
    retryInterval: 1, // 1 second retry interval
    maxRetries: 3,
  },
  async (input: any) => {
    attempts++
    // Simulate failure on the first 2 attempts
    if (attempts < 3) {
      throw new Error("Temporary failure - will retry")
    }
    return new StepResponse({ success: true })
  }
)

// Before fix: the step failed once and scheduled a retry, but the retry job never fired (jobId collision)
// After fix: the step retries up to 3 times with 1-second intervals and the workflow completes
```

---

## Checklist

Please ensure the following before requesting a review:

- [ ] I have added a **changeset** for this PR
    - Every non-breaking change should be marked as a **patch**
    - To add a changeset, run `yarn changeset` and follow the prompts
- [ ] The changes are covered by relevant **tests**
- [ ] I have verified the code works as intended locally
- [ ] I have linked the related issue(s) if applicable

---

## Additional Context
-

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Author: Sebastian Rindom
Committed: 2025-10-21 20:27:21 +02:00 (via GitHub)
Commit: bad0858348 (parent 5df903f5fb)
7 changed files with 209 additions and 7 deletions


@@ -7,3 +7,4 @@ export * from "./workflow_sync"
export * from "./workflow_transaction_timeout"
export * from "./workflow_when"
export * from "./workflow_not_idempotent_with_retention"
export * from "./workflow_retry_interval"


@@ -0,0 +1,76 @@
/**
 * Test fixture for workflow with retry interval
 * Tests that steps with retry intervals properly retry after failures
 */
import {
  createStep,
  createWorkflow,
  StepResponse,
  WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"

// Mock counters to track execution attempts
export const retryIntervalStep1InvokeMock = jest.fn()
export const retryIntervalStep2InvokeMock = jest.fn()

// Step 1: Fails first 2 times, succeeds on 3rd attempt
const step_1_retry_interval = createStep(
  {
    name: "step_1_retry_interval",
    async: true,
    retryInterval: 1, // 1 second retry interval
    maxRetries: 3,
  },
  async (input: { attemptToSucceedOn: number }) => {
    const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1
    retryIntervalStep1InvokeMock(input)

    // Fail until we reach the target attempt
    if (attemptCount < input.attemptToSucceedOn) {
      throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`)
    }

    return new StepResponse({
      success: true,
      attempts: attemptCount,
      step: "step_1",
    })
  }
)

// Step 2: Always succeeds (to verify workflow continues after retry)
const step_2_after_retry = createStep(
  {
    name: "step_2_after_retry",
    async: true,
  },
  async (input) => {
    retryIntervalStep2InvokeMock(input)

    return new StepResponse({
      success: true,
      step: "step_2",
    })
  }
)

export const workflowRetryIntervalId = "workflow_retry_interval_test"

createWorkflow(
  {
    name: workflowRetryIntervalId,
    retentionTime: 600, // Keep for 10 minutes for debugging
  },
  function (input: { attemptToSucceedOn: number }) {
    const step1Result = step_1_retry_interval(input)
    const step2Result = step_2_after_retry({ step1Result })

    return new WorkflowResponse({
      step1: step1Result,
      step2: step2Result,
    })
  }
)


@@ -0,0 +1,108 @@
/**
 * Integration test for workflow step retry intervals
 *
 * This test verifies the fix for the bug where steps with retry intervals
 * would get stuck after the first retry attempt due to retryRescheduledAt
 * not being properly cleared.
 */
import { IWorkflowEngineService } from "@medusajs/framework/types"
import { Modules } from "@medusajs/framework/utils"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { setTimeout } from "timers/promises"
import {
  retryIntervalStep1InvokeMock,
  retryIntervalStep2InvokeMock,
  workflowRetryIntervalId,
} from "../__fixtures__/workflow_retry_interval"
import { TestDatabase } from "../utils"

jest.setTimeout(60000) // Increase timeout for async retries

moduleIntegrationTestRunner<IWorkflowEngineService>({
  moduleName: Modules.WORKFLOW_ENGINE,
  resolve: __dirname + "/../..",
  moduleOptions: {
    redis: {
      url: "localhost:6379",
    },
  },
  testSuite: ({ service: workflowOrcModule }) => {
    describe("Workflow Retry Interval", function () {
      beforeEach(async () => {
        await TestDatabase.clearTables()
        jest.clearAllMocks()
      })

      afterEach(async () => {
        await TestDatabase.clearTables()
      })

      it("should properly retry async step with retry interval after failures", async () => {
        // Configure step to succeed on 3rd attempt
        const attemptToSucceedOn = 3

        // Track when each attempt happens
        const attemptTimestamps: number[] = []
        retryIntervalStep1InvokeMock.mockImplementation(() => {
          attemptTimestamps.push(Date.now())
        })

        // Create promise to wait for workflow completion
        const workflowCompletion = new Promise<{ result: any; errors: any }>((resolve) => {
          workflowOrcModule.subscribe({
            workflowId: workflowRetryIntervalId,
            subscriber: async (data) => {
              if (data.eventType === "onFinish") {
                resolve({
                  result: data.result,
                  errors: data.errors,
                })
              }
            },
          })
        })

        // Execute workflow
        await workflowOrcModule.run(
          workflowRetryIntervalId,
          {
            input: { attemptToSucceedOn },
            throwOnError: false,
          }
        )

        // Wait for async workflow to complete
        const { result, errors } = await workflowCompletion

        // Assertions
        // Step 1 should have been called 3 times (2 failures + 1 success)
        expect(retryIntervalStep1InvokeMock).toHaveBeenCalledTimes(3)

        // Step 2 should have been called once (after step 1 succeeded)
        expect(retryIntervalStep2InvokeMock).toHaveBeenCalledTimes(1)

        // Workflow should complete successfully
        expect(errors === undefined || errors.length === 0).toBe(true)
        expect(result).toBeDefined()
        expect(result.step1).toBeDefined()
        expect(result.step1.success).toBe(true)
        expect(result.step1.attempts).toBe(3)

        // Verify retry intervals are approximately 1 second (with some tolerance)
        if (attemptTimestamps.length >= 2) {
          const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0]
          expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms
          expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s
        }

        if (attemptTimestamps.length >= 3) {
          const secondRetryInterval =
            attemptTimestamps[2] - attemptTimestamps[1]
          expect(secondRetryInterval).toBeGreaterThan(800)
          expect(secondRetryInterval).toBeLessThan(2000)
        }
      })
    })
  },
})


@@ -29,7 +29,7 @@
"resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json",
"build": "rimraf dist && tsc --build && npm run resolve:aliases",
"test": "jest --passWithNoTests --bail --forceExit -- src",
"test:integration": "jest --forceExit --runInBand -- integration-tests/**/__tests__/index.spec.ts && jest --forceExit --runInBand -- integration-tests/**/__tests__/race.spec.ts",
"test:integration": "jest --forceExit -- integration-tests/**/__tests__/index.spec.ts && jest --forceExit -- integration-tests/**/__tests__/race.spec.ts && jest --forceExit -- integration-tests/**/__tests__/retry-interval.spec.ts",
"migration:initial": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create --initial",
"migration:create": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create",
"migration:up": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:up",


@@ -16,6 +16,7 @@ import {
} from "@medusajs/framework/orchestration"
import { Logger, ModulesSdkTypes } from "@medusajs/framework/types"
import {
isDefined,
isPresent,
MedusaError,
promiseAll,
@@ -594,7 +595,7 @@ export class RedisDistributedTransactionStorage
},
{
delay: interval > 0 ? interval * 1000 : undefined,
jobId: this.getJobId(JobType.RETRY, transaction, step),
jobId: this.getJobId(JobType.RETRY, transaction, step, interval),
removeOnComplete: true,
}
)
@@ -604,7 +605,9 @@ export class RedisDistributedTransactionStorage
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
await this.removeJob(JobType.RETRY, transaction, step)
// Pass retry interval to ensure we remove the correct job (with -retry suffix if interval > 0)
const interval = step.definition.retryInterval || 0
await this.removeJob(JobType.RETRY, transaction, step, interval)
}
async scheduleTransactionTimeout(
@@ -665,12 +668,19 @@ export class RedisDistributedTransactionStorage
private getJobId(
type: JobType,
transaction: DistributedTransactionType,
step?: TransactionStep
step?: TransactionStep,
interval?: number
) {
const key = [type, transaction.modelId, transaction.transactionId]
if (step) {
key.push(step.id, step.attempts + "")
// Add suffix for retry scheduling (interval > 0) to avoid collision with async execution (interval = 0)
if (type === JobType.RETRY && isDefined(interval) && interval > 0) {
key.push("retry")
}
if (step.isCompensating()) {
key.push("compensate")
}
@@ -682,9 +692,10 @@ export class RedisDistributedTransactionStorage
private async removeJob(
type: JobType,
transaction: DistributedTransactionType,
step?: TransactionStep
step?: TransactionStep,
interval?: number
) {
const jobId = this.getJobId(type, transaction, step)
const jobId = this.getJobId(type, transaction, step, interval)
if (type === JobType.SCHEDULE) {
const job = await this.jobQueue?.getJob(jobId)