From 76aa4a48b3cbec519ab16c96cc87ee3288de28de Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Thu, 2 Oct 2025 16:11:38 +0200 Subject: [PATCH] fix(): workflows concurrency (#13645) --- .changeset/fair-mirrors-enjoy.md | 10 +++ .../http/__tests__/cart/store/cart.spec.ts | 90 +++++++++++++++++-- .../src/cart/workflows/complete-cart.ts | 7 +- .../transaction/transaction-orchestrator.ts | 23 +++-- .../src/__tests__/workflow/global-workflow.ts | 4 + .../src/__tests__/workflow/local-workflow.ts | 4 + .../transaction/transaction-orchestrator.ts | 6 +- .../src/utils/composer/transform.ts | 22 +++-- .../api/store/carts/[id]/complete/route.ts | 17 +++- .../utils/workflow-orchestrator-storage.ts | 26 +++--- .../integration-tests/__tests__/index.spec.ts | 20 ++--- .../utils/workflow-orchestrator-storage.ts | 55 ++++++------ 12 files changed, 197 insertions(+), 87 deletions(-) create mode 100644 .changeset/fair-mirrors-enjoy.md diff --git a/.changeset/fair-mirrors-enjoy.md b/.changeset/fair-mirrors-enjoy.md new file mode 100644 index 0000000000..ed533aabc3 --- /dev/null +++ b/.changeset/fair-mirrors-enjoy.md @@ -0,0 +1,10 @@ +--- +"@medusajs/medusa": patch +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/core-flows": patch +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +Fix/workflows concurrency diff --git a/integration-tests/http/__tests__/cart/store/cart.spec.ts b/integration-tests/http/__tests__/cart/store/cart.spec.ts index 6ff7470ff8..f99fa57863 100644 --- a/integration-tests/http/__tests__/cart/store/cart.spec.ts +++ b/integration-tests/http/__tests__/cart/store/cart.spec.ts @@ -9,10 +9,15 @@ import { PromotionStatus, PromotionType, } from "@medusajs/utils" -import { createAdminUser, generatePublishableKey, generateStoreHeaders, } from "../../../../helpers/create-admin-user" +import { + createAdminUser, + generatePublishableKey, + generateStoreHeaders, +} from "../../../../helpers/create-admin-user" import { setupTaxStructure } from "../../../../modules/__tests__/fixtures" import { createAuthenticatedCustomer } from "../../../../modules/helpers/create-authenticated-customer" import { medusaTshirtProduct } from "../../../__fixtures__/product" +import { setTimeout } from "timers/promises" jest.setTimeout(100000) @@ -150,10 +155,9 @@ medusaIntegrationTestRunner({ describe("GET /store/carts/[id]", () => { it("should return 404 when trying to fetch a cart that does not exist", async () => { - const response = await api.get( - `/store/carts/fake`, - storeHeadersWithCustomer - ).catch((e) => e) + const response = await api + .get(`/store/carts/fake`, storeHeadersWithCustomer) + .catch((e) => e) expect(response.response.status).toEqual(404) }) @@ -1868,6 +1872,80 @@ medusaIntegrationTestRunner({ ) }) + it("should successfully complete cart and fail on concurrent complete", async () => { + const paymentCollection = ( + await api.post( + `/store/payment-collections`, + { cart_id: cart.id }, + storeHeaders + ) + ).data.payment_collection + + await api.post( + `/store/payment-collections/${paymentCollection.id}/payment-sessions`, + { provider_id: "pp_system_default" }, + storeHeaders + ) + + await createCartCreditLinesWorkflow.run({ + input: [ + { + cart_id: cart.id, + amount: 100, + currency_code: "usd", + reference: "test", + reference_id: "test", + }, + ], + container: appContainer, + }) + + // Concurrently complete the cart + let completedCart: any[] = [] + for (let i = 0; i < 5; i++) { + completedCart.push( + api + .post(`/store/carts/${cart.id}/complete`, {}, storeHeaders) + .catch((e) => e) + ) + + await setTimeout(25) + } + + let all = await Promise.all(completedCart) + + let success = all.filter((res) => res.status === 200) + let failure = all.filter((res) => res.status !== 200) + + const successData = success[0].data.order + for (const res of success) { + expect(res.data.order).toEqual(successData) + } + + expect(failure.length).toBeGreaterThan(0) + + expect(successData).toEqual( + expect.objectContaining({ + id: expect.any(String), + currency_code: "usd", + credit_lines: [ + expect.objectContaining({ + amount: 100, + reference: "test", + reference_id: "test", + }), + ], + items: expect.arrayContaining([ + expect.objectContaining({ + unit_price: 1500, + compare_at_unit_price: null, + quantity: 1, + }), + ]), + }) + ) + }) + it("should successfully complete cart", async () => { const paymentCollection = ( await api.post( @@ -1883,7 +1961,7 @@ medusaIntegrationTestRunner({ storeHeaders ) - createCartCreditLinesWorkflow.run({ + await createCartCreditLinesWorkflow.run({ input: [ { cart_id: cart.id, diff --git a/packages/core/core-flows/src/cart/workflows/complete-cart.ts b/packages/core/core-flows/src/cart/workflows/complete-cart.ts index 983b0eb7f7..2a8cf5bbb4 100644 --- a/packages/core/core-flows/src/cart/workflows/complete-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/complete-cart.ts @@ -109,10 +109,13 @@ export const completeCartWorkflow = createWorkflow( entity: "order_cart", fields: ["cart_id", "order_id"], filters: { cart_id: input.id }, + options: { + isList: false, + }, }) const orderId = transform({ orderCart }, ({ orderCart }) => { - return orderCart.data[0]?.order_id + return orderCart?.data?.order_id }) const cart = useRemoteQueryStep({ @@ -263,7 +266,7 @@ export const completeCartWorkflow = createWorkflow( const createdOrders = createOrdersStep([cartToOrder]) const createdOrder = transform({ createdOrders }, ({ createdOrders }) => { - return createdOrders?.[0] ?? undefined + return createdOrders[0] }) const reservationItemsData = transform( diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 547c93ad23..9ee168cb99 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -207,7 +207,6 @@ describe("Transaction Orchestrator", () => { }, { action: "three", - async: true, maxRetries: 0, next: { action: "five", @@ -228,24 +227,14 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) - expect(transaction.getErrors()).toHaveLength(2) + expect(transaction.getErrors()).toHaveLength(1) expect(transaction.getErrors()).toEqual([ { action: "three", - error: { + error: expect.objectContaining({ message: "Step 3 failed", name: "Error", stack: expect.any(String), - }, - handlerType: "invoke", - }, - { - action: "three", - error: expect.objectContaining({ - message: expect.stringContaining( - "Converting circular structure to JSON" - ), - stack: expect.any(String), }), handlerType: "invoke", }, @@ -1052,6 +1041,8 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) + await new Promise((resolve) => process.nextTick(resolve)) + expect(mocks.one).toHaveBeenCalledTimes(1) expect(mocks.two).toHaveBeenCalledTimes(0) expect(transaction.getState()).toBe(TransactionState.INVOKING) @@ -1148,6 +1139,8 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) + await new Promise((resolve) => process.nextTick(resolve)) + expect(mocks.one).toHaveBeenCalledTimes(1) expect(mocks.compensateOne).toHaveBeenCalledTimes(0) expect(mocks.two).toHaveBeenCalledTimes(0) @@ -1171,6 +1164,8 @@ describe("Transaction Orchestrator", () => { transaction, }) + await new Promise((resolve) => process.nextTick(resolve)) + expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING) expect(mocks.compensateOne).toHaveBeenCalledTimes(1) @@ -1263,6 +1258,7 @@ describe("Transaction Orchestrator", () => { }) await strategy.resume(transaction) + await new Promise((resolve) => process.nextTick(resolve)) expect(mocks.one).toHaveBeenCalledTimes(1) expect(mocks.compensateOne).toHaveBeenCalledTimes(1) @@ -1335,6 +1331,7 @@ describe("Transaction Orchestrator", () => { }) await strategy.resume(transaction) + await new Promise((resolve) => process.nextTick(resolve)) expect(transaction.getState()).toBe(TransactionState.DONE) expect(mocks.one).toHaveBeenCalledTimes(1) diff --git a/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts b/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts index 78f666cf22..b35340bc94 100644 --- a/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts +++ b/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts @@ -116,6 +116,8 @@ describe("WorkflowManager", () => { it("should continue an asyncronous transaction after reporting a successful step", async () => { const transaction = await flow.run("deliver-product", "t-id") + await new Promise((resolve) => process.nextTick(resolve)) + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) @@ -135,6 +137,8 @@ describe("WorkflowManager", () => { it("should revert an asyncronous transaction after reporting a failure step", async () => { const transaction = await flow.run("deliver-product", "t-id") + await new Promise((resolve) => process.nextTick(resolve)) + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) diff --git a/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts b/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts index a3c59c41f8..ee976e6f12 100644 --- a/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts @@ -158,6 +158,8 @@ describe("WorkflowManager", () => { const flow = new LocalWorkflow("deliver-product", container) const transaction = await flow.run("t-id") + await new Promise((resolve) => process.nextTick(resolve)) + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) @@ -177,6 +179,8 @@ describe("WorkflowManager", () => { const flow = new LocalWorkflow("deliver-product", container) const transaction = await flow.run("t-id") + await new Promise((resolve) => process.nextTick(resolve)) + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 8ddfc2eeb5..f049a92d35 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -951,8 +951,10 @@ export class TransactionOrchestrator extends EventEmitter { this.executeSyncStep(promise, transaction, step, nextSteps) ) } else { - // Execute async step in background and continue the execution of the transaction - this.executeAsyncStep(promise, transaction, step, nextSteps) + // Execute async step in background as part of the next event loop cycle and continue the execution of the transaction + process.nextTick(() => + this.executeAsyncStep(promise, transaction, step, nextSteps) + ) hasAsyncSteps = true } } diff --git a/packages/core/workflows-sdk/src/utils/composer/transform.ts b/packages/core/workflows-sdk/src/utils/composer/transform.ts index f304277306..e79c40c0a2 100644 --- a/packages/core/workflows-sdk/src/utils/composer/transform.ts +++ b/packages/core/workflows-sdk/src/utils/composer/transform.ts @@ -167,7 +167,10 @@ export function transform( const ret = { __id: uniqId, __type: OrchestrationUtils.SymbolWorkflowStepTransformer, - __temporary_storage_key: null as { key: string } | null, + } as WorkflowData & { + __id: string + __type: string + __temporary_storage_key: { key: string } | null } const returnFn = async function ( @@ -176,6 +179,7 @@ export function transform( ): Promise { if ("transaction" in transactionContext) { const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}` + ret.__temporary_storage_key ??= { key: temporaryDataKey } if ( @@ -199,15 +203,14 @@ export function transform( const fn = functions[i] const arg = i === 0 ? stepValue : finalResult - finalResult = await fn.apply(fn, [arg, transactionContext]) + finalResult = fn.apply(fn, [arg, transactionContext]) + if (finalResult instanceof Promise) { + finalResult = await finalResult + } } if ("transaction" in transactionContext) { const temporaryDataKey = ret.__temporary_storage_key! - if (!temporaryDataKey) { - return finalResult - } - transactionContext.transaction.setTemporaryData( temporaryDataKey, finalResult @@ -217,10 +220,11 @@ export function transform( return finalResult } - const proxyfiedRet = proxify( - ret as unknown as WorkflowData - ) + const proxyfiedRet = proxify< + WorkflowData & { __resolver: any; __temporary_storage_key: string | null } + >(ret as unknown as WorkflowData) proxyfiedRet.__resolver = returnFn as any + proxyfiedRet.__temporary_storage_key = null as string | null return proxyfiedRet } diff --git a/packages/medusa/src/api/store/carts/[id]/complete/route.ts b/packages/medusa/src/api/store/carts/[id]/complete/route.ts index 70bbe97977..8f80320ece 100644 --- a/packages/medusa/src/api/store/carts/[id]/complete/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/complete/route.ts @@ -1,10 +1,11 @@ -import { completeCartWorkflow } from "@medusajs/core-flows" +import { completeCartWorkflowId } from "@medusajs/core-flows" import { prepareRetrieveQuery } from "@medusajs/framework" import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http" import { HttpTypes } from "@medusajs/framework/types" import { ContainerRegistrationKeys, MedusaError, + Modules, } from "@medusajs/framework/utils" import { refetchCart } from "../../helpers" import { defaultStoreCartFields } from "../../query-config" @@ -14,13 +15,21 @@ export const POST = async ( res: MedusaResponse ) => { const cart_id = req.params.id + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) - const { errors, result } = await completeCartWorkflow(req.scope).run({ + const { errors, result, transaction } = await we.run(completeCartWorkflowId, { input: { id: cart_id }, - context: { transactionId: cart_id }, + transactionId: cart_id, throwOnError: false, }) + if (!transaction.hasFinished()) { + throw new MedusaError( + MedusaError.Types.CONFLICT, + "Cart is already being completed by another request" + ) + } + const query = req.scope.resolve(ContainerRegistrationKeys.QUERY) // When an error occurs on the workflow, its potentially got to with cart validations, payments @@ -47,7 +56,7 @@ export const POST = async ( ).remoteQueryConfig.fields ) - if (!statusOKErrors.includes(error.type)) { + if (!statusOKErrors.includes(error?.type)) { throw error } diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 5f13c88d7a..c0c6718f8a 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -1,3 +1,4 @@ +import { raw } from "@medusajs/framework/mikro-orm/core" import { DistributedTransactionType, IDistributedSchedulerStorage, @@ -24,7 +25,6 @@ import { TransactionStepState, isPresent, } from "@medusajs/framework/utils" -import { raw } from "@medusajs/framework/mikro-orm/core" import { WorkflowOrchestratorService } from "@services" import { type CronExpression, parseExpression } from "cron-parser" import { WorkflowExecution } from "../models/workflow-execution" @@ -162,12 +162,15 @@ export class InMemoryDistributedTransactionStorage } private createManagedTimer( - callback: () => void, + callback: () => void | Promise, delay: number ): NodeJS.Timeout { - const timer = setTimeout(() => { + const timer = setTimeout(async () => { this.pendingTimers.delete(timer) - callback() + const res = callback() + if (res instanceof Promise) { + await res + } }, delay) this.pendingTimers.add(timer) @@ -341,13 +344,11 @@ export class InMemoryDistributedTransactionStorage const { retentionTime } = options ?? {} - if (data.flow.hasAsyncSteps) { - await this.#preventRaceConditionExecutionIfNecessary({ - data, - key, - options, - }) - } + await this.#preventRaceConditionExecutionIfNecessary({ + data, + key, + options, + }) // Only store retention time if it's provided if (retentionTime) { @@ -363,8 +364,7 @@ export class InMemoryDistributedTransactionStorage if (isNotStarted && isManualTransactionId) { const storedData = this.storage.get(key) if (storedData) { - throw new MedusaError( - MedusaError.Types.INVALID_ARGUMENT, + throw new SkipExecutionError( "Transaction already started for transactionId: " + data.flow.transactionId ) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index da3b0b46b5..b14e1ce213 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -1,3 +1,4 @@ +import { asValue } from "@medusajs/framework/awilix" import { DistributedTransactionType, TransactionState, @@ -27,7 +28,6 @@ import { WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" -import { asValue } from "@medusajs/framework/awilix" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout } from "timers/promises" import { ulid } from "ulid" @@ -37,28 +37,26 @@ import { workflowNotIdempotentWithRetentionStep2Invoke, workflowNotIdempotentWithRetentionStep3Invoke, } from "../__fixtures__" -import { createScheduled } from "../__fixtures__/workflow_scheduled" import { - step1InvokeMock as step1InvokeMockAutoRetries, - step2InvokeMock as step2InvokeMockAutoRetries, step1CompensateMock as step1CompensateMockAutoRetries, + step1InvokeMock as step1InvokeMockAutoRetries, step2CompensateMock as step2CompensateMockAutoRetries, + step2InvokeMock as step2InvokeMockAutoRetries, } from "../__fixtures__/workflow_1_auto_retries" import { - step1InvokeMock as step1InvokeMockAutoRetriesFalse, - step2InvokeMock as step2InvokeMockAutoRetriesFalse, step1CompensateMock as step1CompensateMockAutoRetriesFalse, + step1InvokeMock as step1InvokeMockAutoRetriesFalse, step2CompensateMock as step2CompensateMockAutoRetriesFalse, + step2InvokeMock as step2InvokeMockAutoRetriesFalse, } from "../__fixtures__/workflow_1_auto_retries_false" +import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { Redis } from "ioredis" import { step1InvokeMock as step1InvokeMockManualRetry, step2InvokeMock as step2InvokeMockManualRetry, - step1CompensateMock as step1CompensateMockManualRetry, - step2CompensateMock as step2CompensateMockManualRetry, } from "../__fixtures__/workflow_1_manual_retry_step" import { TestDatabase } from "../utils" -import { Redis } from "ioredis" jest.setTimeout(300000) @@ -523,7 +521,7 @@ moduleIntegrationTestRunner({ expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - await setTimeout(3000) + await setTimeout(3000) await workflowOrcModule.run(workflowId, { input: {}, @@ -533,7 +531,7 @@ moduleIntegrationTestRunner({ await setTimeout(2000) - await setTimeout(3000) + await setTimeout(3000) await workflowOrcModule.run(workflowId, { input: {}, diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index d2ede1aecd..950bb48c97 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -1,3 +1,4 @@ +import { raw } from "@medusajs/framework/mikro-orm/core" import { DistributedTransactionType, IDistributedSchedulerStorage, @@ -21,7 +22,6 @@ import { TransactionState, TransactionStepState, } from "@medusajs/framework/utils" -import { raw } from "@medusajs/framework/mikro-orm/core" import { WorkflowOrchestratorService } from "@services" import { Queue, RepeatOptions, Worker } from "bullmq" import Redis from "ioredis" @@ -425,13 +425,11 @@ export class RedisDistributedTransactionStorage const { retentionTime } = options ?? {} - if (data.flow.hasAsyncSteps) { - await this.#preventRaceConditionExecutionIfNecessary({ - data, - key, - options, - }) - } + await this.#preventRaceConditionExecutionIfNecessary({ + data, + key, + options, + }) if (hasFinished && retentionTime) { Object.assign(data, { @@ -471,34 +469,37 @@ export class RedisDistributedTransactionStorage pipeline.unlink(key) } - const pipelinePromise = pipeline.exec().then((result) => { - if (!shouldSetNX) { + const execPipeline = () => { + return pipeline.exec().then((result) => { + if (!shouldSetNX) { + return result + } + + const actionResult = result?.pop() + const isOk = !!actionResult?.pop() + if (!isOk) { + throw new SkipExecutionError( + "Transaction already started for transactionId: " + + data.flow.transactionId + ) + } + return result - } - - const actionResult = result?.pop() - const isOk = !!actionResult?.pop() - if (!isOk) { - throw new MedusaError( - MedusaError.Types.INVALID_ARGUMENT, - "Transaction already started for transactionId: " + - data.flow.transactionId - ) - } - - return result - }) + }) + } // Database operations if (hasFinished && !retentionTime) { // If the workflow is nested, we cant just remove it because it would break the compensation algorithm. Instead, it will get deleted when the top level parent is deleted. if (!data.flow.metadata?.parentStepIdempotencyKey) { - await promiseAll([pipelinePromise, this.deleteFromDb(data)]) + await promiseAll([execPipeline(), this.deleteFromDb(data)]) } else { - await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)]) + await this.saveToDb(data, retentionTime) + await execPipeline() } } else { - await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)]) + await this.saveToDb(data, retentionTime) + await execPipeline() } }