fix(): Identify step that should require a save on checkpoint (#14037)

* fix(): Identify step that force save checkpoint

* Create sour-peas-provide.md
This commit is contained in:
Adrien de Peretti
2025-11-17 12:47:57 +01:00
committed by GitHub
parent 4e1c474dfa
commit 1ea932a56f
12 changed files with 571 additions and 39 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/orchestration": patch
---
fix(): Identify step that force save checkpoint

View File

@@ -1504,16 +1504,13 @@ export class TransactionOrchestrator extends EventEmitter {
const hasTransactionTimeout = !!this.options.timeout
const isIdempotent = !!this.options.idempotent
if (hasAsyncSteps) {
this.options.store = true
}
if (
hasStepTimeouts ||
hasRetriesTimeout ||
hasTransactionTimeout ||
isIdempotent ||
this.options.retentionTime
this.options.retentionTime ||
hasAsyncSteps
) {
this.options.store = true
}
@@ -1628,6 +1625,12 @@ export class TransactionOrchestrator extends EventEmitter {
const definitionCopy = { ...obj } as TransactionStepsDefinition
delete definitionCopy.next
const isAsync = !!definitionCopy.async
const hasRetryInterval = !!(
definitionCopy.retryInterval || definitionCopy.retryIntervalAwaiting
)
const hasTimeout = !!definitionCopy.timeout
if (definitionCopy.async) {
features.hasAsyncSteps = true
}
@@ -1647,6 +1650,20 @@ export class TransactionOrchestrator extends EventEmitter {
features.hasNestedTransactions = true
}
/**
* Force the checkpoint to save even for sync step when they have specific configurations.
*/
definitionCopy.store = !!(
definitionCopy.store ||
isAsync ||
hasRetryInterval ||
hasTimeout
)
if (existingSteps?.[id]) {
existingSteps[id].definition.store = definitionCopy.store
}
states[id] = Object.assign(
new TransactionStep(),
existingSteps?.[id] || {

View File

@@ -115,6 +115,12 @@ export type TransactionStepsDefinition = {
*/
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
/**
* @private
* Whether we need to store checkpoint at this step.
*/
store?: boolean
// TODO: add metadata field for customizations
}

View File

@@ -0,0 +1,74 @@
/**
* Test fixture for workflow with retry interval
* Tests that steps with retry intervals properly retry after failures
*/
import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
// Mock counters to track execution attempts
export const retryIntervalStep1InvokeMock = jest.fn()
export const retryIntervalStep2InvokeMock = jest.fn()
// Step 1: Fails first 2 times, succeeds on 3rd attempt
const step_1_retry_interval = createStep(
{
name: "step_1_retry_interval",
async: true,
retryInterval: 1, // 1 second retry interval
maxRetries: 3,
},
async (input: { attemptToSucceedOn: number }) => {
const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1
retryIntervalStep1InvokeMock(input)
// Fail until we reach the target attempt
if (attemptCount < input.attemptToSucceedOn) {
throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`)
}
return new StepResponse({
success: true,
attempts: attemptCount,
step: "step_1",
})
}
)
// Step 2: Always succeeds (to verify workflow continues after retry)
const step_2_after_retry = createStep(
{
name: "step_2_after_retry",
async: true,
},
async (input: any) => {
retryIntervalStep2InvokeMock(input)
return new StepResponse({
success: true,
step: "step_2",
})
}
)
export const workflowRetryIntervalId = "workflow_retry_interval_test"
createWorkflow(
{
name: workflowRetryIntervalId,
retentionTime: 600, // Keep for 10 minutes for debugging
},
function (input: { attemptToSucceedOn: number }) {
const step1Result = step_1_retry_interval(input)
const step2Result = step_2_after_retry({ step1Result })
return new WorkflowResponse({
step1: step1Result,
step2: step2Result,
})
}
)

View File

@@ -0,0 +1,84 @@
/**
* Test fixture for workflow with retry interval
* Tests that steps with retry intervals properly retry after failures
*/
import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
// Mock counters to track execution attempts
export const retryIntervalStep1InvokeMock = jest.fn()
export const retryIntervalStep2InvokeMock = jest.fn()
export const retryIntervalStep0InvokeMock = jest.fn()
const step_0_retry_interval = createStep(
{
name: "step_0_sync_retry_interval",
},
async (input: any) => {
retryIntervalStep0InvokeMock(input)
return new StepResponse(input)
}
)
// Step 1: Fails first 2 times, succeeds on 3rd attempt
const step_1_retry_interval = createStep(
{
name: "step_1_sync_retry_interval",
retryInterval: 1, // 1 second retry interval
maxRetries: 3,
},
async (input: { attemptToSucceedOn: number }) => {
const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1
retryIntervalStep1InvokeMock(input)
// Fail until we reach the target attempt
if (attemptCount < input.attemptToSucceedOn) {
throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`)
}
return new StepResponse({
success: true,
attempts: attemptCount,
step: "step_1",
})
}
)
// Step 2: Always succeeds (to verify workflow continues after retry)
const step_2_after_retry = createStep(
{
name: "step_2_sync_after_retry",
},
async (input: any) => {
retryIntervalStep2InvokeMock(input)
return new StepResponse({
success: true,
step: "step_2",
})
}
)
export const workflowRetryIntervalId = "workflow_sync_retry_interval_test"
createWorkflow(
{
name: workflowRetryIntervalId,
retentionTime: 600, // Keep for 10 minutes for debugging
},
function (input: { attemptToSucceedOn: number }) {
const step0Result = step_0_retry_interval(input)
const step1Result = step_1_retry_interval(step0Result)
const step2Result = step_2_after_retry({ step1Result })
return new WorkflowResponse({
step1: step1Result,
step2: step2Result,
})
}
)

View File

@@ -0,0 +1,180 @@
/**
* Integration test for workflow step retry intervals
*
* This test verifies the fix for the bug where steps with retry intervals
* would get stuck after the first retry attempt due to retryRescheduledAt
* not being properly cleared.
*/
import { IWorkflowEngineService } from "@medusajs/framework/types"
import { Modules } from "@medusajs/framework/utils"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import {
retryIntervalStep1InvokeMock,
retryIntervalStep2InvokeMock,
workflowRetryIntervalId,
} from "../__fixtures__/workflow_retry_interval"
import {
retryIntervalStep0InvokeMock as retryIntervalStep0InvokeMockSync,
retryIntervalStep1InvokeMock as retryIntervalStep1InvokeMockSync,
retryIntervalStep2InvokeMock as retryIntervalStep2InvokeMockSync,
workflowRetryIntervalId as workflowRetryIntervalIdSync,
} from "../__fixtures__/workflow_sync_retry_interval"
jest.setTimeout(60000) // Increase timeout for async retries
moduleIntegrationTestRunner<IWorkflowEngineService>({
moduleName: Modules.WORKFLOW_ENGINE,
resolve: __dirname + "/../..",
moduleOptions: {
redis: {
url: "localhost:6379",
},
},
testSuite: ({ service: workflowOrcModule }) => {
describe("Workflow Retry Interval", function () {
it("should properly retry sync step with retry interval after failures", async () => {
// Configure step to succeed on 3rd attempt
const attemptToSucceedOn = 3
// Track when each attempt happens
const attemptTimestamps: number[] = []
retryIntervalStep1InvokeMockSync.mockImplementation(() => {
attemptTimestamps.push(Date.now())
})
// Create promise to wait for workflow completion
const workflowCompletion = new Promise<{ result: any; errors: any }>(
(resolve) => {
workflowOrcModule.subscribe({
workflowId: workflowRetryIntervalIdSync,
subscriber: async (data) => {
if (data.eventType === "onFinish") {
resolve({
result: data.result,
errors: data.errors,
})
}
},
})
}
)
// Execute workflow
await workflowOrcModule.run(workflowRetryIntervalIdSync, {
input: { attemptToSucceedOn },
throwOnError: false,
})
// Wait for async workflow to complete
const { result, errors } = await workflowCompletion
// Assertions
// Step 0 should have been called once
expect(retryIntervalStep0InvokeMockSync).toHaveBeenCalledTimes(1)
// Step 1 should have been called 3 times (2 failures + 1 success)
expect(retryIntervalStep1InvokeMockSync).toHaveBeenCalledTimes(3)
expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(1, {
attemptToSucceedOn,
})
expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(2, {
attemptToSucceedOn,
})
expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(3, {
attemptToSucceedOn,
})
// Step 2 should have been called once (after step 1 succeeded)
expect(retryIntervalStep2InvokeMockSync).toHaveBeenCalledTimes(1)
// Workflow should complete successfully
expect(errors === undefined || errors.length === 0).toBe(true)
expect(result).toBeDefined()
expect(result.step1).toBeDefined()
expect(result.step1.success).toBe(true)
expect(result.step1.attempts).toBe(3)
// Verify retry intervals are approximately 1 second (with some tolerance)
if (attemptTimestamps.length >= 2) {
const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0]
expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms
expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s
}
if (attemptTimestamps.length >= 3) {
const secondRetryInterval =
attemptTimestamps[2] - attemptTimestamps[1]
expect(secondRetryInterval).toBeGreaterThan(800)
expect(secondRetryInterval).toBeLessThan(2000)
}
})
it("should properly retry async step with retry interval after failures", async () => {
// Configure step to succeed on 3rd attempt
const attemptToSucceedOn = 3
// Track when each attempt happens
const attemptTimestamps: number[] = []
retryIntervalStep1InvokeMock.mockImplementation(() => {
attemptTimestamps.push(Date.now())
})
// Create promise to wait for workflow completion
const workflowCompletion = new Promise<{ result: any; errors: any }>(
(resolve) => {
workflowOrcModule.subscribe({
workflowId: workflowRetryIntervalId,
subscriber: async (data) => {
if (data.eventType === "onFinish") {
resolve({
result: data.result,
errors: data.errors,
})
}
},
})
}
)
// Execute workflow
await workflowOrcModule.run(workflowRetryIntervalId, {
input: { attemptToSucceedOn },
throwOnError: false,
})
// Wait for async workflow to complete
const { result, errors } = await workflowCompletion
// Assertions
// Step 1 should have been called 3 times (2 failures + 1 success)
expect(retryIntervalStep1InvokeMock).toHaveBeenCalledTimes(3)
// Step 2 should have been called once (after step 1 succeeded)
expect(retryIntervalStep2InvokeMock).toHaveBeenCalledTimes(1)
// Workflow should complete successfully
expect(errors === undefined || errors.length === 0).toBe(true)
expect(result).toBeDefined()
expect(result.step1).toBeDefined()
expect(result.step1.success).toBe(true)
expect(result.step1.attempts).toBe(3)
// Verify retry intervals are approximately 1 second (with some tolerance)
if (attemptTimestamps.length >= 2) {
const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0]
expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms
expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s
}
if (attemptTimestamps.length >= 3) {
const secondRetryInterval =
attemptTimestamps[2] - attemptTimestamps[1]
expect(secondRetryInterval).toBeGreaterThan(800)
expect(secondRetryInterval).toBeLessThan(2000)
}
})
})
},
})

View File

@@ -29,7 +29,7 @@
"resolve:aliases": "yarn run -T tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && yarn run -T tsc-alias -p tsconfig.resolved.json && yarn run -T rimraf tsconfig.resolved.json",
"build": "yarn run -T rimraf dist && yarn run -T tsc --build && npm run resolve:aliases",
"test": "../../../node_modules/.bin/jest --passWithNoTests --bail --forceExit --testPathPattern=src",
"test:integration": "../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/index\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/race\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/subscribe\\.spec\\.ts\"",
"test:integration": "../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/index\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/race\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/subscribe\\.spec\\.ts\" && ../../../node_modules/.bin/jest --passWithNoTests --forceExit --testPathPattern=\"integration-tests/__tests__/retry-interval\\.spec\\.ts\"",
"migration:initial": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create --initial",
"migration:create": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create",
"migration:up": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:up",

View File

@@ -190,7 +190,6 @@ export class InMemoryDistributedTransactionStorage
const stepsArray = Object.values(data.flow.steps) as TransactionStep[]
let currentStep!: TransactionStep
let currentStepsIsAsync = false
const targetStates = isFlowInvoking
? new Set([
@@ -215,6 +214,7 @@ export class InMemoryDistributedTransactionStorage
}
}
let shouldStoreCurrentSteps = false
if (currentStep) {
for (const step of stepsArray) {
if (step.id === "_root") {
@@ -223,9 +223,9 @@ export class InMemoryDistributedTransactionStorage
if (
step.depth === currentStep.depth &&
step?.definition?.async === true
step?.definition?.store === true
) {
currentStepsIsAsync = true
shouldStoreCurrentSteps = true
break
}
}
@@ -233,7 +233,7 @@ export class InMemoryDistributedTransactionStorage
if (
!(isNotStarted || isFinished || isWaitingToCompensate) &&
!currentStepsIsAsync &&
!shouldStoreCurrentSteps &&
!asyncVersion
) {
return

View File

@@ -28,9 +28,7 @@ const step_1_retry_interval = createStep(
// Fail until we reach the target attempt
if (attemptCount < input.attemptToSucceedOn) {
throw new Error(
`Step 1 failed on attempt ${attemptCount}, will retry`
)
throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`)
}
return new StepResponse({
@@ -47,7 +45,7 @@ const step_2_after_retry = createStep(
name: "step_2_after_retry",
async: true,
},
async (input) => {
async (input: any) => {
retryIntervalStep2InvokeMock(input)
return new StepResponse({

View File

@@ -0,0 +1,84 @@
/**
* Test fixture for workflow with retry interval
* Tests that steps with retry intervals properly retry after failures
*/
import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
// Mock counters to track execution attempts
export const retryIntervalStep1InvokeMock = jest.fn()
export const retryIntervalStep2InvokeMock = jest.fn()
export const retryIntervalStep0InvokeMock = jest.fn()
const step_0_retry_interval = createStep(
{
name: "step_0_sync_retry_interval",
},
async (input: any) => {
retryIntervalStep0InvokeMock(input)
return new StepResponse(input)
}
)
// Step 1: Fails first 2 times, succeeds on 3rd attempt
const step_1_retry_interval = createStep(
{
name: "step_1_sync_retry_interval",
retryInterval: 1, // 1 second retry interval
maxRetries: 3,
},
async (input: { attemptToSucceedOn: number }) => {
const attemptCount = retryIntervalStep1InvokeMock.mock.calls.length + 1
retryIntervalStep1InvokeMock(input)
// Fail until we reach the target attempt
if (attemptCount < input.attemptToSucceedOn) {
throw new Error(`Step 1 failed on attempt ${attemptCount}, will retry`)
}
return new StepResponse({
success: true,
attempts: attemptCount,
step: "step_1",
})
}
)
// Step 2: Always succeeds (to verify workflow continues after retry)
const step_2_after_retry = createStep(
{
name: "step_2_sync_after_retry",
},
async (input: any) => {
retryIntervalStep2InvokeMock(input)
return new StepResponse({
success: true,
step: "step_2",
})
}
)
export const workflowRetryIntervalId = "workflow_sync_retry_interval_test"
createWorkflow(
{
name: workflowRetryIntervalId,
retentionTime: 600, // Keep for 10 minutes for debugging
},
function (input: { attemptToSucceedOn: number }) {
const step0Result = step_0_retry_interval(input)
const step1Result = step_1_retry_interval(step0Result)
const step2Result = step_2_after_retry({ step1Result })
return new WorkflowResponse({
step1: step1Result,
step2: step2Result,
})
}
)

View File

@@ -9,12 +9,17 @@
import { IWorkflowEngineService } from "@medusajs/framework/types"
import { Modules } from "@medusajs/framework/utils"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { setTimeout } from "timers/promises"
import {
retryIntervalStep1InvokeMock,
retryIntervalStep2InvokeMock,
workflowRetryIntervalId,
} from "../__fixtures__/workflow_retry_interval"
import {
retryIntervalStep0InvokeMock as retryIntervalStep0InvokeMockSync,
retryIntervalStep1InvokeMock as retryIntervalStep1InvokeMockSync,
retryIntervalStep2InvokeMock as retryIntervalStep2InvokeMockSync,
workflowRetryIntervalId as workflowRetryIntervalIdSync,
} from "../__fixtures__/workflow_sync_retry_interval"
import { TestDatabase } from "../utils"
jest.setTimeout(60000) // Increase timeout for async retries
@@ -38,6 +43,84 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
await TestDatabase.clearTables()
})
it("should properly retry sync step with retry interval after failures", async () => {
// Configure step to succeed on 3rd attempt
const attemptToSucceedOn = 3
// Track when each attempt happens
const attemptTimestamps: number[] = []
retryIntervalStep1InvokeMockSync.mockImplementation(() => {
attemptTimestamps.push(Date.now())
})
// Create promise to wait for workflow completion
const workflowCompletion = new Promise<{ result: any; errors: any }>(
(resolve) => {
workflowOrcModule.subscribe({
workflowId: workflowRetryIntervalIdSync,
subscriber: async (data) => {
if (data.eventType === "onFinish") {
resolve({
result: data.result,
errors: data.errors,
})
}
},
})
}
)
// Execute workflow
await workflowOrcModule.run(workflowRetryIntervalIdSync, {
input: { attemptToSucceedOn },
throwOnError: false,
})
// Wait for async workflow to complete
const { result, errors } = await workflowCompletion
// Assertions
// Step 0 should have been called once
expect(retryIntervalStep0InvokeMockSync).toHaveBeenCalledTimes(1)
// Step 1 should have been called 3 times (2 failures + 1 success)
expect(retryIntervalStep1InvokeMockSync).toHaveBeenCalledTimes(3)
expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(1, {
attemptToSucceedOn,
})
expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(2, {
attemptToSucceedOn,
})
expect(retryIntervalStep1InvokeMockSync).toHaveBeenNthCalledWith(3, {
attemptToSucceedOn,
})
// Step 2 should have been called once (after step 1 succeeded)
expect(retryIntervalStep2InvokeMockSync).toHaveBeenCalledTimes(1)
// Workflow should complete successfully
expect(errors === undefined || errors.length === 0).toBe(true)
expect(result).toBeDefined()
expect(result.step1).toBeDefined()
expect(result.step1.success).toBe(true)
expect(result.step1.attempts).toBe(3)
// Verify retry intervals are approximately 1 second (with some tolerance)
if (attemptTimestamps.length >= 2) {
const firstRetryInterval = attemptTimestamps[1] - attemptTimestamps[0]
expect(firstRetryInterval).toBeGreaterThan(800) // At least 800ms
expect(firstRetryInterval).toBeLessThan(2000) // Less than 2s
}
if (attemptTimestamps.length >= 3) {
const secondRetryInterval =
attemptTimestamps[2] - attemptTimestamps[1]
expect(secondRetryInterval).toBeGreaterThan(800)
expect(secondRetryInterval).toBeLessThan(2000)
}
})
it("should properly retry async step with retry interval after failures", async () => {
// Configure step to succeed on 3rd attempt
const attemptToSucceedOn = 3
@@ -49,29 +132,28 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
// Create promise to wait for workflow completion
const workflowCompletion = new Promise<{ result: any; errors: any }>((resolve) => {
workflowOrcModule.subscribe({
workflowId: workflowRetryIntervalId,
subscriber: async (data) => {
if (data.eventType === "onFinish") {
resolve({
result: data.result,
errors: data.errors,
})
}
},
})
})
// Execute workflow
await workflowOrcModule.run(
workflowRetryIntervalId,
{
input: { attemptToSucceedOn },
throwOnError: false,
const workflowCompletion = new Promise<{ result: any; errors: any }>(
(resolve) => {
workflowOrcModule.subscribe({
workflowId: workflowRetryIntervalId,
subscriber: async (data) => {
if (data.eventType === "onFinish") {
resolve({
result: data.result,
errors: data.errors,
})
}
},
})
}
)
// Execute workflow
await workflowOrcModule.run(workflowRetryIntervalId, {
input: { attemptToSucceedOn },
throwOnError: false,
})
// Wait for async workflow to complete
const { result, errors } = await workflowCompletion

View File

@@ -291,7 +291,6 @@ export class RedisDistributedTransactionStorage
const stepsArray = Object.values(data.flow.steps) as TransactionStep[]
let currentStep!: TransactionStep
let currentStepsIsAsync = false
const targetStates = isFlowInvoking
? new Set([
@@ -316,6 +315,7 @@ export class RedisDistributedTransactionStorage
}
}
let shouldStoreCurrentSteps = false
if (currentStep) {
for (const step of stepsArray) {
if (step.id === "_root") {
@@ -324,9 +324,9 @@ export class RedisDistributedTransactionStorage
if (
step.depth === currentStep.depth &&
step?.definition?.async === true
step?.definition?.store === true
) {
currentStepsIsAsync = true
shouldStoreCurrentSteps = true
break
}
}
@@ -334,7 +334,7 @@ export class RedisDistributedTransactionStorage
if (
!(isNotStarted || isFinished || isWaitingToCompensate) &&
!currentStepsIsAsync &&
!shouldStoreCurrentSteps &&
!asyncVersion
) {
return