fix(medusa): Transaction lock issues on create/update cart items (#2612)

* fix(medusa): Transaction lock issues on create/update cart items

* fix add missing trans

* cleanup

* cleanup

* Create perfect-bears-invent.md

* cleanup

* revert draft order to no take it in that pr

* cleanup handler

* cleanup steps

* fix reference issue

* cleanup + fix tests and mock

* cleanup type

* rename file

* cleanup

* fix missing transaction

* wip

* Address pr feedback

* cleanup and fix unit tests

* fix handler

Co-authored-by: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2022-11-18 12:15:53 +01:00
committed by GitHub
parent 5332081972
commit a77780671a
14 changed files with 310 additions and 117 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/medusa": patch
---
fix(medusa): Transaction lock issues on create/update cart items

View File

@@ -25,7 +25,7 @@ describe("POST /store/carts/:id", () => {
})
it("calls CartService retrieve", () => {
expect(CartServiceMock.retrieve).toHaveBeenCalledTimes(2)
expect(CartServiceMock.retrieve).toHaveBeenCalledTimes(1)
expect(CartServiceMock.retrieveWithTotals).toHaveBeenCalledTimes(1)
})

View File

@@ -1,9 +1,16 @@
import { IsInt, IsOptional, IsString } from "class-validator"
import { EntityManager } from "typeorm"
import { defaultStoreCartFields, defaultStoreCartRelations } from "."
import { CartService, LineItemService } from "../../../../services"
import { validator } from "../../../../utils/validator"
import { FlagRouter } from "../../../../utils/flag-router"
import { validator } from "../../../../../utils/validator"
import {
CreateLineItemSteps,
handleAddOrUpdateLineItem,
} from "./utils/handler-steps"
import { IdempotencyKey } from "../../../../../models"
import {
initializeIdempotencyRequest,
runIdempotencyStep,
RunIdempotencyStepOptions,
} from "../../../../../utils/idempotency"
/**
* @oas [post] /carts/{id}/line-items
@@ -63,46 +70,65 @@ import { FlagRouter } from "../../../../utils/flag-router"
export default async (req, res) => {
const { id } = req.params
const customerId = req.user?.customer_id
const customerId: string | undefined = req.user?.customer_id
const validated = await validator(StorePostCartsCartLineItemsReq, req.body)
const lineItemService: LineItemService = req.scope.resolve("lineItemService")
const cartService: CartService = req.scope.resolve("cartService")
const manager: EntityManager = req.scope.resolve("manager")
const featureFlagRouter: FlagRouter = req.scope.resolve("featureFlagRouter")
await manager.transaction(async (m) => {
const txCartService = cartService.withTransaction(m)
const cart = await txCartService.retrieve(id)
let idempotencyKey!: IdempotencyKey
try {
idempotencyKey = await initializeIdempotencyRequest(req, res)
} catch {
res.status(409).send("Failed to create idempotency key")
return
}
const line = await lineItemService
.withTransaction(m)
.generate(validated.variant_id, cart.region_id, validated.quantity, {
customer_id: customerId || cart.customer_id,
metadata: validated.metadata,
})
let inProgress = true
let err: unknown = false
await txCartService.addLineItem(id, line, {
validateSalesChannels:
featureFlagRouter.isFeatureEnabled("sales_channels"),
})
const stepOptions: RunIdempotencyStepOptions = {
manager,
idempotencyKey,
container: req.scope,
isolationLevel: "SERIALIZABLE",
}
const updated = await txCartService.retrieve(id, {
relations: ["payment_sessions"],
})
while (inProgress) {
switch (idempotencyKey.recovery_point) {
case CreateLineItemSteps.STARTED: {
await runIdempotencyStep(async ({ manager }) => {
return await handleAddOrUpdateLineItem(
id,
{
customer_id: customerId,
metadata: validated.metadata,
quantity: validated.quantity,
variant_id: validated.variant_id,
},
{
manager,
container: req.scope,
}
)
}, stepOptions).catch((e) => {
inProgress = false
err = e
})
break
}
if (updated.payment_sessions?.length) {
await txCartService.setPaymentSessions(id)
case CreateLineItemSteps.FINISHED: {
inProgress = false
break
}
}
})
}
const data = await cartService.retrieveWithTotals(id, {
select: defaultStoreCartFields,
relations: defaultStoreCartRelations,
})
if (err) {
throw err
}
res.status(200).json({ cart: data })
res.status(idempotencyKey.response_code).json(idempotencyKey.response_body)
}
export class StorePostCartsCartLineItemsReq {

View File

@@ -0,0 +1,67 @@
import { AwilixContainer } from "awilix"
import { EntityManager } from "typeorm"
import { CartService, LineItemService } from "../../../../../../services"
import { FlagRouter } from "../../../../../../utils/flag-router"
import { defaultStoreCartFields, defaultStoreCartRelations } from "../../index"
import { IdempotencyCallbackResult } from "../../../../../../types/idempotency-key"
import { WithRequiredProperty } from "../../../../../../types/common"
import { Cart } from "../../../../../../models"
export const CreateLineItemSteps = {
STARTED: "started",
FINISHED: "finished",
}
export async function handleAddOrUpdateLineItem(
cartId: string,
data: {
metadata?: Record<string, unknown>
customer_id?: string
variant_id: string
quantity: number
},
{ container, manager }: { container: AwilixContainer; manager: EntityManager }
): Promise<IdempotencyCallbackResult> {
const cartService: CartService = container.resolve("cartService")
const lineItemService: LineItemService = container.resolve("lineItemService")
const featureFlagRouter: FlagRouter = container.resolve("featureFlagRouter")
const txCartService = cartService.withTransaction(manager)
let cart = await txCartService.retrieve(cartId, {
select: ["id", "region_id", "customer_id"],
})
const line = await lineItemService
.withTransaction(manager)
.generate(data.variant_id, cart.region_id, data.quantity, {
customer_id: data.customer_id || cart.customer_id,
metadata: data.metadata,
})
await txCartService.addLineItem(cart.id, line, {
validateSalesChannels: featureFlagRouter.isFeatureEnabled("sales_channels"),
})
cart = await txCartService.retrieveWithTotals(cart.id, {
select: defaultStoreCartFields,
relations: [
...defaultStoreCartRelations,
"billing_address",
"region.payment_providers",
"payment_sessions",
"customer",
],
})
if (cart.payment_sessions?.length) {
await txCartService.setPaymentSessions(
cart as WithRequiredProperty<Cart, "total">
)
}
return {
response_code: 200,
response_body: { cart },
}
}

View File

@@ -11,24 +11,20 @@ export const IdempotencyKeyService = {
}
}),
workStage: jest.fn().mockImplementation(async (key, fn) => {
try {
const { recovery_point, response_code, response_body } = await fn(
MockManager
)
const { recovery_point, response_code, response_body } = await fn(
MockManager
)
if (recovery_point) {
return {
recovery_point,
}
} else {
return {
recovery_point: "finished",
response_body,
response_code,
}
if (recovery_point) {
return {
recovery_point,
}
} else {
return {
recovery_point: "finished",
response_body,
response_code,
}
} catch (err) {
return { error: err }
}
}),
}

View File

@@ -284,7 +284,8 @@ describe("CartService", () => {
describe("addLineItem", () => {
const lineItemService = {
update: jest.fn(),
update: jest.fn().mockImplementation(() => Promise.resolve()),
list: jest.fn().mockImplementation(() => Promise.resolve([])),
create: jest.fn(),
withTransaction: function () {
return this
@@ -353,7 +354,27 @@ describe("CartService", () => {
manager: MockManager,
totalsService,
cartRepository,
lineItemService,
lineItemService: {
...lineItemService,
list: jest.fn().mockImplementation((where) => {
if (
where.cart_id === IdMap.getId("cartWithLine") &&
where.variant_id === IdMap.getId("existing") &&
where.should_merge
) {
return Promise.resolve([
{
...where,
id: IdMap.getId("merger"),
quantity: 1,
metadata: {},
},
])
}
return Promise.resolve([])
}),
},
lineItemRepository: MockRepository(),
newTotalsService: newTotalsServiceMock,
eventBusService,
@@ -453,12 +474,14 @@ describe("CartService", () => {
variant_id: IdMap.getId("existing"),
should_merge: true,
quantity: 1,
metadata: {},
}
await cartService.addLineItem(IdMap.getId("cartWithLine"), lineItem)
expect(lineItemService.update).toHaveBeenCalledTimes(1)
expect(lineItemService.update).toHaveBeenCalledWith(
expect(lineItemService.update).toHaveBeenCalledTimes(2)
expect(lineItemService.update).toHaveBeenNthCalledWith(
1,
IdMap.getId("merger"),
{
quantity: 2,
@@ -519,8 +542,9 @@ describe("CartService", () => {
describe("addLineItem w. SalesChannel", () => {
const lineItemService = {
update: jest.fn(),
update: jest.fn().mockImplementation(() => Promise.resolve()),
create: jest.fn(),
list: jest.fn().mockImplementation(() => Promise.resolve([])),
withTransaction: function () {
return this
},

View File

@@ -549,15 +549,15 @@ class CartService extends TransactionBaseService {
/**
* Check if line item's variant belongs to the cart's sales channel.
*
* @param cart - the cart for the line item
* @param sales_channel_id - the cart for the line item
* @param lineItem - the line item being added
* @return a boolean indicating validation result
*/
protected async validateLineItem(
cart: Cart,
{ sales_channel_id }: { sales_channel_id: string | null },
lineItem: LineItem
): Promise<boolean> {
if (!cart.sales_channel_id) {
if (!sales_channel_id) {
return true
}
@@ -570,7 +570,7 @@ class CartService extends TransactionBaseService {
.withTransaction(this.manager_)
.filterProductsBySalesChannel(
[lineItemVariant.product_id],
cart.sales_channel_id
sales_channel_id
)
).length
}
@@ -588,21 +588,16 @@ class CartService extends TransactionBaseService {
cartId: string,
lineItem: LineItem,
config = { validateSalesChannels: true }
): Promise<Cart> {
): Promise<void> {
const select: (keyof Cart)[] = ["id"]
if (this.featureFlagRouter_.isFeatureEnabled("sales_channels")) {
select.push("sales_channel_id")
}
return await this.atomicPhase_(
async (transactionManager: EntityManager) => {
const cart = await this.retrieve(cartId, {
relations: [
"shipping_methods",
"items",
"items.adjustments",
"payment_sessions",
"items.variant",
"items.variant.product",
"discounts",
"discounts.rule",
],
})
let cart = await this.retrieve(cartId, { select })
if (this.featureFlagRouter_.isFeatureEnabled("sales_channels")) {
if (config.validateSalesChannels) {
@@ -615,14 +610,25 @@ class CartService extends TransactionBaseService {
}
}
const lineItemServiceTx =
this.lineItemService_.withTransaction(transactionManager)
let currentItem: LineItem | undefined
if (lineItem.should_merge) {
currentItem = cart.items.find((item) => {
if (item.should_merge && item.variant_id === lineItem.variant_id) {
return isEqual(item.metadata, lineItem.metadata)
}
return false
})
const [existingItem] = await lineItemServiceTx.list(
{
cart_id: cart.id,
variant_id: lineItem.variant_id,
should_merge: true,
},
{ take: 1, select: ["id", "metadata", "quantity"] }
)
if (
existingItem &&
isEqual(existingItem.metadata, lineItem.metadata)
) {
currentItem = existingItem
}
}
// If content matches one of the line items currently in the cart we can
@@ -637,44 +643,33 @@ class CartService extends TransactionBaseService {
.confirmInventory(lineItem.variant_id, quantity)
if (currentItem) {
await this.lineItemService_
.withTransaction(transactionManager)
.update(currentItem.id, {
quantity: currentItem.quantity,
})
await lineItemServiceTx.update(currentItem.id, {
quantity: currentItem.quantity,
})
} else {
await this.lineItemService_
.withTransaction(transactionManager)
.create({
...lineItem,
has_shipping: false,
cart_id: cartId,
})
await lineItemServiceTx.create({
...lineItem,
has_shipping: false,
cart_id: cart.id,
})
}
const lineItemRepository = transactionManager.getCustomRepository(
this.lineItemRepository_
)
await lineItemRepository.update(
{
id: In(cart.items.map((item) => item.id)),
},
{
has_shipping: false,
}
)
await lineItemServiceTx
.update(
{ cart_id: cartId, has_shipping: true },
{ has_shipping: false }
)
.catch(() => void 0)
const result = await this.retrieve(cartId, {
cart = await this.retrieve(cart.id, {
relations: ["items", "discounts", "discounts.rule", "region"],
})
await this.refreshAdjustments_(result)
await this.refreshAdjustments_(cart)
await this.eventBus_
.withTransaction(transactionManager)
.emit(CartService.Events.UPDATED, result)
return result
.emit(CartService.Events.UPDATED, { id: cart.id })
}
)
}

View File

@@ -4,7 +4,10 @@ import { TransactionBaseService } from "../interfaces"
import { DeepPartial, EntityManager } from "typeorm"
import { IdempotencyKeyRepository } from "../repositories/idempotency-key"
import { IdempotencyKey } from "../models"
import { CreateIdempotencyKeyInput } from "../types/idempotency-key"
import {
CreateIdempotencyKeyInput,
IdempotencyCallbackResult,
} from "../types/idempotency-key"
const KEY_LOCKED_TIMEOUT = 1000
@@ -163,14 +166,9 @@ class IdempotencyKeyService extends TransactionBaseService {
*/
async workStage(
idempotencyKey: string,
callback: (transactionManager: EntityManager) => Promise<
| {
recovery_point?: string
response_code?: number
response_body?: Record<string, unknown>
}
| never
>
callback: (
transactionManager: EntityManager
) => Promise<IdempotencyCallbackResult | never>
): Promise<IdempotencyKey> {
return await this.atomicPhase_(async (manager) => {
const { recovery_point, response_code, response_body } = await callback(

View File

@@ -285,8 +285,8 @@ class LineItemService extends TransactionBaseService {
this.lineItemRepository_
)
const lineItem = lineItemRepository.create(data)
return await lineItemRepository.save(lineItem)
const item = lineItemRepository.create(data)
return await lineItemRepository.save(item)
}
)
}

View File

@@ -4,3 +4,9 @@ export type CreateIdempotencyKeyInput = {
request_path: string
idempotency_key?: string
}
export type IdempotencyCallbackResult = {
recovery_point?: string
response_code?: number
response_body?: Record<string, unknown>
}

View File

@@ -3,7 +3,9 @@ import { MedusaError } from "medusa-core-utils"
export enum PostgresError {
DUPLICATE_ERROR = "23505",
FOREIGN_KEY_ERROR = "23503",
SERIALIZATION_FAILURE = "40001",
}
export const formatException = (err): MedusaError => {
switch (err.code) {
case PostgresError.DUPLICATE_ERROR:
@@ -35,6 +37,12 @@ export const formatException = (err): MedusaError => {
} ${matches[2]} does not exist.`
)
}
case PostgresError.SERIALIZATION_FAILURE: {
return new MedusaError(
MedusaError.Types.CONFLICT,
err?.detail ?? err?.message
)
}
default:
return err
}

View File

@@ -0,0 +1,2 @@
export * from "./run-idempotency-step"
export * from "./initialize-idempotency-request"

View File

@@ -0,0 +1,26 @@
import { Request, Response } from "express"
import { IdempotencyKey } from "../../models"
import IdempotencyKeyService from "../../services/idempotency-key"
import { EntityManager } from "typeorm"
export async function initializeIdempotencyRequest(
req: Request,
res: Response
): Promise<IdempotencyKey> {
const idempotencyKeyService: IdempotencyKeyService = req.scope.resolve(
"idempotencyKeyService"
)
const manager: EntityManager = req.scope.resolve("manager")
const headerKey = req.get("Idempotency-Key") || ""
let idempotencyKey
idempotencyKey = await idempotencyKeyService
.withTransaction(manager)
.initializeRequest(headerKey, req.method, req.params, req.path)
res.setHeader("Access-Control-Expose-Headers", "Idempotency-Key")
res.setHeader("Idempotency-Key", idempotencyKey.idempotency_key)
return idempotencyKey
}

View File

@@ -0,0 +1,40 @@
import { IdempotencyCallbackResult } from "../../types/idempotency-key"
import { EntityManager } from "typeorm"
import { IdempotencyKey } from "../../models"
import { AwilixContainer } from "awilix"
import IdempotencyKeyService from "../../services/idempotency-key"
import { IsolationLevel } from "typeorm/driver/types/IsolationLevel"
export type RunIdempotencyStepOptions = {
manager: EntityManager
idempotencyKey: IdempotencyKey
container: AwilixContainer
isolationLevel: IsolationLevel
}
export async function runIdempotencyStep(
handler: ({ manager: EntityManager }) => Promise<IdempotencyCallbackResult>,
{
manager,
idempotencyKey,
container,
isolationLevel,
}: RunIdempotencyStepOptions
) {
const idempotencyKeyService: IdempotencyKeyService = container.resolve(
"idempotencyKeyService"
)
return await manager.transaction(
isolationLevel,
async (transactionManager) => {
const idempotencyKey_ = await idempotencyKeyService
.withTransaction(transactionManager)
.workStage(idempotencyKey.idempotency_key, async (stageManager) => {
return await handler({ manager: stageManager })
})
idempotencyKey.response_code = idempotencyKey_.response_code
idempotencyKey.response_body = idempotencyKey_.response_body
idempotencyKey.recovery_point = idempotencyKey_.recovery_point
}
)
}