chore: idempotent cart operations (#13236)

* chore(core-flows): idempotent cart operations

* changeset

* add tests

* revert

* revert route

* promo test

* skip bugs

* fix test

* tests

* avoid workflow name conflict

* prevent nested workflow from being deleted until the top level parent finishes

* remove unused setTimeout

* update changeset

* rm comments

---------

Co-authored-by: adrien2p <adrien.deperetti@gmail.com>
This commit is contained in:
Carlos R. L. Rodrigues
2025-08-28 10:04:00 -03:00
committed by GitHub
parent b111d01898
commit 9412669e65
38 changed files with 890 additions and 64 deletions

View File

@@ -0,0 +1,11 @@
---
"@medusajs/workflow-engine-inmemory": minor
"@medusajs/workflow-engine-redis": minor
"@medusajs/core-flows": minor
"@medusajs/medusa": minor
"@medusajs/orchestration": minor
"@medusajs/types": minor
"@medusajs/workflows-sdk": minor
---
chore(core-flows): idempotent cart operations and nested workflow deletion lifecycle fix

View File

@@ -8,7 +8,7 @@ import {
import { setupTaxStructure } from "../../../../modules/__tests__/fixtures/tax"
import { medusaTshirtProduct } from "../../../__fixtures__/product"
jest.setTimeout(50000)
jest.setTimeout(50000000)
const adminHeaders = {
headers: { "x-medusa-access-token": "test_token" },
@@ -609,6 +609,11 @@ medusaIntegrationTestRunner({
expect(cartWithPromotion1).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [],
}),
],
promotions: [],
})
)
@@ -621,6 +626,140 @@ medusaIntegrationTestRunner({
)
).data.cart
expect(cartWithPromotion2).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
})
)
})
it("should add promotion and remove it from cart using delete", async () => {
const publishableKey = await generatePublishableKey(appContainer)
const storeHeaders = generateStoreHeaders({ publishableKey })
const salesChannel = (
await api.post(
"/admin/sales-channels",
{ name: "Webshop", description: "channel" },
adminHeaders
)
).data.sales_channel
const region = (
await api.post(
"/admin/regions",
{ name: "US", currency_code: "usd", countries: ["us"] },
adminHeaders
)
).data.region
const product = (
await api.post(
"/admin/products",
{
...medusaTshirtProduct,
shipping_profile_id: shippingProfile.id,
},
adminHeaders
)
).data.product
const cart = (
await api.post(
`/store/carts`,
{
currency_code: "usd",
sales_channel_id: salesChannel.id,
region_id: region.id,
items: [{ variant_id: product.variants[0].id, quantity: 1 }],
},
storeHeaders
)
).data.cart
const response = await api.post(
`/admin/promotions`,
{
code: "TEST",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: false,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 100,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "1",
},
],
},
adminHeaders
)
// Simulate concurrent requests
await Promise.all([
api
.post(
`/store/carts/${cart.id}`,
{
promo_codes: [response.data.promotion.code],
},
storeHeaders
)
.catch(() => {}),
api
.post(
`/store/carts/${cart.id}`,
{
promo_codes: [response.data.promotion.code],
},
storeHeaders
)
.catch(() => {}),
])
const cartAfterPromotion = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartAfterPromotion).toEqual(
expect.objectContaining({
promotions: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
})
)
const cartWithPromotion2 = (
await api.post(
`/store/carts/${cart.id}/line-items`,
{ variant_id: product.variants[0].id, quantity: 40 },
storeHeaders
)
).data.cart
expect(cartWithPromotion2).toEqual(
expect.objectContaining({
promotions: [
@@ -630,6 +769,547 @@ medusaIntegrationTestRunner({
],
})
)
await api.delete(`/store/carts/${cart.id}/promotions`, {
data: {
promo_codes: [response.data.promotion.code],
},
...storeHeaders,
})
const cartWithoutPromotion = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartWithoutPromotion).toEqual(
expect.objectContaining({
promotions: [],
})
)
})
it("should add promotion and remove it from cart using update", async () => {
const publishableKey = await generatePublishableKey(appContainer)
const storeHeaders = generateStoreHeaders({ publishableKey })
const salesChannel = (
await api.post(
"/admin/sales-channels",
{ name: "Webshop", description: "channel" },
adminHeaders
)
).data.sales_channel
const region = (
await api.post(
"/admin/regions",
{ name: "US", currency_code: "usd", countries: ["us"] },
adminHeaders
)
).data.region
const product = (
await api.post(
"/admin/products",
{
...medusaTshirtProduct,
shipping_profile_id: shippingProfile.id,
},
adminHeaders
)
).data.product
const cart = (
await api.post(
`/store/carts`,
{
currency_code: "usd",
sales_channel_id: salesChannel.id,
region_id: region.id,
items: [{ variant_id: product.variants[0].id, quantity: 1 }],
},
storeHeaders
)
).data.cart
const response = await api.post(
`/admin/promotions`,
{
code: "TEST",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: false,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 100,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "1",
},
],
},
adminHeaders
)
await api.post(
`/store/carts/${cart.id}`,
{
promo_codes: [response.data.promotion.code],
},
storeHeaders
)
const cartAfterPromotion = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartAfterPromotion).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
})
)
const cartWithPromotion2 = (
await api.post(
`/store/carts/${cart.id}/line-items`,
{ variant_id: product.variants[0].id, quantity: 40 },
storeHeaders
)
).data.cart
expect(cartWithPromotion2).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: response.data.promotion.code,
}),
],
})
)
await api.post(
`/store/carts/${cart.id}`,
{
promo_codes: [],
},
storeHeaders
)
const cartWithoutPromotion = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartWithoutPromotion).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [],
}),
],
promotions: [],
})
)
})
it.skip("should add two promotions and remove one from cart using delete", async () => {
const publishableKey = await generatePublishableKey(appContainer)
const storeHeaders = generateStoreHeaders({ publishableKey })
const salesChannel = (
await api.post(
"/admin/sales-channels",
{ name: "Webshop", description: "channel" },
adminHeaders
)
).data.sales_channel
const region = (
await api.post(
"/admin/regions",
{ name: "US", currency_code: "usd", countries: ["us"] },
adminHeaders
)
).data.region
const product = (
await api.post(
"/admin/products",
{
...medusaTshirtProduct,
shipping_profile_id: shippingProfile.id,
},
adminHeaders
)
).data.product
const cart = (
await api.post(
`/store/carts`,
{
currency_code: "usd",
sales_channel_id: salesChannel.id,
region_id: region.id,
items: [{ variant_id: product.variants[0].id, quantity: 1 }],
},
storeHeaders
)
).data.cart
const promo1 = await api.post(
`/admin/promotions`,
{
code: "TEST",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: false,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 100,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "1",
},
],
},
adminHeaders
)
const promo2 = await api.post(
`/admin/promotions`,
{
code: "TEST2",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: false,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 100,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "2000",
},
],
},
adminHeaders
)
const cartWithPromotion2 = (
await api.post(
`/store/carts/${cart.id}/line-items`,
{ variant_id: product.variants[0].id, quantity: 40 },
storeHeaders
)
).data.cart
expect(cartWithPromotion2).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: promo1.data.promotion.code,
}),
expect.objectContaining({
code: promo2.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: promo1.data.promotion.code,
}),
expect.objectContaining({
code: promo2.data.promotion.code,
}),
],
})
)
await api.delete(`/store/carts/${cart.id}/promotions`, {
data: {
promo_codes: [promo1.data.promotion.code],
},
...storeHeaders,
})
const cartWithoutPromotion1 = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartWithoutPromotion1).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: promo2.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: promo2.data.promotion.code,
}),
],
})
)
})
it.skip("should add two promotions and remove one from cart using update", async () => {
const publishableKey = await generatePublishableKey(appContainer)
const storeHeaders = generateStoreHeaders({ publishableKey })
const salesChannel = (
await api.post(
"/admin/sales-channels",
{ name: "Webshop", description: "channel" },
adminHeaders
)
).data.sales_channel
const region = (
await api.post(
"/admin/regions",
{ name: "US", currency_code: "usd", countries: ["us"] },
adminHeaders
)
).data.region
const product = (
await api.post(
"/admin/products",
{
...medusaTshirtProduct,
shipping_profile_id: shippingProfile.id,
},
adminHeaders
)
).data.product
const cart = (
await api.post(
`/store/carts`,
{
currency_code: "usd",
sales_channel_id: salesChannel.id,
region_id: region.id,
items: [{ variant_id: product.variants[0].id, quantity: 1 }],
},
storeHeaders
)
).data.cart
const [promo1, promo2, promoAutomatic] = await Promise.all([
api.post(
`/admin/promotions`,
{
code: "TEST",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: false,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 50,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "1",
},
],
},
adminHeaders
),
api.post(
`/admin/promotions`,
{
code: "TEST_CODE_123",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: false,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 10,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "2000",
},
],
},
adminHeaders
),
api.post(
`/admin/promotions`,
{
code: "AUTOMATIC_PROMO",
type: PromotionType.STANDARD,
status: PromotionStatus.ACTIVE,
is_automatic: true,
application_method: {
target_type: "items",
type: "fixed",
allocation: "each",
currency_code: "usd",
value: 5,
max_quantity: 100,
},
rules: [
{
attribute: "subtotal",
operator: "gte",
values: "500",
},
],
},
adminHeaders
),
])
// apply promotions
await api.post(
`/store/carts/${cart.id}`,
{
promo_codes: [
promo1.data.promotion.code,
promo2.data.promotion.code,
],
},
storeHeaders
)
const cartWithPromotion2 = (
await api.post(
`/store/carts/${cart.id}/line-items`,
{
variant_id: product.variants[0].id,
quantity: 40,
},
storeHeaders
)
).data.cart
expect(cartWithPromotion2).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: promo1.data.promotion.code,
}),
expect.objectContaining({
code: promo2.data.promotion.code,
}),
expect.objectContaining({
code: promoAutomatic.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: promo1.data.promotion.code,
}),
expect.objectContaining({
code: promo2.data.promotion.code,
}),
expect.objectContaining({
code: promoAutomatic.data.promotion.code,
}),
],
})
)
await api.post(
`/store/carts/${cart.id}`,
{
promo_codes: [promo2.data.promotion.code],
},
storeHeaders
)
const cartWithoutPromotion1 = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartWithoutPromotion1).toEqual(
expect.objectContaining({
items: [
expect.objectContaining({
adjustments: [
expect.objectContaining({
code: promo2.data.promotion.code,
}),
expect.objectContaining({
code: promoAutomatic.data.promotion.code,
}),
],
}),
],
promotions: [
expect.objectContaining({
code: promo2.data.promotion.code,
}),
expect.objectContaining({
code: promoAutomatic.data.promotion.code,
}),
],
})
)
})
})

View File

@@ -1,3 +1,4 @@
import { emitEventStep } from "@medusajs/core-flows"
import { Modules, TransactionState } from "@medusajs/framework/utils"
import {
createStep,
@@ -7,12 +8,11 @@ import {
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { medusaIntegrationTestRunner } from "@medusajs/test-utils"
import { IEventBusModuleService } from "@medusajs/types"
import {
adminHeaders,
createAdminUser,
} from "../../../helpers/create-admin-user"
import { emitEventStep } from "@medusajs/core-flows"
import { IEventBusModuleService } from "@medusajs/types"
jest.setTimeout(300000)
@@ -106,7 +106,7 @@ medusaIntegrationTestRunner({
createWorkflow(
{
name: "my-workflow-name",
name: "my-workflow-name-2",
retentionTime: 50,
},
function (input: WorkflowData<{ initial: string }>) {
@@ -137,7 +137,7 @@ medusaIntegrationTestRunner({
const engine = container.resolve(Modules.WORKFLOW_ENGINE)
const transactionId = "trx-id-failing-event"
const res = await engine.run("my-workflow-name", {
const res = await engine.run("my-workflow-name-2", {
transactionId,
input: {
initial: "abc",

View File

@@ -114,7 +114,10 @@ export const addToCartWorkflowId = "add-to-cart"
* :::
*/
export const addToCartWorkflow = createWorkflow(
addToCartWorkflowId,
{
name: addToCartWorkflowId,
idempotent: true,
},
(input: WorkflowData<AddToCartWorkflowInputDTO & AdditionalData>) => {
const cartQuery = useQueryGraphStep({
entity: "cart",

View File

@@ -79,7 +79,10 @@ export const createPaymentCollectionForCartWorkflowId =
* Create payment collection for cart.
*/
export const createPaymentCollectionForCartWorkflow = createWorkflow(
createPaymentCollectionForCartWorkflowId,
{
name: createPaymentCollectionForCartWorkflowId,
idempotent: true,
},
(
input: WorkflowData<CreatePaymentCollectionForCartWorkflowInputDTO>
): WorkflowData<void> => {

View File

@@ -128,7 +128,10 @@ export const refreshCartItemsWorkflowId = "refresh-cart-items"
*
*/
export const refreshCartItemsWorkflow = createWorkflow(
refreshCartItemsWorkflowId,
{
name: refreshCartItemsWorkflowId,
idempotent: true,
},
(input: WorkflowData<RefreshCartItemsWorkflowInput & AdditionalData>) => {
const setPricingContext = createHook(
"setPricingContext",

View File

@@ -50,7 +50,10 @@ export const refreshCartShippingMethodsWorkflowId =
* @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution.
*/
export const refreshCartShippingMethodsWorkflow = createWorkflow(
refreshCartShippingMethodsWorkflowId,
{
name: refreshCartShippingMethodsWorkflowId,
idempotent: true,
},
(input: WorkflowData<RefreshCartShippingMethodsWorkflowInput>) => {
const fetchCart = when("fetch-cart", { input }, ({ input }) => {
return !input.cart

View File

@@ -54,7 +54,10 @@ export const refreshPaymentCollectionForCartWorkflowId =
* @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution.
*/
export const refreshPaymentCollectionForCartWorkflow = createWorkflow(
refreshPaymentCollectionForCartWorkflowId,
{
name: refreshPaymentCollectionForCartWorkflowId,
idempotent: true,
},
(input: WorkflowData<RefreshPaymentCollectionForCartWorklowInput>) => {
const fetchCart = when("should-fetch-cart", { input }, ({ input }) => {
return !input.cart

View File

@@ -62,7 +62,10 @@ export const refundPaymentAndRecreatePaymentSessionWorkflowId =
* Refund a payment and create a new payment session.
*/
export const refundPaymentAndRecreatePaymentSessionWorkflow = createWorkflow(
refundPaymentAndRecreatePaymentSessionWorkflowId,
{
name: refundPaymentAndRecreatePaymentSessionWorkflowId,
idempotent: true,
},
(
input: WorkflowData<refundPaymentAndRecreatePaymentSessionWorkflowInput>
): WorkflowResponse<PaymentSessionDTO> => {

View File

@@ -1,3 +1,4 @@
import { CartWorkflowEvents } from "@medusajs/framework/utils"
import {
createHook,
createWorkflow,
@@ -9,7 +10,6 @@ import {
import { emitEventStep, useQueryGraphStep } from "../../common"
import { updateCartsStep } from "../steps"
import { refreshCartItemsWorkflow } from "./refresh-cart-items"
import { CartWorkflowEvents } from "@medusajs/framework/utils"
/**
* The cart ownership transfer details.
@@ -49,7 +49,10 @@ export const transferCartCustomerWorkflowId = "transfer-cart-customer"
* @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution.
*/
export const transferCartCustomerWorkflow = createWorkflow(
transferCartCustomerWorkflowId,
{
name: transferCartCustomerWorkflowId,
idempotent: true,
},
(input: WorkflowData<TransferCartCustomerWorkflowInput>) => {
const cartQuery = useQueryGraphStep({
entity: "cart",

View File

@@ -73,7 +73,10 @@ export const updateCartPromotionsWorkflowId = "update-cart-promotions"
* @property hooks.validate - This hook is executed before all operations. You can consume this hook to perform any custom validation. If validation fails, you can throw an error to stop the workflow execution.
*/
export const updateCartPromotionsWorkflow = createWorkflow(
updateCartPromotionsWorkflowId,
{
name: updateCartPromotionsWorkflowId,
idempotent: true,
},
(input: WorkflowData<UpdateCartPromotionsWorkflowInput>) => {
const fetchCart = when("should-fetch-cart", { input }, ({ input }) => {
return !input.cart

View File

@@ -78,7 +78,10 @@ export const updateCartWorkflowId = "update-cart"
* @property hooks.cartUpdated - This hook is executed after a cart is update. You can consume this hook to perform custom actions on the updated cart.
*/
export const updateCartWorkflow = createWorkflow(
updateCartWorkflowId,
{
name: updateCartWorkflowId,
idempotent: true,
},
(input: WorkflowData<UpdateCartWorkflowInput>) => {
const { data: cartToUpdate } = useQueryGraphStep({
entity: "cart",

View File

@@ -104,7 +104,10 @@ export const updateLineItemInCartWorkflowId = "update-line-item-in-cart"
* :::
*/
export const updateLineItemInCartWorkflow = createWorkflow(
updateLineItemInCartWorkflowId,
{
name: updateLineItemInCartWorkflowId,
idempotent: true,
},
(
input: WorkflowData<UpdateLineItemInCartWorkflowInputDTO & AdditionalData>
) => {

View File

@@ -5,7 +5,7 @@ import { deleteLineItemsStep } from "../steps/delete-line-items"
/**
* The data to delete line items from a cart.
*/
export type DeleteLineItemsWorkflowInput = {
export type DeleteLineItemsWorkflowInput = {
/**
* The cart's ID.
*/
@@ -20,10 +20,10 @@ export const deleteLineItemsWorkflowId = "delete-line-items"
/**
* This workflow deletes line items from a cart. It's used by the
* [Delete Line Item Store API Route](https://docs.medusajs.com/api/store#carts_deletecartsidlineitemsline_id).
*
*
* You can use this workflow within your customizations or your own custom workflows, allowing you to
* delete line items from a cart within your custom flows.
*
*
* @example
* const { result } = await deleteLineItemsWorkflow(container)
* .run({
@@ -32,13 +32,16 @@ export const deleteLineItemsWorkflowId = "delete-line-items"
* ids: ["li_123"]
* }
* })
*
*
* @summary
*
*
* Delete line items from a cart.
*/
export const deleteLineItemsWorkflow = createWorkflow(
deleteLineItemsWorkflowId,
{
name: deleteLineItemsWorkflowId,
idempotent: true,
},
(input: WorkflowData<DeleteLineItemsWorkflowInput>) => {
deleteLineItemsStep(input.ids)

View File

@@ -37,6 +37,7 @@ import {
TransactionStepTimeoutError,
TransactionTimeoutError,
} from "./errors"
import { Context } from "@medusajs/types"
/**
* @class TransactionOrchestrator is responsible for managing and executing distributed transactions.
@@ -1380,7 +1381,8 @@ export class TransactionOrchestrator extends EventEmitter {
private createTransactionFlow(
transactionId: string,
flowMetadata?: TransactionFlow["metadata"]
flowMetadata?: TransactionFlow["metadata"],
context?: Context
): TransactionFlow {
const [steps, features] = TransactionOrchestrator.buildSteps(
this.definition
@@ -1390,7 +1392,7 @@ export class TransactionOrchestrator extends EventEmitter {
modelId: this.id,
options: this.options,
transactionId: transactionId,
runId: ulid(),
runId: context?.runId ?? ulid(),
metadata: flowMetadata,
hasAsyncSteps: features.hasAsyncSteps,
hasFailedSteps: false,
@@ -1541,12 +1543,14 @@ export class TransactionOrchestrator extends EventEmitter {
handler,
payload,
flowMetadata,
context,
onLoad,
}: {
transactionId: string
handler: TransactionStepHandler
payload?: unknown
flowMetadata?: TransactionFlow["metadata"]
context?: Context
onLoad?: (transaction: DistributedTransactionType) => Promise<void> | void
}): Promise<DistributedTransactionType> {
const existingTransaction =
@@ -1555,7 +1559,11 @@ export class TransactionOrchestrator extends EventEmitter {
let newTransaction = false
let modelFlow: TransactionFlow
if (!existingTransaction) {
modelFlow = this.createTransactionFlow(transactionId, flowMetadata)
modelFlow = this.createTransactionFlow(
transactionId,
flowMetadata,
context
)
newTransaction = true
} else {
modelFlow = existingTransaction.flow

View File

@@ -362,6 +362,7 @@ export class LocalWorkflow {
handler: handler(this.container_, context),
payload: input,
flowMetadata,
context,
onLoad: this.onLoad.bind(this),
})

View File

@@ -54,6 +54,11 @@ export type Context<TManager = unknown> = {
*/
transactionId?: string
/**
* A string indicating the ID of the current run.
*/
runId?: string
/**
* An instance of a message aggregator, which is used to aggregate messages to be emitted at a later point.
*/

View File

@@ -211,6 +211,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
step.__step__ + "-" + (stepContext.transactionId ?? ulid()),
parentStepIdempotencyKey: stepContext.idempotencyKey,
preventReleaseEvents: true,
runId: stepContext.runId,
}
let transaction

View File

@@ -38,6 +38,7 @@ function buildStepContext({
parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string,
preventReleaseEvents: flowMetadata?.preventReleaseEvents ?? false,
transactionId: stepArguments.context!.transactionId,
runId: flow.runId,
context: stepArguments.context!,
" stepDefinition": stepDefinition,
" getStepResult"(

View File

@@ -192,6 +192,11 @@ export interface StepExecutionContext {
*/
transactionId?: string
/**
* A string indicating the ID of the current run.
*/
runId?: string
/**
* Get access to the result returned by a named step. Returns undefined
* when step is not found or when nothing was returned.

View File

@@ -1,23 +1,25 @@
import { transferCartCustomerWorkflow } from "@medusajs/core-flows"
import { transferCartCustomerWorkflowId } from "@medusajs/core-flows"
import { HttpTypes } from "@medusajs/framework/types"
import {
AuthenticatedMedusaRequest,
MedusaResponse,
} from "@medusajs/framework/http"
import { Modules } from "@medusajs/framework/utils"
import { refetchCart } from "../../helpers"
export const POST = async (
req: AuthenticatedMedusaRequest,
res: MedusaResponse<HttpTypes.StoreCartResponse>
) => {
const workflow = transferCartCustomerWorkflow(req.scope)
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
await workflow.run({
await we.run(transferCartCustomerWorkflowId, {
input: {
id: req.params.id,
customer_id: req.auth_context?.actor_id,
},
transactionId: "cart-transfer-customer-" + req.params.id,
})
const cart = await refetchCart(

View File

@@ -1,11 +1,11 @@
import {
deleteLineItemsWorkflow,
updateLineItemInCartWorkflow,
deleteLineItemsWorkflowId,
updateLineItemInCartWorkflowId,
} from "@medusajs/core-flows"
import { prepareListQuery } from "@medusajs/framework"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { HttpTypes } from "@medusajs/framework/types"
import { MedusaError } from "@medusajs/framework/utils"
import { MedusaError, Modules } from "@medusajs/framework/utils"
import { refetchCart } from "../../../helpers"
import { StoreUpdateCartLineItemType } from "../../../validators"
@@ -40,12 +40,14 @@ export const POST = async (
)
}
await updateLineItemInCartWorkflow(req.scope).run({
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
await we.run(updateLineItemInCartWorkflowId, {
input: {
cart_id: req.params.id,
item_id: item.id,
update: req.validatedBody,
},
transactionId: "cart-update-item-" + req.params.id,
})
const updatedCart = await refetchCart(
@@ -63,8 +65,10 @@ export const DELETE = async (
) => {
const id = req.params.line_id
await deleteLineItemsWorkflow(req.scope).run({
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
await we.run(deleteLineItemsWorkflowId, {
input: { cart_id: req.params.id, ids: [id] },
transactionId: "cart-delete-item-" + req.params.id,
})
const cart = await refetchCart(

View File

@@ -1,6 +1,7 @@
import { addToCartWorkflow } from "@medusajs/core-flows"
import { addToCartWorkflowId } from "@medusajs/core-flows"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { HttpTypes } from "@medusajs/framework/types"
import { Modules } from "@medusajs/utils"
import { refetchCart } from "../../helpers"
import { StoreAddCartLineItemType } from "../../validators"
@@ -8,11 +9,13 @@ export const POST = async (
req: MedusaRequest<StoreAddCartLineItemType>,
res: MedusaResponse<HttpTypes.StoreCartResponse>
) => {
await addToCartWorkflow(req.scope).run({
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
await we.run(addToCartWorkflowId, {
input: {
cart_id: req.params.id,
items: [req.validatedBody],
},
transactionId: "cart-add-item-" + req.params.id,
})
const cart = await refetchCart(

View File

@@ -1,22 +1,26 @@
import { updateCartPromotionsWorkflow } from "@medusajs/core-flows"
import { PromotionActions } from "@medusajs/framework/utils"
import { updateCartPromotionsWorkflowId } from "@medusajs/core-flows"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { refetchCart } from "../../helpers"
import { HttpTypes } from "@medusajs/framework/types"
import { Modules, PromotionActions } from "@medusajs/framework/utils"
import { refetchCart } from "../../helpers"
export const POST = async (
req: MedusaRequest<HttpTypes.StoreCartAddPromotion>,
res: MedusaResponse<HttpTypes.StoreCartResponse>
) => {
const workflow = updateCartPromotionsWorkflow(req.scope)
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
const payload = req.validatedBody
await workflow.run({
await we.run(updateCartPromotionsWorkflowId, {
input: {
promo_codes: payload.promo_codes,
cart_id: req.params.id,
action: PromotionActions.ADD,
action:
payload.promo_codes.length > 0
? PromotionActions.ADD
: PromotionActions.REPLACE,
},
transactionId: "cart-update-promotions-" + req.params.id,
})
const cart = await refetchCart(
@@ -34,15 +38,16 @@ export const DELETE = async (
cart: HttpTypes.StoreCart
}>
) => {
const workflow = updateCartPromotionsWorkflow(req.scope)
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
const payload = req.validatedBody
await workflow.run({
await we.run(updateCartPromotionsWorkflowId, {
input: {
promo_codes: payload.promo_codes,
cart_id: req.params.id,
action: PromotionActions.REMOVE,
},
transactionId: "cart-delete-promotions-" + req.params.id,
})
const cart = await refetchCart(

View File

@@ -1,4 +1,4 @@
import { updateCartWorkflow } from "@medusajs/core-flows"
import { updateCartWorkflowId } from "@medusajs/core-flows"
import {
AdditionalData,
HttpTypes,
@@ -6,6 +6,7 @@ import {
} from "@medusajs/framework/types"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { Modules } from "@medusajs/framework/utils"
import { refetchCart } from "../helpers"
export const GET = async (
@@ -27,13 +28,14 @@ export const POST = async (
cart: HttpTypes.StoreCart
}>
) => {
const workflow = updateCartWorkflow(req.scope)
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
await workflow.run({
await we.run(updateCartWorkflowId, {
input: {
...req.validatedBody,
id: req.params.id,
},
transactionId: "cart-update-" + req.params.id,
})
const cart = await refetchCart(

View File

@@ -1,13 +1,14 @@
import { createPaymentCollectionForCartWorkflow } from "@medusajs/core-flows"
import {
ContainerRegistrationKeys,
remoteQueryObjectFromString,
} from "@medusajs/framework/utils"
import { createPaymentCollectionForCartWorkflowId } from "@medusajs/core-flows"
import {
AuthenticatedMedusaRequest,
MedusaResponse,
} from "@medusajs/framework/http"
import { HttpTypes } from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
remoteQueryObjectFromString,
} from "@medusajs/framework/utils"
import { Modules } from "@medusajs/utils"
export const POST = async (
req: AuthenticatedMedusaRequest<HttpTypes.StoreCreatePaymentCollection>,
@@ -27,8 +28,10 @@ export const POST = async (
let paymentCollection = cartCollectionRelation?.payment_collection
if (!paymentCollection) {
await createPaymentCollectionForCartWorkflow(req.scope).run({
const we = req.scope.resolve(Modules.WORKFLOW_ENGINE)
await we.run(createPaymentCollectionForCartWorkflowId, {
input: req.body,
transactionId: "create-payment-collection-for-cart-" + cart_id,
})
const [cartCollectionRelation] = await remoteQuery(

View File

@@ -54,6 +54,7 @@ createWorkflow(
{
name: "workflow_idempotent",
idempotent: true,
retentionTime: 20,
},
function (input) {
step_1(input)

View File

@@ -1,10 +1,10 @@
import { isPresent } from "@medusajs/framework/utils"
import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { isPresent } from "@medusajs/framework/utils"
const step_1 = createStep(
"step_1",
@@ -53,6 +53,7 @@ const step_3 = createStep(
createWorkflow(
{
name: "workflow_sync",
retentionTime: 20,
idempotent: true,
},
function (input) {

View File

@@ -177,6 +177,15 @@
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_run_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL"
},
{
"keyName": "workflow_execution_pkey",
"columnNames": [

View File

@@ -0,0 +1,13 @@
import { Migration } from '@mikro-orm/migrations';
export class Migration20250819104213 extends Migration {
override async up(): Promise<void> {
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`);
}
override async down(): Promise<void> {
this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`);
}
}

View File

@@ -34,4 +34,8 @@ export const WorkflowExecution = model
on: ["state"],
where: "deleted_at IS NULL",
},
{
on: ["run_id"],
where: "deleted_at IS NULL",
},
])

View File

@@ -180,6 +180,7 @@ export class WorkflowsModuleService<
...(restContext ?? {}),
...(options_.context ?? {}),
eventGroupId,
runId: options_.runId,
preventReleaseEvents: localPreventReleaseEvents,
}

View File

@@ -138,8 +138,6 @@ export class InMemoryDistributedTransactionStorage
private async deleteFromDb(data: TransactionCheckpoint) {
await this.workflowExecutionService_.delete([
{
workflow_id: data.flow.modelId,
transaction_id: data.flow.transactionId,
run_id: data.flow.runId,
},
])
@@ -223,7 +221,7 @@ export class InMemoryDistributedTransactionStorage
TransactionState.REVERTED,
].includes(data.flow.state)
const { retentionTime, idempotent } = options ?? {}
const { retentionTime } = options ?? {}
await this.#preventRaceConditionExecutionIfNecessary({
data,
@@ -261,8 +259,13 @@ export class InMemoryDistributedTransactionStorage
// Optimize DB operations - only perform when necessary
if (hasFinished) {
if (!retentionTime && !idempotent) {
await this.deleteFromDb(data)
if (!retentionTime) {
// If the workflow is nested, we cant just remove it because it would break the compensation algorithm. Instead, it will get deleted when the top level parent is deleted.
if (!flow.metadata?.parentStepIdempotencyKey) {
await this.deleteFromDb(data)
} else {
await this.saveToDb(data, retentionTime)
}
} else {
await this.saveToDb(data, retentionTime)
}

View File

@@ -1,10 +1,10 @@
import { isPresent } from "@medusajs/framework/utils"
import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
import { isPresent } from "@medusajs/framework/utils"
const step_1 = createStep(
"step_1",
@@ -54,6 +54,7 @@ createWorkflow(
{
name: "workflow_sync",
idempotent: true,
retentionTime: 20,
},
function (input) {
step_1(input)

View File

@@ -177,6 +177,15 @@
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_run_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL"
},
{
"keyName": "workflow_execution_pkey",
"columnNames": [

View File

@@ -0,0 +1,13 @@
import { Migration } from '@mikro-orm/migrations';
export class Migration20250819110923 extends Migration {
override async up(): Promise<void> {
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`);
}
override async down(): Promise<void> {
this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`);
}
}

View File

@@ -34,4 +34,8 @@ export const WorkflowExecution = model
on: ["state"],
where: "deleted_at IS NULL",
},
{
on: ["run_id"],
where: "deleted_at IS NULL",
},
])

View File

@@ -220,8 +220,6 @@ export class RedisDistributedTransactionStorage
private async deleteFromDb(data: TransactionCheckpoint) {
await this.workflowExecutionService_.delete([
{
workflow_id: data.flow.modelId,
transaction_id: data.flow.transactionId,
run_id: data.flow.runId,
},
])
@@ -351,7 +349,7 @@ export class RedisDistributedTransactionStorage
TransactionState.REVERTED,
].includes(data.flow.state)
const { retentionTime, idempotent } = options ?? {}
const { retentionTime } = options ?? {}
await this.#preventRaceConditionExecutionIfNecessary({
data,
@@ -416,8 +414,13 @@ export class RedisDistributedTransactionStorage
})
// Database operations
if (hasFinished && !retentionTime && !idempotent) {
await promiseAll([pipelinePromise, this.deleteFromDb(data)])
if (hasFinished && !retentionTime) {
// If the workflow is nested, we cant just remove it because it would break the compensation algorithm. Instead, it will get deleted when the top level parent is deleted.
if (!data.flow.metadata?.parentStepIdempotencyKey) {
await promiseAll([pipelinePromise, this.deleteFromDb(data)])
} else {
await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)])
}
} else {
await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)])
}