From d50db84a336da2de9c06a59aa79f2a5e9aa558f1 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Thu, 2 Feb 2023 09:01:10 -0300 Subject: [PATCH] Feat: TO variant creation (#3097) --- .changeset/perfect-worms-hammer.md | 5 + .../inventory-items/update-inventory-item.ts | 14 +- .../inventory-items/update-location-level.ts | 8 +- .../products/__tests__/update-product.js | 63 +++-- .../routes/admin/products/create-product.ts | 74 ++++-- .../routes/admin/products/create-variant.ts | 183 +------------- .../transaction/create-product-variant.ts | 225 ++++++++++++++++++ .../routes/admin/products/update-product.ts | 114 ++++++++- .../medusa/src/services/__mocks__/product.js | 23 +- .../medusa/src/services/__tests__/product.js | 68 ------ .../src/services/product-variant-inventory.ts | 10 +- packages/medusa/src/services/product.ts | 54 +---- .../transaction/transaction-orchestrator.ts | 216 +++++++++++++++-- .../transaction/distributed-transaction.ts | 105 ++++++-- .../medusa/src/utils/transaction/index.ts | 2 +- .../transaction/transaction-orchestrator.ts | 126 ++++++---- .../src/utils/transaction/transaction-step.ts | 22 +- 17 files changed, 853 insertions(+), 459 deletions(-) create mode 100644 .changeset/perfect-worms-hammer.md create mode 100644 packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts diff --git a/.changeset/perfect-worms-hammer.md b/.changeset/perfect-worms-hammer.md new file mode 100644 index 0000000000..12febc57fc --- /dev/null +++ b/.changeset/perfect-worms-hammer.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": minor +--- + +ProductService.update no longer create product variants. It was moved to the endpoint handler update-product.ts diff --git a/packages/medusa/src/api/routes/admin/inventory-items/update-inventory-item.ts b/packages/medusa/src/api/routes/admin/inventory-items/update-inventory-item.ts index 8e7974fa01..ad4628f209 100644 --- a/packages/medusa/src/api/routes/admin/inventory-items/update-inventory-item.ts +++ b/packages/medusa/src/api/routes/admin/inventory-items/update-inventory-item.ts @@ -3,6 +3,7 @@ import { IsBoolean, IsNumber, IsOptional, IsString } from "class-validator" import { IInventoryService } from "../../../../interfaces" import { FindParams } from "../../../../types/common" +import { EntityManager } from "typeorm" /** * @oas [post] /inventory-items/{id} @@ -71,11 +72,16 @@ export default async (req: Request, res: Response) => { const inventoryService: IInventoryService = req.scope.resolve("inventoryService") + const manager: EntityManager = req.scope.resolve("manager") - await inventoryService.updateInventoryItem( - id, - req.validatedBody as AdminPostInventoryItemsInventoryItemReq - ) + await manager.transaction(async (transactionManager) => { + await inventoryService + .withTransaction(transactionManager) + .updateInventoryItem( + id, + req.validatedBody as AdminPostInventoryItemsInventoryItemReq + ) + }) const inventoryItem = await inventoryService.retrieveInventoryItem( id, diff --git a/packages/medusa/src/api/routes/admin/inventory-items/update-location-level.ts b/packages/medusa/src/api/routes/admin/inventory-items/update-location-level.ts index 0be5bc1649..fb0218de3d 100644 --- a/packages/medusa/src/api/routes/admin/inventory-items/update-location-level.ts +++ b/packages/medusa/src/api/routes/admin/inventory-items/update-location-level.ts @@ -3,6 +3,7 @@ import { IsOptional, IsNumber } from "class-validator" import { IInventoryService } from "../../../../interfaces" import { FindParams } from "../../../../types/common" +import { EntityManager } from "typeorm" /** * @oas [post] /inventory-items/{id}/location-levels/{location_id} @@ -72,11 +73,16 @@ export default async (req: Request, res: Response) => { const inventoryService: IInventoryService = req.scope.resolve("inventoryService") + const manager: EntityManager = req.scope.resolve("manager") const validatedBody = req.validatedBody as AdminPostInventoryItemsItemLocationLevelsLevelReq - await inventoryService.updateInventoryLevel(id, location_id, validatedBody) + await manager.transaction(async (transactionManager) => { + await inventoryService + .withTransaction(transactionManager) + .updateInventoryLevel(id, location_id, validatedBody) + }) const inventoryItem = await inventoryService.retrieveInventoryItem( id, diff --git a/packages/medusa/src/api/routes/admin/products/__tests__/update-product.js b/packages/medusa/src/api/routes/admin/products/__tests__/update-product.js index 374b6374d9..16ff180587 100644 --- a/packages/medusa/src/api/routes/admin/products/__tests__/update-product.js +++ b/packages/medusa/src/api/routes/admin/products/__tests__/update-product.js @@ -1,6 +1,8 @@ import { IdMap } from "medusa-test-utils" import { request } from "../../../../../helpers/test-request" import { ProductServiceMock } from "../../../../../services/__mocks__/product" +import { ProductVariantServiceMock } from "../../../../../services/__mocks__/product-variant" +import { EventBusServiceMock } from "../../../../../services/__mocks__/event-bus" describe("POST /admin/products/:id", () => { describe("successfully updates a product", () => { @@ -9,12 +11,17 @@ describe("POST /admin/products/:id", () => { beforeAll(async () => { subject = await request( "POST", - `/admin/products/${IdMap.getId("product1")}`, + `/admin/products/${IdMap.getId("multipleVariants")}`, { payload: { title: "Product 1", description: "Updated test description", handle: "handle", + variants: [ + { id: IdMap.getId("variant_1"), title: "Green" }, + { title: "Blue" }, + { title: "Yellow" }, + ], }, adminSession: { jwt: { @@ -32,7 +39,7 @@ describe("POST /admin/products/:id", () => { it("calls update", () => { expect(ProductServiceMock.update).toHaveBeenCalledTimes(1) expect(ProductServiceMock.update).toHaveBeenCalledWith( - IdMap.getId("product1"), + IdMap.getId("multipleVariants"), expect.objectContaining({ title: "Product 1", description: "Updated test description", @@ -40,51 +47,35 @@ describe("POST /admin/products/:id", () => { }) ) }) + + it("successfully updates variants and create new ones", async () => { + expect(ProductVariantServiceMock.delete).toHaveBeenCalledTimes(2) + expect(ProductVariantServiceMock.update).toHaveBeenCalledTimes(1) + expect(ProductVariantServiceMock.create).toHaveBeenCalledTimes(2) + }) }) describe("handles failed update operation", () => { - it("throws if metadata is to be updated", async () => { - try { - await request("POST", `/admin/products/${IdMap.getId("product1")}`, { + it("throws on wrong variant in update", async () => { + const subject = await request( + "POST", + `/admin/products/${IdMap.getId("variantsWithPrices")}`, + { payload: { - _id: IdMap.getId("product1"), title: "Product 1", - metadata: "Test Description", + variants: [{ id: "test_321", title: "Green" }], }, adminSession: { jwt: { userId: IdMap.getId("admin_user"), }, }, - }) - } catch (error) { - expect(error.status).toEqual(400) - expect(error.message).toEqual( - "Use setMetadata to update metadata fields" - ) - } - }) - - it("throws if variants is to be updated", async () => { - try { - await request("POST", `/admin/products/${IdMap.getId("product1")}`, { - payload: { - _id: IdMap.getId("product1"), - title: "Product 1", - metadata: "Test Description", - }, - adminSession: { - jwt: { - userId: IdMap.getId("admin_user"), - }, - }, - }) - } catch (error) { - expect(error.status).toEqual(400) - expect(error.message).toEqual( - "Use addVariant, reorderVariants, removeVariant to update Product Variants" - ) - } + } + ) + expect(subject.status).toEqual(404) + expect(subject.error.text).toEqual( + `{"type":"not_found","message":"Variant with id: test_321 is not associated with this product"}` + ) }) }) }) diff --git a/packages/medusa/src/api/routes/admin/products/create-product.ts b/packages/medusa/src/api/routes/admin/products/create-product.ts index 67b2ed752c..0a84603f98 100644 --- a/packages/medusa/src/api/routes/admin/products/create-product.ts +++ b/packages/medusa/src/api/routes/admin/products/create-product.ts @@ -12,6 +12,7 @@ import { defaultAdminProductFields, defaultAdminProductRelations } from "." import { PricingService, ProductService, + ProductVariantInventoryService, ProductVariantService, ShippingProfileService, } from "../../../../services" @@ -32,6 +33,14 @@ import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-cha import { ProductStatus } from "../../../../models" import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators" import { validator } from "../../../../utils/validator" +import { IInventoryService } from "../../../../interfaces" + +import { + createVariantTransaction, + revertVariantTransaction, +} from "./transaction/create-product-variant" +import { DistributedTransaction } from "../../../../utils/transaction" +import { Logger } from "../../../../types/global" /** * @oas [post] /products @@ -98,6 +107,7 @@ import { validator } from "../../../../utils/validator" export default async (req, res) => { const validated = await validator(AdminPostProductsReq, req.body) + const logger: Logger = req.scope.resolve("logger") const productService: ProductService = req.scope.resolve("productService") const pricingService: PricingService = req.scope.resolve("pricingService") const productVariantService: ProductVariantService = req.scope.resolve( @@ -106,6 +116,10 @@ export default async (req, res) => { const shippingProfileService: ShippingProfileService = req.scope.resolve( "shippingProfileService" ) + const productVariantInventoryService: ProductVariantInventoryService = + req.scope.resolve("productVariantInventoryService") + const inventoryService: IInventoryService | undefined = + req.scope.resolve("inventoryService") const entityManager: EntityManager = req.scope.resolve("manager") @@ -134,8 +148,8 @@ export default async (req, res) => { .create({ ...validated, profile_id: shippingProfile.id }) if (variants) { - for (const [i, variant] of variants.entries()) { - variant["variant_rank"] = i + for (const [index, variant] of variants.entries()) { + variant["variant_rank"] = index } const optionIds = @@ -143,22 +157,48 @@ export default async (req, res) => { (o) => newProduct.options.find((newO) => newO.title === o.title)?.id ) || [] - await Promise.all( - variants.map(async (v) => { - const variant = { - ...v, - options: - v?.options?.map((o, index) => ({ - ...o, - option_id: optionIds[index], - })) || [], - } + const allVariantTransactions: DistributedTransaction[] = [] + const transactionDependencies = { + manager, + inventoryService, + productVariantInventoryService, + productVariantService, + } - await productVariantService - .withTransaction(manager) - .create(newProduct.id, variant as CreateProductVariantInput) - }) - ) + try { + await Promise.all( + variants.map(async (variant) => { + const options = + variant?.options?.map((option, index) => ({ + ...option, + option_id: optionIds[index], + })) || [] + + const input = { + ...variant, + options, + } + + const varTransation = await createVariantTransaction( + transactionDependencies, + newProduct.id, + input as CreateProductVariantInput + ) + allVariantTransactions.push(varTransation) + }) + ) + } catch (e) { + await Promise.all( + allVariantTransactions.map(async (transaction) => { + await revertVariantTransaction( + transactionDependencies, + transaction + ).catch(() => logger.warn("Transaction couldn't be reverted.")) + }) + ) + + throw e + } } return newProduct diff --git a/packages/medusa/src/api/routes/admin/products/create-variant.ts b/packages/medusa/src/api/routes/admin/products/create-variant.ts index 099328f398..553e8271bd 100644 --- a/packages/medusa/src/api/routes/admin/products/create-variant.ts +++ b/packages/medusa/src/api/routes/admin/products/create-variant.ts @@ -22,18 +22,10 @@ import { } from "../../../../types/product-variant" import { validator } from "../../../../utils/validator" -import { - TransactionHandlerType, - TransactionOrchestrator, - TransactionPayload, - TransactionState, - TransactionStepsDefinition, -} from "../../../../utils/transaction" - -import { ulid } from "ulid" -import { MedusaError } from "medusa-core-utils" import { EntityManager } from "typeorm" +import { createVariantTransaction } from "./transaction/create-product-variant" + /** * @oas [post] /products/{id}/variants * operationId: "PostProductsProductVariants" @@ -122,47 +114,6 @@ import { EntityManager } from "typeorm" * $ref: "#/components/responses/500_error" */ -enum actions { - createVariant = "createVariant", - createInventoryItem = "createInventoryItem", - attachInventoryItem = "attachInventoryItem", -} - -const simpleFlow: TransactionStepsDefinition = { - next: { - action: actions.createVariant, - maxRetries: 0, - }, -} - -const flowWithInventory: TransactionStepsDefinition = { - next: { - action: actions.createVariant, - forwardResponse: true, - maxRetries: 0, - next: { - action: actions.createInventoryItem, - forwardResponse: true, - maxRetries: 0, - next: { - action: actions.attachInventoryItem, - noCompensation: true, - maxRetries: 0, - }, - }, - }, -} - -const createSimpleVariantStrategy = new TransactionOrchestrator( - "create-variant", - simpleFlow -) - -const createVariantStrategyWithInventory = new TransactionOrchestrator( - "create-variant-with-inventory", - flowWithInventory -) - export default async (req, res) => { const { id } = req.params @@ -179,129 +130,19 @@ export default async (req, res) => { "productVariantService" ) - const createdId: Record = { - variant: null, - inventoryItem: null, - } - const manager: EntityManager = req.scope.resolve("manager") + await manager.transaction(async (transactionManager) => { - const inventoryServiceTx = - inventoryService?.withTransaction(transactionManager) - - const productVariantInventoryServiceTx = - productVariantInventoryService.withTransaction(transactionManager) - - const productVariantServiceTx = - productVariantService.withTransaction(transactionManager) - - async function createVariant() { - const variant = await productVariantServiceTx.create( - id, - validated as CreateProductVariantInput - ) - - createdId.variant = variant.id - - return { variant } - } - - async function removeVariant() { - if (createdId.variant) { - await productVariantServiceTx.delete(createdId.variant) - } - } - - async function createInventoryItem(variant) { - if (!validated.manage_inventory) { - return - } - - const inventoryItem = await inventoryServiceTx!.createInventoryItem({ - sku: validated.sku, - origin_country: validated.origin_country, - hs_code: validated.hs_code, - mid_code: validated.mid_code, - material: validated.material, - weight: validated.weight, - length: validated.length, - height: validated.height, - width: validated.width, - }) - - createdId.inventoryItem = inventoryItem.id - - return { variant, inventoryItem } - } - - async function removeInventoryItem() { - if (createdId.inventoryItem) { - await inventoryServiceTx!.deleteInventoryItem(createdId.inventoryItem) - } - } - - async function attachInventoryItem(variant, inventoryItem) { - if (!validated.manage_inventory) { - return - } - - await productVariantInventoryServiceTx.attachInventoryItem( - variant.id, - inventoryItem.id, - validated.inventory_quantity - ) - } - - async function transactionHandler( - actionId: string, - type: TransactionHandlerType, - payload: TransactionPayload - ) { - const command = { - [actions.createVariant]: { - [TransactionHandlerType.INVOKE]: async () => { - return await createVariant() - }, - [TransactionHandlerType.COMPENSATE]: async () => { - await removeVariant() - }, - }, - [actions.createInventoryItem]: { - [TransactionHandlerType.INVOKE]: async (data) => { - const { variant } = data._response ?? {} - return await createInventoryItem(variant) - }, - [TransactionHandlerType.COMPENSATE]: async () => { - await removeInventoryItem() - }, - }, - [actions.attachInventoryItem]: { - [TransactionHandlerType.INVOKE]: async (data) => { - const { variant, inventoryItem } = data._response ?? {} - return await attachInventoryItem(variant, inventoryItem) - }, - }, - } - return command[actionId][type](payload.data) - } - - const strategy = inventoryService - ? createVariantStrategyWithInventory - : createSimpleVariantStrategy - - const transaction = await strategy.beginTransaction( - ulid(), - transactionHandler, - validated + await createVariantTransaction( + { + manager: transactionManager, + inventoryService, + productVariantInventoryService, + productVariantService, + }, + id, + validated as CreateProductVariantInput ) - await strategy.resume(transaction) - - if (transaction.getState() !== TransactionState.DONE) { - throw new MedusaError( - MedusaError.Types.INVALID_DATA, - transaction.errors.map((err) => err.error?.message).join("\n") - ) - } }) const productService: ProductService = req.scope.resolve("productService") diff --git a/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts b/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts new file mode 100644 index 0000000000..fbb69c885c --- /dev/null +++ b/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts @@ -0,0 +1,225 @@ +import { + DistributedTransaction, + TransactionHandlerType, + TransactionOrchestrator, + TransactionPayload, + TransactionState, + TransactionStepsDefinition, +} from "../../../../../utils/transaction" +import { ulid } from "ulid" +import { EntityManager } from "typeorm" +import { IInventoryService } from "../../../../../interfaces" +import { + ProductVariantInventoryService, + ProductVariantService, +} from "../../../../../services" +import { CreateProductVariantInput } from "../../../../../types/product-variant" +import { InventoryItemDTO } from "../../../../../types/inventory" +import { ProductVariant } from "../../../../../models" +import { MedusaError } from "medusa-core-utils" + +enum actions { + createVariant = "createVariant", + createInventoryItem = "createInventoryItem", + attachInventoryItem = "attachInventoryItem", +} + +const simpleFlow: TransactionStepsDefinition = { + next: { + action: actions.createVariant, + }, +} + +const flowWithInventory: TransactionStepsDefinition = { + next: { + action: actions.createVariant, + saveResponse: true, + next: { + action: actions.createInventoryItem, + saveResponse: true, + next: { + action: actions.attachInventoryItem, + noCompensation: true, + }, + }, + }, +} + +const createSimpleVariantStrategy = new TransactionOrchestrator( + "create-variant", + simpleFlow +) + +const createVariantStrategyWithInventory = new TransactionOrchestrator( + "create-variant-with-inventory", + flowWithInventory +) + +type InjectedDependencies = { + manager: EntityManager + productVariantService: ProductVariantService + productVariantInventoryService: ProductVariantInventoryService + inventoryService?: IInventoryService +} + +export const createVariantTransaction = async ( + dependencies: InjectedDependencies, + productId: string, + input: CreateProductVariantInput +): Promise => { + const { + manager, + productVariantService, + inventoryService, + productVariantInventoryService, + } = dependencies + + const inventoryServiceTx = inventoryService?.withTransaction(manager) + + const productVariantInventoryServiceTx = + productVariantInventoryService.withTransaction(manager) + + const productVariantServiceTx = productVariantService.withTransaction(manager) + + async function createVariant(variantInput: CreateProductVariantInput) { + const variant = await productVariantServiceTx.create( + productId, + variantInput + ) + + return { variant } + } + + async function removeVariant(variant: ProductVariant) { + if (variant) { + await productVariantServiceTx.delete(variant.id) + } + } + + async function createInventoryItem(variant: ProductVariant) { + if (!variant.manage_inventory) { + return + } + + const inventoryItem = await inventoryServiceTx!.createInventoryItem({ + sku: variant.sku, + origin_country: variant.origin_country, + hs_code: variant.hs_code, + mid_code: variant.mid_code, + material: variant.material, + weight: variant.weight, + length: variant.length, + height: variant.height, + width: variant.width, + }) + + return { inventoryItem } + } + + async function removeInventoryItem(inventoryItem: InventoryItemDTO) { + if (inventoryItem) { + await inventoryServiceTx!.deleteInventoryItem(inventoryItem.id) + } + } + + async function attachInventoryItem( + variant: ProductVariant, + inventoryItem: InventoryItemDTO + ) { + if (!variant.manage_inventory) { + return + } + + await productVariantInventoryServiceTx.attachInventoryItem( + variant.id, + inventoryItem.id + ) + } + + async function transactionHandler( + actionId: string, + type: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + [actions.createVariant]: { + [TransactionHandlerType.INVOKE]: async ( + data: CreateProductVariantInput + ) => { + return await createVariant(data) + }, + [TransactionHandlerType.COMPENSATE]: async ( + data: CreateProductVariantInput, + { invoke } + ) => { + await removeVariant(invoke[actions.createVariant]) + }, + }, + [actions.createInventoryItem]: { + [TransactionHandlerType.INVOKE]: async ( + data: CreateProductVariantInput, + { invoke } + ) => { + const { [actions.createVariant]: variant } = invoke + + return await createInventoryItem(variant) + }, + [TransactionHandlerType.COMPENSATE]: async ( + data: CreateProductVariantInput, + { invoke } + ) => { + await removeInventoryItem(invoke[actions.createInventoryItem]) + }, + }, + [actions.attachInventoryItem]: { + [TransactionHandlerType.INVOKE]: async ( + data: CreateProductVariantInput, + { invoke } + ) => { + const { + [actions.createVariant]: variant, + [actions.createInventoryItem]: inventoryItem, + } = invoke + + return await attachInventoryItem(variant, inventoryItem) + }, + }, + } + return command[actionId][type](payload.data, payload.context) + } + + const strategy = inventoryService + ? createVariantStrategyWithInventory + : createSimpleVariantStrategy + + const transaction = await strategy.beginTransaction( + ulid(), + transactionHandler, + input + ) + await strategy.resume(transaction) + + if (transaction.getState() !== TransactionState.DONE) { + throw new MedusaError( + MedusaError.Types.INVALID_DATA, + transaction + .getErrors() + .map((err) => err.error?.message) + .join("\n") + ) + } + + return transaction +} + +export const revertVariantTransaction = async ( + dependencies: InjectedDependencies, + transaction: DistributedTransaction +) => { + const { inventoryService } = dependencies + const strategy = inventoryService + ? createVariantStrategyWithInventory + : createSimpleVariantStrategy + + await strategy.cancelTransaction(transaction) +} diff --git a/packages/medusa/src/api/routes/admin/products/update-product.ts b/packages/medusa/src/api/routes/admin/products/update-product.ts index 3141737567..b685171b1e 100644 --- a/packages/medusa/src/api/routes/admin/products/update-product.ts +++ b/packages/medusa/src/api/routes/admin/products/update-product.ts @@ -12,7 +12,12 @@ import { ValidateNested, } from "class-validator" import { defaultAdminProductFields, defaultAdminProductRelations } from "." -import { PricingService, ProductService } from "../../../../services" +import { + PricingService, + ProductService, + ProductVariantInventoryService, + ProductVariantService, +} from "../../../../services" import { ProductSalesChannelReq, ProductTagReq, @@ -23,10 +28,21 @@ import { import { Type } from "class-transformer" import { EntityManager } from "typeorm" import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels" -import { ProductStatus } from "../../../../models" -import { ProductVariantPricesUpdateReq } from "../../../../types/product-variant" +import { ProductStatus, ProductVariant } from "../../../../models" +import { + CreateProductVariantInput, + ProductVariantPricesUpdateReq, +} from "../../../../types/product-variant" import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators" import { validator } from "../../../../utils/validator" +import { MedusaError } from "medusa-core-utils" +import { DistributedTransaction } from "../../../../utils/transaction" +import { + createVariantTransaction, + revertVariantTransaction, +} from "./transaction/create-product-variant" +import { IInventoryService } from "../../../../interfaces" +import { Logger } from "../../../../types/global" /** * @oas [post] /products/{id} @@ -96,14 +112,106 @@ export default async (req, res) => { const validated = await validator(AdminPostProductsProductReq, req.body) + const logger: Logger = req.scope.resolve("logger") const productService: ProductService = req.scope.resolve("productService") const pricingService: PricingService = req.scope.resolve("pricingService") + const productVariantService: ProductVariantService = req.scope.resolve( + "productVariantService" + ) + const productVariantInventoryService: ProductVariantInventoryService = + req.scope.resolve("productVariantInventoryService") + const inventoryService: IInventoryService | undefined = + req.scope.resolve("inventoryService") const manager: EntityManager = req.scope.resolve("manager") await manager.transaction(async (transactionManager) => { + const { variants } = validated + delete validated.variants + await productService .withTransaction(transactionManager) .update(id, validated) + + if (!variants) { + return + } + + const product = await productService + .withTransaction(transactionManager) + .retrieve(id, { + relations: ["variants"], + }) + + // Iterate product variants and update their properties accordingly + for (const variant of product.variants) { + const exists = variants.find((v) => v.id && variant.id === v.id) + if (!exists) { + await productVariantService + .withTransaction(transactionManager) + .delete(variant.id) + } + } + + const allVariantTransactions: DistributedTransaction[] = [] + const transactionDependencies = { + manager: transactionManager, + inventoryService, + productVariantInventoryService, + productVariantService, + } + + for (const [index, newVariant] of variants.entries()) { + const variantRank = index + + if (newVariant.id) { + const variant = product.variants.find((v) => v.id === newVariant.id) + + if (!variant) { + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Variant with id: ${newVariant.id} is not associated with this product` + ) + } + + await productVariantService + .withTransaction(transactionManager) + .update(variant, { + ...newVariant, + variant_rank: variantRank, + product_id: variant.product_id, + }) + } else { + // If the provided variant does not have an id, we assume that it + // should be created + + try { + const input = { + ...newVariant, + variant_rank: variantRank, + options: newVariant.options || [], + prices: newVariant.prices || [], + } + + const varTransation = await createVariantTransaction( + transactionDependencies, + product.id, + input as CreateProductVariantInput + ) + allVariantTransactions.push(varTransation) + } catch (e) { + await Promise.all( + allVariantTransactions.map(async (transaction) => { + await revertVariantTransaction( + transactionDependencies, + transaction + ).catch(() => logger.warn("Transaction couldn't be reverted.")) + }) + ) + + throw e + } + } + } }) const rawProduct = await productService.retrieve(id, { diff --git a/packages/medusa/src/services/__mocks__/product.js b/packages/medusa/src/services/__mocks__/product.js index 59bae5d3fc..f9c9e330e9 100644 --- a/packages/medusa/src/services/__mocks__/product.js +++ b/packages/medusa/src/services/__mocks__/product.js @@ -60,10 +60,28 @@ export const products = { }, ], }, + multipleVariants: { + id: IdMap.getId("multipleVariants"), + title: "Multiple Variants", + variants: [ + { + id: IdMap.getId("variant_1"), + title: "Variant 1", + }, + { + id: IdMap.getId("variant_2"), + title: "Variant 2", + }, + { + id: IdMap.getId("variant_3"), + title: "Variant 3", + }, + ], + }, } export const ProductServiceMock = { - withTransaction: function() { + withTransaction: function () { return this }, create: jest.fn().mockImplementation((data) => { @@ -135,6 +153,9 @@ export const ProductServiceMock = { if (productId === IdMap.getId("variantsWithPrices")) { return Promise.resolve(products.variantsWithPrices) } + if (productId === IdMap.getId("multipleVariants")) { + return Promise.resolve(products.multipleVariants) + } return Promise.resolve(undefined) }), update: jest.fn().mockImplementation((product, data) => { diff --git a/packages/medusa/src/services/__tests__/product.js b/packages/medusa/src/services/__tests__/product.js index 79d52bee29..29b5c92843 100644 --- a/packages/medusa/src/services/__tests__/product.js +++ b/packages/medusa/src/services/__tests__/product.js @@ -247,20 +247,6 @@ describe("ProductService", () => { }) productTypeRepository.upsertType = mockUpsertType - const productVariantRepository = MockRepository() - - const productVariantService = { - withTransaction: function () { - return this - }, - update: (variant, update) => { - if (variant.id) { - return update - } - return Promise.resolve() - }, - } - const productTagRepository = MockRepository({ findOne: (data) => { if (data.where.value === "test") { @@ -295,8 +281,6 @@ describe("ProductService", () => { const productService = new ProductService({ manager: MockManager, productRepository, - productVariantService, - productVariantRepository, productTagRepository, productTypeRepository, eventBusService, @@ -327,17 +311,6 @@ describe("ProductService", () => { }) }) - it("successfully updates product variants", async () => { - await productService.update(IdMap.getId("ironman&co"), { - variants: [{ id: IdMap.getId("green"), title: "Greener" }], - }) - - // The update of variants will be tested in product variant test file - // Here we just test, that the function reaches its end when updating - // variants - expect(productRepository.save).toHaveBeenCalledTimes(1) - }) - it("successfully updates product status", async () => { await productService.update(IdMap.getId("ironman"), { status: "published", @@ -376,30 +349,6 @@ describe("ProductService", () => { }) }) - it("successfully updates variant ranking", async () => { - await productService.update("ranking test", { - variants: [ - { id: "test_321", title: "Greener", variant_rank: 1 }, - { id: "test_123", title: "Blueer", variant_rank: 0 }, - ], - }) - - expect(eventBusService.emit).toHaveBeenCalledTimes(1) - expect(eventBusService.emit).toHaveBeenCalledWith( - "product.updated", - expect.any(Object) - ) - - expect(productRepository.save).toHaveBeenCalledTimes(1) - expect(productRepository.save).toHaveBeenCalledWith({ - id: "ranking test", - variants: [ - { id: "test_321", title: "Greener", variant_rank: 0 }, - { id: "test_123", title: "Blueer", variant_rank: 1 }, - ], - }) - }) - it("successfully updates tags", async () => { await productService.update(IdMap.getId("ironman"), { tags: [ @@ -431,23 +380,6 @@ describe("ProductService", () => { expect(err.message).toEqual("Product with id: 123 was not found") } }) - - it("throws on wrong variant in update", async () => { - try { - await productService.update(IdMap.getId("ironman&co"), { - variants: [ - { id: IdMap.getId("yellow") }, - { id: IdMap.getId("green") }, - ], - }) - } catch (err) { - expect(err.message).toEqual( - `Variant with id: ${IdMap.getId( - "yellow" - )} is not associated with this product` - ) - } - }) }) describe("delete", () => { diff --git a/packages/medusa/src/services/product-variant-inventory.ts b/packages/medusa/src/services/product-variant-inventory.ts index 7e20e40ee7..ac9554c0e3 100644 --- a/packages/medusa/src/services/product-variant-inventory.ts +++ b/packages/medusa/src/services/product-variant-inventory.ts @@ -249,13 +249,13 @@ class ProductVariantInventoryService extends TransactionBaseService { * Attach a variant to an inventory item * @param variantId variant id * @param inventoryItemId inventory item id - * @param quantity quantity of variant to attach + * @param requiredQuantity quantity of variant to attach * @returns the variant inventory item */ async attachInventoryItem( variantId: string, inventoryItemId: string, - quantity?: number + requiredQuantity?: number ): Promise { const manager = this.transactionManager_ || this.manager_ @@ -289,14 +289,14 @@ class ProductVariantInventoryService extends TransactionBaseService { } let quantityToStore = 1 - if (typeof quantity !== "undefined") { - if (quantity < 1) { + if (typeof requiredQuantity !== "undefined") { + if (requiredQuantity < 1) { throw new MedusaError( MedusaError.Types.INVALID_DATA, "Quantity must be greater than 0" ) } else { - quantityToStore = quantity + quantityToStore = requiredQuantity } } diff --git a/packages/medusa/src/services/product.ts b/packages/medusa/src/services/product.ts index 71ba150f5f..d45b363989 100644 --- a/packages/medusa/src/services/product.ts +++ b/packages/medusa/src/services/product.ts @@ -504,7 +504,7 @@ class ProductService extends TransactionBaseService { ) const imageRepo = manager.getCustomRepository(this.imageRepository_) - const relations = ["variants", "tags", "images"] + const relations = ["tags", "images"] if ( this.featureFlagRouter_.isFeatureEnabled(SalesChannelFeatureFlag.key) @@ -526,7 +526,6 @@ class ProductService extends TransactionBaseService { }) const { - variants, metadata, images, tags, @@ -581,57 +580,6 @@ class ProductService extends TransactionBaseService { } } - if (variants) { - // Iterate product variants and update their properties accordingly - for (const variant of product.variants) { - const exists = variants.find((v) => v.id && variant.id === v.id) - if (!exists) { - await productVariantRepo.remove(variant) - } - } - - const newVariants: ProductVariant[] = [] - for (const [i, newVariant] of variants.entries()) { - const variant_rank = i - - if (newVariant.id) { - const variant = product.variants.find((v) => v.id === newVariant.id) - - if (!variant) { - throw new MedusaError( - MedusaError.Types.NOT_FOUND, - `Variant with id: ${newVariant.id} is not associated with this product` - ) - } - - const saved = await this.productVariantService_ - .withTransaction(manager) - .update(variant, { - ...newVariant, - variant_rank, - product_id: variant.product_id, - }) - - newVariants.push(saved) - } else { - // If the provided variant does not have an id, we assume that it - // should be created - const created = await this.productVariantService_ - .withTransaction(manager) - .create(product.id, { - ...newVariant, - variant_rank, - options: newVariant.options || [], - prices: newVariant.prices || [], - }) - - newVariants.push(created) - } - } - - product.variants = newVariants - } - for (const [key, value] of Object.entries(rest)) { if (isDefined(value)) { product[key] = value diff --git a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts b/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts index 7b1f9046d7..d00d3b198f 100644 --- a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts @@ -7,6 +7,10 @@ import { } from "../../transaction" describe("Transaction Orchestrator", () => { + afterEach(() => { + jest.clearAllMocks() + }) + it("Should follow the flow by calling steps in order with the correct payload", async () => { const mocks = { one: jest.fn().mockImplementation((payload) => { @@ -187,7 +191,7 @@ describe("Transaction Orchestrator", () => { expect(actionOrder).toEqual(["one", "two", "three"]) }) - it("Should forward step response if flag 'forwardResponse' is set to true", async () => { + it("Should store invoke's step response if flag 'saveResponse' is set to true", async () => { const mocks = { one: jest.fn().mockImplementation((data) => { return { abc: 1234 } @@ -195,8 +199,11 @@ describe("Transaction Orchestrator", () => { two: jest.fn().mockImplementation((data) => { return { def: "567" } }), - three: jest.fn().mockImplementation((data) => { - return { end: true } + three: jest.fn().mockImplementation((data, context) => { + return { end: true, onePropAbc: context.invoke.firstMethod.abc } + }), + four: jest.fn().mockImplementation((data) => { + return null }), } @@ -217,24 +224,36 @@ describe("Transaction Orchestrator", () => { }, }, thirdMethod: { + [TransactionHandlerType.INVOKE]: (data, context) => { + return mocks.three(data, context) + }, + }, + fourthMethod: { [TransactionHandlerType.INVOKE]: (data) => { - return mocks.three(data) + return mocks.four(data) }, }, } - return command[actionId][functionHandlerType]({ ...payload.data }) + return command[actionId][functionHandlerType]( + payload.data, + payload.context + ) } const flow: TransactionStepsDefinition = { next: { action: "firstMethod", - forwardResponse: true, + saveResponse: true, next: { action: "secondMethod", - forwardResponse: true, + saveResponse: true, next: { action: "thirdMethod", + saveResponse: true, + next: { + action: "fourthMethod", + }, }, }, }, @@ -245,18 +264,111 @@ describe("Transaction Orchestrator", () => { const transaction = await strategy.beginTransaction( "transaction_id_123", handler, - { - prop: 123, - } + { prop: 123 } ) await strategy.resume(transaction) expect(mocks.one).toBeCalledWith({ prop: 123 }) + expect(mocks.two).toBeCalledWith({ prop: 123 }) - expect(mocks.two).toBeCalledWith({ prop: 123, _response: { abc: 1234 } }) + expect(mocks.three).toBeCalledWith( + { prop: 123 }, + { + invoke: { + firstMethod: { abc: 1234 }, + secondMethod: { def: "567" }, + thirdMethod: { end: true, onePropAbc: 1234 }, + }, + compensate: {}, + } + ) + }) - expect(mocks.three).toBeCalledWith({ prop: 123, _response: { def: "567" } }) + it("Should store compensate's step responses if flag 'saveResponse' is set to true", async () => { + const mocks = { + one: jest.fn().mockImplementation(() => { + return 1 + }), + two: jest.fn().mockImplementation(() => { + return 2 + }), + compensateOne: jest.fn().mockImplementation((compensateContext) => { + return "compensate 1 - 2 = " + compensateContext.secondMethod.two + }), + compensateTwo: jest.fn().mockImplementation((compensateContext) => { + return { two: "isCompensated" } + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + return mocks.one() + }, + [TransactionHandlerType.COMPENSATE]: ({ compensate }) => { + return mocks.compensateOne({ ...compensate }) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + return mocks.two() + }, + [TransactionHandlerType.COMPENSATE]: ({ compensate }) => { + return mocks.compensateTwo({ ...compensate }) + }, + }, + thirdMethod: { + [TransactionHandlerType.INVOKE]: () => { + throw new Error("failed") + }, + }, + } + + return command[actionId][functionHandlerType](payload.context) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + saveResponse: true, + next: { + action: "secondMethod", + saveResponse: true, + next: { + action: "thirdMethod", + noCompensation: true, + }, + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + const resposes = transaction.getContext() + + expect(mocks.compensateTwo).toBeCalledWith({}) + + expect(mocks.compensateOne).toBeCalledWith({ + secondMethod: { + two: "isCompensated", + }, + }) + + expect(resposes.compensate.firstMethod).toEqual( + "compensate 1 - 2 = isCompensated" + ) }) it("Should continue the exection of next steps without waiting for the execution of all its parents when flag 'noWait' is set to true", async () => { @@ -362,8 +474,10 @@ describe("Transaction Orchestrator", () => { const flow: TransactionStepsDefinition = { next: { action: "firstMethod", + maxRetries: 3, next: { action: "secondMethod", + maxRetries: 3, }, }, } @@ -379,7 +493,7 @@ describe("Transaction Orchestrator", () => { expect(transaction.transactionId).toBe("transaction_id_123") expect(mocks.one).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES) + expect(mocks.two).toBeCalledTimes(4) expect(transaction.getState()).toBe(TransactionState.REVERTED) expect(mocks.compensateOne).toBeCalledTimes(1) @@ -428,6 +542,7 @@ describe("Transaction Orchestrator", () => { const flow: TransactionStepsDefinition = { next: { action: "firstMethod", + maxRetries: 1, }, } @@ -440,7 +555,7 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) - expect(mocks.one).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES) + expect(mocks.one).toBeCalledTimes(2) expect(transaction.getState()).toBe(TransactionState.FAILED) }) @@ -666,4 +781,77 @@ describe("Transaction Orchestrator", () => { expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED) }) + + it("Should revert a transaction when .cancelTransaction() is called", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return payload + }), + oneCompensate: jest.fn().mockImplementation((payload) => { + return payload + }), + two: jest.fn().mockImplementation((payload) => { + return payload + }), + twoCompensate: jest.fn().mockImplementation((payload) => { + return payload + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.oneCompensate(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.twoCompensate(payload) + }, + }, + } + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + next: { + action: "secondMethod", + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + + expect(transaction.getState()).toBe(TransactionState.DONE) + expect(mocks.one).toBeCalledTimes(1) + expect(mocks.two).toBeCalledTimes(1) + + await strategy.cancelTransaction(transaction) + + expect(transaction.getState()).toBe(TransactionState.REVERTED) + expect(mocks.one).toBeCalledTimes(1) + expect(mocks.two).toBeCalledTimes(1) + expect(mocks.oneCompensate).toBeCalledTimes(1) + expect(mocks.twoCompensate).toBeCalledTimes(1) + }) }) diff --git a/packages/medusa/src/utils/transaction/distributed-transaction.ts b/packages/medusa/src/utils/transaction/distributed-transaction.ts index 5f6b2c7ce6..ff977bcf1e 100644 --- a/packages/medusa/src/utils/transaction/distributed-transaction.ts +++ b/packages/medusa/src/utils/transaction/distributed-transaction.ts @@ -1,14 +1,15 @@ +import e from "express" import { TransactionFlow, TransactionHandlerType, TransactionState } from "." /** - * @typedef {Object} TransactionMetadata - * @property {string} producer - The id of the producer that created the transaction (transactionModelId). - * @property {string} reply_to_topic - The topic to reply to for the transaction. - * @property {string} idempotency_key - The idempotency key of the transaction. - * @property {string} action - The action of the transaction. - * @property {TransactionHandlerType} action_type - The type of the transaction. - * @property {number} attempt - The number of attempts for the transaction. - * @property {number} timestamp - The timestamp of the transaction. + * @typedef TransactionMetadata + * @property producer - The id of the producer that created the transaction (transactionModelId). + * @property reply_to_topic - The topic to reply to for the transaction. + * @property idempotency_key - The idempotency key of the transaction. + * @property action - The action of the transaction. + * @property action_type - The type of the transaction. + * @property attempt - The number of attempts for the transaction. + * @property timestamp - The timestamp of the transaction. */ export type TransactionMetadata = { producer: string @@ -20,16 +21,44 @@ export type TransactionMetadata = { timestamp: number } +/** + * @typedef TransactionContext + * @property invoke - Object containing responses of Invoke handlers on steps flagged with saveResponse. + * @property compensate - Object containing responses of Compensate handlers on steps flagged with saveResponse. + */ +export class TransactionContext { + constructor( + public invoke: Record = {}, + public compensate: Record = {} + ) {} +} + +export class TransactionStepError { + constructor( + public action: string, + public handlerType: TransactionHandlerType, + public error: Error | null + ) {} +} + +export class TransactionCheckpoint { + constructor( + public flow: TransactionFlow, + public context: TransactionContext, + public errors: TransactionStepError[] = [] + ) {} +} + export class TransactionPayload { /** * @param metadata - The metadata of the transaction. - * @param data - The payload data of the transaction and the response of the previous step if forwardResponse is true. + * @param data - The initial payload data to begin a transation. + * @param context - Object gathering responses of all steps flagged with saveResponse. */ constructor( public metadata: TransactionMetadata, - public data: Record & { - _response: Record - } + public data: Record, + public context: TransactionContext ) {} } @@ -40,11 +69,10 @@ export class TransactionPayload { export class DistributedTransaction { public modelId: string public transactionId: string - public errors: { - action: string - handlerType: TransactionHandlerType - error: Error | null - }[] = [] + + private errors: TransactionStepError[] = [] + + private context: TransactionContext = new TransactionContext() constructor( private flow: TransactionFlow, @@ -53,16 +81,34 @@ export class DistributedTransaction { handlerType: TransactionHandlerType, payload: TransactionPayload ) => Promise, - public payload?: any + public payload?: any, + errors?: TransactionStepError[], + context?: TransactionContext ) { this.transactionId = flow.transactionId this.modelId = flow.transactionModelId + + if (errors) { + this.errors = errors + } + + if (context) { + this.context = context + } } public getFlow() { return this.flow } + public getContext() { + return this.context + } + + public getErrors() { + return this.errors + } + public addError( action: string, handlerType: TransactionHandlerType, @@ -75,6 +121,14 @@ export class DistributedTransaction { }) } + public addResponse( + action: string, + handlerType: TransactionHandlerType, + response: unknown + ) { + this.context[handlerType][action] = response + } + public hasFinished(): boolean { return [ TransactionState.DONE, @@ -106,15 +160,22 @@ export class DistributedTransaction { public static keyValueStore: any = {} // TODO: Use Key/Value db private static keyPrefix = "dtrans:" - public async saveCheckpoint(): Promise { + public async saveCheckpoint(): Promise { // TODO: Use Key/Value db to save transactions const key = DistributedTransaction.keyPrefix + this.transactionId - DistributedTransaction.keyValueStore[key] = JSON.stringify(this.getFlow()) + const data = new TransactionCheckpoint( + this.getFlow(), + this.getContext(), + this.getErrors() + ) + DistributedTransaction.keyValueStore[key] = JSON.stringify(data) + + return data } - public static async loadTransactionFlow( + public static async loadTransaction( transactionId: string - ): Promise { + ): Promise { // TODO: Use Key/Value db to load transactions const key = DistributedTransaction.keyPrefix + transactionId if (DistributedTransaction.keyValueStore[key]) { diff --git a/packages/medusa/src/utils/transaction/index.ts b/packages/medusa/src/utils/transaction/index.ts index 5d6d9fb628..e9fee3f8b7 100644 --- a/packages/medusa/src/utils/transaction/index.ts +++ b/packages/medusa/src/utils/transaction/index.ts @@ -12,7 +12,7 @@ export type TransactionStepsDefinition = { timeout?: number async?: boolean noWait?: boolean - forwardResponse?: boolean + saveResponse?: boolean next?: TransactionStepsDefinition | TransactionStepsDefinition[] } diff --git a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts b/packages/medusa/src/utils/transaction/transaction-orchestrator.ts index b8387137d4..cade8aa2f9 100644 --- a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts +++ b/packages/medusa/src/utils/transaction/transaction-orchestrator.ts @@ -9,9 +9,10 @@ import { } from "." import { DistributedTransaction, + TransactionCheckpoint, TransactionPayload, } from "./distributed-transaction" -import { TransactionStep } from "./transaction-step" +import { TransactionStep, TransactionStepHandler } from "./transaction-step" export type TransactionFlow = { transactionModelId: string @@ -25,12 +26,6 @@ export type TransactionFlow = { } } -export type TransactionStepHandler = ( - actionId: string, - handlerType: TransactionHandlerType, - payload: TransactionPayload -) => Promise - /** * @class TransactionOrchestrator is responsible for managing and executing distributed transactions. * It is based on a single transaction definition, which is used to execute all the transaction steps @@ -40,7 +35,7 @@ export class TransactionOrchestrator extends EventEmitter { private invokeSteps: string[] = [] private compensateSteps: string[] = [] - public DEFAULT_RETRIES = 3 + public DEFAULT_RETRIES = 0 constructor( public id: string, private definition: TransactionStepsDefinition @@ -218,6 +213,7 @@ export class TransactionOrchestrator extends EventEmitter { this.emit("finish", transaction) + // TODO: check TransactionModel if it should delete the checkpoint when the transaction is done void transaction.deleteCheckpoint() } @@ -253,8 +249,14 @@ export class TransactionOrchestrator extends EventEmitter { step: TransactionStep, response: unknown ): Promise { - if (step.forwardResponse) { - step.saveResponse(response) + if (step.saveResponse) { + transaction.addResponse( + step.definition.action!, + step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE, + response + ) } step.changeStatus(TransactionStepStatus.OK) @@ -340,17 +342,6 @@ export class TransactionOrchestrator extends EventEmitter { step.changeStatus(TransactionStepStatus.WAITING) - const parent = this.getPreviousStep(flow, step) - let payloadData = transaction.payload - - if (parent.forwardResponse) { - if (!payloadData) { - payloadData = {} - } - - payloadData._response = parent.getResponse() - } - const payload = new TransactionPayload( { producer: flow.transactionModelId, @@ -368,7 +359,8 @@ export class TransactionOrchestrator extends EventEmitter { attempt: step.attempts, timestamp: Date.now(), }, - payloadData + transaction.payload, + transaction.getContext() ) if (!step.definition.async) { @@ -434,10 +426,33 @@ export class TransactionOrchestrator extends EventEmitter { await this.executeNext(transaction) } + /** + * Cancel and revert a transaction compensating all its executed steps. It can be an ongoing transaction or a completed one + * @param transaction - The transaction to be reverted + */ + public async cancelTransaction( + transaction: DistributedTransaction + ): Promise { + if (transaction.modelId !== this.id) { + throw new Error( + `TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.` + ) + } + + const flow = transaction.getFlow() + if (flow.state === TransactionState.FAILED) { + throw new Error(`Cannot revert a perment failed transaction.`) + } + + flow.state = TransactionState.WAITING_TO_COMPENSATE + + await this.executeNext(transaction) + } + private async createTransactionFlow( transactionId: string ): Promise { - const model: TransactionFlow = { + return { transactionModelId: this.id, transactionId: transactionId, hasFailedSteps: false, @@ -446,16 +461,19 @@ export class TransactionOrchestrator extends EventEmitter { definition: this.definition, steps: this.buildSteps(this.definition), } - return model } - private async getTransactionFlowById( + private async loadTransactionById( transactionId: string - ): Promise { - const flow = await DistributedTransaction.loadTransactionFlow(transactionId) - if (flow !== null) { - flow.steps = this.buildSteps(flow.definition, flow.steps) - return flow + ): Promise { + const transaction = await DistributedTransaction.loadTransaction( + transactionId + ) + + if (transaction !== null) { + const flow = transaction.flow + transaction.flow.steps = this.buildSteps(flow.definition, flow.steps) + return transaction } return null @@ -495,6 +513,7 @@ export class TransactionOrchestrator extends EventEmitter { level.push(obj.action) const id = level.join(".") const parent = level.slice(0, level.length - 1).join(".") + states[parent].next?.push(id) const definitionCopy = { ...obj } @@ -506,7 +525,7 @@ export class TransactionOrchestrator extends EventEmitter { id, depth: level.length - 1, definition: definitionCopy, - forwardResponse: definitionCopy.forwardResponse, + saveResponse: definitionCopy.saveResponse, invoke: { state: TransactionState.NOT_STARTED, status: TransactionStepStatus.IDLE, @@ -538,15 +557,24 @@ export class TransactionOrchestrator extends EventEmitter { handler: TransactionStepHandler, payload?: unknown ): Promise { - let modelFlow = await this.getTransactionFlowById(transactionId) + const existingTransaction = await this.loadTransactionById(transactionId) let newTransaction = false - if (!modelFlow) { + let modelFlow + if (!existingTransaction) { modelFlow = await this.createTransactionFlow(transactionId) newTransaction = true + } else { + modelFlow = existingTransaction.flow } - const transaction = new DistributedTransaction(modelFlow, handler, payload) + const transaction = new DistributedTransaction( + modelFlow, + handler, + payload, + existingTransaction?.errors, + existingTransaction?.context + ) if (newTransaction) { await transaction.saveCheckpoint() } @@ -583,18 +611,18 @@ export class TransactionOrchestrator extends EventEmitter { } if (!transaction) { - const existingTransaction = await this.getTransactionFlowById( - transactionId - ) + const existingTransaction = await this.loadTransactionById(transactionId) if (existingTransaction === null) { - throw new Error("Transaction could not be found.") + throw new Error(`Transaction ${transactionId} could not be found.`) } transaction = new DistributedTransaction( - existingTransaction, + existingTransaction.flow, handler!, - payload + payload, + existingTransaction.errors, + existingTransaction.context ) } @@ -616,24 +644,24 @@ export class TransactionOrchestrator extends EventEmitter { * @param responseIdempotencyKey - The idempotency key for the step * @param handler - The handler function to execute the step * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey - * @param payload - The payload of the step + * @param response - The response of the step */ public async registerStepSuccess( responseIdempotencyKey: string, handler?: TransactionStepHandler, transaction?: DistributedTransaction, - payload?: unknown + response?: unknown ): Promise { const [curTransaction, step] = await this.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, handler, transaction, - payload + response ) if (step.getStates().status === TransactionStepStatus.WAITING) { - await this.setStepSuccess(curTransaction, step, payload) + await this.setStepSuccess(curTransaction, step, response) this.emit("resume", curTransaction) await this.executeNext(curTransaction) } else { @@ -651,21 +679,21 @@ export class TransactionOrchestrator extends EventEmitter { * @param error - The error that caused the failure * @param handler - The handler function to execute the step * @param transaction - The current transaction - * @param payload - The payload of the step + * @param response - The response of the step */ public async registerStepFailure( responseIdempotencyKey: string, error: Error | null, handler?: TransactionStepHandler, transaction?: DistributedTransaction, - payload?: unknown + response?: unknown ): Promise { const [curTransaction, step] = await this.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, handler, transaction, - payload + response ) if (step.getStates().status === TransactionStepStatus.WAITING) { @@ -680,8 +708,4 @@ export class TransactionOrchestrator extends EventEmitter { return curTransaction } - - public cancelTransaction(transactionId: string) { - // TODO: stop a transaction while in progress and compensate all executed steps - } } diff --git a/packages/medusa/src/utils/transaction/transaction-step.ts b/packages/medusa/src/utils/transaction/transaction-step.ts index 3dcb579359..d6c502c34d 100644 --- a/packages/medusa/src/utils/transaction/transaction-step.ts +++ b/packages/medusa/src/utils/transaction/transaction-step.ts @@ -2,8 +2,16 @@ import { TransactionStepsDefinition, TransactionStepStatus, TransactionState, + TransactionHandlerType, + TransactionPayload, } from "." +export type TransactionStepHandler = ( + actionId: string, + handlerType: TransactionHandlerType, + payload: TransactionPayload +) => Promise + /** * @class TransactionStep * @classdesc A class representing a single step in a transaction flow @@ -19,8 +27,7 @@ export class TransactionStep { * @member failures - The number of failures encountered while executing the step * @member lastAttempt - The timestamp of the last attempt made to execute the step * @member next - The ids of the next steps in the flow - * @member response - The response from the last successful execution of the step - * @member forwardResponse - A flag indicating if the response from the previous step should be passed to this step as payload + * @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is false */ private stepFailed = false id: string @@ -38,8 +45,7 @@ export class TransactionStep { failures: number lastAttempt: number | null next: string[] - response: unknown - forwardResponse: boolean + saveResponse: boolean public getStates() { return this.isCompensating() ? this.compensate : this.invoke @@ -123,14 +129,6 @@ export class TransactionStep { ) } - public saveResponse(response) { - this.response = response - } - - public getResponse(): unknown { - return this.response - } - canRetry(): boolean { return !!( this.lastAttempt &&