diff --git a/.changeset/fresh-boxes-scream.md b/.changeset/fresh-boxes-scream.md new file mode 100644 index 0000000000..95cd2fb067 --- /dev/null +++ b/.changeset/fresh-boxes-scream.md @@ -0,0 +1,6 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +feat(orchestration, workflows-sdk): Add events management and implementation to manage async workflows diff --git a/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 557291f6d3..19dcbdcfbb 100644 --- a/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -70,8 +70,8 @@ describe("Transaction Orchestrator", () => { expect.objectContaining({ metadata: { model_id: "transaction-name", - reply_to_topic: "trans:transaction-name", - idempotency_key: "transaction_id_123:firstMethod:invoke", + idempotency_key: + "transaction-name:transaction_id_123:firstMethod:invoke", action: "firstMethod", action_type: "invoke", attempt: 1, @@ -85,8 +85,8 @@ describe("Transaction Orchestrator", () => { expect.objectContaining({ metadata: { model_id: "transaction-name", - reply_to_topic: "trans:transaction-name", - idempotency_key: "transaction_id_123:secondMethod:invoke", + idempotency_key: + "transaction-name:transaction_id_123:secondMethod:invoke", action: "secondMethod", action_type: "invoke", attempt: 1, @@ -654,6 +654,7 @@ describe("Transaction Orchestrator", () => { next: { action: "firstMethod", async: true, + compensateAsync: true, next: { action: "secondMethod", }, @@ -675,8 +676,10 @@ describe("Transaction Orchestrator", () => { expect(mocks.one).toBeCalledTimes(1) expect(mocks.two).toBeCalledTimes(0) expect(transaction.getState()).toBe(TransactionState.INVOKING) + expect(transaction.getFlow().hasWaitingSteps).toBe(true) const mocktransactionId = TransactionOrchestrator.getKeyName( + "transaction-name", transaction.transactionId, "firstMethod", TransactionHandlerType.INVOKE @@ -688,6 +691,7 @@ describe("Transaction Orchestrator", () => { ) expect(transaction.getState()).toBe(TransactionState.DONE) + expect(transaction.getFlow().hasWaitingSteps).toBe(false) }) it("Should hold the status COMPENSATING while the transaction hasn't finished compensating", async () => { @@ -731,8 +735,11 @@ describe("Transaction Orchestrator", () => { next: { action: "firstMethod", async: true, + compensateAsync: true, next: { action: "secondMethod", + async: true, + compensateAsync: true, }, }, } @@ -745,22 +752,31 @@ describe("Transaction Orchestrator", () => { ) const mocktransactionId = TransactionOrchestrator.getKeyName( + "transaction-name", transaction.transactionId, "firstMethod", TransactionHandlerType.INVOKE ) - const registerBeforeAllowed = await strategy - .registerStepFailure(mocktransactionId, null, handler) - .catch((e) => e.message) + const mockSecondStepId = TransactionOrchestrator.getKeyName( + "transaction-name", + transaction.transactionId, + "secondMethod", + TransactionHandlerType.INVOKE + ) await strategy.resume(transaction) expect(mocks.one).toBeCalledTimes(1) expect(mocks.compensateOne).toBeCalledTimes(0) expect(mocks.two).toBeCalledTimes(0) + + const registerBeforeAllowed = await strategy + .registerStepSuccess(mockSecondStepId, handler) + .catch((e) => e.message) + expect(registerBeforeAllowed).toEqual( - "Cannot set step failure when status is idle" + "Cannot set step success when status is idle" ) expect(transaction.getState()).toBe(TransactionState.INVOKING) @@ -774,6 +790,7 @@ describe("Transaction Orchestrator", () => { expect(mocks.compensateOne).toBeCalledTimes(1) const mocktransactionIdCompensate = TransactionOrchestrator.getKeyName( + "transaction-name", transaction.transactionId, "firstMethod", TransactionHandlerType.COMPENSATE diff --git a/packages/orchestration/src/transaction/datastore/abstract-storage.ts b/packages/orchestration/src/transaction/datastore/abstract-storage.ts new file mode 100644 index 0000000000..aab8c5e677 --- /dev/null +++ b/packages/orchestration/src/transaction/datastore/abstract-storage.ts @@ -0,0 +1,118 @@ +import { + DistributedTransaction, + TransactionCheckpoint, +} from "../distributed-transaction" +import { TransactionStep } from "../transaction-step" +import { TransactionModelOptions } from "../types" + +export interface IDistributedTransactionStorage { + get(key: string): Promise + list(): Promise + save(key: string, data: TransactionCheckpoint, ttl?: number): Promise + delete(key: string): Promise + archive(key: string, options?: TransactionModelOptions): Promise + scheduleRetry( + transaction: DistributedTransaction, + step: TransactionStep, + timestamp: number, + interval: number + ): Promise + clearRetry( + transaction: DistributedTransaction, + step: TransactionStep + ): Promise + scheduleTransactionTimeout( + transaction: DistributedTransaction, + timestamp: number, + interval: number + ): Promise + scheduleStepTimeout( + transaction: DistributedTransaction, + step: TransactionStep, + timestamp: number, + interval: number + ): Promise + clearTransactionTimeout(transaction: DistributedTransaction): Promise + clearStepTimeout( + transaction: DistributedTransaction, + step: TransactionStep + ): Promise +} + +export abstract class DistributedTransactionStorage + implements IDistributedTransactionStorage +{ + constructor() { + /* noop */ + } + + async get(key: string): Promise { + throw new Error("Method 'get' not implemented.") + } + + async list(): Promise { + throw new Error("Method 'list' not implemented.") + } + + async save( + key: string, + data: TransactionCheckpoint, + ttl?: number + ): Promise { + throw new Error("Method 'save' not implemented.") + } + + async delete(key: string): Promise { + throw new Error("Method 'delete' not implemented.") + } + + async archive(key: string, options?: TransactionModelOptions): Promise { + throw new Error("Method 'archive' not implemented.") + } + + async scheduleRetry( + transaction: DistributedTransaction, + step: TransactionStep, + timestamp: number, + interval: number + ): Promise { + throw new Error("Method 'scheduleRetry' not implemented.") + } + + async clearRetry( + transaction: DistributedTransaction, + step: TransactionStep + ): Promise { + throw new Error("Method 'clearRetry' not implemented.") + } + + async scheduleTransactionTimeout( + transaction: DistributedTransaction, + timestamp: number, + interval: number + ): Promise { + throw new Error("Method 'scheduleTransactionTimeout' not implemented.") + } + + async clearTransactionTimeout( + transaction: DistributedTransaction + ): Promise { + throw new Error("Method 'clearTransactionTimeout' not implemented.") + } + + async scheduleStepTimeout( + transaction: DistributedTransaction, + step: TransactionStep, + timestamp: number, + interval: number + ): Promise { + throw new Error("Method 'scheduleStepTimeout' not implemented.") + } + + async clearStepTimeout( + transaction: DistributedTransaction, + step: TransactionStep + ): Promise { + throw new Error("Method 'clearStepTimeout' not implemented.") + } +} diff --git a/packages/orchestration/src/transaction/datastore/base-in-memory-storage.ts b/packages/orchestration/src/transaction/datastore/base-in-memory-storage.ts new file mode 100644 index 0000000000..69ab557a03 --- /dev/null +++ b/packages/orchestration/src/transaction/datastore/base-in-memory-storage.ts @@ -0,0 +1,37 @@ +import { TransactionCheckpoint } from "../distributed-transaction" +import { TransactionModelOptions } from "../types" +import { DistributedTransactionStorage } from "./abstract-storage" + +// eslint-disable-next-line max-len +export class BaseInMemoryDistributedTransactionStorage extends DistributedTransactionStorage { + private storage: Map + + constructor() { + super() + this.storage = new Map() + } + + async get(key: string): Promise { + return this.storage.get(key) + } + + async list(): Promise { + return Array.from(this.storage.values()) + } + + async save( + key: string, + data: TransactionCheckpoint, + ttl?: number + ): Promise { + this.storage.set(key, data) + } + + async delete(key: string): Promise { + this.storage.delete(key) + } + + async archive(key: string, options?: TransactionModelOptions): Promise { + this.storage.delete(key) + } +} diff --git a/packages/orchestration/src/transaction/distributed-transaction.ts b/packages/orchestration/src/transaction/distributed-transaction.ts index cd283e0a7e..16d05c2f62 100644 --- a/packages/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/orchestration/src/transaction/distributed-transaction.ts @@ -1,12 +1,17 @@ import { isDefined } from "@medusajs/utils" -import { TransactionFlow } from "./transaction-orchestrator" -import { TransactionStepHandler } from "./transaction-step" +import { EventEmitter } from "events" +import { IDistributedTransactionStorage } from "./datastore/abstract-storage" +import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage" +import { + TransactionFlow, + TransactionOrchestrator, +} from "./transaction-orchestrator" +import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { TransactionHandlerType, TransactionState } from "./types" /** * @typedef TransactionMetadata * @property model_id - The id of the model_id that created the transaction (modelId). - * @property reply_to_topic - The topic to reply to for the transaction. * @property idempotency_key - The idempotency key of the transaction. * @property action - The action of the transaction. * @property action_type - The type of the transaction. @@ -15,7 +20,6 @@ import { TransactionHandlerType, TransactionState } from "./types" */ export type TransactionMetadata = { model_id: string - reply_to_topic: string idempotency_key: string action: string action_type: TransactionHandlerType @@ -70,13 +74,19 @@ export class TransactionPayload { * DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order. */ -export class DistributedTransaction { +export class DistributedTransaction extends EventEmitter { public modelId: string public transactionId: string private readonly errors: TransactionStepError[] = [] - private readonly context: TransactionContext = new TransactionContext() + private static keyValueStore: IDistributedTransactionStorage + + public static setStorage(storage: IDistributedTransactionStorage) { + this.keyValueStore = storage + } + + private static keyPrefix = "dtrans" constructor( private flow: TransactionFlow, @@ -85,6 +95,8 @@ export class DistributedTransaction { errors?: TransactionStepError[], context?: TransactionContext ) { + super() + this.transactionId = flow.transactionId this.modelId = flow.modelId @@ -156,6 +168,7 @@ export class DistributedTransaction { this.getFlow().state === TransactionState.INVOKING ) } + public canRevert(): boolean { return ( this.getFlow().state === TransactionState.DONE || @@ -163,36 +176,129 @@ export class DistributedTransaction { ) } - public static keyValueStore: any = {} // TODO: Use Key/Value db - private static keyPrefix = "dtrans:" - public async saveCheckpoint(): Promise { - // TODO: Use Key/Value db to save transactions - const key = DistributedTransaction.keyPrefix + this.transactionId + public hasTimeout(): boolean { + return !!this.getFlow().definition.timeout + } + + public getTimeoutInterval(): number | undefined { + return this.getFlow().definition.timeout + } + + public async saveCheckpoint( + ttl = 0 + ): Promise { + const options = this.getFlow().options + if (!options?.storeExecution) { + return + } + const data = new TransactionCheckpoint( this.getFlow(), this.getContext(), this.getErrors() ) - DistributedTransaction.keyValueStore[key] = JSON.stringify(data) + + const key = TransactionOrchestrator.getKeyName( + DistributedTransaction.keyPrefix, + this.modelId, + this.transactionId + ) + await DistributedTransaction.keyValueStore.save(key, data, ttl) return data } public static async loadTransaction( + modelId: string, transactionId: string ): Promise { - // TODO: Use Key/Value db to load transactions - const key = DistributedTransaction.keyPrefix + transactionId - if (DistributedTransaction.keyValueStore[key]) { - return JSON.parse(DistributedTransaction.keyValueStore[key]) + const key = TransactionOrchestrator.getKeyName( + DistributedTransaction.keyPrefix, + modelId, + transactionId + ) + + const loadedData = await DistributedTransaction.keyValueStore.get(key) + if (loadedData) { + return loadedData } return null } public async deleteCheckpoint(): Promise { - // TODO: Delete from Key/Value db - const key = DistributedTransaction.keyPrefix + this.transactionId - delete DistributedTransaction.keyValueStore[key] + const options = this.getFlow().options + if (!options?.storeExecution) { + return + } + + const key = TransactionOrchestrator.getKeyName( + DistributedTransaction.keyPrefix, + this.modelId, + this.transactionId + ) + await DistributedTransaction.keyValueStore.delete(key) + } + + public async archiveCheckpoint(): Promise { + const options = this.getFlow().options + + const key = TransactionOrchestrator.getKeyName( + DistributedTransaction.keyPrefix, + this.modelId, + this.transactionId + ) + await DistributedTransaction.keyValueStore.archive(key, options) + } + + public async scheduleRetry( + step: TransactionStep, + interval: number + ): Promise { + await this.saveCheckpoint() + await DistributedTransaction.keyValueStore.scheduleRetry( + this, + step, + Date.now(), + interval + ) + } + + public async clearRetry(step: TransactionStep): Promise { + await DistributedTransaction.keyValueStore.clearRetry(this, step) + } + + public async scheduleTransactionTimeout(interval: number): Promise { + await this.saveCheckpoint() + await DistributedTransaction.keyValueStore.scheduleTransactionTimeout( + this, + Date.now(), + interval + ) + } + + public async clearTransactionTimeout(): Promise { + await DistributedTransaction.keyValueStore.clearTransactionTimeout(this) + } + + public async scheduleStepTimeout( + step: TransactionStep, + interval: number + ): Promise { + await this.saveCheckpoint() + await DistributedTransaction.keyValueStore.scheduleStepTimeout( + this, + step, + Date.now(), + interval + ) + } + + public async clearStepTimeout(step: TransactionStep): Promise { + await DistributedTransaction.keyValueStore.clearStepTimeout(this, step) } } + +DistributedTransaction.setStorage( + new BaseInMemoryDistributedTransactionStorage() +) diff --git a/packages/orchestration/src/transaction/errors.ts b/packages/orchestration/src/transaction/errors.ts index 95a093c5b3..331784e80c 100644 --- a/packages/orchestration/src/transaction/errors.ts +++ b/packages/orchestration/src/transaction/errors.ts @@ -13,3 +13,32 @@ export class PermanentStepFailureError extends Error { this.name = "PermanentStepFailure" } } + +export class StepTimeoutError extends Error { + static isStepTimeoutError(error: Error): error is StepTimeoutError { + return ( + error instanceof StepTimeoutError || error.name === "StepTimeoutError" + ) + } + + constructor(message?: string) { + super(message) + this.name = "StepTimeoutError" + } +} + +export class TransactionTimeoutError extends Error { + static isTransactionTimeoutError( + error: Error + ): error is TransactionTimeoutError { + return ( + error instanceof TransactionTimeoutError || + error.name === "TransactionTimeoutError" + ) + } + + constructor(message?: string) { + super(message) + this.name = "TransactionTimeoutError" + } +} diff --git a/packages/orchestration/src/transaction/index.ts b/packages/orchestration/src/transaction/index.ts index 6633e3b3c3..d06cb879ef 100644 --- a/packages/orchestration/src/transaction/index.ts +++ b/packages/orchestration/src/transaction/index.ts @@ -1,6 +1,7 @@ -export * from "./types" +export * from "./datastore/abstract-storage" +export * from "./distributed-transaction" +export * from "./errors" +export * from "./orchestrator-builder" export * from "./transaction-orchestrator" export * from "./transaction-step" -export * from "./distributed-transaction" -export * from "./orchestrator-builder" -export * from "./errors" +export * from "./types" diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index fd6b74cd23..1e1c79a842 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -5,22 +5,33 @@ import { } from "./distributed-transaction" import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { + DistributedTransactionEvent, TransactionHandlerType, + TransactionModelOptions, TransactionState, - TransactionStepsDefinition, TransactionStepStatus, + TransactionStepsDefinition, } from "./types" +import { MedusaError, promiseAll } from "@medusajs/utils" import { EventEmitter } from "events" -import { promiseAll } from "@medusajs/utils" -import { PermanentStepFailureError } from "./errors" +import { + PermanentStepFailureError, + StepTimeoutError, + TransactionTimeoutError, +} from "./errors" export type TransactionFlow = { modelId: string + options?: TransactionModelOptions definition: TransactionStepsDefinition transactionId: string + hasAsyncSteps: boolean hasFailedSteps: boolean + hasWaitingSteps: boolean hasSkippedSteps: boolean + timedOutAt: number | null + startedAt?: number state: TransactionState steps: { [key: string]: TransactionStep @@ -33,13 +44,15 @@ export type TransactionFlow = { */ export class TransactionOrchestrator extends EventEmitter { private static ROOT_STEP = "_root" + public static DEFAULT_TTL = 30 private invokeSteps: string[] = [] private compensateSteps: string[] = [] public static DEFAULT_RETRIES = 0 constructor( public id: string, - private definition: TransactionStepsDefinition + private definition: TransactionStepsDefinition, + private options?: TransactionModelOptions ) { super() } @@ -126,12 +139,34 @@ export class TransactionOrchestrator extends EventEmitter { } } - private checkAllSteps(transaction: DistributedTransaction): { + private async checkStepTimeout(transaction, step) { + let hasTimedOut = false + if ( + step.hasTimeout() && + !step.timedOutAt && + step.canCancel() && + step.startedAt! + step.getTimeoutInterval()! * 1e3 < Date.now() + ) { + step.timedOutAt = Date.now() + await transaction.saveCheckpoint() + this.emit(DistributedTransactionEvent.TIMEOUT, { transaction }) + await TransactionOrchestrator.setStepFailure( + transaction, + step, + new StepTimeoutError(), + 0 + ) + hasTimedOut = true + } + return hasTimedOut + } + + private async checkAllSteps(transaction: DistributedTransaction): Promise<{ next: TransactionStep[] total: number remaining: number completed: number - } { + }> { let hasSkipped = false let hasIgnoredFailure = false let hasFailed = false @@ -158,12 +193,44 @@ export class TransactionOrchestrator extends EventEmitter { const stepDef = flow.steps[step] const curState = stepDef.getStates() + const hasTimedOut = await this.checkStepTimeout(transaction, stepDef) + if (hasTimedOut) { + continue + } + if (curState.status === TransactionStepStatus.WAITING) { hasWaiting = true - if (stepDef.canRetry()) { - nextSteps.push(stepDef) + + if (stepDef.hasAwaitingRetry()) { + if (stepDef.canRetryAwaiting()) { + stepDef.retryRescheduledAt = null + nextSteps.push(stepDef) + } else if (!stepDef.retryRescheduledAt) { + stepDef.hasScheduledRetry = true + stepDef.retryRescheduledAt = Date.now() + + await transaction.scheduleRetry( + stepDef, + stepDef.definition.retryIntervalAwaiting! + ) + } } + continue + } else if (curState.status === TransactionStepStatus.TEMPORARY_FAILURE) { + if (!stepDef.canRetry()) { + if (stepDef.hasRetryInterval() && !stepDef.retryRescheduledAt) { + stepDef.hasScheduledRetry = true + stepDef.retryRescheduledAt = Date.now() + + await transaction.scheduleRetry( + stepDef, + stepDef.definition.retryInterval! + ) + } + continue + } + stepDef.retryRescheduledAt = null } if (stepDef.canInvoke(flow.state) || stepDef.canCompensate(flow.state)) { @@ -185,6 +252,8 @@ export class TransactionOrchestrator extends EventEmitter { } } + flow.hasWaitingSteps = hasWaiting + const totalSteps = allSteps.length - 1 if ( flow.state === TransactionState.WAITING_TO_COMPENSATE && @@ -194,9 +263,9 @@ export class TransactionOrchestrator extends EventEmitter { flow.state = TransactionState.COMPENSATING this.flagStepsToRevert(flow) - this.emit("compensate", transaction) + this.emit(DistributedTransactionEvent.COMPENSATE_BEGIN, { transaction }) - return this.checkAllSteps(transaction) + return await this.checkAllSteps(transaction) } else if (completedSteps === totalSteps) { if (hasSkipped) { flow.hasSkippedSteps = true @@ -211,11 +280,6 @@ export class TransactionOrchestrator extends EventEmitter { ? TransactionState.REVERTED : TransactionState.DONE } - - this.emit("finish", transaction) - - // TODO: check TransactionModel if it should delete the checkpoint when the transaction is done - void transaction.deleteCheckpoint() } return { @@ -267,9 +331,25 @@ export class TransactionOrchestrator extends EventEmitter { step.changeState(TransactionState.DONE) } - if (step.definition.async) { + const flow = transaction.getFlow() + if (step.definition.async || flow.options?.strictCheckpoints) { await transaction.saveCheckpoint() } + + const cleaningUp: Promise[] = [] + if (step.hasRetryScheduled()) { + cleaningUp.push(transaction.clearRetry(step)) + } + if (step.hasTimeout()) { + cleaningUp.push(transaction.clearStepTimeout(step)) + } + + await promiseAll(cleaningUp) + + const eventName = step.isCompensating() + ? DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS + : DistributedTransactionEvent.STEP_SUCCESS + transaction.emit(eventName, { step, transaction }) } private static async setStepFailure( @@ -282,6 +362,8 @@ export class TransactionOrchestrator extends EventEmitter { step.changeStatus(TransactionStepStatus.TEMPORARY_FAILURE) + const flow = transaction.getFlow() + const cleaningUp: Promise[] = [] if (step.failures > maxRetries) { step.changeState(TransactionState.FAILED) step.changeStatus(TransactionStepStatus.PERMANENT_FAILURE) @@ -295,7 +377,6 @@ export class TransactionOrchestrator extends EventEmitter { ) if (!step.isCompensating()) { - const flow = transaction.getFlow() if (step.definition.continueOnPermanentFailure) { for (const childStep of step.next) { const child = flow.steps[childStep] @@ -305,107 +386,175 @@ export class TransactionOrchestrator extends EventEmitter { flow.state = TransactionState.WAITING_TO_COMPENSATE } } + + if (step.hasTimeout()) { + cleaningUp.push(transaction.clearStepTimeout(step)) + } } - if (step.definition.async) { + if (step.definition.async || flow.options?.strictCheckpoints) { await transaction.saveCheckpoint() } + + if (step.hasRetryScheduled()) { + cleaningUp.push(transaction.clearRetry(step)) + } + + await promiseAll(cleaningUp) + + const eventName = step.isCompensating() + ? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE + : DistributedTransactionEvent.STEP_FAILURE + transaction.emit(eventName, { step, transaction }) + } + + private async checkTransactionTimeout(transaction, currentSteps) { + let hasTimedOut = false + const flow = transaction.getFlow() + if ( + transaction.hasTimeout() && + !flow.timedOutAt && + flow.startedAt! + transaction.getTimeoutInterval()! * 1e3 < Date.now() + ) { + flow.timedOutAt = Date.now() + this.emit(DistributedTransactionEvent.TIMEOUT, { transaction }) + + for (const step of currentSteps) { + await TransactionOrchestrator.setStepFailure( + transaction, + step, + new TransactionTimeoutError(), + 0 + ) + } + + await transaction.saveCheckpoint() + + hasTimedOut = true + } + return hasTimedOut } private async executeNext( transaction: DistributedTransaction ): Promise { - if (transaction.hasFinished()) { - return - } + let continueExecution = true - const flow = transaction.getFlow() - const nextSteps = this.checkAllSteps(transaction) - const execution: Promise[] = [] - - for (const step of nextSteps.next) { - const curState = step.getStates() - const type = step.isCompensating() - ? TransactionHandlerType.COMPENSATE - : TransactionHandlerType.INVOKE - - step.lastAttempt = Date.now() - step.attempts++ - - if (curState.state === TransactionState.NOT_STARTED) { - if (step.isCompensating()) { - step.changeState(TransactionState.COMPENSATING) - - if (step.definition.noCompensation) { - step.changeState(TransactionState.REVERTED) - continue - } - } else if (flow.state === TransactionState.INVOKING) { - step.changeState(TransactionState.INVOKING) - } + while (continueExecution) { + if (transaction.hasFinished()) { + return } - step.changeStatus(TransactionStepStatus.WAITING) + const flow = transaction.getFlow() + const nextSteps = await this.checkAllSteps(transaction) + const execution: Promise[] = [] - const payload = new TransactionPayload( - { - model_id: flow.modelId, - reply_to_topic: TransactionOrchestrator.getKeyName( - "trans", - flow.modelId - ), - idempotency_key: TransactionOrchestrator.getKeyName( - flow.transactionId, - step.definition.action!, - type - ), - action: step.definition.action + "", - action_type: type, - attempt: step.attempts, - timestamp: Date.now(), - }, - transaction.payload, - transaction.getContext() + const hasTimedOut = await this.checkTransactionTimeout( + transaction, + nextSteps.next ) - - const setStepFailure = async ( - error: Error | any, - { endRetry }: { endRetry?: boolean } = {} - ) => { - return TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - endRetry ? 0 : step.definition.maxRetries - ) + if (hasTimedOut) { + continue } - if (!step.definition.async) { - execution.push( - transaction - .handler(step.definition.action + "", type, payload, transaction) - .then(async (response: any) => { - await TransactionOrchestrator.setStepSuccess( - transaction, - step, - response - ) - }) - .catch(async (error) => { - if ( - PermanentStepFailureError.isPermanentStepFailureError(error) - ) { - await setStepFailure(error, { endRetry: true }) - return - } - await setStepFailure(error) - }) + if (nextSteps.remaining === 0) { + if (transaction.hasTimeout()) { + await transaction.clearTransactionTimeout() + } + + if (flow.options?.retentionTime == undefined) { + await transaction.deleteCheckpoint() + } else { + await transaction.archiveCheckpoint() + } + + this.emit(DistributedTransactionEvent.FINISH, { transaction }) + } + + let hasSyncSteps = false + for (const step of nextSteps.next) { + const curState = step.getStates() + const type = step.isCompensating() + ? TransactionHandlerType.COMPENSATE + : TransactionHandlerType.INVOKE + + step.lastAttempt = Date.now() + step.attempts++ + + if (curState.state === TransactionState.NOT_STARTED) { + if (!step.startedAt) { + step.startedAt = Date.now() + } + + if (step.isCompensating()) { + step.changeState(TransactionState.COMPENSATING) + + if (step.definition.noCompensation) { + step.changeState(TransactionState.REVERTED) + continue + } + } else if (flow.state === TransactionState.INVOKING) { + step.changeState(TransactionState.INVOKING) + } + } + + step.changeStatus(TransactionStepStatus.WAITING) + + const payload = new TransactionPayload( + { + model_id: flow.modelId, + idempotency_key: TransactionOrchestrator.getKeyName( + flow.modelId, + flow.transactionId, + step.definition.action!, + type + ), + action: step.definition.action + "", + action_type: type, + attempt: step.attempts, + timestamp: Date.now(), + }, + transaction.payload, + transaction.getContext() ) - } else { - execution.push( - transaction.saveCheckpoint().then(async () => + + if (step.hasTimeout() && !step.timedOutAt && step.attempts === 1) { + await transaction.scheduleStepTimeout(step, step.definition.timeout!) + } + + transaction.emit(DistributedTransactionEvent.STEP_BEGIN, { + step, + transaction, + }) + + const setStepFailure = async ( + error: Error | any, + { endRetry }: { endRetry?: boolean } = {} + ) => { + return TransactionOrchestrator.setStepFailure( + transaction, + step, + error, + endRetry ? 0 : step.definition.maxRetries + ) + } + + const isAsync = step.isCompensating() + ? step.definition.compensateAsync + : step.definition.async + + if (!isAsync) { + hasSyncSteps = true + execution.push( transaction .handler(step.definition.action + "", type, payload, transaction) + .then(async (response: any) => { + await TransactionOrchestrator.setStepSuccess( + transaction, + step, + response + ) + }) .catch(async (error) => { if ( PermanentStepFailureError.isPermanentStepFailureError(error) @@ -413,17 +562,44 @@ export class TransactionOrchestrator extends EventEmitter { await setStepFailure(error, { endRetry: true }) return } + await setStepFailure(error) }) ) - ) + } else { + execution.push( + transaction.saveCheckpoint().then(async () => + transaction + .handler( + step.definition.action + "", + type, + payload, + transaction + ) + .catch(async (error) => { + if ( + PermanentStepFailureError.isPermanentStepFailureError(error) + ) { + await setStepFailure(error, { endRetry: true }) + return + } + + await setStepFailure(error) + }) + ) + ) + } } - } - await promiseAll(execution) + if (hasSyncSteps && flow.options?.strictCheckpoints) { + await transaction.saveCheckpoint() + } - if (nextSteps.next.length > 0) { - await this.executeNext(transaction) + await promiseAll(execution) + + if (nextSteps.next.length === 0) { + continueExecution = false + } } } @@ -433,7 +609,8 @@ export class TransactionOrchestrator extends EventEmitter { */ public async resume(transaction: DistributedTransaction): Promise { if (transaction.modelId !== this.id) { - throw new Error( + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, `TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.` ) } @@ -446,9 +623,23 @@ export class TransactionOrchestrator extends EventEmitter { if (flow.state === TransactionState.NOT_STARTED) { flow.state = TransactionState.INVOKING - this.emit("begin", transaction) + flow.startedAt = Date.now() + + if (this.options?.storeExecution) { + await transaction.saveCheckpoint( + flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL + ) + } + + if (transaction.hasTimeout()) { + await transaction.scheduleTransactionTimeout( + transaction.getTimeoutInterval()! + ) + } + + this.emit(DistributedTransactionEvent.BEGIN, { transaction }) } else { - this.emit("resume", transaction) + this.emit(DistributedTransactionEvent.RESUME, { transaction }) } await this.executeNext(transaction) @@ -462,14 +653,18 @@ export class TransactionOrchestrator extends EventEmitter { transaction: DistributedTransaction ): Promise { if (transaction.modelId !== this.id) { - throw new Error( + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, `TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.` ) } const flow = transaction.getFlow() if (flow.state === TransactionState.FAILED) { - throw new Error(`Cannot revert a perment failed transaction.`) + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, + `Cannot revert a perment failed transaction.` + ) } flow.state = TransactionState.WAITING_TO_COMPENSATE @@ -477,33 +672,54 @@ export class TransactionOrchestrator extends EventEmitter { await this.executeNext(transaction) } - private async createTransactionFlow( - transactionId: string - ): Promise { - return { + private createTransactionFlow(transactionId: string): TransactionFlow { + const [steps, features] = TransactionOrchestrator.buildSteps( + this.definition + ) + + const hasAsyncSteps = features.hasAsyncSteps + const hasStepTimeouts = features.hasStepTimeouts + const hasRetriesTimeout = features.hasRetriesTimeout + + this.options ??= {} + if (hasAsyncSteps || hasStepTimeouts || hasRetriesTimeout) { + this.options.storeExecution = true + } + + const flow: TransactionFlow = { modelId: this.id, + options: this.options, transactionId: transactionId, + hasAsyncSteps, hasFailedSteps: false, hasSkippedSteps: false, + hasWaitingSteps: false, + timedOutAt: null, state: TransactionState.NOT_STARTED, definition: this.definition, - steps: TransactionOrchestrator.buildSteps(this.definition), + steps, } + + return flow } private static async loadTransactionById( + modelId: string, transactionId: string ): Promise { const transaction = await DistributedTransaction.loadTransaction( + modelId, transactionId ) if (transaction !== null) { const flow = transaction.flow - transaction.flow.steps = TransactionOrchestrator.buildSteps( + const [steps] = TransactionOrchestrator.buildSteps( flow.definition, flow.steps ) + + transaction.flow.steps = steps return transaction } @@ -513,7 +729,14 @@ export class TransactionOrchestrator extends EventEmitter { private static buildSteps( flow: TransactionStepsDefinition, existingSteps?: { [key: string]: TransactionStep } - ): { [key: string]: TransactionStep } { + ): [ + { [key: string]: TransactionStep }, + { + hasAsyncSteps: boolean + hasStepTimeouts: boolean + hasRetriesTimeout: boolean + } + ] { const states: { [key: string]: TransactionStep } = { [TransactionOrchestrator.ROOT_STEP]: { id: TransactionOrchestrator.ROOT_STEP, @@ -526,6 +749,12 @@ export class TransactionOrchestrator extends EventEmitter { { obj: flow, level: [TransactionOrchestrator.ROOT_STEP] }, ] + const features = { + hasAsyncSteps: false, + hasStepTimeouts: false, + hasRetriesTimeout: false, + } + while (queue.length > 0) { const { obj, level } = queue.shift() @@ -547,11 +776,28 @@ export class TransactionOrchestrator extends EventEmitter { const id = level.join(".") const parent = level.slice(0, level.length - 1).join(".") - states[parent].next?.push(id) + if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) { + states[parent].next?.push(id) + } const definitionCopy = { ...obj } delete definitionCopy.next + if (definitionCopy.async) { + features.hasAsyncSteps = true + } + + if (definitionCopy.timeout) { + features.hasStepTimeouts = true + } + + if ( + definitionCopy.retryInterval || + definitionCopy.retryIntervalAwaiting + ) { + features.hasRetriesTimeout = true + } + states[id] = Object.assign( new TransactionStep(), existingSteps?.[id] || { @@ -577,7 +823,7 @@ export class TransactionOrchestrator extends EventEmitter { } } - return states + return [states, features] } /** Create a new transaction @@ -591,12 +837,12 @@ export class TransactionOrchestrator extends EventEmitter { payload?: unknown ): Promise { const existingTransaction = - await TransactionOrchestrator.loadTransactionById(transactionId) + await TransactionOrchestrator.loadTransactionById(this.id, transactionId) let newTransaction = false - let modelFlow + let modelFlow: TransactionFlow if (!existingTransaction) { - modelFlow = await this.createTransactionFlow(transactionId) + modelFlow = this.createTransactionFlow(transactionId) newTransaction = true } else { modelFlow = existingTransaction.flow @@ -609,13 +855,49 @@ export class TransactionOrchestrator extends EventEmitter { existingTransaction?.errors, existingTransaction?.context ) - if (newTransaction) { - await transaction.saveCheckpoint() + + if ( + newTransaction && + this.options?.storeExecution && + this.options?.strictCheckpoints + ) { + await transaction.saveCheckpoint( + modelFlow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL + ) } return transaction } + /** Returns an existing transaction + * @param transactionId - unique identifier of the transaction + * @param handler - function to handle action of the transaction + */ + public async retrieveExistingTransaction( + transactionId: string, + handler: TransactionStepHandler + ): Promise { + const existingTransaction = + await TransactionOrchestrator.loadTransactionById(this.id, transactionId) + + if (!existingTransaction) { + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Transaction ${transactionId} could not be found.` + ) + } + + const transaction = new DistributedTransaction( + existingTransaction.flow, + handler, + undefined, + existingTransaction?.errors, + existingTransaction?.context + ) + + return transaction + } + private static getStepByAction( flow: TransactionFlow, action: string @@ -633,9 +915,8 @@ export class TransactionOrchestrator extends EventEmitter { handler?: TransactionStepHandler, transaction?: DistributedTransaction ): Promise<[DistributedTransaction, TransactionStep]> { - const [transactionId, action, actionType] = responseIdempotencyKey.split( - TransactionOrchestrator.SEPARATOR - ) + const [modelId, transactionId, action, actionType] = + responseIdempotencyKey.split(TransactionOrchestrator.SEPARATOR) if (!transaction && !handler) { throw new Error( @@ -645,10 +926,16 @@ export class TransactionOrchestrator extends EventEmitter { if (!transaction) { const existingTransaction = - await TransactionOrchestrator.loadTransactionById(transactionId) + await TransactionOrchestrator.loadTransactionById( + modelId, + transactionId + ) if (existingTransaction === null) { - throw new Error(`Transaction ${transactionId} could not be found.`) + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Transaction ${transactionId} could not be found.` + ) } transaction = new DistributedTransaction( @@ -697,16 +984,20 @@ export class TransactionOrchestrator extends EventEmitter { ) if (step.getStates().status === TransactionStepStatus.WAITING) { + this.emit(DistributedTransactionEvent.RESUME, { + transaction: curTransaction, + }) + await TransactionOrchestrator.setStepSuccess( curTransaction, step, response ) - this.emit("resume", curTransaction) await this.executeNext(curTransaction) } else { - throw new Error( + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, `Cannot set step success when status is ${step.getStates().status}` ) } @@ -736,16 +1027,21 @@ export class TransactionOrchestrator extends EventEmitter { ) if (step.getStates().status === TransactionStepStatus.WAITING) { + this.emit(DistributedTransactionEvent.RESUME, { + transaction: curTransaction, + }) + await TransactionOrchestrator.setStepFailure( curTransaction, step, error, 0 ) - this.emit("resume", curTransaction) + await this.executeNext(curTransaction) } else { - throw new Error( + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, `Cannot set step failure when status is ${step.getStates().status}` ) } diff --git a/packages/orchestration/src/transaction/transaction-step.ts b/packages/orchestration/src/transaction/transaction-step.ts index 72f63e9d25..57b06acf31 100644 --- a/packages/orchestration/src/transaction/transaction-step.ts +++ b/packages/orchestration/src/transaction/transaction-step.ts @@ -1,3 +1,4 @@ +import { MedusaError } from "@medusajs/utils" import { DistributedTransaction, TransactionPayload, @@ -5,8 +6,8 @@ import { import { TransactionHandlerType, TransactionState, - TransactionStepStatus, TransactionStepsDefinition, + TransactionStepStatus, } from "./types" export type TransactionStepHandler = ( @@ -30,6 +31,8 @@ export class TransactionStep { * @member attempts - The number of attempts made to execute the step * @member failures - The number of failures encountered while executing the step * @member lastAttempt - The timestamp of the last attempt made to execute the step + * @member hasScheduledRetry - A flag indicating if a retry has been scheduled + * @member retryRescheduledAt - The timestamp of the last retry scheduled * @member next - The ids of the next steps in the flow * @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is true */ @@ -48,6 +51,10 @@ export class TransactionStep { attempts: number failures: number lastAttempt: number | null + retryRescheduledAt: number | null + hasScheduledRetry: boolean + timedOutAt: number | null + startedAt?: number next: string[] saveResponse: boolean @@ -70,6 +77,10 @@ export class TransactionStep { return this.stepFailed } + public isInvoking() { + return !this.stepFailed + } + public changeState(toState: TransactionState) { const allowed = { [TransactionState.DORMANT]: [TransactionState.NOT_STARTED], @@ -99,7 +110,8 @@ export class TransactionStep { return } - throw new Error( + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, `Updating State from "${curState.state}" to "${toState}" is not allowed.` ) } @@ -128,16 +140,49 @@ export class TransactionStep { return } - throw new Error( + throw new MedusaError( + MedusaError.Types.NOT_ALLOWED, `Updating Status from "${curState.status}" to "${toStatus}" is not allowed.` ) } + hasRetryScheduled(): boolean { + return !!this.hasScheduledRetry + } + + hasRetryInterval(): boolean { + return !!this.definition.retryInterval + } + + hasTimeout(): boolean { + return !!this.definition.timeout + } + + getTimeoutInterval(): number | undefined { + return this.definition.timeout + } + canRetry(): boolean { + return ( + !this.definition.retryInterval || + !!( + this.lastAttempt && + this.definition.retryInterval && + Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3 + ) + ) + } + + hasAwaitingRetry(): boolean { + return !!this.definition.retryIntervalAwaiting + } + + canRetryAwaiting(): boolean { return !!( + this.hasAwaitingRetry() && this.lastAttempt && - this.definition.retryInterval && - Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3 + Date.now() - this.lastAttempt > + this.definition.retryIntervalAwaiting! * 1e3 ) } @@ -158,4 +203,14 @@ export class TransactionStep { flowState === TransactionState.COMPENSATING ) } + + canCancel(): boolean { + return ( + !this.isCompensating() && + [ + TransactionStepStatus.WAITING, + TransactionStepStatus.TEMPORARY_FAILURE, + ].includes(this.getStates().status) + ) + } } diff --git a/packages/orchestration/src/transaction/types.ts b/packages/orchestration/src/transaction/types.ts index 25ab21c78d..16f27985f5 100644 --- a/packages/orchestration/src/transaction/types.ts +++ b/packages/orchestration/src/transaction/types.ts @@ -1,3 +1,6 @@ +import { DistributedTransaction } from "./distributed-transaction" +import { TransactionStep } from "./transaction-step" + export enum TransactionHandlerType { INVOKE = "invoke", COMPENSATE = "compensate", @@ -9,8 +12,10 @@ export type TransactionStepsDefinition = { noCompensation?: boolean maxRetries?: number retryInterval?: number + retryIntervalAwaiting?: number timeout?: number async?: boolean + compensateAsync?: boolean noWait?: boolean saveResponse?: boolean next?: TransactionStepsDefinition | TransactionStepsDefinition[] @@ -36,8 +41,67 @@ export enum TransactionState { SKIPPED = "skipped", } +export type TransactionModelOptions = { + timeout?: number + storeExecution?: boolean + retentionTime?: number + strictCheckpoints?: boolean +} + export type TransactionModel = { id: string flow: TransactionStepsDefinition hash: string + options?: TransactionModelOptions +} + +export enum DistributedTransactionEvent { + BEGIN = "begin", + RESUME = "resume", + COMPENSATE_BEGIN = "compensateBegin", + FINISH = "finish", + TIMEOUT = "timeout", + STEP_BEGIN = "stepBegin", + STEP_SUCCESS = "stepSuccess", + STEP_FAILURE = "stepFailure", + COMPENSATE_STEP_SUCCESS = "compensateStepSuccess", + COMPENSATE_STEP_FAILURE = "compensateStepFailure", +} + +export type DistributedTransactionEvents = { + onBegin?: (args: { transaction: DistributedTransaction }) => void + onResume?: (args: { transaction: DistributedTransaction }) => void + onFinish?: (args: { + transaction: DistributedTransaction + result?: unknown + errors?: unknown[] + }) => void + onTimeout?: (args: { transaction: DistributedTransaction }) => void + + onStepBegin?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void + + onStepSuccess?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void + + onStepFailure?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void + + onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void + + onCompensateStepSuccess?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void + + onCompensateStepFailure?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void } diff --git a/packages/orchestration/src/workflow/global-workflow.ts b/packages/orchestration/src/workflow/global-workflow.ts index 0cc69e2ac0..4997506098 100644 --- a/packages/orchestration/src/workflow/global-workflow.ts +++ b/packages/orchestration/src/workflow/global-workflow.ts @@ -2,17 +2,22 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { createContainerLike, createMedusaContainer } from "@medusajs/utils" import { asValue } from "awilix" -import { DistributedTransaction } from "../transaction" +import { + DistributedTransaction, + DistributedTransactionEvents, +} from "../transaction" import { WorkflowDefinition, WorkflowManager } from "./workflow-manager" export class GlobalWorkflow extends WorkflowManager { protected static workflows: Map = new Map() protected container: MedusaContainer protected context: Context + protected subscribe: DistributedTransactionEvents constructor( modulesLoaded?: LoadedModule[] | MedusaContainer, - context?: Context + context?: Context, + subscribe?: DistributedTransactionEvents ) { super() @@ -35,6 +40,7 @@ export class GlobalWorkflow extends WorkflowManager { this.container = container this.context = context ?? {} + this.subscribe = subscribe ?? {} } async run(workflowId: string, uniqueTransactionId: string, input?: unknown) { @@ -52,6 +58,18 @@ export class GlobalWorkflow extends WorkflowManager { input ) + if (this.subscribe.onStepBegin) { + transaction.once("stepBegin", this.subscribe.onStepBegin) + } + + if (this.subscribe.onStepSuccess) { + transaction.once("stepSuccess", this.subscribe.onStepSuccess) + } + + if (this.subscribe.onStepFailure) { + transaction.once("stepFailure", this.subscribe.onStepFailure) + } + await orchestrator.resume(transaction) return transaction @@ -67,6 +85,21 @@ export class GlobalWorkflow extends WorkflowManager { } const workflow = WorkflowManager.workflows.get(workflowId)! + const orchestrator = workflow.orchestrator + orchestrator.once("resume", (transaction) => { + if (this.subscribe.onStepBegin) { + transaction.once("stepBegin", this.subscribe.onStepBegin) + } + + if (this.subscribe.onStepSuccess) { + transaction.once("stepSuccess", this.subscribe.onStepSuccess) + } + + if (this.subscribe.onStepFailure) { + transaction.once("stepFailure", this.subscribe.onStepFailure) + } + }) + return await workflow.orchestrator.registerStepSuccess( idempotencyKey, workflow.handler(this.container, this.context), @@ -85,6 +118,21 @@ export class GlobalWorkflow extends WorkflowManager { } const workflow = WorkflowManager.workflows.get(workflowId)! + const orchestrator = workflow.orchestrator + orchestrator.once("resume", (transaction) => { + if (this.subscribe.onStepBegin) { + transaction.once("stepBegin", this.subscribe.onStepBegin) + } + + if (this.subscribe.onStepSuccess) { + transaction.once("stepSuccess", this.subscribe.onStepSuccess) + } + + if (this.subscribe.onStepFailure) { + transaction.once("stepFailure", this.subscribe.onStepFailure) + } + }) + return await workflow.orchestrator.registerStepFailure( idempotencyKey, error, diff --git a/packages/orchestration/src/workflow/local-workflow.ts b/packages/orchestration/src/workflow/local-workflow.ts index 5228b2ee28..c19436a14c 100644 --- a/packages/orchestration/src/workflow/local-workflow.ts +++ b/packages/orchestration/src/workflow/local-workflow.ts @@ -3,6 +3,8 @@ import { createContainerLike, createMedusaContainer } from "@medusajs/utils" import { asValue } from "awilix" import { DistributedTransaction, + DistributedTransactionEvent, + DistributedTransactionEvents, TransactionOrchestrator, TransactionStepsDefinition, } from "../transaction" @@ -79,7 +81,174 @@ export class LocalWorkflow { return this.workflow.flow_ } - async run(uniqueTransactionId: string, input?: unknown, context?: Context) { + private registerEventCallbacks({ + orchestrator, + transaction, + subscribe, + idempotencyKey, + }: { + orchestrator: TransactionOrchestrator + transaction?: DistributedTransaction + subscribe?: DistributedTransactionEvents + idempotencyKey?: string + }) { + const modelId = orchestrator.id + let transactionId + + if (transaction) { + transactionId = transaction!.transactionId + } else if (idempotencyKey) { + const [, trxId] = idempotencyKey!.split(":") + transactionId = trxId + } + + const eventWrapperMap = new Map() + for (const [key, handler] of Object.entries(subscribe ?? {})) { + eventWrapperMap.set(key, (args) => { + const { transaction } = args + + if ( + transaction.transactionId !== transactionId || + transaction.modelId !== modelId + ) { + return + } + + handler(args) + }) + } + + if (subscribe?.onBegin) { + orchestrator.on( + DistributedTransactionEvent.BEGIN, + eventWrapperMap.get("onBegin") + ) + } + + if (subscribe?.onResume) { + orchestrator.on( + DistributedTransactionEvent.RESUME, + eventWrapperMap.get("onResume") + ) + } + + if (subscribe?.onCompensateBegin) { + orchestrator.on( + DistributedTransactionEvent.COMPENSATE_BEGIN, + eventWrapperMap.get("onCompensateBegin") + ) + } + + if (subscribe?.onTimeout) { + orchestrator.on( + DistributedTransactionEvent.TIMEOUT, + eventWrapperMap.get("onTimeout") + ) + } + + if (subscribe?.onFinish) { + orchestrator.on( + DistributedTransactionEvent.FINISH, + eventWrapperMap.get("onFinish") + ) + } + + const resumeWrapper = ({ transaction }) => { + if ( + transaction.modelId !== modelId || + transaction.transactionId !== transactionId + ) { + return + } + + if (subscribe?.onStepBegin) { + transaction.on( + DistributedTransactionEvent.STEP_BEGIN, + eventWrapperMap.get("onStepBegin") + ) + } + + if (subscribe?.onStepSuccess) { + transaction.on( + DistributedTransactionEvent.STEP_SUCCESS, + eventWrapperMap.get("onStepSuccess") + ) + } + + if (subscribe?.onStepFailure) { + transaction.on( + DistributedTransactionEvent.STEP_FAILURE, + eventWrapperMap.get("onStepFailure") + ) + } + + if (subscribe?.onCompensateStepSuccess) { + transaction.on( + DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS, + eventWrapperMap.get("onCompensateStepSuccess") + ) + } + + if (subscribe?.onCompensateStepFailure) { + transaction.on( + DistributedTransactionEvent.COMPENSATE_STEP_FAILURE, + eventWrapperMap.get("onCompensateStepFailure") + ) + } + } + + if (transaction) { + resumeWrapper({ transaction }) + } else { + orchestrator.once("resume", resumeWrapper) + } + + const cleanUp = () => { + subscribe?.onFinish && + orchestrator.removeListener( + DistributedTransactionEvent.FINISH, + eventWrapperMap.get("onFinish") + ) + subscribe?.onResume && + orchestrator.removeListener( + DistributedTransactionEvent.RESUME, + eventWrapperMap.get("onResume") + ) + subscribe?.onBegin && + orchestrator.removeListener( + DistributedTransactionEvent.BEGIN, + eventWrapperMap.get("onBegin") + ) + subscribe?.onCompensateBegin && + orchestrator.removeListener( + DistributedTransactionEvent.COMPENSATE_BEGIN, + eventWrapperMap.get("onCompensateBegin") + ) + subscribe?.onTimeout && + orchestrator.removeListener( + DistributedTransactionEvent.TIMEOUT, + eventWrapperMap.get("onTimeout") + ) + + orchestrator.removeListener( + DistributedTransactionEvent.RESUME, + resumeWrapper + ) + + eventWrapperMap.clear() + } + + return { + cleanUpEventListeners: cleanUp, + } + } + + async run( + uniqueTransactionId: string, + input?: unknown, + context?: Context, + subscribe?: DistributedTransactionEvents + ) { if (this.flow.hasChanges) { this.commit() } @@ -92,36 +261,104 @@ export class LocalWorkflow { input ) + const { cleanUpEventListeners } = this.registerEventCallbacks({ + orchestrator, + transaction, + subscribe, + }) + await orchestrator.resume(transaction) + cleanUpEventListeners() + + return transaction + } + + async getRunningTransaction(uniqueTransactionId: string, context?: Context) { + const { handler, orchestrator } = this.workflow + + const transaction = await orchestrator.retrieveExistingTransaction( + uniqueTransactionId, + handler(this.container, context) + ) + + return transaction + } + + async cancel( + uniqueTransactionId: string, + context?: Context, + subscribe?: DistributedTransactionEvents + ) { + const { orchestrator } = this.workflow + + const transaction = await this.getRunningTransaction( + uniqueTransactionId, + context + ) + + const { cleanUpEventListeners } = this.registerEventCallbacks({ + orchestrator, + transaction, + subscribe, + }) + + await orchestrator.cancelTransaction(transaction) + + cleanUpEventListeners() + return transaction } async registerStepSuccess( idempotencyKey: string, response?: unknown, - context?: Context + context?: Context, + subscribe?: DistributedTransactionEvents ): Promise { const { handler, orchestrator } = this.workflow - return await orchestrator.registerStepSuccess( + + const { cleanUpEventListeners } = this.registerEventCallbacks({ + orchestrator, + idempotencyKey, + subscribe, + }) + + const transaction = await orchestrator.registerStepSuccess( idempotencyKey, handler(this.container, context), undefined, response ) + + cleanUpEventListeners() + + return transaction } async registerStepFailure( idempotencyKey: string, error?: Error | any, - context?: Context + context?: Context, + subscribe?: DistributedTransactionEvents ): Promise { const { handler, orchestrator } = this.workflow - return await orchestrator.registerStepFailure( + + const { cleanUpEventListeners } = this.registerEventCallbacks({ + orchestrator, + idempotencyKey, + subscribe, + }) + + const transaction = await orchestrator.registerStepFailure( idempotencyKey, error, handler(this.container, context) ) + + cleanUpEventListeners() + + return transaction } addAction( diff --git a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts index 9a4222555c..f479bd9046 100644 --- a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts +++ b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts @@ -23,6 +23,28 @@ jest.mock("@medusajs/orchestration", () => { }), } }), + registerStepSuccess: jest.fn(() => { + return { + getErrors: jest.fn(), + getState: jest.fn(() => "done"), + getContext: jest.fn(() => { + return { + invoke: { result_step: "invoke_test" }, + } + }), + } + }), + registerStepFailure: jest.fn(() => { + return { + getErrors: jest.fn(), + getState: jest.fn(() => "done"), + getContext: jest.fn(() => { + return { + invoke: { result_step: "invoke_test" }, + } + }), + } + }), } }), } diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index 7b940609f2..7a319cb156 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -1,5 +1,6 @@ import { DistributedTransaction, + DistributedTransactionEvents, LocalWorkflow, TransactionHandlerType, TransactionState, @@ -8,15 +9,36 @@ import { import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { MedusaModule } from "@medusajs/modules-sdk" +import { OrchestrationUtils } from "@medusajs/utils" import { EOL } from "os" import { ulid } from "ulid" -import { OrchestrationUtils } from "@medusajs/utils" +import { MedusaWorkflow } from "../medusa-workflow" +import { resolveValue } from "../utils/composer" export type FlowRunOptions = { input?: TData context?: Context - resultFrom?: string | string[] + resultFrom?: string | string[] | Symbol throwOnError?: boolean + events?: DistributedTransactionEvents +} + +export type FlowRegisterStepSuccessOptions = { + idempotencyKey: string + response?: TData + context?: Context + resultFrom?: string | string[] | Symbol + throwOnError?: boolean + events?: DistributedTransactionEvents +} + +export type FlowRegisterStepFailureOptions = { + idempotencyKey: string + response?: TData + context?: Context + resultFrom?: string | string[] | Symbol + throwOnError?: boolean + events?: DistributedTransactionEvents } export type WorkflowResult = { @@ -25,24 +47,59 @@ export type WorkflowResult = { result: TResult } +export type ExportedWorkflow< + TData = unknown, + TResult = unknown, + TDataOverride = undefined, + TResultOverride = undefined +> = { + run: ( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + registerStepSuccess: ( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + registerStepFailure: ( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > +} + export const exportWorkflow = ( workflowId: string, - defaultResult?: string, - dataPreparation?: (data: TData) => Promise + defaultResult?: string | Symbol, + dataPreparation?: (data: TData) => Promise, + options?: { + wrappedInput?: boolean + } ) => { - return function ( + function exportedWorkflow< + TDataOverride = undefined, + TResultOverride = undefined + >( container?: LoadedModule[] | MedusaContainer - ): Omit & { - run: ( - args?: FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ) => Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - } { + ): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" + > & + ExportedWorkflow { if (!container) { container = MedusaModule.getLoadedModules().map( (mod) => Object.values(mod)[0] @@ -52,8 +109,64 @@ export const exportWorkflow = ( const flow = new LocalWorkflow(workflowId, container) const originalRun = flow.run.bind(flow) + const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) + const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) + + const originalExecution = async ( + method, + { throwOnError, resultFrom }, + ...args + ) => { + const transaction = await method.apply(method, args) + + const errors = transaction.getErrors(TransactionHandlerType.INVOKE) + + const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] + if (failedStatus.includes(transaction.getState()) && throwOnError) { + const errorMessage = errors + ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) + ?.join(`${EOL}`) + throw new Error(errorMessage) + } + + let result: any = undefined + + const resFrom = + resultFrom?.__type === OrchestrationUtils.SymbolWorkflowStep + ? resultFrom.__step__ + : resultFrom + + if (resFrom) { + if (Array.isArray(resFrom)) { + result = resFrom.map((from) => { + const res = transaction.getContext().invoke?.[from] + return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res + }) + } else { + const res = transaction.getContext().invoke?.[resFrom] + result = + res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res + } + + const ret = result || resFrom + result = options?.wrappedInput + ? await resolveValue(ret, transaction.getContext()) + : ret + } + + return { + errors, + transaction, + result, + } + } + const newRun = async ( - { input, context, throwOnError, resultFrom }: FlowRunOptions = { + { input, context, throwOnError, resultFrom, events }: FlowRunOptions = { throwOnError: true, resultFrom: defaultResult, } @@ -77,59 +190,77 @@ export const exportWorkflow = ( } } - const transaction = await originalRun( + return await originalExecution( + originalRun, + { throwOnError, resultFrom }, context?.transactionId ?? ulid(), input, - context + context, + events ) - - const errors = transaction.getErrors(TransactionHandlerType.INVOKE) - - const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] - if (failedStatus.includes(transaction.getState()) && throwOnError) { - const errorMessage = errors - ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) - ?.join(`${EOL}`) - throw new Error(errorMessage) - } - - let result: any = undefined - - if (resultFrom) { - if (Array.isArray(resultFrom)) { - result = resultFrom.map((from) => { - const res = transaction.getContext().invoke?.[from] - return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData - ? res.output - : res - }) - } else { - const res = transaction.getContext().invoke?.[resultFrom] - result = - res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData - ? res.output - : res - } - } - - return { - errors, - transaction, - result, - } } flow.run = newRun as any - return flow as unknown as LocalWorkflow & { - run: ( - args?: FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ) => Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > + const newRegisterStepSuccess = async ( + { + response, + idempotencyKey, + context, + throwOnError, + resultFrom, + events, + }: FlowRegisterStepSuccessOptions = { + idempotencyKey: "", + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + return await originalExecution( + originalRegisterStepSuccess, + { throwOnError, resultFrom }, + idempotencyKey, + response, + context, + events + ) } + flow.registerStepSuccess = newRegisterStepSuccess as any + + const newRegisterStepFailure = async ( + { + response, + idempotencyKey, + context, + throwOnError, + resultFrom, + events, + }: FlowRegisterStepFailureOptions = { + idempotencyKey: "", + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + return await originalExecution( + originalRegisterStepFailure, + { throwOnError, resultFrom }, + idempotencyKey, + response, + context, + events + ) + } + flow.registerStepFailure = newRegisterStepFailure as any + + return flow as unknown as LocalWorkflow & + ExportedWorkflow } + + MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow) + return exportedWorkflow } diff --git a/packages/workflows-sdk/src/index.ts b/packages/workflows-sdk/src/index.ts index b7791bb21d..9c27d4e26a 100644 --- a/packages/workflows-sdk/src/index.ts +++ b/packages/workflows-sdk/src/index.ts @@ -1,3 +1,4 @@ export * from "./helper" +export * from "./medusa-workflow" export * from "./utils/composer" export * as Composer from "./utils/composer" diff --git a/packages/workflows-sdk/src/medusa-workflow.ts b/packages/workflows-sdk/src/medusa-workflow.ts new file mode 100644 index 0000000000..0a48ef2d72 --- /dev/null +++ b/packages/workflows-sdk/src/medusa-workflow.ts @@ -0,0 +1,27 @@ +import { LocalWorkflow } from "@medusajs/orchestration" +import { LoadedModule, MedusaContainer } from "@medusajs/types" +import { ExportedWorkflow } from "./helper" + +export class MedusaWorkflow { + static workflows: Record< + string, + ( + container?: LoadedModule[] | MedusaContainer + ) => Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" + > & + ExportedWorkflow + > = {} + + static registerWorkflow(workflowId, exportedWorkflow) { + MedusaWorkflow.workflows[workflowId] = exportedWorkflow + } + + static getWorkflow(workflowId) { + return MedusaWorkflow.workflows[workflowId] + } +} + +global.MedusaWorkflow ??= MedusaWorkflow +exports.MedusaWorkflow = global.MedusaWorkflow diff --git a/packages/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/workflows-sdk/src/utils/composer/create-workflow.ts index 546515a65e..072a6cf0d4 100644 --- a/packages/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/workflows-sdk/src/utils/composer/create-workflow.ts @@ -5,8 +5,7 @@ import { } from "@medusajs/orchestration" import { LoadedModule, MedusaContainer } from "@medusajs/types" import { OrchestrationUtils } from "@medusajs/utils" -import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper" -import { resolveValue } from "./helpers" +import { ExportedWorkflow, exportWorkflow } from "../../helper" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -66,17 +65,11 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null type ReturnWorkflow> = { ( container?: LoadedModule[] | MedusaContainer - ): Omit & { - run: ( - args?: FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ) => Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - } + ): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" + > & + ExportedWorkflow } & THooks & { getName: () => string } @@ -198,43 +191,19 @@ export function createWorkflow< WorkflowManager.update(name, context.flow, handlers) - const workflow = exportWorkflow(name) + const workflow = exportWorkflow( + name, + returnedStep, + undefined, + { + wrappedInput: true, + } + ) const mainFlow = ( container?: LoadedModule[] | MedusaContainer ) => { const workflow_ = workflow(container) - const originalRun = workflow_.run - - workflow_.run = (async ( - args?: FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ): Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > => { - args ??= {} - args.resultFrom ??= - returnedStep?.__type === OrchestrationUtils.SymbolWorkflowStep - ? returnedStep.__step__ - : undefined - - // Forwards the input to the ref object on composer.apply - const workflowResult = (await originalRun( - args - )) as unknown as WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - - workflowResult.result = await resolveValue( - workflowResult.result || returnedStep, - workflowResult.transaction.getContext() - ) - - return workflowResult - }) as any return workflow_ } diff --git a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index 8fb49aa423..0fdf7d2881 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -41,7 +41,7 @@ export async function resolveValue(input, transactionContext) { if (Array.isArray(inputTOUnwrap)) { return await promiseAll( - inputTOUnwrap.map((i) => unwrapInput(i, transactionContext)) + inputTOUnwrap.map((i) => resolveValue(i, transactionContext)) ) }