fix(): Workflow save to db + index integration instability (#13707)

**What**
- Fix index integration tests instability
- Fix workflow engine storage save to db
This commit is contained in:
Adrien de Peretti
2025-10-08 10:45:15 +02:00
committed by GitHub
parent c75a76636a
commit b43b285125
7 changed files with 234 additions and 72 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
---
fix(): Workflow save to db + index integration instability

View File

@@ -0,0 +1,54 @@
/**
* Helper functions to wait for index synchronization in a deterministic way
* by directly checking the index_data table instead of using arbitrary timeouts.
*/
export interface WaitForIndexOptions {
timeout?: number
pollInterval?: number
}
/**
* Wait for specific entities to be indexed by checking the index_data table directly.
* This is more reliable than using arbitrary timeouts.
*/
export async function waitForIndexedEntities(
dbConnection: any,
entityName: string,
entityIds: string[],
options: WaitForIndexOptions = {}
): Promise<void> {
const { timeout = 120000, pollInterval = 100 } = options
const startTime = Date.now()
while (Date.now() - startTime < timeout) {
try {
// Query the index_data table to check if all entities are indexed
const result = await dbConnection.raw(
`SELECT id FROM index_data WHERE name = ? AND id = ANY(?) AND staled_at IS NULL`,
[entityName, entityIds]
)
const indexedIds = result.rows
? result.rows.map((row: any) => row.id)
: result.map((row: any) => row.id)
// Check if all expected entities are indexed
const allIndexed = entityIds.every((id) => indexedIds.includes(id))
if (allIndexed) {
return
}
} catch (error) {
// Continue polling on database errors
}
await new Promise((resolve) => setTimeout(resolve, pollInterval))
}
throw new Error(
`Entities [${entityIds.join(
", "
)}] of type '${entityName}' were not indexed within ${timeout}ms`
)
}

View File

@@ -7,15 +7,16 @@ import {
defaultCurrencies,
defineLink,
Modules,
promiseAll,
} from "@medusajs/utils"
import { setTimeout } from "timers/promises"
import {
adminHeaders,
createAdminUser,
} from "../../../helpers/create-admin-user"
import { fetchAndRetry } from "../../../helpers/retry"
import { waitForIndexedEntities } from "../../../helpers/wait-for-index"
jest.setTimeout(120000)
jest.setTimeout(300000)
// NOTE: In this tests, both API are used to query, we use object pattern and string pattern
@@ -91,8 +92,6 @@ async function populateData(api: any) {
)
const products = response.data.created
await setTimeout(5000)
return products
}
@@ -144,12 +143,31 @@ medusaIntegrationTestRunner({
},
})
await setTimeout(1000)
const query = appContainer.resolve(
ContainerRegistrationKeys.QUERY
) as RemoteQueryFunction
await promiseAll([
waitForIndexedEntities(
dbConnection,
"Product",
products.map((p) => p.id)
),
waitForIndexedEntities(
dbConnection,
"ProductVariant",
products.flatMap((p) => p.variants.map((v) => v.id))
),
waitForIndexedEntities(
dbConnection,
"Price",
products.flatMap((p) =>
p.variants.flatMap((v) => v.prices.map((p) => p.id))
)
),
waitForIndexedEntities(dbConnection, "Brand", [brand.id]),
])
const resultset = await fetchAndRetry(
async () =>
await query.index({
@@ -338,12 +356,32 @@ medusaIntegrationTestRunner({
})
it("should use query.index to query the index module sorting by price desc", async () => {
await populateData(api)
const products = await populateData(api)
const query = appContainer.resolve(
ContainerRegistrationKeys.QUERY
) as RemoteQueryFunction
await promiseAll([
waitForIndexedEntities(
dbConnection,
"Product",
products.map((p) => p.id)
),
waitForIndexedEntities(
dbConnection,
"ProductVariant",
products.flatMap((p) => p.variants.map((v) => v.id))
),
waitForIndexedEntities(
dbConnection,
"Price",
products.flatMap((p) =>
p.variants.flatMap((v) => v.prices.map((p) => p.id))
)
),
])
const resultset = await fetchAndRetry(
async () =>
await query.index({
@@ -446,7 +484,27 @@ medusaIntegrationTestRunner({
})
it("should use query.index to get products by an array of handles", async () => {
await populateData(api)
const products = await populateData(api)
await promiseAll([
waitForIndexedEntities(
dbConnection,
"Product",
products.map((p) => p.id)
),
waitForIndexedEntities(
dbConnection,
"ProductVariant",
products.flatMap((p) => p.variants.map((v) => v.id))
),
waitForIndexedEntities(
dbConnection,
"Price",
products.flatMap((p) =>
p.variants.flatMap((v) => v.prices.map((p) => p.id))
)
),
])
const query = appContainer.resolve(
ContainerRegistrationKeys.QUERY
@@ -475,7 +533,27 @@ medusaIntegrationTestRunner({
})
it("should query by custom linkable field and default field using query.index", async () => {
await populateData(api)
const products = await populateData(api)
await promiseAll([
waitForIndexedEntities(
dbConnection,
"Product",
products.map((p) => p.id)
),
waitForIndexedEntities(
dbConnection,
"ProductVariant",
products.flatMap((p) => p.variants.map((v) => v.id))
),
waitForIndexedEntities(
dbConnection,
"Price",
products.flatMap((p) =>
p.variants.flatMap((v) => v.prices.map((p) => p.id))
)
),
])
const query = appContainer.resolve(
ContainerRegistrationKeys.QUERY
@@ -515,12 +593,31 @@ medusaIntegrationTestRunner({
},
})
await setTimeout(1000)
const query = appContainer.resolve(
ContainerRegistrationKeys.QUERY
) as RemoteQueryFunction
await promiseAll([
waitForIndexedEntities(
dbConnection,
"Product",
products.map((p) => p.id)
),
waitForIndexedEntities(
dbConnection,
"ProductVariant",
products.flatMap((p) => p.variants.map((v) => v.id))
),
waitForIndexedEntities(
dbConnection,
"Price",
products.flatMap((p) =>
p.variants.flatMap((v) => v.prices.map((p) => p.id))
)
),
waitForIndexedEntities(dbConnection, "Brand", [brand.id]),
])
const resultset = await fetchAndRetry(
async () =>
await query.index({

View File

@@ -184,6 +184,8 @@ export class InMemoryDistributedTransactionStorage
TransactionState.FAILED,
TransactionState.REVERTED,
].includes(data.flow.state)
const isWaitingToCompensate =
data.flow.state === TransactionState.WAITING_TO_COMPENSATE
/**
* Bit of explanation:
@@ -236,7 +238,10 @@ export class InMemoryDistributedTransactionStorage
)
: false
if (!(isNotStarted || isFinished) && !currentStepsIsAsync) {
if (
!(isNotStarted || isFinished || isWaitingToCompensate) &&
!currentStepsIsAsync
) {
return
}

View File

@@ -51,7 +51,6 @@ import {
} from "../__fixtures__/workflow_1_auto_retries_false"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { Redis } from "ioredis"
import {
step1InvokeMock as step1InvokeMockManualRetry,
step2InvokeMock as step2InvokeMockManualRetry,
@@ -110,32 +109,19 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
testSuite: ({ service: workflowOrcModule, medusaApp }) => {
describe("Workflow Orchestrator module", function () {
beforeEach(async () => {
await TestDatabase.clearTables()
jest.clearAllMocks()
query = medusaApp.query
sharedContainer_ = medusaApp.sharedContainer
})
afterAll(async () => {
// empty redis
const connection = new Redis("localhost:6379", {
lazyConnect: true,
})
await new Promise(async (resolve) => {
await connection.connect(resolve)
})
await connection.flushall()
await connection.disconnect()
afterEach(async () => {
await TestDatabase.clearTables()
})
let query: RemoteQueryFunction
let sharedContainer_: MedusaContainer
beforeEach(() => {
query = medusaApp.query
sharedContainer_ = medusaApp.sharedContainer
})
it(`should export the appropriate linkable configuration`, () => {
const linkable = Module(Modules.WORKFLOW_ENGINE, {
service: WorkflowsModuleService,
@@ -483,62 +469,66 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
)
})
it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", (done) => {
;(async () => {
const transactionId = "transaction-auto-retries" + ulid()
const workflowId = "workflow_1_auto_retries_false"
it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => {
const transactionId = "transaction-auto-retries" + ulid()
const workflowId = "workflow_1_auto_retries_false"
await workflowOrcModule.run(workflowId, {
input: {},
transactionId,
throwOnError: false,
})
await workflowOrcModule.run(workflowId, {
input: {},
transactionId,
throwOnError: false,
})
const onFinishPromise = new Promise<void>((resolve, reject) => {
workflowOrcModule.subscribe({
workflowId,
transactionId,
subscriber: async (event) => {
if (event.eventType === "onFinish") {
expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(
1
)
expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(
3
)
expect(
step1CompensateMockAutoRetriesFalse
).toHaveBeenCalledTimes(1)
expect(
step2CompensateMockAutoRetriesFalse
).toHaveBeenCalledTimes(1)
done()
try {
expect(
step1InvokeMockAutoRetriesFalse
).toHaveBeenCalledTimes(1)
expect(
step2InvokeMockAutoRetriesFalse
).toHaveBeenCalledTimes(3)
expect(
step1CompensateMockAutoRetriesFalse
).toHaveBeenCalledTimes(1)
expect(
step2CompensateMockAutoRetriesFalse
).toHaveBeenCalledTimes(1)
resolve()
} catch (error) {
reject(error)
}
}
},
})
})
expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1)
expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1)
expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0)
expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0)
expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1)
expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1)
expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0)
expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0)
await setTimeout(3000)
await setTimeout(3000)
await workflowOrcModule.run(workflowId, {
input: {},
transactionId,
throwOnError: false,
})
await workflowOrcModule.run(workflowId, {
input: {},
transactionId,
throwOnError: false,
})
await setTimeout(2000)
await setTimeout(3000)
await setTimeout(3000)
await workflowOrcModule.run(workflowId, {
input: {},
transactionId,
throwOnError: false,
})
await workflowOrcModule.run(workflowId, {
input: {},
transactionId,
throwOnError: false,
})
})()
await onFinishPromise
})
it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => {

View File

@@ -12,6 +12,7 @@ import { setTimeout as setTimeoutSync } from "timers"
import { setTimeout } from "timers/promises"
import { ulid } from "ulid"
import "../__fixtures__"
import { TestDatabase } from "../utils"
jest.setTimeout(300000)
@@ -37,6 +38,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
},
testSuite: ({ service: workflowOrcModule, medusaApp }) => {
describe("Testing race condition of the workflow during retry", () => {
afterEach(async () => {
await TestDatabase.clearTables()
})
it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => {
const transactionId = "transaction_id" + ulid()
const workflowId = "workflow-1" + ulid()

View File

@@ -278,6 +278,8 @@ export class RedisDistributedTransactionStorage
TransactionState.FAILED,
TransactionState.REVERTED,
].includes(data.flow.state)
const isWaitingToCompensate =
data.flow.state === TransactionState.WAITING_TO_COMPENSATE
/**
* Bit of explanation:
@@ -330,7 +332,10 @@ export class RedisDistributedTransactionStorage
)
: false
if (!(isNotStarted || isFinished) && !currentStepsIsAsync) {
if (
!(isNotStarted || isFinished || isWaitingToCompensate) &&
!currentStepsIsAsync
) {
return
}