feat(medusa): Transaction Orchestrator (#2861)

* chore: transaction orchestrator
This commit is contained in:
Carlos R. L. Rodrigues
2023-01-16 20:16:41 -03:00
committed by GitHub
parent 7d4b8b9cc5
commit 0175388835
6 changed files with 1699 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/medusa": patch
---
feat(medusa): Transaction Orchestrator

View File

@@ -0,0 +1,669 @@
import {
TransactionOrchestrator,
TransactionStepsDefinition,
TransactionHandlerType,
TransactionPayload,
TransactionState,
} from "../../transaction"
describe("Transaction Orchestrator", () => {
it("Should follow the flow by calling steps in order with the correct payload", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
return payload
}),
two: jest.fn().mockImplementation((payload) => {
return payload
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.two(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
next: {
action: "secondMethod",
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler,
{
prop: 123,
}
)
await strategy.resume(transaction)
expect(transaction.transactionId).toBe("transaction_id_123")
expect(transaction.getState()).toBe(TransactionState.DONE)
expect(mocks.one).toBeCalledWith(
expect.objectContaining({
metadata: {
producer: "transaction-name",
reply_to_topic: "trans:transaction-name",
idempotency_key: "transaction_id_123:firstMethod:invoke",
action: "firstMethod",
action_type: "invoke",
attempt: 1,
timestamp: expect.any(Number),
},
data: { prop: 123 },
})
)
expect(mocks.two).toBeCalledWith(
expect.objectContaining({
metadata: {
producer: "transaction-name",
reply_to_topic: "trans:transaction-name",
idempotency_key: "transaction_id_123:secondMethod:invoke",
action: "secondMethod",
action_type: "invoke",
attempt: 1,
timestamp: expect.any(Number),
},
data: { prop: 123 },
})
)
})
it("Should run steps in parallel if 'next' is an array", async () => {
const actionOrder: string[] = []
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
return actionOrder.push(actionId)
}
const flow: TransactionStepsDefinition = {
next: [
{
action: "one",
},
{
action: "two",
next: {
action: "four",
next: {
action: "six",
},
},
},
{
action: "three",
next: {
action: "five",
},
},
],
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(actionOrder).toEqual(["one", "two", "three", "four", "five", "six"])
})
it("Should not execute next steps when a step fails", async () => {
const actionOrder: string[] = []
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
if (functionHandlerType === TransactionHandlerType.INVOKE) {
actionOrder.push(actionId)
}
if (TransactionHandlerType.INVOKE && actionId === "three") {
throw new Error()
}
}
const flow: TransactionStepsDefinition = {
next: [
{
action: "one",
},
{
action: "two",
next: {
action: "four",
next: {
action: "six",
},
},
},
{
action: "three",
maxRetries: 0,
next: {
action: "five",
},
},
],
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(actionOrder).toEqual(["one", "two", "three"])
})
it("Should forward step response if flag 'forwardResponse' is set to true", async () => {
const mocks = {
one: jest.fn().mockImplementation((data) => {
return { abc: 1234 }
}),
two: jest.fn().mockImplementation((data) => {
return { def: "567" }
}),
three: jest.fn().mockImplementation((data) => {
return { end: true }
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: (data) => {
return mocks.one(data)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: (data) => {
return mocks.two(data)
},
},
thirdMethod: {
[TransactionHandlerType.INVOKE]: (data) => {
return mocks.three(data)
},
},
}
return command[actionId][functionHandlerType]({ ...payload.data })
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
forwardResponse: true,
next: {
action: "secondMethod",
forwardResponse: true,
next: {
action: "thirdMethod",
},
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler,
{
prop: 123,
}
)
await strategy.resume(transaction)
expect(mocks.one).toBeCalledWith({ prop: 123 })
expect(mocks.two).toBeCalledWith({ prop: 123, _response: { abc: 1234 } })
expect(mocks.three).toBeCalledWith({ prop: 123, _response: { def: "567" } })
})
it("Should continue the exection of next steps without waiting for the execution of all its parents when flag 'noWait' is set to true", async () => {
const actionOrder: string[] = []
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
if (functionHandlerType === TransactionHandlerType.INVOKE) {
actionOrder.push(actionId)
}
if (
functionHandlerType === TransactionHandlerType.INVOKE &&
actionId === "three"
) {
throw new Error()
}
}
const flow: TransactionStepsDefinition = {
next: [
{
action: "one",
next: {
action: "five",
},
},
{
action: "two",
noWait: true,
next: {
action: "four",
},
},
{
action: "three",
maxRetries: 0,
},
],
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
strategy.resume(transaction)
await new Promise((ok) => {
strategy.on("finish", ok)
})
expect(actionOrder).toEqual(["one", "two", "three", "four"])
})
it("Should retry steps X times when a step fails and compensate steps afterward", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
return payload
}),
compensateOne: jest.fn().mockImplementation((payload) => {
return payload
}),
two: jest.fn().mockImplementation((payload) => {
throw new Error()
}),
compensateTwo: jest.fn().mockImplementation((payload) => {
return payload
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
[TransactionHandlerType.COMPENSATE]: () => {
mocks.compensateOne(payload)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.two(payload)
},
[TransactionHandlerType.COMPENSATE]: () => {
mocks.compensateTwo(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
next: {
action: "secondMethod",
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(transaction.transactionId).toBe("transaction_id_123")
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.two).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES)
expect(transaction.getState()).toBe(TransactionState.REVERTED)
expect(mocks.compensateOne).toBeCalledTimes(1)
expect(mocks.two).nthCalledWith(
1,
expect.objectContaining({
metadata: expect.objectContaining({
attempt: 1,
}),
})
)
expect(mocks.two).nthCalledWith(
4,
expect.objectContaining({
metadata: expect.objectContaining({
attempt: 4,
}),
})
)
})
it("Should fail a transaction if any step fails after retrying X time to compensate it", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
throw new Error()
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(mocks.one).toBeCalledTimes(1 + strategy.DEFAULT_RETRIES)
expect(transaction.getState()).toBe(TransactionState.FAILED)
})
it("Should complete a transaction if a failing step has the flag 'continueOnPermanentFailure' set to true", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
return
}),
two: jest.fn().mockImplementation((payload) => {
throw new Error()
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.two(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
next: {
action: "secondMethod",
maxRetries: 1,
continueOnPermanentFailure: true,
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(transaction.transactionId).toBe("transaction_id_123")
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.two).toBeCalledTimes(2)
expect(transaction.getState()).toBe(TransactionState.DONE)
expect(transaction.isPartiallyCompleted).toBe(true)
})
it("Should hold the status INVOKING while the transaction hasn't finished", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
return
}),
two: jest.fn().mockImplementation((payload) => {
return
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.two(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
async: true,
next: {
action: "secondMethod",
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.two).toBeCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
const mocktransactionId = TransactionOrchestrator.getKeyName(
transaction.transactionId,
"firstMethod",
TransactionHandlerType.INVOKE
)
await strategy.registerStepSuccess(
mocktransactionId,
undefined,
transaction
)
expect(transaction.getState()).toBe(TransactionState.DONE)
})
it("Should hold the status COMPENSATING while the transaction hasn't finished compensating", async () => {
const mocks = {
one: jest.fn().mockImplementation((payload) => {
return
}),
compensateOne: jest.fn().mockImplementation((payload) => {
return
}),
two: jest.fn().mockImplementation((payload) => {
return
}),
}
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
const command = {
firstMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.one(payload)
},
[TransactionHandlerType.COMPENSATE]: () => {
mocks.compensateOne(payload)
},
},
secondMethod: {
[TransactionHandlerType.INVOKE]: () => {
mocks.two(payload)
},
},
}
return command[actionId][functionHandlerType](payload)
}
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
async: true,
next: {
action: "secondMethod",
},
},
}
const strategy = new TransactionOrchestrator("transaction-name", flow)
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
const mocktransactionId = TransactionOrchestrator.getKeyName(
transaction.transactionId,
"firstMethod",
TransactionHandlerType.INVOKE
)
const registerBeforeAllowed = await strategy
.registerStepFailure(mocktransactionId, null, handler)
.catch((e) => e.message)
await strategy.resume(transaction)
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.compensateOne).toBeCalledTimes(0)
expect(mocks.two).toBeCalledTimes(0)
expect(registerBeforeAllowed).toEqual(
"Cannot set step failure when status is idle"
)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
const resumedTransaction = await strategy.registerStepFailure(
mocktransactionId,
null,
handler
)
expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING)
expect(mocks.compensateOne).toBeCalledTimes(1)
const mocktransactionIdCompensate = TransactionOrchestrator.getKeyName(
transaction.transactionId,
"firstMethod",
TransactionHandlerType.COMPENSATE
)
await strategy.registerStepSuccess(
mocktransactionIdCompensate,
undefined,
resumedTransaction
)
expect(resumedTransaction.getState()).toBe(TransactionState.REVERTED)
})
})

View File

@@ -0,0 +1,132 @@
import { TransactionFlow, TransactionHandlerType, TransactionState } from "."
/**
* @typedef {Object} TransactionMetadata
* @property {string} producer - The id of the producer that created the transaction (transactionModelId).
* @property {string} reply_to_topic - The topic to reply to for the transaction.
* @property {string} idempotency_key - The idempotency key of the transaction.
* @property {string} action - The action of the transaction.
* @property {TransactionHandlerType} action_type - The type of the transaction.
* @property {number} attempt - The number of attempts for the transaction.
* @property {number} timestamp - The timestamp of the transaction.
*/
export type TransactionMetadata = {
producer: string
reply_to_topic: string
idempotency_key: string
action: string
action_type: TransactionHandlerType
attempt: number
timestamp: number
}
export class TransactionPayload {
/**
* @param metadata - The metadata of the transaction.
* @param data - The payload data of the transaction and the response of the previous step if forwardResponse is true.
*/
constructor(
public metadata: TransactionMetadata,
public data: Record<string, unknown> & {
_response: Record<string, unknown>
}
) {}
}
/**
* DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order.
*/
export class DistributedTransaction {
public modelId: string
public transactionId: string
public errors: {
action: string
handlerType: TransactionHandlerType
error: Error | null
}[] = []
constructor(
private flow: TransactionFlow,
public handler: (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
) => Promise<unknown>,
public payload?: any
) {
this.transactionId = flow.transactionId
this.modelId = flow.transactionModelId
}
public getFlow() {
return this.flow
}
public addError(
action: string,
handlerType: TransactionHandlerType,
error: Error | null
) {
this.errors.push({
action,
handlerType,
error,
})
}
public hasFinished(): boolean {
return [
TransactionState.DONE,
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(this.getState())
}
public getState(): TransactionState {
return this.getFlow().state
}
public get isPartiallyCompleted(): boolean {
return !!this.getFlow().hasFailedSteps || !!this.getFlow().hasSkippedSteps
}
public canInvoke(): boolean {
return (
this.getFlow().state === TransactionState.NOT_STARTED ||
this.getFlow().state === TransactionState.INVOKING
)
}
public canRevert(): boolean {
return (
this.getFlow().state === TransactionState.DONE ||
this.getFlow().state === TransactionState.COMPENSATING
)
}
public static keyValueStore: any = {} // TODO: Use Key/Value db
private static keyPrefix = "dtrans:"
public async saveCheckpoint(): Promise<void> {
// TODO: Use Key/Value db to save transactions
const key = DistributedTransaction.keyPrefix + this.transactionId
DistributedTransaction.keyValueStore[key] = JSON.stringify(this.getFlow())
}
public static async loadTransactionFlow(
transactionId: string
): Promise<TransactionFlow | null> {
// TODO: Use Key/Value db to load transactions
const key = DistributedTransaction.keyPrefix + transactionId
if (DistributedTransaction.keyValueStore[key]) {
return JSON.parse(DistributedTransaction.keyValueStore[key])
}
return null
}
public async deleteCheckpoint(): Promise<void> {
// TODO: Delete from Key/Value db
const key = DistributedTransaction.keyPrefix + this.transactionId
delete DistributedTransaction.keyValueStore[key]
}
}

View File

@@ -0,0 +1,47 @@
export enum TransactionHandlerType {
INVOKE = "invoke",
COMPENSATE = "compensate",
}
export type TransactionStepsDefinition = {
action?: string
continueOnPermanentFailure?: boolean
noCompensation?: boolean
maxRetries?: number
retryInterval?: number
timeout?: number
async?: boolean
noWait?: boolean
forwardResponse?: boolean
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
}
export enum TransactionStepStatus {
IDLE = "idle",
OK = "ok",
WAITING = "waiting_response",
TEMPORARY_FAILURE = "temp_failure",
PERMANENT_FAILURE = "permanent_failure",
}
export enum TransactionState {
NOT_STARTED = "not_started",
INVOKING = "invoking",
WAITING_TO_COMPENSATE = "waiting_to_compensate",
COMPENSATING = "compensating",
DONE = "done",
REVERTED = "reverted",
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
}
export type TransactionModel = {
id: string
flow: TransactionStepsDefinition
hash: string
}
export * from "./transaction-orchestrator"
export * from "./transaction-step"
export * from "./distributed-transaction"

View File

@@ -0,0 +1,687 @@
import { EventEmitter } from "events"
import {
TransactionHandlerType,
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
TransactionModel,
} from "."
import {
DistributedTransaction,
TransactionPayload,
} from "./distributed-transaction"
import { TransactionStep } from "./transaction-step"
export type TransactionFlow = {
transactionModelId: string
definition: TransactionStepsDefinition
transactionId: string
hasFailedSteps: boolean
hasSkippedSteps: boolean
state: TransactionState
steps: {
[key: string]: TransactionStep
}
}
export type TransactionStepHandler = (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
) => Promise<unknown>
/**
* @class TransactionOrchestrator is responsible for managing and executing distributed transactions.
* It is based on a single transaction definition, which is used to execute all the transaction steps
*/
export class TransactionOrchestrator extends EventEmitter {
private ROOT_STEP = "_root"
private invokeSteps: string[] = []
private compensateSteps: string[] = []
public DEFAULT_RETRIES = 3
constructor(
public id: string,
private definition: TransactionStepsDefinition
) {
super()
}
private static SEPARATOR = ":"
public static getKeyName(...params: string[]): string {
return params.join(this.SEPARATOR)
}
private getPreviousStep(flow: TransactionFlow, step: TransactionStep) {
const id = step.id.split(".")
id.pop()
const parentId = id.join(".")
return flow.steps[parentId]
}
private getInvokeSteps(flow: TransactionFlow): string[] {
if (this.invokeSteps.length) {
return this.invokeSteps
}
const steps = Object.keys(flow.steps)
steps.sort((a, b) => flow.steps[a].depth - flow.steps[b].depth)
this.invokeSteps = steps
return steps
}
private getCompensationSteps(flow: TransactionFlow): string[] {
if (this.compensateSteps.length) {
return this.compensateSteps
}
const steps = Object.keys(flow.steps)
steps.sort(
(a, b) => (flow.steps[b].depth || 0) - (flow.steps[a].depth || 0)
)
this.compensateSteps = steps
return steps
}
private canMoveForward(flow: TransactionFlow, previousStep: TransactionStep) {
const states = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.SKIPPED,
]
const siblings = this.getPreviousStep(flow, previousStep).next.map(
(sib) => flow.steps[sib]
)
return (
!!previousStep.definition.noWait ||
siblings.every((sib) => states.includes(sib.invoke.state))
)
}
private canMoveBackward(flow: TransactionFlow, step: TransactionStep) {
const states = [
TransactionState.DONE,
TransactionState.REVERTED,
TransactionState.FAILED,
TransactionState.DORMANT,
]
const siblings = step.next.map((sib) => flow.steps[sib])
return (
siblings.length === 0 ||
siblings.every((sib) => states.includes(sib.compensate.state))
)
}
private canContinue(flow: TransactionFlow, step: TransactionStep): boolean {
if (flow.state == TransactionState.COMPENSATING) {
return this.canMoveBackward(flow, step)
} else {
const previous = this.getPreviousStep(flow, step)
if (previous.id === this.ROOT_STEP) {
return true
}
return this.canMoveForward(flow, previous)
}
}
private checkAllSteps(transaction: DistributedTransaction): {
next: TransactionStep[]
total: number
remaining: number
completed: number
} {
let hasSkipped = false
let hasIgnoredFailure = false
let hasFailed = false
let hasWaiting = false
let hasReverted = false
let completedSteps = 0
const flow = transaction.getFlow()
const nextSteps: TransactionStep[] = []
const allSteps =
flow.state === TransactionState.COMPENSATING
? this.getCompensationSteps(flow)
: this.getInvokeSteps(flow)
for (const step of allSteps) {
if (
step === this.ROOT_STEP ||
!this.canContinue(flow, flow.steps[step])
) {
continue
}
const stepDef = flow.steps[step]
const curState = stepDef.getStates()
if (curState.status === TransactionStepStatus.WAITING) {
hasWaiting = true
if (stepDef.canRetry()) {
nextSteps.push(stepDef)
}
continue
}
if (stepDef.canInvoke(flow.state) || stepDef.canCompensate(flow.state)) {
nextSteps.push(stepDef)
} else {
completedSteps++
if (curState.state === TransactionState.SKIPPED) {
hasSkipped = true
} else if (curState.state === TransactionState.REVERTED) {
hasReverted = true
} else if (curState.state === TransactionState.FAILED) {
if (stepDef.definition.continueOnPermanentFailure) {
hasIgnoredFailure = true
} else {
hasFailed = true
}
}
}
}
const totalSteps = allSteps.length - 1
if (
flow.state === TransactionState.WAITING_TO_COMPENSATE &&
nextSteps.length === 0 &&
!hasWaiting
) {
flow.state = TransactionState.COMPENSATING
this.flagStepsToRevert(flow)
this.emit("compensate", transaction)
return this.checkAllSteps(transaction)
} else if (completedSteps === totalSteps) {
if (hasSkipped) {
flow.hasSkippedSteps = true
}
if (hasIgnoredFailure) {
flow.hasFailedSteps = true
}
if (hasFailed) {
flow.state = TransactionState.FAILED
} else {
flow.state = hasReverted
? TransactionState.REVERTED
: TransactionState.DONE
}
this.emit("finish", transaction)
void transaction.deleteCheckpoint()
}
return {
next: nextSteps,
total: totalSteps,
remaining: totalSteps - completedSteps,
completed: completedSteps,
}
}
private flagStepsToRevert(flow: TransactionFlow): void {
for (const step in flow.steps) {
if (step === this.ROOT_STEP) {
continue
}
const stepDef = flow.steps[step]
const curState = stepDef.getStates()
if (
(curState.state === TransactionState.DONE ||
curState.status === TransactionStepStatus.PERMANENT_FAILURE) &&
!stepDef.definition.noCompensation
) {
stepDef.beginCompensation()
stepDef.changeState(TransactionState.NOT_STARTED)
}
}
}
private async setStepSuccess(
transaction: DistributedTransaction,
step: TransactionStep,
response: unknown
): Promise<void> {
if (step.forwardResponse) {
step.saveResponse(response)
}
step.changeStatus(TransactionStepStatus.OK)
if (step.isCompensating()) {
step.changeState(TransactionState.REVERTED)
} else {
step.changeState(TransactionState.DONE)
}
if (step.definition.async) {
await transaction.saveCheckpoint()
}
}
private async setStepFailure(
transaction: DistributedTransaction,
step: TransactionStep,
error: Error | null,
maxRetries: number = this.DEFAULT_RETRIES
): Promise<void> {
step.failures++
step.changeStatus(TransactionStepStatus.TEMPORARY_FAILURE)
if (step.failures > maxRetries) {
step.changeState(TransactionState.FAILED)
step.changeStatus(TransactionStepStatus.PERMANENT_FAILURE)
transaction.addError(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
error
)
if (!step.isCompensating()) {
const flow = transaction.getFlow()
if (step.definition.continueOnPermanentFailure) {
for (const childStep of step.next) {
const child = flow.steps[childStep]
child.changeState(TransactionState.SKIPPED)
}
} else {
flow.state = TransactionState.WAITING_TO_COMPENSATE
}
}
}
if (step.definition.async) {
await transaction.saveCheckpoint()
}
}
private async executeNext(
transaction: DistributedTransaction
): Promise<void> {
if (transaction.hasFinished()) {
return
}
const flow = transaction.getFlow()
const nextSteps = this.checkAllSteps(transaction)
const execution: Promise<void | unknown>[] = []
for (const step of nextSteps.next) {
const curState = step.getStates()
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
step.lastAttempt = Date.now()
step.attempts++
if (curState.state === TransactionState.NOT_STARTED) {
if (step.isCompensating()) {
step.changeState(TransactionState.COMPENSATING)
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionState.INVOKING)
}
}
step.changeStatus(TransactionStepStatus.WAITING)
const parent = this.getPreviousStep(flow, step)
let payloadData = transaction.payload
if (parent.forwardResponse) {
if (!payloadData) {
payloadData = {}
}
payloadData._response = parent.getResponse()
}
const payload = new TransactionPayload(
{
producer: flow.transactionModelId,
reply_to_topic: TransactionOrchestrator.getKeyName(
"trans",
flow.transactionModelId
),
idempotency_key: TransactionOrchestrator.getKeyName(
flow.transactionId,
step.definition.action!,
type
),
action: step.definition.action + "",
action_type: type,
attempt: step.attempts,
timestamp: Date.now(),
},
payloadData
)
if (!step.definition.async) {
execution.push(
transaction
.handler(step.definition.action + "", type, payload)
.then(async (response) => {
await this.setStepSuccess(transaction, step, response)
})
.catch(async (error) => {
await this.setStepFailure(
transaction,
step,
error,
step.definition.maxRetries
)
})
)
} else {
execution.push(
transaction
.saveCheckpoint()
.then(async () =>
transaction
.handler(step.definition.action + "", type, payload)
.catch(() => void 0)
)
)
}
}
await Promise.all(execution)
if (nextSteps.next.length > 0) {
await this.executeNext(transaction)
}
}
/**
* Start a new transaction or resume a transaction that has been previously started
* @param transaction - The transaction to resume
*/
public async resume(transaction: DistributedTransaction): Promise<void> {
if (transaction.modelId !== this.id) {
throw new Error(
`TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.`
)
}
if (transaction.hasFinished()) {
return
}
const flow = transaction.getFlow()
if (flow.state === TransactionState.NOT_STARTED) {
flow.state = TransactionState.INVOKING
this.emit("begin", transaction)
} else {
this.emit("resume", transaction)
}
await this.executeNext(transaction)
}
private async createTransactionFlow(
transactionId: string
): Promise<TransactionFlow> {
const model: TransactionFlow = {
transactionModelId: this.id,
transactionId: transactionId,
hasFailedSteps: false,
hasSkippedSteps: false,
state: TransactionState.NOT_STARTED,
definition: this.definition,
steps: this.buildSteps(this.definition),
}
return model
}
private async getTransactionFlowById(
transactionId: string
): Promise<TransactionFlow | null> {
const flow = await DistributedTransaction.loadTransactionFlow(transactionId)
if (flow !== null) {
flow.steps = this.buildSteps(flow.definition, flow.steps)
return flow
}
return null
}
private buildSteps(
flow: TransactionStepsDefinition,
existingSteps?: { [key: string]: TransactionStep }
): { [key: string]: TransactionStep } {
const states: { [key: string]: TransactionStep } = {
[this.ROOT_STEP]: {
id: this.ROOT_STEP,
next: [] as string[],
} as TransactionStep,
}
const actionNames = new Set<string>()
const queue: any[] = [{ obj: flow, level: [this.ROOT_STEP] }]
while (queue.length > 0) {
const { obj, level } = queue.shift()
for (const key in obj) {
// eslint-disable-next-line no-prototype-builtins
if (!obj.hasOwnProperty(key)) {
continue
}
if (typeof obj[key] === "object" && obj[key] !== null) {
queue.push({ obj: obj[key], level: [...level] })
} else if (key === "action") {
if (actionNames.has(obj.action)) {
throw new Error(`Action "${obj.action}" is already defined.`)
}
actionNames.add(obj.action)
level.push(obj.action)
const id = level.join(".")
const parent = level.slice(0, level.length - 1).join(".")
states[parent].next?.push(id)
const definitionCopy = { ...obj }
delete definitionCopy.next
states[id] = Object.assign(
new TransactionStep(),
existingSteps?.[id] || {
id,
depth: level.length - 1,
definition: definitionCopy,
forwardResponse: definitionCopy.forwardResponse,
invoke: {
state: TransactionState.NOT_STARTED,
status: TransactionStepStatus.IDLE,
},
compensate: {
state: TransactionState.DORMANT,
status: TransactionStepStatus.IDLE,
},
attempts: 0,
failures: 0,
lastAttempt: null,
next: [],
}
)
}
}
}
return states
}
/** Create a new transaction
* @param transactionId - unique identifier of the transaction
* @param handler - function to handle action of the transaction
* @param payload - payload to be passed to all the transaction steps
*/
public async beginTransaction(
transactionId: string,
handler: TransactionStepHandler,
payload?: unknown
): Promise<DistributedTransaction> {
let modelFlow = await this.getTransactionFlowById(transactionId)
let newTransaction = false
if (!modelFlow) {
modelFlow = await this.createTransactionFlow(transactionId)
newTransaction = true
}
const transaction = new DistributedTransaction(modelFlow, handler, payload)
if (newTransaction) {
await transaction.saveCheckpoint()
}
return transaction
}
private getStepByAction(
flow: TransactionFlow,
action: string
): TransactionStep | null {
for (const key in flow.steps) {
if (action === flow.steps[key]?.definition?.action) {
return flow.steps[key]
}
}
return null
}
private async getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
payload?: unknown
): Promise<[DistributedTransaction, TransactionStep]> {
const [transactionId, action, actionType] = responseIdempotencyKey.split(
TransactionOrchestrator.SEPARATOR
)
if (!transaction && !handler) {
throw new Error(
"If a transaction is not provided, the handler is required"
)
}
if (!transaction) {
const existingTransaction = await this.getTransactionFlowById(
transactionId
)
if (existingTransaction === null) {
throw new Error("Transaction could not be found.")
}
transaction = new DistributedTransaction(
existingTransaction,
handler!,
payload
)
}
const step = this.getStepByAction(transaction.getFlow(), action)
if (step === null) {
throw new Error("Action not found.")
} else if (
step.isCompensating()
? actionType !== TransactionHandlerType.COMPENSATE
: actionType !== TransactionHandlerType.INVOKE
) {
throw new Error("Incorrect action type.")
}
return [transaction, step]
}
/** Register a step success for a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param handler - The handler function to execute the step
* @param transaction - The current transaction. If not provided it will be loaded based on the responseIdempotencyKey
* @param payload - The payload of the step
*/
public async registerStepSuccess(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
payload?: unknown
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await this.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction,
payload
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
await this.setStepSuccess(curTransaction, step, payload)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
throw new Error(
`Cannot set step success when status is ${step.getStates().status}`
)
}
return curTransaction
}
/**
* Register a step failure for a specific transaction and step
* @param responseIdempotencyKey - The idempotency key for the step
* @param error - The error that caused the failure
* @param handler - The handler function to execute the step
* @param transaction - The current transaction
* @param payload - The payload of the step
*/
public async registerStepFailure(
responseIdempotencyKey: string,
error: Error | null,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
payload?: unknown
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await this.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction,
payload
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
await this.setStepFailure(curTransaction, step, error, 0)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
throw new Error(
`Cannot set step failure when status is ${step.getStates().status}`
)
}
return curTransaction
}
public cancelTransaction(transactionId: string) {
// TODO: stop a transaction while in progress and compensate all executed steps
}
}

View File

@@ -0,0 +1,159 @@
import {
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
} from "."
/**
* @class TransactionStep
* @classdesc A class representing a single step in a transaction flow
*/
export class TransactionStep {
/**
* @member id - The id of the step
* @member depth - The depth of the step in the flow
* @member definition - The definition of the step
* @member invoke - The current state and status of the invoke action of the step
* @member compensate - The current state and status of the compensate action of the step
* @member attempts - The number of attempts made to execute the step
* @member failures - The number of failures encountered while executing the step
* @member lastAttempt - The timestamp of the last attempt made to execute the step
* @member next - The ids of the next steps in the flow
* @member response - The response from the last successful execution of the step
* @member forwardResponse - A flag indicating if the response from the previous step should be passed to this step as payload
*/
private stepFailed = false
id: string
depth: number
definition: TransactionStepsDefinition
invoke: {
state: TransactionState
status: TransactionStepStatus
}
compensate: {
state: TransactionState
status: TransactionStepStatus
}
attempts: number
failures: number
lastAttempt: number | null
next: string[]
response: unknown
forwardResponse: boolean
public getStates() {
return this.isCompensating() ? this.compensate : this.invoke
}
public beginCompensation() {
if (this.isCompensating()) {
return
}
this.stepFailed = true
this.attempts = 0
this.failures = 0
this.lastAttempt = null
}
public isCompensating() {
return this.stepFailed
}
public changeState(toState: TransactionState) {
const allowed = {
[TransactionState.DORMANT]: [TransactionState.NOT_STARTED],
[TransactionState.NOT_STARTED]: [
TransactionState.INVOKING,
TransactionState.COMPENSATING,
TransactionState.FAILED,
TransactionState.SKIPPED,
],
[TransactionState.INVOKING]: [
TransactionState.FAILED,
TransactionState.DONE,
],
[TransactionState.COMPENSATING]: [
TransactionState.REVERTED,
TransactionState.FAILED,
],
[TransactionState.DONE]: [TransactionState.COMPENSATING],
}
const curState = this.getStates()
if (
curState.state === toState ||
allowed?.[curState.state]?.includes(toState)
) {
curState.state = toState
return
}
throw new Error(
`Updating State from "${curState.state}" to "${toState}" is not allowed.`
)
}
public changeStatus(toStatus: TransactionStepStatus) {
const allowed = {
[TransactionStepStatus.WAITING]: [
TransactionStepStatus.OK,
TransactionStepStatus.TEMPORARY_FAILURE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.TEMPORARY_FAILURE]: [
TransactionStepStatus.IDLE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.PERMANENT_FAILURE]: [TransactionStepStatus.IDLE],
}
const curState = this.getStates()
if (
curState.status === toStatus ||
toStatus === TransactionStepStatus.WAITING ||
allowed?.[curState.status]?.includes(toStatus)
) {
curState.status = toStatus
return
}
throw new Error(
`Updating Status from "${curState.status}" to "${toStatus}" is not allowed.`
)
}
public saveResponse(response) {
this.response = response
}
public getResponse(): unknown {
return this.response
}
canRetry(): boolean {
return !!(
this.lastAttempt &&
this.definition.retryInterval &&
Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3
)
}
canInvoke(flowState: TransactionState): boolean {
const { status, state } = this.getStates()
return (
(!this.isCompensating() &&
state === TransactionState.NOT_STARTED &&
flowState === TransactionState.INVOKING) ||
status === TransactionStepStatus.TEMPORARY_FAILURE
)
}
canCompensate(flowState: TransactionState): boolean {
return (
this.isCompensating() &&
this.getStates().state === TransactionState.NOT_STARTED &&
flowState === TransactionState.COMPENSATING
)
}
}