Feat: TO variant creation (#3097)
This commit is contained in:
committed by
GitHub
parent
a049987215
commit
d50db84a33
5
.changeset/perfect-worms-hammer.md
Normal file
5
.changeset/perfect-worms-hammer.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@medusajs/medusa": minor
|
||||
---
|
||||
|
||||
ProductService.update no longer create product variants. It was moved to the endpoint handler update-product.ts
|
||||
@@ -3,6 +3,7 @@ import { IsBoolean, IsNumber, IsOptional, IsString } from "class-validator"
|
||||
|
||||
import { IInventoryService } from "../../../../interfaces"
|
||||
import { FindParams } from "../../../../types/common"
|
||||
import { EntityManager } from "typeorm"
|
||||
|
||||
/**
|
||||
* @oas [post] /inventory-items/{id}
|
||||
@@ -71,11 +72,16 @@ export default async (req: Request, res: Response) => {
|
||||
|
||||
const inventoryService: IInventoryService =
|
||||
req.scope.resolve("inventoryService")
|
||||
const manager: EntityManager = req.scope.resolve("manager")
|
||||
|
||||
await inventoryService.updateInventoryItem(
|
||||
id,
|
||||
req.validatedBody as AdminPostInventoryItemsInventoryItemReq
|
||||
)
|
||||
await manager.transaction(async (transactionManager) => {
|
||||
await inventoryService
|
||||
.withTransaction(transactionManager)
|
||||
.updateInventoryItem(
|
||||
id,
|
||||
req.validatedBody as AdminPostInventoryItemsInventoryItemReq
|
||||
)
|
||||
})
|
||||
|
||||
const inventoryItem = await inventoryService.retrieveInventoryItem(
|
||||
id,
|
||||
|
||||
@@ -3,6 +3,7 @@ import { IsOptional, IsNumber } from "class-validator"
|
||||
|
||||
import { IInventoryService } from "../../../../interfaces"
|
||||
import { FindParams } from "../../../../types/common"
|
||||
import { EntityManager } from "typeorm"
|
||||
|
||||
/**
|
||||
* @oas [post] /inventory-items/{id}/location-levels/{location_id}
|
||||
@@ -72,11 +73,16 @@ export default async (req: Request, res: Response) => {
|
||||
|
||||
const inventoryService: IInventoryService =
|
||||
req.scope.resolve("inventoryService")
|
||||
const manager: EntityManager = req.scope.resolve("manager")
|
||||
|
||||
const validatedBody =
|
||||
req.validatedBody as AdminPostInventoryItemsItemLocationLevelsLevelReq
|
||||
|
||||
await inventoryService.updateInventoryLevel(id, location_id, validatedBody)
|
||||
await manager.transaction(async (transactionManager) => {
|
||||
await inventoryService
|
||||
.withTransaction(transactionManager)
|
||||
.updateInventoryLevel(id, location_id, validatedBody)
|
||||
})
|
||||
|
||||
const inventoryItem = await inventoryService.retrieveInventoryItem(
|
||||
id,
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { IdMap } from "medusa-test-utils"
|
||||
import { request } from "../../../../../helpers/test-request"
|
||||
import { ProductServiceMock } from "../../../../../services/__mocks__/product"
|
||||
import { ProductVariantServiceMock } from "../../../../../services/__mocks__/product-variant"
|
||||
import { EventBusServiceMock } from "../../../../../services/__mocks__/event-bus"
|
||||
|
||||
describe("POST /admin/products/:id", () => {
|
||||
describe("successfully updates a product", () => {
|
||||
@@ -9,12 +11,17 @@ describe("POST /admin/products/:id", () => {
|
||||
beforeAll(async () => {
|
||||
subject = await request(
|
||||
"POST",
|
||||
`/admin/products/${IdMap.getId("product1")}`,
|
||||
`/admin/products/${IdMap.getId("multipleVariants")}`,
|
||||
{
|
||||
payload: {
|
||||
title: "Product 1",
|
||||
description: "Updated test description",
|
||||
handle: "handle",
|
||||
variants: [
|
||||
{ id: IdMap.getId("variant_1"), title: "Green" },
|
||||
{ title: "Blue" },
|
||||
{ title: "Yellow" },
|
||||
],
|
||||
},
|
||||
adminSession: {
|
||||
jwt: {
|
||||
@@ -32,7 +39,7 @@ describe("POST /admin/products/:id", () => {
|
||||
it("calls update", () => {
|
||||
expect(ProductServiceMock.update).toHaveBeenCalledTimes(1)
|
||||
expect(ProductServiceMock.update).toHaveBeenCalledWith(
|
||||
IdMap.getId("product1"),
|
||||
IdMap.getId("multipleVariants"),
|
||||
expect.objectContaining({
|
||||
title: "Product 1",
|
||||
description: "Updated test description",
|
||||
@@ -40,51 +47,35 @@ describe("POST /admin/products/:id", () => {
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
it("successfully updates variants and create new ones", async () => {
|
||||
expect(ProductVariantServiceMock.delete).toHaveBeenCalledTimes(2)
|
||||
expect(ProductVariantServiceMock.update).toHaveBeenCalledTimes(1)
|
||||
expect(ProductVariantServiceMock.create).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
|
||||
describe("handles failed update operation", () => {
|
||||
it("throws if metadata is to be updated", async () => {
|
||||
try {
|
||||
await request("POST", `/admin/products/${IdMap.getId("product1")}`, {
|
||||
it("throws on wrong variant in update", async () => {
|
||||
const subject = await request(
|
||||
"POST",
|
||||
`/admin/products/${IdMap.getId("variantsWithPrices")}`,
|
||||
{
|
||||
payload: {
|
||||
_id: IdMap.getId("product1"),
|
||||
title: "Product 1",
|
||||
metadata: "Test Description",
|
||||
variants: [{ id: "test_321", title: "Green" }],
|
||||
},
|
||||
adminSession: {
|
||||
jwt: {
|
||||
userId: IdMap.getId("admin_user"),
|
||||
},
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
expect(error.status).toEqual(400)
|
||||
expect(error.message).toEqual(
|
||||
"Use setMetadata to update metadata fields"
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
it("throws if variants is to be updated", async () => {
|
||||
try {
|
||||
await request("POST", `/admin/products/${IdMap.getId("product1")}`, {
|
||||
payload: {
|
||||
_id: IdMap.getId("product1"),
|
||||
title: "Product 1",
|
||||
metadata: "Test Description",
|
||||
},
|
||||
adminSession: {
|
||||
jwt: {
|
||||
userId: IdMap.getId("admin_user"),
|
||||
},
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
expect(error.status).toEqual(400)
|
||||
expect(error.message).toEqual(
|
||||
"Use addVariant, reorderVariants, removeVariant to update Product Variants"
|
||||
)
|
||||
}
|
||||
}
|
||||
)
|
||||
expect(subject.status).toEqual(404)
|
||||
expect(subject.error.text).toEqual(
|
||||
`{"type":"not_found","message":"Variant with id: test_321 is not associated with this product"}`
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -12,6 +12,7 @@ import { defaultAdminProductFields, defaultAdminProductRelations } from "."
|
||||
import {
|
||||
PricingService,
|
||||
ProductService,
|
||||
ProductVariantInventoryService,
|
||||
ProductVariantService,
|
||||
ShippingProfileService,
|
||||
} from "../../../../services"
|
||||
@@ -32,6 +33,14 @@ import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-cha
|
||||
import { ProductStatus } from "../../../../models"
|
||||
import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators"
|
||||
import { validator } from "../../../../utils/validator"
|
||||
import { IInventoryService } from "../../../../interfaces"
|
||||
|
||||
import {
|
||||
createVariantTransaction,
|
||||
revertVariantTransaction,
|
||||
} from "./transaction/create-product-variant"
|
||||
import { DistributedTransaction } from "../../../../utils/transaction"
|
||||
import { Logger } from "../../../../types/global"
|
||||
|
||||
/**
|
||||
* @oas [post] /products
|
||||
@@ -98,6 +107,7 @@ import { validator } from "../../../../utils/validator"
|
||||
export default async (req, res) => {
|
||||
const validated = await validator(AdminPostProductsReq, req.body)
|
||||
|
||||
const logger: Logger = req.scope.resolve("logger")
|
||||
const productService: ProductService = req.scope.resolve("productService")
|
||||
const pricingService: PricingService = req.scope.resolve("pricingService")
|
||||
const productVariantService: ProductVariantService = req.scope.resolve(
|
||||
@@ -106,6 +116,10 @@ export default async (req, res) => {
|
||||
const shippingProfileService: ShippingProfileService = req.scope.resolve(
|
||||
"shippingProfileService"
|
||||
)
|
||||
const productVariantInventoryService: ProductVariantInventoryService =
|
||||
req.scope.resolve("productVariantInventoryService")
|
||||
const inventoryService: IInventoryService | undefined =
|
||||
req.scope.resolve("inventoryService")
|
||||
|
||||
const entityManager: EntityManager = req.scope.resolve("manager")
|
||||
|
||||
@@ -134,8 +148,8 @@ export default async (req, res) => {
|
||||
.create({ ...validated, profile_id: shippingProfile.id })
|
||||
|
||||
if (variants) {
|
||||
for (const [i, variant] of variants.entries()) {
|
||||
variant["variant_rank"] = i
|
||||
for (const [index, variant] of variants.entries()) {
|
||||
variant["variant_rank"] = index
|
||||
}
|
||||
|
||||
const optionIds =
|
||||
@@ -143,22 +157,48 @@ export default async (req, res) => {
|
||||
(o) => newProduct.options.find((newO) => newO.title === o.title)?.id
|
||||
) || []
|
||||
|
||||
await Promise.all(
|
||||
variants.map(async (v) => {
|
||||
const variant = {
|
||||
...v,
|
||||
options:
|
||||
v?.options?.map((o, index) => ({
|
||||
...o,
|
||||
option_id: optionIds[index],
|
||||
})) || [],
|
||||
}
|
||||
const allVariantTransactions: DistributedTransaction[] = []
|
||||
const transactionDependencies = {
|
||||
manager,
|
||||
inventoryService,
|
||||
productVariantInventoryService,
|
||||
productVariantService,
|
||||
}
|
||||
|
||||
await productVariantService
|
||||
.withTransaction(manager)
|
||||
.create(newProduct.id, variant as CreateProductVariantInput)
|
||||
})
|
||||
)
|
||||
try {
|
||||
await Promise.all(
|
||||
variants.map(async (variant) => {
|
||||
const options =
|
||||
variant?.options?.map((option, index) => ({
|
||||
...option,
|
||||
option_id: optionIds[index],
|
||||
})) || []
|
||||
|
||||
const input = {
|
||||
...variant,
|
||||
options,
|
||||
}
|
||||
|
||||
const varTransation = await createVariantTransaction(
|
||||
transactionDependencies,
|
||||
newProduct.id,
|
||||
input as CreateProductVariantInput
|
||||
)
|
||||
allVariantTransactions.push(varTransation)
|
||||
})
|
||||
)
|
||||
} catch (e) {
|
||||
await Promise.all(
|
||||
allVariantTransactions.map(async (transaction) => {
|
||||
await revertVariantTransaction(
|
||||
transactionDependencies,
|
||||
transaction
|
||||
).catch(() => logger.warn("Transaction couldn't be reverted."))
|
||||
})
|
||||
)
|
||||
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
return newProduct
|
||||
|
||||
@@ -22,18 +22,10 @@ import {
|
||||
} from "../../../../types/product-variant"
|
||||
import { validator } from "../../../../utils/validator"
|
||||
|
||||
import {
|
||||
TransactionHandlerType,
|
||||
TransactionOrchestrator,
|
||||
TransactionPayload,
|
||||
TransactionState,
|
||||
TransactionStepsDefinition,
|
||||
} from "../../../../utils/transaction"
|
||||
|
||||
import { ulid } from "ulid"
|
||||
import { MedusaError } from "medusa-core-utils"
|
||||
import { EntityManager } from "typeorm"
|
||||
|
||||
import { createVariantTransaction } from "./transaction/create-product-variant"
|
||||
|
||||
/**
|
||||
* @oas [post] /products/{id}/variants
|
||||
* operationId: "PostProductsProductVariants"
|
||||
@@ -122,47 +114,6 @@ import { EntityManager } from "typeorm"
|
||||
* $ref: "#/components/responses/500_error"
|
||||
*/
|
||||
|
||||
enum actions {
|
||||
createVariant = "createVariant",
|
||||
createInventoryItem = "createInventoryItem",
|
||||
attachInventoryItem = "attachInventoryItem",
|
||||
}
|
||||
|
||||
const simpleFlow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: actions.createVariant,
|
||||
maxRetries: 0,
|
||||
},
|
||||
}
|
||||
|
||||
const flowWithInventory: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: actions.createVariant,
|
||||
forwardResponse: true,
|
||||
maxRetries: 0,
|
||||
next: {
|
||||
action: actions.createInventoryItem,
|
||||
forwardResponse: true,
|
||||
maxRetries: 0,
|
||||
next: {
|
||||
action: actions.attachInventoryItem,
|
||||
noCompensation: true,
|
||||
maxRetries: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
const createSimpleVariantStrategy = new TransactionOrchestrator(
|
||||
"create-variant",
|
||||
simpleFlow
|
||||
)
|
||||
|
||||
const createVariantStrategyWithInventory = new TransactionOrchestrator(
|
||||
"create-variant-with-inventory",
|
||||
flowWithInventory
|
||||
)
|
||||
|
||||
export default async (req, res) => {
|
||||
const { id } = req.params
|
||||
|
||||
@@ -179,129 +130,19 @@ export default async (req, res) => {
|
||||
"productVariantService"
|
||||
)
|
||||
|
||||
const createdId: Record<string, string | null> = {
|
||||
variant: null,
|
||||
inventoryItem: null,
|
||||
}
|
||||
|
||||
const manager: EntityManager = req.scope.resolve("manager")
|
||||
|
||||
await manager.transaction(async (transactionManager) => {
|
||||
const inventoryServiceTx =
|
||||
inventoryService?.withTransaction(transactionManager)
|
||||
|
||||
const productVariantInventoryServiceTx =
|
||||
productVariantInventoryService.withTransaction(transactionManager)
|
||||
|
||||
const productVariantServiceTx =
|
||||
productVariantService.withTransaction(transactionManager)
|
||||
|
||||
async function createVariant() {
|
||||
const variant = await productVariantServiceTx.create(
|
||||
id,
|
||||
validated as CreateProductVariantInput
|
||||
)
|
||||
|
||||
createdId.variant = variant.id
|
||||
|
||||
return { variant }
|
||||
}
|
||||
|
||||
async function removeVariant() {
|
||||
if (createdId.variant) {
|
||||
await productVariantServiceTx.delete(createdId.variant)
|
||||
}
|
||||
}
|
||||
|
||||
async function createInventoryItem(variant) {
|
||||
if (!validated.manage_inventory) {
|
||||
return
|
||||
}
|
||||
|
||||
const inventoryItem = await inventoryServiceTx!.createInventoryItem({
|
||||
sku: validated.sku,
|
||||
origin_country: validated.origin_country,
|
||||
hs_code: validated.hs_code,
|
||||
mid_code: validated.mid_code,
|
||||
material: validated.material,
|
||||
weight: validated.weight,
|
||||
length: validated.length,
|
||||
height: validated.height,
|
||||
width: validated.width,
|
||||
})
|
||||
|
||||
createdId.inventoryItem = inventoryItem.id
|
||||
|
||||
return { variant, inventoryItem }
|
||||
}
|
||||
|
||||
async function removeInventoryItem() {
|
||||
if (createdId.inventoryItem) {
|
||||
await inventoryServiceTx!.deleteInventoryItem(createdId.inventoryItem)
|
||||
}
|
||||
}
|
||||
|
||||
async function attachInventoryItem(variant, inventoryItem) {
|
||||
if (!validated.manage_inventory) {
|
||||
return
|
||||
}
|
||||
|
||||
await productVariantInventoryServiceTx.attachInventoryItem(
|
||||
variant.id,
|
||||
inventoryItem.id,
|
||||
validated.inventory_quantity
|
||||
)
|
||||
}
|
||||
|
||||
async function transactionHandler(
|
||||
actionId: string,
|
||||
type: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) {
|
||||
const command = {
|
||||
[actions.createVariant]: {
|
||||
[TransactionHandlerType.INVOKE]: async () => {
|
||||
return await createVariant()
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: async () => {
|
||||
await removeVariant()
|
||||
},
|
||||
},
|
||||
[actions.createInventoryItem]: {
|
||||
[TransactionHandlerType.INVOKE]: async (data) => {
|
||||
const { variant } = data._response ?? {}
|
||||
return await createInventoryItem(variant)
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: async () => {
|
||||
await removeInventoryItem()
|
||||
},
|
||||
},
|
||||
[actions.attachInventoryItem]: {
|
||||
[TransactionHandlerType.INVOKE]: async (data) => {
|
||||
const { variant, inventoryItem } = data._response ?? {}
|
||||
return await attachInventoryItem(variant, inventoryItem)
|
||||
},
|
||||
},
|
||||
}
|
||||
return command[actionId][type](payload.data)
|
||||
}
|
||||
|
||||
const strategy = inventoryService
|
||||
? createVariantStrategyWithInventory
|
||||
: createSimpleVariantStrategy
|
||||
|
||||
const transaction = await strategy.beginTransaction(
|
||||
ulid(),
|
||||
transactionHandler,
|
||||
validated
|
||||
await createVariantTransaction(
|
||||
{
|
||||
manager: transactionManager,
|
||||
inventoryService,
|
||||
productVariantInventoryService,
|
||||
productVariantService,
|
||||
},
|
||||
id,
|
||||
validated as CreateProductVariantInput
|
||||
)
|
||||
await strategy.resume(transaction)
|
||||
|
||||
if (transaction.getState() !== TransactionState.DONE) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.INVALID_DATA,
|
||||
transaction.errors.map((err) => err.error?.message).join("\n")
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
const productService: ProductService = req.scope.resolve("productService")
|
||||
|
||||
@@ -0,0 +1,225 @@
|
||||
import {
|
||||
DistributedTransaction,
|
||||
TransactionHandlerType,
|
||||
TransactionOrchestrator,
|
||||
TransactionPayload,
|
||||
TransactionState,
|
||||
TransactionStepsDefinition,
|
||||
} from "../../../../../utils/transaction"
|
||||
import { ulid } from "ulid"
|
||||
import { EntityManager } from "typeorm"
|
||||
import { IInventoryService } from "../../../../../interfaces"
|
||||
import {
|
||||
ProductVariantInventoryService,
|
||||
ProductVariantService,
|
||||
} from "../../../../../services"
|
||||
import { CreateProductVariantInput } from "../../../../../types/product-variant"
|
||||
import { InventoryItemDTO } from "../../../../../types/inventory"
|
||||
import { ProductVariant } from "../../../../../models"
|
||||
import { MedusaError } from "medusa-core-utils"
|
||||
|
||||
enum actions {
|
||||
createVariant = "createVariant",
|
||||
createInventoryItem = "createInventoryItem",
|
||||
attachInventoryItem = "attachInventoryItem",
|
||||
}
|
||||
|
||||
const simpleFlow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: actions.createVariant,
|
||||
},
|
||||
}
|
||||
|
||||
const flowWithInventory: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: actions.createVariant,
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: actions.createInventoryItem,
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: actions.attachInventoryItem,
|
||||
noCompensation: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
const createSimpleVariantStrategy = new TransactionOrchestrator(
|
||||
"create-variant",
|
||||
simpleFlow
|
||||
)
|
||||
|
||||
const createVariantStrategyWithInventory = new TransactionOrchestrator(
|
||||
"create-variant-with-inventory",
|
||||
flowWithInventory
|
||||
)
|
||||
|
||||
type InjectedDependencies = {
|
||||
manager: EntityManager
|
||||
productVariantService: ProductVariantService
|
||||
productVariantInventoryService: ProductVariantInventoryService
|
||||
inventoryService?: IInventoryService
|
||||
}
|
||||
|
||||
export const createVariantTransaction = async (
|
||||
dependencies: InjectedDependencies,
|
||||
productId: string,
|
||||
input: CreateProductVariantInput
|
||||
): Promise<DistributedTransaction> => {
|
||||
const {
|
||||
manager,
|
||||
productVariantService,
|
||||
inventoryService,
|
||||
productVariantInventoryService,
|
||||
} = dependencies
|
||||
|
||||
const inventoryServiceTx = inventoryService?.withTransaction(manager)
|
||||
|
||||
const productVariantInventoryServiceTx =
|
||||
productVariantInventoryService.withTransaction(manager)
|
||||
|
||||
const productVariantServiceTx = productVariantService.withTransaction(manager)
|
||||
|
||||
async function createVariant(variantInput: CreateProductVariantInput) {
|
||||
const variant = await productVariantServiceTx.create(
|
||||
productId,
|
||||
variantInput
|
||||
)
|
||||
|
||||
return { variant }
|
||||
}
|
||||
|
||||
async function removeVariant(variant: ProductVariant) {
|
||||
if (variant) {
|
||||
await productVariantServiceTx.delete(variant.id)
|
||||
}
|
||||
}
|
||||
|
||||
async function createInventoryItem(variant: ProductVariant) {
|
||||
if (!variant.manage_inventory) {
|
||||
return
|
||||
}
|
||||
|
||||
const inventoryItem = await inventoryServiceTx!.createInventoryItem({
|
||||
sku: variant.sku,
|
||||
origin_country: variant.origin_country,
|
||||
hs_code: variant.hs_code,
|
||||
mid_code: variant.mid_code,
|
||||
material: variant.material,
|
||||
weight: variant.weight,
|
||||
length: variant.length,
|
||||
height: variant.height,
|
||||
width: variant.width,
|
||||
})
|
||||
|
||||
return { inventoryItem }
|
||||
}
|
||||
|
||||
async function removeInventoryItem(inventoryItem: InventoryItemDTO) {
|
||||
if (inventoryItem) {
|
||||
await inventoryServiceTx!.deleteInventoryItem(inventoryItem.id)
|
||||
}
|
||||
}
|
||||
|
||||
async function attachInventoryItem(
|
||||
variant: ProductVariant,
|
||||
inventoryItem: InventoryItemDTO
|
||||
) {
|
||||
if (!variant.manage_inventory) {
|
||||
return
|
||||
}
|
||||
|
||||
await productVariantInventoryServiceTx.attachInventoryItem(
|
||||
variant.id,
|
||||
inventoryItem.id
|
||||
)
|
||||
}
|
||||
|
||||
async function transactionHandler(
|
||||
actionId: string,
|
||||
type: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) {
|
||||
const command = {
|
||||
[actions.createVariant]: {
|
||||
[TransactionHandlerType.INVOKE]: async (
|
||||
data: CreateProductVariantInput
|
||||
) => {
|
||||
return await createVariant(data)
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: async (
|
||||
data: CreateProductVariantInput,
|
||||
{ invoke }
|
||||
) => {
|
||||
await removeVariant(invoke[actions.createVariant])
|
||||
},
|
||||
},
|
||||
[actions.createInventoryItem]: {
|
||||
[TransactionHandlerType.INVOKE]: async (
|
||||
data: CreateProductVariantInput,
|
||||
{ invoke }
|
||||
) => {
|
||||
const { [actions.createVariant]: variant } = invoke
|
||||
|
||||
return await createInventoryItem(variant)
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: async (
|
||||
data: CreateProductVariantInput,
|
||||
{ invoke }
|
||||
) => {
|
||||
await removeInventoryItem(invoke[actions.createInventoryItem])
|
||||
},
|
||||
},
|
||||
[actions.attachInventoryItem]: {
|
||||
[TransactionHandlerType.INVOKE]: async (
|
||||
data: CreateProductVariantInput,
|
||||
{ invoke }
|
||||
) => {
|
||||
const {
|
||||
[actions.createVariant]: variant,
|
||||
[actions.createInventoryItem]: inventoryItem,
|
||||
} = invoke
|
||||
|
||||
return await attachInventoryItem(variant, inventoryItem)
|
||||
},
|
||||
},
|
||||
}
|
||||
return command[actionId][type](payload.data, payload.context)
|
||||
}
|
||||
|
||||
const strategy = inventoryService
|
||||
? createVariantStrategyWithInventory
|
||||
: createSimpleVariantStrategy
|
||||
|
||||
const transaction = await strategy.beginTransaction(
|
||||
ulid(),
|
||||
transactionHandler,
|
||||
input
|
||||
)
|
||||
await strategy.resume(transaction)
|
||||
|
||||
if (transaction.getState() !== TransactionState.DONE) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.INVALID_DATA,
|
||||
transaction
|
||||
.getErrors()
|
||||
.map((err) => err.error?.message)
|
||||
.join("\n")
|
||||
)
|
||||
}
|
||||
|
||||
return transaction
|
||||
}
|
||||
|
||||
export const revertVariantTransaction = async (
|
||||
dependencies: InjectedDependencies,
|
||||
transaction: DistributedTransaction
|
||||
) => {
|
||||
const { inventoryService } = dependencies
|
||||
const strategy = inventoryService
|
||||
? createVariantStrategyWithInventory
|
||||
: createSimpleVariantStrategy
|
||||
|
||||
await strategy.cancelTransaction(transaction)
|
||||
}
|
||||
@@ -12,7 +12,12 @@ import {
|
||||
ValidateNested,
|
||||
} from "class-validator"
|
||||
import { defaultAdminProductFields, defaultAdminProductRelations } from "."
|
||||
import { PricingService, ProductService } from "../../../../services"
|
||||
import {
|
||||
PricingService,
|
||||
ProductService,
|
||||
ProductVariantInventoryService,
|
||||
ProductVariantService,
|
||||
} from "../../../../services"
|
||||
import {
|
||||
ProductSalesChannelReq,
|
||||
ProductTagReq,
|
||||
@@ -23,10 +28,21 @@ import {
|
||||
import { Type } from "class-transformer"
|
||||
import { EntityManager } from "typeorm"
|
||||
import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels"
|
||||
import { ProductStatus } from "../../../../models"
|
||||
import { ProductVariantPricesUpdateReq } from "../../../../types/product-variant"
|
||||
import { ProductStatus, ProductVariant } from "../../../../models"
|
||||
import {
|
||||
CreateProductVariantInput,
|
||||
ProductVariantPricesUpdateReq,
|
||||
} from "../../../../types/product-variant"
|
||||
import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators"
|
||||
import { validator } from "../../../../utils/validator"
|
||||
import { MedusaError } from "medusa-core-utils"
|
||||
import { DistributedTransaction } from "../../../../utils/transaction"
|
||||
import {
|
||||
createVariantTransaction,
|
||||
revertVariantTransaction,
|
||||
} from "./transaction/create-product-variant"
|
||||
import { IInventoryService } from "../../../../interfaces"
|
||||
import { Logger } from "../../../../types/global"
|
||||
|
||||
/**
|
||||
* @oas [post] /products/{id}
|
||||
@@ -96,14 +112,106 @@ export default async (req, res) => {
|
||||
|
||||
const validated = await validator(AdminPostProductsProductReq, req.body)
|
||||
|
||||
const logger: Logger = req.scope.resolve("logger")
|
||||
const productService: ProductService = req.scope.resolve("productService")
|
||||
const pricingService: PricingService = req.scope.resolve("pricingService")
|
||||
const productVariantService: ProductVariantService = req.scope.resolve(
|
||||
"productVariantService"
|
||||
)
|
||||
const productVariantInventoryService: ProductVariantInventoryService =
|
||||
req.scope.resolve("productVariantInventoryService")
|
||||
const inventoryService: IInventoryService | undefined =
|
||||
req.scope.resolve("inventoryService")
|
||||
|
||||
const manager: EntityManager = req.scope.resolve("manager")
|
||||
await manager.transaction(async (transactionManager) => {
|
||||
const { variants } = validated
|
||||
delete validated.variants
|
||||
|
||||
await productService
|
||||
.withTransaction(transactionManager)
|
||||
.update(id, validated)
|
||||
|
||||
if (!variants) {
|
||||
return
|
||||
}
|
||||
|
||||
const product = await productService
|
||||
.withTransaction(transactionManager)
|
||||
.retrieve(id, {
|
||||
relations: ["variants"],
|
||||
})
|
||||
|
||||
// Iterate product variants and update their properties accordingly
|
||||
for (const variant of product.variants) {
|
||||
const exists = variants.find((v) => v.id && variant.id === v.id)
|
||||
if (!exists) {
|
||||
await productVariantService
|
||||
.withTransaction(transactionManager)
|
||||
.delete(variant.id)
|
||||
}
|
||||
}
|
||||
|
||||
const allVariantTransactions: DistributedTransaction[] = []
|
||||
const transactionDependencies = {
|
||||
manager: transactionManager,
|
||||
inventoryService,
|
||||
productVariantInventoryService,
|
||||
productVariantService,
|
||||
}
|
||||
|
||||
for (const [index, newVariant] of variants.entries()) {
|
||||
const variantRank = index
|
||||
|
||||
if (newVariant.id) {
|
||||
const variant = product.variants.find((v) => v.id === newVariant.id)
|
||||
|
||||
if (!variant) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Variant with id: ${newVariant.id} is not associated with this product`
|
||||
)
|
||||
}
|
||||
|
||||
await productVariantService
|
||||
.withTransaction(transactionManager)
|
||||
.update(variant, {
|
||||
...newVariant,
|
||||
variant_rank: variantRank,
|
||||
product_id: variant.product_id,
|
||||
})
|
||||
} else {
|
||||
// If the provided variant does not have an id, we assume that it
|
||||
// should be created
|
||||
|
||||
try {
|
||||
const input = {
|
||||
...newVariant,
|
||||
variant_rank: variantRank,
|
||||
options: newVariant.options || [],
|
||||
prices: newVariant.prices || [],
|
||||
}
|
||||
|
||||
const varTransation = await createVariantTransaction(
|
||||
transactionDependencies,
|
||||
product.id,
|
||||
input as CreateProductVariantInput
|
||||
)
|
||||
allVariantTransactions.push(varTransation)
|
||||
} catch (e) {
|
||||
await Promise.all(
|
||||
allVariantTransactions.map(async (transaction) => {
|
||||
await revertVariantTransaction(
|
||||
transactionDependencies,
|
||||
transaction
|
||||
).catch(() => logger.warn("Transaction couldn't be reverted."))
|
||||
})
|
||||
)
|
||||
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const rawProduct = await productService.retrieve(id, {
|
||||
|
||||
@@ -60,10 +60,28 @@ export const products = {
|
||||
},
|
||||
],
|
||||
},
|
||||
multipleVariants: {
|
||||
id: IdMap.getId("multipleVariants"),
|
||||
title: "Multiple Variants",
|
||||
variants: [
|
||||
{
|
||||
id: IdMap.getId("variant_1"),
|
||||
title: "Variant 1",
|
||||
},
|
||||
{
|
||||
id: IdMap.getId("variant_2"),
|
||||
title: "Variant 2",
|
||||
},
|
||||
{
|
||||
id: IdMap.getId("variant_3"),
|
||||
title: "Variant 3",
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
export const ProductServiceMock = {
|
||||
withTransaction: function() {
|
||||
withTransaction: function () {
|
||||
return this
|
||||
},
|
||||
create: jest.fn().mockImplementation((data) => {
|
||||
@@ -135,6 +153,9 @@ export const ProductServiceMock = {
|
||||
if (productId === IdMap.getId("variantsWithPrices")) {
|
||||
return Promise.resolve(products.variantsWithPrices)
|
||||
}
|
||||
if (productId === IdMap.getId("multipleVariants")) {
|
||||
return Promise.resolve(products.multipleVariants)
|
||||
}
|
||||
return Promise.resolve(undefined)
|
||||
}),
|
||||
update: jest.fn().mockImplementation((product, data) => {
|
||||
|
||||
@@ -247,20 +247,6 @@ describe("ProductService", () => {
|
||||
})
|
||||
productTypeRepository.upsertType = mockUpsertType
|
||||
|
||||
const productVariantRepository = MockRepository()
|
||||
|
||||
const productVariantService = {
|
||||
withTransaction: function () {
|
||||
return this
|
||||
},
|
||||
update: (variant, update) => {
|
||||
if (variant.id) {
|
||||
return update
|
||||
}
|
||||
return Promise.resolve()
|
||||
},
|
||||
}
|
||||
|
||||
const productTagRepository = MockRepository({
|
||||
findOne: (data) => {
|
||||
if (data.where.value === "test") {
|
||||
@@ -295,8 +281,6 @@ describe("ProductService", () => {
|
||||
const productService = new ProductService({
|
||||
manager: MockManager,
|
||||
productRepository,
|
||||
productVariantService,
|
||||
productVariantRepository,
|
||||
productTagRepository,
|
||||
productTypeRepository,
|
||||
eventBusService,
|
||||
@@ -327,17 +311,6 @@ describe("ProductService", () => {
|
||||
})
|
||||
})
|
||||
|
||||
it("successfully updates product variants", async () => {
|
||||
await productService.update(IdMap.getId("ironman&co"), {
|
||||
variants: [{ id: IdMap.getId("green"), title: "Greener" }],
|
||||
})
|
||||
|
||||
// The update of variants will be tested in product variant test file
|
||||
// Here we just test, that the function reaches its end when updating
|
||||
// variants
|
||||
expect(productRepository.save).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it("successfully updates product status", async () => {
|
||||
await productService.update(IdMap.getId("ironman"), {
|
||||
status: "published",
|
||||
@@ -376,30 +349,6 @@ describe("ProductService", () => {
|
||||
})
|
||||
})
|
||||
|
||||
it("successfully updates variant ranking", async () => {
|
||||
await productService.update("ranking test", {
|
||||
variants: [
|
||||
{ id: "test_321", title: "Greener", variant_rank: 1 },
|
||||
{ id: "test_123", title: "Blueer", variant_rank: 0 },
|
||||
],
|
||||
})
|
||||
|
||||
expect(eventBusService.emit).toHaveBeenCalledTimes(1)
|
||||
expect(eventBusService.emit).toHaveBeenCalledWith(
|
||||
"product.updated",
|
||||
expect.any(Object)
|
||||
)
|
||||
|
||||
expect(productRepository.save).toHaveBeenCalledTimes(1)
|
||||
expect(productRepository.save).toHaveBeenCalledWith({
|
||||
id: "ranking test",
|
||||
variants: [
|
||||
{ id: "test_321", title: "Greener", variant_rank: 0 },
|
||||
{ id: "test_123", title: "Blueer", variant_rank: 1 },
|
||||
],
|
||||
})
|
||||
})
|
||||
|
||||
it("successfully updates tags", async () => {
|
||||
await productService.update(IdMap.getId("ironman"), {
|
||||
tags: [
|
||||
@@ -431,23 +380,6 @@ describe("ProductService", () => {
|
||||
expect(err.message).toEqual("Product with id: 123 was not found")
|
||||
}
|
||||
})
|
||||
|
||||
it("throws on wrong variant in update", async () => {
|
||||
try {
|
||||
await productService.update(IdMap.getId("ironman&co"), {
|
||||
variants: [
|
||||
{ id: IdMap.getId("yellow") },
|
||||
{ id: IdMap.getId("green") },
|
||||
],
|
||||
})
|
||||
} catch (err) {
|
||||
expect(err.message).toEqual(
|
||||
`Variant with id: ${IdMap.getId(
|
||||
"yellow"
|
||||
)} is not associated with this product`
|
||||
)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe("delete", () => {
|
||||
|
||||
@@ -249,13 +249,13 @@ class ProductVariantInventoryService extends TransactionBaseService {
|
||||
* Attach a variant to an inventory item
|
||||
* @param variantId variant id
|
||||
* @param inventoryItemId inventory item id
|
||||
* @param quantity quantity of variant to attach
|
||||
* @param requiredQuantity quantity of variant to attach
|
||||
* @returns the variant inventory item
|
||||
*/
|
||||
async attachInventoryItem(
|
||||
variantId: string,
|
||||
inventoryItemId: string,
|
||||
quantity?: number
|
||||
requiredQuantity?: number
|
||||
): Promise<ProductVariantInventoryItem> {
|
||||
const manager = this.transactionManager_ || this.manager_
|
||||
|
||||
@@ -289,14 +289,14 @@ class ProductVariantInventoryService extends TransactionBaseService {
|
||||
}
|
||||
|
||||
let quantityToStore = 1
|
||||
if (typeof quantity !== "undefined") {
|
||||
if (quantity < 1) {
|
||||
if (typeof requiredQuantity !== "undefined") {
|
||||
if (requiredQuantity < 1) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.INVALID_DATA,
|
||||
"Quantity must be greater than 0"
|
||||
)
|
||||
} else {
|
||||
quantityToStore = quantity
|
||||
quantityToStore = requiredQuantity
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -504,7 +504,7 @@ class ProductService extends TransactionBaseService {
|
||||
)
|
||||
const imageRepo = manager.getCustomRepository(this.imageRepository_)
|
||||
|
||||
const relations = ["variants", "tags", "images"]
|
||||
const relations = ["tags", "images"]
|
||||
|
||||
if (
|
||||
this.featureFlagRouter_.isFeatureEnabled(SalesChannelFeatureFlag.key)
|
||||
@@ -526,7 +526,6 @@ class ProductService extends TransactionBaseService {
|
||||
})
|
||||
|
||||
const {
|
||||
variants,
|
||||
metadata,
|
||||
images,
|
||||
tags,
|
||||
@@ -581,57 +580,6 @@ class ProductService extends TransactionBaseService {
|
||||
}
|
||||
}
|
||||
|
||||
if (variants) {
|
||||
// Iterate product variants and update their properties accordingly
|
||||
for (const variant of product.variants) {
|
||||
const exists = variants.find((v) => v.id && variant.id === v.id)
|
||||
if (!exists) {
|
||||
await productVariantRepo.remove(variant)
|
||||
}
|
||||
}
|
||||
|
||||
const newVariants: ProductVariant[] = []
|
||||
for (const [i, newVariant] of variants.entries()) {
|
||||
const variant_rank = i
|
||||
|
||||
if (newVariant.id) {
|
||||
const variant = product.variants.find((v) => v.id === newVariant.id)
|
||||
|
||||
if (!variant) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Variant with id: ${newVariant.id} is not associated with this product`
|
||||
)
|
||||
}
|
||||
|
||||
const saved = await this.productVariantService_
|
||||
.withTransaction(manager)
|
||||
.update(variant, {
|
||||
...newVariant,
|
||||
variant_rank,
|
||||
product_id: variant.product_id,
|
||||
})
|
||||
|
||||
newVariants.push(saved)
|
||||
} else {
|
||||
// If the provided variant does not have an id, we assume that it
|
||||
// should be created
|
||||
const created = await this.productVariantService_
|
||||
.withTransaction(manager)
|
||||
.create(product.id, {
|
||||
...newVariant,
|
||||
variant_rank,
|
||||
options: newVariant.options || [],
|
||||
prices: newVariant.prices || [],
|
||||
})
|
||||
|
||||
newVariants.push(created)
|
||||
}
|
||||
}
|
||||
|
||||
product.variants = newVariants
|
||||
}
|
||||
|
||||
for (const [key, value] of Object.entries(rest)) {
|
||||
if (isDefined(value)) {
|
||||
product[key] = value
|
||||
|
||||
@@ -7,6 +7,10 @@ import {
|
||||
} from "../../transaction"
|
||||
|
||||
describe("Transaction Orchestrator", () => {
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks()
|
||||
})
|
||||
|
||||
it("Should follow the flow by calling steps in order with the correct payload", async () => {
|
||||
const mocks = {
|
||||
one: jest.fn().mockImplementation((payload) => {
|
||||
@@ -187,7 +191,7 @@ describe("Transaction Orchestrator", () => {
|
||||
expect(actionOrder).toEqual(["one", "two", "three"])
|
||||
})
|
||||
|
||||
it("Should forward step response if flag 'forwardResponse' is set to true", async () => {
|
||||
it("Should store invoke's step response if flag 'saveResponse' is set to true", async () => {
|
||||
const mocks = {
|
||||
one: jest.fn().mockImplementation((data) => {
|
||||
return { abc: 1234 }
|
||||
@@ -195,8 +199,11 @@ describe("Transaction Orchestrator", () => {
|
||||
two: jest.fn().mockImplementation((data) => {
|
||||
return { def: "567" }
|
||||
}),
|
||||
three: jest.fn().mockImplementation((data) => {
|
||||
return { end: true }
|
||||
three: jest.fn().mockImplementation((data, context) => {
|
||||
return { end: true, onePropAbc: context.invoke.firstMethod.abc }
|
||||
}),
|
||||
four: jest.fn().mockImplementation((data) => {
|
||||
return null
|
||||
}),
|
||||
}
|
||||
|
||||
@@ -217,24 +224,36 @@ describe("Transaction Orchestrator", () => {
|
||||
},
|
||||
},
|
||||
thirdMethod: {
|
||||
[TransactionHandlerType.INVOKE]: (data, context) => {
|
||||
return mocks.three(data, context)
|
||||
},
|
||||
},
|
||||
fourthMethod: {
|
||||
[TransactionHandlerType.INVOKE]: (data) => {
|
||||
return mocks.three(data)
|
||||
return mocks.four(data)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return command[actionId][functionHandlerType]({ ...payload.data })
|
||||
return command[actionId][functionHandlerType](
|
||||
payload.data,
|
||||
payload.context
|
||||
)
|
||||
}
|
||||
|
||||
const flow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: "firstMethod",
|
||||
forwardResponse: true,
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: "secondMethod",
|
||||
forwardResponse: true,
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: "thirdMethod",
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: "fourthMethod",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -245,18 +264,111 @@ describe("Transaction Orchestrator", () => {
|
||||
const transaction = await strategy.beginTransaction(
|
||||
"transaction_id_123",
|
||||
handler,
|
||||
{
|
||||
prop: 123,
|
||||
}
|
||||
{ prop: 123 }
|
||||
)
|
||||
|
||||
await strategy.resume(transaction)
|
||||
|
||||
expect(mocks.one).toBeCalledWith({ prop: 123 })
|
||||
expect(mocks.two).toBeCalledWith({ prop: 123 })
|
||||
|
||||
expect(mocks.two).toBeCalledWith({ prop: 123, _response: { abc: 1234 } })
|
||||
expect(mocks.three).toBeCalledWith(
|
||||
{ prop: 123 },
|
||||
{
|
||||
invoke: {
|
||||
firstMethod: { abc: 1234 },
|
||||
secondMethod: { def: "567" },
|
||||
thirdMethod: { end: true, onePropAbc: 1234 },
|
||||
},
|
||||
compensate: {},
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
expect(mocks.three).toBeCalledWith({ prop: 123, _response: { def: "567" } })
|
||||
it("Should store compensate's step responses if flag 'saveResponse' is set to true", async () => {
|
||||
const mocks = {
|
||||
one: jest.fn().mockImplementation(() => {
|
||||
return 1
|
||||
}),
|
||||
two: jest.fn().mockImplementation(() => {
|
||||
return 2
|
||||
}),
|
||||
compensateOne: jest.fn().mockImplementation((compensateContext) => {
|
||||
return "compensate 1 - 2 = " + compensateContext.secondMethod.two
|
||||
}),
|
||||
compensateTwo: jest.fn().mockImplementation((compensateContext) => {
|
||||
return { two: "isCompensated" }
|
||||
}),
|
||||
}
|
||||
|
||||
async function handler(
|
||||
actionId: string,
|
||||
functionHandlerType: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) {
|
||||
const command = {
|
||||
firstMethod: {
|
||||
[TransactionHandlerType.INVOKE]: () => {
|
||||
return mocks.one()
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: ({ compensate }) => {
|
||||
return mocks.compensateOne({ ...compensate })
|
||||
},
|
||||
},
|
||||
secondMethod: {
|
||||
[TransactionHandlerType.INVOKE]: () => {
|
||||
return mocks.two()
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: ({ compensate }) => {
|
||||
return mocks.compensateTwo({ ...compensate })
|
||||
},
|
||||
},
|
||||
thirdMethod: {
|
||||
[TransactionHandlerType.INVOKE]: () => {
|
||||
throw new Error("failed")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return command[actionId][functionHandlerType](payload.context)
|
||||
}
|
||||
|
||||
const flow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: "firstMethod",
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: "secondMethod",
|
||||
saveResponse: true,
|
||||
next: {
|
||||
action: "thirdMethod",
|
||||
noCompensation: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
const strategy = new TransactionOrchestrator("transaction-name", flow)
|
||||
|
||||
const transaction = await strategy.beginTransaction(
|
||||
"transaction_id_123",
|
||||
handler
|
||||
)
|
||||
|
||||
await strategy.resume(transaction)
|
||||
const resposes = transaction.getContext()
|
||||
|
||||
expect(mocks.compensateTwo).toBeCalledWith({})
|
||||
|
||||
expect(mocks.compensateOne).toBeCalledWith({
|
||||
secondMethod: {
|
||||
two: "isCompensated",
|
||||
},
|
||||
})
|
||||
|
||||
expect(resposes.compensate.firstMethod).toEqual(
|
||||
"compensate 1 - 2 = isCompensated"
|
||||
)
|
||||
})
|
||||
|
||||
it("Should continue the exection of next steps without waiting for the execution of all its parents when flag 'noWait' is set to true", async () => {
|
||||
@@ -362,8 +474,10 @@ describe("Transaction Orchestrator", () => {
|
||||
const flow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: "firstMethod",
|
||||
maxRetries: 3,
|
||||
next: {
|
||||
action: "secondMethod",
|
||||
maxRetries: 3,
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -379,7 +493,7 @@ describe("Transaction Orchestrator", () => {
|
||||
|
||||
expect(transaction.transactionId).toBe("transaction_id_123")
|
||||
expect(mocks.one).toBeCalledTimes(1)
|
||||
expect(mocks.two).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES)
|
||||
expect(mocks.two).toBeCalledTimes(4)
|
||||
expect(transaction.getState()).toBe(TransactionState.REVERTED)
|
||||
expect(mocks.compensateOne).toBeCalledTimes(1)
|
||||
|
||||
@@ -428,6 +542,7 @@ describe("Transaction Orchestrator", () => {
|
||||
const flow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: "firstMethod",
|
||||
maxRetries: 1,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -440,7 +555,7 @@ describe("Transaction Orchestrator", () => {
|
||||
|
||||
await strategy.resume(transaction)
|
||||
|
||||
expect(mocks.one).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES)
|
||||
expect(mocks.one).toBeCalledTimes(2)
|
||||
expect(transaction.getState()).toBe(TransactionState.FAILED)
|
||||
})
|
||||
|
||||
@@ -666,4 +781,77 @@ describe("Transaction Orchestrator", () => {
|
||||
|
||||
expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED)
|
||||
})
|
||||
|
||||
it("Should revert a transaction when .cancelTransaction() is called", async () => {
|
||||
const mocks = {
|
||||
one: jest.fn().mockImplementation((payload) => {
|
||||
return payload
|
||||
}),
|
||||
oneCompensate: jest.fn().mockImplementation((payload) => {
|
||||
return payload
|
||||
}),
|
||||
two: jest.fn().mockImplementation((payload) => {
|
||||
return payload
|
||||
}),
|
||||
twoCompensate: jest.fn().mockImplementation((payload) => {
|
||||
return payload
|
||||
}),
|
||||
}
|
||||
|
||||
async function handler(
|
||||
actionId: string,
|
||||
functionHandlerType: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) {
|
||||
const command = {
|
||||
firstMethod: {
|
||||
[TransactionHandlerType.INVOKE]: () => {
|
||||
mocks.one(payload)
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: () => {
|
||||
mocks.oneCompensate(payload)
|
||||
},
|
||||
},
|
||||
secondMethod: {
|
||||
[TransactionHandlerType.INVOKE]: () => {
|
||||
mocks.two(payload)
|
||||
},
|
||||
[TransactionHandlerType.COMPENSATE]: () => {
|
||||
mocks.twoCompensate(payload)
|
||||
},
|
||||
},
|
||||
}
|
||||
return command[actionId][functionHandlerType](payload)
|
||||
}
|
||||
|
||||
const flow: TransactionStepsDefinition = {
|
||||
next: {
|
||||
action: "firstMethod",
|
||||
next: {
|
||||
action: "secondMethod",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
const strategy = new TransactionOrchestrator("transaction-name", flow)
|
||||
|
||||
const transaction = await strategy.beginTransaction(
|
||||
"transaction_id_123",
|
||||
handler
|
||||
)
|
||||
|
||||
await strategy.resume(transaction)
|
||||
|
||||
expect(transaction.getState()).toBe(TransactionState.DONE)
|
||||
expect(mocks.one).toBeCalledTimes(1)
|
||||
expect(mocks.two).toBeCalledTimes(1)
|
||||
|
||||
await strategy.cancelTransaction(transaction)
|
||||
|
||||
expect(transaction.getState()).toBe(TransactionState.REVERTED)
|
||||
expect(mocks.one).toBeCalledTimes(1)
|
||||
expect(mocks.two).toBeCalledTimes(1)
|
||||
expect(mocks.oneCompensate).toBeCalledTimes(1)
|
||||
expect(mocks.twoCompensate).toBeCalledTimes(1)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import e from "express"
|
||||
import { TransactionFlow, TransactionHandlerType, TransactionState } from "."
|
||||
|
||||
/**
|
||||
* @typedef {Object} TransactionMetadata
|
||||
* @property {string} producer - The id of the producer that created the transaction (transactionModelId).
|
||||
* @property {string} reply_to_topic - The topic to reply to for the transaction.
|
||||
* @property {string} idempotency_key - The idempotency key of the transaction.
|
||||
* @property {string} action - The action of the transaction.
|
||||
* @property {TransactionHandlerType} action_type - The type of the transaction.
|
||||
* @property {number} attempt - The number of attempts for the transaction.
|
||||
* @property {number} timestamp - The timestamp of the transaction.
|
||||
* @typedef TransactionMetadata
|
||||
* @property producer - The id of the producer that created the transaction (transactionModelId).
|
||||
* @property reply_to_topic - The topic to reply to for the transaction.
|
||||
* @property idempotency_key - The idempotency key of the transaction.
|
||||
* @property action - The action of the transaction.
|
||||
* @property action_type - The type of the transaction.
|
||||
* @property attempt - The number of attempts for the transaction.
|
||||
* @property timestamp - The timestamp of the transaction.
|
||||
*/
|
||||
export type TransactionMetadata = {
|
||||
producer: string
|
||||
@@ -20,16 +21,44 @@ export type TransactionMetadata = {
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef TransactionContext
|
||||
* @property invoke - Object containing responses of Invoke handlers on steps flagged with saveResponse.
|
||||
* @property compensate - Object containing responses of Compensate handlers on steps flagged with saveResponse.
|
||||
*/
|
||||
export class TransactionContext {
|
||||
constructor(
|
||||
public invoke: Record<string, unknown> = {},
|
||||
public compensate: Record<string, unknown> = {}
|
||||
) {}
|
||||
}
|
||||
|
||||
export class TransactionStepError {
|
||||
constructor(
|
||||
public action: string,
|
||||
public handlerType: TransactionHandlerType,
|
||||
public error: Error | null
|
||||
) {}
|
||||
}
|
||||
|
||||
export class TransactionCheckpoint {
|
||||
constructor(
|
||||
public flow: TransactionFlow,
|
||||
public context: TransactionContext,
|
||||
public errors: TransactionStepError[] = []
|
||||
) {}
|
||||
}
|
||||
|
||||
export class TransactionPayload {
|
||||
/**
|
||||
* @param metadata - The metadata of the transaction.
|
||||
* @param data - The payload data of the transaction and the response of the previous step if forwardResponse is true.
|
||||
* @param data - The initial payload data to begin a transation.
|
||||
* @param context - Object gathering responses of all steps flagged with saveResponse.
|
||||
*/
|
||||
constructor(
|
||||
public metadata: TransactionMetadata,
|
||||
public data: Record<string, unknown> & {
|
||||
_response: Record<string, unknown>
|
||||
}
|
||||
public data: Record<string, unknown>,
|
||||
public context: TransactionContext
|
||||
) {}
|
||||
}
|
||||
|
||||
@@ -40,11 +69,10 @@ export class TransactionPayload {
|
||||
export class DistributedTransaction {
|
||||
public modelId: string
|
||||
public transactionId: string
|
||||
public errors: {
|
||||
action: string
|
||||
handlerType: TransactionHandlerType
|
||||
error: Error | null
|
||||
}[] = []
|
||||
|
||||
private errors: TransactionStepError[] = []
|
||||
|
||||
private context: TransactionContext = new TransactionContext()
|
||||
|
||||
constructor(
|
||||
private flow: TransactionFlow,
|
||||
@@ -53,16 +81,34 @@ export class DistributedTransaction {
|
||||
handlerType: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) => Promise<unknown>,
|
||||
public payload?: any
|
||||
public payload?: any,
|
||||
errors?: TransactionStepError[],
|
||||
context?: TransactionContext
|
||||
) {
|
||||
this.transactionId = flow.transactionId
|
||||
this.modelId = flow.transactionModelId
|
||||
|
||||
if (errors) {
|
||||
this.errors = errors
|
||||
}
|
||||
|
||||
if (context) {
|
||||
this.context = context
|
||||
}
|
||||
}
|
||||
|
||||
public getFlow() {
|
||||
return this.flow
|
||||
}
|
||||
|
||||
public getContext() {
|
||||
return this.context
|
||||
}
|
||||
|
||||
public getErrors() {
|
||||
return this.errors
|
||||
}
|
||||
|
||||
public addError(
|
||||
action: string,
|
||||
handlerType: TransactionHandlerType,
|
||||
@@ -75,6 +121,14 @@ export class DistributedTransaction {
|
||||
})
|
||||
}
|
||||
|
||||
public addResponse(
|
||||
action: string,
|
||||
handlerType: TransactionHandlerType,
|
||||
response: unknown
|
||||
) {
|
||||
this.context[handlerType][action] = response
|
||||
}
|
||||
|
||||
public hasFinished(): boolean {
|
||||
return [
|
||||
TransactionState.DONE,
|
||||
@@ -106,15 +160,22 @@ export class DistributedTransaction {
|
||||
|
||||
public static keyValueStore: any = {} // TODO: Use Key/Value db
|
||||
private static keyPrefix = "dtrans:"
|
||||
public async saveCheckpoint(): Promise<void> {
|
||||
public async saveCheckpoint(): Promise<TransactionCheckpoint> {
|
||||
// TODO: Use Key/Value db to save transactions
|
||||
const key = DistributedTransaction.keyPrefix + this.transactionId
|
||||
DistributedTransaction.keyValueStore[key] = JSON.stringify(this.getFlow())
|
||||
const data = new TransactionCheckpoint(
|
||||
this.getFlow(),
|
||||
this.getContext(),
|
||||
this.getErrors()
|
||||
)
|
||||
DistributedTransaction.keyValueStore[key] = JSON.stringify(data)
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
public static async loadTransactionFlow(
|
||||
public static async loadTransaction(
|
||||
transactionId: string
|
||||
): Promise<TransactionFlow | null> {
|
||||
): Promise<TransactionCheckpoint | null> {
|
||||
// TODO: Use Key/Value db to load transactions
|
||||
const key = DistributedTransaction.keyPrefix + transactionId
|
||||
if (DistributedTransaction.keyValueStore[key]) {
|
||||
|
||||
@@ -12,7 +12,7 @@ export type TransactionStepsDefinition = {
|
||||
timeout?: number
|
||||
async?: boolean
|
||||
noWait?: boolean
|
||||
forwardResponse?: boolean
|
||||
saveResponse?: boolean
|
||||
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
|
||||
}
|
||||
|
||||
|
||||
@@ -9,9 +9,10 @@ import {
|
||||
} from "."
|
||||
import {
|
||||
DistributedTransaction,
|
||||
TransactionCheckpoint,
|
||||
TransactionPayload,
|
||||
} from "./distributed-transaction"
|
||||
import { TransactionStep } from "./transaction-step"
|
||||
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
|
||||
|
||||
export type TransactionFlow = {
|
||||
transactionModelId: string
|
||||
@@ -25,12 +26,6 @@ export type TransactionFlow = {
|
||||
}
|
||||
}
|
||||
|
||||
export type TransactionStepHandler = (
|
||||
actionId: string,
|
||||
handlerType: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) => Promise<unknown>
|
||||
|
||||
/**
|
||||
* @class TransactionOrchestrator is responsible for managing and executing distributed transactions.
|
||||
* It is based on a single transaction definition, which is used to execute all the transaction steps
|
||||
@@ -40,7 +35,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
private invokeSteps: string[] = []
|
||||
private compensateSteps: string[] = []
|
||||
|
||||
public DEFAULT_RETRIES = 3
|
||||
public DEFAULT_RETRIES = 0
|
||||
constructor(
|
||||
public id: string,
|
||||
private definition: TransactionStepsDefinition
|
||||
@@ -218,6 +213,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
this.emit("finish", transaction)
|
||||
|
||||
// TODO: check TransactionModel if it should delete the checkpoint when the transaction is done
|
||||
void transaction.deleteCheckpoint()
|
||||
}
|
||||
|
||||
@@ -253,8 +249,14 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
step: TransactionStep,
|
||||
response: unknown
|
||||
): Promise<void> {
|
||||
if (step.forwardResponse) {
|
||||
step.saveResponse(response)
|
||||
if (step.saveResponse) {
|
||||
transaction.addResponse(
|
||||
step.definition.action!,
|
||||
step.isCompensating()
|
||||
? TransactionHandlerType.COMPENSATE
|
||||
: TransactionHandlerType.INVOKE,
|
||||
response
|
||||
)
|
||||
}
|
||||
|
||||
step.changeStatus(TransactionStepStatus.OK)
|
||||
@@ -340,17 +342,6 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
step.changeStatus(TransactionStepStatus.WAITING)
|
||||
|
||||
const parent = this.getPreviousStep(flow, step)
|
||||
let payloadData = transaction.payload
|
||||
|
||||
if (parent.forwardResponse) {
|
||||
if (!payloadData) {
|
||||
payloadData = {}
|
||||
}
|
||||
|
||||
payloadData._response = parent.getResponse()
|
||||
}
|
||||
|
||||
const payload = new TransactionPayload(
|
||||
{
|
||||
producer: flow.transactionModelId,
|
||||
@@ -368,7 +359,8 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
attempt: step.attempts,
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
payloadData
|
||||
transaction.payload,
|
||||
transaction.getContext()
|
||||
)
|
||||
|
||||
if (!step.definition.async) {
|
||||
@@ -434,10 +426,33 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
await this.executeNext(transaction)
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel and revert a transaction compensating all its executed steps. It can be an ongoing transaction or a completed one
|
||||
* @param transaction - The transaction to be reverted
|
||||
*/
|
||||
public async cancelTransaction(
|
||||
transaction: DistributedTransaction
|
||||
): Promise<void> {
|
||||
if (transaction.modelId !== this.id) {
|
||||
throw new Error(
|
||||
`TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.`
|
||||
)
|
||||
}
|
||||
|
||||
const flow = transaction.getFlow()
|
||||
if (flow.state === TransactionState.FAILED) {
|
||||
throw new Error(`Cannot revert a perment failed transaction.`)
|
||||
}
|
||||
|
||||
flow.state = TransactionState.WAITING_TO_COMPENSATE
|
||||
|
||||
await this.executeNext(transaction)
|
||||
}
|
||||
|
||||
private async createTransactionFlow(
|
||||
transactionId: string
|
||||
): Promise<TransactionFlow> {
|
||||
const model: TransactionFlow = {
|
||||
return {
|
||||
transactionModelId: this.id,
|
||||
transactionId: transactionId,
|
||||
hasFailedSteps: false,
|
||||
@@ -446,16 +461,19 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
definition: this.definition,
|
||||
steps: this.buildSteps(this.definition),
|
||||
}
|
||||
return model
|
||||
}
|
||||
|
||||
private async getTransactionFlowById(
|
||||
private async loadTransactionById(
|
||||
transactionId: string
|
||||
): Promise<TransactionFlow | null> {
|
||||
const flow = await DistributedTransaction.loadTransactionFlow(transactionId)
|
||||
if (flow !== null) {
|
||||
flow.steps = this.buildSteps(flow.definition, flow.steps)
|
||||
return flow
|
||||
): Promise<TransactionCheckpoint | null> {
|
||||
const transaction = await DistributedTransaction.loadTransaction(
|
||||
transactionId
|
||||
)
|
||||
|
||||
if (transaction !== null) {
|
||||
const flow = transaction.flow
|
||||
transaction.flow.steps = this.buildSteps(flow.definition, flow.steps)
|
||||
return transaction
|
||||
}
|
||||
|
||||
return null
|
||||
@@ -495,6 +513,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
level.push(obj.action)
|
||||
const id = level.join(".")
|
||||
const parent = level.slice(0, level.length - 1).join(".")
|
||||
|
||||
states[parent].next?.push(id)
|
||||
|
||||
const definitionCopy = { ...obj }
|
||||
@@ -506,7 +525,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
id,
|
||||
depth: level.length - 1,
|
||||
definition: definitionCopy,
|
||||
forwardResponse: definitionCopy.forwardResponse,
|
||||
saveResponse: definitionCopy.saveResponse,
|
||||
invoke: {
|
||||
state: TransactionState.NOT_STARTED,
|
||||
status: TransactionStepStatus.IDLE,
|
||||
@@ -538,15 +557,24 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
handler: TransactionStepHandler,
|
||||
payload?: unknown
|
||||
): Promise<DistributedTransaction> {
|
||||
let modelFlow = await this.getTransactionFlowById(transactionId)
|
||||
const existingTransaction = await this.loadTransactionById(transactionId)
|
||||
|
||||
let newTransaction = false
|
||||
if (!modelFlow) {
|
||||
let modelFlow
|
||||
if (!existingTransaction) {
|
||||
modelFlow = await this.createTransactionFlow(transactionId)
|
||||
newTransaction = true
|
||||
} else {
|
||||
modelFlow = existingTransaction.flow
|
||||
}
|
||||
|
||||
const transaction = new DistributedTransaction(modelFlow, handler, payload)
|
||||
const transaction = new DistributedTransaction(
|
||||
modelFlow,
|
||||
handler,
|
||||
payload,
|
||||
existingTransaction?.errors,
|
||||
existingTransaction?.context
|
||||
)
|
||||
if (newTransaction) {
|
||||
await transaction.saveCheckpoint()
|
||||
}
|
||||
@@ -583,18 +611,18 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
}
|
||||
|
||||
if (!transaction) {
|
||||
const existingTransaction = await this.getTransactionFlowById(
|
||||
transactionId
|
||||
)
|
||||
const existingTransaction = await this.loadTransactionById(transactionId)
|
||||
|
||||
if (existingTransaction === null) {
|
||||
throw new Error("Transaction could not be found.")
|
||||
throw new Error(`Transaction ${transactionId} could not be found.`)
|
||||
}
|
||||
|
||||
transaction = new DistributedTransaction(
|
||||
existingTransaction,
|
||||
existingTransaction.flow,
|
||||
handler!,
|
||||
payload
|
||||
payload,
|
||||
existingTransaction.errors,
|
||||
existingTransaction.context
|
||||
)
|
||||
}
|
||||
|
||||
@@ -616,24 +644,24 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
* @param responseIdempotencyKey - The idempotency key for the step
|
||||
* @param handler - The handler function to execute the step
|
||||
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
|
||||
* @param payload - The payload of the step
|
||||
* @param response - The response of the step
|
||||
*/
|
||||
public async registerStepSuccess(
|
||||
responseIdempotencyKey: string,
|
||||
handler?: TransactionStepHandler,
|
||||
transaction?: DistributedTransaction,
|
||||
payload?: unknown
|
||||
response?: unknown
|
||||
): Promise<DistributedTransaction> {
|
||||
const [curTransaction, step] =
|
||||
await this.getTransactionAndStepFromIdempotencyKey(
|
||||
responseIdempotencyKey,
|
||||
handler,
|
||||
transaction,
|
||||
payload
|
||||
response
|
||||
)
|
||||
|
||||
if (step.getStates().status === TransactionStepStatus.WAITING) {
|
||||
await this.setStepSuccess(curTransaction, step, payload)
|
||||
await this.setStepSuccess(curTransaction, step, response)
|
||||
this.emit("resume", curTransaction)
|
||||
await this.executeNext(curTransaction)
|
||||
} else {
|
||||
@@ -651,21 +679,21 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
* @param error - The error that caused the failure
|
||||
* @param handler - The handler function to execute the step
|
||||
* @param transaction - The current transaction
|
||||
* @param payload - The payload of the step
|
||||
* @param response - The response of the step
|
||||
*/
|
||||
public async registerStepFailure(
|
||||
responseIdempotencyKey: string,
|
||||
error: Error | null,
|
||||
handler?: TransactionStepHandler,
|
||||
transaction?: DistributedTransaction,
|
||||
payload?: unknown
|
||||
response?: unknown
|
||||
): Promise<DistributedTransaction> {
|
||||
const [curTransaction, step] =
|
||||
await this.getTransactionAndStepFromIdempotencyKey(
|
||||
responseIdempotencyKey,
|
||||
handler,
|
||||
transaction,
|
||||
payload
|
||||
response
|
||||
)
|
||||
|
||||
if (step.getStates().status === TransactionStepStatus.WAITING) {
|
||||
@@ -680,8 +708,4 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
return curTransaction
|
||||
}
|
||||
|
||||
public cancelTransaction(transactionId: string) {
|
||||
// TODO: stop a transaction while in progress and compensate all executed steps
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,16 @@ import {
|
||||
TransactionStepsDefinition,
|
||||
TransactionStepStatus,
|
||||
TransactionState,
|
||||
TransactionHandlerType,
|
||||
TransactionPayload,
|
||||
} from "."
|
||||
|
||||
export type TransactionStepHandler = (
|
||||
actionId: string,
|
||||
handlerType: TransactionHandlerType,
|
||||
payload: TransactionPayload
|
||||
) => Promise<unknown>
|
||||
|
||||
/**
|
||||
* @class TransactionStep
|
||||
* @classdesc A class representing a single step in a transaction flow
|
||||
@@ -19,8 +27,7 @@ export class TransactionStep {
|
||||
* @member failures - The number of failures encountered while executing the step
|
||||
* @member lastAttempt - The timestamp of the last attempt made to execute the step
|
||||
* @member next - The ids of the next steps in the flow
|
||||
* @member response - The response from the last successful execution of the step
|
||||
* @member forwardResponse - A flag indicating if the response from the previous step should be passed to this step as payload
|
||||
* @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is false
|
||||
*/
|
||||
private stepFailed = false
|
||||
id: string
|
||||
@@ -38,8 +45,7 @@ export class TransactionStep {
|
||||
failures: number
|
||||
lastAttempt: number | null
|
||||
next: string[]
|
||||
response: unknown
|
||||
forwardResponse: boolean
|
||||
saveResponse: boolean
|
||||
|
||||
public getStates() {
|
||||
return this.isCompensating() ? this.compensate : this.invoke
|
||||
@@ -123,14 +129,6 @@ export class TransactionStep {
|
||||
)
|
||||
}
|
||||
|
||||
public saveResponse(response) {
|
||||
this.response = response
|
||||
}
|
||||
|
||||
public getResponse(): unknown {
|
||||
return this.response
|
||||
}
|
||||
|
||||
canRetry(): boolean {
|
||||
return !!(
|
||||
this.lastAttempt &&
|
||||
|
||||
Reference in New Issue
Block a user