From 43427b8893baedd7abe7b38cf5360aa2d245293d Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Thu, 13 Jul 2023 10:53:55 -0300 Subject: [PATCH] Feat(medusa) - Orchestrator builder (#4472) * chore: Trasanction Orchestrator builder * Feat(medusa): Workflow Manager (#4506) --- .changeset/nice-apples-guess.md | 5 + .../transaction/orchestrator-builder.ts | 435 ++++++++++++++++++ .../transaction/transaction-orchestrator.ts | 18 +- .../__tests__/transaction/workflow-manager.ts | 176 +++++++ .../transaction/distributed-transaction.ts | 18 +- .../medusa/src/utils/transaction/index.ts | 45 +- .../utils/transaction/orchestrator-builder.ts | 422 +++++++++++++++++ .../transaction/transaction-orchestrator.ts | 126 +++-- .../src/utils/transaction/transaction-step.ts | 6 +- .../medusa/src/utils/transaction/types.ts | 43 ++ .../src/utils/transaction/workflow-manager.ts | 226 +++++++++ 11 files changed, 1408 insertions(+), 112 deletions(-) create mode 100644 .changeset/nice-apples-guess.md create mode 100644 packages/medusa/src/utils/__tests__/transaction/orchestrator-builder.ts create mode 100644 packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts create mode 100644 packages/medusa/src/utils/transaction/orchestrator-builder.ts create mode 100644 packages/medusa/src/utils/transaction/types.ts create mode 100644 packages/medusa/src/utils/transaction/workflow-manager.ts diff --git a/.changeset/nice-apples-guess.md b/.changeset/nice-apples-guess.md new file mode 100644 index 0000000000..d523e46101 --- /dev/null +++ b/.changeset/nice-apples-guess.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +Transaction Orchestrator Flow Builder diff --git a/packages/medusa/src/utils/__tests__/transaction/orchestrator-builder.ts b/packages/medusa/src/utils/__tests__/transaction/orchestrator-builder.ts new file mode 100644 index 0000000000..8659d191c9 --- /dev/null +++ b/packages/medusa/src/utils/__tests__/transaction/orchestrator-builder.ts @@ -0,0 +1,435 @@ +import { OrchestratorBuilder } from "../../transaction/orchestrator-builder" + +describe("OrchestratorBuilder", () => { + let builder: OrchestratorBuilder + + beforeEach(() => { + builder = new OrchestratorBuilder() + }) + + it("should load a TransactionStepsDefinition", () => { + builder.load({ action: "foo" }) + expect(builder.build()).toEqual({ + action: "foo", + }) + }) + + it("should add a new action after the last action set", () => { + builder.addAction("foo") + + expect(builder.build()).toEqual({ + action: "foo", + }) + + builder.addAction("bar") + + expect(builder.build()).toEqual({ + action: "foo", + next: { + action: "bar", + }, + }) + }) + + it("should replace an action by another keeping its next steps", () => { + builder.addAction("foo").addAction("axe").replaceAction("foo", "bar") + expect(builder.build()).toEqual({ + action: "bar", + next: { + action: "axe", + }, + }) + }) + + it("should insert a new action before an existing action", () => { + builder.addAction("foo").addAction("bar").insertActionBefore("bar", "axe") + + expect(builder.build()).toEqual({ + action: "foo", + next: { + action: "axe", + next: { + action: "bar", + }, + }, + }) + }) + + it("should insert a new action after an existing action", () => { + builder.addAction("foo").addAction("axe").insertActionAfter("foo", "bar") + + expect(builder.build()).toEqual({ + action: "foo", + next: { + action: "bar", + next: { + action: "axe", + }, + }, + }) + }) + + it("should move an existing action and its next steps to another place. the destination will become next steps of the final branch", () => { + builder + .addAction("foo") + .addAction("bar") + .addAction("axe") + .addAction("zzz") + .moveAction("axe", "foo") + + expect(builder.build()).toEqual({ + action: "axe", + next: { + action: "zzz", + next: { + action: "foo", + next: { + action: "bar", + }, + }, + }, + }) + }) + + it("should merge two action to run in parallel", () => { + builder + .addAction("foo") + .addAction("bar") + .addAction("axe") + .mergeActions("foo", "axe") + + expect(builder.build()).toEqual({ + next: [ + { + action: "foo", + next: { action: "bar" }, + }, + { action: "axe" }, + ], + }) + }) + + it("should merge multiple actions to run in parallel", () => { + builder + .addAction("foo") + .addAction("bar") + .addAction("axe") + .addAction("step") + .mergeActions("bar", "axe", "step") + + expect(builder.build()).toEqual({ + action: "foo", + next: [ + { + action: "bar", + }, + { + action: "axe", + }, + { + action: "step", + }, + ], + }) + }) + + it("should delete an action", () => { + builder.addAction("foo").deleteAction("foo") + + expect(builder.build()).toEqual({}) + }) + + it("should delete an action and keep all the next steps of that branch", () => { + builder + .addAction("foo") + .addAction("bar") + .addAction("axe") + .deleteAction("bar") + + expect(builder.build()).toEqual({ + action: "foo", + next: { + action: "axe", + }, + }) + }) + + it("should delete an action and remove all the next steps of that branch", () => { + builder + .addAction("foo") + .addAction("bar") + .addAction("axe") + .addAction("step") + .pruneAction("bar") + expect(builder.build()).toEqual({ + action: "foo", + }) + }) + + it("should append a new action to the end of a given action's branch", () => { + builder + .load({ + action: "foo", + next: [ + { + action: "bar", + next: { + action: "zzz", + }, + }, + { + action: "axe", + }, + ], + }) + .appendAction("step", "bar", { saveResponse: true }) + + expect(builder.build()).toEqual({ + action: "foo", + next: [ + { + action: "bar", + next: { + action: "zzz", + next: { + action: "step", + saveResponse: true, + }, + }, + }, + { + action: "axe", + }, + ], + }) + }) + + describe("Composing Complex Transactions", () => { + const loadedFlow = { + next: { + action: "createProduct", + saveResponse: true, + next: { + action: "attachToSalesChannel", + saveResponse: true, + next: { + action: "createPrices", + saveResponse: true, + next: { + action: "createInventoryItems", + saveResponse: true, + next: { + action: "attachInventoryItems", + noCompensation: true, + }, + }, + }, + }, + }, + } + + it("should load a transaction and add two steps", () => { + const builder = new OrchestratorBuilder(loadedFlow) + builder + .addAction("step_1", { saveResponse: true }) + .addAction("step_2", { saveResponse: true }) + + expect(builder.build()).toEqual({ + action: "createProduct", + saveResponse: true, + next: { + action: "attachToSalesChannel", + saveResponse: true, + next: { + action: "createPrices", + saveResponse: true, + next: { + action: "createInventoryItems", + saveResponse: true, + next: { + action: "attachInventoryItems", + noCompensation: true, + next: { + action: "step_1", + saveResponse: true, + next: { + action: "step_2", + saveResponse: true, + }, + }, + }, + }, + }, + }, + }) + }) + + it("should load a transaction, add 2 steps and merge step_1 to run in parallel with createProduct", () => { + const builder = new OrchestratorBuilder(loadedFlow) + builder + .addAction("step_1", { saveResponse: true }) + .addAction("step_2", { saveResponse: true }) + .mergeActions("createProduct", "step_1") + + expect(builder.build()).toEqual({ + next: [ + { + action: "createProduct", + saveResponse: true, + next: { + action: "attachToSalesChannel", + saveResponse: true, + next: { + action: "createPrices", + saveResponse: true, + next: { + action: "createInventoryItems", + saveResponse: true, + next: { + action: "attachInventoryItems", + noCompensation: true, + }, + }, + }, + }, + }, + { + action: "step_1", + saveResponse: true, + next: { + action: "step_2", + saveResponse: true, + }, + }, + ], + }) + }) + + it("should load a transaction, add 2 steps and move 'step_1' and all its next steps to run before 'createPrices'", () => { + const builder = new OrchestratorBuilder(loadedFlow) + builder + .addAction("step_1", { saveResponse: true }) + .addAction("step_2", { saveResponse: true }) + .moveAction("step_1", "createPrices") + + expect(builder.build()).toEqual({ + action: "createProduct", + saveResponse: true, + next: { + action: "attachToSalesChannel", + saveResponse: true, + next: { + action: "step_1", + saveResponse: true, + next: { + action: "step_2", + saveResponse: true, + next: { + action: "createPrices", + saveResponse: true, + next: { + action: "createInventoryItems", + saveResponse: true, + next: { + action: "attachInventoryItems", + noCompensation: true, + }, + }, + }, + }, + }, + }, + }) + }) + + it("should load a transaction, add 2 steps and move 'step_1' to run before 'createPrices' and merge next steps", () => { + const builder = new OrchestratorBuilder(loadedFlow) + builder + .addAction("step_1", { saveResponse: true }) + .addAction("step_2", { saveResponse: true }) + .moveAndMergeNextAction("step_1", "createPrices") + + expect(builder.build()).toEqual({ + action: "createProduct", + saveResponse: true, + next: { + action: "attachToSalesChannel", + saveResponse: true, + next: { + action: "step_1", + saveResponse: true, + next: [ + { + action: "step_2", + saveResponse: true, + }, + { + action: "createPrices", + saveResponse: true, + next: { + action: "createInventoryItems", + saveResponse: true, + next: { + action: "attachInventoryItems", + noCompensation: true, + }, + }, + }, + ], + }, + }, + }) + }) + + it("Fully compose a complex transaction", () => { + const builder = new OrchestratorBuilder() + builder + .addAction("step_1", { saveResponse: true }) + .addAction("step_2", { saveResponse: true }) + .addAction("step_3", { saveResponse: true }) + + builder.insertActionBefore("step_3", "step_2.5", { + saveResponse: false, + noCompensation: true, + }) + + builder.insertActionAfter("step_1", "step_1.1", { saveResponse: true }) + + builder.insertActionAfter("step_3", "step_4", { async: false }) + + builder + .mergeActions("step_2", "step_2.5", "step_3") + .addAction("step_5", { noCompensation: true }) + + builder.deleteAction("step_3") + + expect(builder.build()).toEqual({ + action: "step_1", + saveResponse: true, + next: { + action: "step_1.1", + saveResponse: true, + next: [ + { + action: "step_2", + saveResponse: true, + }, + { + action: "step_2.5", + saveResponse: false, + noCompensation: true, + }, + { + action: "step_4", + async: false, + next: { + action: "step_5", + noCompensation: true, + }, + }, + ], + }, + }) + }) + }) +}) diff --git a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts b/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts index d00d3b198f..f08b18befa 100644 --- a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts @@ -68,7 +68,7 @@ describe("Transaction Orchestrator", () => { expect(mocks.one).toBeCalledWith( expect.objectContaining({ metadata: { - producer: "transaction-name", + model_id: "transaction-name", reply_to_topic: "trans:transaction-name", idempotency_key: "transaction_id_123:firstMethod:invoke", action: "firstMethod", @@ -83,7 +83,7 @@ describe("Transaction Orchestrator", () => { expect(mocks.two).toBeCalledWith( expect.objectContaining({ metadata: { - producer: "transaction-name", + model_id: "transaction-name", reply_to_topic: "trans:transaction-name", idempotency_key: "transaction_id_123:secondMethod:invoke", action: "secondMethod", @@ -191,7 +191,7 @@ describe("Transaction Orchestrator", () => { expect(actionOrder).toEqual(["one", "two", "three"]) }) - it("Should store invoke's step response if flag 'saveResponse' is set to true", async () => { + it("Should store invoke's step response by default or if flag 'saveResponse' is set to true and ignore it if set to false", async () => { const mocks = { one: jest.fn().mockImplementation((data) => { return { abc: 1234 } @@ -244,15 +244,13 @@ describe("Transaction Orchestrator", () => { const flow: TransactionStepsDefinition = { next: { action: "firstMethod", - saveResponse: true, next: { action: "secondMethod", - saveResponse: true, next: { action: "thirdMethod", - saveResponse: true, next: { action: "fourthMethod", + saveResponse: false, }, }, }, @@ -275,6 +273,9 @@ describe("Transaction Orchestrator", () => { expect(mocks.three).toBeCalledWith( { prop: 123 }, { + payload: { + prop: 123, + }, invoke: { firstMethod: { abc: 1234 }, secondMethod: { def: "567" }, @@ -662,7 +663,10 @@ describe("Transaction Orchestrator", () => { const transaction = await strategy.beginTransaction( "transaction_id_123", - handler + handler, + { + myPayloadProp: "test", + } ) await strategy.resume(transaction) diff --git a/packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts b/packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts new file mode 100644 index 0000000000..e0d809cf7f --- /dev/null +++ b/packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts @@ -0,0 +1,176 @@ +import { WorkflowManager } from "../../transaction/workflow-manager" +import { TransactionState } from "../../transaction/types" + +describe("WorkflowManager", () => { + const container: any = {} + + let handlers + let flow: WorkflowManager + let asyncStepIdempotencyKey: string + + beforeEach(() => { + jest.resetAllMocks() + WorkflowManager.unregisterAll() + + handlers = new Map() + handlers.set("foo", { + invoke: jest.fn().mockResolvedValue({ done: true }), + compensate: jest.fn(() => {}), + }) + + handlers.set("bar", { + invoke: jest.fn().mockResolvedValue({ done: true }), + compensate: jest.fn().mockResolvedValue({}), + }) + + handlers.set("broken", { + invoke: jest.fn(() => { + throw new Error("Step Failed") + }), + compensate: jest.fn().mockResolvedValue({ bar: 123, reverted: true }), + }) + + handlers.set("callExternal", { + invoke: jest.fn((container, payload, invoke, metadata) => { + asyncStepIdempotencyKey = metadata.idempotency_key + }), + }) + + WorkflowManager.register( + "create-product", + { + action: "foo", + next: { + action: "bar", + }, + }, + handlers + ) + + WorkflowManager.register( + "broken-delivery", + { + action: "foo", + next: { + action: "broken", + }, + }, + handlers + ) + + WorkflowManager.register( + "deliver-product", + { + action: "foo", + next: { + action: "callExternal", + async: true, + noCompensation: true, + next: { + action: "bar", + }, + }, + }, + handlers + ) + + flow = new WorkflowManager(container) + }) + + it("should return all registered workflows", () => { + const wf = Object.keys(Object.fromEntries(WorkflowManager.getWorkflows())) + expect(wf).toEqual(["create-product", "broken-delivery", "deliver-product"]) + }) + + it("should begin a transaction and returns its final state", async () => { + const transaction = await flow.begin("create-product", "t-id", { + input: 123, + }) + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) + + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(0) + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(0) + + expect(transaction.getState()).toBe(TransactionState.DONE) + }) + + it("should begin a transaction and revert it when fail", async () => { + const transaction = await flow.begin("broken-delivery", "t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("broken").invoke).toHaveBeenCalledTimes(1) + + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(1) + expect(handlers.get("broken").compensate).toHaveBeenCalledTimes(1) + + expect(transaction.getState()).toBe(TransactionState.REVERTED) + }) + + it("should continue an asyncronous transaction after reporting a successful step", async () => { + const transaction = await flow.begin("deliver-product", "t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) + + expect(transaction.getState()).toBe(TransactionState.INVOKING) + + const continuation = await flow.registerStepSuccess( + "deliver-product", + asyncStepIdempotencyKey, + { ok: true } + ) + + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) + expect(continuation.getState()).toBe(TransactionState.DONE) + }) + + it("should revert an asyncronous transaction after reporting a failure step", async () => { + const transaction = await flow.begin("deliver-product", "t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) + + expect(transaction.getState()).toBe(TransactionState.INVOKING) + + const continuation = await flow.registerStepFailure( + "deliver-product", + asyncStepIdempotencyKey, + { ok: true } + ) + + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) + expect(handlers.get("bar").compensate).toHaveBeenCalledTimes(0) + + // Failed because the async is flagged as noCompensation + expect(continuation.getState()).toBe(TransactionState.FAILED) + }) + + it("should update an existing flow with a new step and a new handler", async () => { + const definition = + WorkflowManager.getTransactionDefinition("create-product") + + definition.insertActionBefore("bar", "xor", { maxRetries: 3 }) + + const additionalHandlers = new Map() + additionalHandlers.set("xor", { + invoke: jest.fn().mockResolvedValue({ done: true }), + compensate: jest.fn().mockResolvedValue({}), + }) + + WorkflowManager.update("create-product", definition, additionalHandlers) + + const transaction = await flow.begin("create-product", "t-id") + console.log(transaction) + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) + expect(additionalHandlers.get("xor").invoke).toHaveBeenCalledTimes(1) + + expect(transaction.getState()).toBe(TransactionState.DONE) + }) +}) diff --git a/packages/medusa/src/utils/transaction/distributed-transaction.ts b/packages/medusa/src/utils/transaction/distributed-transaction.ts index 38c3547805..b4c85c1921 100644 --- a/packages/medusa/src/utils/transaction/distributed-transaction.ts +++ b/packages/medusa/src/utils/transaction/distributed-transaction.ts @@ -1,8 +1,9 @@ -import { TransactionFlow, TransactionHandlerType, TransactionState } from "." +import { TransactionFlow } from "./transaction-orchestrator" +import { TransactionHandlerType, TransactionState } from "./types" /** * @typedef TransactionMetadata - * @property producer - The id of the producer that created the transaction (transactionModelId). + * @property model_id - The id of the model_id that created the transaction (modelId). * @property reply_to_topic - The topic to reply to for the transaction. * @property idempotency_key - The idempotency key of the transaction. * @property action - The action of the transaction. @@ -11,7 +12,7 @@ import { TransactionFlow, TransactionHandlerType, TransactionState } from "." * @property timestamp - The timestamp of the transaction. */ export type TransactionMetadata = { - producer: string + model_id: string reply_to_topic: string idempotency_key: string action: string @@ -22,11 +23,13 @@ export type TransactionMetadata = { /** * @typedef TransactionContext + * @property payload - Object containing the initial payload. * @property invoke - Object containing responses of Invoke handlers on steps flagged with saveResponse. * @property compensate - Object containing responses of Compensate handlers on steps flagged with saveResponse. */ export class TransactionContext { constructor( + public payload: unknown = undefined, public invoke: Record = {}, public compensate: Record = {} ) {} @@ -36,7 +39,7 @@ export class TransactionStepError { constructor( public action: string, public handlerType: TransactionHandlerType, - public error: Error | null + public error: Error | any ) {} } @@ -85,14 +88,15 @@ export class DistributedTransaction { context?: TransactionContext ) { this.transactionId = flow.transactionId - this.modelId = flow.transactionModelId + this.modelId = flow.modelId if (errors) { this.errors = errors } + this.context.payload = payload if (context) { - this.context = context + this.context = { ...context } } } @@ -111,7 +115,7 @@ export class DistributedTransaction { public addError( action: string, handlerType: TransactionHandlerType, - error: Error | null + error: Error | any ) { this.errors.push({ action, diff --git a/packages/medusa/src/utils/transaction/index.ts b/packages/medusa/src/utils/transaction/index.ts index e9fee3f8b7..7c51c151da 100644 --- a/packages/medusa/src/utils/transaction/index.ts +++ b/packages/medusa/src/utils/transaction/index.ts @@ -1,47 +1,4 @@ -export enum TransactionHandlerType { - INVOKE = "invoke", - COMPENSATE = "compensate", -} - -export type TransactionStepsDefinition = { - action?: string - continueOnPermanentFailure?: boolean - noCompensation?: boolean - maxRetries?: number - retryInterval?: number - timeout?: number - async?: boolean - noWait?: boolean - saveResponse?: boolean - next?: TransactionStepsDefinition | TransactionStepsDefinition[] -} - -export enum TransactionStepStatus { - IDLE = "idle", - OK = "ok", - WAITING = "waiting_response", - TEMPORARY_FAILURE = "temp_failure", - PERMANENT_FAILURE = "permanent_failure", -} - -export enum TransactionState { - NOT_STARTED = "not_started", - INVOKING = "invoking", - WAITING_TO_COMPENSATE = "waiting_to_compensate", - COMPENSATING = "compensating", - DONE = "done", - REVERTED = "reverted", - FAILED = "failed", - DORMANT = "dormant", - SKIPPED = "skipped", -} - -export type TransactionModel = { - id: string - flow: TransactionStepsDefinition - hash: string -} - +export * from "./types" export * from "./transaction-orchestrator" export * from "./transaction-step" export * from "./distributed-transaction" diff --git a/packages/medusa/src/utils/transaction/orchestrator-builder.ts b/packages/medusa/src/utils/transaction/orchestrator-builder.ts new file mode 100644 index 0000000000..6e39adcbd9 --- /dev/null +++ b/packages/medusa/src/utils/transaction/orchestrator-builder.ts @@ -0,0 +1,422 @@ +import { TransactionStepsDefinition } from "./types" + +export type ActionHandler = { + [type: string]: (data: any, context: any) => Promise +} + +interface InternalStep extends TransactionStepsDefinition { + next?: InternalStep | InternalStep[] + depth: number + parent?: InternalStep | null +} + +export class OrchestratorBuilder { + private steps: InternalStep + + constructor(steps?: TransactionStepsDefinition) { + this.load(steps) + } + + load(steps?: TransactionStepsDefinition) { + this.steps = { + depth: -1, + parent: null, + next: steps + ? JSON.parse( + JSON.stringify((steps.action ? steps : steps.next) as InternalStep) + ) + : undefined, + } + + this.updateDepths(this.steps, {}, 1, -1) + return this + } + + addAction(action: string, options: Partial = {}) { + const step = this.findLastStep() + const newAction = { + action, + depth: step.depth + 1, + parent: step.action, + ...options, + } as InternalStep + + step.next = newAction + + return this + } + + replaceAction( + existingAction: string, + action: string, + options: Partial = {} + ) { + const step = this.findOrThrowStepByAction(existingAction) + step.action = action + + Object.assign(step, options) + + return this + } + + insertActionBefore( + existingAction: string, + action: string, + options: Partial = {} + ) { + const parentStep = this.findParentStepByAction(existingAction) + if (parentStep) { + const oldNext = parentStep.next! + const newDepth = parentStep.depth + 1 + if (Array.isArray(parentStep.next)) { + const index = parentStep.next.findIndex( + (step) => step.action === existingAction + ) + if (index > -1) { + parentStep.next[index] = { + action, + ...options, + next: oldNext[index], + depth: newDepth, + } as InternalStep + } + } else { + parentStep.next = { + action, + ...options, + next: oldNext, + depth: newDepth, + } as InternalStep + } + + this.updateDepths(oldNext as InternalStep, parentStep) + } + + return this + } + + insertActionAfter( + existingAction: string, + action: string, + options: Partial = {} + ) { + const step = this.findOrThrowStepByAction(existingAction) + const oldNext = step.next + const newDepth = step.depth + 1 + step.next = { + action, + ...options, + next: oldNext, + depth: newDepth, + parent: step.action, + } as InternalStep + + this.updateDepths(oldNext as InternalStep, step.next) + + return this + } + + private appendTo(step: InternalStep | string, newStep: InternalStep) { + if (typeof step === "string") { + step = this.findOrThrowStepByAction(step) + } + + step.next = { + ...newStep, + depth: step.depth + 1, + parent: step.action, + } as InternalStep + + return this + } + + appendAction( + action: string, + to: string, + options: Partial = {} + ) { + const newAction = { + action, + ...options, + } as InternalStep + + const branch = this.findLastStep(this.findStepByAction(to)) + this.appendTo(branch, newAction) + + return this + } + + private move( + actionToMove: string, + targetAction: string, + { + runInParallel, + mergeNext, + }: { + runInParallel?: boolean + mergeNext?: boolean + } = { + runInParallel: false, + mergeNext: false, + } + ): OrchestratorBuilder { + const parentActionToMoveStep = this.findParentStepByAction(actionToMove)! + const parentTargetActionStep = this.findParentStepByAction(targetAction)! + const actionToMoveStep = this.findStepByAction( + actionToMove, + parentTargetActionStep + )! + + if (!actionToMoveStep) { + throw new Error( + `Action "${actionToMove}" could not be found in the following steps of "${targetAction}"` + ) + } + + if (Array.isArray(parentActionToMoveStep.next)) { + const index = parentActionToMoveStep.next.findIndex( + (step) => step.action === actionToMove + ) + if (index > -1) { + parentActionToMoveStep.next.splice(index, 1) + } + } else { + delete parentActionToMoveStep.next + } + + if (runInParallel) { + if (Array.isArray(parentTargetActionStep.next)) { + parentTargetActionStep.next.push(actionToMoveStep) + } else if (parentTargetActionStep.next) { + parentTargetActionStep.next = [ + parentTargetActionStep.next, + actionToMoveStep, + ] + } + } else { + if (actionToMoveStep.next) { + if (mergeNext) { + if (Array.isArray(actionToMoveStep.next)) { + actionToMoveStep.next.push( + parentTargetActionStep.next as InternalStep + ) + } else { + actionToMoveStep.next = [ + actionToMoveStep.next, + parentTargetActionStep.next as InternalStep, + ] + } + } else { + this.appendTo( + this.findLastStep(actionToMoveStep), + parentTargetActionStep.next as InternalStep + ) + } + } else { + actionToMoveStep.next = parentTargetActionStep.next + } + + parentTargetActionStep.next = actionToMoveStep + } + + this.updateDepths( + actionToMoveStep as InternalStep, + parentTargetActionStep, + 1, + parentTargetActionStep.depth + ) + + return this + } + + moveAction(actionToMove: string, targetAction: string): OrchestratorBuilder { + this.move(actionToMove, targetAction) + + return this + } + + moveAndMergeNextAction( + actionToMove: string, + targetAction: string + ): OrchestratorBuilder { + this.move(actionToMove, targetAction, { mergeNext: true }) + + return this + } + + mergeActions(where: string, ...actions: string[]) { + actions.unshift(where) + + if (actions.length < 2) { + throw new Error("Cannot merge less than two actions") + } + + for (const action of actions) { + if (action !== where) { + this.move(action, where, { runInParallel: true }) + } + } + + return this + } + + deleteAction(action: string, steps: InternalStep = this.steps) { + const actionStep = this.findOrThrowStepByAction(action) + const parentStep = this.findParentStepByAction(action, steps)! + + if (Array.isArray(parentStep.next)) { + const index = parentStep.next.findIndex((step) => step.action === action) + if (index > -1 && actionStep.next) { + if (actionStep.next) { + parentStep.next[index] = actionStep.next as InternalStep + } else { + parentStep.next.splice(index, 1) + } + } + } else { + parentStep.next = actionStep.next + } + + this.updateDepths( + actionStep.next as InternalStep, + parentStep, + 1, + parentStep.depth + ) + + return this + } + + pruneAction(action: string) { + const actionStep = this.findOrThrowStepByAction(action) + const parentStep = this.findParentStepByAction(action, this.steps)! + + if (Array.isArray(parentStep.next)) { + const index = parentStep.next.findIndex((step) => step.action === action) + if (index > -1) { + parentStep.next.splice(index, 1) + } + } else { + delete parentStep.next + } + + return this + } + + private findStepByAction( + action: string, + step: InternalStep = this.steps + ): InternalStep | undefined { + if (step.action === action) { + return step + } + + if (Array.isArray(step.next)) { + for (const subStep of step.next) { + const found = this.findStepByAction(action, subStep as InternalStep) + if (found) { + return found + } + } + } else if (step.next && typeof step.next === "object") { + return this.findStepByAction(action, step.next as InternalStep) + } + + return + } + + private findOrThrowStepByAction( + action: string, + steps: InternalStep = this.steps + ): InternalStep { + const step = this.findStepByAction(action, steps) + if (!step) { + throw new Error(`Action "${action}" could not be found`) + } + + return step + } + + private findParentStepByAction( + action: string, + step: InternalStep = this.steps + ): InternalStep | undefined { + if (!step.next) { + return + } + + const nextSteps = Array.isArray(step.next) ? step.next : [step.next] + for (const nextStep of nextSteps) { + if (!nextStep) { + continue + } + if (nextStep.action === action) { + return step + } + const foundStep = this.findParentStepByAction( + action, + nextStep as InternalStep + ) + if (foundStep) { + return foundStep + } + } + + return + } + + private findLastStep(steps: InternalStep = this.steps): InternalStep { + let step = steps as InternalStep + while (step.next) { + step = Array.isArray(step.next) + ? (step.next[step.next.length - 1] as InternalStep) + : (step.next as InternalStep) + } + + return step + } + + private updateDepths( + startingStep: InternalStep, + parent, + incr = 1, + beginFrom?: number + ): void { + if (!startingStep) { + return + } + + const update = (step: InternalStep, parent, beginFrom) => { + step.depth = beginFrom + incr + step.parent = parent.action + if (Array.isArray(step.next)) { + step.next.forEach((nextAction) => update(nextAction, step, step.depth)) + } else if (step.next) { + update(step.next, step, step.depth) + } + } + update(startingStep, parent, beginFrom ?? startingStep.depth) + } + + build(): TransactionStepsDefinition { + if (!this.steps.next) { + return {} + } + + const ignore = ["depth", "parent"] + const result = JSON.parse( + JSON.stringify( + Array.isArray(this.steps.next) ? this.steps : this.steps.next, + null + ), + (key, value) => { + if (ignore.includes(key)) { + return + } + + return value + } + ) + return result + } +} diff --git a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts b/packages/medusa/src/utils/transaction/transaction-orchestrator.ts index cade8aa2f9..63f8b6d35f 100644 --- a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts +++ b/packages/medusa/src/utils/transaction/transaction-orchestrator.ts @@ -1,12 +1,11 @@ import { EventEmitter } from "events" - import { TransactionHandlerType, TransactionStepsDefinition, TransactionStepStatus, TransactionState, TransactionModel, -} from "." +} from "./types" import { DistributedTransaction, TransactionCheckpoint, @@ -15,7 +14,7 @@ import { import { TransactionStep, TransactionStepHandler } from "./transaction-step" export type TransactionFlow = { - transactionModelId: string + modelId: string definition: TransactionStepsDefinition transactionId: string hasFailedSteps: boolean @@ -31,11 +30,11 @@ export type TransactionFlow = { * It is based on a single transaction definition, which is used to execute all the transaction steps */ export class TransactionOrchestrator extends EventEmitter { - private ROOT_STEP = "_root" + private static ROOT_STEP = "_root" private invokeSteps: string[] = [] private compensateSteps: string[] = [] - public DEFAULT_RETRIES = 0 + public static DEFAULT_RETRIES = 0 constructor( public id: string, private definition: TransactionStepsDefinition @@ -117,7 +116,7 @@ export class TransactionOrchestrator extends EventEmitter { return this.canMoveBackward(flow, step) } else { const previous = this.getPreviousStep(flow, step) - if (previous.id === this.ROOT_STEP) { + if (previous.id === TransactionOrchestrator.ROOT_STEP) { return true } @@ -148,7 +147,7 @@ export class TransactionOrchestrator extends EventEmitter { for (const step of allSteps) { if ( - step === this.ROOT_STEP || + step === TransactionOrchestrator.ROOT_STEP || !this.canContinue(flow, flow.steps[step]) ) { continue @@ -227,7 +226,7 @@ export class TransactionOrchestrator extends EventEmitter { private flagStepsToRevert(flow: TransactionFlow): void { for (const step in flow.steps) { - if (step === this.ROOT_STEP) { + if (step === TransactionOrchestrator.ROOT_STEP) { continue } @@ -244,7 +243,7 @@ export class TransactionOrchestrator extends EventEmitter { } } - private async setStepSuccess( + private static async setStepSuccess( transaction: DistributedTransaction, step: TransactionStep, response: unknown @@ -272,11 +271,11 @@ export class TransactionOrchestrator extends EventEmitter { } } - private async setStepFailure( + private static async setStepFailure( transaction: DistributedTransaction, step: TransactionStep, - error: Error | null, - maxRetries: number = this.DEFAULT_RETRIES + error: Error | any, + maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES ): Promise { step.failures++ @@ -344,10 +343,10 @@ export class TransactionOrchestrator extends EventEmitter { const payload = new TransactionPayload( { - producer: flow.transactionModelId, + model_id: flow.modelId, reply_to_topic: TransactionOrchestrator.getKeyName( "trans", - flow.transactionModelId + flow.modelId ), idempotency_key: TransactionOrchestrator.getKeyName( flow.transactionId, @@ -368,10 +367,14 @@ export class TransactionOrchestrator extends EventEmitter { transaction .handler(step.definition.action + "", type, payload) .then(async (response) => { - await this.setStepSuccess(transaction, step, response) + await TransactionOrchestrator.setStepSuccess( + transaction, + step, + response + ) }) .catch(async (error) => { - await this.setStepFailure( + await TransactionOrchestrator.setStepFailure( transaction, step, error, @@ -381,13 +384,18 @@ export class TransactionOrchestrator extends EventEmitter { ) } else { execution.push( - transaction - .saveCheckpoint() - .then(async () => - transaction - .handler(step.definition.action + "", type, payload) - .catch(() => void 0) - ) + transaction.saveCheckpoint().then(async () => + transaction + .handler(step.definition.action + "", type, payload) + .catch(async (error) => { + await TransactionOrchestrator.setStepFailure( + transaction, + step, + error, + step.definition.maxRetries + ) + }) + ) ) } } @@ -453,17 +461,17 @@ export class TransactionOrchestrator extends EventEmitter { transactionId: string ): Promise { return { - transactionModelId: this.id, + modelId: this.id, transactionId: transactionId, hasFailedSteps: false, hasSkippedSteps: false, state: TransactionState.NOT_STARTED, definition: this.definition, - steps: this.buildSteps(this.definition), + steps: TransactionOrchestrator.buildSteps(this.definition), } } - private async loadTransactionById( + private static async loadTransactionById( transactionId: string ): Promise { const transaction = await DistributedTransaction.loadTransaction( @@ -472,26 +480,31 @@ export class TransactionOrchestrator extends EventEmitter { if (transaction !== null) { const flow = transaction.flow - transaction.flow.steps = this.buildSteps(flow.definition, flow.steps) + transaction.flow.steps = TransactionOrchestrator.buildSteps( + flow.definition, + flow.steps + ) return transaction } return null } - private buildSteps( + private static buildSteps( flow: TransactionStepsDefinition, existingSteps?: { [key: string]: TransactionStep } ): { [key: string]: TransactionStep } { const states: { [key: string]: TransactionStep } = { - [this.ROOT_STEP]: { - id: this.ROOT_STEP, + [TransactionOrchestrator.ROOT_STEP]: { + id: TransactionOrchestrator.ROOT_STEP, next: [] as string[], } as TransactionStep, } const actionNames = new Set() - const queue: any[] = [{ obj: flow, level: [this.ROOT_STEP] }] + const queue: any[] = [ + { obj: flow, level: [TransactionOrchestrator.ROOT_STEP] }, + ] while (queue.length > 0) { const { obj, level } = queue.shift() @@ -525,7 +538,7 @@ export class TransactionOrchestrator extends EventEmitter { id, depth: level.length - 1, definition: definitionCopy, - saveResponse: definitionCopy.saveResponse, + saveResponse: definitionCopy.saveResponse ?? true, invoke: { state: TransactionState.NOT_STARTED, status: TransactionStepStatus.IDLE, @@ -557,7 +570,8 @@ export class TransactionOrchestrator extends EventEmitter { handler: TransactionStepHandler, payload?: unknown ): Promise { - const existingTransaction = await this.loadTransactionById(transactionId) + const existingTransaction = + await TransactionOrchestrator.loadTransactionById(transactionId) let newTransaction = false let modelFlow @@ -582,7 +596,7 @@ export class TransactionOrchestrator extends EventEmitter { return transaction } - private getStepByAction( + private static getStepByAction( flow: TransactionFlow, action: string ): TransactionStep | null { @@ -594,11 +608,10 @@ export class TransactionOrchestrator extends EventEmitter { return null } - private async getTransactionAndStepFromIdempotencyKey( + private static async getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey: string, handler?: TransactionStepHandler, - transaction?: DistributedTransaction, - payload?: unknown + transaction?: DistributedTransaction ): Promise<[DistributedTransaction, TransactionStep]> { const [transactionId, action, actionType] = responseIdempotencyKey.split( TransactionOrchestrator.SEPARATOR @@ -611,7 +624,8 @@ export class TransactionOrchestrator extends EventEmitter { } if (!transaction) { - const existingTransaction = await this.loadTransactionById(transactionId) + const existingTransaction = + await TransactionOrchestrator.loadTransactionById(transactionId) if (existingTransaction === null) { throw new Error(`Transaction ${transactionId} could not be found.`) @@ -620,13 +634,16 @@ export class TransactionOrchestrator extends EventEmitter { transaction = new DistributedTransaction( existingTransaction.flow, handler!, - payload, + undefined, existingTransaction.errors, existingTransaction.context ) } - const step = this.getStepByAction(transaction.getFlow(), action) + const step = TransactionOrchestrator.getStepByAction( + transaction.getFlow(), + action + ) if (step === null) { throw new Error("Action not found.") @@ -653,15 +670,19 @@ export class TransactionOrchestrator extends EventEmitter { response?: unknown ): Promise { const [curTransaction, step] = - await this.getTransactionAndStepFromIdempotencyKey( + await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, handler, - transaction, - response + transaction ) if (step.getStates().status === TransactionStepStatus.WAITING) { - await this.setStepSuccess(curTransaction, step, response) + await TransactionOrchestrator.setStepSuccess( + curTransaction, + step, + response + ) + this.emit("resume", curTransaction) await this.executeNext(curTransaction) } else { @@ -683,21 +704,24 @@ export class TransactionOrchestrator extends EventEmitter { */ public async registerStepFailure( responseIdempotencyKey: string, - error: Error | null, + error?: Error | any, handler?: TransactionStepHandler, - transaction?: DistributedTransaction, - response?: unknown + transaction?: DistributedTransaction ): Promise { const [curTransaction, step] = - await this.getTransactionAndStepFromIdempotencyKey( + await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, handler, - transaction, - response + transaction ) if (step.getStates().status === TransactionStepStatus.WAITING) { - await this.setStepFailure(curTransaction, step, error, 0) + await TransactionOrchestrator.setStepFailure( + curTransaction, + step, + error, + 0 + ) this.emit("resume", curTransaction) await this.executeNext(curTransaction) } else { diff --git a/packages/medusa/src/utils/transaction/transaction-step.ts b/packages/medusa/src/utils/transaction/transaction-step.ts index d6c502c34d..6fe1d77975 100644 --- a/packages/medusa/src/utils/transaction/transaction-step.ts +++ b/packages/medusa/src/utils/transaction/transaction-step.ts @@ -1,10 +1,10 @@ +import { TransactionPayload } from "./distributed-transaction" import { TransactionStepsDefinition, TransactionStepStatus, TransactionState, TransactionHandlerType, - TransactionPayload, -} from "." +} from "./types" export type TransactionStepHandler = ( actionId: string, @@ -27,7 +27,7 @@ export class TransactionStep { * @member failures - The number of failures encountered while executing the step * @member lastAttempt - The timestamp of the last attempt made to execute the step * @member next - The ids of the next steps in the flow - * @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is false + * @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is true */ private stepFailed = false id: string diff --git a/packages/medusa/src/utils/transaction/types.ts b/packages/medusa/src/utils/transaction/types.ts new file mode 100644 index 0000000000..25ab21c78d --- /dev/null +++ b/packages/medusa/src/utils/transaction/types.ts @@ -0,0 +1,43 @@ +export enum TransactionHandlerType { + INVOKE = "invoke", + COMPENSATE = "compensate", +} + +export type TransactionStepsDefinition = { + action?: string + continueOnPermanentFailure?: boolean + noCompensation?: boolean + maxRetries?: number + retryInterval?: number + timeout?: number + async?: boolean + noWait?: boolean + saveResponse?: boolean + next?: TransactionStepsDefinition | TransactionStepsDefinition[] +} + +export enum TransactionStepStatus { + IDLE = "idle", + OK = "ok", + WAITING = "waiting_response", + TEMPORARY_FAILURE = "temp_failure", + PERMANENT_FAILURE = "permanent_failure", +} + +export enum TransactionState { + NOT_STARTED = "not_started", + INVOKING = "invoking", + WAITING_TO_COMPENSATE = "waiting_to_compensate", + COMPENSATING = "compensating", + DONE = "done", + REVERTED = "reverted", + FAILED = "failed", + DORMANT = "dormant", + SKIPPED = "skipped", +} + +export type TransactionModel = { + id: string + flow: TransactionStepsDefinition + hash: string +} diff --git a/packages/medusa/src/utils/transaction/workflow-manager.ts b/packages/medusa/src/utils/transaction/workflow-manager.ts new file mode 100644 index 0000000000..18f1a0dcde --- /dev/null +++ b/packages/medusa/src/utils/transaction/workflow-manager.ts @@ -0,0 +1,226 @@ +import { MedusaContainer } from "@medusajs/types" +import { + DistributedTransaction, + TransactionMetadata, +} from "./distributed-transaction" +import { TransactionOrchestrator } from "./transaction-orchestrator" +import { TransactionStepHandler } from "./transaction-step" +import { TransactionHandlerType, TransactionStepsDefinition } from "./types" +import { OrchestratorBuilder } from "./orchestrator-builder" + +interface Workflow { + id: string + handler: (container: MedusaContainer) => TransactionStepHandler + orchestrator: TransactionOrchestrator + flow_: TransactionStepsDefinition + handlers_: Map< + string, + { invoke: InvokeHandler; compensate?: CompensateHandler } + > + requiredModules?: Set + optionalModules?: Set +} + +type InvokeHandler = ( + container: MedusaContainer, + payload: any, + invoke: { [actions: string]: any }, + metadata: TransactionMetadata +) => Promise + +type CompensateHandler = ( + container: MedusaContainer, + payload: any, + invoke: { [actions: string]: any }, + compensate: { [actions: string]: any }, + metadata: TransactionMetadata +) => Promise + +export class WorkflowManager { + protected static workflows: Map = new Map() + protected container: MedusaContainer + + constructor(container?: MedusaContainer) { + this.container = container as MedusaContainer + } + + static unregister(workflowId: string) { + WorkflowManager.workflows.delete(workflowId) + } + + static unregisterAll() { + WorkflowManager.workflows.clear() + } + + static getWorkflows() { + return WorkflowManager.workflows + } + + static getTransactionDefinition(workflowId): OrchestratorBuilder { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + return new OrchestratorBuilder(workflow.flow_) + } + + static register( + workflowId: string, + flow: TransactionStepsDefinition | OrchestratorBuilder, + handlers: Map< + string, + { invoke: InvokeHandler; compensate?: CompensateHandler } + >, + requiredModules?: Set, + optionalModules?: Set + ) { + if (WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" is already defined.`) + } + + const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow + + WorkflowManager.workflows.set(workflowId, { + id: workflowId, + flow_: finalFlow, + orchestrator: new TransactionOrchestrator(workflowId, finalFlow), + handler: WorkflowManager.buildHandlers(handlers), + handlers_: handlers, + requiredModules, + optionalModules, + }) + } + + static update( + workflowId: string, + flow: TransactionStepsDefinition | OrchestratorBuilder, + handlers: Map< + string, + { invoke: InvokeHandler; compensate?: CompensateHandler } + >, + requiredModules?: Set, + optionalModules?: Set + ) { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + + for (const [key, value] of handlers.entries()) { + workflow.handlers_.set(key, value) + } + + const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow + + WorkflowManager.workflows.set(workflowId, { + id: workflowId, + flow_: finalFlow, + orchestrator: new TransactionOrchestrator(workflowId, finalFlow), + handler: WorkflowManager.buildHandlers(workflow.handlers_), + handlers_: workflow.handlers_, + requiredModules, + optionalModules, + }) + } + + private static buildHandlers( + handlers: Map< + string, + { invoke: InvokeHandler; compensate?: CompensateHandler } + > + ): (container: MedusaContainer) => TransactionStepHandler { + return (container: MedusaContainer): TransactionStepHandler => { + return async ( + actionId: string, + handlerType: TransactionHandlerType, + payload?: any + ) => { + const command = handlers.get(actionId) + + if (!command) { + throw new Error(`Handler for action "${actionId}" not found.`) + } else if (!command[handlerType]) { + throw new Error( + `"${handlerType}" handler for action "${actionId}" not found.` + ) + } + + const { invoke, compensate, payload: input } = payload.context + const { metadata } = payload + + if (handlerType === TransactionHandlerType.COMPENSATE) { + return await command[handlerType]!( + container, + input, + invoke, + compensate, + metadata + ) + } + + return await command[handlerType](container, input, invoke, metadata) + } + } + } + + async begin( + workflowId: string, + uniqueTransactionId: string, + input?: unknown + ) { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + + const orchestrator = workflow.orchestrator + + const transaction = await orchestrator.beginTransaction( + uniqueTransactionId, + workflow.handler(this.container), + input + ) + + await orchestrator.resume(transaction) + + return transaction + } + + async registerStepSuccess( + workflowId: string, + idempotencyKey: string, + response?: unknown + ): Promise { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + return await workflow.orchestrator.registerStepSuccess( + idempotencyKey, + workflow.handler(this.container), + undefined, + response + ) + } + + async registerStepFailure( + workflowId: string, + idempotencyKey: string, + error?: Error | any + ): Promise { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + return await workflow.orchestrator.registerStepFailure( + idempotencyKey, + error, + workflow.handler(this.container) + ) + } +}