feat(orchestration, workflows-sdk): Add events management and implementation to manage async workflows (#5951)

* feat(orchestration, workflows-sdk): Add events management and implementation to manage async workflows

* Create fresh-boxes-scream.md

* cleanup

* fix: resolveValue input ref

* resolve value recursive

* chore: resolve result value only for new api

* chore: save checkpoint before scheduling

* features

* fix: beginTransaction checking existing transaction

---------

Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2024-01-05 14:40:58 +01:00
committed by GitHub
parent 99a4f94db5
commit bf63c4e6a3
18 changed files with 1449 additions and 285 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---
feat(orchestration, workflows-sdk): Add events management and implementation to manage async workflows

View File

@@ -70,8 +70,8 @@ describe("Transaction Orchestrator", () => {
expect.objectContaining({
metadata: {
model_id: "transaction-name",
reply_to_topic: "trans:transaction-name",
idempotency_key: "transaction_id_123:firstMethod:invoke",
idempotency_key:
"transaction-name:transaction_id_123:firstMethod:invoke",
action: "firstMethod",
action_type: "invoke",
attempt: 1,
@@ -85,8 +85,8 @@ describe("Transaction Orchestrator", () => {
expect.objectContaining({
metadata: {
model_id: "transaction-name",
reply_to_topic: "trans:transaction-name",
idempotency_key: "transaction_id_123:secondMethod:invoke",
idempotency_key:
"transaction-name:transaction_id_123:secondMethod:invoke",
action: "secondMethod",
action_type: "invoke",
attempt: 1,
@@ -654,6 +654,7 @@ describe("Transaction Orchestrator", () => {
next: {
action: "firstMethod",
async: true,
compensateAsync: true,
next: {
action: "secondMethod",
},
@@ -675,8 +676,10 @@ describe("Transaction Orchestrator", () => {
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.two).toBeCalledTimes(0)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
expect(transaction.getFlow().hasWaitingSteps).toBe(true)
const mocktransactionId = TransactionOrchestrator.getKeyName(
"transaction-name",
transaction.transactionId,
"firstMethod",
TransactionHandlerType.INVOKE
@@ -688,6 +691,7 @@ describe("Transaction Orchestrator", () => {
)
expect(transaction.getState()).toBe(TransactionState.DONE)
expect(transaction.getFlow().hasWaitingSteps).toBe(false)
})
it("Should hold the status COMPENSATING while the transaction hasn't finished compensating", async () => {
@@ -731,8 +735,11 @@ describe("Transaction Orchestrator", () => {
next: {
action: "firstMethod",
async: true,
compensateAsync: true,
next: {
action: "secondMethod",
async: true,
compensateAsync: true,
},
},
}
@@ -745,22 +752,31 @@ describe("Transaction Orchestrator", () => {
)
const mocktransactionId = TransactionOrchestrator.getKeyName(
"transaction-name",
transaction.transactionId,
"firstMethod",
TransactionHandlerType.INVOKE
)
const registerBeforeAllowed = await strategy
.registerStepFailure(mocktransactionId, null, handler)
.catch((e) => e.message)
const mockSecondStepId = TransactionOrchestrator.getKeyName(
"transaction-name",
transaction.transactionId,
"secondMethod",
TransactionHandlerType.INVOKE
)
await strategy.resume(transaction)
expect(mocks.one).toBeCalledTimes(1)
expect(mocks.compensateOne).toBeCalledTimes(0)
expect(mocks.two).toBeCalledTimes(0)
const registerBeforeAllowed = await strategy
.registerStepSuccess(mockSecondStepId, handler)
.catch((e) => e.message)
expect(registerBeforeAllowed).toEqual(
"Cannot set step failure when status is idle"
"Cannot set step success when status is idle"
)
expect(transaction.getState()).toBe(TransactionState.INVOKING)
@@ -774,6 +790,7 @@ describe("Transaction Orchestrator", () => {
expect(mocks.compensateOne).toBeCalledTimes(1)
const mocktransactionIdCompensate = TransactionOrchestrator.getKeyName(
"transaction-name",
transaction.transactionId,
"firstMethod",
TransactionHandlerType.COMPENSATE

View File

@@ -0,0 +1,118 @@
import {
DistributedTransaction,
TransactionCheckpoint,
} from "../distributed-transaction"
import { TransactionStep } from "../transaction-step"
import { TransactionModelOptions } from "../types"
export interface IDistributedTransactionStorage {
get(key: string): Promise<TransactionCheckpoint | undefined>
list(): Promise<TransactionCheckpoint[]>
save(key: string, data: TransactionCheckpoint, ttl?: number): Promise<void>
delete(key: string): Promise<void>
archive(key: string, options?: TransactionModelOptions): Promise<void>
scheduleRetry(
transaction: DistributedTransaction,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void>
clearRetry(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void>
scheduleTransactionTimeout(
transaction: DistributedTransaction,
timestamp: number,
interval: number
): Promise<void>
scheduleStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void>
clearTransactionTimeout(transaction: DistributedTransaction): Promise<void>
clearStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void>
}
export abstract class DistributedTransactionStorage
implements IDistributedTransactionStorage
{
constructor() {
/* noop */
}
async get(key: string): Promise<TransactionCheckpoint | undefined> {
throw new Error("Method 'get' not implemented.")
}
async list(): Promise<TransactionCheckpoint[]> {
throw new Error("Method 'list' not implemented.")
}
async save(
key: string,
data: TransactionCheckpoint,
ttl?: number
): Promise<void> {
throw new Error("Method 'save' not implemented.")
}
async delete(key: string): Promise<void> {
throw new Error("Method 'delete' not implemented.")
}
async archive(key: string, options?: TransactionModelOptions): Promise<void> {
throw new Error("Method 'archive' not implemented.")
}
async scheduleRetry(
transaction: DistributedTransaction,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void> {
throw new Error("Method 'scheduleRetry' not implemented.")
}
async clearRetry(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void> {
throw new Error("Method 'clearRetry' not implemented.")
}
async scheduleTransactionTimeout(
transaction: DistributedTransaction,
timestamp: number,
interval: number
): Promise<void> {
throw new Error("Method 'scheduleTransactionTimeout' not implemented.")
}
async clearTransactionTimeout(
transaction: DistributedTransaction
): Promise<void> {
throw new Error("Method 'clearTransactionTimeout' not implemented.")
}
async scheduleStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void> {
throw new Error("Method 'scheduleStepTimeout' not implemented.")
}
async clearStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void> {
throw new Error("Method 'clearStepTimeout' not implemented.")
}
}

View File

@@ -0,0 +1,37 @@
import { TransactionCheckpoint } from "../distributed-transaction"
import { TransactionModelOptions } from "../types"
import { DistributedTransactionStorage } from "./abstract-storage"
// eslint-disable-next-line max-len
export class BaseInMemoryDistributedTransactionStorage extends DistributedTransactionStorage {
private storage: Map<string, TransactionCheckpoint>
constructor() {
super()
this.storage = new Map()
}
async get(key: string): Promise<TransactionCheckpoint | undefined> {
return this.storage.get(key)
}
async list(): Promise<TransactionCheckpoint[]> {
return Array.from(this.storage.values())
}
async save(
key: string,
data: TransactionCheckpoint,
ttl?: number
): Promise<void> {
this.storage.set(key, data)
}
async delete(key: string): Promise<void> {
this.storage.delete(key)
}
async archive(key: string, options?: TransactionModelOptions): Promise<void> {
this.storage.delete(key)
}
}

View File

@@ -1,12 +1,17 @@
import { isDefined } from "@medusajs/utils"
import { TransactionFlow } from "./transaction-orchestrator"
import { TransactionStepHandler } from "./transaction-step"
import { EventEmitter } from "events"
import { IDistributedTransactionStorage } from "./datastore/abstract-storage"
import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage"
import {
TransactionFlow,
TransactionOrchestrator,
} from "./transaction-orchestrator"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import { TransactionHandlerType, TransactionState } from "./types"
/**
* @typedef TransactionMetadata
* @property model_id - The id of the model_id that created the transaction (modelId).
* @property reply_to_topic - The topic to reply to for the transaction.
* @property idempotency_key - The idempotency key of the transaction.
* @property action - The action of the transaction.
* @property action_type - The type of the transaction.
@@ -15,7 +20,6 @@ import { TransactionHandlerType, TransactionState } from "./types"
*/
export type TransactionMetadata = {
model_id: string
reply_to_topic: string
idempotency_key: string
action: string
action_type: TransactionHandlerType
@@ -70,13 +74,19 @@ export class TransactionPayload {
* DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order.
*/
export class DistributedTransaction {
export class DistributedTransaction extends EventEmitter {
public modelId: string
public transactionId: string
private readonly errors: TransactionStepError[] = []
private readonly context: TransactionContext = new TransactionContext()
private static keyValueStore: IDistributedTransactionStorage
public static setStorage(storage: IDistributedTransactionStorage) {
this.keyValueStore = storage
}
private static keyPrefix = "dtrans"
constructor(
private flow: TransactionFlow,
@@ -85,6 +95,8 @@ export class DistributedTransaction {
errors?: TransactionStepError[],
context?: TransactionContext
) {
super()
this.transactionId = flow.transactionId
this.modelId = flow.modelId
@@ -156,6 +168,7 @@ export class DistributedTransaction {
this.getFlow().state === TransactionState.INVOKING
)
}
public canRevert(): boolean {
return (
this.getFlow().state === TransactionState.DONE ||
@@ -163,36 +176,129 @@ export class DistributedTransaction {
)
}
public static keyValueStore: any = {} // TODO: Use Key/Value db
private static keyPrefix = "dtrans:"
public async saveCheckpoint(): Promise<TransactionCheckpoint> {
// TODO: Use Key/Value db to save transactions
const key = DistributedTransaction.keyPrefix + this.transactionId
public hasTimeout(): boolean {
return !!this.getFlow().definition.timeout
}
public getTimeoutInterval(): number | undefined {
return this.getFlow().definition.timeout
}
public async saveCheckpoint(
ttl = 0
): Promise<TransactionCheckpoint | undefined> {
const options = this.getFlow().options
if (!options?.storeExecution) {
return
}
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
)
DistributedTransaction.keyValueStore[key] = JSON.stringify(data)
const key = TransactionOrchestrator.getKeyName(
DistributedTransaction.keyPrefix,
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.save(key, data, ttl)
return data
}
public static async loadTransaction(
modelId: string,
transactionId: string
): Promise<TransactionCheckpoint | null> {
// TODO: Use Key/Value db to load transactions
const key = DistributedTransaction.keyPrefix + transactionId
if (DistributedTransaction.keyValueStore[key]) {
return JSON.parse(DistributedTransaction.keyValueStore[key])
const key = TransactionOrchestrator.getKeyName(
DistributedTransaction.keyPrefix,
modelId,
transactionId
)
const loadedData = await DistributedTransaction.keyValueStore.get(key)
if (loadedData) {
return loadedData
}
return null
}
public async deleteCheckpoint(): Promise<void> {
// TODO: Delete from Key/Value db
const key = DistributedTransaction.keyPrefix + this.transactionId
delete DistributedTransaction.keyValueStore[key]
const options = this.getFlow().options
if (!options?.storeExecution) {
return
}
const key = TransactionOrchestrator.getKeyName(
DistributedTransaction.keyPrefix,
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.delete(key)
}
public async archiveCheckpoint(): Promise<void> {
const options = this.getFlow().options
const key = TransactionOrchestrator.getKeyName(
DistributedTransaction.keyPrefix,
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.archive(key, options)
}
public async scheduleRetry(
step: TransactionStep,
interval: number
): Promise<void> {
await this.saveCheckpoint()
await DistributedTransaction.keyValueStore.scheduleRetry(
this,
step,
Date.now(),
interval
)
}
public async clearRetry(step: TransactionStep): Promise<void> {
await DistributedTransaction.keyValueStore.clearRetry(this, step)
}
public async scheduleTransactionTimeout(interval: number): Promise<void> {
await this.saveCheckpoint()
await DistributedTransaction.keyValueStore.scheduleTransactionTimeout(
this,
Date.now(),
interval
)
}
public async clearTransactionTimeout(): Promise<void> {
await DistributedTransaction.keyValueStore.clearTransactionTimeout(this)
}
public async scheduleStepTimeout(
step: TransactionStep,
interval: number
): Promise<void> {
await this.saveCheckpoint()
await DistributedTransaction.keyValueStore.scheduleStepTimeout(
this,
step,
Date.now(),
interval
)
}
public async clearStepTimeout(step: TransactionStep): Promise<void> {
await DistributedTransaction.keyValueStore.clearStepTimeout(this, step)
}
}
DistributedTransaction.setStorage(
new BaseInMemoryDistributedTransactionStorage()
)

View File

@@ -13,3 +13,32 @@ export class PermanentStepFailureError extends Error {
this.name = "PermanentStepFailure"
}
}
export class StepTimeoutError extends Error {
static isStepTimeoutError(error: Error): error is StepTimeoutError {
return (
error instanceof StepTimeoutError || error.name === "StepTimeoutError"
)
}
constructor(message?: string) {
super(message)
this.name = "StepTimeoutError"
}
}
export class TransactionTimeoutError extends Error {
static isTransactionTimeoutError(
error: Error
): error is TransactionTimeoutError {
return (
error instanceof TransactionTimeoutError ||
error.name === "TransactionTimeoutError"
)
}
constructor(message?: string) {
super(message)
this.name = "TransactionTimeoutError"
}
}

View File

@@ -1,6 +1,7 @@
export * from "./types"
export * from "./datastore/abstract-storage"
export * from "./distributed-transaction"
export * from "./errors"
export * from "./orchestrator-builder"
export * from "./transaction-orchestrator"
export * from "./transaction-step"
export * from "./distributed-transaction"
export * from "./orchestrator-builder"
export * from "./errors"
export * from "./types"

View File

@@ -5,22 +5,33 @@ import {
} from "./distributed-transaction"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import {
DistributedTransactionEvent,
TransactionHandlerType,
TransactionModelOptions,
TransactionState,
TransactionStepsDefinition,
TransactionStepStatus,
TransactionStepsDefinition,
} from "./types"
import { MedusaError, promiseAll } from "@medusajs/utils"
import { EventEmitter } from "events"
import { promiseAll } from "@medusajs/utils"
import { PermanentStepFailureError } from "./errors"
import {
PermanentStepFailureError,
StepTimeoutError,
TransactionTimeoutError,
} from "./errors"
export type TransactionFlow = {
modelId: string
options?: TransactionModelOptions
definition: TransactionStepsDefinition
transactionId: string
hasAsyncSteps: boolean
hasFailedSteps: boolean
hasWaitingSteps: boolean
hasSkippedSteps: boolean
timedOutAt: number | null
startedAt?: number
state: TransactionState
steps: {
[key: string]: TransactionStep
@@ -33,13 +44,15 @@ export type TransactionFlow = {
*/
export class TransactionOrchestrator extends EventEmitter {
private static ROOT_STEP = "_root"
public static DEFAULT_TTL = 30
private invokeSteps: string[] = []
private compensateSteps: string[] = []
public static DEFAULT_RETRIES = 0
constructor(
public id: string,
private definition: TransactionStepsDefinition
private definition: TransactionStepsDefinition,
private options?: TransactionModelOptions
) {
super()
}
@@ -126,12 +139,34 @@ export class TransactionOrchestrator extends EventEmitter {
}
}
private checkAllSteps(transaction: DistributedTransaction): {
private async checkStepTimeout(transaction, step) {
let hasTimedOut = false
if (
step.hasTimeout() &&
!step.timedOutAt &&
step.canCancel() &&
step.startedAt! + step.getTimeoutInterval()! * 1e3 < Date.now()
) {
step.timedOutAt = Date.now()
await transaction.saveCheckpoint()
this.emit(DistributedTransactionEvent.TIMEOUT, { transaction })
await TransactionOrchestrator.setStepFailure(
transaction,
step,
new StepTimeoutError(),
0
)
hasTimedOut = true
}
return hasTimedOut
}
private async checkAllSteps(transaction: DistributedTransaction): Promise<{
next: TransactionStep[]
total: number
remaining: number
completed: number
} {
}> {
let hasSkipped = false
let hasIgnoredFailure = false
let hasFailed = false
@@ -158,12 +193,44 @@ export class TransactionOrchestrator extends EventEmitter {
const stepDef = flow.steps[step]
const curState = stepDef.getStates()
const hasTimedOut = await this.checkStepTimeout(transaction, stepDef)
if (hasTimedOut) {
continue
}
if (curState.status === TransactionStepStatus.WAITING) {
hasWaiting = true
if (stepDef.canRetry()) {
nextSteps.push(stepDef)
if (stepDef.hasAwaitingRetry()) {
if (stepDef.canRetryAwaiting()) {
stepDef.retryRescheduledAt = null
nextSteps.push(stepDef)
} else if (!stepDef.retryRescheduledAt) {
stepDef.hasScheduledRetry = true
stepDef.retryRescheduledAt = Date.now()
await transaction.scheduleRetry(
stepDef,
stepDef.definition.retryIntervalAwaiting!
)
}
}
continue
} else if (curState.status === TransactionStepStatus.TEMPORARY_FAILURE) {
if (!stepDef.canRetry()) {
if (stepDef.hasRetryInterval() && !stepDef.retryRescheduledAt) {
stepDef.hasScheduledRetry = true
stepDef.retryRescheduledAt = Date.now()
await transaction.scheduleRetry(
stepDef,
stepDef.definition.retryInterval!
)
}
continue
}
stepDef.retryRescheduledAt = null
}
if (stepDef.canInvoke(flow.state) || stepDef.canCompensate(flow.state)) {
@@ -185,6 +252,8 @@ export class TransactionOrchestrator extends EventEmitter {
}
}
flow.hasWaitingSteps = hasWaiting
const totalSteps = allSteps.length - 1
if (
flow.state === TransactionState.WAITING_TO_COMPENSATE &&
@@ -194,9 +263,9 @@ export class TransactionOrchestrator extends EventEmitter {
flow.state = TransactionState.COMPENSATING
this.flagStepsToRevert(flow)
this.emit("compensate", transaction)
this.emit(DistributedTransactionEvent.COMPENSATE_BEGIN, { transaction })
return this.checkAllSteps(transaction)
return await this.checkAllSteps(transaction)
} else if (completedSteps === totalSteps) {
if (hasSkipped) {
flow.hasSkippedSteps = true
@@ -211,11 +280,6 @@ export class TransactionOrchestrator extends EventEmitter {
? TransactionState.REVERTED
: TransactionState.DONE
}
this.emit("finish", transaction)
// TODO: check TransactionModel if it should delete the checkpoint when the transaction is done
void transaction.deleteCheckpoint()
}
return {
@@ -267,9 +331,25 @@ export class TransactionOrchestrator extends EventEmitter {
step.changeState(TransactionState.DONE)
}
if (step.definition.async) {
const flow = transaction.getFlow()
if (step.definition.async || flow.options?.strictCheckpoints) {
await transaction.saveCheckpoint()
}
const cleaningUp: Promise<unknown>[] = []
if (step.hasRetryScheduled()) {
cleaningUp.push(transaction.clearRetry(step))
}
if (step.hasTimeout()) {
cleaningUp.push(transaction.clearStepTimeout(step))
}
await promiseAll(cleaningUp)
const eventName = step.isCompensating()
? DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS
: DistributedTransactionEvent.STEP_SUCCESS
transaction.emit(eventName, { step, transaction })
}
private static async setStepFailure(
@@ -282,6 +362,8 @@ export class TransactionOrchestrator extends EventEmitter {
step.changeStatus(TransactionStepStatus.TEMPORARY_FAILURE)
const flow = transaction.getFlow()
const cleaningUp: Promise<unknown>[] = []
if (step.failures > maxRetries) {
step.changeState(TransactionState.FAILED)
step.changeStatus(TransactionStepStatus.PERMANENT_FAILURE)
@@ -295,7 +377,6 @@ export class TransactionOrchestrator extends EventEmitter {
)
if (!step.isCompensating()) {
const flow = transaction.getFlow()
if (step.definition.continueOnPermanentFailure) {
for (const childStep of step.next) {
const child = flow.steps[childStep]
@@ -305,107 +386,175 @@ export class TransactionOrchestrator extends EventEmitter {
flow.state = TransactionState.WAITING_TO_COMPENSATE
}
}
if (step.hasTimeout()) {
cleaningUp.push(transaction.clearStepTimeout(step))
}
}
if (step.definition.async) {
if (step.definition.async || flow.options?.strictCheckpoints) {
await transaction.saveCheckpoint()
}
if (step.hasRetryScheduled()) {
cleaningUp.push(transaction.clearRetry(step))
}
await promiseAll(cleaningUp)
const eventName = step.isCompensating()
? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE
: DistributedTransactionEvent.STEP_FAILURE
transaction.emit(eventName, { step, transaction })
}
private async checkTransactionTimeout(transaction, currentSteps) {
let hasTimedOut = false
const flow = transaction.getFlow()
if (
transaction.hasTimeout() &&
!flow.timedOutAt &&
flow.startedAt! + transaction.getTimeoutInterval()! * 1e3 < Date.now()
) {
flow.timedOutAt = Date.now()
this.emit(DistributedTransactionEvent.TIMEOUT, { transaction })
for (const step of currentSteps) {
await TransactionOrchestrator.setStepFailure(
transaction,
step,
new TransactionTimeoutError(),
0
)
}
await transaction.saveCheckpoint()
hasTimedOut = true
}
return hasTimedOut
}
private async executeNext(
transaction: DistributedTransaction
): Promise<void> {
if (transaction.hasFinished()) {
return
}
let continueExecution = true
const flow = transaction.getFlow()
const nextSteps = this.checkAllSteps(transaction)
const execution: Promise<void | unknown>[] = []
for (const step of nextSteps.next) {
const curState = step.getStates()
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
step.lastAttempt = Date.now()
step.attempts++
if (curState.state === TransactionState.NOT_STARTED) {
if (step.isCompensating()) {
step.changeState(TransactionState.COMPENSATING)
if (step.definition.noCompensation) {
step.changeState(TransactionState.REVERTED)
continue
}
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionState.INVOKING)
}
while (continueExecution) {
if (transaction.hasFinished()) {
return
}
step.changeStatus(TransactionStepStatus.WAITING)
const flow = transaction.getFlow()
const nextSteps = await this.checkAllSteps(transaction)
const execution: Promise<void | unknown>[] = []
const payload = new TransactionPayload(
{
model_id: flow.modelId,
reply_to_topic: TransactionOrchestrator.getKeyName(
"trans",
flow.modelId
),
idempotency_key: TransactionOrchestrator.getKeyName(
flow.transactionId,
step.definition.action!,
type
),
action: step.definition.action + "",
action_type: type,
attempt: step.attempts,
timestamp: Date.now(),
},
transaction.payload,
transaction.getContext()
const hasTimedOut = await this.checkTransactionTimeout(
transaction,
nextSteps.next
)
const setStepFailure = async (
error: Error | any,
{ endRetry }: { endRetry?: boolean } = {}
) => {
return TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
endRetry ? 0 : step.definition.maxRetries
)
if (hasTimedOut) {
continue
}
if (!step.definition.async) {
execution.push(
transaction
.handler(step.definition.action + "", type, payload, transaction)
.then(async (response: any) => {
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
})
.catch(async (error) => {
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
return
}
await setStepFailure(error)
})
if (nextSteps.remaining === 0) {
if (transaction.hasTimeout()) {
await transaction.clearTransactionTimeout()
}
if (flow.options?.retentionTime == undefined) {
await transaction.deleteCheckpoint()
} else {
await transaction.archiveCheckpoint()
}
this.emit(DistributedTransactionEvent.FINISH, { transaction })
}
let hasSyncSteps = false
for (const step of nextSteps.next) {
const curState = step.getStates()
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
step.lastAttempt = Date.now()
step.attempts++
if (curState.state === TransactionState.NOT_STARTED) {
if (!step.startedAt) {
step.startedAt = Date.now()
}
if (step.isCompensating()) {
step.changeState(TransactionState.COMPENSATING)
if (step.definition.noCompensation) {
step.changeState(TransactionState.REVERTED)
continue
}
} else if (flow.state === TransactionState.INVOKING) {
step.changeState(TransactionState.INVOKING)
}
}
step.changeStatus(TransactionStepStatus.WAITING)
const payload = new TransactionPayload(
{
model_id: flow.modelId,
idempotency_key: TransactionOrchestrator.getKeyName(
flow.modelId,
flow.transactionId,
step.definition.action!,
type
),
action: step.definition.action + "",
action_type: type,
attempt: step.attempts,
timestamp: Date.now(),
},
transaction.payload,
transaction.getContext()
)
} else {
execution.push(
transaction.saveCheckpoint().then(async () =>
if (step.hasTimeout() && !step.timedOutAt && step.attempts === 1) {
await transaction.scheduleStepTimeout(step, step.definition.timeout!)
}
transaction.emit(DistributedTransactionEvent.STEP_BEGIN, {
step,
transaction,
})
const setStepFailure = async (
error: Error | any,
{ endRetry }: { endRetry?: boolean } = {}
) => {
return TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
endRetry ? 0 : step.definition.maxRetries
)
}
const isAsync = step.isCompensating()
? step.definition.compensateAsync
: step.definition.async
if (!isAsync) {
hasSyncSteps = true
execution.push(
transaction
.handler(step.definition.action + "", type, payload, transaction)
.then(async (response: any) => {
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
})
.catch(async (error) => {
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
@@ -413,17 +562,44 @@ export class TransactionOrchestrator extends EventEmitter {
await setStepFailure(error, { endRetry: true })
return
}
await setStepFailure(error)
})
)
)
} else {
execution.push(
transaction.saveCheckpoint().then(async () =>
transaction
.handler(
step.definition.action + "",
type,
payload,
transaction
)
.catch(async (error) => {
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
return
}
await setStepFailure(error)
})
)
)
}
}
}
await promiseAll(execution)
if (hasSyncSteps && flow.options?.strictCheckpoints) {
await transaction.saveCheckpoint()
}
if (nextSteps.next.length > 0) {
await this.executeNext(transaction)
await promiseAll(execution)
if (nextSteps.next.length === 0) {
continueExecution = false
}
}
}
@@ -433,7 +609,8 @@ export class TransactionOrchestrator extends EventEmitter {
*/
public async resume(transaction: DistributedTransaction): Promise<void> {
if (transaction.modelId !== this.id) {
throw new Error(
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.`
)
}
@@ -446,9 +623,23 @@ export class TransactionOrchestrator extends EventEmitter {
if (flow.state === TransactionState.NOT_STARTED) {
flow.state = TransactionState.INVOKING
this.emit("begin", transaction)
flow.startedAt = Date.now()
if (this.options?.storeExecution) {
await transaction.saveCheckpoint(
flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL
)
}
if (transaction.hasTimeout()) {
await transaction.scheduleTransactionTimeout(
transaction.getTimeoutInterval()!
)
}
this.emit(DistributedTransactionEvent.BEGIN, { transaction })
} else {
this.emit("resume", transaction)
this.emit(DistributedTransactionEvent.RESUME, { transaction })
}
await this.executeNext(transaction)
@@ -462,14 +653,18 @@ export class TransactionOrchestrator extends EventEmitter {
transaction: DistributedTransaction
): Promise<void> {
if (transaction.modelId !== this.id) {
throw new Error(
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`TransactionModel "${transaction.modelId}" cannot be orchestrated by "${this.id}" model.`
)
}
const flow = transaction.getFlow()
if (flow.state === TransactionState.FAILED) {
throw new Error(`Cannot revert a perment failed transaction.`)
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Cannot revert a perment failed transaction.`
)
}
flow.state = TransactionState.WAITING_TO_COMPENSATE
@@ -477,33 +672,54 @@ export class TransactionOrchestrator extends EventEmitter {
await this.executeNext(transaction)
}
private async createTransactionFlow(
transactionId: string
): Promise<TransactionFlow> {
return {
private createTransactionFlow(transactionId: string): TransactionFlow {
const [steps, features] = TransactionOrchestrator.buildSteps(
this.definition
)
const hasAsyncSteps = features.hasAsyncSteps
const hasStepTimeouts = features.hasStepTimeouts
const hasRetriesTimeout = features.hasRetriesTimeout
this.options ??= {}
if (hasAsyncSteps || hasStepTimeouts || hasRetriesTimeout) {
this.options.storeExecution = true
}
const flow: TransactionFlow = {
modelId: this.id,
options: this.options,
transactionId: transactionId,
hasAsyncSteps,
hasFailedSteps: false,
hasSkippedSteps: false,
hasWaitingSteps: false,
timedOutAt: null,
state: TransactionState.NOT_STARTED,
definition: this.definition,
steps: TransactionOrchestrator.buildSteps(this.definition),
steps,
}
return flow
}
private static async loadTransactionById(
modelId: string,
transactionId: string
): Promise<TransactionCheckpoint | null> {
const transaction = await DistributedTransaction.loadTransaction(
modelId,
transactionId
)
if (transaction !== null) {
const flow = transaction.flow
transaction.flow.steps = TransactionOrchestrator.buildSteps(
const [steps] = TransactionOrchestrator.buildSteps(
flow.definition,
flow.steps
)
transaction.flow.steps = steps
return transaction
}
@@ -513,7 +729,14 @@ export class TransactionOrchestrator extends EventEmitter {
private static buildSteps(
flow: TransactionStepsDefinition,
existingSteps?: { [key: string]: TransactionStep }
): { [key: string]: TransactionStep } {
): [
{ [key: string]: TransactionStep },
{
hasAsyncSteps: boolean
hasStepTimeouts: boolean
hasRetriesTimeout: boolean
}
] {
const states: { [key: string]: TransactionStep } = {
[TransactionOrchestrator.ROOT_STEP]: {
id: TransactionOrchestrator.ROOT_STEP,
@@ -526,6 +749,12 @@ export class TransactionOrchestrator extends EventEmitter {
{ obj: flow, level: [TransactionOrchestrator.ROOT_STEP] },
]
const features = {
hasAsyncSteps: false,
hasStepTimeouts: false,
hasRetriesTimeout: false,
}
while (queue.length > 0) {
const { obj, level } = queue.shift()
@@ -547,11 +776,28 @@ export class TransactionOrchestrator extends EventEmitter {
const id = level.join(".")
const parent = level.slice(0, level.length - 1).join(".")
states[parent].next?.push(id)
if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) {
states[parent].next?.push(id)
}
const definitionCopy = { ...obj }
delete definitionCopy.next
if (definitionCopy.async) {
features.hasAsyncSteps = true
}
if (definitionCopy.timeout) {
features.hasStepTimeouts = true
}
if (
definitionCopy.retryInterval ||
definitionCopy.retryIntervalAwaiting
) {
features.hasRetriesTimeout = true
}
states[id] = Object.assign(
new TransactionStep(),
existingSteps?.[id] || {
@@ -577,7 +823,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
}
return states
return [states, features]
}
/** Create a new transaction
@@ -591,12 +837,12 @@ export class TransactionOrchestrator extends EventEmitter {
payload?: unknown
): Promise<DistributedTransaction> {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(transactionId)
await TransactionOrchestrator.loadTransactionById(this.id, transactionId)
let newTransaction = false
let modelFlow
let modelFlow: TransactionFlow
if (!existingTransaction) {
modelFlow = await this.createTransactionFlow(transactionId)
modelFlow = this.createTransactionFlow(transactionId)
newTransaction = true
} else {
modelFlow = existingTransaction.flow
@@ -609,13 +855,49 @@ export class TransactionOrchestrator extends EventEmitter {
existingTransaction?.errors,
existingTransaction?.context
)
if (newTransaction) {
await transaction.saveCheckpoint()
if (
newTransaction &&
this.options?.storeExecution &&
this.options?.strictCheckpoints
) {
await transaction.saveCheckpoint(
modelFlow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL
)
}
return transaction
}
/** Returns an existing transaction
* @param transactionId - unique identifier of the transaction
* @param handler - function to handle action of the transaction
*/
public async retrieveExistingTransaction(
transactionId: string,
handler: TransactionStepHandler
): Promise<DistributedTransaction> {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(this.id, transactionId)
if (!existingTransaction) {
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`Transaction ${transactionId} could not be found.`
)
}
const transaction = new DistributedTransaction(
existingTransaction.flow,
handler,
undefined,
existingTransaction?.errors,
existingTransaction?.context
)
return transaction
}
private static getStepByAction(
flow: TransactionFlow,
action: string
@@ -633,9 +915,8 @@ export class TransactionOrchestrator extends EventEmitter {
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<[DistributedTransaction, TransactionStep]> {
const [transactionId, action, actionType] = responseIdempotencyKey.split(
TransactionOrchestrator.SEPARATOR
)
const [modelId, transactionId, action, actionType] =
responseIdempotencyKey.split(TransactionOrchestrator.SEPARATOR)
if (!transaction && !handler) {
throw new Error(
@@ -645,10 +926,16 @@ export class TransactionOrchestrator extends EventEmitter {
if (!transaction) {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(transactionId)
await TransactionOrchestrator.loadTransactionById(
modelId,
transactionId
)
if (existingTransaction === null) {
throw new Error(`Transaction ${transactionId} could not be found.`)
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`Transaction ${transactionId} could not be found.`
)
}
transaction = new DistributedTransaction(
@@ -697,16 +984,20 @@ export class TransactionOrchestrator extends EventEmitter {
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
this.emit(DistributedTransactionEvent.RESUME, {
transaction: curTransaction,
})
await TransactionOrchestrator.setStepSuccess(
curTransaction,
step,
response
)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
throw new Error(
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Cannot set step success when status is ${step.getStates().status}`
)
}
@@ -736,16 +1027,21 @@ export class TransactionOrchestrator extends EventEmitter {
)
if (step.getStates().status === TransactionStepStatus.WAITING) {
this.emit(DistributedTransactionEvent.RESUME, {
transaction: curTransaction,
})
await TransactionOrchestrator.setStepFailure(
curTransaction,
step,
error,
0
)
this.emit("resume", curTransaction)
await this.executeNext(curTransaction)
} else {
throw new Error(
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Cannot set step failure when status is ${step.getStates().status}`
)
}

View File

@@ -1,3 +1,4 @@
import { MedusaError } from "@medusajs/utils"
import {
DistributedTransaction,
TransactionPayload,
@@ -5,8 +6,8 @@ import {
import {
TransactionHandlerType,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
TransactionStepStatus,
} from "./types"
export type TransactionStepHandler = (
@@ -30,6 +31,8 @@ export class TransactionStep {
* @member attempts - The number of attempts made to execute the step
* @member failures - The number of failures encountered while executing the step
* @member lastAttempt - The timestamp of the last attempt made to execute the step
* @member hasScheduledRetry - A flag indicating if a retry has been scheduled
* @member retryRescheduledAt - The timestamp of the last retry scheduled
* @member next - The ids of the next steps in the flow
* @member saveResponse - A flag indicating if the response of a step should be shared in the transaction context and available to subsequent steps - default is true
*/
@@ -48,6 +51,10 @@ export class TransactionStep {
attempts: number
failures: number
lastAttempt: number | null
retryRescheduledAt: number | null
hasScheduledRetry: boolean
timedOutAt: number | null
startedAt?: number
next: string[]
saveResponse: boolean
@@ -70,6 +77,10 @@ export class TransactionStep {
return this.stepFailed
}
public isInvoking() {
return !this.stepFailed
}
public changeState(toState: TransactionState) {
const allowed = {
[TransactionState.DORMANT]: [TransactionState.NOT_STARTED],
@@ -99,7 +110,8 @@ export class TransactionStep {
return
}
throw new Error(
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Updating State from "${curState.state}" to "${toState}" is not allowed.`
)
}
@@ -128,16 +140,49 @@ export class TransactionStep {
return
}
throw new Error(
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Updating Status from "${curState.status}" to "${toStatus}" is not allowed.`
)
}
hasRetryScheduled(): boolean {
return !!this.hasScheduledRetry
}
hasRetryInterval(): boolean {
return !!this.definition.retryInterval
}
hasTimeout(): boolean {
return !!this.definition.timeout
}
getTimeoutInterval(): number | undefined {
return this.definition.timeout
}
canRetry(): boolean {
return (
!this.definition.retryInterval ||
!!(
this.lastAttempt &&
this.definition.retryInterval &&
Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3
)
)
}
hasAwaitingRetry(): boolean {
return !!this.definition.retryIntervalAwaiting
}
canRetryAwaiting(): boolean {
return !!(
this.hasAwaitingRetry() &&
this.lastAttempt &&
this.definition.retryInterval &&
Date.now() - this.lastAttempt > this.definition.retryInterval * 1e3
Date.now() - this.lastAttempt >
this.definition.retryIntervalAwaiting! * 1e3
)
}
@@ -158,4 +203,14 @@ export class TransactionStep {
flowState === TransactionState.COMPENSATING
)
}
canCancel(): boolean {
return (
!this.isCompensating() &&
[
TransactionStepStatus.WAITING,
TransactionStepStatus.TEMPORARY_FAILURE,
].includes(this.getStates().status)
)
}
}

View File

@@ -1,3 +1,6 @@
import { DistributedTransaction } from "./distributed-transaction"
import { TransactionStep } from "./transaction-step"
export enum TransactionHandlerType {
INVOKE = "invoke",
COMPENSATE = "compensate",
@@ -9,8 +12,10 @@ export type TransactionStepsDefinition = {
noCompensation?: boolean
maxRetries?: number
retryInterval?: number
retryIntervalAwaiting?: number
timeout?: number
async?: boolean
compensateAsync?: boolean
noWait?: boolean
saveResponse?: boolean
next?: TransactionStepsDefinition | TransactionStepsDefinition[]
@@ -36,8 +41,67 @@ export enum TransactionState {
SKIPPED = "skipped",
}
export type TransactionModelOptions = {
timeout?: number
storeExecution?: boolean
retentionTime?: number
strictCheckpoints?: boolean
}
export type TransactionModel = {
id: string
flow: TransactionStepsDefinition
hash: string
options?: TransactionModelOptions
}
export enum DistributedTransactionEvent {
BEGIN = "begin",
RESUME = "resume",
COMPENSATE_BEGIN = "compensateBegin",
FINISH = "finish",
TIMEOUT = "timeout",
STEP_BEGIN = "stepBegin",
STEP_SUCCESS = "stepSuccess",
STEP_FAILURE = "stepFailure",
COMPENSATE_STEP_SUCCESS = "compensateStepSuccess",
COMPENSATE_STEP_FAILURE = "compensateStepFailure",
}
export type DistributedTransactionEvents = {
onBegin?: (args: { transaction: DistributedTransaction }) => void
onResume?: (args: { transaction: DistributedTransaction }) => void
onFinish?: (args: {
transaction: DistributedTransaction
result?: unknown
errors?: unknown[]
}) => void
onTimeout?: (args: { transaction: DistributedTransaction }) => void
onStepBegin?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
onStepSuccess?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
onStepFailure?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void
onCompensateStepSuccess?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
onCompensateStepFailure?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
}

View File

@@ -2,17 +2,22 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { createContainerLike, createMedusaContainer } from "@medusajs/utils"
import { asValue } from "awilix"
import { DistributedTransaction } from "../transaction"
import {
DistributedTransaction,
DistributedTransactionEvents,
} from "../transaction"
import { WorkflowDefinition, WorkflowManager } from "./workflow-manager"
export class GlobalWorkflow extends WorkflowManager {
protected static workflows: Map<string, WorkflowDefinition> = new Map()
protected container: MedusaContainer
protected context: Context
protected subscribe: DistributedTransactionEvents
constructor(
modulesLoaded?: LoadedModule[] | MedusaContainer,
context?: Context
context?: Context,
subscribe?: DistributedTransactionEvents
) {
super()
@@ -35,6 +40,7 @@ export class GlobalWorkflow extends WorkflowManager {
this.container = container
this.context = context ?? {}
this.subscribe = subscribe ?? {}
}
async run(workflowId: string, uniqueTransactionId: string, input?: unknown) {
@@ -52,6 +58,18 @@ export class GlobalWorkflow extends WorkflowManager {
input
)
if (this.subscribe.onStepBegin) {
transaction.once("stepBegin", this.subscribe.onStepBegin)
}
if (this.subscribe.onStepSuccess) {
transaction.once("stepSuccess", this.subscribe.onStepSuccess)
}
if (this.subscribe.onStepFailure) {
transaction.once("stepFailure", this.subscribe.onStepFailure)
}
await orchestrator.resume(transaction)
return transaction
@@ -67,6 +85,21 @@ export class GlobalWorkflow extends WorkflowManager {
}
const workflow = WorkflowManager.workflows.get(workflowId)!
const orchestrator = workflow.orchestrator
orchestrator.once("resume", (transaction) => {
if (this.subscribe.onStepBegin) {
transaction.once("stepBegin", this.subscribe.onStepBegin)
}
if (this.subscribe.onStepSuccess) {
transaction.once("stepSuccess", this.subscribe.onStepSuccess)
}
if (this.subscribe.onStepFailure) {
transaction.once("stepFailure", this.subscribe.onStepFailure)
}
})
return await workflow.orchestrator.registerStepSuccess(
idempotencyKey,
workflow.handler(this.container, this.context),
@@ -85,6 +118,21 @@ export class GlobalWorkflow extends WorkflowManager {
}
const workflow = WorkflowManager.workflows.get(workflowId)!
const orchestrator = workflow.orchestrator
orchestrator.once("resume", (transaction) => {
if (this.subscribe.onStepBegin) {
transaction.once("stepBegin", this.subscribe.onStepBegin)
}
if (this.subscribe.onStepSuccess) {
transaction.once("stepSuccess", this.subscribe.onStepSuccess)
}
if (this.subscribe.onStepFailure) {
transaction.once("stepFailure", this.subscribe.onStepFailure)
}
})
return await workflow.orchestrator.registerStepFailure(
idempotencyKey,
error,

View File

@@ -3,6 +3,8 @@ import { createContainerLike, createMedusaContainer } from "@medusajs/utils"
import { asValue } from "awilix"
import {
DistributedTransaction,
DistributedTransactionEvent,
DistributedTransactionEvents,
TransactionOrchestrator,
TransactionStepsDefinition,
} from "../transaction"
@@ -79,7 +81,174 @@ export class LocalWorkflow {
return this.workflow.flow_
}
async run(uniqueTransactionId: string, input?: unknown, context?: Context) {
private registerEventCallbacks({
orchestrator,
transaction,
subscribe,
idempotencyKey,
}: {
orchestrator: TransactionOrchestrator
transaction?: DistributedTransaction
subscribe?: DistributedTransactionEvents
idempotencyKey?: string
}) {
const modelId = orchestrator.id
let transactionId
if (transaction) {
transactionId = transaction!.transactionId
} else if (idempotencyKey) {
const [, trxId] = idempotencyKey!.split(":")
transactionId = trxId
}
const eventWrapperMap = new Map()
for (const [key, handler] of Object.entries(subscribe ?? {})) {
eventWrapperMap.set(key, (args) => {
const { transaction } = args
if (
transaction.transactionId !== transactionId ||
transaction.modelId !== modelId
) {
return
}
handler(args)
})
}
if (subscribe?.onBegin) {
orchestrator.on(
DistributedTransactionEvent.BEGIN,
eventWrapperMap.get("onBegin")
)
}
if (subscribe?.onResume) {
orchestrator.on(
DistributedTransactionEvent.RESUME,
eventWrapperMap.get("onResume")
)
}
if (subscribe?.onCompensateBegin) {
orchestrator.on(
DistributedTransactionEvent.COMPENSATE_BEGIN,
eventWrapperMap.get("onCompensateBegin")
)
}
if (subscribe?.onTimeout) {
orchestrator.on(
DistributedTransactionEvent.TIMEOUT,
eventWrapperMap.get("onTimeout")
)
}
if (subscribe?.onFinish) {
orchestrator.on(
DistributedTransactionEvent.FINISH,
eventWrapperMap.get("onFinish")
)
}
const resumeWrapper = ({ transaction }) => {
if (
transaction.modelId !== modelId ||
transaction.transactionId !== transactionId
) {
return
}
if (subscribe?.onStepBegin) {
transaction.on(
DistributedTransactionEvent.STEP_BEGIN,
eventWrapperMap.get("onStepBegin")
)
}
if (subscribe?.onStepSuccess) {
transaction.on(
DistributedTransactionEvent.STEP_SUCCESS,
eventWrapperMap.get("onStepSuccess")
)
}
if (subscribe?.onStepFailure) {
transaction.on(
DistributedTransactionEvent.STEP_FAILURE,
eventWrapperMap.get("onStepFailure")
)
}
if (subscribe?.onCompensateStepSuccess) {
transaction.on(
DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS,
eventWrapperMap.get("onCompensateStepSuccess")
)
}
if (subscribe?.onCompensateStepFailure) {
transaction.on(
DistributedTransactionEvent.COMPENSATE_STEP_FAILURE,
eventWrapperMap.get("onCompensateStepFailure")
)
}
}
if (transaction) {
resumeWrapper({ transaction })
} else {
orchestrator.once("resume", resumeWrapper)
}
const cleanUp = () => {
subscribe?.onFinish &&
orchestrator.removeListener(
DistributedTransactionEvent.FINISH,
eventWrapperMap.get("onFinish")
)
subscribe?.onResume &&
orchestrator.removeListener(
DistributedTransactionEvent.RESUME,
eventWrapperMap.get("onResume")
)
subscribe?.onBegin &&
orchestrator.removeListener(
DistributedTransactionEvent.BEGIN,
eventWrapperMap.get("onBegin")
)
subscribe?.onCompensateBegin &&
orchestrator.removeListener(
DistributedTransactionEvent.COMPENSATE_BEGIN,
eventWrapperMap.get("onCompensateBegin")
)
subscribe?.onTimeout &&
orchestrator.removeListener(
DistributedTransactionEvent.TIMEOUT,
eventWrapperMap.get("onTimeout")
)
orchestrator.removeListener(
DistributedTransactionEvent.RESUME,
resumeWrapper
)
eventWrapperMap.clear()
}
return {
cleanUpEventListeners: cleanUp,
}
}
async run(
uniqueTransactionId: string,
input?: unknown,
context?: Context,
subscribe?: DistributedTransactionEvents
) {
if (this.flow.hasChanges) {
this.commit()
}
@@ -92,36 +261,104 @@ export class LocalWorkflow {
input
)
const { cleanUpEventListeners } = this.registerEventCallbacks({
orchestrator,
transaction,
subscribe,
})
await orchestrator.resume(transaction)
cleanUpEventListeners()
return transaction
}
async getRunningTransaction(uniqueTransactionId: string, context?: Context) {
const { handler, orchestrator } = this.workflow
const transaction = await orchestrator.retrieveExistingTransaction(
uniqueTransactionId,
handler(this.container, context)
)
return transaction
}
async cancel(
uniqueTransactionId: string,
context?: Context,
subscribe?: DistributedTransactionEvents
) {
const { orchestrator } = this.workflow
const transaction = await this.getRunningTransaction(
uniqueTransactionId,
context
)
const { cleanUpEventListeners } = this.registerEventCallbacks({
orchestrator,
transaction,
subscribe,
})
await orchestrator.cancelTransaction(transaction)
cleanUpEventListeners()
return transaction
}
async registerStepSuccess(
idempotencyKey: string,
response?: unknown,
context?: Context
context?: Context,
subscribe?: DistributedTransactionEvents
): Promise<DistributedTransaction> {
const { handler, orchestrator } = this.workflow
return await orchestrator.registerStepSuccess(
const { cleanUpEventListeners } = this.registerEventCallbacks({
orchestrator,
idempotencyKey,
subscribe,
})
const transaction = await orchestrator.registerStepSuccess(
idempotencyKey,
handler(this.container, context),
undefined,
response
)
cleanUpEventListeners()
return transaction
}
async registerStepFailure(
idempotencyKey: string,
error?: Error | any,
context?: Context
context?: Context,
subscribe?: DistributedTransactionEvents
): Promise<DistributedTransaction> {
const { handler, orchestrator } = this.workflow
return await orchestrator.registerStepFailure(
const { cleanUpEventListeners } = this.registerEventCallbacks({
orchestrator,
idempotencyKey,
subscribe,
})
const transaction = await orchestrator.registerStepFailure(
idempotencyKey,
error,
handler(this.container, context)
)
cleanUpEventListeners()
return transaction
}
addAction(

View File

@@ -23,6 +23,28 @@ jest.mock("@medusajs/orchestration", () => {
}),
}
}),
registerStepSuccess: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
registerStepFailure: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
}
}),
}

View File

@@ -1,5 +1,6 @@
import {
DistributedTransaction,
DistributedTransactionEvents,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
@@ -8,15 +9,36 @@ import {
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { MedusaModule } from "@medusajs/modules-sdk"
import { OrchestrationUtils } from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
import { OrchestrationUtils } from "@medusajs/utils"
import { MedusaWorkflow } from "../medusa-workflow"
import { resolveValue } from "../utils/composer"
export type FlowRunOptions<TData = unknown> = {
input?: TData
context?: Context
resultFrom?: string | string[]
resultFrom?: string | string[] | Symbol
throwOnError?: boolean
events?: DistributedTransactionEvents
}
export type FlowRegisterStepSuccessOptions<TData = unknown> = {
idempotencyKey: string
response?: TData
context?: Context
resultFrom?: string | string[] | Symbol
throwOnError?: boolean
events?: DistributedTransactionEvents
}
export type FlowRegisterStepFailureOptions<TData = unknown> = {
idempotencyKey: string
response?: TData
context?: Context
resultFrom?: string | string[] | Symbol
throwOnError?: boolean
events?: DistributedTransactionEvents
}
export type WorkflowResult<TResult = unknown> = {
@@ -25,24 +47,59 @@ export type WorkflowResult<TResult = unknown> = {
result: TResult
}
export type ExportedWorkflow<
TData = unknown,
TResult = unknown,
TDataOverride = undefined,
TResultOverride = undefined
> = {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
registerStepSuccess: (
args?: FlowRegisterStepSuccessOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
registerStepFailure: (
args?: FlowRegisterStepFailureOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
}
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string,
dataPreparation?: (data: TData) => Promise<unknown>
defaultResult?: string | Symbol,
dataPreparation?: (data: TData) => Promise<unknown>,
options?: {
wrappedInput?: boolean
}
) => {
return function <TDataOverride = undefined, TResultOverride = undefined>(
function exportedWorkflow<
TDataOverride = undefined,
TResultOverride = undefined
>(
container?: LoadedModule[] | MedusaContainer
): Omit<LocalWorkflow, "run"> & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
} {
): Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride> {
if (!container) {
container = MedusaModule.getLoadedModules().map(
(mod) => Object.values(mod)[0]
@@ -52,8 +109,64 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
const flow = new LocalWorkflow(workflowId, container)
const originalRun = flow.run.bind(flow)
const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow)
const originalRegisterStepFailure = flow.registerStepFailure.bind(flow)
const originalExecution = async (
method,
{ throwOnError, resultFrom },
...args
) => {
const transaction = await method.apply(method, args)
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState()) && throwOnError) {
const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)
throw new Error(errorMessage)
}
let result: any = undefined
const resFrom =
resultFrom?.__type === OrchestrationUtils.SymbolWorkflowStep
? resultFrom.__step__
: resultFrom
if (resFrom) {
if (Array.isArray(resFrom)) {
result = resFrom.map((from) => {
const res = transaction.getContext().invoke?.[from]
return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? res.output
: res
})
} else {
const res = transaction.getContext().invoke?.[resFrom]
result =
res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? res.output
: res
}
const ret = result || resFrom
result = options?.wrappedInput
? await resolveValue(ret, transaction.getContext())
: ret
}
return {
errors,
transaction,
result,
}
}
const newRun = async (
{ input, context, throwOnError, resultFrom }: FlowRunOptions = {
{ input, context, throwOnError, resultFrom, events }: FlowRunOptions = {
throwOnError: true,
resultFrom: defaultResult,
}
@@ -77,59 +190,77 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
}
}
const transaction = await originalRun(
return await originalExecution(
originalRun,
{ throwOnError, resultFrom },
context?.transactionId ?? ulid(),
input,
context
context,
events
)
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState()) && throwOnError) {
const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)
throw new Error(errorMessage)
}
let result: any = undefined
if (resultFrom) {
if (Array.isArray(resultFrom)) {
result = resultFrom.map((from) => {
const res = transaction.getContext().invoke?.[from]
return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? res.output
: res
})
} else {
const res = transaction.getContext().invoke?.[resultFrom]
result =
res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? res.output
: res
}
}
return {
errors,
transaction,
result,
}
}
flow.run = newRun as any
return flow as unknown as LocalWorkflow & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
const newRegisterStepSuccess = async (
{
response,
idempotencyKey,
context,
throwOnError,
resultFrom,
events,
}: FlowRegisterStepSuccessOptions = {
idempotencyKey: "",
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await originalExecution(
originalRegisterStepSuccess,
{ throwOnError, resultFrom },
idempotencyKey,
response,
context,
events
)
}
flow.registerStepSuccess = newRegisterStepSuccess as any
const newRegisterStepFailure = async (
{
response,
idempotencyKey,
context,
throwOnError,
resultFrom,
events,
}: FlowRegisterStepFailureOptions = {
idempotencyKey: "",
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await originalExecution(
originalRegisterStepFailure,
{ throwOnError, resultFrom },
idempotencyKey,
response,
context,
events
)
}
flow.registerStepFailure = newRegisterStepFailure as any
return flow as unknown as LocalWorkflow &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
}
MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)
return exportedWorkflow
}

View File

@@ -1,3 +1,4 @@
export * from "./helper"
export * from "./medusa-workflow"
export * from "./utils/composer"
export * as Composer from "./utils/composer"

View File

@@ -0,0 +1,27 @@
import { LocalWorkflow } from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { ExportedWorkflow } from "./helper"
export class MedusaWorkflow {
static workflows: Record<
string,
(
container?: LoadedModule[] | MedusaContainer
) => Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure"
> &
ExportedWorkflow
> = {}
static registerWorkflow(workflowId, exportedWorkflow) {
MedusaWorkflow.workflows[workflowId] = exportedWorkflow
}
static getWorkflow(workflowId) {
return MedusaWorkflow.workflows[workflowId]
}
}
global.MedusaWorkflow ??= MedusaWorkflow
exports.MedusaWorkflow = global.MedusaWorkflow

View File

@@ -5,8 +5,7 @@ import {
} from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { OrchestrationUtils } from "@medusajs/utils"
import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper"
import { resolveValue } from "./helpers"
import { ExportedWorkflow, exportWorkflow } from "../../helper"
import { proxify } from "./helpers/proxy"
import {
CreateWorkflowComposerContext,
@@ -66,17 +65,11 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
type ReturnWorkflow<TData, TResult, THooks extends Record<string, Function>> = {
<TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
): Omit<LocalWorkflow, "run"> & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
}
): Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
} & THooks & {
getName: () => string
}
@@ -198,43 +191,19 @@ export function createWorkflow<
WorkflowManager.update(name, context.flow, handlers)
const workflow = exportWorkflow<TData, TResult>(name)
const workflow = exportWorkflow<TData, TResult>(
name,
returnedStep,
undefined,
{
wrappedInput: true,
}
)
const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
) => {
const workflow_ = workflow<TDataOverride, TResultOverride>(container)
const originalRun = workflow_.run
workflow_.run = (async (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
> => {
args ??= {}
args.resultFrom ??=
returnedStep?.__type === OrchestrationUtils.SymbolWorkflowStep
? returnedStep.__step__
: undefined
// Forwards the input to the ref object on composer.apply
const workflowResult = (await originalRun(
args
)) as unknown as WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
workflowResult.result = await resolveValue(
workflowResult.result || returnedStep,
workflowResult.transaction.getContext()
)
return workflowResult
}) as any
return workflow_
}

View File

@@ -41,7 +41,7 @@ export async function resolveValue(input, transactionContext) {
if (Array.isArray(inputTOUnwrap)) {
return await promiseAll(
inputTOUnwrap.map((i) => unwrapInput(i, transactionContext))
inputTOUnwrap.map((i) => resolveValue(i, transactionContext))
)
}