test(): Test the create product workflow compensation (#4716)

**What**
Integration tests to validate the workflow compensation.
Also, fix the transaction state when the workflow is compensating and some steps does not have any compensation
This commit is contained in:
Adrien de Peretti
2023-08-09 16:33:04 +02:00
committed by GitHub
parent 86c314eb2a
commit ac866ebb51
21 changed files with 391 additions and 193 deletions

View File

@@ -0,0 +1,12 @@
---
"@medusajs/orchestration": patch
"@medusajs/workflows": patch
"@medusajs/types": patch
"@medusajs/medusa": patch
---
test(): Test the create product workflow compensation
fix(orchestration): Fix the transaction state after compensating with no compensation steps in the middle
chore(workflows): Export and naming
feat(types): Update product workflow input types
feat(medusa): Update product workflow usage and cleanup endpoint

View File

@@ -5,7 +5,7 @@
"license": "MIT",
"private": true,
"scripts": {
"test:integration": "jest --silent=false --runInBand --bail --detectOpenHandles --forceExit",
"test:integration": "jest --silent=false --maxWorkers=50% --bail --detectOpenHandles --forceExit",
"build": "babel src -d dist --extensions \".ts,.js\""
},
"dependencies": {

View File

@@ -0,0 +1,134 @@
import path from "path"
import { initDb, useDb } from "../../../../environment-helpers/use-db"
import { bootstrapApp } from "../../../../environment-helpers/bootstrap-app"
import {
createProducts,
CreateProductsActions,
Handlers,
pipe,
} from "@medusajs/workflows"
import { IProductModuleService, WorkflowTypes } from "@medusajs/types"
import { ModuleRegistrationName } from "@medusajs/modules-sdk"
describe("CreateProduct workflow", function () {
let medusaProcess
let medusaContainer
beforeAll(async () => {
const cwd = path.resolve(path.join(__dirname, "..", "..", ".."))
await initDb({ cwd } as any)
const { container } = await bootstrapApp({ cwd })
medusaContainer = container
})
afterAll(async () => {
const db = useDb()
await db.shutdown()
medusaProcess.kill()
})
it("should compensate all the invoke if something fails", async () => {
const workflow = createProducts(medusaContainer)
workflow.appendAction(
"fail_step",
CreateProductsActions.attachInventoryItems,
{
invoke: pipe({}, async function failStep() {
throw new Error(`Failed to create products`)
}),
},
{
noCompensation: true,
}
)
const input: WorkflowTypes.ProductWorkflow.CreateProductsWorkflowInputDTO =
{
products: [
{
title: "Test product",
type: { value: "physical" },
tags: [{ value: "test" }],
subtitle: "Test subtitle",
variants: [
{
title: "Test variant",
prices: [
{
amount: 100,
currency_code: "usd",
},
],
},
],
options: [
{
title: "Test option",
},
],
},
],
}
const manager = medusaContainer.resolve("manager")
const context = {
manager,
}
const { result, errors, transaction } = await workflow.run({
input,
context,
throwOnError: false,
})
expect(errors).toEqual([
{
action: "fail_step",
handlerType: "invoke",
error: new Error(`Failed to create products`),
},
])
expect(transaction.getState()).toEqual("reverted")
expect(result).toHaveLength(1)
expect(result[0]).toEqual(
expect.objectContaining({
id: expect.any(String),
})
)
const productId = result[0].id
let [product] = await Handlers.ProductHandlers.listProducts({
container: medusaContainer,
context,
data: {
ids: [productId],
},
} as any)
expect(product).toBeUndefined()
const productModule = medusaContainer.resolve(
ModuleRegistrationName.PRODUCT
) as IProductModuleService
;[product] = await productModule.list(
{
id: productId,
},
{
withDeleted: true,
}
)
expect(product).toEqual(
expect.objectContaining({
deleted_at: expect.any(String),
})
)
})
})

View File

@@ -1,6 +1,7 @@
module.exports = {
globals: {
"ts-jest": {
tsconfig: "tsconfig.spec.json",
diagnostics: false,
isolatedModules: true,
},

View File

@@ -0,0 +1,4 @@
{
"extends": "./tsconfig.json",
"include": ["test"],
}

View File

@@ -40,7 +40,7 @@
"prepare": "cross-env NODE_ENV=production yarn run build",
"build": "rimraf dist && tsc --build",
"serve": "node dist/app.js",
"test": "jest"
"test": "jest --maxWorkers=50% --detectOpenHandles --forceExit"
},
"peerDependencies": {
"medusa-interfaces": "^1.3.7",

View File

@@ -2,6 +2,7 @@ import { IdMap } from "medusa-test-utils"
import { request } from "../../../../../helpers/test-request"
import { PriceListServiceMock } from "../../../../../services/__mocks__/price-list"
jest.setTimeout(10000)
describe("POST /price-lists", () => {
describe("successfully creates a price list", () => {
let subject

View File

@@ -35,7 +35,7 @@ import {
import { DistributedTransaction } from "@medusajs/orchestration"
import { IInventoryService, WorkflowTypes } from "@medusajs/types"
import { FlagRouter } from "@medusajs/utils"
import { Workflows, createProducts } from "@medusajs/workflows"
import { createProducts, Workflows } from "@medusajs/workflows"
import { Type } from "class-transformer"
import { EntityManager } from "typeorm"
import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels"
@@ -142,6 +142,8 @@ export default async (req, res) => {
)
}
let product
if (isWorkflowEnabled && !!productModuleService) {
const createProductWorkflow = createProducts(req.scope)
@@ -157,119 +159,114 @@ export default async (req, res) => {
},
}
const { result: products } = await createProductWorkflow.run({
const { result } = await createProductWorkflow.run({
input,
context: {
manager: entityManager,
},
})
product = result[0]
} else {
product = await entityManager.transaction(async (manager) => {
const { variants } = validated
delete validated.variants
return res.json({ product: products[0] })
if (!validated.thumbnail && validated.images && validated.images.length) {
validated.thumbnail = validated.images[0]
}
let shippingProfile
// Get default shipping profile
if (validated.is_giftcard) {
shippingProfile = await shippingProfileService
.withTransaction(manager)
.retrieveGiftCardDefault()
} else {
shippingProfile = await shippingProfileService
.withTransaction(manager)
.retrieveDefault()
}
// Provided that the feature flag is enabled and
// no sales channels are available, set the default one
if (
featureFlagRouter.isFeatureEnabled(SalesChannelFeatureFlag.key) &&
!validated?.sales_channels?.length
) {
const defaultSalesChannel = await salesChannelService
.withTransaction(manager)
.retrieveDefault()
validated.sales_channels = [defaultSalesChannel]
}
const newProduct = await productService
.withTransaction(manager)
.create({ ...validated, profile_id: shippingProfile.id })
if (variants) {
for (const [index, variant] of variants.entries()) {
variant["variant_rank"] = index
}
const optionIds =
validated?.options?.map(
(o) => newProduct.options.find((newO) => newO.title === o.title)?.id
) || []
const allVariantTransactions: DistributedTransaction[] = []
const transactionDependencies = {
manager,
inventoryService,
productVariantInventoryService,
productVariantService,
}
try {
const variantsInputData = variants.map((variant) => {
const options =
variant?.options?.map((option, index) => ({
...option,
option_id: optionIds[index],
})) || []
return {
...variant,
options,
} as CreateProductVariantInput
})
const varTransaction = await createVariantsTransaction(
transactionDependencies,
newProduct.id,
variantsInputData
)
allVariantTransactions.push(varTransaction)
} catch (e) {
await Promise.all(
allVariantTransactions.map(async (transaction) => {
await revertVariantTransaction(
transactionDependencies,
transaction
).catch(() => logger.warn("Transaction couldn't be reverted."))
})
)
throw e
}
}
return newProduct
})
}
const product = await entityManager.transaction(async (manager) => {
const { variants } = validated
delete validated.variants
if (!validated.thumbnail && validated.images && validated.images.length) {
validated.thumbnail = validated.images[0]
}
let shippingProfile
// Get default shipping profile
if (validated.is_giftcard) {
shippingProfile = await shippingProfileService
.withTransaction(manager)
.retrieveGiftCardDefault()
} else {
shippingProfile = await shippingProfileService
.withTransaction(manager)
.retrieveDefault()
}
// Provided that the feature flag is enabled and
// no sales channels are available, set the default one
if (
featureFlagRouter.isFeatureEnabled(SalesChannelFeatureFlag.key) &&
!validated?.sales_channels?.length
) {
const defaultSalesChannel = await salesChannelService
.withTransaction(manager)
.retrieveDefault()
validated.sales_channels = [defaultSalesChannel]
}
const newProduct = await productService
.withTransaction(manager)
.create({ ...validated, profile_id: shippingProfile.id })
if (variants) {
for (const [index, variant] of variants.entries()) {
variant["variant_rank"] = index
}
const optionIds =
validated?.options?.map(
(o) => newProduct.options.find((newO) => newO.title === o.title)?.id
) || []
const allVariantTransactions: DistributedTransaction[] = []
const transactionDependencies = {
manager,
inventoryService,
productVariantInventoryService,
productVariantService,
}
try {
const variantsInputData = variants.map((variant) => {
const options =
variant?.options?.map((option, index) => ({
...option,
option_id: optionIds[index],
})) || []
return {
...variant,
options,
} as CreateProductVariantInput
})
const varTransaction = await createVariantsTransaction(
transactionDependencies,
newProduct.id,
variantsInputData
)
allVariantTransactions.push(varTransaction)
} catch (e) {
await Promise.all(
allVariantTransactions.map(async (transaction) => {
await revertVariantTransaction(
transactionDependencies,
transaction
).catch(() => logger.warn("Transaction couldn't be reverted."))
})
)
throw e
}
}
const rawProduct = await productService
.withTransaction(manager)
.retrieve(newProduct.id, {
select: defaultAdminProductFields,
relations: defaultAdminProductRelations,
})
const [product] = await pricingService
.withTransaction(manager)
.setProductPrices([rawProduct])
return product
const rawProduct = await productService.retrieve(product.id, {
select: defaultAdminProductFields,
relations: defaultAdminProductRelations,
})
res.json({ product })
const [pricedProduct] = await pricingService.setProductPrices([rawProduct])
res.json({ product: pricedProduct })
}
class ProductVariantOptionReq {

View File

@@ -22,7 +22,7 @@
"prepare": "cross-env NODE_ENV=production yarn run release",
"build": "rollup --config --environment NODE_ENV:development",
"release": "rollup --config --environment NODE_ENV:production",
"test": "jest src"
"test": "jest src --passWithNoTests"
},
"dependencies": {
"camelcase": "^6.3.0",

View File

@@ -787,6 +787,88 @@ describe("Transaction Orchestrator", () => {
expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED)
})
it("Should hold the status REVERTED if the steps failed and the compensation succeed and has some no compensations step set", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
return
}),
compensateOne: jest.fn().mockImplementation((payload) => {
return
}),
two: jest.fn().mockImplementation((payload) => {
return
}),
compensateTwo: jest.fn().mockImplementation((payload) => {
return
}),
three: jest.fn().mockImplementation((payload) => {
throw new Error("Third step error")
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
[TransactionHandlerType.COMPENSATE]: () => {
mocks.compensateOne(payload)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.two(payload)
},
[TransactionHandlerType.COMPENSATE]: () => {
mocks.compensateTwo(payload)
},
},
thirdMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.three(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
next: {
action: "secondMethod",
next: {
action: "thirdMethod",
noCompensation: true,
},
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.compensateOne).toBeCalledTimes(1)
expect(mocks.two).toBeCalledTimes(1)
expect(mocks.compensateTwo).toBeCalledTimes(1)
expect(mocks.three).toBeCalledTimes(1)
expect(transaction.getState()).toBe(TransactionState.REVERTED)
})
it("Should revert a transaction when .cancelTransaction() is called", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {

View File

@@ -147,8 +147,7 @@ describe("WorkflowManager", () => {
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
expect(handlers.get("bar").compensate).toHaveBeenCalledTimes(0)
// Failed because the async is flagged as noCompensation
expect(continuation.getState()).toBe(TransactionState.FAILED)
expect(continuation.getState()).toBe(TransactionState.REVERTED)
})
it("should update an existing global flow with a new step and a new handler", async () => {

View File

@@ -146,8 +146,7 @@ describe("WorkflowManager", () => {
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
expect(handlers.get("bar").compensate).toHaveBeenCalledTimes(0)
// Failed because the async is flagged as noCompensation
expect(continuation.getState()).toBe(TransactionState.FAILED)
expect(continuation.getState()).toBe(TransactionState.REVERTED)
})
it("should update a flow with a new step and a new handler", async () => {

View File

@@ -74,9 +74,9 @@ export class DistributedTransaction {
public modelId: string
public transactionId: string
private errors: TransactionStepError[] = []
private readonly errors: TransactionStepError[] = []
private context: TransactionContext = new TransactionContext()
private readonly context: TransactionContext = new TransactionContext()
constructor(
private flow: TransactionFlow,

View File

@@ -7,8 +7,8 @@ import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import {
TransactionHandlerType,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
TransactionStepStatus,
} from "./types"
import { EventEmitter } from "events"
@@ -233,9 +233,8 @@ export class TransactionOrchestrator extends EventEmitter {
const stepDef = flow.steps[step]
const curState = stepDef.getStates()
if (
(curState.state === TransactionState.DONE ||
curState.status === TransactionStepStatus.PERMANENT_FAILURE) &&
!stepDef.definition.noCompensation
curState.state === TransactionState.DONE ||
curState.status === TransactionStepStatus.PERMANENT_FAILURE
) {
stepDef.beginCompensation()
stepDef.changeState(TransactionState.NOT_STARTED)
@@ -334,6 +333,11 @@ export class TransactionOrchestrator extends EventEmitter {
if (curState.state === TransactionState.NOT_STARTED) {
if (step.isCompensating()) {
step.changeState(TransactionState.COMPENSATING)
if (step.definition.noCompensation) {
step.changeState(TransactionState.REVERTED)
continue
}
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionState.INVOKING)
}

View File

@@ -1,5 +1,4 @@
import { ProductStatus } from "../../product"
import { WorkflowInputConfig } from "../common"
export interface CreateProductTypeInputDTO {
id?: string
@@ -92,5 +91,4 @@ export interface CreateProductInputDTO {
export interface CreateProductsWorkflowInputDTO {
products: CreateProductInputDTO[]
config?: WorkflowInputConfig
}

View File

@@ -1,2 +1,2 @@
export * from "./create-products"
export * from "./cart"
export * from "./product"

View File

@@ -1,19 +1,18 @@
import { InputAlias, Workflows } from "../definitions"
import { InputAlias, Workflows } from "../../definitions"
import {
TransactionStepsDefinition,
WorkflowManager,
} from "@medusajs/orchestration"
import { exportWorkflow, pipe } from "../helper"
import { aggregateData, exportWorkflow, pipe } from "../../helper"
import { ProductTypes, WorkflowTypes } from "@medusajs/types"
import {
InventoryHandlers,
MiddlewaresHandlers,
ProductHandlers,
} from "../handlers"
import { aggregateData } from "../helper/aggregate"
} from "../../handlers"
export enum Actions {
export enum CreateProductsActions {
prepare = "prepare",
createProducts = "createProducts",
attachToSalesChannel = "attachToSalesChannel",
@@ -21,34 +20,29 @@ export enum Actions {
createPrices = "createPrices",
createInventoryItems = "createInventoryItems",
attachInventoryItems = "attachInventoryItems",
result = "result",
}
export const workflowSteps: TransactionStepsDefinition = {
next: {
action: Actions.prepare,
action: CreateProductsActions.prepare,
noCompensation: true,
next: {
action: Actions.createProducts,
action: CreateProductsActions.createProducts,
next: [
{
action: Actions.attachShippingProfile,
action: CreateProductsActions.attachShippingProfile,
saveResponse: false,
},
{
action: Actions.attachToSalesChannel,
action: CreateProductsActions.attachToSalesChannel,
saveResponse: false,
},
{
action: Actions.createPrices,
action: CreateProductsActions.createPrices,
next: {
action: Actions.createInventoryItems,
action: CreateProductsActions.createInventoryItems,
next: {
action: Actions.attachInventoryItems,
next: {
action: Actions.result,
noCompensation: true,
},
action: CreateProductsActions.attachInventoryItems,
},
},
},
@@ -59,7 +53,7 @@ export const workflowSteps: TransactionStepsDefinition = {
const handlers = new Map([
[
Actions.prepare,
CreateProductsActions.prepare,
{
invoke: pipe(
{
@@ -69,17 +63,17 @@ const handlers = new Map([
},
},
aggregateData(),
MiddlewaresHandlers.createProductsPrepareData
ProductHandlers.createProductsPrepareData
),
},
],
[
Actions.createProducts,
CreateProductsActions.createProducts,
{
invoke: pipe(
{
invoke: {
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
},
aggregateData(),
@@ -88,7 +82,7 @@ const handlers = new Map([
compensate: pipe(
{
invoke: {
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias: ProductHandlers.removeProducts.aliases.products,
},
},
@@ -98,16 +92,16 @@ const handlers = new Map([
},
],
[
Actions.attachShippingProfile,
CreateProductsActions.attachShippingProfile,
{
invoke: pipe(
{
invoke: [
{
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
{
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias:
ProductHandlers.attachShippingProfileToProducts.aliases
.products,
@@ -121,10 +115,10 @@ const handlers = new Map([
{
invoke: [
{
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
{
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias:
ProductHandlers.detachShippingProfileFromProducts.aliases
.products,
@@ -137,16 +131,16 @@ const handlers = new Map([
},
],
[
Actions.attachToSalesChannel,
CreateProductsActions.attachToSalesChannel,
{
invoke: pipe(
{
invoke: [
{
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
{
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias:
ProductHandlers.attachSalesChannelToProducts.aliases.products,
},
@@ -159,10 +153,10 @@ const handlers = new Map([
{
invoke: [
{
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
{
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias:
ProductHandlers.detachSalesChannelFromProducts.aliases.products,
},
@@ -174,12 +168,12 @@ const handlers = new Map([
},
],
[
Actions.createInventoryItems,
CreateProductsActions.createInventoryItems,
{
invoke: pipe(
{
invoke: {
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias: InventoryHandlers.createInventoryItems.aliases.products,
},
},
@@ -189,7 +183,7 @@ const handlers = new Map([
compensate: pipe(
{
invoke: {
from: Actions.createInventoryItems,
from: CreateProductsActions.createInventoryItems,
alias:
InventoryHandlers.removeInventoryItems.aliases.inventoryItems,
},
@@ -200,12 +194,12 @@ const handlers = new Map([
},
],
[
Actions.attachInventoryItems,
CreateProductsActions.attachInventoryItems,
{
invoke: pipe(
{
invoke: {
from: Actions.createInventoryItems,
from: CreateProductsActions.createInventoryItems,
alias:
InventoryHandlers.attachInventoryItems.aliases.inventoryItems,
},
@@ -216,7 +210,7 @@ const handlers = new Map([
compensate: pipe(
{
invoke: {
from: Actions.createInventoryItems,
from: CreateProductsActions.createInventoryItems,
alias:
InventoryHandlers.detachInventoryItems.aliases.inventoryItems,
},
@@ -227,16 +221,16 @@ const handlers = new Map([
},
],
[
Actions.createPrices,
CreateProductsActions.createPrices,
{
invoke: pipe(
{
invoke: [
{
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
{
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias:
ProductHandlers.updateProductsVariantsPrices.aliases.products,
},
@@ -249,10 +243,10 @@ const handlers = new Map([
{
invoke: [
{
from: Actions.prepare,
from: CreateProductsActions.prepare,
},
{
from: Actions.createProducts,
from: CreateProductsActions.createProducts,
alias:
ProductHandlers.updateProductsVariantsPrices.aliases.products,
},
@@ -264,32 +258,6 @@ const handlers = new Map([
),
},
],
[
Actions.result,
{
invoke: pipe(
{
invoke: [
{
from: Actions.prepare,
},
{
from: Actions.createProducts,
alias: "products",
},
],
},
async ({ data }) => {
return {
alias: ProductHandlers.listProducts.aliases.ids,
value: data.products.map((product) => product.id),
}
},
aggregateData(),
ProductHandlers.listProducts
),
},
],
])
WorkflowManager.register(Workflows.CreateProducts, workflowSteps, handlers)
@@ -297,4 +265,4 @@ WorkflowManager.register(Workflows.CreateProducts, workflowSteps, handlers)
export const createProducts = exportWorkflow<
WorkflowTypes.ProductWorkflow.CreateProductsWorkflowInputDTO,
ProductTypes.ProductDTO[]
>(Workflows.CreateProducts, Actions.result)
>(Workflows.CreateProducts, CreateProductsActions.createProducts)

View File

@@ -0,0 +1 @@
export * from "./create-products"

View File

@@ -1,2 +1 @@
export * from "./create-products-prepare-create-prices-compensation"
export * from "./create-products-prepare-data"

View File

@@ -28,7 +28,6 @@ export type CreateProductsPreparedData = {
ProductHandle,
VariantIndexAndPrices[]
>
config?: WorkflowTypes.CommonWorkflow.WorkflowInputConfig
}
export async function createProductsPrepareData({
@@ -145,7 +144,6 @@ export async function createProductsPrepareData({
productsHandleShippingProfileIdMap,
productsHandleSalesChannelsMap,
productsHandleVariantsIndexPricesMap,
config: data.config,
}
}

View File

@@ -1,3 +1,4 @@
export * from "./create-products-prepare-data"
export * from "./create-products"
export * from "./detach-sales-channel-from-products"
export * from "./attach-sales-channel-to-products"