diff --git a/.changeset/orange-forks-exercise.md b/.changeset/orange-forks-exercise.md new file mode 100644 index 0000000000..c0f90b34ac --- /dev/null +++ b/.changeset/orange-forks-exercise.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +fix(): Workflow save to db + index integration instability diff --git a/integration-tests/helpers/wait-for-index.ts b/integration-tests/helpers/wait-for-index.ts new file mode 100644 index 0000000000..3caf24c008 --- /dev/null +++ b/integration-tests/helpers/wait-for-index.ts @@ -0,0 +1,54 @@ +/** + * Helper functions to wait for index synchronization in a deterministic way + * by directly checking the index_data table instead of using arbitrary timeouts. + */ + +export interface WaitForIndexOptions { + timeout?: number + pollInterval?: number +} + +/** + * Wait for specific entities to be indexed by checking the index_data table directly. + * This is more reliable than using arbitrary timeouts. + */ +export async function waitForIndexedEntities( + dbConnection: any, + entityName: string, + entityIds: string[], + options: WaitForIndexOptions = {} +): Promise { + const { timeout = 120000, pollInterval = 100 } = options + const startTime = Date.now() + + while (Date.now() - startTime < timeout) { + try { + // Query the index_data table to check if all entities are indexed + const result = await dbConnection.raw( + `SELECT id FROM index_data WHERE name = ? AND id = ANY(?) AND staled_at IS NULL`, + [entityName, entityIds] + ) + + const indexedIds = result.rows + ? result.rows.map((row: any) => row.id) + : result.map((row: any) => row.id) + + // Check if all expected entities are indexed + const allIndexed = entityIds.every((id) => indexedIds.includes(id)) + + if (allIndexed) { + return + } + } catch (error) { + // Continue polling on database errors + } + + await new Promise((resolve) => setTimeout(resolve, pollInterval)) + } + + throw new Error( + `Entities [${entityIds.join( + ", " + )}] of type '${entityName}' were not indexed within ${timeout}ms` + ) +} diff --git a/integration-tests/modules/__tests__/index/query-index.spec.ts b/integration-tests/modules/__tests__/index/query-index.spec.ts index 967e776698..107944f291 100644 --- a/integration-tests/modules/__tests__/index/query-index.spec.ts +++ b/integration-tests/modules/__tests__/index/query-index.spec.ts @@ -7,15 +7,16 @@ import { defaultCurrencies, defineLink, Modules, + promiseAll, } from "@medusajs/utils" -import { setTimeout } from "timers/promises" import { adminHeaders, createAdminUser, } from "../../../helpers/create-admin-user" import { fetchAndRetry } from "../../../helpers/retry" +import { waitForIndexedEntities } from "../../../helpers/wait-for-index" -jest.setTimeout(120000) +jest.setTimeout(300000) // NOTE: In this tests, both API are used to query, we use object pattern and string pattern @@ -91,8 +92,6 @@ async function populateData(api: any) { ) const products = response.data.created - await setTimeout(5000) - return products } @@ -144,12 +143,31 @@ medusaIntegrationTestRunner({ }, }) - await setTimeout(1000) - const query = appContainer.resolve( ContainerRegistrationKeys.QUERY ) as RemoteQueryFunction + await promiseAll([ + waitForIndexedEntities( + dbConnection, + "Product", + products.map((p) => p.id) + ), + waitForIndexedEntities( + dbConnection, + "ProductVariant", + products.flatMap((p) => p.variants.map((v) => v.id)) + ), + waitForIndexedEntities( + dbConnection, + "Price", + products.flatMap((p) => + p.variants.flatMap((v) => v.prices.map((p) => p.id)) + ) + ), + waitForIndexedEntities(dbConnection, "Brand", [brand.id]), + ]) + const resultset = await fetchAndRetry( async () => await query.index({ @@ -338,12 +356,32 @@ medusaIntegrationTestRunner({ }) it("should use query.index to query the index module sorting by price desc", async () => { - await populateData(api) + const products = await populateData(api) const query = appContainer.resolve( ContainerRegistrationKeys.QUERY ) as RemoteQueryFunction + await promiseAll([ + waitForIndexedEntities( + dbConnection, + "Product", + products.map((p) => p.id) + ), + waitForIndexedEntities( + dbConnection, + "ProductVariant", + products.flatMap((p) => p.variants.map((v) => v.id)) + ), + waitForIndexedEntities( + dbConnection, + "Price", + products.flatMap((p) => + p.variants.flatMap((v) => v.prices.map((p) => p.id)) + ) + ), + ]) + const resultset = await fetchAndRetry( async () => await query.index({ @@ -446,7 +484,27 @@ medusaIntegrationTestRunner({ }) it("should use query.index to get products by an array of handles", async () => { - await populateData(api) + const products = await populateData(api) + + await promiseAll([ + waitForIndexedEntities( + dbConnection, + "Product", + products.map((p) => p.id) + ), + waitForIndexedEntities( + dbConnection, + "ProductVariant", + products.flatMap((p) => p.variants.map((v) => v.id)) + ), + waitForIndexedEntities( + dbConnection, + "Price", + products.flatMap((p) => + p.variants.flatMap((v) => v.prices.map((p) => p.id)) + ) + ), + ]) const query = appContainer.resolve( ContainerRegistrationKeys.QUERY @@ -475,7 +533,27 @@ medusaIntegrationTestRunner({ }) it("should query by custom linkable field and default field using query.index", async () => { - await populateData(api) + const products = await populateData(api) + + await promiseAll([ + waitForIndexedEntities( + dbConnection, + "Product", + products.map((p) => p.id) + ), + waitForIndexedEntities( + dbConnection, + "ProductVariant", + products.flatMap((p) => p.variants.map((v) => v.id)) + ), + waitForIndexedEntities( + dbConnection, + "Price", + products.flatMap((p) => + p.variants.flatMap((v) => v.prices.map((p) => p.id)) + ) + ), + ]) const query = appContainer.resolve( ContainerRegistrationKeys.QUERY @@ -515,12 +593,31 @@ medusaIntegrationTestRunner({ }, }) - await setTimeout(1000) - const query = appContainer.resolve( ContainerRegistrationKeys.QUERY ) as RemoteQueryFunction + await promiseAll([ + waitForIndexedEntities( + dbConnection, + "Product", + products.map((p) => p.id) + ), + waitForIndexedEntities( + dbConnection, + "ProductVariant", + products.flatMap((p) => p.variants.map((v) => v.id)) + ), + waitForIndexedEntities( + dbConnection, + "Price", + products.flatMap((p) => + p.variants.flatMap((v) => v.prices.map((p) => p.id)) + ) + ), + waitForIndexedEntities(dbConnection, "Brand", [brand.id]), + ]) + const resultset = await fetchAndRetry( async () => await query.index({ diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index c0c6718f8a..627cbdfa2f 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -184,6 +184,8 @@ export class InMemoryDistributedTransactionStorage TransactionState.FAILED, TransactionState.REVERTED, ].includes(data.flow.state) + const isWaitingToCompensate = + data.flow.state === TransactionState.WAITING_TO_COMPENSATE /** * Bit of explanation: @@ -236,7 +238,10 @@ export class InMemoryDistributedTransactionStorage ) : false - if (!(isNotStarted || isFinished) && !currentStepsIsAsync) { + if ( + !(isNotStarted || isFinished || isWaitingToCompensate) && + !currentStepsIsAsync + ) { return } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index b14e1ce213..ea8b9a4506 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -51,7 +51,6 @@ import { } from "../__fixtures__/workflow_1_auto_retries_false" import { createScheduled } from "../__fixtures__/workflow_scheduled" -import { Redis } from "ioredis" import { step1InvokeMock as step1InvokeMockManualRetry, step2InvokeMock as step2InvokeMockManualRetry, @@ -110,32 +109,19 @@ moduleIntegrationTestRunner({ testSuite: ({ service: workflowOrcModule, medusaApp }) => { describe("Workflow Orchestrator module", function () { beforeEach(async () => { - await TestDatabase.clearTables() jest.clearAllMocks() + + query = medusaApp.query + sharedContainer_ = medusaApp.sharedContainer }) - afterAll(async () => { - // empty redis - const connection = new Redis("localhost:6379", { - lazyConnect: true, - }) - - await new Promise(async (resolve) => { - await connection.connect(resolve) - }) - - await connection.flushall() - await connection.disconnect() + afterEach(async () => { + await TestDatabase.clearTables() }) let query: RemoteQueryFunction let sharedContainer_: MedusaContainer - beforeEach(() => { - query = medusaApp.query - sharedContainer_ = medusaApp.sharedContainer - }) - it(`should export the appropriate linkable configuration`, () => { const linkable = Module(Modules.WORKFLOW_ENGINE, { service: WorkflowsModuleService, @@ -483,62 +469,66 @@ moduleIntegrationTestRunner({ ) }) - it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", (done) => { - ;(async () => { - const transactionId = "transaction-auto-retries" + ulid() - const workflowId = "workflow_1_auto_retries_false" + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + const onFinishPromise = new Promise((resolve, reject) => { workflowOrcModule.subscribe({ workflowId, transactionId, subscriber: async (event) => { if (event.eventType === "onFinish") { - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes( - 1 - ) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes( - 3 - ) - expect( - step1CompensateMockAutoRetriesFalse - ).toHaveBeenCalledTimes(1) - expect( - step2CompensateMockAutoRetriesFalse - ).toHaveBeenCalledTimes(1) - done() + try { + expect( + step1InvokeMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + expect( + step2InvokeMockAutoRetriesFalse + ).toHaveBeenCalledTimes(3) + expect( + step1CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + expect( + step2CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + resolve() + } catch (error) { + reject(error) + } } }, }) + }) - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - await setTimeout(3000) + await setTimeout(3000) - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) - await setTimeout(2000) + await setTimeout(3000) - await setTimeout(3000) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) - })() + await onFinishPromise }) it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index d84158f021..49dcfc592c 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -12,6 +12,7 @@ import { setTimeout as setTimeoutSync } from "timers" import { setTimeout } from "timers/promises" import { ulid } from "ulid" import "../__fixtures__" +import { TestDatabase } from "../utils" jest.setTimeout(300000) @@ -37,6 +38,10 @@ moduleIntegrationTestRunner({ }, testSuite: ({ service: workflowOrcModule, medusaApp }) => { describe("Testing race condition of the workflow during retry", () => { + afterEach(async () => { + await TestDatabase.clearTables() + }) + it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id" + ulid() const workflowId = "workflow-1" + ulid() diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 19ca2df899..3ed403cd45 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -278,6 +278,8 @@ export class RedisDistributedTransactionStorage TransactionState.FAILED, TransactionState.REVERTED, ].includes(data.flow.state) + const isWaitingToCompensate = + data.flow.state === TransactionState.WAITING_TO_COMPENSATE /** * Bit of explanation: @@ -330,7 +332,10 @@ export class RedisDistributedTransactionStorage ) : false - if (!(isNotStarted || isFinished) && !currentStepsIsAsync) { + if ( + !(isNotStarted || isFinished || isWaitingToCompensate) && + !currentStepsIsAsync + ) { return }