From 017538883588792e1ff37abcab0fd2872c9af932 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Mon, 16 Jan 2023 20:16:41 -0300 Subject: [PATCH] feat(medusa): Transaction Orchestrator (#2861) * chore: transaction orchestrator --- .changeset/eighty-melons-beg.md | 5 + .../transaction/transaction-orchestrator.ts | 669 +++++++++++++++++ .../transaction/distributed-transaction.ts | 132 ++++ .../medusa/src/utils/transaction/index.ts | 47 ++ .../transaction/transaction-orchestrator.ts | 687 ++++++++++++++++++ .../src/utils/transaction/transaction-step.ts | 159 ++++ 6 files changed, 1699 insertions(+) create mode 100644 .changeset/eighty-melons-beg.md create mode 100644 packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts create mode 100644 packages/medusa/src/utils/transaction/distributed-transaction.ts create mode 100644 packages/medusa/src/utils/transaction/index.ts create mode 100644 packages/medusa/src/utils/transaction/transaction-orchestrator.ts create mode 100644 packages/medusa/src/utils/transaction/transaction-step.ts diff --git a/.changeset/eighty-melons-beg.md b/.changeset/eighty-melons-beg.md new file mode 100644 index 0000000000..09fca326ff --- /dev/null +++ b/.changeset/eighty-melons-beg.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +feat(medusa): Transaction Orchestrator diff --git a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts b/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts new file mode 100644 index 0000000000..7b1f9046d7 --- /dev/null +++ b/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts @@ -0,0 +1,669 @@ +import { + TransactionOrchestrator, + TransactionStepsDefinition, + TransactionHandlerType, + TransactionPayload, + TransactionState, +} from "../../transaction" + +describe("Transaction Orchestrator", () => { + it("Should follow the flow by calling steps in order with the correct payload", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return payload + }), + two: jest.fn().mockImplementation((payload) => { + return payload + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + }, + } + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + next: { + action: "secondMethod", + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler, + { + prop: 123, + } + ) + + await strategy.resume(transaction) + + expect(transaction.transactionId).toBe("transaction_id_123") + expect(transaction.getState()).toBe(TransactionState.DONE) + + expect(mocks.one).toBeCalledWith( + expect.objectContaining({ + metadata: { + producer: "transaction-name", + reply_to_topic: "trans:transaction-name", + idempotency_key: "transaction_id_123:firstMethod:invoke", + action: "firstMethod", + action_type: "invoke", + attempt: 1, + timestamp: expect.any(Number), + }, + data: { prop: 123 }, + }) + ) + + expect(mocks.two).toBeCalledWith( + expect.objectContaining({ + metadata: { + producer: "transaction-name", + reply_to_topic: "trans:transaction-name", + idempotency_key: "transaction_id_123:secondMethod:invoke", + action: "secondMethod", + action_type: "invoke", + attempt: 1, + timestamp: expect.any(Number), + }, + data: { prop: 123 }, + }) + ) + }) + + it("Should run steps in parallel if 'next' is an array", async () => { + const actionOrder: string[] = [] + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + return actionOrder.push(actionId) + } + + const flow: TransactionStepsDefinition = { + next: [ + { + action: "one", + }, + { + action: "two", + next: { + action: "four", + next: { + action: "six", + }, + }, + }, + { + action: "three", + next: { + action: "five", + }, + }, + ], + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + expect(actionOrder).toEqual(["one", "two", "three", "four", "five", "six"]) + }) + + it("Should not execute next steps when a step fails", async () => { + const actionOrder: string[] = [] + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + if (functionHandlerType === TransactionHandlerType.INVOKE) { + actionOrder.push(actionId) + } + + if (TransactionHandlerType.INVOKE && actionId === "three") { + throw new Error() + } + } + + const flow: TransactionStepsDefinition = { + next: [ + { + action: "one", + }, + { + action: "two", + next: { + action: "four", + next: { + action: "six", + }, + }, + }, + { + action: "three", + maxRetries: 0, + next: { + action: "five", + }, + }, + ], + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + expect(actionOrder).toEqual(["one", "two", "three"]) + }) + + it("Should forward step response if flag 'forwardResponse' is set to true", async () => { + const mocks = { + one: jest.fn().mockImplementation((data) => { + return { abc: 1234 } + }), + two: jest.fn().mockImplementation((data) => { + return { def: "567" } + }), + three: jest.fn().mockImplementation((data) => { + return { end: true } + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: (data) => { + return mocks.one(data) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: (data) => { + return mocks.two(data) + }, + }, + thirdMethod: { + [TransactionHandlerType.INVOKE]: (data) => { + return mocks.three(data) + }, + }, + } + + return command[actionId][functionHandlerType]({ ...payload.data }) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + forwardResponse: true, + next: { + action: "secondMethod", + forwardResponse: true, + next: { + action: "thirdMethod", + }, + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler, + { + prop: 123, + } + ) + + await strategy.resume(transaction) + + expect(mocks.one).toBeCalledWith({ prop: 123 }) + + expect(mocks.two).toBeCalledWith({ prop: 123, _response: { abc: 1234 } }) + + expect(mocks.three).toBeCalledWith({ prop: 123, _response: { def: "567" } }) + }) + + it("Should continue the exection of next steps without waiting for the execution of all its parents when flag 'noWait' is set to true", async () => { + const actionOrder: string[] = [] + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + if (functionHandlerType === TransactionHandlerType.INVOKE) { + actionOrder.push(actionId) + } + + if ( + functionHandlerType === TransactionHandlerType.INVOKE && + actionId === "three" + ) { + throw new Error() + } + } + + const flow: TransactionStepsDefinition = { + next: [ + { + action: "one", + next: { + action: "five", + }, + }, + { + action: "two", + noWait: true, + next: { + action: "four", + }, + }, + { + action: "three", + maxRetries: 0, + }, + ], + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + strategy.resume(transaction) + + await new Promise((ok) => { + strategy.on("finish", ok) + }) + + expect(actionOrder).toEqual(["one", "two", "three", "four"]) + }) + + it("Should retry steps X times when a step fails and compensate steps afterward", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return payload + }), + compensateOne: jest.fn().mockImplementation((payload) => { + return payload + }), + two: jest.fn().mockImplementation((payload) => { + throw new Error() + }), + compensateTwo: jest.fn().mockImplementation((payload) => { + return payload + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateOne(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateTwo(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + next: { + action: "secondMethod", + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + + expect(transaction.transactionId).toBe("transaction_id_123") + expect(mocks.one).toBeCalledTimes(1) + expect(mocks.two).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES) + expect(transaction.getState()).toBe(TransactionState.REVERTED) + expect(mocks.compensateOne).toBeCalledTimes(1) + + expect(mocks.two).nthCalledWith( + 1, + expect.objectContaining({ + metadata: expect.objectContaining({ + attempt: 1, + }), + }) + ) + + expect(mocks.two).nthCalledWith( + 4, + expect.objectContaining({ + metadata: expect.objectContaining({ + attempt: 4, + }), + }) + ) + }) + + it("Should fail a transaction if any step fails after retrying X time to compensate it", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + throw new Error() + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + + expect(mocks.one).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES) + expect(transaction.getState()).toBe(TransactionState.FAILED) + }) + + it("Should complete a transaction if a failing step has the flag 'continueOnPermanentFailure' set to true", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return + }), + two: jest.fn().mockImplementation((payload) => { + throw new Error() + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + next: { + action: "secondMethod", + maxRetries: 1, + continueOnPermanentFailure: true, + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + + expect(transaction.transactionId).toBe("transaction_id_123") + expect(mocks.one).toBeCalledTimes(1) + expect(mocks.two).toBeCalledTimes(2) + expect(transaction.getState()).toBe(TransactionState.DONE) + expect(transaction.isPartiallyCompleted).toBe(true) + }) + + it("Should hold the status INVOKING while the transaction hasn't finished", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return + }), + two: jest.fn().mockImplementation((payload) => { + return + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + async: true, + next: { + action: "secondMethod", + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + + expect(mocks.one).toBeCalledTimes(1) + expect(mocks.two).toBeCalledTimes(0) + expect(transaction.getState()).toBe(TransactionState.INVOKING) + + const mocktransactionId = TransactionOrchestrator.getKeyName( + transaction.transactionId, + "firstMethod", + TransactionHandlerType.INVOKE + ) + await strategy.registerStepSuccess( + mocktransactionId, + undefined, + transaction + ) + + expect(transaction.getState()).toBe(TransactionState.DONE) + }) + + it("Should hold the status COMPENSATING while the transaction hasn't finished compensating", async () => { + const mocks = { + one: jest.fn().mockImplementation((payload) => { + return + }), + compensateOne: jest.fn().mockImplementation((payload) => { + return + }), + two: jest.fn().mockImplementation((payload) => { + return + }), + } + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload + ) { + const command = { + firstMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.one(payload) + }, + [TransactionHandlerType.COMPENSATE]: () => { + mocks.compensateOne(payload) + }, + }, + secondMethod: { + [TransactionHandlerType.INVOKE]: () => { + mocks.two(payload) + }, + }, + } + + return command[actionId][functionHandlerType](payload) + } + + const flow: TransactionStepsDefinition = { + next: { + action: "firstMethod", + async: true, + next: { + action: "secondMethod", + }, + }, + } + + const strategy = new TransactionOrchestrator("transaction-name", flow) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + const mocktransactionId = TransactionOrchestrator.getKeyName( + transaction.transactionId, + "firstMethod", + TransactionHandlerType.INVOKE + ) + + const registerBeforeAllowed = await strategy + .registerStepFailure(mocktransactionId, null, handler) + .catch((e) => e.message) + + await strategy.resume(transaction) + + expect(mocks.one).toBeCalledTimes(1) + expect(mocks.compensateOne).toBeCalledTimes(0) + expect(mocks.two).toBeCalledTimes(0) + expect(registerBeforeAllowed).toEqual( + "Cannot set step failure when status is idle" + ) + expect(transaction.getState()).toBe(TransactionState.INVOKING) + + const resumedTransaction = await strategy.registerStepFailure( + mocktransactionId, + null, + handler + ) + + expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING) + expect(mocks.compensateOne).toBeCalledTimes(1) + + const mocktransactionIdCompensate = TransactionOrchestrator.getKeyName( + transaction.transactionId, + "firstMethod", + TransactionHandlerType.COMPENSATE + ) + await strategy.registerStepSuccess( + mocktransactionIdCompensate, + undefined, + resumedTransaction + ) + + expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED) + }) +}) diff --git a/packages/medusa/src/utils/transaction/distributed-transaction.ts b/packages/medusa/src/utils/transaction/distributed-transaction.ts new file mode 100644 index 0000000000..5f6b2c7ce6 --- /dev/null +++ b/packages/medusa/src/utils/transaction/distributed-transaction.ts @@ -0,0 +1,132 @@ +import { TransactionFlow, TransactionHandlerType, TransactionState } from "." + +/** + * @typedef {Object} TransactionMetadata + * @property {string} producer - The id of the producer that created the transaction (transactionModelId). + * @property {string} reply_to_topic - The topic to reply to for the transaction. + * @property {string} idempotency_key - The idempotency key of the transaction. + * @property {string} action - The action of the transaction. + * @property {TransactionHandlerType} action_type - The type of the transaction. + * @property {number} attempt - The number of attempts for the transaction. + * @property {number} timestamp - The timestamp of the transaction. + */ +export type TransactionMetadata = { + producer: string + reply_to_topic: string + idempotency_key: string + action: string + action_type: TransactionHandlerType + attempt: number + timestamp: number +} + +export class TransactionPayload { + /** + * @param metadata - The metadata of the transaction. + * @param data - The payload data of the transaction and the response of the previous step if forwardResponse is true. + */ + constructor( + public metadata: TransactionMetadata, + public data: Record & { + _response: Record + } + ) {} +} + +/** + * DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order. + */ + +export class DistributedTransaction { + public modelId: string + public transactionId: string + public errors: { + action: string + handlerType: TransactionHandlerType + error: Error | null + }[] = [] + + constructor( + private flow: TransactionFlow, + public handler: ( + actionId: string, + handlerType: TransactionHandlerType, + payload: TransactionPayload + ) => Promise, + public payload?: any + ) { + this.transactionId = flow.transactionId + this.modelId = flow.transactionModelId + } + + public getFlow() { + return this.flow + } + + public addError( + action: string, + handlerType: TransactionHandlerType, + error: Error | null + ) { + this.errors.push({ + action, + handlerType, + error, + }) + } + + public hasFinished(): boolean { + return [ + TransactionState.DONE, + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(this.getState()) + } + + public getState(): TransactionState { + return this.getFlow().state + } + + public get isPartiallyCompleted(): boolean { + return !!this.getFlow().hasFailedSteps || !!this.getFlow().hasSkippedSteps + } + + public canInvoke(): boolean { + return ( + this.getFlow().state === TransactionState.NOT_STARTED || + this.getFlow().state === TransactionState.INVOKING + ) + } + public canRevert(): boolean { + return ( + this.getFlow().state === TransactionState.DONE || + this.getFlow().state === TransactionState.COMPENSATING + ) + } + + public static keyValueStore: any = {} // TODO: Use Key/Value db + private static keyPrefix = "dtrans:" + public async saveCheckpoint(): Promise { + // TODO: Use Key/Value db to save transactions + const key = DistributedTransaction.keyPrefix + this.transactionId + DistributedTransaction.keyValueStore[key] = JSON.stringify(this.getFlow()) + } + + public static async loadTransactionFlow( + transactionId: string + ): Promise { + // TODO: Use Key/Value db to load transactions + const key = DistributedTransaction.keyPrefix + transactionId + if (DistributedTransaction.keyValueStore[key]) { + return JSON.parse(DistributedTransaction.keyValueStore[key]) + } + + return null + } + + public async deleteCheckpoint(): Promise { + // TODO: Delete from Key/Value db + const key = DistributedTransaction.keyPrefix + this.transactionId + delete DistributedTransaction.keyValueStore[key] + } +} diff --git a/packages/medusa/src/utils/transaction/index.ts b/packages/medusa/src/utils/transaction/index.ts new file mode 100644 index 0000000000..5d6d9fb628 --- /dev/null +++ b/packages/medusa/src/utils/transaction/index.ts @@ -0,0 +1,47 @@ +export enum TransactionHandlerType { + INVOKE = "invoke", + COMPENSATE = "compensate", +} + +export type TransactionStepsDefinition = { + action?: string + continueOnPermanentFailure?: boolean + noCompensation?: boolean + maxRetries?: number + retryInterval?: number + timeout?: number + async?: boolean + noWait?: boolean + forwardResponse?: boolean + next?: TransactionStepsDefinition | TransactionStepsDefinition[] +} + +export enum TransactionStepStatus { + IDLE = "idle", + OK = "ok", + WAITING = "waiting_response", + TEMPORARY_FAILURE = "temp_failure", + PERMANENT_FAILURE = "permanent_failure", +} + +export enum TransactionState { + NOT_STARTED = "not_started", + INVOKING = "invoking", + WAITING_TO_COMPENSATE = "waiting_to_compensate", + COMPENSATING = "compensating", + DONE = "done", + REVERTED = "reverted", + FAILED = "failed", + DORMANT = "dormant", + SKIPPED = "skipped", +} + +export type TransactionModel = { + id: string + flow: TransactionStepsDefinition + hash: string +} + +export * from "./transaction-orchestrator" +export * from "./transaction-step" +export * from "./distributed-transaction" diff --git a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts b/packages/medusa/src/utils/transaction/transaction-orchestrator.ts new file mode 100644 index 0000000000..b8387137d4 --- /dev/null +++ b/packages/medusa/src/utils/transaction/transaction-orchestrator.ts @@ -0,0 +1,687 @@ +import { EventEmitter } from "events" + +import { + TransactionHandlerType, + TransactionStepsDefinition, + TransactionStepStatus, + TransactionState, + TransactionModel, +} from "." +import { + DistributedTransaction, + TransactionPayload, +} from "./distributed-transaction" +import { TransactionStep } from "./transaction-step" + +export type TransactionFlow = { + transactionModelId: string + definition: TransactionStepsDefinition + transactionId: string + hasFailedSteps: boolean + hasSkippedSteps: boolean + state: TransactionState + steps: { + [key: string]: TransactionStep + } +} + +export type TransactionStepHandler = ( + actionId: string, + handlerType: TransactionHandlerType, + payload: TransactionPayload +) => Promise + +/** + * @class TransactionOrchestrator is responsible for managing and executing distributed transactions. + * It is based on a single transaction definition, which is used to execute all the transaction steps + */ +export class TransactionOrchestrator extends EventEmitter { + private ROOT_STEP = "_root" + private invokeSteps: string[] = [] + private compensateSteps: string[] = [] + + public DEFAULT_RETRIES = 3 + constructor( + public id: string, + private definition: TransactionStepsDefinition + ) { + super() + } + + private static SEPARATOR = ":" + public static getKeyName(...params: string[]): string { + return params.join(this.SEPARATOR) + } + private getPreviousStep(flow: TransactionFlow, step: TransactionStep) { + const id = step.id.split(".") + id.pop() + const parentId = id.join(".") + return flow.steps[parentId] + } + + private getInvokeSteps(flow: TransactionFlow): string[] { + if (this.invokeSteps.length) { + return this.invokeSteps + } + + const steps = Object.keys(flow.steps) + + steps.sort((a, b) => flow.steps[a].depth - flow.steps[b].depth) + this.invokeSteps = steps + + return steps + } + + private getCompensationSteps(flow: TransactionFlow): string[] { + if (this.compensateSteps.length) { + return this.compensateSteps + } + + const steps = Object.keys(flow.steps) + steps.sort( + (a, b) => (flow.steps[b].depth || 0) - (flow.steps[a].depth || 0) + ) + this.compensateSteps = steps + + return steps + } + + private canMoveForward(flow: TransactionFlow, previousStep: TransactionStep) { + const states = [ + TransactionState.DONE, + TransactionState.FAILED, + TransactionState.SKIPPED, + ] + + const siblings = this.getPreviousStep(flow, previousStep).next.map( + (sib) => flow.steps[sib] + ) + + return ( + !!previousStep.definition.noWait || + siblings.every((sib) => states.includes(sib.invoke.state)) + ) + } + + private canMoveBackward(flow: TransactionFlow, step: TransactionStep) { + const states = [ + TransactionState.DONE, + TransactionState.REVERTED, + TransactionState.FAILED, + TransactionState.DORMANT, + ] + const siblings = step.next.map((sib) => flow.steps[sib]) + return ( + siblings.length === 0 || + siblings.every((sib) => states.includes(sib.compensate.state)) + ) + } + + private canContinue(flow: TransactionFlow, step: TransactionStep): boolean { + if (flow.state == TransactionState.COMPENSATING) { + return this.canMoveBackward(flow, step) + } else { + const previous = this.getPreviousStep(flow, step) + if (previous.id === this.ROOT_STEP) { + return true + } + + return this.canMoveForward(flow, previous) + } + } + + private checkAllSteps(transaction: DistributedTransaction): { + next: TransactionStep[] + total: number + remaining: number + completed: number + } { + let hasSkipped = false + let hasIgnoredFailure = false + let hasFailed = false + let hasWaiting = false + let hasReverted = false + let completedSteps = 0 + + const flow = transaction.getFlow() + + const nextSteps: TransactionStep[] = [] + const allSteps = + flow.state === TransactionState.COMPENSATING + ? this.getCompensationSteps(flow) + : this.getInvokeSteps(flow) + + for (const step of allSteps) { + if ( + step === this.ROOT_STEP || + !this.canContinue(flow, flow.steps[step]) + ) { + continue + } + + const stepDef = flow.steps[step] + const curState = stepDef.getStates() + + if (curState.status === TransactionStepStatus.WAITING) { + hasWaiting = true + if (stepDef.canRetry()) { + nextSteps.push(stepDef) + } + continue + } + + if (stepDef.canInvoke(flow.state) || stepDef.canCompensate(flow.state)) { + nextSteps.push(stepDef) + } else { + completedSteps++ + + if (curState.state === TransactionState.SKIPPED) { + hasSkipped = true + } else if (curState.state === TransactionState.REVERTED) { + hasReverted = true + } else if (curState.state === TransactionState.FAILED) { + if (stepDef.definition.continueOnPermanentFailure) { + hasIgnoredFailure = true + } else { + hasFailed = true + } + } + } + } + + const totalSteps = allSteps.length - 1 + if ( + flow.state === TransactionState.WAITING_TO_COMPENSATE && + nextSteps.length === 0 && + !hasWaiting + ) { + flow.state = TransactionState.COMPENSATING + this.flagStepsToRevert(flow) + + this.emit("compensate", transaction) + + return this.checkAllSteps(transaction) + } else if (completedSteps === totalSteps) { + if (hasSkipped) { + flow.hasSkippedSteps = true + } + if (hasIgnoredFailure) { + flow.hasFailedSteps = true + } + if (hasFailed) { + flow.state = TransactionState.FAILED + } else { + flow.state = hasReverted + ? TransactionState.REVERTED + : TransactionState.DONE + } + + this.emit("finish", transaction) + + void transaction.deleteCheckpoint() + } + + return { + next: nextSteps, + total: totalSteps, + remaining: totalSteps - completedSteps, + completed: completedSteps, + } + } + + private flagStepsToRevert(flow: TransactionFlow): void { + for (const step in flow.steps) { + if (step === this.ROOT_STEP) { + continue + } + + const stepDef = flow.steps[step] + const curState = stepDef.getStates() + if ( + (curState.state === TransactionState.DONE || + curState.status === TransactionStepStatus.PERMANENT_FAILURE) && + !stepDef.definition.noCompensation + ) { + stepDef.beginCompensation() + stepDef.changeState(TransactionState.NOT_STARTED) + } + } + } + + private async setStepSuccess( + transaction: DistributedTransaction, + step: TransactionStep, + response: unknown + ): Promise { + if (step.forwardResponse) { + step.saveResponse(response) + } + + step.changeStatus(TransactionStepStatus.OK) + + if (step.isCompensating()) { + step.changeState(TransactionState.REVERTED) + } else { + step.changeState(TransactionState.DONE) + } + + if (step.definition.async) { + await transaction.saveCheckpoint() + } + } + + private async setStepFailure( + transaction: DistributedTransaction, + step: TransactionStep, + error: Error | null, + maxRetries: number = this.DEFAULT_RETRIES + ): Promise { + step.failures++ + + step.changeStatus(TransactionStepStatus.TEMPORARY_FAILURE) + + if (step.failures > maxRetries) { + step.changeState(TransactionState.FAILED) + step.changeStatus(TransactionStepStatus.PERMANENT_FAILURE) + + transaction.addError( + step.definition.action!, + step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE, + error + ) + + if (!step.isCompensating()) { + const flow = transaction.getFlow() + if (step.definition.continueOnPermanentFailure) { + for (const childStep of step.next) { + const child = flow.steps[childStep] + child.changeState(TransactionState.SKIPPED) + } + } else { + flow.state = TransactionState.WAITING_TO_COMPENSATE + } + } + } + + if (step.definition.async) { + await transaction.saveCheckpoint() + } + } + + private async executeNext( + transaction: DistributedTransaction + ): Promise { + if (transaction.hasFinished()) { + return + } + + const flow = transaction.getFlow() + const nextSteps = this.checkAllSteps(transaction) + const execution: Promise[] = [] + + for (const step of nextSteps.next) { + const curState = step.getStates() + const type = step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE + + step.lastAttempt = Date.now() + step.attempts++ + + if (curState.state === TransactionState.NOT_STARTED) { + if (step.isCompensating()) { + step.changeState(TransactionState.COMPENSATING) + } else if (flow.state === TransactionState.INVOKING) { + step.changeState(TransactionState.INVOKING) + } + } + + step.changeStatus(TransactionStepStatus.WAITING) + + const parent = this.getPreviousStep(flow, step) + let payloadData = transaction.payload + + if (parent.forwardResponse) { + if (!payloadData) { + payloadData = {} + } + + payloadData._response = parent.getResponse() + } + + const payload = new TransactionPayload( + { + producer: flow.transactionModelId, + reply_to_topic: TransactionOrchestrator.getKeyName( + "trans", + flow.transactionModelId + ), + idempotency_key: TransactionOrchestrator.getKeyName( + flow.transactionId, + step.definition.action!, + type + ), + action: step.definition.action + "", + action_type: type, + attempt: step.attempts, + timestamp: Date.now(), + }, + payloadData + ) + + if (!step.definition.async) { + execution.push( + transaction + .handler(step.definition.action + "", type, payload) + .then(async (response) => { + await this.setStepSuccess(transaction, step, response) + }) + .catch(async (error) => { + await this.setStepFailure( + transaction, + step, + error, + step.definition.maxRetries + ) + }) + ) + } else { + execution.push( + transaction + .saveCheckpoint() + .then(async () => + transaction + .handler(step.definition.action + "", type, payload) + .catch(() => void 0) + ) + ) + } + } + + await Promise.all(execution) + + if (nextSteps.next.length > 0) { + await this.executeNext(transaction) + } + } + + /** + * Start a new transaction or resume a transaction that has been previously started + * @param transaction - The transaction to resume + */ + public async resume(transaction: DistributedTransaction): Promise { + if (transaction.modelId !== this.id) { + throw new Error( + `TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.` + ) + } + + if (transaction.hasFinished()) { + return + } + + const flow = transaction.getFlow() + + if (flow.state === TransactionState.NOT_STARTED) { + flow.state = TransactionState.INVOKING + this.emit("begin", transaction) + } else { + this.emit("resume", transaction) + } + + await this.executeNext(transaction) + } + + private async createTransactionFlow( + transactionId: string + ): Promise { + const model: TransactionFlow = { + transactionModelId: this.id, + transactionId: transactionId, + hasFailedSteps: false, + hasSkippedSteps: false, + state: TransactionState.NOT_STARTED, + definition: this.definition, + steps: this.buildSteps(this.definition), + } + return model + } + + private async getTransactionFlowById( + transactionId: string + ): Promise { + const flow = await DistributedTransaction.loadTransactionFlow(transactionId) + if (flow !== null) { + flow.steps = this.buildSteps(flow.definition, flow.steps) + return flow + } + + return null + } + + private buildSteps( + flow: TransactionStepsDefinition, + existingSteps?: { [key: string]: TransactionStep } + ): { [key: string]: TransactionStep } { + const states: { [key: string]: TransactionStep } = { + [this.ROOT_STEP]: { + id: this.ROOT_STEP, + next: [] as string[], + } as TransactionStep, + } + + const actionNames = new Set() + const queue: any[] = [{ obj: flow, level: [this.ROOT_STEP] }] + + while (queue.length > 0) { + const { obj, level } = queue.shift() + + for (const key in obj) { + // eslint-disable-next-line no-prototype-builtins + if (!obj.hasOwnProperty(key)) { + continue + } + + if (typeof obj[key] === "object" && obj[key] !== null) { + queue.push({ obj: obj[key], level: [...level] }) + } else if (key === "action") { + if (actionNames.has(obj.action)) { + throw new Error(`Action "${obj.action}" is already defined.`) + } + + actionNames.add(obj.action) + level.push(obj.action) + const id = level.join(".") + const parent = level.slice(0, level.length - 1).join(".") + states[parent].next?.push(id) + + const definitionCopy = { ...obj } + delete definitionCopy.next + + states[id] = Object.assign( + new TransactionStep(), + existingSteps?.[id] || { + id, + depth: level.length - 1, + definition: definitionCopy, + forwardResponse: definitionCopy.forwardResponse, + invoke: { + state: TransactionState.NOT_STARTED, + status: TransactionStepStatus.IDLE, + }, + compensate: { + state: TransactionState.DORMANT, + status: TransactionStepStatus.IDLE, + }, + attempts: 0, + failures: 0, + lastAttempt: null, + next: [], + } + ) + } + } + } + + return states + } + + /** Create a new transaction + * @param transactionId - unique identifier of the transaction + * @param handler - function to handle action of the transaction + * @param payload - payload to be passed to all the transaction steps + */ + public async beginTransaction( + transactionId: string, + handler: TransactionStepHandler, + payload?: unknown + ): Promise { + let modelFlow = await this.getTransactionFlowById(transactionId) + + let newTransaction = false + if (!modelFlow) { + modelFlow = await this.createTransactionFlow(transactionId) + newTransaction = true + } + + const transaction = new DistributedTransaction(modelFlow, handler, payload) + if (newTransaction) { + await transaction.saveCheckpoint() + } + + return transaction + } + + private getStepByAction( + flow: TransactionFlow, + action: string + ): TransactionStep | null { + for (const key in flow.steps) { + if (action === flow.steps[key]?.definition?.action) { + return flow.steps[key] + } + } + return null + } + + private async getTransactionAndStepFromIdempotencyKey( + responseIdempotencyKey: string, + handler?: TransactionStepHandler, + transaction?: DistributedTransaction, + payload?: unknown + ): Promise<[DistributedTransaction, TransactionStep]> { + const [transactionId, action, actionType] = responseIdempotencyKey.split( + TransactionOrchestrator.SEPARATOR + ) + + if (!transaction && !handler) { + throw new Error( + "If a transaction is not provided, the handler is required" + ) + } + + if (!transaction) { + const existingTransaction = await this.getTransactionFlowById( + transactionId + ) + + if (existingTransaction === null) { + throw new Error("Transaction could not be found.") + } + + transaction = new DistributedTransaction( + existingTransaction, + handler!, + payload + ) + } + + const step = this.getStepByAction(transaction.getFlow(), action) + + if (step === null) { + throw new Error("Action not found.") + } else if ( + step.isCompensating() + ? actionType !== TransactionHandlerType.COMPENSATE + : actionType !== TransactionHandlerType.INVOKE + ) { + throw new Error("Incorrect action type.") + } + return [transaction, step] + } + + /** Register a step success for a specific transaction and step + * @param responseIdempotencyKey - The idempotency key for the step + * @param handler - The handler function to execute the step + * @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey + * @param payload - The payload of the step + */ + public async registerStepSuccess( + responseIdempotencyKey: string, + handler?: TransactionStepHandler, + transaction?: DistributedTransaction, + payload?: unknown + ): Promise { + const [curTransaction, step] = + await this.getTransactionAndStepFromIdempotencyKey( + responseIdempotencyKey, + handler, + transaction, + payload + ) + + if (step.getStates().status === TransactionStepStatus.WAITING) { + await this.setStepSuccess(curTransaction, step, payload) + this.emit("resume", curTransaction) + await this.executeNext(curTransaction) + } else { + throw new Error( + `Cannot set step success when status is ${step.getStates().status}` + ) + } + + return curTransaction + } + + /** + * Register a step failure for a specific transaction and step + * @param responseIdempotencyKey - The idempotency key for the step + * @param error - The error that caused the failure + * @param handler - The handler function to execute the step + * @param transaction - The current transaction + * @param payload - The payload of the step + */ + public async registerStepFailure( + responseIdempotencyKey: string, + error: Error | null, + handler?: TransactionStepHandler, + transaction?: DistributedTransaction, + payload?: unknown + ): Promise { + const [curTransaction, step] = + await this.getTransactionAndStepFromIdempotencyKey( + responseIdempotencyKey, + handler, + transaction, + payload + ) + + if (step.getStates().status === TransactionStepStatus.WAITING) { + await this.setStepFailure(curTransaction, step, error, 0) + this.emit("resume", curTransaction) + await this.executeNext(curTransaction) + } else { + throw new Error( + `Cannot set step failure when status is ${step.getStates().status}` + ) + } + + return curTransaction + } + + public cancelTransaction(transactionId: string) { + // TODO: stop a transaction while in progress and compensate all executed steps + } +} diff --git a/packages/medusa/src/utils/transaction/transaction-step.ts b/packages/medusa/src/utils/transaction/transaction-step.ts new file mode 100644 index 0000000000..3dcb579359 --- /dev/null +++ b/packages/medusa/src/utils/transaction/transaction-step.ts @@ -0,0 +1,159 @@ +import { + TransactionStepsDefinition, + TransactionStepStatus, + TransactionState, +} from "." + +/** + * @class TransactionStep + * @classdesc A class representing a single step in a transaction flow + */ +export class TransactionStep { + /** + * @member id - The id of the step + * @member depth - The depth of the step in the flow + * @member definition - The definition of the step + * @member invoke - The current state and status of the invoke action of the step + * @member compensate - The current state and status of the compensate action of the step + * @member attempts - The number of attempts made to execute the step + * @member failures - The number of failures encountered while executing the step + * @member lastAttempt - The timestamp of the last attempt made to execute the step + * @member next - The ids of the next steps in the flow + * @member response - The response from the last successful execution of the step + * @member forwardResponse - A flag indicating if the response from the previous step should be passed to this step as payload + */ + private stepFailed = false + id: string + depth: number + definition: TransactionStepsDefinition + invoke: { + state: TransactionState + status: TransactionStepStatus + } + compensate: { + state: TransactionState + status: TransactionStepStatus + } + attempts: number + failures: number + lastAttempt: number | null + next: string[] + response: unknown + forwardResponse: boolean + + public getStates() { + return this.isCompensating() ? this.compensate : this.invoke + } + + public beginCompensation() { + if (this.isCompensating()) { + return + } + + this.stepFailed = true + this.attempts = 0 + this.failures = 0 + this.lastAttempt = null + } + + public isCompensating() { + return this.stepFailed + } + + public changeState(toState: TransactionState) { + const allowed = { + [TransactionState.DORMANT]: [TransactionState.NOT_STARTED], + [TransactionState.NOT_STARTED]: [ + TransactionState.INVOKING, + TransactionState.COMPENSATING, + TransactionState.FAILED, + TransactionState.SKIPPED, + ], + [TransactionState.INVOKING]: [ + TransactionState.FAILED, + TransactionState.DONE, + ], + [TransactionState.COMPENSATING]: [ + TransactionState.REVERTED, + TransactionState.FAILED, + ], + [TransactionState.DONE]: [TransactionState.COMPENSATING], + } + + const curState = this.getStates() + if ( + curState.state === toState || + allowed?.[curState.state]?.includes(toState) + ) { + curState.state = toState + return + } + + throw new Error( + `Updating State from "${curState.state}" to "${toState}" is not allowed.` + ) + } + + public changeStatus(toStatus: TransactionStepStatus) { + const allowed = { + [TransactionStepStatus.WAITING]: [ + TransactionStepStatus.OK, + TransactionStepStatus.TEMPORARY_FAILURE, + TransactionStepStatus.PERMANENT_FAILURE, + ], + [TransactionStepStatus.TEMPORARY_FAILURE]: [ + TransactionStepStatus.IDLE, + TransactionStepStatus.PERMANENT_FAILURE, + ], + [TransactionStepStatus.PERMANENT_FAILURE]: [TransactionStepStatus.IDLE], + } + + const curState = this.getStates() + if ( + curState.status === toStatus || + toStatus === TransactionStepStatus.WAITING || + allowed?.[curState.status]?.includes(toStatus) + ) { + curState.status = toStatus + return + } + + throw new Error( + `Updating Status from "${curState.status}" to "${toStatus}" is not allowed.` + ) + } + + public saveResponse(response) { + this.response = response + } + + public getResponse(): unknown { + return this.response + } + + canRetry(): boolean { + return !!( + this.lastAttempt && + this.definition.retryInterval && + Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3 + ) + } + + canInvoke(flowState: TransactionState): boolean { + const { status, state } = this.getStates() + return ( + (!this.isCompensating() && + state === TransactionState.NOT_STARTED && + flowState === TransactionState.INVOKING) || + status === TransactionStepStatus.TEMPORARY_FAILURE + ) + } + + canCompensate(flowState: TransactionState): boolean { + return ( + this.isCompensating() && + this.getStates().state === TransactionState.NOT_STARTED && + flowState === TransactionState.COMPENSATING + ) + } +}