diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 9f2373aea0..f88a45d70a 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -6,8 +6,8 @@ import { TransactionOrchestrator, TransactionPayload, TransactionState, - TransactionStepTimeoutError, TransactionStepsDefinition, + TransactionStepTimeoutError, TransactionTimeoutError, } from "../../transaction" @@ -55,7 +55,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -134,7 +137,10 @@ describe("Transaction Orchestrator", () => { ], } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -185,7 +191,10 @@ describe("Transaction Orchestrator", () => { ], } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -262,7 +271,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -354,7 +366,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -418,7 +433,10 @@ describe("Transaction Orchestrator", () => { ], } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -488,7 +506,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -552,7 +573,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -607,7 +631,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -665,7 +692,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -748,7 +778,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -872,7 +905,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -941,7 +977,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -975,9 +1014,12 @@ describe("Transaction Orchestrator", () => { transactionInHandler = transaction } - const strategy = new TransactionOrchestrator("transaction-name", { - next: { - action: "firstMethod", + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: { + next: { + action: "firstMethod", + }, }, }) @@ -1069,8 +1111,12 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow, { - timeout: 0.1, // 100ms + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + options: { + timeout: 0.1, // 100ms + }, }) const transaction = await strategy.beginTransaction( @@ -1177,7 +1223,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -1294,7 +1343,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -1401,8 +1453,12 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow, { - timeout: 0.1, // 100ms + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + options: { + timeout: 0.1, // 100ms + }, }) const transaction = await strategy.beginTransaction( diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index d3611f72d3..04130f239e 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -33,10 +33,14 @@ import { * It is based on a single transaction definition, which is used to execute all the transaction steps */ export class TransactionOrchestrator extends EventEmitter { + id: string + private static ROOT_STEP = "_root" public static DEFAULT_TTL = 30 private invokeSteps: string[] = [] private compensateSteps: string[] = [] + private definition: TransactionStepsDefinition + private options?: TransactionModelOptions public static DEFAULT_RETRIES = 0 @@ -48,13 +52,35 @@ export class TransactionOrchestrator extends EventEmitter { return this.workflowOptions[modelId] } - constructor( - public id: string, - private definition: TransactionStepsDefinition, - private options?: TransactionModelOptions - ) { + constructor({ + id, + definition, + options, + isClone, + }: { + id: string + definition: TransactionStepsDefinition + options?: TransactionModelOptions + isClone?: boolean + }) { super() - this.parseFlowOptions() + + this.id = id + this.definition = definition + this.options = options + + if (!isClone) { + this.parseFlowOptions() + } + } + + static clone(orchestrator: TransactionOrchestrator): TransactionOrchestrator { + return new TransactionOrchestrator({ + id: orchestrator.id, + definition: orchestrator.definition, + options: orchestrator.options, + isClone: true, + }) } private static SEPARATOR = ":" diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 2ffc0deafd..53b6846160 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -1,18 +1,18 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { + createMedusaContainer, + isDefined, + isString, MedusaContext, MedusaContextType, MedusaError, MedusaModuleType, - createMedusaContainer, - isDefined, - isString, } from "@medusajs/utils" import { asValue } from "awilix" import { - DistributedTransactionType, DistributedTransactionEvent, DistributedTransactionEvents, + DistributedTransactionType, TransactionFlow, TransactionModelOptions, TransactionOrchestrator, @@ -59,10 +59,15 @@ export class LocalWorkflow { ) } - this.flow = new OrchestratorBuilder(globalWorkflow.flow_) + const workflow = { + ...globalWorkflow, + orchestrator: TransactionOrchestrator.clone(globalWorkflow.orchestrator), + } + + this.flow = new OrchestratorBuilder(workflow.flow_) this.workflowId = workflowId - this.workflow = globalWorkflow - this.handlers = new Map(globalWorkflow.handlers_) + this.workflow = workflow + this.handlers = new Map(workflow.handlers_) this.resolveContainer(modulesLoaded) } @@ -141,11 +146,11 @@ export class LocalWorkflow { this.workflow = { id: this.workflowId, flow_: finalFlow, - orchestrator: new TransactionOrchestrator( - this.workflowId, - finalFlow, - customOptions - ), + orchestrator: new TransactionOrchestrator({ + id: this.workflowId, + definition: finalFlow, + options: customOptions, + }), options: customOptions, handler: WorkflowManager.buildHandlers(this.handlers), handlers_: this.handlers, diff --git a/packages/core/orchestration/src/workflow/workflow-manager.ts b/packages/core/orchestration/src/workflow/workflow-manager.ts index c2f6a40d93..7612cbf5bb 100644 --- a/packages/core/orchestration/src/workflow/workflow-manager.ts +++ b/packages/core/orchestration/src/workflow/workflow-manager.ts @@ -122,11 +122,11 @@ class WorkflowManager { const workflow = { id: workflowId, flow_: finalFlow!, - orchestrator: new TransactionOrchestrator( - workflowId, - finalFlow ?? {}, - options - ), + orchestrator: new TransactionOrchestrator({ + id: workflowId, + definition: finalFlow ?? {}, + options, + }), handler: WorkflowManager.buildHandlers(handlers), handlers_: handlers, options, @@ -167,11 +167,11 @@ class WorkflowManager { WorkflowManager.workflows.set(workflowId, { id: workflowId, flow_: finalFlow, - orchestrator: new TransactionOrchestrator( - workflowId, - finalFlow, - updatedOptions - ), + orchestrator: new TransactionOrchestrator({ + id: workflowId, + definition: finalFlow, + options, + }), handler: WorkflowManager.buildHandlers(workflow.handlers_), handlers_: workflow.handlers_, options: updatedOptions, diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index a9d2e794df..c89e703657 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -9,9 +9,9 @@ import { import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { ContainerRegistrationKeys, + isPresent, MedusaContextType, ModuleRegistrationName, - isPresent, } from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" @@ -518,12 +518,12 @@ function attachOnFinishReleaseEvents( const TERMINAL_SIZE = process.stdout?.columns ?? 60 const separator = new Array(TERMINAL_SIZE).join("-") - const worflowName = transaction.getFlow().modelId + const workflowName = transaction.getFlow().modelId const allWorkflowErrors = transaction .getErrors() .map( (err) => - `${worflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}` + `${workflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}` ) .join(EOL + separator + EOL)