fix(): Event group id propagation and event managements (#12157)

This commit is contained in:
Adrien de Peretti
2025-04-14 20:57:52 +02:00
committed by GitHub
parent 3a481290ea
commit 2f6963a5fb
22 changed files with 777 additions and 592 deletions

View File

@@ -0,0 +1,12 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/orchestration": patch
"@medusajs/test-utils": patch
"@medusajs/modules-sdk": patch
"@medusajs/framework": patch
"@medusajs/index": patch
"@medusajs/types": patch
"@medusajs/utils": patch
---
fix: event group propagation and event managements

View File

@@ -1,11 +1,11 @@
import { IEventBusModuleService } from "@medusajs/types"
import { CommonEvents, Modules } from "@medusajs/utils"
import FormData from "form-data"
import fs from "fs/promises"
import { import {
medusaIntegrationTestRunner, medusaIntegrationTestRunner,
TestEventUtils, TestEventUtils,
} from "@medusajs/test-utils" } from "@medusajs/test-utils"
import { IEventBusModuleService } from "@medusajs/types"
import { CommonEvents, Modules } from "@medusajs/utils"
import FormData from "form-data"
import fs from "fs/promises"
import path from "path" import path from "path"
import { import {
adminHeaders, adminHeaders,

View File

@@ -74,13 +74,9 @@ async function populateData(api: any) {
}, },
] ]
await api await api.post("/admin/products/batch", { create: payload }, adminHeaders)
.post("/admin/products/batch", { create: payload }, adminHeaders)
.catch((err) => {
console.log(err)
})
await setTimeout(10000) await setTimeout(4000)
} }
process.env.ENABLE_INDEX_MODULE = "true" process.env.ENABLE_INDEX_MODULE = "true"
@@ -97,7 +93,7 @@ medusaIntegrationTestRunner({
process.env.ENABLE_INDEX_MODULE = "false" process.env.ENABLE_INDEX_MODULE = "false"
}) })
describe.skip("Index engine - Query.index", () => { describe("Index engine - Query.index", () => {
beforeEach(async () => { beforeEach(async () => {
await createAdminUser(dbConnection, adminHeaders, appContainer) await createAdminUser(dbConnection, adminHeaders, appContainer)
}) })
@@ -262,8 +258,7 @@ medusaIntegrationTestRunner({
]) ])
}) })
// TODO: Investigate why this test is flacky it("should use query.index to query the index module sorting by price desc", async () => {
it.skip("should use query.index to query the index module sorting by price desc", async () => {
await populateData(api) await populateData(api)
const query = appContainer.resolve( const query = appContainer.resolve(

View File

@@ -12,12 +12,58 @@ jest.setTimeout(100000)
process.env.ENABLE_INDEX_MODULE = "true" process.env.ENABLE_INDEX_MODULE = "true"
async function populateData(
api: any,
{
productCount = 50,
variantCount = 10,
priceCount = 10,
}: {
productCount?: number
variantCount?: number
priceCount?: number
} = {}
) {
const shippingProfile = (
await api.post(
`/admin/shipping-profiles`,
{ name: "Test", type: "default" },
adminHeaders
)
).data.shipping_profile
const payloads = new Array(productCount).fill(0).map((_, a) => ({
title: "Test Giftcard-" + a,
is_giftcard: true,
shipping_profile_id: shippingProfile.id,
description: "test-giftcard-description" + a,
options: [{ title: "Denominations", values: ["100"] }],
variants: new Array(variantCount).fill(0).map((_, i) => ({
title: `Test variant ${i}`,
sku: `test-variant-${i}${a}`,
prices: new Array(priceCount).fill(0).map((_, j) => ({
currency_code: Object.values(defaultCurrencies)[j].code,
amount: 10 * j,
})),
options: {
Denominations: "100",
},
})),
}))
for (const payload of payloads) {
await api.post("/admin/products", payload, adminHeaders)
}
await setTimeout(4000 * (productCount / 10))
}
medusaIntegrationTestRunner({ medusaIntegrationTestRunner({
testSuite: ({ getContainer, dbConnection, api, dbConfig }) => { testSuite: ({ getContainer, dbConnection, api, dbConfig }) => {
let indexEngine: IndexTypes.IIndexService let indexEngine: IndexTypes.IIndexService
let appContainer let appContainer
beforeAll(() => { beforeAll(async () => {
appContainer = getContainer() appContainer = getContainer()
indexEngine = appContainer.resolve(Modules.INDEX) indexEngine = appContainer.resolve(Modules.INDEX)
}) })
@@ -30,43 +76,13 @@ medusaIntegrationTestRunner({
await createAdminUser(dbConnection, adminHeaders, appContainer) await createAdminUser(dbConnection, adminHeaders, appContainer)
}) })
describe.skip("Index engine", () => { describe("Index engine", () => {
it("should search through the indexed data and return the correct results ordered and filtered [1]", async () => { it("should search through the indexed data and return the correct results ordered and filtered [1]", async () => {
const shippingProfile = ( await populateData(api, {
await api.post( productCount: 1,
`/admin/shipping-profiles`, variantCount: 10,
{ name: "Test", type: "default" }, priceCount: 10,
adminHeaders })
)
).data.shipping_profile
const payload = {
title: "Test Giftcard",
is_giftcard: true,
shipping_profile_id: shippingProfile.id,
description: "test-giftcard-description",
options: [{ title: "Denominations", values: ["100"] }],
variants: new Array(10).fill(0).map((_, i) => ({
title: `Test variant ${i}`,
sku: `test-variant-${i}`,
prices: new Array(10).fill(0).map((_, j) => ({
currency_code: Object.values(defaultCurrencies)[j].code,
amount: 10 * j,
})),
options: {
Denominations: "100",
},
})),
}
await api
.post("/admin/products", payload, adminHeaders)
.catch((err) => {
console.log(err)
})
// Timeout to allow indexing to finish
await setTimeout(4000)
const { data: results } = await fetchAndRetry( const { data: results } = await fetchAndRetry(
async () => async () =>
@@ -119,41 +135,11 @@ medusaIntegrationTestRunner({
}) })
it("should search through the indexed data and return the correct results ordered and filtered [2]", async () => { it("should search through the indexed data and return the correct results ordered and filtered [2]", async () => {
const shippingProfile = ( await populateData(api, {
await api.post( productCount: 1,
`/admin/shipping-profiles`, variantCount: 10,
{ name: "Test", type: "default" }, priceCount: 10,
adminHeaders })
)
).data.shipping_profile
const payload = {
title: "Test Giftcard",
is_giftcard: true,
description: "test-giftcard-description",
shipping_profile_id: shippingProfile.id,
options: [{ title: "Denominations", values: ["100"] }],
variants: new Array(10).fill(0).map((_, i) => ({
title: `Test variant ${i}`,
sku: `test-variant-${i}`,
prices: new Array(10).fill(0).map((_, j) => ({
currency_code: Object.values(defaultCurrencies)[j].code,
amount: 10 * j,
})),
options: {
Denominations: "100",
},
})),
}
await api
.post("/admin/products", payload, adminHeaders)
.catch((err) => {
console.log(err)
})
// Timeout to allow indexing to finish
await setTimeout(10000)
const { data: results } = await fetchAndRetry( const { data: results } = await fetchAndRetry(
async () => async () =>
@@ -205,43 +191,8 @@ medusaIntegrationTestRunner({
} }
}) })
it.skip("should search through the indexed data and return the correct results ordered and filtered [3]", async () => { it("should search through the indexed data and return the correct results ordered and filtered [3]", async () => {
const shippingProfile = ( await populateData(api)
await api.post(
`/admin/shipping-profiles`,
{ name: "Test", type: "default" },
adminHeaders
)
).data.shipping_profile
const payloads = new Array(50).fill(0).map((_, a) => ({
title: "Test Giftcard-" + a,
is_giftcard: true,
shipping_profile_id: shippingProfile.id,
description: "test-giftcard-description" + a,
options: [{ title: "Denominations", values: ["100"] }],
variants: new Array(10).fill(0).map((_, i) => ({
title: `Test variant ${i}`,
sku: `test-variant-${i}${a}`,
prices: new Array(10).fill(0).map((_, j) => ({
currency_code: Object.values(defaultCurrencies)[j].code,
amount: 10 * j,
})),
options: {
Denominations: "100",
},
})),
}))
let i = 0
for (const payload of payloads) {
++i
await api.post("/admin/products", payload, adminHeaders).then(() => {
console.log(`Created ${i} products in ${payloads.length} payloads`)
})
}
await setTimeout(5000)
const queryArgs = { const queryArgs = {
fields: [ fields: [

View File

@@ -66,16 +66,12 @@ medusaIntegrationTestRunner({
describe("Index engine syncing", () => { describe("Index engine syncing", () => {
it("should sync the data to the index based on the indexation configuration", async () => { it("should sync the data to the index based on the indexation configuration", async () => {
console.info("[Index engine] Creating products")
await populateData(api, { await populateData(api, {
productCount: 2, productCount: 2,
variantCount: 2, variantCount: 2,
priceCount: 2, priceCount: 2,
}) })
console.info("[Index engine] Creating products done")
await setTimeout(1000) await setTimeout(1000)
await dbConnection.raw('TRUNCATE TABLE "index_data";') await dbConnection.raw('TRUNCATE TABLE "index_data";')
await dbConnection.raw('TRUNCATE TABLE "index_relation";') await dbConnection.raw('TRUNCATE TABLE "index_relation";')
@@ -96,12 +92,9 @@ medusaIntegrationTestRunner({
// Prevent storage provider to be triggered though // Prevent storage provider to be triggered though
;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn() ;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn()
console.info("[Index engine] Triggering sync")
// Trigger a sync // Trigger a sync
await (indexEngine as any).onApplicationStart_() await (indexEngine as any).onApplicationStart_()
console.info("[Index engine] Sync done")
// 28 ms - 6511 records // 28 ms - 6511 records
const { data: results } = await indexEngine.query<"product">({ const { data: results } = await indexEngine.query<"product">({
fields: [ fields: [
@@ -122,12 +115,8 @@ medusaIntegrationTestRunner({
}) })
it("should sync the data to the index based on the updated indexation configuration", async () => { it("should sync the data to the index based on the updated indexation configuration", async () => {
console.info("[Index engine] Creating products")
await populateData(api) await populateData(api)
console.info("[Index engine] Creating products done")
await setTimeout(1000) await setTimeout(1000)
await dbConnection.raw('TRUNCATE TABLE "index_data";') await dbConnection.raw('TRUNCATE TABLE "index_data";')
await dbConnection.raw('TRUNCATE TABLE "index_relation";') await dbConnection.raw('TRUNCATE TABLE "index_relation";')
@@ -148,12 +137,9 @@ medusaIntegrationTestRunner({
// Prevent storage provider to be triggered though // Prevent storage provider to be triggered though
;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn() ;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn()
console.info("[Index engine] Triggering sync")
// Trigger a sync // Trigger a sync
await (indexEngine as any).onApplicationStart_() await (indexEngine as any).onApplicationStart_()
console.info("[Index engine] Sync done")
const { data: results } = await indexEngine.query<"product">({ const { data: results } = await indexEngine.query<"product">({
fields: [ fields: [
"product.*", "product.*",

View File

@@ -1,15 +1,15 @@
import { WorkflowTypes } from "@medusajs/framework/types"
import { import {
WorkflowData, WorkflowData,
WorkflowResponse, WorkflowResponse,
createWorkflow, createWorkflow,
transform, transform,
} from "@medusajs/framework/workflows-sdk" } from "@medusajs/framework/workflows-sdk"
import { WorkflowTypes } from "@medusajs/framework/types"
import { notifyOnFailureStep, sendNotificationsStep } from "../../notification" import { notifyOnFailureStep, sendNotificationsStep } from "../../notification"
import { import {
waitConfirmationProductImportStep,
groupProductsForBatchStep, groupProductsForBatchStep,
parseProductCsvStep, parseProductCsvStep,
waitConfirmationProductImportStep,
} from "../steps" } from "../steps"
import { batchProductsWorkflow } from "./batch-products" import { batchProductsWorkflow } from "./batch-products"
@@ -17,16 +17,16 @@ export const importProductsWorkflowId = "import-products"
/** /**
* This workflow starts a product import from a CSV file in the background. It's used by the * This workflow starts a product import from a CSV file in the background. It's used by the
* [Import Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimport). * [Import Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimport).
* *
* You can use this workflow within your custom workflows, allowing you to wrap custom logic around product import. * You can use this workflow within your custom workflows, allowing you to wrap custom logic around product import.
* For example, you can import products from another system. * For example, you can import products from another system.
* *
* The workflow only starts the import, but you'll have to confirm it using the [Workflow Engine](https://docs.medusajs.com/resources/architectural-modules/workflow-engine). * The workflow only starts the import, but you'll have to confirm it using the [Workflow Engine](https://docs.medusajs.com/resources/architectural-modules/workflow-engine).
* The below example shows how to confirm the import. * The below example shows how to confirm the import.
* *
* @example * @example
* To start the import of a CSV file: * To start the import of a CSV file:
* *
* ```ts * ```ts
* const { result, transaction: { transactionId } } = await importProductsWorkflow(container) * const { result, transaction: { transactionId } } = await importProductsWorkflow(container)
* .run({ * .run({
@@ -37,12 +37,12 @@ export const importProductsWorkflowId = "import-products"
* } * }
* }) * })
* ``` * ```
* *
* Notice that the workflow returns a `transaction.transactionId`. You'll use this ID to confirm the import afterwards. * Notice that the workflow returns a `transaction.transactionId`. You'll use this ID to confirm the import afterwards.
* *
* You confirm the import using the [Workflow Engine](https://docs.medusajs.com/resources/architectural-modules/workflow-engine). * You confirm the import using the [Workflow Engine](https://docs.medusajs.com/resources/architectural-modules/workflow-engine).
* For example, in an API route: * For example, in an API route:
* *
* ```ts workflow={false} * ```ts workflow={false}
* import { * import {
* AuthenticatedMedusaRequest, * AuthenticatedMedusaRequest,
@@ -55,7 +55,7 @@ export const importProductsWorkflowId = "import-products"
* import { IWorkflowEngineService } from "@medusajs/framework/types" * import { IWorkflowEngineService } from "@medusajs/framework/types"
* import { Modules, TransactionHandlerType } from "@medusajs/framework/utils" * import { Modules, TransactionHandlerType } from "@medusajs/framework/utils"
* import { StepResponse } from "@medusajs/framework/workflows-sdk" * import { StepResponse } from "@medusajs/framework/workflows-sdk"
* *
* export const POST = async ( * export const POST = async (
* req: AuthenticatedMedusaRequest, * req: AuthenticatedMedusaRequest,
* res: MedusaResponse * res: MedusaResponse
@@ -64,7 +64,7 @@ export const importProductsWorkflowId = "import-products"
* Modules.WORKFLOW_ENGINE * Modules.WORKFLOW_ENGINE
* ) * )
* const transactionId = req.params.transaction_id * const transactionId = req.params.transaction_id
* *
* await workflowEngineService.setStepSuccess({ * await workflowEngineService.setStepSuccess({
* idempotencyKey: { * idempotencyKey: {
* action: TransactionHandlerType.INVOKE, * action: TransactionHandlerType.INVOKE,
@@ -74,19 +74,19 @@ export const importProductsWorkflowId = "import-products"
* }, * },
* stepResponse: new StepResponse(true), * stepResponse: new StepResponse(true),
* }) * })
* *
* res.status(202).json({}) * res.status(202).json({})
* } * }
* ``` * ```
* *
* :::tip * :::tip
* *
* This example API route uses the same implementation as the [Confirm Product Import Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimporttransaction_idconfirm). * This example API route uses the same implementation as the [Confirm Product Import Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimporttransaction_idconfirm).
* *
* ::: * :::
* *
* @summary * @summary
* *
* Import products from a CSV file. * Import products from a CSV file.
*/ */
export const importProductsWorkflow = createWorkflow( export const importProductsWorkflow = createWorkflow(

View File

@@ -26,7 +26,6 @@ export const wrapHandler = <T extends RouteHandler | MiddlewareFunction>(
try { try {
return await fn(req, res, next) return await fn(req, res, next)
} catch (err) { } catch (err) {
console.log(err)
next(err) next(err)
} }
} }

View File

@@ -113,13 +113,21 @@ describe("Remote Link", function () {
}, },
]) ])
expect(ProductInventoryLinkModule.create).toBeCalledWith([ expect(ProductInventoryLinkModule.create).toBeCalledWith(
["var_123", "inv_123"], [
["var_abc", "inv_abc"], ["var_123", "inv_123"],
]) ["var_abc", "inv_abc"],
expect(InventoryStockLocationLink.create).toBeCalledWith([ ],
["ilev_123", "loc_123"], undefined,
]) undefined,
{}
)
expect(InventoryStockLocationLink.create).toBeCalledWith(
[["ilev_123", "loc_123"]],
undefined,
undefined,
{}
)
}) })
it("Should call delete in cascade all the modules involved in the link", async function () { it("Should call delete in cascade all the modules involved in the link", async function () {
@@ -167,18 +175,21 @@ describe("Remote Link", function () {
expect(ProductInventoryLinkModule.softDelete).toHaveBeenNthCalledWith( expect(ProductInventoryLinkModule.softDelete).toHaveBeenNthCalledWith(
1, 1,
{ variant_id: ["var_123"] }, { variant_id: ["var_123"] },
{ returnLinkableKeys: ["variant_id", "inventory_item_id"] } { returnLinkableKeys: ["variant_id", "inventory_item_id"] },
{}
) )
expect(ProductInventoryLinkModule.softDelete).toHaveBeenNthCalledWith( expect(ProductInventoryLinkModule.softDelete).toHaveBeenNthCalledWith(
2, 2,
{ variant_id: ["var_abc"] }, { variant_id: ["var_abc"] },
{ returnLinkableKeys: ["variant_id", "inventory_item_id"] } { returnLinkableKeys: ["variant_id", "inventory_item_id"] },
{}
) )
expect(ProductModule.softDelete).toBeCalledWith( expect(ProductModule.softDelete).toBeCalledWith(
{ id: ["var_123"] }, { id: ["var_123"] },
{ returnLinkableKeys: ["product_id", "variant_id"] } { returnLinkableKeys: ["product_id", "variant_id"] },
{}
) )
expect(InventoryModule.softDelete).toBeCalledWith( expect(InventoryModule.softDelete).toBeCalledWith(
@@ -189,14 +200,16 @@ describe("Remote Link", function () {
"inventory_level_id", "inventory_level_id",
"reservation_item_id", "reservation_item_id",
], ],
} },
{}
) )
expect(InventoryStockLocationLink.softDelete).toBeCalledWith( expect(InventoryStockLocationLink.softDelete).toBeCalledWith(
{ {
inventory_level_id: ["ilev_123"], inventory_level_id: ["ilev_123"],
}, },
{ returnLinkableKeys: ["inventory_level_id", "stock_location_id"] } { returnLinkableKeys: ["inventory_level_id", "stock_location_id"] },
{}
) )
}) })
}) })

View File

@@ -1,4 +1,5 @@
import { import {
Context,
ILinkModule, ILinkModule,
LinkDefinition, LinkDefinition,
LoadedModule, LoadedModule,
@@ -7,7 +8,9 @@ import {
import { import {
isObject, isObject,
MedusaContext,
MedusaError, MedusaError,
MedusaModuleType,
Modules, Modules,
promiseAll, promiseAll,
toPascalCase, toPascalCase,
@@ -55,6 +58,9 @@ type LinkDataConfig = {
} }
export class Link { export class Link {
// To not lose the context chain, we need to set the type to MedusaModuleType
static __type = MedusaModuleType
private modulesMap: Map<string, LoadedLinkModule> = new Map() private modulesMap: Map<string, LoadedLinkModule> = new Map()
private relationsPairs: Map<string, LoadedLinkModule> = new Map() private relationsPairs: Map<string, LoadedLinkModule> = new Map()
private relations: Map<string, Map<string, RemoteRelationship[]>> = new Map() private relations: Map<string, Map<string, RemoteRelationship[]>> = new Map()
@@ -171,7 +177,8 @@ export class Link {
private async executeCascade( private async executeCascade(
removedServices: DeleteEntityInput, removedServices: DeleteEntityInput,
executionMethod: "softDelete" | "restore" executionMethod: "softDelete" | "restore",
@MedusaContext() sharedContext: Context = {}
): Promise<[CascadeError[] | null, RemovedIds]> { ): Promise<[CascadeError[] | null, RemovedIds]> {
const removedIds: RemovedIds = {} const removedIds: RemovedIds = {}
const returnIdsList: RemovedIds = {} const returnIdsList: RemovedIds = {}
@@ -254,9 +261,13 @@ export class Link {
method += toPascalCase(args.methodSuffix) method += toPascalCase(args.methodSuffix)
} }
const removed = await service[method](cascadeDelKeys, { const removed = await service[method](
returnLinkableKeys: returnFields, cascadeDelKeys,
}) {
returnLinkableKeys: returnFields,
},
sharedContext
)
deletedEntities = removed as Record<string, string[]> deletedEntities = removed as Record<string, string[]>
} catch (error) { } catch (error) {
@@ -382,7 +393,10 @@ export class Link {
} }
} }
async create(link: LinkDefinition | LinkDefinition[]): Promise<unknown[]> { async create(
link: LinkDefinition | LinkDefinition[],
@MedusaContext() sharedContext: Context = {}
): Promise<unknown[]> {
const allLinks = Array.isArray(link) ? link : [link] const allLinks = Array.isArray(link) ? link : [link]
const serviceLinks = new Map< const serviceLinks = new Map<
string, string,
@@ -489,7 +503,8 @@ export class Link {
}, },
{ {
take: 1, take: 1,
} },
sharedContext
) )
if (existingLinks.length > 0) { if (existingLinks.length > 0) {
@@ -507,13 +522,18 @@ export class Link {
const promises: Promise<unknown[]>[] = [] const promises: Promise<unknown[]>[] = []
for (const [serviceName, data] of serviceLinks) { for (const [serviceName, data] of serviceLinks) {
const service = this.modulesMap.get(serviceName)! const service = this.modulesMap.get(serviceName)!
promises.push(service.create(data.linksToCreate)) promises.push(
service.create(data.linksToCreate, undefined, undefined, sharedContext)
)
} }
return (await promiseAll(promises)).flat() return (await promiseAll(promises)).flat()
} }
async dismiss(link: LinkDefinition | LinkDefinition[]): Promise<unknown[]> { async dismiss(
link: LinkDefinition | LinkDefinition[],
@MedusaContext() sharedContext: Context = {}
): Promise<unknown[]> {
const allLinks = Array.isArray(link) ? link : [link] const allLinks = Array.isArray(link) ? link : [link]
const serviceLinks = new Map<string, [string | string[], string][]>() const serviceLinks = new Map<string, [string | string[], string][]>()
@@ -541,27 +561,34 @@ export class Link {
for (const [serviceName, links] of serviceLinks) { for (const [serviceName, links] of serviceLinks) {
const service = this.modulesMap.get(serviceName)! const service = this.modulesMap.get(serviceName)!
promises.push(service.dismiss(links)) promises.push(service.dismiss(links, undefined, sharedContext))
} }
return (await promiseAll(promises)).flat() return (await promiseAll(promises)).flat()
} }
async delete( async delete(
removedServices: DeleteEntityInput removedServices: DeleteEntityInput,
@MedusaContext() sharedContext: Context = {}
): Promise<[CascadeError[] | null, RemovedIds]> { ): Promise<[CascadeError[] | null, RemovedIds]> {
return await this.executeCascade(removedServices, "softDelete") return await this.executeCascade(
removedServices,
"softDelete",
sharedContext
)
} }
async restore( async restore(
removedServices: DeleteEntityInput removedServices: DeleteEntityInput,
@MedusaContext() sharedContext: Context = {}
): Promise<[CascadeError[] | null, RestoredIds]> { ): Promise<[CascadeError[] | null, RestoredIds]> {
return await this.executeCascade(removedServices, "restore") return await this.executeCascade(removedServices, "restore", sharedContext)
} }
async list( async list(
link: LinkDefinition | LinkDefinition[], link: LinkDefinition | LinkDefinition[],
options?: { asLinkDefinition?: boolean } options?: { asLinkDefinition?: boolean },
@MedusaContext() sharedContext: Context = {}
): Promise<(object | LinkDefinition)[]> { ): Promise<(object | LinkDefinition)[]> {
const allLinks = Array.isArray(link) ? link : [link] const allLinks = Array.isArray(link) ? link : [link]
const serviceLinks = new Map<string, object[]>() const serviceLinks = new Map<string, object[]>()
@@ -587,7 +614,7 @@ export class Link {
promises.push( promises.push(
service service
.list({ $or: filters }) .list({ $or: filters }, {}, sharedContext)
.then((links: any[]) => .then((links: any[]) =>
options?.asLinkDefinition options?.asLinkDefinition
? convertRecordsToLinkDefinition(links, service) ? convertRecordsToLinkDefinition(links, service)

View File

@@ -62,13 +62,13 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler, handler,
{ payload: {
prop: 123, prop: 123,
} },
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -144,10 +144,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
expect(actionOrder).toEqual(["one", "two", "three", "four", "five", "six"]) expect(actionOrder).toEqual(["one", "two", "three", "four", "five", "six"])
@@ -216,10 +216,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -296,10 +296,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
expect(actionOrder).toEqual(["one", "two", "three"]) expect(actionOrder).toEqual(["one", "two", "three"])
@@ -376,11 +376,11 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler, handler,
{ prop: 123 } payload: { prop: 123 },
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -471,10 +471,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
const resposes = transaction.getContext() const resposes = transaction.getContext()
@@ -538,10 +538,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
strategy.resume(transaction) strategy.resume(transaction)
@@ -611,10 +611,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -678,10 +678,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -736,10 +736,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -797,13 +797,13 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler, handler,
{ payload: {
myPayloadProp: "test", myPayloadProp: "test",
} },
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -818,11 +818,10 @@ describe("Transaction Orchestrator", () => {
"firstMethod", "firstMethod",
TransactionHandlerType.INVOKE TransactionHandlerType.INVOKE
) )
await strategy.registerStepSuccess( await strategy.registerStepSuccess({
mocktransactionId, responseIdempotencyKey: mocktransactionId,
undefined, transaction,
transaction })
)
expect(transaction.getState()).toBe(TransactionState.DONE) expect(transaction.getState()).toBe(TransactionState.DONE)
expect(transaction.getFlow().hasWaitingSteps).toBe(false) expect(transaction.getFlow().hasWaitingSteps).toBe(false)
@@ -883,10 +882,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
const mocktransactionId = TransactionOrchestrator.getKeyName( const mocktransactionId = TransactionOrchestrator.getKeyName(
"transaction-name", "transaction-name",
@@ -909,7 +908,11 @@ describe("Transaction Orchestrator", () => {
expect(mocks.two).toHaveBeenCalledTimes(0) expect(mocks.two).toHaveBeenCalledTimes(0)
const registerBeforeAllowed = await strategy const registerBeforeAllowed = await strategy
.registerStepSuccess(mockSecondStepId, handler) .registerStepSuccess({
responseIdempotencyKey: mockSecondStepId,
handler,
transaction,
})
.catch((e) => e.message) .catch((e) => e.message)
expect(registerBeforeAllowed).toEqual( expect(registerBeforeAllowed).toEqual(
@@ -917,11 +920,11 @@ describe("Transaction Orchestrator", () => {
) )
expect(transaction.getState()).toBe(TransactionState.INVOKING) expect(transaction.getState()).toBe(TransactionState.INVOKING)
const resumedTransaction = await strategy.registerStepFailure( const resumedTransaction = await strategy.registerStepFailure({
mocktransactionId, responseIdempotencyKey: mocktransactionId,
null, handler,
handler transaction,
) })
expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING) expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING)
expect(mocks.compensateOne).toHaveBeenCalledTimes(1) expect(mocks.compensateOne).toHaveBeenCalledTimes(1)
@@ -932,13 +935,12 @@ describe("Transaction Orchestrator", () => {
"firstMethod", "firstMethod",
TransactionHandlerType.COMPENSATE TransactionHandlerType.COMPENSATE
) )
await strategy.registerStepSuccess( await strategy.registerStepSuccess({
mocktransactionIdCompensate, responseIdempotencyKey: mocktransactionIdCompensate,
undefined, transaction: resumedTransaction,
resumedTransaction })
)
expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED) expect(transaction.getState()).toBe(TransactionState.REVERTED)
}) })
it("Should hold the status REVERTED if the steps failed and the compensation succeed and has some no compensations step set", async () => { it("Should hold the status REVERTED if the steps failed and the compensation succeed and has some no compensations step set", async () => {
@@ -1010,10 +1012,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -1082,10 +1084,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -1123,10 +1125,10 @@ describe("Transaction Orchestrator", () => {
}, },
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -1219,10 +1221,10 @@ describe("Transaction Orchestrator", () => {
}, },
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -1328,10 +1330,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -1448,10 +1450,10 @@ describe("Transaction Orchestrator", () => {
definition: flow, definition: flow,
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)
@@ -1561,10 +1563,10 @@ describe("Transaction Orchestrator", () => {
}, },
}) })
const transaction = await strategy.beginTransaction( const transaction = await strategy.beginTransaction({
"transaction_id_123", transactionId: "transaction_id_123",
handler handler,
) })
await strategy.resume(transaction) await strategy.resume(transaction)

View File

@@ -92,4 +92,9 @@ export class SkipExecutionError extends Error {
error?.name === "SkipExecutionError" error?.name === "SkipExecutionError"
) )
} }
constructor(message?: string) {
super(message)
this.name = "SkipExecutionError"
}
} }

View File

@@ -512,10 +512,13 @@ export class TransactionOrchestrator extends EventEmitter {
} }
} }
private static async skipStep( private static async skipStep({
transaction: DistributedTransactionType, transaction,
step,
}: {
transaction: DistributedTransactionType
step: TransactionStep step: TransactionStep
): Promise<{ }): Promise<{
stopExecution: boolean stopExecution: boolean
}> { }> {
const hasStepTimedOut = const hasStepTimedOut =
@@ -721,72 +724,25 @@ export class TransactionOrchestrator extends EventEmitter {
const flow = transaction.getFlow() const flow = transaction.getFlow()
const nextSteps = await this.checkAllSteps(transaction) const nextSteps = await this.checkAllSteps(transaction)
const execution: Promise<void | unknown>[] = []
const hasTimedOut = await this.checkTransactionTimeout( if (await this.checkTransactionTimeout(transaction, nextSteps.current)) {
transaction,
nextSteps.current
)
if (hasTimedOut) {
continue continue
} }
if (nextSteps.remaining === 0) { if (nextSteps.remaining === 0) {
if (transaction.hasTimeout()) { await this.finalizeTransaction(transaction)
void transaction.clearTransactionTimeout() return
}
await transaction.saveCheckpoint()
this.emit(DistributedTransactionEvent.FINISH, { transaction })
} }
const execution: Promise<void | unknown>[] = []
for (const step of nextSteps.next) { for (const step of nextSteps.next) {
const curState = step.getStates() const { stopStepExecution } = this.prepareStepForExecution(step, flow)
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
step.lastAttempt = Date.now() // Should stop the execution if next step cant be handled
step.attempts++ if (!stopStepExecution) {
continue
if (curState.state === TransactionStepState.NOT_STARTED) {
if (!step.startedAt) {
step.startedAt = Date.now()
}
if (step.isCompensating()) {
step.changeState(TransactionStepState.COMPENSATING)
if (step.definition.noCompensation) {
step.changeState(TransactionStepState.REVERTED)
continue
}
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionStepState.INVOKING)
}
} }
step.changeStatus(TransactionStepStatus.WAITING)
const payload = new TransactionPayload(
{
model_id: flow.modelId,
idempotency_key: TransactionOrchestrator.getKeyName(
flow.modelId,
flow.transactionId,
step.definition.action!,
type
),
action: step.definition.action + "",
action_type: type,
attempt: step.attempts,
timestamp: Date.now(),
},
transaction.payload,
transaction.getContext()
)
if (step.hasTimeout() && !step.timedOutAt && step.attempts === 1) { if (step.hasTimeout() && !step.timedOutAt && step.attempts === 1) {
await transaction.scheduleStepTimeout(step, step.definition.timeout!) await transaction.scheduleStepTimeout(step, step.definition.timeout!)
} }
@@ -800,217 +756,368 @@ export class TransactionOrchestrator extends EventEmitter {
? step.definition.compensateAsync ? step.definition.compensateAsync
: step.definition.async : step.definition.async
const setStepFailure = async ( // Save checkpoint before executing step
error: Error | any, await transaction.saveCheckpoint().catch((error) => {
{ if (SkipExecutionError.isSkipExecutionError(error)) {
endRetry, continueExecution = false
response, return
}: {
endRetry?: boolean
response?: unknown
} = {}
) => {
if (isDefined(response) && step.saveResponse) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
response
)
} }
const ret = await TransactionOrchestrator.setStepFailure( throw error
transaction, })
step,
error,
endRetry ? 0 : step.definition.maxRetries
)
if (isAsync && !ret.stopExecution) { if (!continueExecution) {
await transaction.scheduleRetry(step, 0) break
}
return ret
} }
const traceData = { const promise = this.createStepExecutionPromise(transaction, step)
action: step.definition.action + "",
type,
step_id: step.id,
step_uuid: step.uuid + "",
attempts: step.attempts,
failures: step.failures,
async: !!(type === "invoke"
? step.definition.async
: step.definition.compensateAsync),
idempotency_key: payload.metadata.idempotency_key,
}
const handlerArgs = [
step.definition.action + "",
type,
payload,
transaction,
step,
this,
] as Parameters<TransactionStepHandler>
if (!isAsync) { if (!isAsync) {
const stepHandler = async () => {
return await transaction.handler(...handlerArgs)
}
let promise: Promise<unknown>
if (TransactionOrchestrator.traceStep) {
promise = TransactionOrchestrator.traceStep(stepHandler, traceData)
} else {
promise = stepHandler()
}
execution.push( execution.push(
promise this.executeSyncStep(promise, transaction, step, nextSteps)
.then(async (response: any) => {
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
transaction,
nextSteps.next.includes(step) ? nextSteps.next : [step]
)
}
const output = response?.__type ? response.output : response
if (SkipStepResponse.isSkipStepResponse(output)) {
await TransactionOrchestrator.skipStep(transaction, step)
return
}
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
})
.catch(async (error) => {
const response = error?.getStepResponse?.()
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
transaction,
nextSteps.next.includes(step) ? nextSteps.next : [step]
)
}
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, {
endRetry: true,
response,
})
return
}
await setStepFailure(error, {
response,
})
})
) )
} else { } else {
const stepHandler = async () => {
return await transaction.handler(...handlerArgs)
}
execution.push( execution.push(
transaction.saveCheckpoint().then(() => { this.executeAsyncStep(promise, transaction, step, nextSteps)
let promise: Promise<unknown>
if (TransactionOrchestrator.traceStep) {
promise = TransactionOrchestrator.traceStep(
stepHandler,
traceData
)
} else {
promise = stepHandler()
}
promise
.then(async (response: any) => {
const output = response?.__type ? response.output : response
if (SkipStepResponse.isSkipStepResponse(output)) {
await TransactionOrchestrator.skipStep(transaction, step)
} else {
if (
!step.definition.backgroundExecution ||
step.definition.nested
) {
const eventName =
DistributedTransactionEvent.STEP_AWAITING
transaction.emit(eventName, { step, transaction })
return
}
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
transaction,
nextSteps.next.includes(step) ? nextSteps.next : [step]
)
}
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
}
// check nested flow
await transaction.scheduleRetry(step, 0)
})
.catch(async (error) => {
const response = error?.getStepResponse?.()
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, {
endRetry: true,
response,
})
return
}
await setStepFailure(error, {
response,
})
})
})
) )
} }
} }
try {
await transaction.saveCheckpoint()
} catch (error) {
if (SkipExecutionError.isSkipExecutionError(error)) {
break
} else {
throw error
}
}
await promiseAll(execution) await promiseAll(execution)
if (nextSteps.next.length === 0) { if (nextSteps.next.length === 0) {
continueExecution = false continueExecution = false
} }
} }
// Recompute the current flow flags
await this.checkAllSteps(transaction)
await transaction.saveCheckpoint().catch((error) => {
if (!SkipExecutionError.isSkipExecutionError(error)) {
throw error
}
})
}
/**
* Finalize the transaction when all steps are complete
*/
private async finalizeTransaction(
transaction: DistributedTransactionType
): Promise<void> {
if (transaction.hasTimeout()) {
void transaction.clearTransactionTimeout()
}
await transaction.saveCheckpoint().catch((error) => {
if (!SkipExecutionError.isSkipExecutionError(error)) {
throw error
}
})
this.emit(DistributedTransactionEvent.FINISH, { transaction })
}
/**
* Prepare a step for execution by setting state and incrementing attempts
*/
private prepareStepForExecution(
step: TransactionStep,
flow: TransactionFlow
): { stopStepExecution: boolean } {
const curState = step.getStates()
step.lastAttempt = Date.now()
step.attempts++
if (curState.state === TransactionStepState.NOT_STARTED) {
if (!step.startedAt) {
step.startedAt = Date.now()
}
if (step.isCompensating()) {
step.changeState(TransactionStepState.COMPENSATING)
if (step.definition.noCompensation) {
step.changeState(TransactionStepState.REVERTED)
return { stopStepExecution: false }
}
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionStepState.INVOKING)
}
}
step.changeStatus(TransactionStepStatus.WAITING)
return { stopStepExecution: true }
}
/**
* Create the payload for a step execution
*/
private createStepPayload(
transaction: DistributedTransactionType,
step: TransactionStep,
flow: TransactionFlow
): TransactionPayload {
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
return new TransactionPayload(
{
model_id: flow.modelId,
idempotency_key: TransactionOrchestrator.getKeyName(
flow.modelId,
flow.transactionId,
step.definition.action!,
type
),
action: step.definition.action + "",
action_type: type,
attempt: step.attempts,
timestamp: Date.now(),
},
transaction.payload,
transaction.getContext()
)
}
/**
* Prepare handler arguments for step execution
*/
private prepareHandlerArgs(
transaction: DistributedTransactionType,
step: TransactionStep,
flow: TransactionFlow,
payload: TransactionPayload
): Parameters<TransactionStepHandler> {
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
return [
step.definition.action + "",
type,
payload,
transaction,
step,
this,
] as Parameters<TransactionStepHandler>
}
/**
* Create the step execution promise with optional tracing
*/
private createStepExecutionPromise(
transaction: DistributedTransactionType,
step: TransactionStep
): () => Promise<any> {
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
const handlerArgs = this.prepareHandlerArgs(
transaction,
step,
transaction.getFlow(),
this.createStepPayload(transaction, step, transaction.getFlow())
)
const traceData = {
action: step.definition.action + "",
type,
step_id: step.id,
step_uuid: step.uuid + "",
attempts: step.attempts,
failures: step.failures,
async: !!(type === "invoke"
? step.definition.async
: step.definition.compensateAsync),
idempotency_key: handlerArgs[2].metadata.idempotency_key,
}
const stepHandler = async () => {
return await transaction.handler(...handlerArgs)
}
// Return the appropriate promise based on tracing configuration
if (TransactionOrchestrator.traceStep) {
return () => TransactionOrchestrator.traceStep!(stepHandler, traceData)
} else {
return stepHandler
}
}
/**
* Execute a synchronous step and handle its result
*/
private executeSyncStep(
promiseFn: () => Promise<any>,
transaction: DistributedTransactionType,
step: TransactionStep,
nextSteps: { next: TransactionStep[] }
): Promise<void | unknown> {
return promiseFn()
.then(async (response: any) => {
await this.handleStepExpiration(transaction, step, nextSteps)
const output = response?.__type ? response.output : response
if (SkipStepResponse.isSkipStepResponse(output)) {
await TransactionOrchestrator.skipStep({
transaction,
step,
})
return
}
await this.handleStepSuccess(transaction, step, response)
})
.catch(async (error) => {
if (SkipExecutionError.isSkipExecutionError(error)) {
return
}
const response = error?.getStepResponse?.()
await this.handleStepExpiration(transaction, step, nextSteps)
if (PermanentStepFailureError.isPermanentStepFailureError(error)) {
await this.handleStepFailure(transaction, step, error, true, response)
return
}
await this.handleStepFailure(transaction, step, error, false, response)
})
}
/**
* Execute an asynchronous step and handle its result
*/
private executeAsyncStep(
promiseFn: () => Promise<any>,
transaction: DistributedTransactionType,
step: TransactionStep,
nextSteps: { next: TransactionStep[] }
): Promise<void | unknown> {
return promiseFn()
.then(async (response: any) => {
const output = response?.__type ? response.output : response
if (SkipStepResponse.isSkipStepResponse(output)) {
await TransactionOrchestrator.skipStep({
transaction,
step,
})
} else {
if (!step.definition.backgroundExecution || step.definition.nested) {
const eventName = DistributedTransactionEvent.STEP_AWAITING
transaction.emit(eventName, { step, transaction })
return
}
await this.handleStepExpiration(transaction, step, nextSteps)
await this.handleStepSuccess(transaction, step, response)
}
})
.catch(async (error) => {
if (SkipExecutionError.isSkipExecutionError(error)) {
return
}
const response = error?.getStepResponse?.()
if (PermanentStepFailureError.isPermanentStepFailureError(error)) {
await this.handleStepFailure(transaction, step, error, true, response)
return
}
await this.handleStepFailure(transaction, step, error, false, response)
})
}
/**
* Check if step or transaction has expired and handle timeouts
*/
private async handleStepExpiration(
transaction: DistributedTransactionType,
step: TransactionStep,
nextSteps: { next: TransactionStep[] }
): Promise<void> {
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
transaction,
nextSteps.next.includes(step) ? nextSteps.next : [step]
)
}
}
/**
* Handle successful step completion
*/
private async handleStepSuccess(
transaction: DistributedTransactionType,
step: TransactionStep,
response: unknown
): Promise<void> {
const isAsync = step.isCompensating()
? step.definition.compensateAsync
: step.definition.async
if (isDefined(response) && step.saveResponse) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
response
)
}
const ret = await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
if (isAsync && !ret.stopExecution) {
await transaction.scheduleRetry(step, 0)
}
}
/**
* Handle step failure
*/
private async handleStepFailure(
transaction: DistributedTransactionType,
step: TransactionStep,
error: Error | any,
isPermanent: boolean,
response?: unknown
): Promise<void> {
const isAsync = step.isCompensating()
? step.definition.compensateAsync
: step.definition.async
if (isDefined(response) && step.saveResponse) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
response
)
}
const ret = await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
isPermanent ? 0 : step.definition.maxRetries
)
if (isAsync && !ret.stopExecution) {
await transaction.scheduleRetry(step, 0)
}
} }
/** /**
@@ -1288,12 +1395,19 @@ export class TransactionOrchestrator extends EventEmitter {
* @param payload - payload to be passed to all the transaction steps * @param payload - payload to be passed to all the transaction steps
* @param flowMetadata - flow metadata which can include event group id for example * @param flowMetadata - flow metadata which can include event group id for example
*/ */
public async beginTransaction( public async beginTransaction({
transactionId: string, transactionId,
handler: TransactionStepHandler, handler,
payload?: unknown, payload,
flowMetadata,
onLoad,
}: {
transactionId: string
handler: TransactionStepHandler
payload?: unknown
flowMetadata?: TransactionFlow["metadata"] flowMetadata?: TransactionFlow["metadata"]
): Promise<DistributedTransactionType> { onLoad?: (transaction: DistributedTransactionType) => Promise<void> | void
}): Promise<DistributedTransactionType> {
const existingTransaction = const existingTransaction =
await TransactionOrchestrator.loadTransactionById(this.id, transactionId) await TransactionOrchestrator.loadTransactionById(this.id, transactionId)
@@ -1320,6 +1434,10 @@ export class TransactionOrchestrator extends EventEmitter {
) )
} }
if (onLoad) {
await onLoad(transaction)
}
return transaction return transaction
} }
@@ -1423,11 +1541,15 @@ export class TransactionOrchestrator extends EventEmitter {
* @param handler - The handler function to execute the step * @param handler - The handler function to execute the step
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
*/ */
public async skipStep( public async skipStep({
responseIdempotencyKey: string, responseIdempotencyKey,
handler?: TransactionStepHandler, handler,
transaction,
}: {
responseIdempotencyKey: string
handler?: TransactionStepHandler
transaction?: DistributedTransactionType transaction?: DistributedTransactionType
): Promise<DistributedTransactionType> { }): Promise<DistributedTransactionType> {
const [curTransaction, step] = const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey, responseIdempotencyKey,
@@ -1440,7 +1562,10 @@ export class TransactionOrchestrator extends EventEmitter {
transaction: curTransaction, transaction: curTransaction,
}) })
await TransactionOrchestrator.skipStep(curTransaction, step) await TransactionOrchestrator.skipStep({
transaction: curTransaction,
step,
})
await this.executeNext(curTransaction) await this.executeNext(curTransaction)
} else { } else {
@@ -1459,12 +1584,19 @@ export class TransactionOrchestrator extends EventEmitter {
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
* @param response - The response of the step * @param response - The response of the step
*/ */
public async registerStepSuccess( public async registerStepSuccess({
responseIdempotencyKey: string, responseIdempotencyKey,
handler?: TransactionStepHandler, handler,
transaction?: DistributedTransactionType, transaction,
response,
onLoad,
}: {
responseIdempotencyKey: string
handler?: TransactionStepHandler
transaction?: DistributedTransactionType
response?: unknown response?: unknown
): Promise<DistributedTransactionType> { onLoad?: (transaction: DistributedTransactionType) => Promise<void> | void
}): Promise<DistributedTransactionType> {
const [curTransaction, step] = const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey, responseIdempotencyKey,
@@ -1472,6 +1604,10 @@ export class TransactionOrchestrator extends EventEmitter {
transaction transaction
) )
if (onLoad) {
await onLoad(curTransaction)
}
if (step.getStates().status === TransactionStepStatus.WAITING) { if (step.getStates().status === TransactionStepStatus.WAITING) {
this.emit(DistributedTransactionEvent.RESUME, { this.emit(DistributedTransactionEvent.RESUME, {
transaction: curTransaction, transaction: curTransaction,
@@ -1502,12 +1638,19 @@ export class TransactionOrchestrator extends EventEmitter {
* @param transaction - The current transaction * @param transaction - The current transaction
* @param response - The response of the step * @param response - The response of the step
*/ */
public async registerStepFailure( public async registerStepFailure({
responseIdempotencyKey: string, responseIdempotencyKey,
error?: Error | any, error,
handler?: TransactionStepHandler, handler,
transaction,
onLoad,
}: {
responseIdempotencyKey: string
error?: Error | any
handler?: TransactionStepHandler
transaction?: DistributedTransactionType transaction?: DistributedTransactionType
): Promise<DistributedTransactionType> { onLoad?: (transaction: DistributedTransactionType) => Promise<void> | void
}): Promise<DistributedTransactionType> {
const [curTransaction, step] = const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey, responseIdempotencyKey,
@@ -1515,6 +1658,10 @@ export class TransactionOrchestrator extends EventEmitter {
transaction transaction
) )
if (onLoad) {
await onLoad(curTransaction)
}
if (step.getStates().status === TransactionStepStatus.WAITING) { if (step.getStates().status === TransactionStepStatus.WAITING) {
this.emit(DistributedTransactionEvent.RESUME, { this.emit(DistributedTransactionEvent.RESUME, {
transaction: curTransaction, transaction: curTransaction,

View File

@@ -52,11 +52,11 @@ export class GlobalWorkflow extends WorkflowManager {
const orchestrator = workflow.orchestrator const orchestrator = workflow.orchestrator
const transaction = await orchestrator.beginTransaction( const transaction = await orchestrator.beginTransaction({
uniqueTransactionId, transactionId: uniqueTransactionId,
workflow.handler(this.container, this.context), handler: workflow.handler(this.container, this.context),
input payload: input,
) })
if (this.subscribe.onStepBegin) { if (this.subscribe.onStepBegin) {
transaction.once("stepBegin", this.subscribe.onStepBegin) transaction.once("stepBegin", this.subscribe.onStepBegin)
@@ -104,12 +104,11 @@ export class GlobalWorkflow extends WorkflowManager {
} }
}) })
return await workflow.orchestrator.registerStepSuccess( return await workflow.orchestrator.registerStepSuccess({
idempotencyKey, responseIdempotencyKey: idempotencyKey,
workflow.handler(this.container, this.context), handler: workflow.handler(this.container, this.context),
undefined, response,
response })
)
} }
async registerStepFailure( async registerStepFailure(
@@ -137,10 +136,10 @@ export class GlobalWorkflow extends WorkflowManager {
} }
}) })
return await workflow.orchestrator.registerStepFailure( return await workflow.orchestrator.registerStepFailure({
idempotencyKey, responseIdempotencyKey: idempotencyKey,
error, error,
workflow.handler(this.container, this.context) handler: workflow.handler(this.container, this.context),
) })
} }
} }

View File

@@ -113,7 +113,7 @@ export class LocalWorkflow {
return target[prop] return target[prop]
} }
return async (...args) => { return (...args) => {
const ctxIndex = MedusaContext.getIndex(target, prop as string) const ctxIndex = MedusaContext.getIndex(target, prop as string)
const hasContext = args[ctxIndex!]?.__type === MedusaContextType const hasContext = args[ctxIndex!]?.__type === MedusaContextType
@@ -125,8 +125,12 @@ export class LocalWorkflow {
args[ctxIndex] = context args[ctxIndex] = context
} }
} else if (hasContext) {
args[ctxIndex!].eventGroupId ??= this_.medusaContext?.eventGroupId
} }
return await target[prop].apply(target, [...args])
const method = target[prop]
return method.apply(target, [...args])
} }
}, },
}) })
@@ -355,12 +359,18 @@ export class LocalWorkflow {
this.medusaContext = context this.medusaContext = context
const { handler, orchestrator } = this.workflow const { handler, orchestrator } = this.workflow
const transaction = await orchestrator.beginTransaction( const transaction = await orchestrator.beginTransaction({
uniqueTransactionId, transactionId: uniqueTransactionId,
handler(this.container_, context), handler: handler(this.container_, context),
input, payload: input,
flowMetadata flowMetadata,
) onLoad: (transaction) => {
if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
},
})
const { cleanUpEventListeners } = this.registerEventCallbacks({ const { cleanUpEventListeners } = this.registerEventCallbacks({
orchestrator, orchestrator,
@@ -402,6 +412,11 @@ export class LocalWorkflow {
? await this.getRunningTransaction(transactionOrTransactionId, context) ? await this.getRunningTransaction(transactionOrTransactionId, context)
: transactionOrTransactionId : transactionOrTransactionId
if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
const { cleanUpEventListeners } = this.registerEventCallbacks({ const { cleanUpEventListeners } = this.registerEventCallbacks({
orchestrator, orchestrator,
transaction, transaction,
@@ -432,12 +447,17 @@ export class LocalWorkflow {
subscribe, subscribe,
}) })
const transaction = await orchestrator.registerStepSuccess( const transaction = await orchestrator.registerStepSuccess({
idempotencyKey, responseIdempotencyKey: idempotencyKey,
handler(this.container_, context), handler: handler(this.container_, context),
undefined, response,
response onLoad: (transaction) => {
) if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
},
})
try { try {
return transaction return transaction
@@ -461,11 +481,17 @@ export class LocalWorkflow {
subscribe, subscribe,
}) })
const transaction = await orchestrator.registerStepFailure( const transaction = await orchestrator.registerStepFailure({
idempotencyKey, responseIdempotencyKey: idempotencyKey,
error, error,
handler(this.container_, context) handler: handler(this.container_, context),
) onLoad: (transaction) => {
if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
},
})
try { try {
return transaction return transaction

View File

@@ -22,6 +22,7 @@ export interface ILinkModule extends IModuleService {
| string[] | string[]
| [string | string[], string, Record<string, unknown>?][], | [string | string[], string, Record<string, unknown>?][],
foreignKeyData?: string, foreignKeyData?: string,
extraFields?: Record<string, unknown>,
sharedContext?: Context sharedContext?: Context
): Promise<unknown[]> ): Promise<unknown[]>

View File

@@ -43,7 +43,7 @@ export class MessageAggregator implements IMessageAggregator {
object: message.object, object: message.object,
action: message.action, action: message.action,
options, options,
context: sharedContext, context: message.context ?? sharedContext,
}) })
}) })
this.save(composedMessages) this.save(composedMessages)

View File

@@ -28,6 +28,7 @@ export function InjectManager(managerProperty?: string): MethodDecorator {
} }
Object.defineProperty(copiedContext, key, { Object.defineProperty(copiedContext, key, {
enumerable: true,
get: function () { get: function () {
return originalContext[key] return originalContext[key]
}, },

View File

@@ -22,10 +22,9 @@ export function InjectTransactionManager(
const argIndex = target.MedusaContextIndex_[propertyKey] const argIndex = target.MedusaContextIndex_[propertyKey]
descriptor.value = async function (...args: any[]) { descriptor.value = async function (...args: any[]) {
const context: Context = args[argIndex] ?? {}
const originalContext = args[argIndex] ?? {} const originalContext = args[argIndex] ?? {}
if (context?.transactionManager) { if (originalContext?.transactionManager) {
return await originalMethod.apply(this, args) return await originalMethod.apply(this, args)
} }
@@ -41,6 +40,7 @@ export function InjectTransactionManager(
} }
Object.defineProperty(copiedContext, key, { Object.defineProperty(copiedContext, key, {
enumerable: true,
get: function () { get: function () {
return originalContext[key] return originalContext[key]
}, },
@@ -63,10 +63,10 @@ export function InjectTransactionManager(
return await originalMethod.apply(this, args) return await originalMethod.apply(this, args)
}, },
{ {
transaction: context?.transactionManager, transaction: originalContext?.transactionManager,
isolationLevel: (context as Context)?.isolationLevel, isolationLevel: (originalContext as Context)?.isolationLevel,
enableNestedTransactions: enableNestedTransactions:
(context as Context).enableNestedTransactions ?? false, (originalContext as Context).enableNestedTransactions ?? false,
} }
) )
} }

View File

@@ -198,11 +198,31 @@ export const dbTestUtilFactory = (): any => ({
FROM information_schema.tables FROM information_schema.tables
WHERE table_schema = '${schema}';`) WHERE table_schema = '${schema}';`)
const skipIndexPartitionPrefix = "cat_"
const mainPartitionTables = ["index_data", "index_relation"]
let hasIndexTables = false
for (const { table_name } of tableNames) { for (const { table_name } of tableNames) {
if (mainPartitionTables.includes(table_name)) {
hasIndexTables = true
}
// Skipping index partition tables.
if (
table_name.startsWith(skipIndexPartitionPrefix) ||
mainPartitionTables.includes(table_name)
) {
continue
}
await runRawQuery(`DELETE await runRawQuery(`DELETE
FROM ${schema}."${table_name}";`) FROM ${schema}."${table_name}";`)
} }
if (hasIndexTables) {
await runRawQuery(`TRUNCATE TABLE ${schema}.index_data;`)
await runRawQuery(`TRUNCATE TABLE ${schema}.index_relation;`)
}
await runRawQuery(`SET session_replication_role = 'origin';`) await runRawQuery(`SET session_replication_role = 'origin';`)
}, },

View File

@@ -82,13 +82,14 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
// This is useful in the event of a distributed transaction where you'd want to emit // This is useful in the event of a distributed transaction where you'd want to emit
// events only once the transaction ends. // events only once the transaction ends.
private async groupOrEmitEvent<T = unknown>(eventData: Message<T>) { private async groupOrEmitEvent<T = unknown>(eventData: Message<T>) {
const { options, ...eventBody } = eventData const eventData_ = JSON.parse(JSON.stringify(eventData))
const { options, ...eventBody } = eventData_
const eventGroupId = eventBody.metadata?.eventGroupId const eventGroupId = eventBody.metadata?.eventGroupId
if (eventGroupId) { if (eventGroupId) {
await this.groupEvent(eventGroupId, eventData) await this.groupEvent(eventGroupId, eventData_)
} else { } else {
const { options, ...eventBody } = eventData const { options, ...eventBody } = eventData_
const options_ = options as { delay: number } const options_ = options as { delay: number }
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve()) const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
@@ -112,7 +113,8 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
} }
async releaseGroupedEvents(eventGroupId: string) { async releaseGroupedEvents(eventGroupId: string) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] let groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
groupedEvents = JSON.parse(JSON.stringify(groupedEvents))
for (const event of groupedEvents) { for (const event of groupedEvents) {
const { options, ...eventBody } = event const { options, ...eventBody } = event

View File

@@ -686,7 +686,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
}) })
let relationsToUpsert: any[] = [] let relationsToUpsert: any[] = []
const entitiesToUpdate = cleanedData.map((entityData) => { const entitiesToUpsert = cleanedData.map((entityData) => {
relationsToUpsert.push( relationsToUpsert.push(
{ {
parent_id: entityData[parentPropertyId] as string, parent_id: entityData[parentPropertyId] as string,
@@ -714,8 +714,8 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
} }
}) })
if (entitiesToUpdate.length) { if (entitiesToUpsert.length) {
await indexRepository.upsertMany(entitiesToUpdate, { await indexRepository.upsertMany(entitiesToUpsert, {
onConflictAction: "merge", onConflictAction: "merge",
onConflictFields: ["id", "name"], onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"], onConflictMergeFields: ["data", "staled_at"],

View File

@@ -35,8 +35,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
}, },
}, },
testSuite: ({ service: workflowOrcModule, medusaApp }) => { testSuite: ({ service: workflowOrcModule, medusaApp }) => {
// TODO: Debug the issue with this test https://github.com/medusajs/medusa/actions/runs/13900190144/job/38897122803#step:5:5616 describe("Testing race condition of the workflow during retry", () => {
describe.skip("Testing race condition of the workflow during retry", () => {
it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => {
const transactionId = "transaction_id" const transactionId = "transaction_id"