fix(): workflows concurrency (#13645)

This commit is contained in:
Adrien de Peretti
2025-10-02 16:11:38 +02:00
committed by GitHub
parent ca334b7cc1
commit 76aa4a48b3
12 changed files with 197 additions and 87 deletions

View File

@@ -0,0 +1,10 @@
---
"@medusajs/medusa": patch
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/core-flows": patch
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---
Fix/workflows concurrency

View File

@@ -9,10 +9,15 @@ import {
PromotionStatus,
PromotionType,
} from "@medusajs/utils"
import { createAdminUser, generatePublishableKey, generateStoreHeaders, } from "../../../../helpers/create-admin-user"
import {
createAdminUser,
generatePublishableKey,
generateStoreHeaders,
} from "../../../../helpers/create-admin-user"
import { setupTaxStructure } from "../../../../modules/__tests__/fixtures"
import { createAuthenticatedCustomer } from "../../../../modules/helpers/create-authenticated-customer"
import { medusaTshirtProduct } from "../../../__fixtures__/product"
import { setTimeout } from "timers/promises"
jest.setTimeout(100000)
@@ -150,10 +155,9 @@ medusaIntegrationTestRunner({
describe("GET /store/carts/[id]", () => {
it("should return 404 when trying to fetch a cart that does not exist", async () => {
const response = await api.get(
`/store/carts/fake`,
storeHeadersWithCustomer
).catch((e) => e)
const response = await api
.get(`/store/carts/fake`, storeHeadersWithCustomer)
.catch((e) => e)
expect(response.response.status).toEqual(404)
})
@@ -1868,6 +1872,80 @@ medusaIntegrationTestRunner({
)
})
it("should successfully complete cart and fail on concurrent complete", async () => {
const paymentCollection = (
await api.post(
`/store/payment-collections`,
{ cart_id: cart.id },
storeHeaders
)
).data.payment_collection
await api.post(
`/store/payment-collections/${paymentCollection.id}/payment-sessions`,
{ provider_id: "pp_system_default" },
storeHeaders
)
await createCartCreditLinesWorkflow.run({
input: [
{
cart_id: cart.id,
amount: 100,
currency_code: "usd",
reference: "test",
reference_id: "test",
},
],
container: appContainer,
})
// Concurrently complete the cart
let completedCart: any[] = []
for (let i = 0; i < 5; i++) {
completedCart.push(
api
.post(`/store/carts/${cart.id}/complete`, {}, storeHeaders)
.catch((e) => e)
)
await setTimeout(25)
}
let all = await Promise.all(completedCart)
let success = all.filter((res) => res.status === 200)
let failure = all.filter((res) => res.status !== 200)
const successData = success[0].data.order
for (const res of success) {
expect(res.data.order).toEqual(successData)
}
expect(failure.length).toBeGreaterThan(0)
expect(successData).toEqual(
expect.objectContaining({
id: expect.any(String),
currency_code: "usd",
credit_lines: [
expect.objectContaining({
amount: 100,
reference: "test",
reference_id: "test",
}),
],
items: expect.arrayContaining([
expect.objectContaining({
unit_price: 1500,
compare_at_unit_price: null,
quantity: 1,
}),
]),
})
)
})
it("should successfully complete cart", async () => {
const paymentCollection = (
await api.post(
@@ -1883,7 +1961,7 @@ medusaIntegrationTestRunner({
storeHeaders
)
createCartCreditLinesWorkflow.run({
await createCartCreditLinesWorkflow.run({
input: [
{
cart_id: cart.id,

View File

@@ -109,10 +109,13 @@ export const completeCartWorkflow = createWorkflow(
entity: "order_cart",
fields: ["cart_id", "order_id"],
filters: { cart_id: input.id },
options: {
isList: false,
},
})
const orderId = transform({ orderCart }, ({ orderCart }) => {
return orderCart.data[0]?.order_id
return orderCart?.data?.order_id
})
const cart = useRemoteQueryStep({
@@ -263,7 +266,7 @@ export const completeCartWorkflow = createWorkflow(
const createdOrders = createOrdersStep([cartToOrder])
const createdOrder = transform({ createdOrders }, ({ createdOrders }) => {
return createdOrders?.[0] ?? undefined
return createdOrders[0]
})
const reservationItemsData = transform(

View File

@@ -207,7 +207,6 @@ describe("Transaction Orchestrator", () => {
},
{
action: "three",
async: true,
maxRetries: 0,
next: {
action: "five",
@@ -228,24 +227,14 @@ describe("Transaction Orchestrator", () => {
await strategy.resume(transaction)
expect(transaction.getErrors()).toHaveLength(2)
expect(transaction.getErrors()).toHaveLength(1)
expect(transaction.getErrors()).toEqual([
{
action: "three",
error: {
error: expect.objectContaining({
message: "Step 3 failed",
name: "Error",
stack: expect.any(String),
},
handlerType: "invoke",
},
{
action: "three",
error: expect.objectContaining({
message: expect.stringContaining(
"Converting circular structure to JSON"
),
stack: expect.any(String),
}),
handlerType: "invoke",
},
@@ -1052,6 +1041,8 @@ describe("Transaction Orchestrator", () => {
await strategy.resume(transaction)
await new Promise((resolve) => process.nextTick(resolve))
expect(mocks.one).toHaveBeenCalledTimes(1)
expect(mocks.two).toHaveBeenCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
@@ -1148,6 +1139,8 @@ describe("Transaction Orchestrator", () => {
await strategy.resume(transaction)
await new Promise((resolve) => process.nextTick(resolve))
expect(mocks.one).toHaveBeenCalledTimes(1)
expect(mocks.compensateOne).toHaveBeenCalledTimes(0)
expect(mocks.two).toHaveBeenCalledTimes(0)
@@ -1171,6 +1164,8 @@ describe("Transaction Orchestrator", () => {
transaction,
})
await new Promise((resolve) => process.nextTick(resolve))
expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING)
expect(mocks.compensateOne).toHaveBeenCalledTimes(1)
@@ -1263,6 +1258,7 @@ describe("Transaction Orchestrator", () => {
})
await strategy.resume(transaction)
await new Promise((resolve) => process.nextTick(resolve))
expect(mocks.one).toHaveBeenCalledTimes(1)
expect(mocks.compensateOne).toHaveBeenCalledTimes(1)
@@ -1335,6 +1331,7 @@ describe("Transaction Orchestrator", () => {
})
await strategy.resume(transaction)
await new Promise((resolve) => process.nextTick(resolve))
expect(transaction.getState()).toBe(TransactionState.DONE)
expect(mocks.one).toHaveBeenCalledTimes(1)

View File

@@ -116,6 +116,8 @@ describe("WorkflowManager", () => {
it("should continue an asyncronous transaction after reporting a successful step", async () => {
const transaction = await flow.run("deliver-product", "t-id")
await new Promise((resolve) => process.nextTick(resolve))
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
@@ -135,6 +137,8 @@ describe("WorkflowManager", () => {
it("should revert an asyncronous transaction after reporting a failure step", async () => {
const transaction = await flow.run("deliver-product", "t-id")
await new Promise((resolve) => process.nextTick(resolve))
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)

View File

@@ -158,6 +158,8 @@ describe("WorkflowManager", () => {
const flow = new LocalWorkflow("deliver-product", container)
const transaction = await flow.run("t-id")
await new Promise((resolve) => process.nextTick(resolve))
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
@@ -177,6 +179,8 @@ describe("WorkflowManager", () => {
const flow = new LocalWorkflow("deliver-product", container)
const transaction = await flow.run("t-id")
await new Promise((resolve) => process.nextTick(resolve))
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)

View File

@@ -951,8 +951,10 @@ export class TransactionOrchestrator extends EventEmitter {
this.executeSyncStep(promise, transaction, step, nextSteps)
)
} else {
// Execute async step in background and continue the execution of the transaction
this.executeAsyncStep(promise, transaction, step, nextSteps)
// Execute async step in background as part of the next event loop cycle and continue the execution of the transaction
process.nextTick(() =>
this.executeAsyncStep(promise, transaction, step, nextSteps)
)
hasAsyncSteps = true
}
}

View File

@@ -167,7 +167,10 @@ export function transform(
const ret = {
__id: uniqId,
__type: OrchestrationUtils.SymbolWorkflowStepTransformer,
__temporary_storage_key: null as { key: string } | null,
} as WorkflowData & {
__id: string
__type: string
__temporary_storage_key: { key: string } | null
}
const returnFn = async function (
@@ -176,6 +179,7 @@ export function transform(
): Promise<any> {
if ("transaction" in transactionContext) {
const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}`
ret.__temporary_storage_key ??= { key: temporaryDataKey }
if (
@@ -199,15 +203,14 @@ export function transform(
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult
finalResult = await fn.apply(fn, [arg, transactionContext])
finalResult = fn.apply(fn, [arg, transactionContext])
if (finalResult instanceof Promise) {
finalResult = await finalResult
}
}
if ("transaction" in transactionContext) {
const temporaryDataKey = ret.__temporary_storage_key!
if (!temporaryDataKey) {
return finalResult
}
transactionContext.transaction.setTemporaryData(
temporaryDataKey,
finalResult
@@ -217,10 +220,11 @@ export function transform(
return finalResult
}
const proxyfiedRet = proxify<WorkflowData & { __resolver: any }>(
ret as unknown as WorkflowData
)
const proxyfiedRet = proxify<
WorkflowData & { __resolver: any; __temporary_storage_key: string | null }
>(ret as unknown as WorkflowData)
proxyfiedRet.__resolver = returnFn as any
proxyfiedRet.__temporary_storage_key = null as string | null
return proxyfiedRet
}

View File

@@ -1,10 +1,11 @@
import { completeCartWorkflow } from "@medusajs/core-flows"
import { completeCartWorkflowId } from "@medusajs/core-flows"
import { prepareRetrieveQuery } from "@medusajs/framework"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { HttpTypes } from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
MedusaError,
Modules,
} from "@medusajs/framework/utils"
import { refetchCart } from "../../helpers"
import { defaultStoreCartFields } from "../../query-config"
@@ -14,13 +15,21 @@ export const POST = async (
res: MedusaResponse<HttpTypes.StoreCompleteCartResponse>
) => {
const cart_id = req.params.id
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
const { errors, result } = await completeCartWorkflow(req.scope).run({
const { errors, result, transaction } = await we.run(completeCartWorkflowId, {
input: { id: cart_id },
context: { transactionId: cart_id },
transactionId: cart_id,
throwOnError: false,
})
if (!transaction.hasFinished()) {
throw new MedusaError(
MedusaError.Types.CONFLICT,
"Cart is already being completed by another request"
)
}
const query = req.scope.resolve(ContainerRegistrationKeys.QUERY)
// When an error occurs on the workflow, its potentially got to with cart validations, payments
@@ -47,7 +56,7 @@ export const POST = async (
).remoteQueryConfig.fields
)
if (!statusOKErrors.includes(error.type)) {
if (!statusOKErrors.includes(error?.type)) {
throw error
}

View File

@@ -1,3 +1,4 @@
import { raw } from "@medusajs/framework/mikro-orm/core"
import {
DistributedTransactionType,
IDistributedSchedulerStorage,
@@ -24,7 +25,6 @@ import {
TransactionStepState,
isPresent,
} from "@medusajs/framework/utils"
import { raw } from "@medusajs/framework/mikro-orm/core"
import { WorkflowOrchestratorService } from "@services"
import { type CronExpression, parseExpression } from "cron-parser"
import { WorkflowExecution } from "../models/workflow-execution"
@@ -162,12 +162,15 @@ export class InMemoryDistributedTransactionStorage
}
private createManagedTimer(
callback: () => void,
callback: () => void | Promise<void>,
delay: number
): NodeJS.Timeout {
const timer = setTimeout(() => {
const timer = setTimeout(async () => {
this.pendingTimers.delete(timer)
callback()
const res = callback()
if (res instanceof Promise) {
await res
}
}, delay)
this.pendingTimers.add(timer)
@@ -341,13 +344,11 @@ export class InMemoryDistributedTransactionStorage
const { retentionTime } = options ?? {}
if (data.flow.hasAsyncSteps) {
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
}
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
// Only store retention time if it's provided
if (retentionTime) {
@@ -363,8 +364,7 @@ export class InMemoryDistributedTransactionStorage
if (isNotStarted && isManualTransactionId) {
const storedData = this.storage.get(key)
if (storedData) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
throw new SkipExecutionError(
"Transaction already started for transactionId: " +
data.flow.transactionId
)

View File

@@ -1,3 +1,4 @@
import { asValue } from "@medusajs/framework/awilix"
import {
DistributedTransactionType,
TransactionState,
@@ -27,7 +28,6 @@ import {
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { asValue } from "@medusajs/framework/awilix"
import { setTimeout as setTimeoutSync } from "timers"
import { setTimeout } from "timers/promises"
import { ulid } from "ulid"
@@ -37,28 +37,26 @@ import {
workflowNotIdempotentWithRetentionStep2Invoke,
workflowNotIdempotentWithRetentionStep3Invoke,
} from "../__fixtures__"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import {
step1InvokeMock as step1InvokeMockAutoRetries,
step2InvokeMock as step2InvokeMockAutoRetries,
step1CompensateMock as step1CompensateMockAutoRetries,
step1InvokeMock as step1InvokeMockAutoRetries,
step2CompensateMock as step2CompensateMockAutoRetries,
step2InvokeMock as step2InvokeMockAutoRetries,
} from "../__fixtures__/workflow_1_auto_retries"
import {
step1InvokeMock as step1InvokeMockAutoRetriesFalse,
step2InvokeMock as step2InvokeMockAutoRetriesFalse,
step1CompensateMock as step1CompensateMockAutoRetriesFalse,
step1InvokeMock as step1InvokeMockAutoRetriesFalse,
step2CompensateMock as step2CompensateMockAutoRetriesFalse,
step2InvokeMock as step2InvokeMockAutoRetriesFalse,
} from "../__fixtures__/workflow_1_auto_retries_false"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { Redis } from "ioredis"
import {
step1InvokeMock as step1InvokeMockManualRetry,
step2InvokeMock as step2InvokeMockManualRetry,
step1CompensateMock as step1CompensateMockManualRetry,
step2CompensateMock as step2CompensateMockManualRetry,
} from "../__fixtures__/workflow_1_manual_retry_step"
import { TestDatabase } from "../utils"
import { Redis } from "ioredis"
jest.setTimeout(300000)
@@ -523,7 +521,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0)
expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0)
await setTimeout(3000)
await setTimeout(3000)
await workflowOrcModule.run(workflowId, {
input: {},
@@ -533,7 +531,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
await setTimeout(2000)
await setTimeout(3000)
await setTimeout(3000)
await workflowOrcModule.run(workflowId, {
input: {},

View File

@@ -1,3 +1,4 @@
import { raw } from "@medusajs/framework/mikro-orm/core"
import {
DistributedTransactionType,
IDistributedSchedulerStorage,
@@ -21,7 +22,6 @@ import {
TransactionState,
TransactionStepState,
} from "@medusajs/framework/utils"
import { raw } from "@medusajs/framework/mikro-orm/core"
import { WorkflowOrchestratorService } from "@services"
import { Queue, RepeatOptions, Worker } from "bullmq"
import Redis from "ioredis"
@@ -425,13 +425,11 @@ export class RedisDistributedTransactionStorage
const { retentionTime } = options ?? {}
if (data.flow.hasAsyncSteps) {
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
}
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
if (hasFinished && retentionTime) {
Object.assign(data, {
@@ -471,34 +469,37 @@ export class RedisDistributedTransactionStorage
pipeline.unlink(key)
}
const pipelinePromise = pipeline.exec().then((result) => {
if (!shouldSetNX) {
const execPipeline = () => {
return pipeline.exec().then((result) => {
if (!shouldSetNX) {
return result
}
const actionResult = result?.pop()
const isOk = !!actionResult?.pop()
if (!isOk) {
throw new SkipExecutionError(
"Transaction already started for transactionId: " +
data.flow.transactionId
)
}
return result
}
const actionResult = result?.pop()
const isOk = !!actionResult?.pop()
if (!isOk) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Transaction already started for transactionId: " +
data.flow.transactionId
)
}
return result
})
})
}
// Database operations
if (hasFinished && !retentionTime) {
// If the workflow is nested, we cant just remove it because it would break the compensation algorithm. Instead, it will get deleted when the top level parent is deleted.
if (!data.flow.metadata?.parentStepIdempotencyKey) {
await promiseAll([pipelinePromise, this.deleteFromDb(data)])
await promiseAll([execPipeline(), this.deleteFromDb(data)])
} else {
await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)])
await this.saveToDb(data, retentionTime)
await execPipeline()
}
} else {
await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)])
await this.saveToDb(data, retentionTime)
await execPipeline()
}
}