From b09c19912bc076628b57afd9b4e343de5246b7f3 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Tue, 27 Aug 2024 10:59:56 +0200 Subject: [PATCH] fix(orchestration, workflow-sdk): Local workflow separated orchestrator (#8765) FIXES TRI-174 **What** Currently, every time a Local workflow is being instantiated, it will grab the global workflow definition including the orchestrator instance. This leads to issues when we have concurrent running workflows which all register their event listeners to this single orchestrator instance which can lead to exhausting the listerners. With this fix, every local workflow will have a copy of the global workflow definition plus a new instance (cloned) of the orchestrator meaning that from now on, every local workflow will have its own orchestrator. --- .../transaction/transaction-orchestrator.ts | 102 ++++++++++++++---- .../transaction/transaction-orchestrator.ts | 38 +++++-- .../src/workflow/local-workflow.ts | 29 ++--- .../src/workflow/workflow-manager.ts | 20 ++-- .../src/helper/workflow-export.ts | 6 +- 5 files changed, 141 insertions(+), 54 deletions(-) diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 9f2373aea0..f88a45d70a 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -6,8 +6,8 @@ import { TransactionOrchestrator, TransactionPayload, TransactionState, - TransactionStepTimeoutError, TransactionStepsDefinition, + TransactionStepTimeoutError, TransactionTimeoutError, } from "../../transaction" @@ -55,7 +55,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -134,7 +137,10 @@ describe("Transaction Orchestrator", () => { ], } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -185,7 +191,10 @@ describe("Transaction Orchestrator", () => { ], } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -262,7 +271,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -354,7 +366,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -418,7 +433,10 @@ describe("Transaction Orchestrator", () => { ], } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -488,7 +506,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -552,7 +573,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -607,7 +631,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -665,7 +692,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -748,7 +778,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -872,7 +905,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -941,7 +977,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -975,9 +1014,12 @@ describe("Transaction Orchestrator", () => { transactionInHandler = transaction } - const strategy = new TransactionOrchestrator("transaction-name", { - next: { - action: "firstMethod", + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: { + next: { + action: "firstMethod", + }, }, }) @@ -1069,8 +1111,12 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow, { - timeout: 0.1, // 100ms + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + options: { + timeout: 0.1, // 100ms + }, }) const transaction = await strategy.beginTransaction( @@ -1177,7 +1223,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -1294,7 +1343,10 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow) + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + }) const transaction = await strategy.beginTransaction( "transaction_id_123", @@ -1401,8 +1453,12 @@ describe("Transaction Orchestrator", () => { }, } - const strategy = new TransactionOrchestrator("transaction-name", flow, { - timeout: 0.1, // 100ms + const strategy = new TransactionOrchestrator({ + id: "transaction-name", + definition: flow, + options: { + timeout: 0.1, // 100ms + }, }) const transaction = await strategy.beginTransaction( diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index d3611f72d3..04130f239e 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -33,10 +33,14 @@ import { * It is based on a single transaction definition, which is used to execute all the transaction steps */ export class TransactionOrchestrator extends EventEmitter { + id: string + private static ROOT_STEP = "_root" public static DEFAULT_TTL = 30 private invokeSteps: string[] = [] private compensateSteps: string[] = [] + private definition: TransactionStepsDefinition + private options?: TransactionModelOptions public static DEFAULT_RETRIES = 0 @@ -48,13 +52,35 @@ export class TransactionOrchestrator extends EventEmitter { return this.workflowOptions[modelId] } - constructor( - public id: string, - private definition: TransactionStepsDefinition, - private options?: TransactionModelOptions - ) { + constructor({ + id, + definition, + options, + isClone, + }: { + id: string + definition: TransactionStepsDefinition + options?: TransactionModelOptions + isClone?: boolean + }) { super() - this.parseFlowOptions() + + this.id = id + this.definition = definition + this.options = options + + if (!isClone) { + this.parseFlowOptions() + } + } + + static clone(orchestrator: TransactionOrchestrator): TransactionOrchestrator { + return new TransactionOrchestrator({ + id: orchestrator.id, + definition: orchestrator.definition, + options: orchestrator.options, + isClone: true, + }) } private static SEPARATOR = ":" diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 2ffc0deafd..53b6846160 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -1,18 +1,18 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { + createMedusaContainer, + isDefined, + isString, MedusaContext, MedusaContextType, MedusaError, MedusaModuleType, - createMedusaContainer, - isDefined, - isString, } from "@medusajs/utils" import { asValue } from "awilix" import { - DistributedTransactionType, DistributedTransactionEvent, DistributedTransactionEvents, + DistributedTransactionType, TransactionFlow, TransactionModelOptions, TransactionOrchestrator, @@ -59,10 +59,15 @@ export class LocalWorkflow { ) } - this.flow = new OrchestratorBuilder(globalWorkflow.flow_) + const workflow = { + ...globalWorkflow, + orchestrator: TransactionOrchestrator.clone(globalWorkflow.orchestrator), + } + + this.flow = new OrchestratorBuilder(workflow.flow_) this.workflowId = workflowId - this.workflow = globalWorkflow - this.handlers = new Map(globalWorkflow.handlers_) + this.workflow = workflow + this.handlers = new Map(workflow.handlers_) this.resolveContainer(modulesLoaded) } @@ -141,11 +146,11 @@ export class LocalWorkflow { this.workflow = { id: this.workflowId, flow_: finalFlow, - orchestrator: new TransactionOrchestrator( - this.workflowId, - finalFlow, - customOptions - ), + orchestrator: new TransactionOrchestrator({ + id: this.workflowId, + definition: finalFlow, + options: customOptions, + }), options: customOptions, handler: WorkflowManager.buildHandlers(this.handlers), handlers_: this.handlers, diff --git a/packages/core/orchestration/src/workflow/workflow-manager.ts b/packages/core/orchestration/src/workflow/workflow-manager.ts index c2f6a40d93..7612cbf5bb 100644 --- a/packages/core/orchestration/src/workflow/workflow-manager.ts +++ b/packages/core/orchestration/src/workflow/workflow-manager.ts @@ -122,11 +122,11 @@ class WorkflowManager { const workflow = { id: workflowId, flow_: finalFlow!, - orchestrator: new TransactionOrchestrator( - workflowId, - finalFlow ?? {}, - options - ), + orchestrator: new TransactionOrchestrator({ + id: workflowId, + definition: finalFlow ?? {}, + options, + }), handler: WorkflowManager.buildHandlers(handlers), handlers_: handlers, options, @@ -167,11 +167,11 @@ class WorkflowManager { WorkflowManager.workflows.set(workflowId, { id: workflowId, flow_: finalFlow, - orchestrator: new TransactionOrchestrator( - workflowId, - finalFlow, - updatedOptions - ), + orchestrator: new TransactionOrchestrator({ + id: workflowId, + definition: finalFlow, + options, + }), handler: WorkflowManager.buildHandlers(workflow.handlers_), handlers_: workflow.handlers_, options: updatedOptions, diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index a9d2e794df..c89e703657 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -9,9 +9,9 @@ import { import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { ContainerRegistrationKeys, + isPresent, MedusaContextType, ModuleRegistrationName, - isPresent, } from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" @@ -518,12 +518,12 @@ function attachOnFinishReleaseEvents( const TERMINAL_SIZE = process.stdout?.columns ?? 60 const separator = new Array(TERMINAL_SIZE).join("-") - const worflowName = transaction.getFlow().modelId + const workflowName = transaction.getFlow().modelId const allWorkflowErrors = transaction .getErrors() .map( (err) => - `${worflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}` + `${workflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}` ) .join(EOL + separator + EOL)