diff --git a/.changeset/lovely-seals-sing.md b/.changeset/lovely-seals-sing.md new file mode 100644 index 0000000000..6f360c7ebf --- /dev/null +++ b/.changeset/lovely-seals-sing.md @@ -0,0 +1,5 @@ +--- +"@medusajs/core-flows": patch +--- + +chore(core-flows): lock on cart operations diff --git a/packages/core/core-flows/src/cart/workflows/add-to-cart.ts b/packages/core/core-flows/src/cart/workflows/add-to-cart.ts index 07dcf4b41c..d80c7eb30c 100644 --- a/packages/core/core-flows/src/cart/workflows/add-to-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/add-to-cart.ts @@ -21,6 +21,7 @@ import { } from "@medusajs/framework/workflows-sdk" import { useQueryGraphStep } from "../../common" import { emitEventStep } from "../../common/steps/emit-event" +import { acquireLockStep, releaseLockStep } from "../../locking" import { createLineItemsStep, getLineItemActionsStep, @@ -119,6 +120,13 @@ export const addToCartWorkflow = createWorkflow( idempotent: false, }, (input: WorkflowData) => { + acquireLockStep({ + key: input.cart_id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const cartQuery = useQueryGraphStep({ entity: "cart", filters: { id: input.cart_id }, @@ -308,10 +316,16 @@ export const addToCartWorkflow = createWorkflow( input: { cart_id: cart.id, items: allItems }, }) - emitEventStep({ - eventName: CartWorkflowEvents.UPDATED, - data: { id: cart.id }, - }) + parallelize( + emitEventStep({ + eventName: CartWorkflowEvents.UPDATED, + data: { id: cart.id }, + }), + releaseLockStep({ + key: cart.id, + skipOnSubWorkflow: true, + }) + ) return new WorkflowResponse(void 0, { hooks: [validate, setPricingContext] as const, diff --git a/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts b/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts index ba4ad402eb..0f3a2330e9 100644 --- a/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts @@ -10,10 +10,11 @@ import { parallelize, transform, WorkflowData, - WorkflowResponse + WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { createRemoteLinkStep } from "../../common/steps/create-remote-links" import { useRemoteQueryStep } from "../../common/steps/use-remote-query" +import { acquireLockStep, releaseLockStep } from "../../locking" import { createPaymentCollectionsStep } from "../steps/create-payment-collection" import { validateCartStep } from "../steps/validate-cart" @@ -88,6 +89,13 @@ export const createPaymentCollectionForCartWorkflow = createWorkflow( ( input: WorkflowData ): WorkflowResponse => { + acquireLockStep({ + key: input.cart_id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const cart = useRemoteQueryStep({ entry_point: "cart", fields: [ @@ -133,6 +141,11 @@ export const createPaymentCollectionForCartWorkflow = createWorkflow( name: "cart-payment-collection-link", }) + releaseLockStep({ + key: input.cart_id, + skipOnSubWorkflow: true, + }) + return new WorkflowResponse(created[0]) } ) diff --git a/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts b/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts index f349b622fa..e61d95ffe8 100644 --- a/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts +++ b/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts @@ -14,6 +14,7 @@ import { import { AdditionalData, CartDTO } from "@medusajs/types" import { useQueryGraphStep } from "../../common" import { useRemoteQueryStep } from "../../common/steps/use-remote-query" +import { acquireLockStep, releaseLockStep } from "../../locking" import { getVariantPriceSetsStep, updateLineItemsStep } from "../steps" import { validateVariantPricesStep } from "../steps/validate-variant-prices" import { @@ -133,6 +134,13 @@ export const refreshCartItemsWorkflow = createWorkflow( idempotent: false, }, (input: WorkflowData) => { + acquireLockStep({ + key: input.cart_id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const setPricingContext = createHook( "setPricingContext", { @@ -324,6 +332,11 @@ export const refreshCartItemsWorkflow = createWorkflow( input: { cart: refetchedCart }, }) + releaseLockStep({ + key: input.cart_id, + skipOnSubWorkflow: true, + }) + return new WorkflowResponse(refetchedCart, { hooks: [setPricingContext, beforeRefreshingPaymentCollection] as const, }) diff --git a/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts b/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts index 851eaa0c0d..dd34731805 100644 --- a/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts +++ b/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts @@ -9,6 +9,7 @@ import { WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { useRemoteQueryStep } from "../../common" +import { acquireLockStep, releaseLockStep } from "../../locking" import { removeShippingMethodFromCartStep } from "../steps" import { updateShippingMethodsStep } from "../steps/update-shipping-methods" import { listShippingOptionsForCartWithPricingWorkflow } from "./list-shipping-options-for-cart-with-pricing" @@ -98,6 +99,13 @@ export const refreshCartShippingMethodsWorkflow = createWorkflow( return fetchCart ?? input.cart }) + acquireLockStep({ + key: cart.id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const listShippingOptionsInput = transform({ cart }, ({ cart }) => (cart.shipping_methods || []) .map((shippingMethod) => ({ @@ -193,6 +201,11 @@ export const refreshCartShippingMethodsWorkflow = createWorkflow( }), updateShippingMethodsStep(shippingMethodsData.shippingMethodsToUpdate) ) + + releaseLockStep({ + key: cart.id, + skipOnSubWorkflow: true, + }) }) return new WorkflowResponse(void 0, { diff --git a/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts b/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts index 32ca1ec5e1..daccc39cff 100644 --- a/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts +++ b/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts @@ -9,6 +9,7 @@ import { when, } from "@medusajs/framework/workflows-sdk" import { useRemoteQueryStep } from "../../common/steps/use-remote-query" +import { acquireLockStep, releaseLockStep } from "../../locking" import { updatePaymentCollectionStep } from "../../payment-collection" import { deletePaymentSessionsWorkflow } from "../../payment-collection/workflows/delete-payment-sessions" @@ -96,6 +97,13 @@ export const refreshPaymentCollectionForCartWorkflow = createWorkflow( return fetchCart ?? input.cart }) + acquireLockStep({ + key: cart.id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const validate = createHook("validate", { input, cart, @@ -151,6 +159,11 @@ export const refreshPaymentCollectionForCartWorkflow = createWorkflow( ) }) + releaseLockStep({ + key: cart.id, + skipOnSubWorkflow: true, + }) + return new WorkflowResponse(void 0, { hooks: [validate], }) diff --git a/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts b/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts index 897a64065d..7f8810f064 100644 --- a/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts +++ b/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts @@ -64,7 +64,6 @@ export const refundPaymentAndRecreatePaymentSessionWorkflowId = export const refundPaymentAndRecreatePaymentSessionWorkflow = createWorkflow( { name: refundPaymentAndRecreatePaymentSessionWorkflowId, - idempotent: false, }, ( input: WorkflowData diff --git a/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts b/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts index c6ad67cf09..975d5e64d0 100644 --- a/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts +++ b/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts @@ -2,12 +2,14 @@ import { CartWorkflowEvents } from "@medusajs/framework/utils" import { createHook, createWorkflow, + parallelize, transform, when, WorkflowData, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { emitEventStep, useQueryGraphStep } from "../../common" +import { acquireLockStep, releaseLockStep } from "../../locking" import { updateCartsStep } from "../steps" import { refreshCartItemsWorkflow } from "./refresh-cart-items" @@ -101,6 +103,13 @@ export const transferCartCustomerWorkflow = createWorkflow( { shouldTransfer }, ({ shouldTransfer }) => shouldTransfer ).then(() => { + acquireLockStep({ + key: cart.id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const cartInput = transform({ cart, customer }, ({ cart, customer }) => [ { id: cart.id, @@ -115,13 +124,19 @@ export const transferCartCustomerWorkflow = createWorkflow( input: { cart_id: input.id, force_refresh: true }, }) - emitEventStep({ - eventName: CartWorkflowEvents.CUSTOMER_TRANSFERRED, - data: { - id: input.id, - customer_id: customer.customer_id, - }, - }) + parallelize( + emitEventStep({ + eventName: CartWorkflowEvents.CUSTOMER_TRANSFERRED, + data: { + id: input.id, + customer_id: customer.customer_id, + }, + }), + releaseLockStep({ + key: cart.id, + skipOnSubWorkflow: true, + }) + ) }) return new WorkflowResponse(void 0, { diff --git a/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts b/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts index 80ed7c5f11..d719e8540a 100644 --- a/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts +++ b/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts @@ -9,6 +9,7 @@ import { WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { useRemoteQueryStep } from "../../common" +import { acquireLockStep, releaseLockStep } from "../../locking" import { createLineItemAdjustmentsStep, createShippingMethodAdjustmentsStep, @@ -93,6 +94,13 @@ export const updateCartPromotionsWorkflow = createWorkflow( return input.cart ?? fetchCart }) + acquireLockStep({ + key: cart.id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const validate = createHook("validate", { input, cart, @@ -141,6 +149,11 @@ export const updateCartPromotionsWorkflow = createWorkflow( }) ) + releaseLockStep({ + key: cart.id, + skipOnSubWorkflow: true, + }) + return new WorkflowResponse(void 0, { hooks: [validate], }) diff --git a/packages/core/core-flows/src/cart/workflows/update-cart.ts b/packages/core/core-flows/src/cart/workflows/update-cart.ts index 0984454c70..e62214bf61 100644 --- a/packages/core/core-flows/src/cart/workflows/update-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/update-cart.ts @@ -19,6 +19,7 @@ import { } from "@medusajs/framework/workflows-sdk" import { emitEventStep, useQueryGraphStep } from "../../common" import { deleteLineItemsStep } from "../../line-item" +import { acquireLockStep, releaseLockStep } from "../../locking" import { findOrCreateCustomerStep, findSalesChannelStep, @@ -83,6 +84,13 @@ export const updateCartWorkflow = createWorkflow( idempotent: false, }, (input: WorkflowData) => { + acquireLockStep({ + key: input.id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const { data: cartToUpdate } = useQueryGraphStep({ entity: "cart", filters: { id: input.id }, @@ -309,6 +317,11 @@ export const updateCartWorkflow = createWorkflow( additional_data: input.additional_data, }) + releaseLockStep({ + key: input.id, + skipOnSubWorkflow: true, + }) + return new WorkflowResponse(void 0, { hooks: [validate, cartUpdated], }) diff --git a/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts b/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts index fa782b0688..286f89c34e 100644 --- a/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts @@ -16,6 +16,7 @@ import { import { createHook, createWorkflow, + parallelize, transform, when, WorkflowData, @@ -24,6 +25,7 @@ import { import { useQueryGraphStep } from "../../common" import { emitEventStep } from "../../common/steps/emit-event" import { updateLineItemsStepWithSelector } from "../../line-item/steps" +import { acquireLockStep, releaseLockStep } from "../../locking" import { validateCartStep } from "../steps/validate-cart" import { validateVariantPricesStep } from "../steps/validate-variant-prices" import { @@ -111,6 +113,13 @@ export const updateLineItemInCartWorkflow = createWorkflow( ( input: WorkflowData ) => { + acquireLockStep({ + key: input.cart_id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + const { data: cart } = useQueryGraphStep({ entity: "cart", filters: { id: input.cart_id }, @@ -257,10 +266,16 @@ export const updateLineItemInCartWorkflow = createWorkflow( input: { cart_id: input.cart_id }, }) - emitEventStep({ - eventName: CartWorkflowEvents.UPDATED, - data: { id: input.cart_id }, - }) + parallelize( + emitEventStep({ + eventName: CartWorkflowEvents.UPDATED, + data: { id: input.cart_id }, + }), + releaseLockStep({ + key: input.cart_id, + skipOnSubWorkflow: true, + }) + ) return new WorkflowResponse(void 0, { hooks: [validate, setPricingContext] as const, diff --git a/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts b/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts index ac838b2bfc..b8637ea523 100644 --- a/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts +++ b/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts @@ -1,5 +1,6 @@ import { WorkflowData, createWorkflow } from "@medusajs/framework/workflows-sdk" import { refreshCartItemsWorkflow } from "../../cart/workflows/refresh-cart-items" +import { acquireLockStep, releaseLockStep } from "../../locking" import { deleteLineItemsStep } from "../steps/delete-line-items" /** @@ -43,10 +44,22 @@ export const deleteLineItemsWorkflow = createWorkflow( idempotent: false, }, (input: WorkflowData) => { + acquireLockStep({ + key: input.cart_id, + timeout: 2, + ttl: 10, + skipOnSubWorkflow: true, + }) + deleteLineItemsStep(input.ids) refreshCartItemsWorkflow.runAsStep({ input: { cart_id: input.cart_id }, }) + + releaseLockStep({ + key: input.cart_id, + skipOnSubWorkflow: true, + }) } ) diff --git a/packages/core/core-flows/src/locking/acquire-lock.ts b/packages/core/core-flows/src/locking/acquire-lock.ts index fced4e951a..88c6939072 100644 --- a/packages/core/core-flows/src/locking/acquire-lock.ts +++ b/packages/core/core-flows/src/locking/acquire-lock.ts @@ -12,6 +12,7 @@ export interface AcquireLockStepInput { ttl?: number // in seconds ownerId?: string provider?: string + skipOnSubWorkflow?: boolean } export const acquireLockStepId = "acquire-lock-step" @@ -26,7 +27,10 @@ export const acquireLockStepId = "acquire-lock-step" */ export const acquireLockStep = createStep( acquireLockStepId, - async (data: AcquireLockStepInput, { container }) => { + async ( + data: AcquireLockStepInput, + { container, parentStepIdempotencyKey } + ) => { const keys = Array.isArray(data.key) ? data.key : isDefined(data.key) @@ -37,6 +41,11 @@ export const acquireLockStep = createStep( return new StepResponse(void 0) } + const isSubWorkflow = !!parentStepIdempotencyKey + if (isSubWorkflow && data.skipOnSubWorkflow) { + return StepResponse.skip() as any + } + const locking = container.resolve(Modules.LOCKING) const retryInterval = data.retryInterval ?? 0.3 @@ -65,7 +74,7 @@ export const acquireLockStep = createStep( provider: data.provider, }) }, - async (data, { container }) => { + async (data: any, { container }) => { if (!data?.keys?.length) { return } diff --git a/packages/core/core-flows/src/locking/release-lock.ts b/packages/core/core-flows/src/locking/release-lock.ts index 3670aa0c0a..52a68eaca6 100644 --- a/packages/core/core-flows/src/locking/release-lock.ts +++ b/packages/core/core-flows/src/locking/release-lock.ts @@ -8,6 +8,7 @@ export interface ReleaseLockStepInput { key: string | string[] ownerId?: string provider?: string + skipOnSubWorkflow?: boolean } export const releaseLockStepId = "release-lock-step" @@ -21,7 +22,10 @@ export const releaseLockStepId = "release-lock-step" */ export const releaseLockStep = createStep( releaseLockStepId, - async (data: ReleaseLockStepInput, { container }) => { + async ( + data: ReleaseLockStepInput, + { container, parentStepIdempotencyKey } + ) => { const keys = Array.isArray(data.key) ? data.key : isDefined(data.key) @@ -32,9 +36,15 @@ export const releaseLockStep = createStep( return new StepResponse(true) } + const isSubWorkflow = !!parentStepIdempotencyKey + if (isSubWorkflow && data.skipOnSubWorkflow) { + return StepResponse.skip() as any + } + + const ownerId = data.ownerId const locking = container.resolve(Modules.LOCKING) const released = await locking.release(keys, { - ownerId: data.ownerId, + ownerId, provider: data.provider, })