feat(medusa): Improve add line item to cart perf and transaction management + clustering start command (#5701)

This commit is contained in:
Adrien de Peretti
2023-12-06 08:53:35 +01:00
committed by GitHub
parent a458cd144d
commit 6975eacb33
14 changed files with 341 additions and 147 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/medusa": patch
"@medusajs/medusa-cli": patch
"@medusajs/admin-ui": patch
---
feat(medusa, medusa-cli): Improve add line item + cluster starting with medusa cli

View File

@@ -145,27 +145,23 @@ module.exports = async (dataSource, data = {}) => {
name: "Test Region",
currency_code: "usd",
tax_rate: 0,
payment_providers: [
{
id: "test-pay",
is_installed: true,
},
],
})
await manager.query(
`insert into region_payment_providers values ('test-region', 'test-pay');`
)
await manager.insert(Region, {
id: "test-region-2",
name: "Test Region 2",
currency_code: "eur",
tax_rate: 0,
payment_providers: [
{
id: "test-pay",
is_installed: true,
},
],
})
await manager.query(
`insert into region_payment_providers values ('test-region-2', 'test-pay');`
)
await manager.insert(DiscountRule, {
id: "discount_rule_id",
description: "test description",

View File

@@ -17,7 +17,7 @@ const {
getContainer,
} = require("../../../../environment-helpers/use-container")
jest.setTimeout(30000)
jest.setTimeout(60000)
const adminHeaders = { headers: { "x-medusa-access-token": "test_token" } }

View File

@@ -15,7 +15,9 @@ export async function build({
options,
reporting = "fancy",
}: BuildArgs) {
await createCacheDir({ appDir, plugins })
if (!process.env.PLUGIN_ADMIN_UI_SKIP_CACHE) {
await createCacheDir({ appDir, plugins })
}
const cacheDir = path.resolve(appDir, ".cache")
const entry = path.resolve(cacheDir, "admin", "src", "main.tsx")

View File

@@ -5,6 +5,10 @@ import { createEntry } from "./create-entry"
import { logger } from "./logger"
async function copyAdmin(dest: string) {
if (process.env.PLUGIN_ADMIN_UI_SKIP_CACHE) {
return true
}
const adminDir = path.resolve(__dirname, "..", "ui")
const destDir = path.resolve(dest, "admin")

View File

@@ -13,6 +13,10 @@ import {
const FILE_EXT_REGEX = /\.[^/.]+$/
async function copyLocalExtensions(src: string, dest: string) {
if (process.env.PLUGIN_ADMIN_UI_SKIP_CACHE) {
return true
}
try {
await fse.copy(src, dest, {
filter: copyFilter,

View File

@@ -1,13 +1,13 @@
import path from "path"
import resolveCwd from "resolve-cwd"
import { sync as existsSync } from "fs-exists-cached"
import { setTelemetryEnabled } from "medusa-telemetry"
import path from "path"
import resolveCwd from "resolve-cwd"
import { getLocalMedusaVersion } from "./util/version"
import { didYouMean } from "./did-you-mean"
import { getLocalMedusaVersion } from "./util/version"
import reporter from "./reporter"
import { newStarter } from "./commands/new"
import reporter from "./reporter"
const yargs = require(`yargs`)
@@ -233,6 +233,42 @@ function buildLocalCommands(cli, isLocalProject) {
})
),
})
.command({
command: `start-cluster`,
desc: `Start development server in cluster mode (beta).`,
builder: (_) =>
_.option(`H`, {
alias: `host`,
type: `string`,
default: defaultHost,
describe: `Set host. Defaults to ${defaultHost}`,
})
.option(`p`, {
alias: `port`,
type: `string`,
default: process.env.PORT || defaultPort,
describe: process.env.PORT
? `Set port. Defaults to ${process.env.PORT} (set by env.PORT) (otherwise defaults ${defaultPort})`
: `Set port. Defaults to ${defaultPort}`,
})
.option(`c`, {
alias: `cpus`,
type: `number`,
default: process.env.CPUS,
describe:
"Set number of cpus to use. Defaults to max number of cpus available on the system (set by env.CPUS)",
}),
handler: handlerP(
getCommandHandler(`start-cluster`, (args, cmd) => {
process.env.NODE_ENV = process.env.NODE_ENV || `development`
cmd(args)
// Return an empty promise to prevent handlerP from exiting early.
// The development server shouldn't ever exit until the user directly
// kills it so this is fine.
return new Promise((resolve) => {})
})
),
})
.command({
command: `user`,
desc: `Create a user`,

View File

@@ -2,16 +2,18 @@ import { IsInt, IsOptional, IsString } from "class-validator"
import { EntityManager } from "typeorm"
import { validator } from "../../../../../utils/validator"
import {
addOrUpdateLineItem,
CreateLineItemSteps,
handleAddOrUpdateLineItem,
setPaymentSession,
setVariantAvailability,
} from "./utils/handler-steps"
import { IdempotencyKey } from "../../../../../models"
import {
initializeIdempotencyRequest,
runIdempotencyStep,
RunIdempotencyStepOptions,
} from "../../../../../utils/idempotency"
import { initializeIdempotencyRequest } from "../../../../../utils/idempotency"
import { cleanResponseData } from "../../../../../utils/clean-response-data"
import IdempotencyKeyService from "../../../../../services/idempotency-key"
import { defaultStoreCartFields, defaultStoreCartRelations } from "../index"
import { CartService } from "../../../../../services"
import { promiseAll } from "@medusajs/utils"
/**
* @oas [post] /store/carts/{id}/line-items
@@ -89,34 +91,82 @@ export default async (req, res) => {
let inProgress = true
let err: unknown = false
const stepOptions: RunIdempotencyStepOptions = {
manager,
idempotencyKey,
container: req.scope,
isolationLevel: "SERIALIZABLE",
}
const idempotencyKeyService: IdempotencyKeyService = req.scope.resolve(
"idempotencyKeyService"
)
while (inProgress) {
switch (idempotencyKey.recovery_point) {
case CreateLineItemSteps.STARTED: {
await runIdempotencyStep(async ({ manager }) => {
return await handleAddOrUpdateLineItem(
id,
{
customer_id: customerId,
metadata: validated.metadata,
quantity: validated.quantity,
variant_id: validated.variant_id,
},
{
manager,
container: req.scope,
}
)
}, stepOptions).catch((e) => {
try {
const cartId = id
const data = {
customer_id: customerId,
metadata: validated.metadata,
quantity: validated.quantity,
variant_id: validated.variant_id,
}
await addOrUpdateLineItem({
cartId,
container: req.scope,
manager,
data,
})
idempotencyKey = await idempotencyKeyService
.withTransaction(manager)
.update(idempotencyKey.idempotency_key, {
recovery_point: CreateLineItemSteps.SET_PAYMENT_SESSIONS,
})
} catch (e) {
inProgress = false
err = e
})
}
break
}
case CreateLineItemSteps.SET_PAYMENT_SESSIONS: {
try {
const cartService: CartService = req.scope.resolve("cartService")
const cart = await cartService
.withTransaction(manager)
.retrieveWithTotals(id, {
select: defaultStoreCartFields,
relations: [
...defaultStoreCartRelations,
"billing_address",
"region.payment_providers",
"payment_sessions",
"customer",
],
})
const args = {
cart,
container: req.scope,
manager,
}
await promiseAll([
setVariantAvailability(args),
setPaymentSession(args),
])
idempotencyKey = await idempotencyKeyService
.withTransaction(manager)
.update(idempotencyKey.idempotency_key, {
recovery_point: CreateLineItemSteps.FINISHED,
response_code: 200,
response_body: { cart },
})
} catch (e) {
inProgress = false
err = e
}
break
}

View File

@@ -1,6 +1,3 @@
import { FlagRouter } from "@medusajs/utils"
import { AwilixContainer } from "awilix"
import { EntityManager } from "typeorm"
import { Cart } from "../../../../../../models"
import {
CartService,
@@ -8,36 +5,25 @@ import {
ProductVariantInventoryService,
} from "../../../../../../services"
import { WithRequiredProperty } from "../../../../../../types/common"
import { IdempotencyCallbackResult } from "../../../../../../types/idempotency-key"
import { defaultStoreCartFields, defaultStoreCartRelations } from "../../index"
import SalesChannelFeatureFlag from "../../../../../../loaders/feature-flags/sales-channels"
import { MedusaError } from "medusa-core-utils"
import { featureFlagRouter } from "../../../../../../loaders/feature-flags"
export const CreateLineItemSteps = {
STARTED: "started",
SET_PAYMENT_SESSIONS: "set-payment-sessions",
FINISHED: "finished",
}
export async function handleAddOrUpdateLineItem(
cartId: string,
data: {
metadata?: Record<string, unknown>
customer_id?: string
variant_id: string
quantity: number
},
{ container, manager }: { container: AwilixContainer; manager: EntityManager }
): Promise<IdempotencyCallbackResult> {
export async function addOrUpdateLineItem({
cartId,
container,
manager,
data,
}) {
const cartService: CartService = container.resolve("cartService")
const lineItemService: LineItemService = container.resolve("lineItemService")
const featureFlagRouter: FlagRouter = container.resolve("featureFlagRouter")
const productVariantInventoryService: ProductVariantInventoryService =
container.resolve("productVariantInventoryService")
const txCartService = cartService.withTransaction(manager)
let cart = await txCartService.retrieve(cartId, {
const cart = await cartService.retrieve(cartId, {
select: ["id", "region_id", "customer_id"],
})
@@ -48,42 +34,46 @@ export async function handleAddOrUpdateLineItem(
metadata: data.metadata,
})
await txCartService.addOrUpdateLineItems(cart.id, line, {
validateSalesChannels: featureFlagRouter.isFeatureEnabled("sales_channels"),
})
await manager.transaction(async (transactionManager) => {
const txCartService = cartService.withTransaction(transactionManager)
const relations = [
...defaultStoreCartRelations,
"billing_address",
"region.payment_providers",
"payment_sessions",
"customer",
]
await txCartService.addOrUpdateLineItems(cart.id, line, {
validateSalesChannels:
featureFlagRouter.isFeatureEnabled("sales_channels"),
})
})
}
export async function setPaymentSession({ cart, container, manager }) {
const cartService: CartService = container.resolve("cartService")
const txCartService = cartService.withTransaction(manager)
if (!cart.payment_sessions?.length) {
return
}
return await txCartService.setPaymentSessions(
cart as WithRequiredProperty<Cart, "total">
)
}
export async function setVariantAvailability({ cart, container, manager }) {
const productVariantInventoryService: ProductVariantInventoryService =
container.resolve("productVariantInventoryService")
const shouldSetAvailability =
relations?.some((rel) => rel.includes("variant")) &&
cart.items?.some((item) => !!item.variant) &&
featureFlagRouter.isFeatureEnabled(SalesChannelFeatureFlag.key)
cart = await txCartService.retrieveWithTotals(cart.id, {
select: defaultStoreCartFields,
relations,
})
if (!shouldSetAvailability) {
return
}
if (shouldSetAvailability) {
await productVariantInventoryService.setVariantAvailability(
return await productVariantInventoryService
.withTransaction(manager)
.setVariantAvailability(
cart.items.map((i) => i.variant),
cart.sales_channel_id!
)
}
if (cart.payment_sessions?.length) {
await txCartService.setPaymentSessions(
cart as WithRequiredProperty<Cart, "total">
)
}
return {
response_code: 200,
response_body: { cart },
}
}

View File

@@ -8,7 +8,6 @@ import { defaultStoreCartFields, defaultStoreCartRelations } from "."
import { EntityManager } from "typeorm"
import { MedusaError } from "medusa-core-utils"
import { cleanResponseData } from "../../../../utils/clean-response-data"
import { handleAddOrUpdateLineItem } from "./create-line-item/utils/handler-steps"
/**
* @oas [post] /store/carts/{id}/line-items/{line_id}

View File

@@ -0,0 +1,94 @@
import "core-js/stable"
import "regenerator-runtime/runtime"
import cluster from "cluster"
import express from "express"
import { GracefulShutdownServer } from "medusa-core-utils"
import { track } from "medusa-telemetry"
import { scheduleJob } from "node-schedule"
import os from "os"
import loaders from "../loaders"
import Logger from "../loaders/logger"
const EVERY_SIXTH_HOUR = "0 */6 * * *"
const CRON_SCHEDULE = EVERY_SIXTH_HOUR
let isShuttingDown = false
export default async function ({ port, cpus, directory }) {
if (cluster.isPrimary) {
const killMainProccess = () => process.exit(0)
cpus ??= os.cpus().length
const numCPUs = Math.min(os.cpus().length, cpus)
for (let index = 0; index < numCPUs; index++) {
const worker = cluster.fork()
worker.send({ index })
}
cluster.on("exit", (worker) => {
if (!isShuttingDown) {
cluster.fork()
} else if (Object.keys(cluster.workers).length === 0) {
setTimeout(killMainProccess, 100)
}
})
const gracefulShutDown = () => {
isShuttingDown = true
for (const id of Object.keys(cluster.workers)) {
cluster.workers[id].kill("SIGTERM")
}
}
scheduleJob(CRON_SCHEDULE, () => {
track("PING")
})
process.on("SIGTERM", gracefulShutDown)
process.on("SIGINT", gracefulShutDown)
} else {
const start = async () => {
track("CLI_START")
const app = express()
const { dbConnection } = await loaders({ directory, expressApp: app })
const serverActivity = Logger.activity(`Creating server`)
const server = GracefulShutdownServer.create(
app.listen(port, (err) => {
if (err) {
return
}
Logger.success(serverActivity, `Server is ready on port: ${port}`)
track("CLI_START_COMPLETED")
})
)
const gracefulShutDown = () => {
server
.shutdown()
.then(() => {
process.exit(0)
})
.catch((e) => {
process.exit(1)
})
}
process.on("SIGTERM", gracefulShutDown)
process.on("SIGINT", gracefulShutDown)
return { dbConnection, server }
}
process.on("message", async (msg) => {
if (msg.index > 0) {
process.env.PLUGIN_ADMIN_UI_SKIP_CACHE = true
}
await start()
})
}
}

View File

@@ -27,6 +27,12 @@ export const IdempotencyKeyService = {
}
}
}),
update: jest.fn().mockImplementation(async (key, data) => {
return {
...data,
idempotency_key: key,
}
}),
}
const mock = jest.fn().mockImplementation(() => {

View File

@@ -797,47 +797,45 @@ class ProductVariantInventoryService extends TransactionBaseService {
availabilityContext
)
return await promiseAll(
variants.map(async (variant) => {
if (!variant.id) {
return variant
}
variant.purchasable = variant.allow_backorder
if (!variant.manage_inventory) {
variant.purchasable = true
return variant
}
const variantInventory = variantInventoryMap.get(variant.id) || []
if (!variantInventory.length) {
delete variant.inventory_quantity
variant.purchasable = true
return variant
}
if (!salesChannelId) {
delete variant.inventory_quantity
variant.purchasable = false
return variant
}
const locations =
inventoryLocationMap.get(variantInventory[0].inventory_item_id) ?? []
variant.inventory_quantity = locations.reduce(
(acc, next) => acc + (next.stocked_quantity - next.reserved_quantity),
0
)
variant.purchasable =
variant.inventory_quantity > 0 || variant.allow_backorder
return variants.map((variant) => {
if (!variant.id) {
return variant
})
)
}
variant.purchasable = variant.allow_backorder
if (!variant.manage_inventory) {
variant.purchasable = true
return variant
}
const variantInventory = variantInventoryMap.get(variant.id) || []
if (!variantInventory.length) {
delete variant.inventory_quantity
variant.purchasable = true
return variant
}
if (!salesChannelId) {
delete variant.inventory_quantity
variant.purchasable = false
return variant
}
const locations =
inventoryLocationMap.get(variantInventory[0].inventory_item_id) ?? []
variant.inventory_quantity = locations.reduce(
(acc, next) => acc + (next.stocked_quantity - next.reserved_quantity),
0
)
variant.purchasable =
variant.inventory_quantity > 0 || variant.allow_backorder
return variant
})
}
private async getAvailabilityContext(

View File

@@ -244,20 +244,28 @@ class CartCompletionStrategy extends AbstractCartCompletionStrategy {
return res
}
const cart = await this.cartService_
.withTransaction(manager)
.authorizePayment(id, {
...context,
cart_id: id,
idempotency_key: idempotencyKey,
})
const txCartService = this.cartService_.withTransaction(manager)
let cart = await txCartService.retrieve(id, {
relations: ["payment_sessions"],
})
if (cart.payment_sessions?.length) {
await txCartService.setPaymentSessions(id)
}
cart = await txCartService.authorizePayment(id, {
...context,
cart_id: id,
idempotency_key: idempotencyKey,
})
if (cart.payment_session) {
if (
cart.payment_session.status === "requires_more" ||
cart.payment_session.status === "pending"
) {
await this.cartService_.withTransaction(manager).deleteTaxLines(id)
await txCartService.deleteTaxLines(id)
return {
response_code: 200,