Feat(medusa) - Orchestrator builder (#4472)

* chore: Trasanction Orchestrator builder

* Feat(medusa): Workflow Manager (#4506)
This commit is contained in:
Carlos R. L. Rodrigues
2023-07-13 10:53:55 -03:00
committed by GitHub
parent fe6586e560
commit 43427b8893
11 changed files with 1408 additions and 112 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/medusa": patch
---
Transaction Orchestrator Flow Builder

View File

@@ -0,0 +1,435 @@
import { OrchestratorBuilder } from "../../transaction/orchestrator-builder"
describe("OrchestratorBuilder", () => {
let builder: OrchestratorBuilder
beforeEach(() => {
builder = new OrchestratorBuilder()
})
it("should load a TransactionStepsDefinition", () => {
builder.load({ action: "foo" })
expect(builder.build()).toEqual({
action: "foo",
})
})
it("should add a new action after the last action set", () => {
builder.addAction("foo")
expect(builder.build()).toEqual({
action: "foo",
})
builder.addAction("bar")
expect(builder.build()).toEqual({
action: "foo",
next: {
action: "bar",
},
})
})
it("should replace an action by another keeping its next steps", () => {
builder.addAction("foo").addAction("axe").replaceAction("foo", "bar")
expect(builder.build()).toEqual({
action: "bar",
next: {
action: "axe",
},
})
})
it("should insert a new action before an existing action", () => {
builder.addAction("foo").addAction("bar").insertActionBefore("bar", "axe")
expect(builder.build()).toEqual({
action: "foo",
next: {
action: "axe",
next: {
action: "bar",
},
},
})
})
it("should insert a new action after an existing action", () => {
builder.addAction("foo").addAction("axe").insertActionAfter("foo", "bar")
expect(builder.build()).toEqual({
action: "foo",
next: {
action: "bar",
next: {
action: "axe",
},
},
})
})
it("should move an existing action and its next steps to another place. the destination will become next steps of the final branch", () => {
builder
.addAction("foo")
.addAction("bar")
.addAction("axe")
.addAction("zzz")
.moveAction("axe", "foo")
expect(builder.build()).toEqual({
action: "axe",
next: {
action: "zzz",
next: {
action: "foo",
next: {
action: "bar",
},
},
},
})
})
it("should merge two action to run in parallel", () => {
builder
.addAction("foo")
.addAction("bar")
.addAction("axe")
.mergeActions("foo", "axe")
expect(builder.build()).toEqual({
next: [
{
action: "foo",
next: { action: "bar" },
},
{ action: "axe" },
],
})
})
it("should merge multiple actions to run in parallel", () => {
builder
.addAction("foo")
.addAction("bar")
.addAction("axe")
.addAction("step")
.mergeActions("bar", "axe", "step")
expect(builder.build()).toEqual({
action: "foo",
next: [
{
action: "bar",
},
{
action: "axe",
},
{
action: "step",
},
],
})
})
it("should delete an action", () => {
builder.addAction("foo").deleteAction("foo")
expect(builder.build()).toEqual({})
})
it("should delete an action and keep all the next steps of that branch", () => {
builder
.addAction("foo")
.addAction("bar")
.addAction("axe")
.deleteAction("bar")
expect(builder.build()).toEqual({
action: "foo",
next: {
action: "axe",
},
})
})
it("should delete an action and remove all the next steps of that branch", () => {
builder
.addAction("foo")
.addAction("bar")
.addAction("axe")
.addAction("step")
.pruneAction("bar")
expect(builder.build()).toEqual({
action: "foo",
})
})
it("should append a new action to the end of a given action's branch", () => {
builder
.load({
action: "foo",
next: [
{
action: "bar",
next: {
action: "zzz",
},
},
{
action: "axe",
},
],
})
.appendAction("step", "bar", { saveResponse: true })
expect(builder.build()).toEqual({
action: "foo",
next: [
{
action: "bar",
next: {
action: "zzz",
next: {
action: "step",
saveResponse: true,
},
},
},
{
action: "axe",
},
],
})
})
describe("Composing Complex Transactions", () => {
const loadedFlow = {
next: {
action: "createProduct",
saveResponse: true,
next: {
action: "attachToSalesChannel",
saveResponse: true,
next: {
action: "createPrices",
saveResponse: true,
next: {
action: "createInventoryItems",
saveResponse: true,
next: {
action: "attachInventoryItems",
noCompensation: true,
},
},
},
},
},
}
it("should load a transaction and add two steps", () => {
const builder = new OrchestratorBuilder(loadedFlow)
builder
.addAction("step_1", { saveResponse: true })
.addAction("step_2", { saveResponse: true })
expect(builder.build()).toEqual({
action: "createProduct",
saveResponse: true,
next: {
action: "attachToSalesChannel",
saveResponse: true,
next: {
action: "createPrices",
saveResponse: true,
next: {
action: "createInventoryItems",
saveResponse: true,
next: {
action: "attachInventoryItems",
noCompensation: true,
next: {
action: "step_1",
saveResponse: true,
next: {
action: "step_2",
saveResponse: true,
},
},
},
},
},
},
})
})
it("should load a transaction, add 2 steps and merge step_1 to run in parallel with createProduct", () => {
const builder = new OrchestratorBuilder(loadedFlow)
builder
.addAction("step_1", { saveResponse: true })
.addAction("step_2", { saveResponse: true })
.mergeActions("createProduct", "step_1")
expect(builder.build()).toEqual({
next: [
{
action: "createProduct",
saveResponse: true,
next: {
action: "attachToSalesChannel",
saveResponse: true,
next: {
action: "createPrices",
saveResponse: true,
next: {
action: "createInventoryItems",
saveResponse: true,
next: {
action: "attachInventoryItems",
noCompensation: true,
},
},
},
},
},
{
action: "step_1",
saveResponse: true,
next: {
action: "step_2",
saveResponse: true,
},
},
],
})
})
it("should load a transaction, add 2 steps and move 'step_1' and all its next steps to run before 'createPrices'", () => {
const builder = new OrchestratorBuilder(loadedFlow)
builder
.addAction("step_1", { saveResponse: true })
.addAction("step_2", { saveResponse: true })
.moveAction("step_1", "createPrices")
expect(builder.build()).toEqual({
action: "createProduct",
saveResponse: true,
next: {
action: "attachToSalesChannel",
saveResponse: true,
next: {
action: "step_1",
saveResponse: true,
next: {
action: "step_2",
saveResponse: true,
next: {
action: "createPrices",
saveResponse: true,
next: {
action: "createInventoryItems",
saveResponse: true,
next: {
action: "attachInventoryItems",
noCompensation: true,
},
},
},
},
},
},
})
})
it("should load a transaction, add 2 steps and move 'step_1' to run before 'createPrices' and merge next steps", () => {
const builder = new OrchestratorBuilder(loadedFlow)
builder
.addAction("step_1", { saveResponse: true })
.addAction("step_2", { saveResponse: true })
.moveAndMergeNextAction("step_1", "createPrices")
expect(builder.build()).toEqual({
action: "createProduct",
saveResponse: true,
next: {
action: "attachToSalesChannel",
saveResponse: true,
next: {
action: "step_1",
saveResponse: true,
next: [
{
action: "step_2",
saveResponse: true,
},
{
action: "createPrices",
saveResponse: true,
next: {
action: "createInventoryItems",
saveResponse: true,
next: {
action: "attachInventoryItems",
noCompensation: true,
},
},
},
],
},
},
})
})
it("Fully compose a complex transaction", () => {
const builder = new OrchestratorBuilder()
builder
.addAction("step_1", { saveResponse: true })
.addAction("step_2", { saveResponse: true })
.addAction("step_3", { saveResponse: true })
builder.insertActionBefore("step_3", "step_2.5", {
saveResponse: false,
noCompensation: true,
})
builder.insertActionAfter("step_1", "step_1.1", { saveResponse: true })
builder.insertActionAfter("step_3", "step_4", { async: false })
builder
.mergeActions("step_2", "step_2.5", "step_3")
.addAction("step_5", { noCompensation: true })
builder.deleteAction("step_3")
expect(builder.build()).toEqual({
action: "step_1",
saveResponse: true,
next: {
action: "step_1.1",
saveResponse: true,
next: [
{
action: "step_2",
saveResponse: true,
},
{
action: "step_2.5",
saveResponse: false,
noCompensation: true,
},
{
action: "step_4",
async: false,
next: {
action: "step_5",
noCompensation: true,
},
},
],
},
})
})
})
})

View File

@@ -68,7 +68,7 @@ describe("Transaction Orchestrator", () => {
expect(mocks.one).toBeCalledWith(
expect.objectContaining({
metadata: {
producer: "transaction-name",
model_id: "transaction-name",
reply_to_topic: "trans:transaction-name",
idempotency_key: "transaction_id_123:firstMethod:invoke",
action: "firstMethod",
@@ -83,7 +83,7 @@ describe("Transaction Orchestrator", () => {
expect(mocks.two).toBeCalledWith(
expect.objectContaining({
metadata: {
producer: "transaction-name",
model_id: "transaction-name",
reply_to_topic: "trans:transaction-name",
idempotency_key: "transaction_id_123:secondMethod:invoke",
action: "secondMethod",
@@ -191,7 +191,7 @@ describe("Transaction Orchestrator", () => {
expect(actionOrder).toEqual(["one", "two", "three"])
})
it("Should store invoke's step response if flag 'saveResponse' is set to true", async () => {
it("Should store invoke's step response by default or if flag 'saveResponse' is set to true and ignore it if set to false", async () => {
const mocks = {
one: jest.fn().mockImplementation((data) => {
return { abc: 1234 }
@@ -244,15 +244,13 @@ describe("Transaction Orchestrator", () => {
const flow: TransactionStepsDefinition = {
next: {
action: "firstMethod",
saveResponse: true,
next: {
action: "secondMethod",
saveResponse: true,
next: {
action: "thirdMethod",
saveResponse: true,
next: {
action: "fourthMethod",
saveResponse: false,
},
},
},
@@ -275,6 +273,9 @@ describe("Transaction Orchestrator", () => {
expect(mocks.three).toBeCalledWith(
{ prop: 123 },
{
payload: {
prop: 123,
},
invoke: {
firstMethod: { abc: 1234 },
secondMethod: { def: "567" },
@@ -662,7 +663,10 @@ describe("Transaction Orchestrator", () => {
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
handler,
{
myPayloadProp: "test",
}
)
await strategy.resume(transaction)

View File

@@ -0,0 +1,176 @@
import { WorkflowManager } from "../../transaction/workflow-manager"
import { TransactionState } from "../../transaction/types"
describe("WorkflowManager", () => {
const container: any = {}
let handlers
let flow: WorkflowManager
let asyncStepIdempotencyKey: string
beforeEach(() => {
jest.resetAllMocks()
WorkflowManager.unregisterAll()
handlers = new Map()
handlers.set("foo", {
invoke: jest.fn().mockResolvedValue({ done: true }),
compensate: jest.fn(() => {}),
})
handlers.set("bar", {
invoke: jest.fn().mockResolvedValue({ done: true }),
compensate: jest.fn().mockResolvedValue({}),
})
handlers.set("broken", {
invoke: jest.fn(() => {
throw new Error("Step Failed")
}),
compensate: jest.fn().mockResolvedValue({ bar: 123, reverted: true }),
})
handlers.set("callExternal", {
invoke: jest.fn((container, payload, invoke, metadata) => {
asyncStepIdempotencyKey = metadata.idempotency_key
}),
})
WorkflowManager.register(
"create-product",
{
action: "foo",
next: {
action: "bar",
},
},
handlers
)
WorkflowManager.register(
"broken-delivery",
{
action: "foo",
next: {
action: "broken",
},
},
handlers
)
WorkflowManager.register(
"deliver-product",
{
action: "foo",
next: {
action: "callExternal",
async: true,
noCompensation: true,
next: {
action: "bar",
},
},
},
handlers
)
flow = new WorkflowManager(container)
})
it("should return all registered workflows", () => {
const wf = Object.keys(Object.fromEntries(WorkflowManager.getWorkflows()))
expect(wf).toEqual(["create-product", "broken-delivery", "deliver-product"])
})
it("should begin a transaction and returns its final state", async () => {
const transaction = await flow.begin("create-product", "t-id", {
input: 123,
})
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(0)
expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.DONE)
})
it("should begin a transaction and revert it when fail", async () => {
const transaction = await flow.begin("broken-delivery", "t-id")
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("broken").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(1)
expect(handlers.get("broken").compensate).toHaveBeenCalledTimes(1)
expect(transaction.getState()).toBe(TransactionState.REVERTED)
})
it("should continue an asyncronous transaction after reporting a successful step", async () => {
const transaction = await flow.begin("deliver-product", "t-id")
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
const continuation = await flow.registerStepSuccess(
"deliver-product",
asyncStepIdempotencyKey,
{ ok: true }
)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1)
expect(continuation.getState()).toBe(TransactionState.DONE)
})
it("should revert an asyncronous transaction after reporting a failure step", async () => {
const transaction = await flow.begin("deliver-product", "t-id")
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
const continuation = await flow.registerStepFailure(
"deliver-product",
asyncStepIdempotencyKey,
{ ok: true }
)
expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0)
expect(handlers.get("bar").compensate).toHaveBeenCalledTimes(0)
// Failed because the async is flagged as noCompensation
expect(continuation.getState()).toBe(TransactionState.FAILED)
})
it("should update an existing flow with a new step and a new handler", async () => {
const definition =
WorkflowManager.getTransactionDefinition("create-product")
definition.insertActionBefore("bar", "xor", { maxRetries: 3 })
const additionalHandlers = new Map()
additionalHandlers.set("xor", {
invoke: jest.fn().mockResolvedValue({ done: true }),
compensate: jest.fn().mockResolvedValue({}),
})
WorkflowManager.update("create-product", definition, additionalHandlers)
const transaction = await flow.begin("create-product", "t-id")
console.log(transaction)
expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1)
expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1)
expect(additionalHandlers.get("xor").invoke).toHaveBeenCalledTimes(1)
expect(transaction.getState()).toBe(TransactionState.DONE)
})
})

View File

@@ -1,8 +1,9 @@
import { TransactionFlow, TransactionHandlerType, TransactionState } from "."
import { TransactionFlow } from "./transaction-orchestrator"
import { TransactionHandlerType, TransactionState } from "./types"
/**
* @typedef TransactionMetadata
* @property producer - The id of the producer that created the transaction (transactionModelId).
* @property model_id - The id of the model_id that created the transaction (modelId).
* @property reply_to_topic - The topic to reply to for the transaction.
* @property idempotency_key - The idempotency key of the transaction.
* @property action - The action of the transaction.
@@ -11,7 +12,7 @@ import { TransactionFlow, TransactionHandlerType, TransactionState } from "."
* @property timestamp - The timestamp of the transaction.
*/
export type TransactionMetadata = {
producer: string
model_id: string
reply_to_topic: string
idempotency_key: string
action: string
@@ -22,11 +23,13 @@ export type TransactionMetadata = {
/**
* @typedef TransactionContext
* @property payload - Object containing the initial payload.
* @property invoke - Object containing responses of Invoke handlers on steps flagged with saveResponse.
* @property compensate - Object containing responses of Compensate handlers on steps flagged with saveResponse.
*/
export class TransactionContext {
constructor(
public payload: unknown = undefined,
public invoke: Record<string, unknown> = {},
public compensate: Record<string, unknown> = {}
) {}
@@ -36,7 +39,7 @@ export class TransactionStepError {
constructor(
public action: string,
public handlerType: TransactionHandlerType,
public error: Error | null
public error: Error | any
) {}
}
@@ -85,14 +88,15 @@ export class DistributedTransaction {
context?: TransactionContext
) {
this.transactionId = flow.transactionId
this.modelId = flow.transactionModelId
this.modelId = flow.modelId
if (errors) {
this.errors = errors
}
this.context.payload = payload
if (context) {
this.context = context
this.context = { ...context }
}
}
@@ -111,7 +115,7 @@ export class DistributedTransaction {
public addError(
action: string,
handlerType: TransactionHandlerType,
error: Error | null
error: Error | any
) {
this.errors.push({
action,

View File

@@ -1,47 +1,4 @@
export enum TransactionHandlerType {
INVOKE = "invoke",
COMPENSATE = "compensate",
}
export type TransactionStepsDefinition = {
action?: string
continueOnPermanentFailure?: boolean
noCompensation?: boolean
maxRetries?: number
retryInterval?: number
timeout?: number
async?: boolean
noWait?: boolean
saveResponse?: boolean
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
}
export enum TransactionStepStatus {
IDLE = "idle",
OK = "ok",
WAITING = "waiting_response",
TEMPORARY_FAILURE = "temp_failure",
PERMANENT_FAILURE = "permanent_failure",
}
export enum TransactionState {
NOT_STARTED = "not_started",
INVOKING = "invoking",
WAITING_TO_COMPENSATE = "waiting_to_compensate",
COMPENSATING = "compensating",
DONE = "done",
REVERTED = "reverted",
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
}
export type TransactionModel = {
id: string
flow: TransactionStepsDefinition
hash: string
}
export * from "./types"
export * from "./transaction-orchestrator"
export * from "./transaction-step"
export * from "./distributed-transaction"

View File

@@ -0,0 +1,422 @@
import { TransactionStepsDefinition } from "./types"
export type ActionHandler = {
[type: string]: (data: any, context: any) => Promise<any>
}
interface InternalStep extends TransactionStepsDefinition {
next?: InternalStep | InternalStep[]
depth: number
parent?: InternalStep | null
}
export class OrchestratorBuilder {
private steps: InternalStep
constructor(steps?: TransactionStepsDefinition) {
this.load(steps)
}
load(steps?: TransactionStepsDefinition) {
this.steps = {
depth: -1,
parent: null,
next: steps
? JSON.parse(
JSON.stringify((steps.action ? steps : steps.next) as InternalStep)
)
: undefined,
}
this.updateDepths(this.steps, {}, 1, -1)
return this
}
addAction(action: string, options: Partial<TransactionStepsDefinition> = {}) {
const step = this.findLastStep()
const newAction = {
action,
depth: step.depth + 1,
parent: step.action,
...options,
} as InternalStep
step.next = newAction
return this
}
replaceAction(
existingAction: string,
action: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const step = this.findOrThrowStepByAction(existingAction)
step.action = action
Object.assign(step, options)
return this
}
insertActionBefore(
existingAction: string,
action: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const parentStep = this.findParentStepByAction(existingAction)
if (parentStep) {
const oldNext = parentStep.next!
const newDepth = parentStep.depth + 1
if (Array.isArray(parentStep.next)) {
const index = parentStep.next.findIndex(
(step) => step.action === existingAction
)
if (index > -1) {
parentStep.next[index] = {
action,
...options,
next: oldNext[index],
depth: newDepth,
} as InternalStep
}
} else {
parentStep.next = {
action,
...options,
next: oldNext,
depth: newDepth,
} as InternalStep
}
this.updateDepths(oldNext as InternalStep, parentStep)
}
return this
}
insertActionAfter(
existingAction: string,
action: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const step = this.findOrThrowStepByAction(existingAction)
const oldNext = step.next
const newDepth = step.depth + 1
step.next = {
action,
...options,
next: oldNext,
depth: newDepth,
parent: step.action,
} as InternalStep
this.updateDepths(oldNext as InternalStep, step.next)
return this
}
private appendTo(step: InternalStep | string, newStep: InternalStep) {
if (typeof step === "string") {
step = this.findOrThrowStepByAction(step)
}
step.next = {
...newStep,
depth: step.depth + 1,
parent: step.action,
} as InternalStep
return this
}
appendAction(
action: string,
to: string,
options: Partial<TransactionStepsDefinition> = {}
) {
const newAction = {
action,
...options,
} as InternalStep
const branch = this.findLastStep(this.findStepByAction(to))
this.appendTo(branch, newAction)
return this
}
private move(
actionToMove: string,
targetAction: string,
{
runInParallel,
mergeNext,
}: {
runInParallel?: boolean
mergeNext?: boolean
} = {
runInParallel: false,
mergeNext: false,
}
): OrchestratorBuilder {
const parentActionToMoveStep = this.findParentStepByAction(actionToMove)!
const parentTargetActionStep = this.findParentStepByAction(targetAction)!
const actionToMoveStep = this.findStepByAction(
actionToMove,
parentTargetActionStep
)!
if (!actionToMoveStep) {
throw new Error(
`Action "${actionToMove}" could not be found in the following steps of "${targetAction}"`
)
}
if (Array.isArray(parentActionToMoveStep.next)) {
const index = parentActionToMoveStep.next.findIndex(
(step) => step.action === actionToMove
)
if (index > -1) {
parentActionToMoveStep.next.splice(index, 1)
}
} else {
delete parentActionToMoveStep.next
}
if (runInParallel) {
if (Array.isArray(parentTargetActionStep.next)) {
parentTargetActionStep.next.push(actionToMoveStep)
} else if (parentTargetActionStep.next) {
parentTargetActionStep.next = [
parentTargetActionStep.next,
actionToMoveStep,
]
}
} else {
if (actionToMoveStep.next) {
if (mergeNext) {
if (Array.isArray(actionToMoveStep.next)) {
actionToMoveStep.next.push(
parentTargetActionStep.next as InternalStep
)
} else {
actionToMoveStep.next = [
actionToMoveStep.next,
parentTargetActionStep.next as InternalStep,
]
}
} else {
this.appendTo(
this.findLastStep(actionToMoveStep),
parentTargetActionStep.next as InternalStep
)
}
} else {
actionToMoveStep.next = parentTargetActionStep.next
}
parentTargetActionStep.next = actionToMoveStep
}
this.updateDepths(
actionToMoveStep as InternalStep,
parentTargetActionStep,
1,
parentTargetActionStep.depth
)
return this
}
moveAction(actionToMove: string, targetAction: string): OrchestratorBuilder {
this.move(actionToMove, targetAction)
return this
}
moveAndMergeNextAction(
actionToMove: string,
targetAction: string
): OrchestratorBuilder {
this.move(actionToMove, targetAction, { mergeNext: true })
return this
}
mergeActions(where: string, ...actions: string[]) {
actions.unshift(where)
if (actions.length < 2) {
throw new Error("Cannot merge less than two actions")
}
for (const action of actions) {
if (action !== where) {
this.move(action, where, { runInParallel: true })
}
}
return this
}
deleteAction(action: string, steps: InternalStep = this.steps) {
const actionStep = this.findOrThrowStepByAction(action)
const parentStep = this.findParentStepByAction(action, steps)!
if (Array.isArray(parentStep.next)) {
const index = parentStep.next.findIndex((step) => step.action === action)
if (index > -1 && actionStep.next) {
if (actionStep.next) {
parentStep.next[index] = actionStep.next as InternalStep
} else {
parentStep.next.splice(index, 1)
}
}
} else {
parentStep.next = actionStep.next
}
this.updateDepths(
actionStep.next as InternalStep,
parentStep,
1,
parentStep.depth
)
return this
}
pruneAction(action: string) {
const actionStep = this.findOrThrowStepByAction(action)
const parentStep = this.findParentStepByAction(action, this.steps)!
if (Array.isArray(parentStep.next)) {
const index = parentStep.next.findIndex((step) => step.action === action)
if (index > -1) {
parentStep.next.splice(index, 1)
}
} else {
delete parentStep.next
}
return this
}
private findStepByAction(
action: string,
step: InternalStep = this.steps
): InternalStep | undefined {
if (step.action === action) {
return step
}
if (Array.isArray(step.next)) {
for (const subStep of step.next) {
const found = this.findStepByAction(action, subStep as InternalStep)
if (found) {
return found
}
}
} else if (step.next && typeof step.next === "object") {
return this.findStepByAction(action, step.next as InternalStep)
}
return
}
private findOrThrowStepByAction(
action: string,
steps: InternalStep = this.steps
): InternalStep {
const step = this.findStepByAction(action, steps)
if (!step) {
throw new Error(`Action "${action}" could not be found`)
}
return step
}
private findParentStepByAction(
action: string,
step: InternalStep = this.steps
): InternalStep | undefined {
if (!step.next) {
return
}
const nextSteps = Array.isArray(step.next) ? step.next : [step.next]
for (const nextStep of nextSteps) {
if (!nextStep) {
continue
}
if (nextStep.action === action) {
return step
}
const foundStep = this.findParentStepByAction(
action,
nextStep as InternalStep
)
if (foundStep) {
return foundStep
}
}
return
}
private findLastStep(steps: InternalStep = this.steps): InternalStep {
let step = steps as InternalStep
while (step.next) {
step = Array.isArray(step.next)
? (step.next[step.next.length - 1] as InternalStep)
: (step.next as InternalStep)
}
return step
}
private updateDepths(
startingStep: InternalStep,
parent,
incr = 1,
beginFrom?: number
): void {
if (!startingStep) {
return
}
const update = (step: InternalStep, parent, beginFrom) => {
step.depth = beginFrom + incr
step.parent = parent.action
if (Array.isArray(step.next)) {
step.next.forEach((nextAction) => update(nextAction, step, step.depth))
} else if (step.next) {
update(step.next, step, step.depth)
}
}
update(startingStep, parent, beginFrom ?? startingStep.depth)
}
build(): TransactionStepsDefinition {
if (!this.steps.next) {
return {}
}
const ignore = ["depth", "parent"]
const result = JSON.parse(
JSON.stringify(
Array.isArray(this.steps.next) ? this.steps : this.steps.next,
null
),
(key, value) => {
if (ignore.includes(key)) {
return
}
return value
}
)
return result
}
}

View File

@@ -1,12 +1,11 @@
import { EventEmitter } from "events"
import {
TransactionHandlerType,
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
TransactionModel,
} from "."
} from "./types"
import {
DistributedTransaction,
TransactionCheckpoint,
@@ -15,7 +14,7 @@ import {
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
export type TransactionFlow = {
transactionModelId: string
modelId: string
definition: TransactionStepsDefinition
transactionId: string
hasFailedSteps: boolean
@@ -31,11 +30,11 @@ export type TransactionFlow = {
* It is based on a single transaction definition, which is used to execute all the transaction steps
*/
export class TransactionOrchestrator extends EventEmitter {
private ROOT_STEP = "_root"
private static ROOT_STEP = "_root"
private invokeSteps: string[] = []
private compensateSteps: string[] = []
public DEFAULT_RETRIES = 0
public static DEFAULT_RETRIES = 0
constructor(
public id: string,
private definition: TransactionStepsDefinition
@@ -117,7 +116,7 @@ export class TransactionOrchestrator extends EventEmitter {
return this.canMoveBackward(flow, step)
} else {
const previous = this.getPreviousStep(flow, step)
if (previous.id === this.ROOT_STEP) {
if (previous.id === TransactionOrchestrator.ROOT_STEP) {
return true
}
@@ -148,7 +147,7 @@ export class TransactionOrchestrator extends EventEmitter {
for (const step of allSteps) {
if (
step === this.ROOT_STEP ||
step === TransactionOrchestrator.ROOT_STEP ||
!this.canContinue(flow, flow.steps[step])
) {
continue
@@ -227,7 +226,7 @@ export class TransactionOrchestrator extends EventEmitter {
private flagStepsToRevert(flow: TransactionFlow): void {
for (const step in flow.steps) {
if (step === this.ROOT_STEP) {
if (step === TransactionOrchestrator.ROOT_STEP) {
continue
}
@@ -244,7 +243,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
}
private async setStepSuccess(
private static async setStepSuccess(
transaction: DistributedTransaction,
step: TransactionStep,
response: unknown
@@ -272,11 +271,11 @@ export class TransactionOrchestrator extends EventEmitter {
}
}
private async setStepFailure(
private static async setStepFailure(
transaction: DistributedTransaction,
step: TransactionStep,
error: Error | null,
maxRetries: number = this.DEFAULT_RETRIES
error: Error | any,
maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES
): Promise<void> {
step.failures++
@@ -344,10 +343,10 @@ export class TransactionOrchestrator extends EventEmitter {
const payload = new TransactionPayload(
{
producer: flow.transactionModelId,
model_id: flow.modelId,
reply_to_topic: TransactionOrchestrator.getKeyName(
"trans",
flow.transactionModelId
flow.modelId
),
idempotency_key: TransactionOrchestrator.getKeyName(
flow.transactionId,
@@ -368,10 +367,14 @@ export class TransactionOrchestrator extends EventEmitter {
transaction
.handler(step.definition.action + "", type, payload)
.then(async (response) => {
await this.setStepSuccess(transaction, step, response)
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
})
.catch(async (error) => {
await this.setStepFailure(
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
@@ -381,13 +384,18 @@ export class TransactionOrchestrator extends EventEmitter {
)
} else {
execution.push(
transaction
.saveCheckpoint()
.then(async () =>
transaction
.handler(step.definition.action + "", type, payload)
.catch(() => void 0)
)
transaction.saveCheckpoint().then(async () =>
transaction
.handler(step.definition.action + "", type, payload)
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
step.definition.maxRetries
)
})
)
)
}
}
@@ -453,17 +461,17 @@ export class TransactionOrchestrator extends EventEmitter {
transactionId: string
): Promise<TransactionFlow> {
return {
transactionModelId: this.id,
modelId: this.id,
transactionId: transactionId,
hasFailedSteps: false,
hasSkippedSteps: false,
state: TransactionState.NOT_STARTED,
definition: this.definition,
steps: this.buildSteps(this.definition),
steps: TransactionOrchestrator.buildSteps(this.definition),
}
}
private async loadTransactionById(
private static async loadTransactionById(
transactionId: string
): Promise<TransactionCheckpoint | null> {
const transaction = await DistributedTransaction.loadTransaction(
@@ -472,26 +480,31 @@ export class TransactionOrchestrator extends EventEmitter {
if (transaction !== null) {
const flow = transaction.flow
transaction.flow.steps = this.buildSteps(flow.definition, flow.steps)
transaction.flow.steps = TransactionOrchestrator.buildSteps(
flow.definition,
flow.steps
)
return transaction
}
return null
}
private buildSteps(
private static buildSteps(
flow: TransactionStepsDefinition,
existingSteps?: { [key: string]: TransactionStep }
): { [key: string]: TransactionStep } {
const states: { [key: string]: TransactionStep } = {
[this.ROOT_STEP]: {
id: this.ROOT_STEP,
[TransactionOrchestrator.ROOT_STEP]: {
id: TransactionOrchestrator.ROOT_STEP,
next: [] as string[],
} as TransactionStep,
}
const actionNames = new Set<string>()
const queue: any[] = [{ obj: flow, level: [this.ROOT_STEP] }]
const queue: any[] = [
{ obj: flow, level: [TransactionOrchestrator.ROOT_STEP] },
]
while (queue.length > 0) {
const { obj, level } = queue.shift()
@@ -525,7 +538,7 @@ export class TransactionOrchestrator extends EventEmitter {
id,
depth: level.length - 1,
definition: definitionCopy,
saveResponse: definitionCopy.saveResponse,
saveResponse: definitionCopy.saveResponse ?? true,
invoke: {
state: TransactionState.NOT_STARTED,
status: TransactionStepStatus.IDLE,
@@ -557,7 +570,8 @@ export class TransactionOrchestrator extends EventEmitter {
handler: TransactionStepHandler,
payload?: unknown
): Promise<DistributedTransaction> {
const existingTransaction = await this.loadTransactionById(transactionId)
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(transactionId)
let newTransaction = false
let modelFlow
@@ -582,7 +596,7 @@ export class TransactionOrchestrator extends EventEmitter {
return transaction
}
private getStepByAction(
private static getStepByAction(
flow: TransactionFlow,
action: string
): TransactionStep | null {
@@ -594,11 +608,10 @@ export class TransactionOrchestrator extends EventEmitter {
return null
}
private async getTransactionAndStepFromIdempotencyKey(
private static async getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
payload?: unknown
transaction?: DistributedTransaction
): Promise<[DistributedTransaction, TransactionStep]> {
const [transactionId, action, actionType] = responseIdempotencyKey.split(
TransactionOrchestrator.SEPARATOR
@@ -611,7 +624,8 @@ export class TransactionOrchestrator extends EventEmitter {
}
if (!transaction) {
const existingTransaction = await this.loadTransactionById(transactionId)
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(transactionId)
if (existingTransaction === null) {
throw new Error(`Transaction ${transactionId} could not be found.`)
@@ -620,13 +634,16 @@ export class TransactionOrchestrator extends EventEmitter {
transaction = new DistributedTransaction(
existingTransaction.flow,
handler!,
payload,
undefined,
existingTransaction.errors,
existingTransaction.context
)
}
const step = this.getStepByAction(transaction.getFlow(), action)
const step = TransactionOrchestrator.getStepByAction(
transaction.getFlow(),
action
)
if (step === null) {
throw new Error("Action not found.")
@@ -653,15 +670,19 @@ export class TransactionOrchestrator extends EventEmitter {
response?: unknown
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await this.getTransactionAndStepFromIdempotencyKey(
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction,
response
transaction
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
await this.setStepSuccess(curTransaction, step, response)
await TransactionOrchestrator.setStepSuccess(
curTransaction,
step,
response
)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
@@ -683,21 +704,24 @@ export class TransactionOrchestrator extends EventEmitter {
*/
public async registerStepFailure(
responseIdempotencyKey: string,
error: Error | null,
error?: Error | any,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
response?: unknown
transaction?: DistributedTransaction
): Promise<DistributedTransaction> {
const [curTransaction, step] =
await this.getTransactionAndStepFromIdempotencyKey(
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
handler,
transaction,
response
transaction
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
await this.setStepFailure(curTransaction, step, error, 0)
await TransactionOrchestrator.setStepFailure(
curTransaction,
step,
error,
0
)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {

View File

@@ -1,10 +1,10 @@
import { TransactionPayload } from "./distributed-transaction"
import {
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
TransactionHandlerType,
TransactionPayload,
} from "."
} from "./types"
export type TransactionStepHandler = (
actionId: string,
@@ -27,7 +27,7 @@ export class TransactionStep {
* @member failures - The number of failures encountered while executing the step
* @member lastAttempt - The timestamp of the last attempt made to execute the step
* @member next - The ids of the next steps in the flow
* @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is false
* @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is true
*/
private stepFailed = false
id: string

View File

@@ -0,0 +1,43 @@
export enum TransactionHandlerType {
INVOKE = "invoke",
COMPENSATE = "compensate",
}
export type TransactionStepsDefinition = {
action?: string
continueOnPermanentFailure?: boolean
noCompensation?: boolean
maxRetries?: number
retryInterval?: number
timeout?: number
async?: boolean
noWait?: boolean
saveResponse?: boolean
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
}
export enum TransactionStepStatus {
IDLE = "idle",
OK = "ok",
WAITING = "waiting_response",
TEMPORARY_FAILURE = "temp_failure",
PERMANENT_FAILURE = "permanent_failure",
}
export enum TransactionState {
NOT_STARTED = "not_started",
INVOKING = "invoking",
WAITING_TO_COMPENSATE = "waiting_to_compensate",
COMPENSATING = "compensating",
DONE = "done",
REVERTED = "reverted",
FAILED = "failed",
DORMANT = "dormant",
SKIPPED = "skipped",
}
export type TransactionModel = {
id: string
flow: TransactionStepsDefinition
hash: string
}

View File

@@ -0,0 +1,226 @@
import { MedusaContainer } from "@medusajs/types"
import {
DistributedTransaction,
TransactionMetadata,
} from "./distributed-transaction"
import { TransactionOrchestrator } from "./transaction-orchestrator"
import { TransactionStepHandler } from "./transaction-step"
import { TransactionHandlerType, TransactionStepsDefinition } from "./types"
import { OrchestratorBuilder } from "./orchestrator-builder"
interface Workflow {
id: string
handler: (container: MedusaContainer) => TransactionStepHandler
orchestrator: TransactionOrchestrator
flow_: TransactionStepsDefinition
handlers_: Map<
string,
{ invoke: InvokeHandler; compensate?: CompensateHandler }
>
requiredModules?: Set<string>
optionalModules?: Set<string>
}
type InvokeHandler = (
container: MedusaContainer,
payload: any,
invoke: { [actions: string]: any },
metadata: TransactionMetadata
) => Promise<any>
type CompensateHandler = (
container: MedusaContainer,
payload: any,
invoke: { [actions: string]: any },
compensate: { [actions: string]: any },
metadata: TransactionMetadata
) => Promise<any>
export class WorkflowManager {
protected static workflows: Map<string, Workflow> = new Map()
protected container: MedusaContainer
constructor(container?: MedusaContainer) {
this.container = container as MedusaContainer
}
static unregister(workflowId: string) {
WorkflowManager.workflows.delete(workflowId)
}
static unregisterAll() {
WorkflowManager.workflows.clear()
}
static getWorkflows() {
return WorkflowManager.workflows
}
static getTransactionDefinition(workflowId): OrchestratorBuilder {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const workflow = WorkflowManager.workflows.get(workflowId)!
return new OrchestratorBuilder(workflow.flow_)
}
static register(
workflowId: string,
flow: TransactionStepsDefinition | OrchestratorBuilder,
handlers: Map<
string,
{ invoke: InvokeHandler; compensate?: CompensateHandler }
>,
requiredModules?: Set<string>,
optionalModules?: Set<string>
) {
if (WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" is already defined.`)
}
const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow
WorkflowManager.workflows.set(workflowId, {
id: workflowId,
flow_: finalFlow,
orchestrator: new TransactionOrchestrator(workflowId, finalFlow),
handler: WorkflowManager.buildHandlers(handlers),
handlers_: handlers,
requiredModules,
optionalModules,
})
}
static update(
workflowId: string,
flow: TransactionStepsDefinition | OrchestratorBuilder,
handlers: Map<
string,
{ invoke: InvokeHandler; compensate?: CompensateHandler }
>,
requiredModules?: Set<string>,
optionalModules?: Set<string>
) {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const workflow = WorkflowManager.workflows.get(workflowId)!
for (const [key, value] of handlers.entries()) {
workflow.handlers_.set(key, value)
}
const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow
WorkflowManager.workflows.set(workflowId, {
id: workflowId,
flow_: finalFlow,
orchestrator: new TransactionOrchestrator(workflowId, finalFlow),
handler: WorkflowManager.buildHandlers(workflow.handlers_),
handlers_: workflow.handlers_,
requiredModules,
optionalModules,
})
}
private static buildHandlers(
handlers: Map<
string,
{ invoke: InvokeHandler; compensate?: CompensateHandler }
>
): (container: MedusaContainer) => TransactionStepHandler {
return (container: MedusaContainer): TransactionStepHandler => {
return async (
actionId: string,
handlerType: TransactionHandlerType,
payload?: any
) => {
const command = handlers.get(actionId)
if (!command) {
throw new Error(`Handler for action "${actionId}" not found.`)
} else if (!command[handlerType]) {
throw new Error(
`"${handlerType}" handler for action "${actionId}" not found.`
)
}
const { invoke, compensate, payload: input } = payload.context
const { metadata } = payload
if (handlerType === TransactionHandlerType.COMPENSATE) {
return await command[handlerType]!(
container,
input,
invoke,
compensate,
metadata
)
}
return await command[handlerType](container, input, invoke, metadata)
}
}
}
async begin(
workflowId: string,
uniqueTransactionId: string,
input?: unknown
) {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const workflow = WorkflowManager.workflows.get(workflowId)!
const orchestrator = workflow.orchestrator
const transaction = await orchestrator.beginTransaction(
uniqueTransactionId,
workflow.handler(this.container),
input
)
await orchestrator.resume(transaction)
return transaction
}
async registerStepSuccess(
workflowId: string,
idempotencyKey: string,
response?: unknown
): Promise<DistributedTransaction> {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const workflow = WorkflowManager.workflows.get(workflowId)!
return await workflow.orchestrator.registerStepSuccess(
idempotencyKey,
workflow.handler(this.container),
undefined,
response
)
}
async registerStepFailure(
workflowId: string,
idempotencyKey: string,
error?: Error | any
): Promise<DistributedTransaction> {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const workflow = WorkflowManager.workflows.get(workflowId)!
return await workflow.orchestrator.registerStepFailure(
idempotencyKey,
error,
workflow.handler(this.container)
)
}
}