Feat: @medusajs/workflows (#4553)

feat: medusa workflows
This commit is contained in:
Carlos R. L. Rodrigues
2023-07-25 10:13:14 -03:00
committed by GitHub
parent ae33f4825f
commit f12299deb1
52 changed files with 1358 additions and 331 deletions
@@ -0,0 +1,196 @@
import { TransactionFlow } from "./transaction-orchestrator"
import { TransactionHandlerType, TransactionState } from "./types"
/**
* @typedef TransactionMetadata
* @property model_id - The id of the model_id that created the transaction (modelId).
* @property reply_to_topic - The topic to reply to for the transaction.
* @property idempotency_key - The idempotency key of the transaction.
* @property action - The action of the transaction.
* @property action_type - The type of the transaction.
* @property attempt - The number of attempts for the transaction.
* @property timestamp - The timestamp of the transaction.
*/
export type TransactionMetadata = {
model_id: string
reply_to_topic: string
idempotency_key: string
action: string
action_type: TransactionHandlerType
attempt: number
timestamp: number
}
/**
* @typedef TransactionContext
* @property payload - Object containing the initial payload.
* @property invoke - Object containing responses of Invoke handlers on steps flagged with saveResponse.
* @property compensate - Object containing responses of Compensate handlers on steps flagged with saveResponse.
*/
export class TransactionContext {
constructor(
public payload: unknown = undefined,
public invoke: Record<string, unknown> = {},
public compensate: Record<string, unknown> = {}
) {}
}
export class TransactionStepError {
constructor(
public action: string,
public handlerType: TransactionHandlerType,
public error: Error | any
) {}
}
export class TransactionCheckpoint {
constructor(
public flow: TransactionFlow,
public context: TransactionContext,
public errors: TransactionStepError[] = []
) {}
}
export class TransactionPayload {
/**
* @param metadata - The metadata of the transaction.
* @param data - The initial payload data to begin a transation.
* @param context - Object gathering responses of all steps flagged with saveResponse.
*/
constructor(
public metadata: TransactionMetadata,
public data: Record<string, unknown>,
public context: TransactionContext
) {}
}
/**
* DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order.
*/
export class DistributedTransaction {
public modelId: string
public transactionId: string
private errors: TransactionStepError[] = []
private context: TransactionContext = new TransactionContext()
constructor(
private flow: TransactionFlow,
public handler: (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
) => Promise<unknown>,
public payload?: any,
errors?: TransactionStepError[],
context?: TransactionContext
) {
this.transactionId = flow.transactionId
this.modelId = flow.modelId
if (errors) {
this.errors = errors
}
this.context.payload = payload
if (context) {
this.context = { ...context }
}
}
public getFlow() {
return this.flow
}
public getContext() {
return this.context
}
public getErrors() {
return this.errors
}
public addError(
action: string,
handlerType: TransactionHandlerType,
error: Error | any
) {
this.errors.push({
action,
handlerType,
error,
})
}
public addResponse(
action: string,
handlerType: TransactionHandlerType,
response: unknown
) {
this.context[handlerType][action] = response
}
public hasFinished(): boolean {
return [
TransactionState.DONE,
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(this.getState())
}
public getState(): TransactionState {
return this.getFlow().state
}
public get isPartiallyCompleted(): boolean {
return !!this.getFlow().hasFailedSteps || !!this.getFlow().hasSkippedSteps
}
public canInvoke(): boolean {
return (
this.getFlow().state === TransactionState.NOT_STARTED ||
this.getFlow().state === TransactionState.INVOKING
)
}
public canRevert(): boolean {
return (
this.getFlow().state === TransactionState.DONE ||
this.getFlow().state === TransactionState.COMPENSATING
)
}
public static keyValueStore: any = {} // TODO: Use Key/Value db
private static keyPrefix = "dtrans:"
public async saveCheckpoint(): Promise<TransactionCheckpoint> {
// TODO: Use Key/Value db to save transactions
const key = DistributedTransaction.keyPrefix + this.transactionId
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
)
DistributedTransaction.keyValueStore[key] = JSON.stringify(data)
return data
}
public static async loadTransaction(
transactionId: string
): Promise<TransactionCheckpoint | null> {
// TODO: Use Key/Value db to load transactions
const key = DistributedTransaction.keyPrefix + transactionId
if (DistributedTransaction.keyValueStore[key]) {
return JSON.parse(DistributedTransaction.keyValueStore[key])
}
return null
}
public async deleteCheckpoint(): Promise<void> {
// TODO: Delete from Key/Value db
const key = DistributedTransaction.keyPrefix + this.transactionId
delete DistributedTransaction.keyValueStore[key]
}
}
@@ -0,0 +1,5 @@
export * from "./types"
export * from "./transaction-orchestrator"
export * from "./transaction-step"
export * from "./distributed-transaction"
export * from "./orchestrator-builder"
@@ -0,0 +1,431 @@
import { TransactionStepsDefinition } from "./types"
interface InternalStep extends TransactionStepsDefinition {
next?: InternalStep | InternalStep[]
depth: number
parent?: InternalStep | null
}
export class OrchestratorBuilder {
protected steps: InternalStep
protected hasChanges_ = false
get hasChanges() {
return this.hasChanges_
}
constructor(steps?: TransactionStepsDefinition) {
this.load(steps)
}
load(steps?: TransactionStepsDefinition) {
this.steps = {
depth: -1,
parent: null,
next: steps
? JSON.parse(
JSON.stringify((steps.action ? steps : steps.next) as InternalStep)
)
: undefined,
}
this.updateDepths(this.steps, {}, 1, -1)
return this
}
addAction(action: string, options: Partial<TransactionStepsDefinition> = {}) {
const step = this.findLastStep()
const newAction = {
action,
depth: step.depth + 1,
parent: step.action,
...options,
} as InternalStep
step.next = newAction
this.hasChanges_ = true
return this
}
replaceAction(
existingAction: string,
action: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const step = this.findOrThrowStepByAction(existingAction)
step.action = action
Object.assign(step, options)
this.hasChanges_ = true
return this
}
insertActionBefore(
existingAction: string,
action: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const parentStep = this.findParentStepByAction(existingAction)
if (parentStep) {
const oldNext = parentStep.next!
const newDepth = parentStep.depth + 1
if (Array.isArray(parentStep.next)) {
const index = parentStep.next.findIndex(
(step) => step.action === existingAction
)
if (index > -1) {
parentStep.next[index] = {
action,
...options,
next: oldNext[index],
depth: newDepth,
} as InternalStep
}
} else {
parentStep.next = {
action,
...options,
next: oldNext,
depth: newDepth,
} as InternalStep
}
this.updateDepths(oldNext as InternalStep, parentStep)
}
this.hasChanges_ = true
return this
}
insertActionAfter(
existingAction: string,
action: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const step = this.findOrThrowStepByAction(existingAction)
const oldNext = step.next
const newDepth = step.depth + 1
step.next = {
action,
...options,
next: oldNext,
depth: newDepth,
parent: step.action,
} as InternalStep
this.updateDepths(oldNext as InternalStep, step.next)
this.hasChanges_ = true
return this
}
protected appendTo(step: InternalStep | string, newStep: InternalStep) {
if (typeof step === "string") {
step = this.findOrThrowStepByAction(step)
}
step.next = {
...newStep,
depth: step.depth + 1,
parent: step.action,
} as InternalStep
this.hasChanges_ = true
return this
}
appendAction(
action: string,
to: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const newAction = {
action,
...options,
} as InternalStep
const branch = this.findLastStep(this.findStepByAction(to))
this.appendTo(branch, newAction)
return this
}
protected move(
actionToMove: string,
targetAction: string,
{
runInParallel,
mergeNext,
}: {
runInParallel?: boolean
mergeNext?: boolean
} = {
runInParallel: false,
mergeNext: false,
}
): OrchestratorBuilder {
const parentActionToMoveStep = this.findParentStepByAction(actionToMove)!
const parentTargetActionStep = this.findParentStepByAction(targetAction)!
const actionToMoveStep = this.findStepByAction(
actionToMove,
parentTargetActionStep
)!
if (!actionToMoveStep) {
throw new Error(
`Action "${actionToMove}" could not be found in the following steps of "${targetAction}"`
)
}
if (Array.isArray(parentActionToMoveStep.next)) {
const index = parentActionToMoveStep.next.findIndex(
(step) => step.action === actionToMove
)
if (index > -1) {
parentActionToMoveStep.next.splice(index, 1)
}
} else {
delete parentActionToMoveStep.next
}
if (runInParallel) {
if (Array.isArray(parentTargetActionStep.next)) {
parentTargetActionStep.next.push(actionToMoveStep)
} else if (parentTargetActionStep.next) {
parentTargetActionStep.next = [
parentTargetActionStep.next,
actionToMoveStep,
]
}
} else {
if (actionToMoveStep.next) {
if (mergeNext) {
if (Array.isArray(actionToMoveStep.next)) {
actionToMoveStep.next.push(
parentTargetActionStep.next as InternalStep
)
} else {
actionToMoveStep.next = [
actionToMoveStep.next,
parentTargetActionStep.next as InternalStep,
]
}
} else {
this.appendTo(
this.findLastStep(actionToMoveStep),
parentTargetActionStep.next as InternalStep
)
}
} else {
actionToMoveStep.next = parentTargetActionStep.next
}
parentTargetActionStep.next = actionToMoveStep
}
this.updateDepths(
actionToMoveStep as InternalStep,
parentTargetActionStep,
1,
parentTargetActionStep.depth
)
this.hasChanges_ = true
return this
}
moveAction(actionToMove: string, targetAction: string): OrchestratorBuilder {
return this.move(actionToMove, targetAction)
}
moveAndMergeNextAction(
actionToMove: string,
targetAction: string
): OrchestratorBuilder {
return this.move(actionToMove, targetAction, { mergeNext: true })
}
mergeActions(where: string, ...actions: string[]) {
actions.unshift(where)
if (actions.length < 2) {
throw new Error("Cannot merge less than two actions")
}
for (const action of actions) {
if (action !== where) {
this.move(action, where, { runInParallel: true })
}
}
return this
}
deleteAction(action: string, steps: InternalStep = this.steps) {
const actionStep = this.findOrThrowStepByAction(action)
const parentStep = this.findParentStepByAction(action, steps)!
if (Array.isArray(parentStep.next)) {
const index = parentStep.next.findIndex((step) => step.action === action)
if (index > -1 && actionStep.next) {
if (actionStep.next) {
parentStep.next[index] = actionStep.next as InternalStep
} else {
parentStep.next.splice(index, 1)
}
}
} else {
parentStep.next = actionStep.next
}
this.updateDepths(
actionStep.next as InternalStep,
parentStep,
1,
parentStep.depth
)
this.hasChanges_ = true
return this
}
pruneAction(action: string) {
const actionStep = this.findOrThrowStepByAction(action)
const parentStep = this.findParentStepByAction(action, this.steps)!
if (Array.isArray(parentStep.next)) {
const index = parentStep.next.findIndex((step) => step.action === action)
if (index > -1) {
parentStep.next.splice(index, 1)
}
} else {
delete parentStep.next
}
this.hasChanges_ = true
return this
}
protected findStepByAction(
action: string,
step: InternalStep = this.steps
): InternalStep | undefined {
if (step.action === action) {
return step
}
if (Array.isArray(step.next)) {
for (const subStep of step.next) {
const found = this.findStepByAction(action, subStep as InternalStep)
if (found) {
return found
}
}
} else if (step.next && typeof step.next === "object") {
return this.findStepByAction(action, step.next as InternalStep)
}
return
}
protected findOrThrowStepByAction(
action: string,
steps: InternalStep = this.steps
): InternalStep {
const step = this.findStepByAction(action, steps)
if (!step) {
throw new Error(`Action "${action}" could not be found`)
}
return step
}
protected findParentStepByAction(
action: string,
step: InternalStep = this.steps
): InternalStep | undefined {
if (!step.next) {
return
}
const nextSteps = Array.isArray(step.next) ? step.next : [step.next]
for (const nextStep of nextSteps) {
if (!nextStep) {
continue
}
if (nextStep.action === action) {
return step
}
const foundStep = this.findParentStepByAction(
action,
nextStep as InternalStep
)
if (foundStep) {
return foundStep
}
}
return
}
protected findLastStep(steps: InternalStep = this.steps): InternalStep {
let step = steps as InternalStep
while (step.next) {
step = Array.isArray(step.next)
? (step.next[step.next.length - 1] as InternalStep)
: (step.next as InternalStep)
}
return step
}
protected updateDepths(
startingStep: InternalStep,
parent,
incr = 1,
beginFrom?: number
): void {
if (!startingStep) {
return
}
const update = (step: InternalStep, parent, beginFrom) => {
step.depth = beginFrom + incr
step.parent = parent.action
if (Array.isArray(step.next)) {
step.next.forEach((nextAction) => update(nextAction, step, step.depth))
} else if (step.next) {
update(step.next, step, step.depth)
}
}
update(startingStep, parent, beginFrom ?? startingStep.depth)
}
build(): TransactionStepsDefinition {
if (!this.steps.next) {
return {}
}
const ignore = ["depth", "parent"]
const result = JSON.parse(
JSON.stringify(
Array.isArray(this.steps.next) ? this.steps : this.steps.next,
null
),
(key, value) => {
if (ignore.includes(key)) {
return
}
return value
}
)
this.hasChanges_ = false
return result
}
}
@@ -0,0 +1,736 @@
import {
DistributedTransaction,
TransactionCheckpoint,
TransactionPayload,
} from "./distributed-transaction"
import {
TransactionHandlerType,
TransactionModel,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
} from "./types"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import { EventEmitter } from "events"
export type TransactionFlow = {
modelId: string
definition: TransactionStepsDefinition
transactionId: string
hasFailedSteps: boolean
hasSkippedSteps: boolean
state: TransactionState
steps: {
[key: string]: TransactionStep
}
}
/**
* @class TransactionOrchestrator is responsible for managing and executing distributed transactions.
* It is based on a single transaction definition, which is used to execute all the transaction steps
*/
export class TransactionOrchestrator extends EventEmitter {
private static ROOT_STEP = "_root"
private invokeSteps: string[] = []
private compensateSteps: string[] = []
public static DEFAULT_RETRIES = 0
constructor(
public id: string,
private definition: TransactionStepsDefinition
) {
super()
}
private static SEPARATOR = ":"
public static getKeyName(...params: string[]): string {
return params.join(this.SEPARATOR)
}
private getPreviousStep(flow: TransactionFlow, step: TransactionStep) {
const id = step.id.split(".")
id.pop()
const parentId = id.join(".")
return flow.steps[parentId]
}
private getInvokeSteps(flow: TransactionFlow): string[] {
if (this.invokeSteps.length) {
return this.invokeSteps
}
const steps = Object.keys(flow.steps)
steps.sort((a, b) => flow.steps[a].depth - flow.steps[b].depth)
this.invokeSteps = steps
return steps
}
private getCompensationSteps(flow: TransactionFlow): string[] {
if (this.compensateSteps.length) {
return this.compensateSteps
}
const steps = Object.keys(flow.steps)
steps.sort(
(a, b) => (flow.steps[b].depth || 0) - (flow.steps[a].depth || 0)
)
this.compensateSteps = steps
return steps
}
private canMoveForward(flow: TransactionFlow, previousStep: TransactionStep) {
const states = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.SKIPPED,
]
const siblings = this.getPreviousStep(flow, previousStep).next.map(
(sib) => flow.steps[sib]
)
return (
!!previousStep.definition.noWait ||
siblings.every((sib) => states.includes(sib.invoke.state))
)
}
private canMoveBackward(flow: TransactionFlow, step: TransactionStep) {
const states = [
TransactionState.DONE,
TransactionState.REVERTED,
TransactionState.FAILED,
TransactionState.DORMANT,
]
const siblings = step.next.map((sib) => flow.steps[sib])
return (
siblings.length === 0 ||
siblings.every((sib) => states.includes(sib.compensate.state))
)
}
private canContinue(flow: TransactionFlow, step: TransactionStep): boolean {
if (flow.state == TransactionState.COMPENSATING) {
return this.canMoveBackward(flow, step)
} else {
const previous = this.getPreviousStep(flow, step)
if (previous.id === TransactionOrchestrator.ROOT_STEP) {
return true
}
return this.canMoveForward(flow, previous)
}
}
private checkAllSteps(transaction: DistributedTransaction): {
next: TransactionStep[]
total: number
remaining: number
completed: number
} {
let hasSkipped = false
let hasIgnoredFailure = false
let hasFailed = false
let hasWaiting = false
let hasReverted = false
let completedSteps = 0
const flow = transaction.getFlow()
const nextSteps: TransactionStep[] = []
const allSteps =
flow.state === TransactionState.COMPENSATING
? this.getCompensationSteps(flow)
: this.getInvokeSteps(flow)
for (const step of allSteps) {
if (
step === TransactionOrchestrator.ROOT_STEP ||
!this.canContinue(flow, flow.steps[step])
) {
continue
}
const stepDef = flow.steps[step]
const curState = stepDef.getStates()
if (curState.status === TransactionStepStatus.WAITING) {
hasWaiting = true
if (stepDef.canRetry()) {
nextSteps.push(stepDef)
}
continue
}
if (stepDef.canInvoke(flow.state) || stepDef.canCompensate(flow.state)) {
nextSteps.push(stepDef)
} else {
completedSteps++
if (curState.state === TransactionState.SKIPPED) {
hasSkipped = true
} else if (curState.state === TransactionState.REVERTED) {
hasReverted = true
} else if (curState.state === TransactionState.FAILED) {
if (stepDef.definition.continueOnPermanentFailure) {
hasIgnoredFailure = true
} else {
hasFailed = true
}
}
}
}
const totalSteps = allSteps.length - 1
if (
flow.state === TransactionState.WAITING_TO_COMPENSATE &&
nextSteps.length === 0 &&
!hasWaiting
) {
flow.state = TransactionState.COMPENSATING
this.flagStepsToRevert(flow)
this.emit("compensate", transaction)
return this.checkAllSteps(transaction)
} else if (completedSteps === totalSteps) {
if (hasSkipped) {
flow.hasSkippedSteps = true
}
if (hasIgnoredFailure) {
flow.hasFailedSteps = true
}
if (hasFailed) {
flow.state = TransactionState.FAILED
} else {
flow.state = hasReverted
? TransactionState.REVERTED
: TransactionState.DONE
}
this.emit("finish", transaction)
// TODO: check TransactionModel if it should delete the checkpoint when the transaction is done
void transaction.deleteCheckpoint()
}
return {
next: nextSteps,
total: totalSteps,
remaining: totalSteps - completedSteps,
completed: completedSteps,
}
}
private flagStepsToRevert(flow: TransactionFlow): void {
for (const step in flow.steps) {
if (step === TransactionOrchestrator.ROOT_STEP) {
continue
}
const stepDef = flow.steps[step]
const curState = stepDef.getStates()
if (
(curState.state === TransactionState.DONE ||
curState.status === TransactionStepStatus.PERMANENT_FAILURE) &&
!stepDef.definition.noCompensation
) {
stepDef.beginCompensation()
stepDef.changeState(TransactionState.NOT_STARTED)
}
}
}
private static async setStepSuccess(
transaction: DistributedTransaction,
step: TransactionStep,
response: unknown
): Promise<void> {
if (step.saveResponse) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
response
)
}
step.changeStatus(TransactionStepStatus.OK)
if (step.isCompensating()) {
step.changeState(TransactionState.REVERTED)
} else {
step.changeState(TransactionState.DONE)
}
if (step.definition.async) {
await transaction.saveCheckpoint()
}
}
private static async setStepFailure(
transaction: DistributedTransaction,
step: TransactionStep,
error: Error | any,
maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES
): Promise<void> {
step.failures++
step.changeStatus(TransactionStepStatus.TEMPORARY_FAILURE)
if (step.failures > maxRetries) {
step.changeState(TransactionState.FAILED)
step.changeStatus(TransactionStepStatus.PERMANENT_FAILURE)
transaction.addError(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
error
)
if (!step.isCompensating()) {
const flow = transaction.getFlow()
if (step.definition.continueOnPermanentFailure) {
for (const childStep of step.next) {
const child = flow.steps[childStep]
child.changeState(TransactionState.SKIPPED)
}
} else {
flow.state = TransactionState.WAITING_TO_COMPENSATE
}
}
}
if (step.definition.async) {
await transaction.saveCheckpoint()
}
}
private async executeNext(
transaction: DistributedTransaction
): Promise<void> {
if (transaction.hasFinished()) {
return
}
const flow = transaction.getFlow()
const nextSteps = this.checkAllSteps(transaction)
const execution: Promise<void | unknown>[] = []
for (const step of nextSteps.next) {
const curState = step.getStates()
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
step.lastAttempt = Date.now()
step.attempts++
if (curState.state === TransactionState.NOT_STARTED) {
if (step.isCompensating()) {
step.changeState(TransactionState.COMPENSATING)
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionState.INVOKING)
}
}
step.changeStatus(TransactionStepStatus.WAITING)
const payload = new TransactionPayload(
{
model_id: flow.modelId,
reply_to_topic: TransactionOrchestrator.getKeyName(
"trans",
flow.modelId
),
idempotency_key: TransactionOrchestrator.getKeyName(
flow.transactionId,
step.definition.action!,
type
),
action: step.definition.action + "",
action_type: type,
attempt: step.attempts,
timestamp: Date.now(),
},
transaction.payload,
transaction.getContext()
)
if (!step.definition.async) {
execution.push(
transaction
.handler(step.definition.action + "", type, payload)
.then(async (response) => {
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
})
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
step.definition.maxRetries
)
})
)
} else {
execution.push(
transaction.saveCheckpoint().then(async () =>
transaction
.handler(step.definition.action + "", type, payload)
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
step.definition.maxRetries
)
})
)
)
}
}
await Promise.all(execution)
if (nextSteps.next.length > 0) {
await this.executeNext(transaction)
}
}
/**
* Start a new transaction or resume a transaction that has been previously started
* @param transaction - The transaction to resume
*/
public async resume(transaction: DistributedTransaction): Promise<void> {
if (transaction.modelId !== this.id) {
throw new Error(
`TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.`
)
}
if (transaction.hasFinished()) {
return
}
const flow = transaction.getFlow()
if (flow.state === TransactionState.NOT_STARTED) {
flow.state = TransactionState.INVOKING
this.emit("begin", transaction)
} else {
this.emit("resume", transaction)
}
await this.executeNext(transaction)
}
/**
* Cancel and revert a transaction compensating all its executed steps. It can be an ongoing transaction or a completed one
* @param transaction - The transaction to be reverted
*/
public async cancelTransaction(
transaction: DistributedTransaction
): Promise<void> {
if (transaction.modelId !== this.id) {
throw new Error(
`TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.`
)
}
const flow = transaction.getFlow()
if (flow.state === TransactionState.FAILED) {
throw new Error(`Cannot revert a perment failed transaction.`)
}
flow.state = TransactionState.WAITING_TO_COMPENSATE
await this.executeNext(transaction)
}
private async createTransactionFlow(
transactionId: string
): Promise<TransactionFlow> {
return {
modelId: this.id,
transactionId: transactionId,
hasFailedSteps: false,
hasSkippedSteps: false,
state: TransactionState.NOT_STARTED,
definition: this.definition,
steps: TransactionOrchestrator.buildSteps(this.definition),
}
}
private static async loadTransactionById(
transactionId: string
): Promise<TransactionCheckpoint | null> {
const transaction = await DistributedTransaction.loadTransaction(
transactionId
)
if (transaction !== null) {
const flow = transaction.flow
transaction.flow.steps = TransactionOrchestrator.buildSteps(
flow.definition,
flow.steps
)
return transaction
}
return null
}
private static buildSteps(
flow: TransactionStepsDefinition,
existingSteps?: { [key: string]: TransactionStep }
): { [key: string]: TransactionStep } {
const states: { [key: string]: TransactionStep } = {
[TransactionOrchestrator.ROOT_STEP]: {
id: TransactionOrchestrator.ROOT_STEP,
next: [] as string[],
} as TransactionStep,
}
const actionNames = new Set<string>()
const queue: any[] = [
{ obj: flow, level: [TransactionOrchestrator.ROOT_STEP] },
]
while (queue.length > 0) {
const { obj, level } = queue.shift()
for (const key in obj) {
// eslint-disable-next-line no-prototype-builtins
if (!obj.hasOwnProperty(key)) {
continue
}
if (typeof obj[key] === "object" && obj[key] !== null) {
queue.push({ obj: obj[key], level: [...level] })
} else if (key === "action") {
if (actionNames.has(obj.action)) {
throw new Error(`Action "${obj.action}" is already defined.`)
}
actionNames.add(obj.action)
level.push(obj.action)
const id = level.join(".")
const parent = level.slice(0, level.length - 1).join(".")
states[parent].next?.push(id)
const definitionCopy = { ...obj }
delete definitionCopy.next
states[id] = Object.assign(
new TransactionStep(),
existingSteps?.[id] || {
id,
depth: level.length - 1,
definition: definitionCopy,
saveResponse: definitionCopy.saveResponse ?? true,
invoke: {
state: TransactionState.NOT_STARTED,
status: TransactionStepStatus.IDLE,
},
compensate: {
state: TransactionState.DORMANT,
status: TransactionStepStatus.IDLE,
},
attempts: 0,
failures: 0,
lastAttempt: null,
next: [],
}
)
}
}
}
return states
}
/** Create a new transaction
* @param transactionId - unique identifier of the transaction
* @param handler - function to handle action of the transaction
* @param payload - payload to be passed to all the transaction steps
*/
public async beginTransaction(
transactionId: string,
handler: TransactionStepHandler,
payload?: unknown
): Promise<DistributedTransaction> {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(transactionId)
let newTransaction = false
let modelFlow
if (!existingTransaction) {
modelFlow = await this.createTransactionFlow(transactionId)
newTransaction = true
} else {
modelFlow = existingTransaction.flow
}
const transaction = new DistributedTransaction(
modelFlow,
handler,
payload,
existingTransaction?.errors,
existingTransaction?.context
)
if (newTransaction) {
await transaction.saveCheckpoint()
}
return transaction
}
private static getStepByAction(
flow: TransactionFlow,
action: string
): TransactionStep | null {
for (const key in flow.steps) {
if (action === flow.steps[key]?.definition?.action) {
return flow.steps[key]
}
}
return null
}
private static async getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<[DistributedTransaction, TransactionStep]> {
const [transactionId, action, actionType] = responseIdempotencyKey.split(
TransactionOrchestrator.SEPARATOR
)
if (!transaction && !handler) {
throw new Error(
"If a transaction is not provided, the handler is required"
)
}
if (!transaction) {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(transactionId)
if (existingTransaction === null) {
throw new Error(`Transaction ${transactionId} could not be found.`)
}
transaction = new DistributedTransaction(
existingTransaction.flow,
handler!,
undefined,
existingTransaction.errors,
existingTransaction.context
)
}
const step = TransactionOrchestrator.getStepByAction(
transaction.getFlow(),
action
)
if (step === null) {
throw new Error("Action not found.")
} else if (
step.isCompensating()
? actionType !== TransactionHandlerType.COMPENSATE
: actionType !== TransactionHandlerType.INVOKE
) {
throw new Error("Incorrect action type.")
}
return [transaction, step]
}
/** Register a step success for a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param handler - The handler function to execute the step
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
* @param response - The response of the step
*/
public async registerStepSuccess(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
response?: unknown
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
await TransactionOrchestrator.setStepSuccess(
curTransaction,
step,
response
)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
throw new Error(
`Cannot set step success when status is ${step.getStates().status}`
)
}
return curTransaction
}
/**
* Register a step failure for a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param error - The error that caused the failure
* @param handler - The handler function to execute the step
* @param transaction - The current transaction
* @param response - The response of the step
*/
public async registerStepFailure(
responseIdempotencyKey: string,
error?: Error | any,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
await TransactionOrchestrator.setStepFailure(
curTransaction,
step,
error,
0
)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
throw new Error(
`Cannot set step failure when status is ${step.getStates().status}`
)
}
return curTransaction
}
}
@@ -0,0 +1,157 @@
import { TransactionPayload } from "./distributed-transaction"
import {
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
TransactionHandlerType,
} from "./types"
export type TransactionStepHandler = (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
) => Promise<unknown>
/**
* @class TransactionStep
* @classdesc A class representing a single step in a transaction flow
*/
export class TransactionStep {
/**
* @member id - The id of the step
* @member depth - The depth of the step in the flow
* @member definition - The definition of the step
* @member invoke - The current state and status of the invoke action of the step
* @member compensate - The current state and status of the compensate action of the step
* @member attempts - The number of attempts made to execute the step
* @member failures - The number of failures encountered while executing the step
* @member lastAttempt - The timestamp of the last attempt made to execute the step
* @member next - The ids of the next steps in the flow
* @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is true
*/
private stepFailed = false
id: string
depth: number
definition: TransactionStepsDefinition
invoke: {
state: TransactionState
status: TransactionStepStatus
}
compensate: {
state: TransactionState
status: TransactionStepStatus
}
attempts: number
failures: number
lastAttempt: number | null
next: string[]
saveResponse: boolean
public getStates() {
return this.isCompensating() ? this.compensate : this.invoke
}
public beginCompensation() {
if (this.isCompensating()) {
return
}
this.stepFailed = true
this.attempts = 0
this.failures = 0
this.lastAttempt = null
}
public isCompensating() {
return this.stepFailed
}
public changeState(toState: TransactionState) {
const allowed = {
[TransactionState.DORMANT]: [TransactionState.NOT_STARTED],
[TransactionState.NOT_STARTED]: [
TransactionState.INVOKING,
TransactionState.COMPENSATING,
TransactionState.FAILED,
TransactionState.SKIPPED,
],
[TransactionState.INVOKING]: [
TransactionState.FAILED,
TransactionState.DONE,
],
[TransactionState.COMPENSATING]: [
TransactionState.REVERTED,
TransactionState.FAILED,
],
[TransactionState.DONE]: [TransactionState.COMPENSATING],
}
const curState = this.getStates()
if (
curState.state === toState ||
allowed?.[curState.state]?.includes(toState)
) {
curState.state = toState
return
}
throw new Error(
`Updating State from "${curState.state}" to "${toState}" is not allowed.`
)
}
public changeStatus(toStatus: TransactionStepStatus) {
const allowed = {
[TransactionStepStatus.WAITING]: [
TransactionStepStatus.OK,
TransactionStepStatus.TEMPORARY_FAILURE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.TEMPORARY_FAILURE]: [
TransactionStepStatus.IDLE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.PERMANENT_FAILURE]: [TransactionStepStatus.IDLE],
}
const curState = this.getStates()
if (
curState.status === toStatus ||
toStatus === TransactionStepStatus.WAITING ||
allowed?.[curState.status]?.includes(toStatus)
) {
curState.status = toStatus
return
}
throw new Error(
`Updating Status from "${curState.status}" to "${toStatus}" is not allowed.`
)
}
canRetry(): boolean {
return !!(
this.lastAttempt &&
this.definition.retryInterval &&
Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3
)
}
canInvoke(flowState: TransactionState): boolean {
const { status, state } = this.getStates()
return (
(!this.isCompensating() &&
state === TransactionState.NOT_STARTED &&
flowState === TransactionState.INVOKING) ||
status === TransactionStepStatus.TEMPORARY_FAILURE
)
}
canCompensate(flowState: TransactionState): boolean {
return (
this.isCompensating() &&
this.getStates().state === TransactionState.NOT_STARTED &&
flowState === TransactionState.COMPENSATING
)
}
}
@@ -0,0 +1,43 @@
export enum TransactionHandlerType {
INVOKE = "invoke",
COMPENSATE = "compensate",
}
export type TransactionStepsDefinition = {
action?: string
continueOnPermanentFailure?: boolean
noCompensation?: boolean
maxRetries?: number
retryInterval?: number
timeout?: number
async?: boolean
noWait?: boolean
saveResponse?: boolean
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
}
export enum TransactionStepStatus {
IDLE = "idle",
OK = "ok",
WAITING = "waiting_response",
TEMPORARY_FAILURE = "temp_failure",
PERMANENT_FAILURE = "permanent_failure",
}
export enum TransactionState {
NOT_STARTED = "not_started",
INVOKING = "invoking",
WAITING_TO_COMPENSATE = "waiting_to_compensate",
COMPENSATING = "compensating",
DONE = "done",
REVERTED = "reverted",
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
}
export type TransactionModel = {
id: string
flow: TransactionStepsDefinition
hash: string
}