diff --git a/.changeset/perfect-dolphins-attend.md b/.changeset/perfect-dolphins-attend.md new file mode 100644 index 0000000000..716d808e89 --- /dev/null +++ b/.changeset/perfect-dolphins-attend.md @@ -0,0 +1,12 @@ +--- +"@medusajs/event-bus-local": patch +"@medusajs/orchestration": patch +"@medusajs/test-utils": patch +"@medusajs/modules-sdk": patch +"@medusajs/framework": patch +"@medusajs/index": patch +"@medusajs/types": patch +"@medusajs/utils": patch +--- + +fix: event group propagation and event managements diff --git a/integration-tests/http/__tests__/product/admin/product-import.spec.ts b/integration-tests/http/__tests__/product/admin/product-import.spec.ts index e8a2888a49..7ba3772f28 100644 --- a/integration-tests/http/__tests__/product/admin/product-import.spec.ts +++ b/integration-tests/http/__tests__/product/admin/product-import.spec.ts @@ -1,11 +1,11 @@ -import { IEventBusModuleService } from "@medusajs/types" -import { CommonEvents, Modules } from "@medusajs/utils" -import FormData from "form-data" -import fs from "fs/promises" import { medusaIntegrationTestRunner, TestEventUtils, } from "@medusajs/test-utils" +import { IEventBusModuleService } from "@medusajs/types" +import { CommonEvents, Modules } from "@medusajs/utils" +import FormData from "form-data" +import fs from "fs/promises" import path from "path" import { adminHeaders, diff --git a/integration-tests/modules/__tests__/index/query-index.spec.ts b/integration-tests/modules/__tests__/index/query-index.spec.ts index fa7a5fdcc1..c778348652 100644 --- a/integration-tests/modules/__tests__/index/query-index.spec.ts +++ b/integration-tests/modules/__tests__/index/query-index.spec.ts @@ -74,13 +74,9 @@ async function populateData(api: any) { }, ] - await api - .post("/admin/products/batch", { create: payload }, adminHeaders) - .catch((err) => { - console.log(err) - }) + await api.post("/admin/products/batch", { create: payload }, adminHeaders) - await setTimeout(10000) + await setTimeout(4000) } process.env.ENABLE_INDEX_MODULE = "true" @@ -97,7 +93,7 @@ medusaIntegrationTestRunner({ process.env.ENABLE_INDEX_MODULE = "false" }) - describe.skip("Index engine - Query.index", () => { + describe("Index engine - Query.index", () => { beforeEach(async () => { await createAdminUser(dbConnection, adminHeaders, appContainer) }) @@ -262,8 +258,7 @@ medusaIntegrationTestRunner({ ]) }) - // TODO: Investigate why this test is flacky - it.skip("should use query.index to query the index module sorting by price desc", async () => { + it("should use query.index to query the index module sorting by price desc", async () => { await populateData(api) const query = appContainer.resolve( diff --git a/integration-tests/modules/__tests__/index/search.spec.ts b/integration-tests/modules/__tests__/index/search.spec.ts index deb67d4b27..30b0f35f6e 100644 --- a/integration-tests/modules/__tests__/index/search.spec.ts +++ b/integration-tests/modules/__tests__/index/search.spec.ts @@ -12,12 +12,58 @@ jest.setTimeout(100000) process.env.ENABLE_INDEX_MODULE = "true" +async function populateData( + api: any, + { + productCount = 50, + variantCount = 10, + priceCount = 10, + }: { + productCount?: number + variantCount?: number + priceCount?: number + } = {} +) { + const shippingProfile = ( + await api.post( + `/admin/shipping-profiles`, + { name: "Test", type: "default" }, + adminHeaders + ) + ).data.shipping_profile + + const payloads = new Array(productCount).fill(0).map((_, a) => ({ + title: "Test Giftcard-" + a, + is_giftcard: true, + shipping_profile_id: shippingProfile.id, + description: "test-giftcard-description" + a, + options: [{ title: "Denominations", values: ["100"] }], + variants: new Array(variantCount).fill(0).map((_, i) => ({ + title: `Test variant ${i}`, + sku: `test-variant-${i}${a}`, + prices: new Array(priceCount).fill(0).map((_, j) => ({ + currency_code: Object.values(defaultCurrencies)[j].code, + amount: 10 * j, + })), + options: { + Denominations: "100", + }, + })), + })) + + for (const payload of payloads) { + await api.post("/admin/products", payload, adminHeaders) + } + + await setTimeout(4000 * (productCount / 10)) +} + medusaIntegrationTestRunner({ testSuite: ({ getContainer, dbConnection, api, dbConfig }) => { let indexEngine: IndexTypes.IIndexService let appContainer - beforeAll(() => { + beforeAll(async () => { appContainer = getContainer() indexEngine = appContainer.resolve(Modules.INDEX) }) @@ -30,43 +76,13 @@ medusaIntegrationTestRunner({ await createAdminUser(dbConnection, adminHeaders, appContainer) }) - describe.skip("Index engine", () => { + describe("Index engine", () => { it("should search through the indexed data and return the correct results ordered and filtered [1]", async () => { - const shippingProfile = ( - await api.post( - `/admin/shipping-profiles`, - { name: "Test", type: "default" }, - adminHeaders - ) - ).data.shipping_profile - - const payload = { - title: "Test Giftcard", - is_giftcard: true, - shipping_profile_id: shippingProfile.id, - description: "test-giftcard-description", - options: [{ title: "Denominations", values: ["100"] }], - variants: new Array(10).fill(0).map((_, i) => ({ - title: `Test variant ${i}`, - sku: `test-variant-${i}`, - prices: new Array(10).fill(0).map((_, j) => ({ - currency_code: Object.values(defaultCurrencies)[j].code, - amount: 10 * j, - })), - options: { - Denominations: "100", - }, - })), - } - - await api - .post("/admin/products", payload, adminHeaders) - .catch((err) => { - console.log(err) - }) - - // Timeout to allow indexing to finish - await setTimeout(4000) + await populateData(api, { + productCount: 1, + variantCount: 10, + priceCount: 10, + }) const { data: results } = await fetchAndRetry( async () => @@ -119,41 +135,11 @@ medusaIntegrationTestRunner({ }) it("should search through the indexed data and return the correct results ordered and filtered [2]", async () => { - const shippingProfile = ( - await api.post( - `/admin/shipping-profiles`, - { name: "Test", type: "default" }, - adminHeaders - ) - ).data.shipping_profile - - const payload = { - title: "Test Giftcard", - is_giftcard: true, - description: "test-giftcard-description", - shipping_profile_id: shippingProfile.id, - options: [{ title: "Denominations", values: ["100"] }], - variants: new Array(10).fill(0).map((_, i) => ({ - title: `Test variant ${i}`, - sku: `test-variant-${i}`, - prices: new Array(10).fill(0).map((_, j) => ({ - currency_code: Object.values(defaultCurrencies)[j].code, - amount: 10 * j, - })), - options: { - Denominations: "100", - }, - })), - } - - await api - .post("/admin/products", payload, adminHeaders) - .catch((err) => { - console.log(err) - }) - - // Timeout to allow indexing to finish - await setTimeout(10000) + await populateData(api, { + productCount: 1, + variantCount: 10, + priceCount: 10, + }) const { data: results } = await fetchAndRetry( async () => @@ -205,43 +191,8 @@ medusaIntegrationTestRunner({ } }) - it.skip("should search through the indexed data and return the correct results ordered and filtered [3]", async () => { - const shippingProfile = ( - await api.post( - `/admin/shipping-profiles`, - { name: "Test", type: "default" }, - adminHeaders - ) - ).data.shipping_profile - - const payloads = new Array(50).fill(0).map((_, a) => ({ - title: "Test Giftcard-" + a, - is_giftcard: true, - shipping_profile_id: shippingProfile.id, - description: "test-giftcard-description" + a, - options: [{ title: "Denominations", values: ["100"] }], - variants: new Array(10).fill(0).map((_, i) => ({ - title: `Test variant ${i}`, - sku: `test-variant-${i}${a}`, - prices: new Array(10).fill(0).map((_, j) => ({ - currency_code: Object.values(defaultCurrencies)[j].code, - amount: 10 * j, - })), - options: { - Denominations: "100", - }, - })), - })) - - let i = 0 - for (const payload of payloads) { - ++i - await api.post("/admin/products", payload, adminHeaders).then(() => { - console.log(`Created ${i} products in ${payloads.length} payloads`) - }) - } - - await setTimeout(5000) + it("should search through the indexed data and return the correct results ordered and filtered [3]", async () => { + await populateData(api) const queryArgs = { fields: [ diff --git a/integration-tests/modules/__tests__/index/sync.spec.ts b/integration-tests/modules/__tests__/index/sync.spec.ts index c6072f8856..e1a2dd382a 100644 --- a/integration-tests/modules/__tests__/index/sync.spec.ts +++ b/integration-tests/modules/__tests__/index/sync.spec.ts @@ -66,16 +66,12 @@ medusaIntegrationTestRunner({ describe("Index engine syncing", () => { it("should sync the data to the index based on the indexation configuration", async () => { - console.info("[Index engine] Creating products") - await populateData(api, { productCount: 2, variantCount: 2, priceCount: 2, }) - console.info("[Index engine] Creating products done") - await setTimeout(1000) await dbConnection.raw('TRUNCATE TABLE "index_data";') await dbConnection.raw('TRUNCATE TABLE "index_relation";') @@ -96,12 +92,9 @@ medusaIntegrationTestRunner({ // Prevent storage provider to be triggered though ;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn() - console.info("[Index engine] Triggering sync") // Trigger a sync await (indexEngine as any).onApplicationStart_() - console.info("[Index engine] Sync done") - // 28 ms - 6511 records const { data: results } = await indexEngine.query<"product">({ fields: [ @@ -122,12 +115,8 @@ medusaIntegrationTestRunner({ }) it("should sync the data to the index based on the updated indexation configuration", async () => { - console.info("[Index engine] Creating products") - await populateData(api) - console.info("[Index engine] Creating products done") - await setTimeout(1000) await dbConnection.raw('TRUNCATE TABLE "index_data";') await dbConnection.raw('TRUNCATE TABLE "index_relation";') @@ -148,12 +137,9 @@ medusaIntegrationTestRunner({ // Prevent storage provider to be triggered though ;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn() - console.info("[Index engine] Triggering sync") // Trigger a sync await (indexEngine as any).onApplicationStart_() - console.info("[Index engine] Sync done") - const { data: results } = await indexEngine.query<"product">({ fields: [ "product.*", diff --git a/packages/core/core-flows/src/product/workflows/import-products.ts b/packages/core/core-flows/src/product/workflows/import-products.ts index 8d887d9058..1c152c9452 100644 --- a/packages/core/core-flows/src/product/workflows/import-products.ts +++ b/packages/core/core-flows/src/product/workflows/import-products.ts @@ -1,15 +1,15 @@ +import { WorkflowTypes } from "@medusajs/framework/types" import { WorkflowData, WorkflowResponse, createWorkflow, transform, } from "@medusajs/framework/workflows-sdk" -import { WorkflowTypes } from "@medusajs/framework/types" import { notifyOnFailureStep, sendNotificationsStep } from "../../notification" import { - waitConfirmationProductImportStep, groupProductsForBatchStep, parseProductCsvStep, + waitConfirmationProductImportStep, } from "../steps" import { batchProductsWorkflow } from "./batch-products" @@ -17,16 +17,16 @@ export const importProductsWorkflowId = "import-products" /** * This workflow starts a product import from a CSV file in the background. It's used by the * [Import Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimport). - * + * * You can use this workflow within your custom workflows, allowing you to wrap custom logic around product import. * For example, you can import products from another system. - * + * * The workflow only starts the import, but you'll have to confirm it using the [Workflow Engine](https://docs.medusajs.com/resources/architectural-modules/workflow-engine). * The below example shows how to confirm the import. - * + * * @example * To start the import of a CSV file: - * + * * ```ts * const { result, transaction: { transactionId } } = await importProductsWorkflow(container) * .run({ @@ -37,12 +37,12 @@ export const importProductsWorkflowId = "import-products" * } * }) * ``` - * + * * Notice that the workflow returns a `transaction.transactionId`. You'll use this ID to confirm the import afterwards. - * + * * You confirm the import using the [Workflow Engine](https://docs.medusajs.com/resources/architectural-modules/workflow-engine). * For example, in an API route: - * + * * ```ts workflow={false} * import { * AuthenticatedMedusaRequest, @@ -55,7 +55,7 @@ export const importProductsWorkflowId = "import-products" * import { IWorkflowEngineService } from "@medusajs/framework/types" * import { Modules, TransactionHandlerType } from "@medusajs/framework/utils" * import { StepResponse } from "@medusajs/framework/workflows-sdk" - * + * * export const POST = async ( * req: AuthenticatedMedusaRequest, * res: MedusaResponse @@ -64,7 +64,7 @@ export const importProductsWorkflowId = "import-products" * Modules.WORKFLOW_ENGINE * ) * const transactionId = req.params.transaction_id - * + * * await workflowEngineService.setStepSuccess({ * idempotencyKey: { * action: TransactionHandlerType.INVOKE, @@ -74,19 +74,19 @@ export const importProductsWorkflowId = "import-products" * }, * stepResponse: new StepResponse(true), * }) - * + * * res.status(202).json({}) * } * ``` - * + * * :::tip - * + * * This example API route uses the same implementation as the [Confirm Product Import Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimporttransaction_idconfirm). - * + * * ::: - * + * * @summary - * + * * Import products from a CSV file. */ export const importProductsWorkflow = createWorkflow( diff --git a/packages/core/framework/src/http/utils/wrap-handler.ts b/packages/core/framework/src/http/utils/wrap-handler.ts index 420bd3bd0e..0cc0d39467 100644 --- a/packages/core/framework/src/http/utils/wrap-handler.ts +++ b/packages/core/framework/src/http/utils/wrap-handler.ts @@ -26,7 +26,6 @@ export const wrapHandler = ( try { return await fn(req, res, next) } catch (err) { - console.log(err) next(err) } } diff --git a/packages/core/modules-sdk/src/__tests__/remote-link.spec.ts b/packages/core/modules-sdk/src/__tests__/remote-link.spec.ts index 788c1d0a8c..34aad4a62c 100644 --- a/packages/core/modules-sdk/src/__tests__/remote-link.spec.ts +++ b/packages/core/modules-sdk/src/__tests__/remote-link.spec.ts @@ -113,13 +113,21 @@ describe("Remote Link", function () { }, ]) - expect(ProductInventoryLinkModule.create).toBeCalledWith([ - ["var_123", "inv_123"], - ["var_abc", "inv_abc"], - ]) - expect(InventoryStockLocationLink.create).toBeCalledWith([ - ["ilev_123", "loc_123"], - ]) + expect(ProductInventoryLinkModule.create).toBeCalledWith( + [ + ["var_123", "inv_123"], + ["var_abc", "inv_abc"], + ], + undefined, + undefined, + {} + ) + expect(InventoryStockLocationLink.create).toBeCalledWith( + [["ilev_123", "loc_123"]], + undefined, + undefined, + {} + ) }) it("Should call delete in cascade all the modules involved in the link", async function () { @@ -167,18 +175,21 @@ describe("Remote Link", function () { expect(ProductInventoryLinkModule.softDelete).toHaveBeenNthCalledWith( 1, { variant_id: ["var_123"] }, - { returnLinkableKeys: ["variant_id", "inventory_item_id"] } + { returnLinkableKeys: ["variant_id", "inventory_item_id"] }, + {} ) expect(ProductInventoryLinkModule.softDelete).toHaveBeenNthCalledWith( 2, { variant_id: ["var_abc"] }, - { returnLinkableKeys: ["variant_id", "inventory_item_id"] } + { returnLinkableKeys: ["variant_id", "inventory_item_id"] }, + {} ) expect(ProductModule.softDelete).toBeCalledWith( { id: ["var_123"] }, - { returnLinkableKeys: ["product_id", "variant_id"] } + { returnLinkableKeys: ["product_id", "variant_id"] }, + {} ) expect(InventoryModule.softDelete).toBeCalledWith( @@ -189,14 +200,16 @@ describe("Remote Link", function () { "inventory_level_id", "reservation_item_id", ], - } + }, + {} ) expect(InventoryStockLocationLink.softDelete).toBeCalledWith( { inventory_level_id: ["ilev_123"], }, - { returnLinkableKeys: ["inventory_level_id", "stock_location_id"] } + { returnLinkableKeys: ["inventory_level_id", "stock_location_id"] }, + {} ) }) }) diff --git a/packages/core/modules-sdk/src/link.ts b/packages/core/modules-sdk/src/link.ts index 480e00d6e9..54498f1f62 100644 --- a/packages/core/modules-sdk/src/link.ts +++ b/packages/core/modules-sdk/src/link.ts @@ -1,4 +1,5 @@ import { + Context, ILinkModule, LinkDefinition, LoadedModule, @@ -7,7 +8,9 @@ import { import { isObject, + MedusaContext, MedusaError, + MedusaModuleType, Modules, promiseAll, toPascalCase, @@ -55,6 +58,9 @@ type LinkDataConfig = { } export class Link { + // To not lose the context chain, we need to set the type to MedusaModuleType + static __type = MedusaModuleType + private modulesMap: Map = new Map() private relationsPairs: Map = new Map() private relations: Map> = new Map() @@ -171,7 +177,8 @@ export class Link { private async executeCascade( removedServices: DeleteEntityInput, - executionMethod: "softDelete" | "restore" + executionMethod: "softDelete" | "restore", + @MedusaContext() sharedContext: Context = {} ): Promise<[CascadeError[] | null, RemovedIds]> { const removedIds: RemovedIds = {} const returnIdsList: RemovedIds = {} @@ -254,9 +261,13 @@ export class Link { method += toPascalCase(args.methodSuffix) } - const removed = await service[method](cascadeDelKeys, { - returnLinkableKeys: returnFields, - }) + const removed = await service[method]( + cascadeDelKeys, + { + returnLinkableKeys: returnFields, + }, + sharedContext + ) deletedEntities = removed as Record } catch (error) { @@ -382,7 +393,10 @@ export class Link { } } - async create(link: LinkDefinition | LinkDefinition[]): Promise { + async create( + link: LinkDefinition | LinkDefinition[], + @MedusaContext() sharedContext: Context = {} + ): Promise { const allLinks = Array.isArray(link) ? link : [link] const serviceLinks = new Map< string, @@ -489,7 +503,8 @@ export class Link { }, { take: 1, - } + }, + sharedContext ) if (existingLinks.length > 0) { @@ -507,13 +522,18 @@ export class Link { const promises: Promise[] = [] for (const [serviceName, data] of serviceLinks) { const service = this.modulesMap.get(serviceName)! - promises.push(service.create(data.linksToCreate)) + promises.push( + service.create(data.linksToCreate, undefined, undefined, sharedContext) + ) } return (await promiseAll(promises)).flat() } - async dismiss(link: LinkDefinition | LinkDefinition[]): Promise { + async dismiss( + link: LinkDefinition | LinkDefinition[], + @MedusaContext() sharedContext: Context = {} + ): Promise { const allLinks = Array.isArray(link) ? link : [link] const serviceLinks = new Map() @@ -541,27 +561,34 @@ export class Link { for (const [serviceName, links] of serviceLinks) { const service = this.modulesMap.get(serviceName)! - promises.push(service.dismiss(links)) + promises.push(service.dismiss(links, undefined, sharedContext)) } return (await promiseAll(promises)).flat() } async delete( - removedServices: DeleteEntityInput + removedServices: DeleteEntityInput, + @MedusaContext() sharedContext: Context = {} ): Promise<[CascadeError[] | null, RemovedIds]> { - return await this.executeCascade(removedServices, "softDelete") + return await this.executeCascade( + removedServices, + "softDelete", + sharedContext + ) } async restore( - removedServices: DeleteEntityInput + removedServices: DeleteEntityInput, + @MedusaContext() sharedContext: Context = {} ): Promise<[CascadeError[] | null, RestoredIds]> { - return await this.executeCascade(removedServices, "restore") + return await this.executeCascade(removedServices, "restore", sharedContext) } async list( link: LinkDefinition | LinkDefinition[], - options?: { asLinkDefinition?: boolean } + options?: { asLinkDefinition?: boolean }, + @MedusaContext() sharedContext: Context = {} ): Promise<(object | LinkDefinition)[]> { const allLinks = Array.isArray(link) ? link : [link] const serviceLinks = new Map() @@ -587,7 +614,7 @@ export class Link { promises.push( service - .list({ $or: filters }) + .list({ $or: filters }, {}, sharedContext) .then((links: any[]) => options?.asLinkDefinition ? convertRecordsToLinkDefinition(links, service) diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 0cfa7c1e23..9d2620087f 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -62,13 +62,13 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", handler, - { + payload: { prop: 123, - } - ) + }, + }) await strategy.resume(transaction) @@ -144,10 +144,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) expect(actionOrder).toEqual(["one", "two", "three", "four", "five", "six"]) @@ -216,10 +216,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -296,10 +296,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) expect(actionOrder).toEqual(["one", "two", "three"]) @@ -376,11 +376,11 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", handler, - { prop: 123 } - ) + payload: { prop: 123 }, + }) await strategy.resume(transaction) @@ -471,10 +471,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) const resposes = transaction.getContext() @@ -538,10 +538,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) strategy.resume(transaction) @@ -611,10 +611,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -678,10 +678,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -736,10 +736,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -797,13 +797,13 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", handler, - { + payload: { myPayloadProp: "test", - } - ) + }, + }) await strategy.resume(transaction) @@ -818,11 +818,10 @@ describe("Transaction Orchestrator", () => { "firstMethod", TransactionHandlerType.INVOKE ) - await strategy.registerStepSuccess( - mocktransactionId, - undefined, - transaction - ) + await strategy.registerStepSuccess({ + responseIdempotencyKey: mocktransactionId, + transaction, + }) expect(transaction.getState()).toBe(TransactionState.DONE) expect(transaction.getFlow().hasWaitingSteps).toBe(false) @@ -883,10 +882,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) const mocktransactionId = TransactionOrchestrator.getKeyName( "transaction-name", @@ -909,7 +908,11 @@ describe("Transaction Orchestrator", () => { expect(mocks.two).toHaveBeenCalledTimes(0) const registerBeforeAllowed = await strategy - .registerStepSuccess(mockSecondStepId, handler) + .registerStepSuccess({ + responseIdempotencyKey: mockSecondStepId, + handler, + transaction, + }) .catch((e) => e.message) expect(registerBeforeAllowed).toEqual( @@ -917,11 +920,11 @@ describe("Transaction Orchestrator", () => { ) expect(transaction.getState()).toBe(TransactionState.INVOKING) - const resumedTransaction = await strategy.registerStepFailure( - mocktransactionId, - null, - handler - ) + const resumedTransaction = await strategy.registerStepFailure({ + responseIdempotencyKey: mocktransactionId, + handler, + transaction, + }) expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING) expect(mocks.compensateOne).toHaveBeenCalledTimes(1) @@ -932,13 +935,12 @@ describe("Transaction Orchestrator", () => { "firstMethod", TransactionHandlerType.COMPENSATE ) - await strategy.registerStepSuccess( - mocktransactionIdCompensate, - undefined, - resumedTransaction - ) + await strategy.registerStepSuccess({ + responseIdempotencyKey: mocktransactionIdCompensate, + transaction: resumedTransaction, + }) - expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED) + expect(transaction.getState()).toBe(TransactionState.REVERTED) }) it("Should hold the status REVERTED if the steps failed and the compensation succeed and has some no compensations step set", async () => { @@ -1010,10 +1012,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -1082,10 +1084,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -1123,10 +1125,10 @@ describe("Transaction Orchestrator", () => { }, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -1219,10 +1221,10 @@ describe("Transaction Orchestrator", () => { }, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -1328,10 +1330,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -1448,10 +1450,10 @@ describe("Transaction Orchestrator", () => { definition: flow, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) @@ -1561,10 +1563,10 @@ describe("Transaction Orchestrator", () => { }, }) - const transaction = await strategy.beginTransaction( - "transaction_id_123", - handler - ) + const transaction = await strategy.beginTransaction({ + transactionId: "transaction_id_123", + handler, + }) await strategy.resume(transaction) diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index 081f067db5..fb94e45db0 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -92,4 +92,9 @@ export class SkipExecutionError extends Error { error?.name === "SkipExecutionError" ) } + + constructor(message?: string) { + super(message) + this.name = "SkipExecutionError" + } } diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index f729bb0582..b7895ccff5 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -512,10 +512,13 @@ export class TransactionOrchestrator extends EventEmitter { } } - private static async skipStep( - transaction: DistributedTransactionType, + private static async skipStep({ + transaction, + step, + }: { + transaction: DistributedTransactionType step: TransactionStep - ): Promise<{ + }): Promise<{ stopExecution: boolean }> { const hasStepTimedOut = @@ -721,72 +724,25 @@ export class TransactionOrchestrator extends EventEmitter { const flow = transaction.getFlow() const nextSteps = await this.checkAllSteps(transaction) - const execution: Promise[] = [] - const hasTimedOut = await this.checkTransactionTimeout( - transaction, - nextSteps.current - ) - - if (hasTimedOut) { + if (await this.checkTransactionTimeout(transaction, nextSteps.current)) { continue } if (nextSteps.remaining === 0) { - if (transaction.hasTimeout()) { - void transaction.clearTransactionTimeout() - } - - await transaction.saveCheckpoint() - this.emit(DistributedTransactionEvent.FINISH, { transaction }) + await this.finalizeTransaction(transaction) + return } + const execution: Promise[] = [] for (const step of nextSteps.next) { - const curState = step.getStates() - const type = step.isCompensating() - ? TransactionHandlerType.COMPENSATE - : TransactionHandlerType.INVOKE + const { stopStepExecution } = this.prepareStepForExecution(step, flow) - step.lastAttempt = Date.now() - step.attempts++ - - if (curState.state === TransactionStepState.NOT_STARTED) { - if (!step.startedAt) { - step.startedAt = Date.now() - } - - if (step.isCompensating()) { - step.changeState(TransactionStepState.COMPENSATING) - - if (step.definition.noCompensation) { - step.changeState(TransactionStepState.REVERTED) - continue - } - } else if (flow.state === TransactionState.INVOKING) { - step.changeState(TransactionStepState.INVOKING) - } + // Should stop the execution if next step cant be handled + if (!stopStepExecution) { + continue } - step.changeStatus(TransactionStepStatus.WAITING) - - const payload = new TransactionPayload( - { - model_id: flow.modelId, - idempotency_key: TransactionOrchestrator.getKeyName( - flow.modelId, - flow.transactionId, - step.definition.action!, - type - ), - action: step.definition.action + "", - action_type: type, - attempt: step.attempts, - timestamp: Date.now(), - }, - transaction.payload, - transaction.getContext() - ) - if (step.hasTimeout() && !step.timedOutAt && step.attempts === 1) { await transaction.scheduleStepTimeout(step, step.definition.timeout!) } @@ -800,217 +756,368 @@ export class TransactionOrchestrator extends EventEmitter { ? step.definition.compensateAsync : step.definition.async - const setStepFailure = async ( - error: Error | any, - { - endRetry, - response, - }: { - endRetry?: boolean - response?: unknown - } = {} - ) => { - if (isDefined(response) && step.saveResponse) { - transaction.addResponse( - step.definition.action!, - step.isCompensating() - ? TransactionHandlerType.COMPENSATE - : TransactionHandlerType.INVOKE, - response - ) + // Save checkpoint before executing step + await transaction.saveCheckpoint().catch((error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + continueExecution = false + return } - const ret = await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - endRetry ? 0 : step.definition.maxRetries - ) + throw error + }) - if (isAsync && !ret.stopExecution) { - await transaction.scheduleRetry(step, 0) - } - - return ret + if (!continueExecution) { + break } - const traceData = { - action: step.definition.action + "", - type, - step_id: step.id, - step_uuid: step.uuid + "", - attempts: step.attempts, - failures: step.failures, - async: !!(type === "invoke" - ? step.definition.async - : step.definition.compensateAsync), - idempotency_key: payload.metadata.idempotency_key, - } - - const handlerArgs = [ - step.definition.action + "", - type, - payload, - transaction, - step, - this, - ] as Parameters + const promise = this.createStepExecutionPromise(transaction, step) if (!isAsync) { - const stepHandler = async () => { - return await transaction.handler(...handlerArgs) - } - - let promise: Promise - if (TransactionOrchestrator.traceStep) { - promise = TransactionOrchestrator.traceStep(stepHandler, traceData) - } else { - promise = stepHandler() - } - execution.push( - promise - .then(async (response: any) => { - if (this.hasExpired({ transaction, step }, Date.now())) { - await this.checkStepTimeout(transaction, step) - await this.checkTransactionTimeout( - transaction, - nextSteps.next.includes(step) ? nextSteps.next : [step] - ) - } - - const output = response?.__type ? response.output : response - if (SkipStepResponse.isSkipStepResponse(output)) { - await TransactionOrchestrator.skipStep(transaction, step) - return - } - - await TransactionOrchestrator.setStepSuccess( - transaction, - step, - response - ) - }) - .catch(async (error) => { - const response = error?.getStepResponse?.() - - if (this.hasExpired({ transaction, step }, Date.now())) { - await this.checkStepTimeout(transaction, step) - await this.checkTransactionTimeout( - transaction, - nextSteps.next.includes(step) ? nextSteps.next : [step] - ) - } - - if ( - PermanentStepFailureError.isPermanentStepFailureError(error) - ) { - await setStepFailure(error, { - endRetry: true, - response, - }) - - return - } - - await setStepFailure(error, { - response, - }) - }) + this.executeSyncStep(promise, transaction, step, nextSteps) ) } else { - const stepHandler = async () => { - return await transaction.handler(...handlerArgs) - } - execution.push( - transaction.saveCheckpoint().then(() => { - let promise: Promise - - if (TransactionOrchestrator.traceStep) { - promise = TransactionOrchestrator.traceStep( - stepHandler, - traceData - ) - } else { - promise = stepHandler() - } - - promise - .then(async (response: any) => { - const output = response?.__type ? response.output : response - - if (SkipStepResponse.isSkipStepResponse(output)) { - await TransactionOrchestrator.skipStep(transaction, step) - } else { - if ( - !step.definition.backgroundExecution || - step.definition.nested - ) { - const eventName = - DistributedTransactionEvent.STEP_AWAITING - transaction.emit(eventName, { step, transaction }) - - return - } - - if (this.hasExpired({ transaction, step }, Date.now())) { - await this.checkStepTimeout(transaction, step) - await this.checkTransactionTimeout( - transaction, - nextSteps.next.includes(step) ? nextSteps.next : [step] - ) - } - - await TransactionOrchestrator.setStepSuccess( - transaction, - step, - response - ) - } - - // check nested flow - await transaction.scheduleRetry(step, 0) - }) - .catch(async (error) => { - const response = error?.getStepResponse?.() - - if ( - PermanentStepFailureError.isPermanentStepFailureError(error) - ) { - await setStepFailure(error, { - endRetry: true, - response, - }) - - return - } - - await setStepFailure(error, { - response, - }) - }) - }) + this.executeAsyncStep(promise, transaction, step, nextSteps) ) } } - try { - await transaction.saveCheckpoint() - } catch (error) { - if (SkipExecutionError.isSkipExecutionError(error)) { - break - } else { - throw error - } - } - await promiseAll(execution) if (nextSteps.next.length === 0) { continueExecution = false } } + + // Recompute the current flow flags + await this.checkAllSteps(transaction) + await transaction.saveCheckpoint().catch((error) => { + if (!SkipExecutionError.isSkipExecutionError(error)) { + throw error + } + }) + } + + /** + * Finalize the transaction when all steps are complete + */ + private async finalizeTransaction( + transaction: DistributedTransactionType + ): Promise { + if (transaction.hasTimeout()) { + void transaction.clearTransactionTimeout() + } + + await transaction.saveCheckpoint().catch((error) => { + if (!SkipExecutionError.isSkipExecutionError(error)) { + throw error + } + }) + this.emit(DistributedTransactionEvent.FINISH, { transaction }) + } + + /** + * Prepare a step for execution by setting state and incrementing attempts + */ + private prepareStepForExecution( + step: TransactionStep, + flow: TransactionFlow + ): { stopStepExecution: boolean } { + const curState = step.getStates() + + step.lastAttempt = Date.now() + step.attempts++ + + if (curState.state === TransactionStepState.NOT_STARTED) { + if (!step.startedAt) { + step.startedAt = Date.now() + } + + if (step.isCompensating()) { + step.changeState(TransactionStepState.COMPENSATING) + + if (step.definition.noCompensation) { + step.changeState(TransactionStepState.REVERTED) + return { stopStepExecution: false } + } + } else if (flow.state === TransactionState.INVOKING) { + step.changeState(TransactionStepState.INVOKING) + } + } + + step.changeStatus(TransactionStepStatus.WAITING) + + return { stopStepExecution: true } + } + + /** + * Create the payload for a step execution + */ + private createStepPayload( + transaction: DistributedTransactionType, + step: TransactionStep, + flow: TransactionFlow + ): TransactionPayload { + const type = step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE + + return new TransactionPayload( + { + model_id: flow.modelId, + idempotency_key: TransactionOrchestrator.getKeyName( + flow.modelId, + flow.transactionId, + step.definition.action!, + type + ), + action: step.definition.action + "", + action_type: type, + attempt: step.attempts, + timestamp: Date.now(), + }, + transaction.payload, + transaction.getContext() + ) + } + + /** + * Prepare handler arguments for step execution + */ + private prepareHandlerArgs( + transaction: DistributedTransactionType, + step: TransactionStep, + flow: TransactionFlow, + payload: TransactionPayload + ): Parameters { + const type = step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE + + return [ + step.definition.action + "", + type, + payload, + transaction, + step, + this, + ] as Parameters + } + + /** + * Create the step execution promise with optional tracing + */ + private createStepExecutionPromise( + transaction: DistributedTransactionType, + step: TransactionStep + ): () => Promise { + const type = step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE + + const handlerArgs = this.prepareHandlerArgs( + transaction, + step, + transaction.getFlow(), + this.createStepPayload(transaction, step, transaction.getFlow()) + ) + + const traceData = { + action: step.definition.action + "", + type, + step_id: step.id, + step_uuid: step.uuid + "", + attempts: step.attempts, + failures: step.failures, + async: !!(type === "invoke" + ? step.definition.async + : step.definition.compensateAsync), + idempotency_key: handlerArgs[2].metadata.idempotency_key, + } + + const stepHandler = async () => { + return await transaction.handler(...handlerArgs) + } + + // Return the appropriate promise based on tracing configuration + if (TransactionOrchestrator.traceStep) { + return () => TransactionOrchestrator.traceStep!(stepHandler, traceData) + } else { + return stepHandler + } + } + + /** + * Execute a synchronous step and handle its result + */ + private executeSyncStep( + promiseFn: () => Promise, + transaction: DistributedTransactionType, + step: TransactionStep, + nextSteps: { next: TransactionStep[] } + ): Promise { + return promiseFn() + .then(async (response: any) => { + await this.handleStepExpiration(transaction, step, nextSteps) + + const output = response?.__type ? response.output : response + if (SkipStepResponse.isSkipStepResponse(output)) { + await TransactionOrchestrator.skipStep({ + transaction, + step, + }) + return + } + + await this.handleStepSuccess(transaction, step, response) + }) + .catch(async (error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + return + } + + const response = error?.getStepResponse?.() + await this.handleStepExpiration(transaction, step, nextSteps) + + if (PermanentStepFailureError.isPermanentStepFailureError(error)) { + await this.handleStepFailure(transaction, step, error, true, response) + return + } + + await this.handleStepFailure(transaction, step, error, false, response) + }) + } + + /** + * Execute an asynchronous step and handle its result + */ + private executeAsyncStep( + promiseFn: () => Promise, + transaction: DistributedTransactionType, + step: TransactionStep, + nextSteps: { next: TransactionStep[] } + ): Promise { + return promiseFn() + .then(async (response: any) => { + const output = response?.__type ? response.output : response + + if (SkipStepResponse.isSkipStepResponse(output)) { + await TransactionOrchestrator.skipStep({ + transaction, + step, + }) + } else { + if (!step.definition.backgroundExecution || step.definition.nested) { + const eventName = DistributedTransactionEvent.STEP_AWAITING + transaction.emit(eventName, { step, transaction }) + return + } + + await this.handleStepExpiration(transaction, step, nextSteps) + await this.handleStepSuccess(transaction, step, response) + } + }) + .catch(async (error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + return + } + + const response = error?.getStepResponse?.() + + if (PermanentStepFailureError.isPermanentStepFailureError(error)) { + await this.handleStepFailure(transaction, step, error, true, response) + return + } + + await this.handleStepFailure(transaction, step, error, false, response) + }) + } + + /** + * Check if step or transaction has expired and handle timeouts + */ + private async handleStepExpiration( + transaction: DistributedTransactionType, + step: TransactionStep, + nextSteps: { next: TransactionStep[] } + ): Promise { + if (this.hasExpired({ transaction, step }, Date.now())) { + await this.checkStepTimeout(transaction, step) + await this.checkTransactionTimeout( + transaction, + nextSteps.next.includes(step) ? nextSteps.next : [step] + ) + } + } + + /** + * Handle successful step completion + */ + private async handleStepSuccess( + transaction: DistributedTransactionType, + step: TransactionStep, + response: unknown + ): Promise { + const isAsync = step.isCompensating() + ? step.definition.compensateAsync + : step.definition.async + + if (isDefined(response) && step.saveResponse) { + transaction.addResponse( + step.definition.action!, + step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE, + response + ) + } + + const ret = await TransactionOrchestrator.setStepSuccess( + transaction, + step, + response + ) + + if (isAsync && !ret.stopExecution) { + await transaction.scheduleRetry(step, 0) + } + } + + /** + * Handle step failure + */ + private async handleStepFailure( + transaction: DistributedTransactionType, + step: TransactionStep, + error: Error | any, + isPermanent: boolean, + response?: unknown + ): Promise { + const isAsync = step.isCompensating() + ? step.definition.compensateAsync + : step.definition.async + + if (isDefined(response) && step.saveResponse) { + transaction.addResponse( + step.definition.action!, + step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE, + response + ) + } + + const ret = await TransactionOrchestrator.setStepFailure( + transaction, + step, + error, + isPermanent ? 0 : step.definition.maxRetries + ) + + if (isAsync && !ret.stopExecution) { + await transaction.scheduleRetry(step, 0) + } } /** @@ -1288,12 +1395,19 @@ export class TransactionOrchestrator extends EventEmitter { * @param payload - payload to be passed to all the transaction steps * @param flowMetadata - flow metadata which can include event group id for example */ - public async beginTransaction( - transactionId: string, - handler: TransactionStepHandler, - payload?: unknown, + public async beginTransaction({ + transactionId, + handler, + payload, + flowMetadata, + onLoad, + }: { + transactionId: string + handler: TransactionStepHandler + payload?: unknown flowMetadata?: TransactionFlow["metadata"] - ): Promise { + onLoad?: (transaction: DistributedTransactionType) => Promise | void + }): Promise { const existingTransaction = await TransactionOrchestrator.loadTransactionById(this.id, transactionId) @@ -1320,6 +1434,10 @@ export class TransactionOrchestrator extends EventEmitter { ) } + if (onLoad) { + await onLoad(transaction) + } + return transaction } @@ -1423,11 +1541,15 @@ export class TransactionOrchestrator extends EventEmitter { * @param handler - The handler function to execute the step * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey */ - public async skipStep( - responseIdempotencyKey: string, - handler?: TransactionStepHandler, + public async skipStep({ + responseIdempotencyKey, + handler, + transaction, + }: { + responseIdempotencyKey: string + handler?: TransactionStepHandler transaction?: DistributedTransactionType - ): Promise { + }): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, @@ -1440,7 +1562,10 @@ export class TransactionOrchestrator extends EventEmitter { transaction: curTransaction, }) - await TransactionOrchestrator.skipStep(curTransaction, step) + await TransactionOrchestrator.skipStep({ + transaction: curTransaction, + step, + }) await this.executeNext(curTransaction) } else { @@ -1459,12 +1584,19 @@ export class TransactionOrchestrator extends EventEmitter { * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey * @param response - The response of the step */ - public async registerStepSuccess( - responseIdempotencyKey: string, - handler?: TransactionStepHandler, - transaction?: DistributedTransactionType, + public async registerStepSuccess({ + responseIdempotencyKey, + handler, + transaction, + response, + onLoad, + }: { + responseIdempotencyKey: string + handler?: TransactionStepHandler + transaction?: DistributedTransactionType response?: unknown - ): Promise { + onLoad?: (transaction: DistributedTransactionType) => Promise | void + }): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, @@ -1472,6 +1604,10 @@ export class TransactionOrchestrator extends EventEmitter { transaction ) + if (onLoad) { + await onLoad(curTransaction) + } + if (step.getStates().status === TransactionStepStatus.WAITING) { this.emit(DistributedTransactionEvent.RESUME, { transaction: curTransaction, @@ -1502,12 +1638,19 @@ export class TransactionOrchestrator extends EventEmitter { * @param transaction - The current transaction * @param response - The response of the step */ - public async registerStepFailure( - responseIdempotencyKey: string, - error?: Error | any, - handler?: TransactionStepHandler, + public async registerStepFailure({ + responseIdempotencyKey, + error, + handler, + transaction, + onLoad, + }: { + responseIdempotencyKey: string + error?: Error | any + handler?: TransactionStepHandler transaction?: DistributedTransactionType - ): Promise { + onLoad?: (transaction: DistributedTransactionType) => Promise | void + }): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, @@ -1515,6 +1658,10 @@ export class TransactionOrchestrator extends EventEmitter { transaction ) + if (onLoad) { + await onLoad(curTransaction) + } + if (step.getStates().status === TransactionStepStatus.WAITING) { this.emit(DistributedTransactionEvent.RESUME, { transaction: curTransaction, diff --git a/packages/core/orchestration/src/workflow/global-workflow.ts b/packages/core/orchestration/src/workflow/global-workflow.ts index 94f86f84f3..520069d594 100644 --- a/packages/core/orchestration/src/workflow/global-workflow.ts +++ b/packages/core/orchestration/src/workflow/global-workflow.ts @@ -52,11 +52,11 @@ export class GlobalWorkflow extends WorkflowManager { const orchestrator = workflow.orchestrator - const transaction = await orchestrator.beginTransaction( - uniqueTransactionId, - workflow.handler(this.container, this.context), - input - ) + const transaction = await orchestrator.beginTransaction({ + transactionId: uniqueTransactionId, + handler: workflow.handler(this.container, this.context), + payload: input, + }) if (this.subscribe.onStepBegin) { transaction.once("stepBegin", this.subscribe.onStepBegin) @@ -104,12 +104,11 @@ export class GlobalWorkflow extends WorkflowManager { } }) - return await workflow.orchestrator.registerStepSuccess( - idempotencyKey, - workflow.handler(this.container, this.context), - undefined, - response - ) + return await workflow.orchestrator.registerStepSuccess({ + responseIdempotencyKey: idempotencyKey, + handler: workflow.handler(this.container, this.context), + response, + }) } async registerStepFailure( @@ -137,10 +136,10 @@ export class GlobalWorkflow extends WorkflowManager { } }) - return await workflow.orchestrator.registerStepFailure( - idempotencyKey, + return await workflow.orchestrator.registerStepFailure({ + responseIdempotencyKey: idempotencyKey, error, - workflow.handler(this.container, this.context) - ) + handler: workflow.handler(this.container, this.context), + }) } } diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 4e2c2e6203..6b4dc3e537 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -113,7 +113,7 @@ export class LocalWorkflow { return target[prop] } - return async (...args) => { + return (...args) => { const ctxIndex = MedusaContext.getIndex(target, prop as string) const hasContext = args[ctxIndex!]?.__type === MedusaContextType @@ -125,8 +125,12 @@ export class LocalWorkflow { args[ctxIndex] = context } + } else if (hasContext) { + args[ctxIndex!].eventGroupId ??= this_.medusaContext?.eventGroupId } - return await target[prop].apply(target, [...args]) + + const method = target[prop] + return method.apply(target, [...args]) } }, }) @@ -355,12 +359,18 @@ export class LocalWorkflow { this.medusaContext = context const { handler, orchestrator } = this.workflow - const transaction = await orchestrator.beginTransaction( - uniqueTransactionId, - handler(this.container_, context), - input, - flowMetadata - ) + const transaction = await orchestrator.beginTransaction({ + transactionId: uniqueTransactionId, + handler: handler(this.container_, context), + payload: input, + flowMetadata, + onLoad: (transaction) => { + if (this.medusaContext) { + this.medusaContext.eventGroupId = + transaction.getFlow().metadata?.eventGroupId + } + }, + }) const { cleanUpEventListeners } = this.registerEventCallbacks({ orchestrator, @@ -402,6 +412,11 @@ export class LocalWorkflow { ? await this.getRunningTransaction(transactionOrTransactionId, context) : transactionOrTransactionId + if (this.medusaContext) { + this.medusaContext.eventGroupId = + transaction.getFlow().metadata?.eventGroupId + } + const { cleanUpEventListeners } = this.registerEventCallbacks({ orchestrator, transaction, @@ -432,12 +447,17 @@ export class LocalWorkflow { subscribe, }) - const transaction = await orchestrator.registerStepSuccess( - idempotencyKey, - handler(this.container_, context), - undefined, - response - ) + const transaction = await orchestrator.registerStepSuccess({ + responseIdempotencyKey: idempotencyKey, + handler: handler(this.container_, context), + response, + onLoad: (transaction) => { + if (this.medusaContext) { + this.medusaContext.eventGroupId = + transaction.getFlow().metadata?.eventGroupId + } + }, + }) try { return transaction @@ -461,11 +481,17 @@ export class LocalWorkflow { subscribe, }) - const transaction = await orchestrator.registerStepFailure( - idempotencyKey, + const transaction = await orchestrator.registerStepFailure({ + responseIdempotencyKey: idempotencyKey, error, - handler(this.container_, context) - ) + handler: handler(this.container_, context), + onLoad: (transaction) => { + if (this.medusaContext) { + this.medusaContext.eventGroupId = + transaction.getFlow().metadata?.eventGroupId + } + }, + }) try { return transaction diff --git a/packages/core/types/src/link-modules/service.ts b/packages/core/types/src/link-modules/service.ts index cc97a4b1e6..ce5dbac0a9 100644 --- a/packages/core/types/src/link-modules/service.ts +++ b/packages/core/types/src/link-modules/service.ts @@ -22,6 +22,7 @@ export interface ILinkModule extends IModuleService { | string[] | [string | string[], string, Record?][], foreignKeyData?: string, + extraFields?: Record, sharedContext?: Context ): Promise diff --git a/packages/core/utils/src/event-bus/message-aggregator.ts b/packages/core/utils/src/event-bus/message-aggregator.ts index 4baa185e64..4c70127273 100644 --- a/packages/core/utils/src/event-bus/message-aggregator.ts +++ b/packages/core/utils/src/event-bus/message-aggregator.ts @@ -43,7 +43,7 @@ export class MessageAggregator implements IMessageAggregator { object: message.object, action: message.action, options, - context: sharedContext, + context: message.context ?? sharedContext, }) }) this.save(composedMessages) diff --git a/packages/core/utils/src/modules-sdk/decorators/inject-manager.ts b/packages/core/utils/src/modules-sdk/decorators/inject-manager.ts index ff0de77db7..e3bc4fe47f 100644 --- a/packages/core/utils/src/modules-sdk/decorators/inject-manager.ts +++ b/packages/core/utils/src/modules-sdk/decorators/inject-manager.ts @@ -28,6 +28,7 @@ export function InjectManager(managerProperty?: string): MethodDecorator { } Object.defineProperty(copiedContext, key, { + enumerable: true, get: function () { return originalContext[key] }, diff --git a/packages/core/utils/src/modules-sdk/decorators/inject-transaction-manager.ts b/packages/core/utils/src/modules-sdk/decorators/inject-transaction-manager.ts index ab5b864077..32b44a5270 100644 --- a/packages/core/utils/src/modules-sdk/decorators/inject-transaction-manager.ts +++ b/packages/core/utils/src/modules-sdk/decorators/inject-transaction-manager.ts @@ -22,10 +22,9 @@ export function InjectTransactionManager( const argIndex = target.MedusaContextIndex_[propertyKey] descriptor.value = async function (...args: any[]) { - const context: Context = args[argIndex] ?? {} const originalContext = args[argIndex] ?? {} - if (context?.transactionManager) { + if (originalContext?.transactionManager) { return await originalMethod.apply(this, args) } @@ -41,6 +40,7 @@ export function InjectTransactionManager( } Object.defineProperty(copiedContext, key, { + enumerable: true, get: function () { return originalContext[key] }, @@ -63,10 +63,10 @@ export function InjectTransactionManager( return await originalMethod.apply(this, args) }, { - transaction: context?.transactionManager, - isolationLevel: (context as Context)?.isolationLevel, + transaction: originalContext?.transactionManager, + isolationLevel: (originalContext as Context)?.isolationLevel, enableNestedTransactions: - (context as Context).enableNestedTransactions ?? false, + (originalContext as Context).enableNestedTransactions ?? false, } ) } diff --git a/packages/medusa-test-utils/src/database.ts b/packages/medusa-test-utils/src/database.ts index 341af26a23..c533272d51 100644 --- a/packages/medusa-test-utils/src/database.ts +++ b/packages/medusa-test-utils/src/database.ts @@ -198,11 +198,31 @@ export const dbTestUtilFactory = (): any => ({ FROM information_schema.tables WHERE table_schema = '${schema}';`) + const skipIndexPartitionPrefix = "cat_" + const mainPartitionTables = ["index_data", "index_relation"] + let hasIndexTables = false for (const { table_name } of tableNames) { + if (mainPartitionTables.includes(table_name)) { + hasIndexTables = true + } + + // Skipping index partition tables. + if ( + table_name.startsWith(skipIndexPartitionPrefix) || + mainPartitionTables.includes(table_name) + ) { + continue + } + await runRawQuery(`DELETE FROM ${schema}."${table_name}";`) } + if (hasIndexTables) { + await runRawQuery(`TRUNCATE TABLE ${schema}.index_data;`) + await runRawQuery(`TRUNCATE TABLE ${schema}.index_relation;`) + } + await runRawQuery(`SET session_replication_role = 'origin';`) }, diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 1ac4001a32..8b6780852c 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -82,13 +82,14 @@ export default class LocalEventBusService extends AbstractEventBusModuleService // This is useful in the event of a distributed transaction where you'd want to emit // events only once the transaction ends. private async groupOrEmitEvent(eventData: Message) { - const { options, ...eventBody } = eventData + const eventData_ = JSON.parse(JSON.stringify(eventData)) + const { options, ...eventBody } = eventData_ const eventGroupId = eventBody.metadata?.eventGroupId if (eventGroupId) { - await this.groupEvent(eventGroupId, eventData) + await this.groupEvent(eventGroupId, eventData_) } else { - const { options, ...eventBody } = eventData + const { options, ...eventBody } = eventData_ const options_ = options as { delay: number } const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve()) @@ -112,7 +113,8 @@ export default class LocalEventBusService extends AbstractEventBusModuleService } async releaseGroupedEvents(eventGroupId: string) { - const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] + let groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] + groupedEvents = JSON.parse(JSON.stringify(groupedEvents)) for (const event of groupedEvents) { const { options, ...eventBody } = event diff --git a/packages/modules/index/src/services/postgres-provider.ts b/packages/modules/index/src/services/postgres-provider.ts index df3d422f7e..13ac0461ed 100644 --- a/packages/modules/index/src/services/postgres-provider.ts +++ b/packages/modules/index/src/services/postgres-provider.ts @@ -686,7 +686,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { }) let relationsToUpsert: any[] = [] - const entitiesToUpdate = cleanedData.map((entityData) => { + const entitiesToUpsert = cleanedData.map((entityData) => { relationsToUpsert.push( { parent_id: entityData[parentPropertyId] as string, @@ -714,8 +714,8 @@ export class PostgresProvider implements IndexTypes.StorageProvider { } }) - if (entitiesToUpdate.length) { - await indexRepository.upsertMany(entitiesToUpdate, { + if (entitiesToUpsert.length) { + await indexRepository.upsertMany(entitiesToUpsert, { onConflictAction: "merge", onConflictFields: ["id", "name"], onConflictMergeFields: ["data", "staled_at"], diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index 1d13cbd3a3..cbe9a51c78 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -35,8 +35,7 @@ moduleIntegrationTestRunner({ }, }, testSuite: ({ service: workflowOrcModule, medusaApp }) => { - // TODO: Debug the issue with this test https://github.com/medusajs/medusa/actions/runs/13900190144/job/38897122803#step:5:5616 - describe.skip("Testing race condition of the workflow during retry", () => { + describe("Testing race condition of the workflow during retry", () => { it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id"