fix: Idempotent cart completion (#9231)

What
- Store result of cart-completion workflow for three days by default
  - This enables the built-in idempotency mechanism to kick-in, provided the same transaction ID is used on workflow executions
- Return order from cart-completion workflow if the cart has already been completed
  - In case transaction ID is not used on workflow executions, we still only want to complete a cart once
This commit is contained in:
Oli Juhl
2024-10-04 14:01:09 +02:00
committed by GitHub
parent 6055f4c9cf
commit f7472a6fa6
14 changed files with 809 additions and 525 deletions

View File

@@ -20,7 +20,7 @@ import {
ProductStatus,
PromotionRuleOperator,
PromotionType,
RuleOperator,
RuleOperator
} from "@medusajs/utils"
import { medusaIntegrationTestRunner } from "medusa-test-utils"
import {
@@ -3085,6 +3085,67 @@ medusaIntegrationTestRunner({
})
})
it("should return order when cart is already completed", async () => {
const cart = (
await api.post(
`/store/carts`,
{
currency_code: "usd",
email: "tony@stark-industries.com",
shipping_address: {
address_1: "test address 1",
address_2: "test address 2",
city: "ny",
country_code: "us",
province: "ny",
postal_code: "94016",
},
sales_channel_id: salesChannel.id,
items: [{ quantity: 1, variant_id: product.variants[0].id }],
},
storeHeaders
)
).data.cart
const paymentCollection = (
await api.post(
`/store/payment-collections`,
{
cart_id: cart.id,
},
storeHeaders
)
).data.payment_collection
await api.post(
`/store/payment-collections/${paymentCollection.id}/payment-sessions`,
{ provider_id: "pp_system_default" },
storeHeaders
)
await api.post(`/store/carts/${cart.id}/complete`, {}, storeHeaders)
const cartRefetch = (
await api.get(`/store/carts/${cart.id}`, storeHeaders)
).data.cart
expect(cartRefetch.completed_at).toBeTruthy()
const order = await api.post(
`/store/carts/${cart.id}/complete`,
{},
storeHeaders
)
expect(order.status).toEqual(200)
expect(order.data).toEqual({
type: "order",
order: expect.objectContaining({
id: expect.any(String),
}),
})
})
it("should return cart when payment authorization fails", async () => {
const paymentModuleService = appContainer.resolve(Modules.PAYMENT)
const authorizePaymentSessionSpy = jest.spyOn(

View File

@@ -34,6 +34,7 @@ export const validateCartPaymentsStep = createStep(
const processablePaymentStatuses = [
PaymentSessionStatus.PENDING,
PaymentSessionStatus.REQUIRES_MORE,
PaymentSessionStatus.AUTHORIZED, // E.g. payment was authorized, but the cart was not completed
]
const paymentsToProcess = paymentCollection.payment_sessions?.filter((ps) =>

View File

@@ -1,7 +1,6 @@
import {
CartWorkflowDTO,
OrderDTO,
UsageComputedActions,
UsageComputedActions
} from "@medusajs/framework/types"
import {
Modules,
@@ -12,6 +11,7 @@ import {
createWorkflow,
parallelize,
transform,
when,
WorkflowData,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
@@ -20,12 +20,12 @@ import {
emitEventStep,
useRemoteQueryStep,
} from "../../common"
import { useQueryStep } from "../../common/steps/use-query"
import { createOrdersStep } from "../../order/steps/create-orders"
import { authorizePaymentSessionStep } from "../../payment/steps/authorize-payment-session"
import { registerUsageStep } from "../../promotion/steps/register-usage"
import { updateCartsStep, validateCartPaymentsStep } from "../steps"
import { reserveInventoryStep } from "../steps/reserve-inventory"
import { validateCartStep } from "../steps/validate-cart"
import { completeCartFields } from "../utils/fields"
import { prepareConfirmInventoryInput } from "../utils/prepare-confirm-inventory-input"
import {
@@ -38,36 +38,56 @@ export type CompleteCartWorkflowInput = {
id: string
}
export const THREE_DAYS = 60 * 60 * 24 * 3
export const completeCartWorkflowId = "complete-cart"
/**
* This workflow completes a cart.
*/
export const completeCartWorkflow = createWorkflow(
completeCartWorkflowId,
{
name: completeCartWorkflowId,
store: true,
idempotent: true,
// 3 days of retention time
retentionTime: THREE_DAYS,
},
(
input: WorkflowData<CompleteCartWorkflowInput>
): WorkflowResponse<OrderDTO> => {
const cart = useRemoteQueryStep({
entry_point: "cart",
fields: completeCartFields,
variables: { id: input.id },
list: false,
): WorkflowResponse<{ id: string }> => {
const orderCart = useQueryStep({
entity: "order_cart",
fields: ["cart_id", "order_id"],
filters: { cart_id: input.id },
})
validateCartStep({ cart })
const paymentSessions = validateCartPaymentsStep({ cart })
authorizePaymentSessionStep({
// We choose the first payment session, as there will only be one active payment session
// This might change in the future.
id: paymentSessions[0].id,
context: { cart_id: cart.id },
const orderId = transform({ orderCart }, ({ orderCart }) => {
return orderCart.data[0]?.order_id
})
const { variants, sales_channel_id } = transform({ cart }, (data) => {
const allItems: any[] = []
const allVariants: any[] = []
// If order ID does not exist, we are completing the cart for the first time
const order = when({ orderId }, ({ orderId }) => {
return !orderId
}).then(() => {
const cart = useRemoteQueryStep({
entry_point: "cart",
fields: completeCartFields,
variables: { id: input.id },
list: false,
})
const paymentSessions = validateCartPaymentsStep({ cart })
authorizePaymentSessionStep({
// We choose the first payment session, as there will only be one active payment session
// This might change in the future.
id: paymentSessions[0].id,
context: { cart_id: cart.id },
})
const { variants, sales_channel_id } = transform({ cart }, (data) => {
const allItems: any[] = []
const allVariants: any[] = []
data.cart?.items?.forEach((item) => {
allItems.push({
@@ -78,163 +98,164 @@ export const completeCartWorkflow = createWorkflow(
allVariants.push(item.variant)
})
return {
variants: allVariants,
items: allItems,
sales_channel_id: data.cart.sales_channel_id,
}
})
const finalCart = useRemoteQueryStep({
entry_point: "cart",
fields: completeCartFields,
variables: { id: input.id },
list: false,
}).config({ name: "final-cart" })
const cartToOrder = transform({ cart }, ({ cart }) => {
const allItems = (cart.items ?? []).map((item) => {
return prepareLineItemData({
item,
variant: item.variant,
unitPrice: item.raw_unit_price ?? item.unit_price,
isTaxInclusive: item.is_tax_inclusive,
quantity: item.raw_quantity ?? item.quantity,
metadata: item?.metadata,
taxLines: item.tax_lines ?? [],
adjustments: item.adjustments ?? [],
})
})
const shippingMethods = (cart.shipping_methods ?? []).map((sm) => {
return {
name: sm.name,
description: sm.description,
amount: sm.raw_amount ?? sm.amount,
is_tax_inclusive: sm.is_tax_inclusive,
shipping_option_id: sm.shipping_option_id,
data: sm.data,
metadata: sm.metadata,
tax_lines: prepareTaxLinesData(sm.tax_lines ?? []),
adjustments: prepareAdjustmentsData(sm.adjustments ?? []),
variants: allVariants,
items: allItems,
sales_channel_id: data.cart.sales_channel_id,
}
})
const itemAdjustments = allItems
.map((item) => item.adjustments ?? [])
.flat(1)
const shippingAdjustments = shippingMethods
.map((sm) => sm.adjustments ?? [])
.flat(1)
const cartToOrder = transform({ cart }, ({ cart }) => {
const allItems = (cart.items ?? []).map((item) => {
return prepareLineItemData({
item,
variant: item.variant,
unitPrice: item.raw_unit_price ?? item.unit_price,
isTaxInclusive: item.is_tax_inclusive,
quantity: item.raw_quantity ?? item.quantity,
metadata: item?.metadata,
taxLines: item.tax_lines ?? [],
adjustments: item.adjustments ?? [],
})
})
const promoCodes = [...itemAdjustments, ...shippingAdjustments]
.map((adjustment) => adjustment.code)
.filter((code) => Boolean) as string[]
const shippingMethods = (cart.shipping_methods ?? []).map((sm) => {
return {
name: sm.name,
description: sm.description,
amount: sm.raw_amount ?? sm.amount,
is_tax_inclusive: sm.is_tax_inclusive,
shipping_option_id: sm.shipping_option_id,
data: sm.data,
metadata: sm.metadata,
tax_lines: prepareTaxLinesData(sm.tax_lines ?? []),
adjustments: prepareAdjustmentsData(sm.adjustments ?? []),
}
})
return {
region_id: cart.region?.id,
customer_id: cart.customer?.id,
sales_channel_id: cart.sales_channel_id,
status: OrderStatus.PENDING,
email: cart.email,
currency_code: cart.currency_code,
shipping_address: cart.shipping_address,
billing_address: cart.billing_address,
no_notification: false,
items: allItems,
shipping_methods: shippingMethods,
metadata: cart.metadata,
promo_codes: promoCodes,
}
})
const itemAdjustments = allItems
.map((item) => item.adjustments ?? [])
.flat(1)
const shippingAdjustments = shippingMethods
.map((sm) => sm.adjustments ?? [])
.flat(1)
const createdOrders = createOrdersStep([cartToOrder])
const promoCodes = [...itemAdjustments, ...shippingAdjustments]
.map((adjustment) => adjustment.code)
.filter(Boolean)
const order = transform(
{ createdOrders },
({ createdOrders }) => createdOrders[0]
)
return {
region_id: cart.region?.id,
customer_id: cart.customer?.id,
sales_channel_id: cart.sales_channel_id,
status: OrderStatus.PENDING,
email: cart.email,
currency_code: cart.currency_code,
shipping_address: cart.shipping_address,
billing_address: cart.billing_address,
no_notification: false,
items: allItems,
shipping_methods: shippingMethods,
metadata: cart.metadata,
promo_codes: promoCodes,
}
})
const reservationItemsData = transform({ order }, ({ order }) =>
order.items!.map((i) => ({
variant_id: i.variant_id,
quantity: i.quantity,
id: i.id,
}))
)
const createdOrders = createOrdersStep([cartToOrder])
const formatedInventoryItems = transform(
{
input: {
sales_channel_id,
variants,
items: reservationItemsData,
},
},
prepareConfirmInventoryInput
)
const createdOrder = transform({ createdOrders }, ({ createdOrders }) => {
return createdOrders?.[0] ?? undefined
})
const updateCompletedAt = transform({ cart }, ({ cart }) => {
return {
id: cart.id,
completed_at: new Date(),
}
})
const reservationItemsData = transform(
{ createdOrder },
({ createdOrder }) =>
createdOrder.items!.map((i) => ({
variant_id: i.variant_id,
quantity: i.quantity,
id: i.id,
}))
)
parallelize(
createRemoteLinkStep([
const formatedInventoryItems = transform(
{
[Modules.ORDER]: { order_id: order.id },
[Modules.CART]: { cart_id: finalCart.id },
},
{
[Modules.ORDER]: { order_id: order.id },
[Modules.PAYMENT]: {
payment_collection_id: cart.payment_collection.id,
input: {
sales_channel_id,
variants,
items: reservationItemsData,
},
},
]),
updateCartsStep([updateCompletedAt]),
reserveInventoryStep(formatedInventoryItems),
emitEventStep({
eventName: OrderWorkflowEvents.PLACED,
data: { id: order.id },
prepareConfirmInventoryInput
)
const updateCompletedAt = transform({ cart }, ({ cart }) => {
return {
id: cart.id,
completed_at: new Date(),
}
})
)
const promotionUsage = transform(
{ cart },
({ cart }: { cart: CartWorkflowDTO }) => {
const promotionUsage: UsageComputedActions[] = []
parallelize(
createRemoteLinkStep([
{
[Modules.ORDER]: { order_id: createdOrder.id },
[Modules.CART]: { cart_id: cart.id },
},
{
[Modules.ORDER]: { order_id: createdOrder.id },
[Modules.PAYMENT]: {
payment_collection_id: cart.payment_collection.id,
},
},
]),
updateCartsStep([updateCompletedAt]),
reserveInventoryStep(formatedInventoryItems),
emitEventStep({
eventName: OrderWorkflowEvents.PLACED,
data: { id: createdOrder.id },
})
)
const itemAdjustments = (cart.items ?? [])
.map((item) => item.adjustments ?? [])
.flat(1)
const promotionUsage = transform(
{ cart },
({ cart }: { cart: CartWorkflowDTO }) => {
const promotionUsage: UsageComputedActions[] = []
const shippingAdjustments = (cart.shipping_methods ?? [])
.map((item) => item.adjustments ?? [])
.flat(1)
const itemAdjustments = (cart.items ?? [])
.map((item) => item.adjustments ?? [])
.flat(1)
for (const adjustment of itemAdjustments) {
promotionUsage.push({
amount: adjustment.amount,
code: adjustment.code!,
})
const shippingAdjustments = (cart.shipping_methods ?? [])
.map((item) => item.adjustments ?? [])
.flat(1)
for (const adjustment of itemAdjustments) {
promotionUsage.push({
amount: adjustment.amount,
code: adjustment.code!,
})
}
for (const adjustment of shippingAdjustments) {
promotionUsage.push({
amount: adjustment.amount,
code: adjustment.code!,
})
}
return promotionUsage
}
)
for (const adjustment of shippingAdjustments) {
promotionUsage.push({
amount: adjustment.amount,
code: adjustment.code!,
})
}
registerUsageStep(promotionUsage)
return promotionUsage
}
)
return createdOrder
})
registerUsageStep(promotionUsage)
const result = transform({ order, orderId }, ({ order, orderId }) => {
return { id: order?.id ?? orderId }
})
return new WorkflowResponse(order)
return new WorkflowResponse(result)
}
)

View File

@@ -0,0 +1,21 @@
import { ContainerRegistrationKeys } from "@medusajs/utils"
import { createStep, StepResponse } from "@medusajs/workflows-sdk"
interface QueryInput {
entity: string
fields: string[]
filters?: Record<string, unknown>
context?: any
}
export const useQueryStepId = "use-query"
export const useQueryStep = createStep(
useQueryStepId,
async (data: QueryInput, { container }) => {
const query = container.resolve(ContainerRegistrationKeys.QUERY)
const result = await query.graph(data)
return new StepResponse(result)
}
)

View File

@@ -1,2 +1,5 @@
export * from "./capture-payment"
export * from "./on-payment-processed"
export * from "./process-payment"
export * from "./refund-payment"

View File

@@ -0,0 +1,38 @@
import { WebhookActionResult } from "@medusajs/types"
import { createWorkflow, when } from "@medusajs/workflows-sdk"
import { completeCartWorkflow } from "../../cart"
import { useRemoteQueryStep } from "../../common"
import { useQueryStep } from "../../common/steps/use-query"
export const onPaymentProcessedWorkflowId = "on-payment-processed-workflow"
export const onPaymentProcessedWorkflow = createWorkflow(
onPaymentProcessedWorkflowId,
(input: WebhookActionResult) => {
const paymentSessionResult = useRemoteQueryStep({
entry_point: "payment_session",
fields: ["payment_collection_id"],
variables: { filters: { id: input.data?.session_id } },
list: false,
})
const cartPaymentCollection = useQueryStep({
entity: "cart_payment_collection",
fields: ["cart_id"],
filters: {
payment_collection_id: paymentSessionResult.payment_collection_id,
},
})
when({ cartPaymentCollection }, ({ cartPaymentCollection }) => {
return !!cartPaymentCollection.data.length
}).then(() => {
completeCartWorkflow.runAsStep({
input: {
id: cartPaymentCollection.data[0].cart_id,
},
})
})
// TODO: Add more cases down the line, e.g. order payments
}
)

View File

@@ -0,0 +1,44 @@
import { WebhookActionResult } from "@medusajs/types"
import { PaymentActions } from "@medusajs/utils"
import { createWorkflow, when } from "@medusajs/workflows-sdk"
import { useQueryStep } from "../../common/steps/use-query"
import { authorizePaymentSessionStep } from "../steps"
import { capturePaymentWorkflow } from "./capture-payment"
interface ProcessPaymentWorkflowInput extends WebhookActionResult {}
export const processPaymentWorkflowId = "process-payment-workflow"
export const processPaymentWorkflow = createWorkflow(
processPaymentWorkflowId,
(input: ProcessPaymentWorkflowInput) => {
const paymentData = useQueryStep({
entity: "payment",
fields: ["id"],
filters: { payment_session_id: input.data?.session_id },
})
when({ input }, ({ input }) => {
return (
input.action === PaymentActions.SUCCESSFUL && !!paymentData.data.length
)
}).then(() => {
capturePaymentWorkflow.runAsStep({
input: {
payment_id: paymentData.data[0].id,
amount: input.data?.amount,
},
})
})
when({ input }, ({ input }) => {
return (
input.action === PaymentActions.AUTHORIZED && !!input.data?.session_id
)
}).then(() => {
authorizePaymentSessionStep({
id: input.data!.session_id,
context: {},
})
})
}
)

View File

@@ -1148,7 +1148,7 @@ export class TransactionOrchestrator extends EventEmitter {
queue.push({ obj: obj[key], level: [...level] })
} else if (key === "action") {
if (actionNames.has(obj.action)) {
throw new Error(`Action "${obj.action}" is already defined.`)
throw new Error(`Step ${obj.action} is already defined in workflow.`)
}
actionNames.add(obj.action)

View File

@@ -195,24 +195,17 @@ export type WebhookActionData = {
*
* The actions that the payment provider informs the Payment Module to perform.
*/
export type WebhookActionResult =
| {
/**
* Received an event that is not processable.
*/
action: "not_supported"
}
| {
/**
* Normalized events from payment provider to internal payment module events.
*/
action: PaymentActions
export type WebhookActionResult = {
/**
* Normalized events from payment provider to internal payment module events.
*/
action: PaymentActions
/**
* The webhook action's details.
*/
data: WebhookActionData
}
/**
* The webhook action's details.
*/
data?: WebhookActionData
}
export interface IPaymentProvider {
/**

View File

@@ -31,6 +31,7 @@ import {
UpdateRefundReasonDTO,
UpsertPaymentCollectionDTO,
} from "./mutations"
import { WebhookActionResult } from "./provider"
/**
* The main service interface for the Payment Module.
@@ -1055,7 +1056,7 @@ export interface IPaymentModuleService extends IModuleService {
/* ********** HOOKS ********** */
/**
* This method handles a webhook event with the associated payment provider.
* This method retrieves webhook event data with the associated payment provider.
*
* Learn more about handling webhook events in [this guide](https://docs.medusajs.com/experimental/payment/webhook-events/)
*
@@ -1066,7 +1067,7 @@ export interface IPaymentModuleService extends IModuleService {
* In the following example, `req` is an instance of `MedusaRequest`:
*
* ```ts
* await paymentModuleService.processEvent({
* const dataAndAction = await paymentModuleService.getWebhookActionAndData({
* provider: "stripe",
* payload: {
* data: req.body,
@@ -1076,7 +1077,7 @@ export interface IPaymentModuleService extends IModuleService {
* })
* ```
*/
processEvent(data: ProviderWebhookPayload): Promise<void>
getWebhookActionAndData(data: ProviderWebhookPayload): Promise<WebhookActionResult>
}
/**

View File

@@ -1,11 +1,13 @@
import { completeCartWorkflow } from "@medusajs/core-flows"
import { MedusaError } from "@medusajs/framework/utils"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { prepareRetrieveQuery } from "@medusajs/framework"
import { refetchOrder } from "../../../orders/helpers"
import { MedusaRequest, MedusaResponse } from "@medusajs/framework/http"
import { HttpTypes } from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
MedusaError,
} from "@medusajs/framework/utils"
import { refetchCart } from "../../helpers"
import { defaultStoreCartFields } from "../../query-config"
import { HttpTypes } from "@medusajs/framework/types"
export const POST = async (
req: MedusaRequest,
@@ -19,6 +21,8 @@ export const POST = async (
throwOnError: false,
})
const query = req.scope.resolve(ContainerRegistrationKeys.QUERY)
// When an error occurs on the workflow, its potentially got to with cart validations, payments
// or inventory checks. Return the cart here along with errors for the consumer to take more action
// and fix them
@@ -58,14 +62,14 @@ export const POST = async (
})
}
const order = await refetchOrder(
result.id,
req.scope,
req.remoteQueryConfig.fields
)
const { data } = await query.graph({
entity: "order",
fields: req.remoteQueryConfig.fields,
filters: { id: result.id },
})
res.status(200).json({
type: "order",
order,
order: data[0],
})
}

View File

@@ -1,8 +1,16 @@
import {
onPaymentProcessedWorkflow,
processPaymentWorkflow,
} from "@medusajs/core-flows"
import {
IPaymentModuleService,
ProviderWebhookPayload,
} from "@medusajs/framework/types"
import { Modules, PaymentWebhookEvents } from "@medusajs/framework/utils"
import {
Modules,
PaymentActions,
PaymentWebhookEvents,
} from "@medusajs/framework/utils"
import { SubscriberArgs, SubscriberConfig } from "../types/subscribers"
type SerializedBuffer = {
@@ -27,7 +35,25 @@ export default async function paymentWebhookhandler({
(input.payload.rawData as unknown as SerializedBuffer).data
)
}
await paymentService.processEvent(input)
const processedEvent = await paymentService.getWebhookActionAndData(input)
if (processedEvent?.action === PaymentActions.NOT_SUPPORTED) {
return
}
if (!processedEvent.data) {
return
}
await processPaymentWorkflow(container).run({
input: processedEvent,
})
// We process the intended side effects of payment processing separately.
await onPaymentProcessedWorkflow(container).run({
input: processedEvent,
})
}
export const config: SubscriberConfig = {

View File

@@ -27,6 +27,7 @@ import {
UpdatePaymentDTO,
UpdatePaymentSessionDTO,
UpsertPaymentCollectionDTO,
WebhookActionResult,
} from "@medusajs/framework/types"
import {
BigNumber,
@@ -37,7 +38,6 @@ import {
MedusaContext,
MedusaError,
ModulesSdkUtils,
PaymentActions,
PaymentCollectionStatus,
PaymentSessionStatus,
promiseAll,
@@ -425,20 +425,19 @@ export default class PaymentModuleService
"amount",
"raw_amount",
"currency_code",
"authorized_at",
"payment_collection_id",
],
relations: ["payment", "payment_collection"],
},
sharedContext
)
// this method needs to be idempotent
if (session.authorized_at) {
const payment = await this.paymentService_.retrieve(
{ session_id: session.id },
{ relations: ["payment_collection"] },
sharedContext
)
return await this.baseRepository_.serialize(payment, { populate: true })
if (session.payment && session.authorized_at) {
return await this.baseRepository_.serialize(session.payment, {
populate: true,
})
}
let { data, status } = await this.paymentProviderService_.authorizePayment(
@@ -849,45 +848,16 @@ export default class PaymentModuleService
}
@InjectManager()
async processEvent(
async getWebhookActionAndData(
eventData: ProviderWebhookPayload,
@MedusaContext() sharedContext?: Context
): Promise<void> {
): Promise<WebhookActionResult> {
const providerId = `pp_${eventData.provider}`
const event = await this.paymentProviderService_.getWebhookActionAndData(
return await this.paymentProviderService_.getWebhookActionAndData(
providerId,
eventData.payload
)
if (event.action === PaymentActions.NOT_SUPPORTED) {
return
}
switch (event.action) {
case PaymentActions.SUCCESSFUL: {
const [payment] = await this.listPayments(
{
payment_session_id: event.data.session_id,
},
{},
sharedContext
)
if (payment && !payment.captured_at) {
await this.capturePayment(
{ payment_id: payment.id, amount: event.data.amount },
sharedContext
)
}
break
}
case PaymentActions.AUTHORIZED:
await this.authorizePaymentSession(
event.data.session_id,
{},
sharedContext
)
}
}
@InjectManager()

693
yarn.lock

File diff suppressed because it is too large Load Diff