fix(orchestration, workflow-sdk): Local workflow separated orchestrator (#8765)

FIXES TRI-174

**What**
Currently, every time a Local workflow is being instantiated, it will grab the global workflow definition including the orchestrator instance. This leads to issues when we have concurrent running workflows which all register their event listeners to this single orchestrator instance which can lead to exhausting the listerners. 
With this fix, every local workflow will have a copy of the global workflow definition plus a new instance (cloned) of the orchestrator meaning that from now on, every local workflow will have its own orchestrator.
This commit is contained in:
Adrien de Peretti
2024-08-27 10:59:56 +02:00
committed by GitHub
parent 59217a9796
commit b09c19912b
5 changed files with 141 additions and 54 deletions

View File

@@ -6,8 +6,8 @@ import {
TransactionOrchestrator,
TransactionPayload,
TransactionState,
TransactionStepTimeoutError,
TransactionStepsDefinition,
TransactionStepTimeoutError,
TransactionTimeoutError,
} from "../../transaction"
@@ -55,7 +55,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -134,7 +137,10 @@ describe("Transaction Orchestrator", () => {
],
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -185,7 +191,10 @@ describe("Transaction Orchestrator", () => {
],
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -262,7 +271,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -354,7 +366,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -418,7 +433,10 @@ describe("Transaction Orchestrator", () => {
],
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -488,7 +506,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -552,7 +573,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -607,7 +631,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -665,7 +692,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -748,7 +778,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -872,7 +905,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -941,7 +977,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -975,9 +1014,12 @@ describe("Transaction Orchestrator", () => {
transactionInHandler = transaction
}
const strategy = new TransactionOrchestrator("transaction-name", {
next: {
action: "firstMethod",
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: {
next: {
action: "firstMethod",
},
},
})
@@ -1069,8 +1111,12 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow, {
timeout: 0.1, // 100ms
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
options: {
timeout: 0.1, // 100ms
},
})
const transaction = await strategy.beginTransaction(
@@ -1177,7 +1223,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -1294,7 +1343,10 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
@@ -1401,8 +1453,12 @@ describe("Transaction Orchestrator", () => {
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow, {
timeout: 0.1, // 100ms
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
options: {
timeout: 0.1, // 100ms
},
})
const transaction = await strategy.beginTransaction(

View File

@@ -33,10 +33,14 @@ import {
* It is based on a single transaction definition, which is used to execute all the transaction steps
*/
export class TransactionOrchestrator extends EventEmitter {
id: string
private static ROOT_STEP = "_root"
public static DEFAULT_TTL = 30
private invokeSteps: string[] = []
private compensateSteps: string[] = []
private definition: TransactionStepsDefinition
private options?: TransactionModelOptions
public static DEFAULT_RETRIES = 0
@@ -48,13 +52,35 @@ export class TransactionOrchestrator extends EventEmitter {
return this.workflowOptions[modelId]
}
constructor(
public id: string,
private definition: TransactionStepsDefinition,
private options?: TransactionModelOptions
) {
constructor({
id,
definition,
options,
isClone,
}: {
id: string
definition: TransactionStepsDefinition
options?: TransactionModelOptions
isClone?: boolean
}) {
super()
this.parseFlowOptions()
this.id = id
this.definition = definition
this.options = options
if (!isClone) {
this.parseFlowOptions()
}
}
static clone(orchestrator: TransactionOrchestrator): TransactionOrchestrator {
return new TransactionOrchestrator({
id: orchestrator.id,
definition: orchestrator.definition,
options: orchestrator.options,
isClone: true,
})
}
private static SEPARATOR = ":"

View File

@@ -1,18 +1,18 @@
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import {
createMedusaContainer,
isDefined,
isString,
MedusaContext,
MedusaContextType,
MedusaError,
MedusaModuleType,
createMedusaContainer,
isDefined,
isString,
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
DistributedTransactionType,
DistributedTransactionEvent,
DistributedTransactionEvents,
DistributedTransactionType,
TransactionFlow,
TransactionModelOptions,
TransactionOrchestrator,
@@ -59,10 +59,15 @@ export class LocalWorkflow {
)
}
this.flow = new OrchestratorBuilder(globalWorkflow.flow_)
const workflow = {
...globalWorkflow,
orchestrator: TransactionOrchestrator.clone(globalWorkflow.orchestrator),
}
this.flow = new OrchestratorBuilder(workflow.flow_)
this.workflowId = workflowId
this.workflow = globalWorkflow
this.handlers = new Map(globalWorkflow.handlers_)
this.workflow = workflow
this.handlers = new Map(workflow.handlers_)
this.resolveContainer(modulesLoaded)
}
@@ -141,11 +146,11 @@ export class LocalWorkflow {
this.workflow = {
id: this.workflowId,
flow_: finalFlow,
orchestrator: new TransactionOrchestrator(
this.workflowId,
finalFlow,
customOptions
),
orchestrator: new TransactionOrchestrator({
id: this.workflowId,
definition: finalFlow,
options: customOptions,
}),
options: customOptions,
handler: WorkflowManager.buildHandlers(this.handlers),
handlers_: this.handlers,

View File

@@ -122,11 +122,11 @@ class WorkflowManager {
const workflow = {
id: workflowId,
flow_: finalFlow!,
orchestrator: new TransactionOrchestrator(
workflowId,
finalFlow ?? {},
options
),
orchestrator: new TransactionOrchestrator({
id: workflowId,
definition: finalFlow ?? {},
options,
}),
handler: WorkflowManager.buildHandlers(handlers),
handlers_: handlers,
options,
@@ -167,11 +167,11 @@ class WorkflowManager {
WorkflowManager.workflows.set(workflowId, {
id: workflowId,
flow_: finalFlow,
orchestrator: new TransactionOrchestrator(
workflowId,
finalFlow,
updatedOptions
),
orchestrator: new TransactionOrchestrator({
id: workflowId,
definition: finalFlow,
options,
}),
handler: WorkflowManager.buildHandlers(workflow.handlers_),
handlers_: workflow.handlers_,
options: updatedOptions,

View File

@@ -9,9 +9,9 @@ import {
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import {
ContainerRegistrationKeys,
isPresent,
MedusaContextType,
ModuleRegistrationName,
isPresent,
} from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
@@ -518,12 +518,12 @@ function attachOnFinishReleaseEvents(
const TERMINAL_SIZE = process.stdout?.columns ?? 60
const separator = new Array(TERMINAL_SIZE).join("-")
const worflowName = transaction.getFlow().modelId
const workflowName = transaction.getFlow().modelId
const allWorkflowErrors = transaction
.getErrors()
.map(
(err) =>
`${worflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}`
`${workflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}`
)
.join(EOL + separator + EOL)