chore(core-flows): lock on cart operations (#13424)

What:
 - acquire a lock before cart mutations to avoid concurrent changes, e.g applying promotions.
This commit is contained in:
Carlos R. L. Rodrigues
2025-09-08 15:05:09 -03:00
committed by GitHub
parent 8e5c22a8e8
commit b776fd55dc
14 changed files with 179 additions and 21 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/core-flows": patch
---
chore(core-flows): lock on cart operations

View File

@@ -21,6 +21,7 @@ import {
} from "@medusajs/framework/workflows-sdk"
import { useQueryGraphStep } from "../../common"
import { emitEventStep } from "../../common/steps/emit-event"
import { acquireLockStep, releaseLockStep } from "../../locking"
import {
createLineItemsStep,
getLineItemActionsStep,
@@ -119,6 +120,13 @@ export const addToCartWorkflow = createWorkflow(
idempotent: false,
},
(input: WorkflowData<AddToCartWorkflowInputDTO & AdditionalData>) => {
acquireLockStep({
key: input.cart_id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const cartQuery = useQueryGraphStep({
entity: "cart",
filters: { id: input.cart_id },
@@ -308,10 +316,16 @@ export const addToCartWorkflow = createWorkflow(
input: { cart_id: cart.id, items: allItems },
})
emitEventStep({
eventName: CartWorkflowEvents.UPDATED,
data: { id: cart.id },
})
parallelize(
emitEventStep({
eventName: CartWorkflowEvents.UPDATED,
data: { id: cart.id },
}),
releaseLockStep({
key: cart.id,
skipOnSubWorkflow: true,
})
)
return new WorkflowResponse(void 0, {
hooks: [validate, setPricingContext] as const,

View File

@@ -10,10 +10,11 @@ import {
parallelize,
transform,
WorkflowData,
WorkflowResponse
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { createRemoteLinkStep } from "../../common/steps/create-remote-links"
import { useRemoteQueryStep } from "../../common/steps/use-remote-query"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { createPaymentCollectionsStep } from "../steps/create-payment-collection"
import { validateCartStep } from "../steps/validate-cart"
@@ -88,6 +89,13 @@ export const createPaymentCollectionForCartWorkflow = createWorkflow(
(
input: WorkflowData<CreatePaymentCollectionForCartWorkflowInputDTO>
): WorkflowResponse<PaymentCollectionDTO> => {
acquireLockStep({
key: input.cart_id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const cart = useRemoteQueryStep({
entry_point: "cart",
fields: [
@@ -133,6 +141,11 @@ export const createPaymentCollectionForCartWorkflow = createWorkflow(
name: "cart-payment-collection-link",
})
releaseLockStep({
key: input.cart_id,
skipOnSubWorkflow: true,
})
return new WorkflowResponse(created[0])
}
)

View File

@@ -14,6 +14,7 @@ import {
import { AdditionalData, CartDTO } from "@medusajs/types"
import { useQueryGraphStep } from "../../common"
import { useRemoteQueryStep } from "../../common/steps/use-remote-query"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { getVariantPriceSetsStep, updateLineItemsStep } from "../steps"
import { validateVariantPricesStep } from "../steps/validate-variant-prices"
import {
@@ -133,6 +134,13 @@ export const refreshCartItemsWorkflow = createWorkflow(
idempotent: false,
},
(input: WorkflowData<RefreshCartItemsWorkflowInput & AdditionalData>) => {
acquireLockStep({
key: input.cart_id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const setPricingContext = createHook(
"setPricingContext",
{
@@ -324,6 +332,11 @@ export const refreshCartItemsWorkflow = createWorkflow(
input: { cart: refetchedCart },
})
releaseLockStep({
key: input.cart_id,
skipOnSubWorkflow: true,
})
return new WorkflowResponse(refetchedCart, {
hooks: [setPricingContext, beforeRefreshingPaymentCollection] as const,
})

View File

@@ -9,6 +9,7 @@ import {
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { useRemoteQueryStep } from "../../common"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { removeShippingMethodFromCartStep } from "../steps"
import { updateShippingMethodsStep } from "../steps/update-shipping-methods"
import { listShippingOptionsForCartWithPricingWorkflow } from "./list-shipping-options-for-cart-with-pricing"
@@ -98,6 +99,13 @@ export const refreshCartShippingMethodsWorkflow = createWorkflow(
return fetchCart ?? input.cart
})
acquireLockStep({
key: cart.id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const listShippingOptionsInput = transform({ cart }, ({ cart }) =>
(cart.shipping_methods || [])
.map((shippingMethod) => ({
@@ -193,6 +201,11 @@ export const refreshCartShippingMethodsWorkflow = createWorkflow(
}),
updateShippingMethodsStep(shippingMethodsData.shippingMethodsToUpdate)
)
releaseLockStep({
key: cart.id,
skipOnSubWorkflow: true,
})
})
return new WorkflowResponse(void 0, {

View File

@@ -9,6 +9,7 @@ import {
when,
} from "@medusajs/framework/workflows-sdk"
import { useRemoteQueryStep } from "../../common/steps/use-remote-query"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { updatePaymentCollectionStep } from "../../payment-collection"
import { deletePaymentSessionsWorkflow } from "../../payment-collection/workflows/delete-payment-sessions"
@@ -96,6 +97,13 @@ export const refreshPaymentCollectionForCartWorkflow = createWorkflow(
return fetchCart ?? input.cart
})
acquireLockStep({
key: cart.id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const validate = createHook("validate", {
input,
cart,
@@ -151,6 +159,11 @@ export const refreshPaymentCollectionForCartWorkflow = createWorkflow(
)
})
releaseLockStep({
key: cart.id,
skipOnSubWorkflow: true,
})
return new WorkflowResponse(void 0, {
hooks: [validate],
})

View File

@@ -64,7 +64,6 @@ export const refundPaymentAndRecreatePaymentSessionWorkflowId =
export const refundPaymentAndRecreatePaymentSessionWorkflow = createWorkflow(
{
name: refundPaymentAndRecreatePaymentSessionWorkflowId,
idempotent: false,
},
(
input: WorkflowData<refundPaymentAndRecreatePaymentSessionWorkflowInput>

View File

@@ -2,12 +2,14 @@ import { CartWorkflowEvents } from "@medusajs/framework/utils"
import {
createHook,
createWorkflow,
parallelize,
transform,
when,
WorkflowData,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { emitEventStep, useQueryGraphStep } from "../../common"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { updateCartsStep } from "../steps"
import { refreshCartItemsWorkflow } from "./refresh-cart-items"
@@ -101,6 +103,13 @@ export const transferCartCustomerWorkflow = createWorkflow(
{ shouldTransfer },
({ shouldTransfer }) => shouldTransfer
).then(() => {
acquireLockStep({
key: cart.id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const cartInput = transform({ cart, customer }, ({ cart, customer }) => [
{
id: cart.id,
@@ -115,13 +124,19 @@ export const transferCartCustomerWorkflow = createWorkflow(
input: { cart_id: input.id, force_refresh: true },
})
emitEventStep({
eventName: CartWorkflowEvents.CUSTOMER_TRANSFERRED,
data: {
id: input.id,
customer_id: customer.customer_id,
},
})
parallelize(
emitEventStep({
eventName: CartWorkflowEvents.CUSTOMER_TRANSFERRED,
data: {
id: input.id,
customer_id: customer.customer_id,
},
}),
releaseLockStep({
key: cart.id,
skipOnSubWorkflow: true,
})
)
})
return new WorkflowResponse(void 0, {

View File

@@ -9,6 +9,7 @@ import {
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { useRemoteQueryStep } from "../../common"
import { acquireLockStep, releaseLockStep } from "../../locking"
import {
createLineItemAdjustmentsStep,
createShippingMethodAdjustmentsStep,
@@ -93,6 +94,13 @@ export const updateCartPromotionsWorkflow = createWorkflow(
return input.cart ?? fetchCart
})
acquireLockStep({
key: cart.id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const validate = createHook("validate", {
input,
cart,
@@ -141,6 +149,11 @@ export const updateCartPromotionsWorkflow = createWorkflow(
})
)
releaseLockStep({
key: cart.id,
skipOnSubWorkflow: true,
})
return new WorkflowResponse(void 0, {
hooks: [validate],
})

View File

@@ -19,6 +19,7 @@ import {
} from "@medusajs/framework/workflows-sdk"
import { emitEventStep, useQueryGraphStep } from "../../common"
import { deleteLineItemsStep } from "../../line-item"
import { acquireLockStep, releaseLockStep } from "../../locking"
import {
findOrCreateCustomerStep,
findSalesChannelStep,
@@ -83,6 +84,13 @@ export const updateCartWorkflow = createWorkflow(
idempotent: false,
},
(input: WorkflowData<UpdateCartWorkflowInput>) => {
acquireLockStep({
key: input.id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const { data: cartToUpdate } = useQueryGraphStep({
entity: "cart",
filters: { id: input.id },
@@ -309,6 +317,11 @@ export const updateCartWorkflow = createWorkflow(
additional_data: input.additional_data,
})
releaseLockStep({
key: input.id,
skipOnSubWorkflow: true,
})
return new WorkflowResponse(void 0, {
hooks: [validate, cartUpdated],
})

View File

@@ -16,6 +16,7 @@ import {
import {
createHook,
createWorkflow,
parallelize,
transform,
when,
WorkflowData,
@@ -24,6 +25,7 @@ import {
import { useQueryGraphStep } from "../../common"
import { emitEventStep } from "../../common/steps/emit-event"
import { updateLineItemsStepWithSelector } from "../../line-item/steps"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { validateCartStep } from "../steps/validate-cart"
import { validateVariantPricesStep } from "../steps/validate-variant-prices"
import {
@@ -111,6 +113,13 @@ export const updateLineItemInCartWorkflow = createWorkflow(
(
input: WorkflowData<UpdateLineItemInCartWorkflowInputDTO & AdditionalData>
) => {
acquireLockStep({
key: input.cart_id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
const { data: cart } = useQueryGraphStep({
entity: "cart",
filters: { id: input.cart_id },
@@ -257,10 +266,16 @@ export const updateLineItemInCartWorkflow = createWorkflow(
input: { cart_id: input.cart_id },
})
emitEventStep({
eventName: CartWorkflowEvents.UPDATED,
data: { id: input.cart_id },
})
parallelize(
emitEventStep({
eventName: CartWorkflowEvents.UPDATED,
data: { id: input.cart_id },
}),
releaseLockStep({
key: input.cart_id,
skipOnSubWorkflow: true,
})
)
return new WorkflowResponse(void 0, {
hooks: [validate, setPricingContext] as const,

View File

@@ -1,5 +1,6 @@
import { WorkflowData, createWorkflow } from "@medusajs/framework/workflows-sdk"
import { refreshCartItemsWorkflow } from "../../cart/workflows/refresh-cart-items"
import { acquireLockStep, releaseLockStep } from "../../locking"
import { deleteLineItemsStep } from "../steps/delete-line-items"
/**
@@ -43,10 +44,22 @@ export const deleteLineItemsWorkflow = createWorkflow(
idempotent: false,
},
(input: WorkflowData<DeleteLineItemsWorkflowInput>) => {
acquireLockStep({
key: input.cart_id,
timeout: 2,
ttl: 10,
skipOnSubWorkflow: true,
})
deleteLineItemsStep(input.ids)
refreshCartItemsWorkflow.runAsStep({
input: { cart_id: input.cart_id },
})
releaseLockStep({
key: input.cart_id,
skipOnSubWorkflow: true,
})
}
)

View File

@@ -12,6 +12,7 @@ export interface AcquireLockStepInput {
ttl?: number // in seconds
ownerId?: string
provider?: string
skipOnSubWorkflow?: boolean
}
export const acquireLockStepId = "acquire-lock-step"
@@ -26,7 +27,10 @@ export const acquireLockStepId = "acquire-lock-step"
*/
export const acquireLockStep = createStep(
acquireLockStepId,
async (data: AcquireLockStepInput, { container }) => {
async (
data: AcquireLockStepInput,
{ container, parentStepIdempotencyKey }
) => {
const keys = Array.isArray(data.key)
? data.key
: isDefined(data.key)
@@ -37,6 +41,11 @@ export const acquireLockStep = createStep(
return new StepResponse(void 0)
}
const isSubWorkflow = !!parentStepIdempotencyKey
if (isSubWorkflow && data.skipOnSubWorkflow) {
return StepResponse.skip() as any
}
const locking = container.resolve(Modules.LOCKING)
const retryInterval = data.retryInterval ?? 0.3
@@ -65,7 +74,7 @@ export const acquireLockStep = createStep(
provider: data.provider,
})
},
async (data, { container }) => {
async (data: any, { container }) => {
if (!data?.keys?.length) {
return
}

View File

@@ -8,6 +8,7 @@ export interface ReleaseLockStepInput {
key: string | string[]
ownerId?: string
provider?: string
skipOnSubWorkflow?: boolean
}
export const releaseLockStepId = "release-lock-step"
@@ -21,7 +22,10 @@ export const releaseLockStepId = "release-lock-step"
*/
export const releaseLockStep = createStep(
releaseLockStepId,
async (data: ReleaseLockStepInput, { container }) => {
async (
data: ReleaseLockStepInput,
{ container, parentStepIdempotencyKey }
) => {
const keys = Array.isArray(data.key)
? data.key
: isDefined(data.key)
@@ -32,9 +36,15 @@ export const releaseLockStep = createStep(
return new StepResponse(true)
}
const isSubWorkflow = !!parentStepIdempotencyKey
if (isSubWorkflow && data.skipOnSubWorkflow) {
return StepResponse.skip() as any
}
const ownerId = data.ownerId
const locking = container.resolve(Modules.LOCKING)
const released = await locking.release(keys, {
ownerId: data.ownerId,
ownerId,
provider: data.provider,
})