From 9412669e654c994e2dbee9cf61c18004d79f6475 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Thu, 28 Aug 2025 10:04:00 -0300 Subject: [PATCH] chore: idempotent cart operations (#13236) * chore(core-flows): idempotent cart operations * changeset * add tests * revert * revert route * promo test * skip bugs * fix test * tests * avoid workflow name conflict * prevent nested workflow from being deleted until the top level parent finishes * remove unused setTimeout * update changeset * rm comments --------- Co-authored-by: adrien2p --- .changeset/lemon-buttons-look.md | 11 + .../promotions/admin/promotions.spec.ts | 682 +++++++++++++++++- .../workflow-engine/workflow-engine.ts | 8 +- .../src/cart/workflows/add-to-cart.ts | 5 +- .../create-payment-collection-for-cart.ts | 5 +- .../src/cart/workflows/refresh-cart-items.ts | 5 +- .../refresh-cart-shipping-methods.ts | 5 +- .../workflows/refresh-payment-collection.ts | 5 +- ...refund-payment-recreate-payment-session.ts | 5 +- .../cart/workflows/transfer-cart-customer.ts | 7 +- .../cart/workflows/update-cart-promotions.ts | 5 +- .../src/cart/workflows/update-cart.ts | 5 +- .../workflows/update-line-item-in-cart.ts | 5 +- .../line-item/workflows/delete-line-items.ts | 15 +- .../transaction/transaction-orchestrator.ts | 14 +- .../src/workflow/local-workflow.ts | 1 + packages/core/types/src/shared-context.ts | 5 + .../src/utils/composer/create-workflow.ts | 1 + .../composer/helpers/create-step-handler.ts | 1 + .../workflows-sdk/src/utils/composer/type.ts | 5 + .../api/store/carts/[id]/customer/route.ts | 8 +- .../carts/[id]/line-items/[line_id]/route.ts | 14 +- .../api/store/carts/[id]/line-items/route.ts | 7 +- .../api/store/carts/[id]/promotions/route.ts | 21 +- .../medusa/src/api/store/carts/[id]/route.ts | 8 +- .../api/store/payment-collections/route.ts | 15 +- .../__fixtures__/workflow_idempotent.ts | 1 + .../__fixtures__/workflow_sync.ts | 3 +- .../.snapshot-medusa-workflows.json | 9 + .../src/migrations/Migration20250819104213.ts | 13 + .../src/models/workflow-execution.ts | 4 + .../src/services/workflows-module.ts | 1 + .../utils/workflow-orchestrator-storage.ts | 13 +- .../__fixtures__/workflow_sync.ts | 3 +- .../.snapshot-medusa-workflows.json | 9 + .../src/migrations/Migration20250819110923.ts | 13 + .../src/models/workflow-execution.ts | 4 + .../utils/workflow-orchestrator-storage.ts | 13 +- 38 files changed, 890 insertions(+), 64 deletions(-) create mode 100644 .changeset/lemon-buttons-look.md create mode 100644 packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts create mode 100644 packages/modules/workflow-engine-redis/src/migrations/Migration20250819110923.ts diff --git a/.changeset/lemon-buttons-look.md b/.changeset/lemon-buttons-look.md new file mode 100644 index 0000000000..69da660344 --- /dev/null +++ b/.changeset/lemon-buttons-look.md @@ -0,0 +1,11 @@ +--- +"@medusajs/workflow-engine-inmemory": minor +"@medusajs/workflow-engine-redis": minor +"@medusajs/core-flows": minor +"@medusajs/medusa": minor +"@medusajs/orchestration": minor +"@medusajs/types": minor +"@medusajs/workflows-sdk": minor +--- + +chore(core-flows): idempotent cart operations and nested workflow deletion lifecycle fix diff --git a/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts b/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts index 15e0f1c027..5c08664c1c 100644 --- a/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts +++ b/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts @@ -8,7 +8,7 @@ import { import { setupTaxStructure } from "../../../../modules/__tests__/fixtures/tax" import { medusaTshirtProduct } from "../../../__fixtures__/product" -jest.setTimeout(50000) +jest.setTimeout(50000000) const adminHeaders = { headers: { "x-medusa-access-token": "test_token" }, @@ -609,6 +609,11 @@ medusaIntegrationTestRunner({ expect(cartWithPromotion1).toEqual( expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [], + }), + ], promotions: [], }) ) @@ -621,6 +626,140 @@ medusaIntegrationTestRunner({ ) ).data.cart + expect(cartWithPromotion2).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }) + ) + }) + + it("should add promotion and remove it from cart using delete", async () => { + const publishableKey = await generatePublishableKey(appContainer) + const storeHeaders = generateStoreHeaders({ publishableKey }) + + const salesChannel = ( + await api.post( + "/admin/sales-channels", + { name: "Webshop", description: "channel" }, + adminHeaders + ) + ).data.sales_channel + + const region = ( + await api.post( + "/admin/regions", + { name: "US", currency_code: "usd", countries: ["us"] }, + adminHeaders + ) + ).data.region + + const product = ( + await api.post( + "/admin/products", + { + ...medusaTshirtProduct, + shipping_profile_id: shippingProfile.id, + }, + adminHeaders + ) + ).data.product + + const cart = ( + await api.post( + `/store/carts`, + { + currency_code: "usd", + sales_channel_id: salesChannel.id, + region_id: region.id, + items: [{ variant_id: product.variants[0].id, quantity: 1 }], + }, + storeHeaders + ) + ).data.cart + + const response = await api.post( + `/admin/promotions`, + { + code: "TEST", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: false, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 100, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "1", + }, + ], + }, + adminHeaders + ) + + // Simulate concurrent requests + await Promise.all([ + api + .post( + `/store/carts/${cart.id}`, + { + promo_codes: [response.data.promotion.code], + }, + storeHeaders + ) + .catch(() => {}), + api + .post( + `/store/carts/${cart.id}`, + { + promo_codes: [response.data.promotion.code], + }, + storeHeaders + ) + .catch(() => {}), + ]) + + const cartAfterPromotion = ( + await api.get(`/store/carts/${cart.id}`, storeHeaders) + ).data.cart + + expect(cartAfterPromotion).toEqual( + expect.objectContaining({ + promotions: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }) + ) + + const cartWithPromotion2 = ( + await api.post( + `/store/carts/${cart.id}/line-items`, + { variant_id: product.variants[0].id, quantity: 40 }, + storeHeaders + ) + ).data.cart + expect(cartWithPromotion2).toEqual( expect.objectContaining({ promotions: [ @@ -630,6 +769,547 @@ medusaIntegrationTestRunner({ ], }) ) + + await api.delete(`/store/carts/${cart.id}/promotions`, { + data: { + promo_codes: [response.data.promotion.code], + }, + ...storeHeaders, + }) + + const cartWithoutPromotion = ( + await api.get(`/store/carts/${cart.id}`, storeHeaders) + ).data.cart + + expect(cartWithoutPromotion).toEqual( + expect.objectContaining({ + promotions: [], + }) + ) + }) + + it("should add promotion and remove it from cart using update", async () => { + const publishableKey = await generatePublishableKey(appContainer) + const storeHeaders = generateStoreHeaders({ publishableKey }) + + const salesChannel = ( + await api.post( + "/admin/sales-channels", + { name: "Webshop", description: "channel" }, + adminHeaders + ) + ).data.sales_channel + + const region = ( + await api.post( + "/admin/regions", + { name: "US", currency_code: "usd", countries: ["us"] }, + adminHeaders + ) + ).data.region + + const product = ( + await api.post( + "/admin/products", + { + ...medusaTshirtProduct, + shipping_profile_id: shippingProfile.id, + }, + adminHeaders + ) + ).data.product + + const cart = ( + await api.post( + `/store/carts`, + { + currency_code: "usd", + sales_channel_id: salesChannel.id, + region_id: region.id, + items: [{ variant_id: product.variants[0].id, quantity: 1 }], + }, + storeHeaders + ) + ).data.cart + + const response = await api.post( + `/admin/promotions`, + { + code: "TEST", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: false, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 100, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "1", + }, + ], + }, + adminHeaders + ) + + await api.post( + `/store/carts/${cart.id}`, + { + promo_codes: [response.data.promotion.code], + }, + storeHeaders + ) + + const cartAfterPromotion = ( + await api.get(`/store/carts/${cart.id}`, storeHeaders) + ).data.cart + + expect(cartAfterPromotion).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }) + ) + + const cartWithPromotion2 = ( + await api.post( + `/store/carts/${cart.id}/line-items`, + { variant_id: product.variants[0].id, quantity: 40 }, + storeHeaders + ) + ).data.cart + + expect(cartWithPromotion2).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: response.data.promotion.code, + }), + ], + }) + ) + + await api.post( + `/store/carts/${cart.id}`, + { + promo_codes: [], + }, + storeHeaders + ) + + const cartWithoutPromotion = ( + await api.get(`/store/carts/${cart.id}`, storeHeaders) + ).data.cart + + expect(cartWithoutPromotion).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [], + }), + ], + promotions: [], + }) + ) + }) + + it.skip("should add two promotions and remove one from cart using delete", async () => { + const publishableKey = await generatePublishableKey(appContainer) + const storeHeaders = generateStoreHeaders({ publishableKey }) + + const salesChannel = ( + await api.post( + "/admin/sales-channels", + { name: "Webshop", description: "channel" }, + adminHeaders + ) + ).data.sales_channel + + const region = ( + await api.post( + "/admin/regions", + { name: "US", currency_code: "usd", countries: ["us"] }, + adminHeaders + ) + ).data.region + + const product = ( + await api.post( + "/admin/products", + { + ...medusaTshirtProduct, + shipping_profile_id: shippingProfile.id, + }, + adminHeaders + ) + ).data.product + + const cart = ( + await api.post( + `/store/carts`, + { + currency_code: "usd", + sales_channel_id: salesChannel.id, + region_id: region.id, + items: [{ variant_id: product.variants[0].id, quantity: 1 }], + }, + storeHeaders + ) + ).data.cart + + const promo1 = await api.post( + `/admin/promotions`, + { + code: "TEST", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: false, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 100, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "1", + }, + ], + }, + adminHeaders + ) + const promo2 = await api.post( + `/admin/promotions`, + { + code: "TEST2", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: false, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 100, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "2000", + }, + ], + }, + adminHeaders + ) + const cartWithPromotion2 = ( + await api.post( + `/store/carts/${cart.id}/line-items`, + { variant_id: product.variants[0].id, quantity: 40 }, + storeHeaders + ) + ).data.cart + + expect(cartWithPromotion2).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: promo1.data.promotion.code, + }), + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: promo1.data.promotion.code, + }), + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + ], + }) + ) + + await api.delete(`/store/carts/${cart.id}/promotions`, { + data: { + promo_codes: [promo1.data.promotion.code], + }, + ...storeHeaders, + }) + + const cartWithoutPromotion1 = ( + await api.get(`/store/carts/${cart.id}`, storeHeaders) + ).data.cart + + expect(cartWithoutPromotion1).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + ], + }) + ) + }) + + it.skip("should add two promotions and remove one from cart using update", async () => { + const publishableKey = await generatePublishableKey(appContainer) + const storeHeaders = generateStoreHeaders({ publishableKey }) + + const salesChannel = ( + await api.post( + "/admin/sales-channels", + { name: "Webshop", description: "channel" }, + adminHeaders + ) + ).data.sales_channel + + const region = ( + await api.post( + "/admin/regions", + { name: "US", currency_code: "usd", countries: ["us"] }, + adminHeaders + ) + ).data.region + + const product = ( + await api.post( + "/admin/products", + { + ...medusaTshirtProduct, + shipping_profile_id: shippingProfile.id, + }, + adminHeaders + ) + ).data.product + + const cart = ( + await api.post( + `/store/carts`, + { + currency_code: "usd", + sales_channel_id: salesChannel.id, + region_id: region.id, + items: [{ variant_id: product.variants[0].id, quantity: 1 }], + }, + storeHeaders + ) + ).data.cart + + const [promo1, promo2, promoAutomatic] = await Promise.all([ + api.post( + `/admin/promotions`, + { + code: "TEST", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: false, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 50, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "1", + }, + ], + }, + adminHeaders + ), + api.post( + `/admin/promotions`, + { + code: "TEST_CODE_123", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: false, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 10, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "2000", + }, + ], + }, + adminHeaders + ), + api.post( + `/admin/promotions`, + { + code: "AUTOMATIC_PROMO", + type: PromotionType.STANDARD, + status: PromotionStatus.ACTIVE, + is_automatic: true, + application_method: { + target_type: "items", + type: "fixed", + allocation: "each", + currency_code: "usd", + value: 5, + max_quantity: 100, + }, + rules: [ + { + attribute: "subtotal", + operator: "gte", + values: "500", + }, + ], + }, + adminHeaders + ), + ]) + + // apply promotions + await api.post( + `/store/carts/${cart.id}`, + { + promo_codes: [ + promo1.data.promotion.code, + promo2.data.promotion.code, + ], + }, + storeHeaders + ) + + const cartWithPromotion2 = ( + await api.post( + `/store/carts/${cart.id}/line-items`, + { + variant_id: product.variants[0].id, + quantity: 40, + }, + storeHeaders + ) + ).data.cart + + expect(cartWithPromotion2).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: promo1.data.promotion.code, + }), + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + expect.objectContaining({ + code: promoAutomatic.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: promo1.data.promotion.code, + }), + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + expect.objectContaining({ + code: promoAutomatic.data.promotion.code, + }), + ], + }) + ) + + await api.post( + `/store/carts/${cart.id}`, + { + promo_codes: [promo2.data.promotion.code], + }, + storeHeaders + ) + + const cartWithoutPromotion1 = ( + await api.get(`/store/carts/${cart.id}`, storeHeaders) + ).data.cart + + expect(cartWithoutPromotion1).toEqual( + expect.objectContaining({ + items: [ + expect.objectContaining({ + adjustments: [ + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + expect.objectContaining({ + code: promoAutomatic.data.promotion.code, + }), + ], + }), + ], + promotions: [ + expect.objectContaining({ + code: promo2.data.promotion.code, + }), + expect.objectContaining({ + code: promoAutomatic.data.promotion.code, + }), + ], + }) + ) }) }) diff --git a/integration-tests/modules/__tests__/workflow-engine/workflow-engine.ts b/integration-tests/modules/__tests__/workflow-engine/workflow-engine.ts index 96e8ecb81e..3060018c25 100644 --- a/integration-tests/modules/__tests__/workflow-engine/workflow-engine.ts +++ b/integration-tests/modules/__tests__/workflow-engine/workflow-engine.ts @@ -1,3 +1,4 @@ +import { emitEventStep } from "@medusajs/core-flows" import { Modules, TransactionState } from "@medusajs/framework/utils" import { createStep, @@ -7,12 +8,11 @@ import { WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { medusaIntegrationTestRunner } from "@medusajs/test-utils" +import { IEventBusModuleService } from "@medusajs/types" import { adminHeaders, createAdminUser, } from "../../../helpers/create-admin-user" -import { emitEventStep } from "@medusajs/core-flows" -import { IEventBusModuleService } from "@medusajs/types" jest.setTimeout(300000) @@ -106,7 +106,7 @@ medusaIntegrationTestRunner({ createWorkflow( { - name: "my-workflow-name", + name: "my-workflow-name-2", retentionTime: 50, }, function (input: WorkflowData<{ initial: string }>) { @@ -137,7 +137,7 @@ medusaIntegrationTestRunner({ const engine = container.resolve(Modules.WORKFLOW_ENGINE) const transactionId = "trx-id-failing-event" - const res = await engine.run("my-workflow-name", { + const res = await engine.run("my-workflow-name-2", { transactionId, input: { initial: "abc", diff --git a/packages/core/core-flows/src/cart/workflows/add-to-cart.ts b/packages/core/core-flows/src/cart/workflows/add-to-cart.ts index 84556b901b..4662bcbe7a 100644 --- a/packages/core/core-flows/src/cart/workflows/add-to-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/add-to-cart.ts @@ -114,7 +114,10 @@ export const addToCartWorkflowId = "add-to-cart" * ::: */ export const addToCartWorkflow = createWorkflow( - addToCartWorkflowId, + { + name: addToCartWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const cartQuery = useQueryGraphStep({ entity: "cart", diff --git a/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts b/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts index 4ec277172d..08cd0dc01c 100644 --- a/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts @@ -79,7 +79,10 @@ export const createPaymentCollectionForCartWorkflowId = * Create payment collection for cart. */ export const createPaymentCollectionForCartWorkflow = createWorkflow( - createPaymentCollectionForCartWorkflowId, + { + name: createPaymentCollectionForCartWorkflowId, + idempotent: true, + }, ( input: WorkflowData ): WorkflowData => { diff --git a/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts b/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts index b7f75327dd..bb1980d4be 100644 --- a/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts +++ b/packages/core/core-flows/src/cart/workflows/refresh-cart-items.ts @@ -128,7 +128,10 @@ export const refreshCartItemsWorkflowId = "refresh-cart-items" * */ export const refreshCartItemsWorkflow = createWorkflow( - refreshCartItemsWorkflowId, + { + name: refreshCartItemsWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const setPricingContext = createHook( "setPricingContext", diff --git a/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts b/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts index 709e742111..3b8bd0552c 100644 --- a/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts +++ b/packages/core/core-flows/src/cart/workflows/refresh-cart-shipping-methods.ts @@ -50,7 +50,10 @@ export const refreshCartShippingMethodsWorkflowId = * @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution. */ export const refreshCartShippingMethodsWorkflow = createWorkflow( - refreshCartShippingMethodsWorkflowId, + { + name: refreshCartShippingMethodsWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const fetchCart = when("fetch-cart", { input }, ({ input }) => { return !input.cart diff --git a/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts b/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts index 5c28d46d7c..f316cdf698 100644 --- a/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts +++ b/packages/core/core-flows/src/cart/workflows/refresh-payment-collection.ts @@ -54,7 +54,10 @@ export const refreshPaymentCollectionForCartWorkflowId = * @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution. */ export const refreshPaymentCollectionForCartWorkflow = createWorkflow( - refreshPaymentCollectionForCartWorkflowId, + { + name: refreshPaymentCollectionForCartWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const fetchCart = when("should-fetch-cart", { input }, ({ input }) => { return !input.cart diff --git a/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts b/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts index 472010eb3e..070fa01bb6 100644 --- a/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts +++ b/packages/core/core-flows/src/cart/workflows/refund-payment-recreate-payment-session.ts @@ -62,7 +62,10 @@ export const refundPaymentAndRecreatePaymentSessionWorkflowId = * Refund a payment and create a new payment session. */ export const refundPaymentAndRecreatePaymentSessionWorkflow = createWorkflow( - refundPaymentAndRecreatePaymentSessionWorkflowId, + { + name: refundPaymentAndRecreatePaymentSessionWorkflowId, + idempotent: true, + }, ( input: WorkflowData ): WorkflowResponse => { diff --git a/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts b/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts index 310f99687c..a5357cdd5c 100644 --- a/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts +++ b/packages/core/core-flows/src/cart/workflows/transfer-cart-customer.ts @@ -1,3 +1,4 @@ +import { CartWorkflowEvents } from "@medusajs/framework/utils" import { createHook, createWorkflow, @@ -9,7 +10,6 @@ import { import { emitEventStep, useQueryGraphStep } from "../../common" import { updateCartsStep } from "../steps" import { refreshCartItemsWorkflow } from "./refresh-cart-items" -import { CartWorkflowEvents } from "@medusajs/framework/utils" /** * The cart ownership transfer details. @@ -49,7 +49,10 @@ export const transferCartCustomerWorkflowId = "transfer-cart-customer" * @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution. */ export const transferCartCustomerWorkflow = createWorkflow( - transferCartCustomerWorkflowId, + { + name: transferCartCustomerWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const cartQuery = useQueryGraphStep({ entity: "cart", diff --git a/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts b/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts index 95599be690..b5227ae1e5 100644 --- a/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts +++ b/packages/core/core-flows/src/cart/workflows/update-cart-promotions.ts @@ -73,7 +73,10 @@ export const updateCartPromotionsWorkflowId = "update-cart-promotions" * @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution. */ export const updateCartPromotionsWorkflow = createWorkflow( - updateCartPromotionsWorkflowId, + { + name: updateCartPromotionsWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const fetchCart = when("should-fetch-cart", { input }, ({ input }) => { return !input.cart diff --git a/packages/core/core-flows/src/cart/workflows/update-cart.ts b/packages/core/core-flows/src/cart/workflows/update-cart.ts index b2c013a67b..ddc2a80857 100644 --- a/packages/core/core-flows/src/cart/workflows/update-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/update-cart.ts @@ -78,7 +78,10 @@ export const updateCartWorkflowId = "update-cart" * @property hooks.cartUpdated - This hook is executed after a cart is update. You can consume this hook to perform custom actions on the updated cart. */ export const updateCartWorkflow = createWorkflow( - updateCartWorkflowId, + { + name: updateCartWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { const { data: cartToUpdate } = useQueryGraphStep({ entity: "cart", diff --git a/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts b/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts index 9dc055434a..49a01d0082 100644 --- a/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/update-line-item-in-cart.ts @@ -104,7 +104,10 @@ export const updateLineItemInCartWorkflowId = "update-line-item-in-cart" * ::: */ export const updateLineItemInCartWorkflow = createWorkflow( - updateLineItemInCartWorkflowId, + { + name: updateLineItemInCartWorkflowId, + idempotent: true, + }, ( input: WorkflowData ) => { diff --git a/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts b/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts index c749a4a43f..e59f05ddf0 100644 --- a/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts +++ b/packages/core/core-flows/src/line-item/workflows/delete-line-items.ts @@ -5,7 +5,7 @@ import { deleteLineItemsStep } from "../steps/delete-line-items" /** * The data to delete line items from a cart. */ -export type DeleteLineItemsWorkflowInput = { +export type DeleteLineItemsWorkflowInput = { /** * The cart's ID. */ @@ -20,10 +20,10 @@ export const deleteLineItemsWorkflowId = "delete-line-items" /** * This workflow deletes line items from a cart. It's used by the * [Delete Line Item Store API Route](https://docs.medusajs.com/api/store#carts_deletecartsidlineitemsline_id). - * + * * You can use this workflow within your customizations or your own custom workflows, allowing you to * delete line items from a cart within your custom flows. - * + * * @example * const { result } = await deleteLineItemsWorkflow(container) * .run({ @@ -32,13 +32,16 @@ export const deleteLineItemsWorkflowId = "delete-line-items" * ids: ["li_123"] * } * }) - * + * * @summary - * + * * Delete line items from a cart. */ export const deleteLineItemsWorkflow = createWorkflow( - deleteLineItemsWorkflowId, + { + name: deleteLineItemsWorkflowId, + idempotent: true, + }, (input: WorkflowData) => { deleteLineItemsStep(input.ids) diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index e044e74731..1efad116eb 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -37,6 +37,7 @@ import { TransactionStepTimeoutError, TransactionTimeoutError, } from "./errors" +import { Context } from "@medusajs/types" /** * @class TransactionOrchestrator is responsible for managing and executing distributed transactions. @@ -1380,7 +1381,8 @@ export class TransactionOrchestrator extends EventEmitter { private createTransactionFlow( transactionId: string, - flowMetadata?: TransactionFlow["metadata"] + flowMetadata?: TransactionFlow["metadata"], + context?: Context ): TransactionFlow { const [steps, features] = TransactionOrchestrator.buildSteps( this.definition @@ -1390,7 +1392,7 @@ export class TransactionOrchestrator extends EventEmitter { modelId: this.id, options: this.options, transactionId: transactionId, - runId: ulid(), + runId: context?.runId ?? ulid(), metadata: flowMetadata, hasAsyncSteps: features.hasAsyncSteps, hasFailedSteps: false, @@ -1541,12 +1543,14 @@ export class TransactionOrchestrator extends EventEmitter { handler, payload, flowMetadata, + context, onLoad, }: { transactionId: string handler: TransactionStepHandler payload?: unknown flowMetadata?: TransactionFlow["metadata"] + context?: Context onLoad?: (transaction: DistributedTransactionType) => Promise | void }): Promise { const existingTransaction = @@ -1555,7 +1559,11 @@ export class TransactionOrchestrator extends EventEmitter { let newTransaction = false let modelFlow: TransactionFlow if (!existingTransaction) { - modelFlow = this.createTransactionFlow(transactionId, flowMetadata) + modelFlow = this.createTransactionFlow( + transactionId, + flowMetadata, + context + ) newTransaction = true } else { modelFlow = existingTransaction.flow diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index ca995c725e..cdcb1d7b62 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -362,6 +362,7 @@ export class LocalWorkflow { handler: handler(this.container_, context), payload: input, flowMetadata, + context, onLoad: this.onLoad.bind(this), }) diff --git a/packages/core/types/src/shared-context.ts b/packages/core/types/src/shared-context.ts index 3af0ee09a3..4b6f6b04d1 100644 --- a/packages/core/types/src/shared-context.ts +++ b/packages/core/types/src/shared-context.ts @@ -54,6 +54,11 @@ export type Context = { */ transactionId?: string + /** + * A string indicating the ID of the current run. + */ + runId?: string + /** * An instance of a message aggregator, which is used to aggregate messages to be emitted at a later point. */ diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 7807d9d4fe..5794fd692e 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -211,6 +211,7 @@ export function createWorkflow( step.__step__ + "-" + (stepContext.transactionId ?? ulid()), parentStepIdempotencyKey: stepContext.idempotencyKey, preventReleaseEvents: true, + runId: stepContext.runId, } let transaction diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index f8b74dedd4..7e43bbc3f8 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -38,6 +38,7 @@ function buildStepContext({ parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string, preventReleaseEvents: flowMetadata?.preventReleaseEvents ?? false, transactionId: stepArguments.context!.transactionId, + runId: flow.runId, context: stepArguments.context!, " stepDefinition": stepDefinition, " getStepResult"( diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 5b343e9b65..afb0c14b36 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -192,6 +192,11 @@ export interface StepExecutionContext { */ transactionId?: string + /** + * A string indicating the ID of the current run. + */ + runId?: string + /** * Get access to the result returned by a named step. Returns undefined * when step is not found or when nothing was returned. diff --git a/packages/medusa/src/api/store/carts/[id]/customer/route.ts b/packages/medusa/src/api/store/carts/[id]/customer/route.ts index c54b77b686..04921567b7 100644 --- a/packages/medusa/src/api/store/carts/[id]/customer/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/customer/route.ts @@ -1,23 +1,25 @@ -import { transferCartCustomerWorkflow } from "@medusajs/core-flows" +import { transferCartCustomerWorkflowId } from "@medusajs/core-flows" import { HttpTypes } from "@medusajs/framework/types" import { AuthenticatedMedusaRequest, MedusaResponse, } from "@medusajs/framework/http" +import { Modules } from "@medusajs/framework/utils" import { refetchCart } from "../../helpers" export const POST = async ( req: AuthenticatedMedusaRequest, res: MedusaResponse ) => { - const workflow = transferCartCustomerWorkflow(req.scope) + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) - await workflow.run({ + await we.run(transferCartCustomerWorkflowId, { input: { id: req.params.id, customer_id: req.auth_context?.actor_id, }, + transactionId: "cart-transfer-customer-" + req.params.id, }) const cart = await refetchCart( diff --git a/packages/medusa/src/api/store/carts/[id]/line-items/[line_id]/route.ts b/packages/medusa/src/api/store/carts/[id]/line-items/[line_id]/route.ts index 189e060b47..41d94eeade 100644 --- a/packages/medusa/src/api/store/carts/[id]/line-items/[line_id]/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/line-items/[line_id]/route.ts @@ -1,11 +1,11 @@ import { - deleteLineItemsWorkflow, - updateLineItemInCartWorkflow, + deleteLineItemsWorkflowId, + updateLineItemInCartWorkflowId, } from "@medusajs/core-flows" import { prepareListQuery } from "@medusajs/framework" import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http" import { HttpTypes } from "@medusajs/framework/types" -import { MedusaError } from "@medusajs/framework/utils" +import { MedusaError, Modules } from "@medusajs/framework/utils" import { refetchCart } from "../../../helpers" import { StoreUpdateCartLineItemType } from "../../../validators" @@ -40,12 +40,14 @@ export const POST = async ( ) } - await updateLineItemInCartWorkflow(req.scope).run({ + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) + await we.run(updateLineItemInCartWorkflowId, { input: { cart_id: req.params.id, item_id: item.id, update: req.validatedBody, }, + transactionId: "cart-update-item-" + req.params.id, }) const updatedCart = await refetchCart( @@ -63,8 +65,10 @@ export const DELETE = async ( ) => { const id = req.params.line_id - await deleteLineItemsWorkflow(req.scope).run({ + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) + await we.run(deleteLineItemsWorkflowId, { input: { cart_id: req.params.id, ids: [id] }, + transactionId: "cart-delete-item-" + req.params.id, }) const cart = await refetchCart( diff --git a/packages/medusa/src/api/store/carts/[id]/line-items/route.ts b/packages/medusa/src/api/store/carts/[id]/line-items/route.ts index c071a9116d..69e6253c1c 100644 --- a/packages/medusa/src/api/store/carts/[id]/line-items/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/line-items/route.ts @@ -1,6 +1,7 @@ -import { addToCartWorkflow } from "@medusajs/core-flows" +import { addToCartWorkflowId } from "@medusajs/core-flows" import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http" import { HttpTypes } from "@medusajs/framework/types" +import { Modules } from "@medusajs/utils" import { refetchCart } from "../../helpers" import { StoreAddCartLineItemType } from "../../validators" @@ -8,11 +9,13 @@ export const POST = async ( req: MedusaRequest, res: MedusaResponse ) => { - await addToCartWorkflow(req.scope).run({ + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) + await we.run(addToCartWorkflowId, { input: { cart_id: req.params.id, items: [req.validatedBody], }, + transactionId: "cart-add-item-" + req.params.id, }) const cart = await refetchCart( diff --git a/packages/medusa/src/api/store/carts/[id]/promotions/route.ts b/packages/medusa/src/api/store/carts/[id]/promotions/route.ts index 1f32b3f1c6..4ade40d2e9 100644 --- a/packages/medusa/src/api/store/carts/[id]/promotions/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/promotions/route.ts @@ -1,22 +1,26 @@ -import { updateCartPromotionsWorkflow } from "@medusajs/core-flows" -import { PromotionActions } from "@medusajs/framework/utils" +import { updateCartPromotionsWorkflowId } from "@medusajs/core-flows" import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http" -import { refetchCart } from "../../helpers" import { HttpTypes } from "@medusajs/framework/types" +import { Modules, PromotionActions } from "@medusajs/framework/utils" +import { refetchCart } from "../../helpers" export const POST = async ( req: MedusaRequest, res: MedusaResponse ) => { - const workflow = updateCartPromotionsWorkflow(req.scope) + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) const payload = req.validatedBody - await workflow.run({ + await we.run(updateCartPromotionsWorkflowId, { input: { promo_codes: payload.promo_codes, cart_id: req.params.id, - action: PromotionActions.ADD, + action: + payload.promo_codes.length > 0 + ? PromotionActions.ADD + : PromotionActions.REPLACE, }, + transactionId: "cart-update-promotions-" + req.params.id, }) const cart = await refetchCart( @@ -34,15 +38,16 @@ export const DELETE = async ( cart: HttpTypes.StoreCart }> ) => { - const workflow = updateCartPromotionsWorkflow(req.scope) + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) const payload = req.validatedBody - await workflow.run({ + await we.run(updateCartPromotionsWorkflowId, { input: { promo_codes: payload.promo_codes, cart_id: req.params.id, action: PromotionActions.REMOVE, }, + transactionId: "cart-delete-promotions-" + req.params.id, }) const cart = await refetchCart( diff --git a/packages/medusa/src/api/store/carts/[id]/route.ts b/packages/medusa/src/api/store/carts/[id]/route.ts index 99fb325022..d2fc6bbd9a 100644 --- a/packages/medusa/src/api/store/carts/[id]/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/route.ts @@ -1,4 +1,4 @@ -import { updateCartWorkflow } from "@medusajs/core-flows" +import { updateCartWorkflowId } from "@medusajs/core-flows" import { AdditionalData, HttpTypes, @@ -6,6 +6,7 @@ import { } from "@medusajs/framework/types" import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http" +import { Modules } from "@medusajs/framework/utils" import { refetchCart } from "../helpers" export const GET = async ( @@ -27,13 +28,14 @@ export const POST = async ( cart: HttpTypes.StoreCart }> ) => { - const workflow = updateCartWorkflow(req.scope) + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) - await workflow.run({ + await we.run(updateCartWorkflowId, { input: { ...req.validatedBody, id: req.params.id, }, + transactionId: "cart-update-" + req.params.id, }) const cart = await refetchCart( diff --git a/packages/medusa/src/api/store/payment-collections/route.ts b/packages/medusa/src/api/store/payment-collections/route.ts index ea8c7a762e..a41fc2364a 100644 --- a/packages/medusa/src/api/store/payment-collections/route.ts +++ b/packages/medusa/src/api/store/payment-collections/route.ts @@ -1,13 +1,14 @@ -import { createPaymentCollectionForCartWorkflow } from "@medusajs/core-flows" -import { - ContainerRegistrationKeys, - remoteQueryObjectFromString, -} from "@medusajs/framework/utils" +import { createPaymentCollectionForCartWorkflowId } from "@medusajs/core-flows" import { AuthenticatedMedusaRequest, MedusaResponse, } from "@medusajs/framework/http" import { HttpTypes } from "@medusajs/framework/types" +import { + ContainerRegistrationKeys, + remoteQueryObjectFromString, +} from "@medusajs/framework/utils" +import { Modules } from "@medusajs/utils" export const POST = async ( req: AuthenticatedMedusaRequest, @@ -27,8 +28,10 @@ export const POST = async ( let paymentCollection = cartCollectionRelation?.payment_collection if (!paymentCollection) { - await createPaymentCollectionForCartWorkflow(req.scope).run({ + const we = req.scope.resolve(Modules.WORKFLOW_ENGINE) + await we.run(createPaymentCollectionForCartWorkflowId, { input: req.body, + transactionId: "create-payment-collection-for-cart-" + cart_id, }) const [cartCollectionRelation] = await remoteQuery( diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts index df5b3e72f5..88e4c5957c 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts @@ -54,6 +54,7 @@ createWorkflow( { name: "workflow_idempotent", idempotent: true, + retentionTime: 20, }, function (input) { step_1(input) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts index 8c87afa4e1..092f7013d0 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts @@ -1,10 +1,10 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" -import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -53,6 +53,7 @@ const step_3 = createStep( createWorkflow( { name: "workflow_sync", + retentionTime: 20, idempotent: true, }, function (input) { diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json index 1dd73c39a1..20e0503696 100644 --- a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -177,6 +177,15 @@ "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" }, + { + "keyName": "IDX_workflow_execution_run_id", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL" + }, { "keyName": "workflow_execution_pkey", "columnNames": [ diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts new file mode 100644 index 0000000000..d758bc325d --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts @@ -0,0 +1,13 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20250819104213 extends Migration { + + override async up(): Promise { + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`); + } + + override async down(): Promise { + this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`); + } + +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index 15f56fbd13..6a92309133 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -34,4 +34,8 @@ export const WorkflowExecution = model on: ["state"], where: "deleted_at IS NULL", }, + { + on: ["run_id"], + where: "deleted_at IS NULL", + }, ]) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 1f9084a9d5..1df1e024fe 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -180,6 +180,7 @@ export class WorkflowsModuleService< ...(restContext ?? {}), ...(options_.context ?? {}), eventGroupId, + runId: options_.runId, preventReleaseEvents: localPreventReleaseEvents, } diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index c299e74598..9b968766cd 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -138,8 +138,6 @@ export class InMemoryDistributedTransactionStorage private async deleteFromDb(data: TransactionCheckpoint) { await this.workflowExecutionService_.delete([ { - workflow_id: data.flow.modelId, - transaction_id: data.flow.transactionId, run_id: data.flow.runId, }, ]) @@ -223,7 +221,7 @@ export class InMemoryDistributedTransactionStorage TransactionState.REVERTED, ].includes(data.flow.state) - const { retentionTime, idempotent } = options ?? {} + const { retentionTime } = options ?? {} await this.#preventRaceConditionExecutionIfNecessary({ data, @@ -261,8 +259,13 @@ export class InMemoryDistributedTransactionStorage // Optimize DB operations - only perform when necessary if (hasFinished) { - if (!retentionTime && !idempotent) { - await this.deleteFromDb(data) + if (!retentionTime) { + // If the workflow is nested, we cant just remove it because it would break the compensation algorithm. Instead, it will get deleted when the top level parent is deleted. + if (!flow.metadata?.parentStepIdempotencyKey) { + await this.deleteFromDb(data) + } else { + await this.saveToDb(data, retentionTime) + } } else { await this.saveToDb(data, retentionTime) } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts index 8c87afa4e1..0f198178f6 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts @@ -1,10 +1,10 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" -import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -54,6 +54,7 @@ createWorkflow( { name: "workflow_sync", idempotent: true, + retentionTime: 20, }, function (input) { step_1(input) diff --git a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json index 1dd73c39a1..20e0503696 100644 --- a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json @@ -177,6 +177,15 @@ "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" }, + { + "keyName": "IDX_workflow_execution_run_id", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL" + }, { "keyName": "workflow_execution_pkey", "columnNames": [ diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20250819110923.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20250819110923.ts new file mode 100644 index 0000000000..6ca8cace84 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20250819110923.ts @@ -0,0 +1,13 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20250819110923 extends Migration { + + override async up(): Promise { + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`); + } + + override async down(): Promise { + this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`); + } + +} diff --git a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts index 15f56fbd13..6a92309133 100644 --- a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts @@ -34,4 +34,8 @@ export const WorkflowExecution = model on: ["state"], where: "deleted_at IS NULL", }, + { + on: ["run_id"], + where: "deleted_at IS NULL", + }, ]) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 6c93e74b88..9aa2f5c1af 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -220,8 +220,6 @@ export class RedisDistributedTransactionStorage private async deleteFromDb(data: TransactionCheckpoint) { await this.workflowExecutionService_.delete([ { - workflow_id: data.flow.modelId, - transaction_id: data.flow.transactionId, run_id: data.flow.runId, }, ]) @@ -351,7 +349,7 @@ export class RedisDistributedTransactionStorage TransactionState.REVERTED, ].includes(data.flow.state) - const { retentionTime, idempotent } = options ?? {} + const { retentionTime } = options ?? {} await this.#preventRaceConditionExecutionIfNecessary({ data, @@ -416,8 +414,13 @@ export class RedisDistributedTransactionStorage }) // Database operations - if (hasFinished && !retentionTime && !idempotent) { - await promiseAll([pipelinePromise, this.deleteFromDb(data)]) + if (hasFinished && !retentionTime) { + // If the workflow is nested, we cant just remove it because it would break the compensation algorithm. Instead, it will get deleted when the top level parent is deleted. + if (!data.flow.metadata?.parentStepIdempotencyKey) { + await promiseAll([pipelinePromise, this.deleteFromDb(data)]) + } else { + await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)]) + } } else { await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)]) }