chore(workflow-engine-*): cleanup and improvements (#13789)

**What**
Cleanup recent work on workflows
This commit is contained in:
Adrien de Peretti
2025-10-23 12:50:24 +02:00
committed by GitHub
parent 356dcc94ce
commit d51ae2768b
10 changed files with 344 additions and 310 deletions

View File

@@ -0,0 +1,8 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---
chore(workflow-engine-\*): cleanup and improvements

View File

@@ -55,6 +55,9 @@ const mergeStep = (
} }
} }
const getErrorSignature = (err: TransactionStepError) =>
`${err.action}:${err.handlerType}:${err.error?.message}`
/** /**
* @typedef TransactionMetadata * @typedef TransactionMetadata
* @property model_id - The id of the model_id that created the transaction (modelId). * @property model_id - The id of the model_id that created the transaction (modelId).
@@ -105,7 +108,52 @@ const stateFlowOrder = [
TransactionState.FAILED, TransactionState.FAILED,
] ]
const stateFlowOrderMap = new Map<TransactionState, number>(
stateFlowOrder.map((state, index) => [state, index])
)
const finishedStatesSet = new Set([
TransactionState.DONE,
TransactionState.REVERTED,
TransactionState.FAILED,
])
export class TransactionCheckpoint { export class TransactionCheckpoint {
static readonly #ALLOWED_STATE_TRANSITIONS = {
[TransactionStepState.DORMANT]: [TransactionStepState.NOT_STARTED],
[TransactionStepState.NOT_STARTED]: [
TransactionStepState.INVOKING,
TransactionStepState.COMPENSATING,
TransactionStepState.FAILED,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
],
[TransactionStepState.INVOKING]: [
TransactionStepState.FAILED,
TransactionStepState.DONE,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
],
[TransactionStepState.COMPENSATING]: [
TransactionStepState.REVERTED,
TransactionStepState.FAILED,
],
[TransactionStepState.DONE]: [TransactionStepState.COMPENSATING],
} as const
static readonly #ALLOWED_STATUS_TRANSITIONS = {
[TransactionStepStatus.WAITING]: [
TransactionStepStatus.OK,
TransactionStepStatus.TEMPORARY_FAILURE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.TEMPORARY_FAILURE]: [
TransactionStepStatus.IDLE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.PERMANENT_FAILURE]: [TransactionStepStatus.IDLE],
} as const
constructor( constructor(
public flow: TransactionFlow, public flow: TransactionFlow,
public context: TransactionContext, public context: TransactionContext,
@@ -165,17 +213,15 @@ export class TransactionCheckpoint {
currentTransactionData.flow[prop] ?? 0 currentTransactionData.flow[prop] ?? 0
) )
} else if (prop === "state") { } else if (prop === "state") {
const curState = stateFlowOrder.findIndex( const currentStateIndex =
(state) => state === currentTransactionData.flow.state stateFlowOrderMap.get(currentTransactionData.flow.state) ?? -1
) const storedStateIndex =
const storedState = stateFlowOrder.findIndex( stateFlowOrderMap.get(storedData.flow.state) ?? -1
(state) => state === storedData.flow.state
)
if (storedState > curState) { if (storedStateIndex > currentStateIndex) {
currentTransactionData.flow.state = storedData.flow.state currentTransactionData.flow.state = storedData.flow.state
} else if ( } else if (
curState < storedState && currentStateIndex < storedStateIndex &&
currentTransactionData.flow.state !== currentTransactionData.flow.state !==
TransactionState.WAITING_TO_COMPENSATE TransactionState.WAITING_TO_COMPENSATE
) { ) {
@@ -265,43 +311,6 @@ export class TransactionCheckpoint {
status: TransactionStepStatus status: TransactionStepStatus
} }
): boolean { ): boolean {
// Define allowed state transitions
const allowedStateTransitions = {
[TransactionStepState.DORMANT]: [TransactionStepState.NOT_STARTED],
[TransactionStepState.NOT_STARTED]: [
TransactionStepState.INVOKING,
TransactionStepState.COMPENSATING,
TransactionStepState.FAILED,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
],
[TransactionStepState.INVOKING]: [
TransactionStepState.FAILED,
TransactionStepState.DONE,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
],
[TransactionStepState.COMPENSATING]: [
TransactionStepState.REVERTED,
TransactionStepState.FAILED,
],
[TransactionStepState.DONE]: [TransactionStepState.COMPENSATING],
}
// Define allowed status transitions
const allowedStatusTransitions = {
[TransactionStepStatus.WAITING]: [
TransactionStepStatus.OK,
TransactionStepStatus.TEMPORARY_FAILURE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.TEMPORARY_FAILURE]: [
TransactionStepStatus.IDLE,
TransactionStepStatus.PERMANENT_FAILURE,
],
[TransactionStepStatus.PERMANENT_FAILURE]: [TransactionStepStatus.IDLE],
}
if ( if (
currentStepState.state === storedStepState.state && currentStepState.state === storedStepState.state &&
currentStepState.status === storedStepState.status currentStepState.status === storedStepState.status
@@ -311,7 +320,9 @@ export class TransactionCheckpoint {
// Check if state transition from stored to current is allowed // Check if state transition from stored to current is allowed
const allowedStatesFromCurrent = const allowedStatesFromCurrent =
allowedStateTransitions[currentStepState.state] || [] TransactionCheckpoint.#ALLOWED_STATE_TRANSITIONS[
currentStepState.state
] || []
const isStateTransitionValid = allowedStatesFromCurrent.includes( const isStateTransitionValid = allowedStatesFromCurrent.includes(
storedStepState.state storedStepState.state
) )
@@ -328,7 +339,9 @@ export class TransactionCheckpoint {
// Check if status transition from stored to current is allowed // Check if status transition from stored to current is allowed
const allowedStatusesFromCurrent = const allowedStatusesFromCurrent =
allowedStatusTransitions[currentStepState.status] || [] TransactionCheckpoint.#ALLOWED_STATUS_TRANSITIONS[
currentStepState.status
] || []
return allowedStatusesFromCurrent.includes(storedStepState.status) return allowedStatusesFromCurrent.includes(storedStepState.status)
} }
@@ -338,15 +351,13 @@ export class TransactionCheckpoint {
incomingErrors: TransactionStepError[] incomingErrors: TransactionStepError[]
): void { ): void {
const existingErrorSignatures = new Set( const existingErrorSignatures = new Set(
currentErrors.map( currentErrors.map(getErrorSignature)
(err) => `${err.action}:${err.handlerType}:${err.error?.message}`
)
) )
for (const error of incomingErrors) { for (const error of incomingErrors) {
const signature = `${error.action}:${error.handlerType}:${error.error?.message}` if (!existingErrorSignatures.has(getErrorSignature(error))) {
if (!existingErrorSignatures.has(signature)) {
currentErrors.push(error) currentErrors.push(error)
existingErrorSignatures.add(getErrorSignature(error))
} }
} }
} }
@@ -451,11 +462,7 @@ class DistributedTransaction extends EventEmitter {
} }
public hasFinished(): boolean { public hasFinished(): boolean {
return [ return finishedStatesSet.has(this.getState())
TransactionState.DONE,
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(this.getState())
} }
public getState(): TransactionState { public getState(): TransactionState {
@@ -520,7 +527,7 @@ class DistributedTransaction extends EventEmitter {
this.transactionId this.transactionId
) )
let checkpoint let checkpoint: TransactionCheckpoint
let retries = 0 let retries = 0
let backoffMs = 50 let backoffMs = 50
@@ -553,7 +560,7 @@ class DistributedTransaction extends EventEmitter {
await setTimeoutPromise(backoffMs + jitter) await setTimeoutPromise(backoffMs + jitter)
backoffMs = Math.min(backoffMs * 2, 1000) backoffMs = Math.min(backoffMs * 2, 500)
const lastCheckpoint = await DistributedTransaction.loadTransaction( const lastCheckpoint = await DistributedTransaction.loadTransaction(
this.modelId, this.modelId,
@@ -697,65 +704,45 @@ class DistributedTransaction extends EventEmitter {
* @returns * @returns
*/ */
#serializeCheckpointData() { #serializeCheckpointData() {
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
)
const isSerializable = (obj) => {
try {
JSON.parse(JSON.stringify(obj))
return true
} catch {
return false
}
}
let rawData
try { try {
rawData = JSON.parse(JSON.stringify(data)) JSON.stringify(this.context)
} catch (e) { } catch {
if (!isSerializable(this.context)) { throw new NonSerializableCheckPointError(
// This is a safe guard, we should never reach this point "Unable to serialize context object. Please make sure the workflow input and steps response are serializable."
// If we do, it means that the context is not serializable
// and we should throw an error
throw new NonSerializableCheckPointError(
"Unable to serialize context object. Please make sure the workflow input and steps response are serializable."
)
}
if (!isSerializable(this.errors)) {
const nonSerializableErrors: TransactionStepError[] = []
for (const error of this.errors) {
if (!isSerializable(error.error)) {
error.error = {
name: error.error.name,
message: error.error.message,
stack: error.error.stack,
}
nonSerializableErrors.push({
...error,
error: e,
})
}
}
if (nonSerializableErrors.length) {
this.errors.push(...nonSerializableErrors)
}
}
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
) )
rawData = JSON.parse(JSON.stringify(data))
} }
return rawData let errorsToUse = this.getErrors()
try {
JSON.stringify(errorsToUse)
} catch {
// Sanitize non-serializable errors
const sanitizedErrors: TransactionStepError[] = []
for (const error of this.errors) {
try {
JSON.stringify(error)
sanitizedErrors.push(error)
} catch {
sanitizedErrors.push({
action: error.action,
handlerType: error.handlerType,
error: {
name: error.error?.name || "Error",
message: error.error?.message || String(error.error),
stack: error.error?.stack,
},
})
}
}
errorsToUse = sanitizedErrors
this.errors = sanitizedErrors
}
return new TransactionCheckpoint(
JSON.parse(JSON.stringify(this.getFlow())),
this.getContext(),
[...errorsToUse]
)
} }
} }

View File

@@ -40,6 +40,33 @@ import {
TransactionTimeoutError, TransactionTimeoutError,
} from "./errors" } from "./errors"
const canMoveForwardStates = new Set([
TransactionStepState.DONE,
TransactionStepState.FAILED,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
])
const canMoveBackwardStates = new Set([
TransactionStepState.DONE,
TransactionStepState.REVERTED,
TransactionStepState.FAILED,
TransactionStepState.DORMANT,
TransactionStepState.SKIPPED,
])
const flagStepsToRevertStates = new Set([
TransactionStepState.DONE,
TransactionStepState.TIMEOUT,
])
const setStepTimeoutSkipStates = new Set([
TransactionStepState.TIMEOUT,
TransactionStepState.DONE,
TransactionStepState.REVERTED,
])
/** /**
* @class TransactionOrchestrator is responsible for managing and executing distributed transactions. * @class TransactionOrchestrator is responsible for managing and executing distributed transactions.
* It is based on a single transaction definition, which is used to execute all the transaction steps * It is based on a single transaction definition, which is used to execute all the transaction steps
@@ -184,14 +211,6 @@ export class TransactionOrchestrator extends EventEmitter {
} }
private canMoveForward(flow: TransactionFlow, previousStep: TransactionStep) { private canMoveForward(flow: TransactionFlow, previousStep: TransactionStep) {
const states = [
TransactionStepState.DONE,
TransactionStepState.FAILED,
TransactionStepState.TIMEOUT,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
]
const siblings = TransactionOrchestrator.getPreviousStep( const siblings = TransactionOrchestrator.getPreviousStep(
flow, flow,
previousStep previousStep
@@ -199,23 +218,15 @@ export class TransactionOrchestrator extends EventEmitter {
return ( return (
!!previousStep.definition.noWait || !!previousStep.definition.noWait ||
siblings.every((sib) => states.includes(sib.invoke.state)) siblings.every((sib) => canMoveForwardStates.has(sib.invoke.state))
) )
} }
private canMoveBackward(flow: TransactionFlow, step: TransactionStep) { private canMoveBackward(flow: TransactionFlow, step: TransactionStep) {
const states = [
TransactionStepState.DONE,
TransactionStepState.REVERTED,
TransactionStepState.FAILED,
TransactionStepState.DORMANT,
TransactionStepState.SKIPPED,
]
const siblings = step.next.map((sib) => flow.steps[sib]) const siblings = step.next.map((sib) => flow.steps[sib])
return ( return (
siblings.length === 0 || siblings.length === 0 ||
siblings.every((sib) => states.includes(sib.compensate.state)) siblings.every((sib) => canMoveBackwardStates.has(sib.compensate.state))
) )
} }
@@ -521,9 +532,7 @@ export class TransactionOrchestrator extends EventEmitter {
} }
if ( if (
[TransactionStepState.DONE, TransactionStepState.TIMEOUT].includes( flagStepsToRevertStates.has(curState.state) ||
curState.state
) ||
curState.status === TransactionStepStatus.PERMANENT_FAILURE curState.status === TransactionStepStatus.PERMANENT_FAILURE
) { ) {
stepDef.beginCompensation() stepDef.beginCompensation()
@@ -707,13 +716,7 @@ export class TransactionOrchestrator extends EventEmitter {
step: TransactionStep, step: TransactionStep,
error: TransactionStepTimeoutError | TransactionTimeoutError error: TransactionStepTimeoutError | TransactionTimeoutError
): Promise<void> { ): Promise<void> {
if ( if (setStepTimeoutSkipStates.has(step.getStates().state)) {
[
TransactionStepState.TIMEOUT,
TransactionStepState.DONE,
TransactionStepState.REVERTED,
].includes(step.getStates().state)
) {
return return
} }
@@ -773,13 +776,10 @@ export class TransactionOrchestrator extends EventEmitter {
error = serializeError(error) error = serializeError(error)
} else { } else {
try { try {
if (error?.message) { const serialized = JSON.stringify(error)
error = JSON.parse(JSON.stringify(error)) error = error?.message
} else { ? JSON.parse(serialized)
error = { : { message: serialized }
message: JSON.stringify(error),
}
}
} catch (e) { } catch (e) {
error = { error = {
message: "Unknown non-serializable error", message: "Unknown non-serializable error",
@@ -953,6 +953,18 @@ export class TransactionOrchestrator extends EventEmitter {
return shouldContinueExecution return shouldContinueExecution
}) })
let asyncStepCount = 0
for (const s of nextSteps.next) {
const stepIsAsync = s.isCompensating()
? s.definition.compensateAsync
: s.definition.async
if (stepIsAsync) asyncStepCount++
}
const hasMultipleAsyncSteps = asyncStepCount > 1
const hasAsyncSteps = !!asyncStepCount
// If there is any async step, we don't need to save the checkpoint here as it will be saved
// later down there
await transaction.saveCheckpoint().catch((error) => { await transaction.saveCheckpoint().catch((error) => {
if (TransactionOrchestrator.isExpectedError(error)) { if (TransactionOrchestrator.isExpectedError(error)) {
continueExecution = false continueExecution = false
@@ -962,11 +974,15 @@ export class TransactionOrchestrator extends EventEmitter {
throw error throw error
}) })
if (!continueExecution) {
break
}
const execution: Promise<void | unknown>[] = [] const execution: Promise<void | unknown>[] = []
const executionAsync: (() => Promise<void | unknown>)[] = [] const executionAsync: (() => Promise<void | unknown>)[] = []
let i = 0 let i = 0
let hasAsyncSteps = false
for (const step of nextSteps.next) { for (const step of nextSteps.next) {
const stepIndex = i++ const stepIndex = i++
if (!stepsShouldContinueExecution[stepIndex]) { if (!stepsShouldContinueExecution[stepIndex]) {
@@ -988,21 +1004,9 @@ export class TransactionOrchestrator extends EventEmitter {
// Compute current transaction state // Compute current transaction state
await this.computeCurrentTransactionState(transaction) await this.computeCurrentTransactionState(transaction)
if (!continueExecution) {
break
}
const promise = this.createStepExecutionPromise(transaction, step) const promise = this.createStepExecutionPromise(transaction, step)
const hasMultipleAsyncSteps =
nextSteps.next.filter((step) => {
const isAsync = step.isCompensating()
? step.definition.compensateAsync
: step.definition.async
return isAsync
}).length > 1
const hasVersionControl = const hasVersionControl =
hasMultipleAsyncSteps || step.hasAwaitingRetry() hasMultipleAsyncSteps || step.hasAwaitingRetry()
@@ -1017,7 +1021,6 @@ export class TransactionOrchestrator extends EventEmitter {
) )
} else { } else {
// Execute async step in background as part of the next event loop cycle and continue the execution of the transaction // Execute async step in background as part of the next event loop cycle and continue the execution of the transaction
hasAsyncSteps = true
executionAsync.push(() => executionAsync.push(() =>
this.executeAsyncStep(promise, transaction, step, nextSteps) this.executeAsyncStep(promise, transaction, step, nextSteps)
) )
@@ -1105,12 +1108,9 @@ export class TransactionOrchestrator extends EventEmitter {
private createStepPayload( private createStepPayload(
transaction: DistributedTransactionType, transaction: DistributedTransactionType,
step: TransactionStep, step: TransactionStep,
flow: TransactionFlow flow: TransactionFlow,
type: TransactionHandlerType
): TransactionPayload { ): TransactionPayload {
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
return new TransactionPayload( return new TransactionPayload(
{ {
model_id: flow.modelId, model_id: flow.modelId,
@@ -1136,13 +1136,9 @@ export class TransactionOrchestrator extends EventEmitter {
private prepareHandlerArgs( private prepareHandlerArgs(
transaction: DistributedTransactionType, transaction: DistributedTransactionType,
step: TransactionStep, step: TransactionStep,
flow: TransactionFlow, payload: TransactionPayload,
payload: TransactionPayload type: TransactionHandlerType
): Parameters<TransactionStepHandler> { ): Parameters<TransactionStepHandler> {
const type = step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE
return [ return [
step.definition.action + "", step.definition.action + "",
type, type,
@@ -1164,11 +1160,13 @@ export class TransactionOrchestrator extends EventEmitter {
? TransactionHandlerType.COMPENSATE ? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE : TransactionHandlerType.INVOKE
const flow = transaction.getFlow()
const payload = this.createStepPayload(transaction, step, flow, type)
const handlerArgs = this.prepareHandlerArgs( const handlerArgs = this.prepareHandlerArgs(
transaction, transaction,
step, step,
transaction.getFlow(), payload,
this.createStepPayload(transaction, step, transaction.getFlow()) type
) )
const traceData = { const traceData = {

View File

@@ -1,18 +1,49 @@
import { isDefined } from "./is-defined" import { isDefined } from "./is-defined"
/** /**
* Only apply JSON.parse JSON.stringify when we have objects, arrays, dates, etc.. * Creates a deep copy of the input, ensuring it's JSON-serializable.
* - Breaks all reference sharing (creates new objects/arrays)
* - Removes non-serializable values (functions, symbols, undefined properties)
* - Normalizes special types (Date -> string)
* - Only stringifies special objects, not entire tree (optimization)
* @param result * @param result
* @returns * @returns A deep copy with no shared references, guaranteed to be JSON-serializable
*/ */
export function parseStringifyIfNecessary(result: unknown) { export function parseStringifyIfNecessary(result: unknown): any {
if (typeof result == null || typeof result !== "object") { if (result == null || typeof result !== "object") {
return result return result
} }
const strResult = JSON.stringify(result) if (Array.isArray(result)) {
if (isDefined(strResult)) { return result.map((item) => parseStringifyIfNecessary(item))
return JSON.parse(strResult)
} }
return result
const isPlainObject =
result.constructor === Object || result.constructor === undefined
if (!isPlainObject) {
const strResult = JSON.stringify(result)
if (isDefined(strResult)) {
return JSON.parse(strResult)
}
return undefined
}
const copy: any = {}
for (const key in result) {
if (result.hasOwnProperty(key)) {
const value = (result as any)[key]
if (typeof value === "function" || typeof value === "symbol") {
continue
}
const copiedValue = parseStringifyIfNecessary(value)
if (copiedValue !== undefined) {
copy[key] = copiedValue
}
}
}
return copy
} }

View File

@@ -49,7 +49,7 @@
"scripts": { "scripts": {
"build": "rimraf dist && tsc --build", "build": "rimraf dist && tsc --build",
"watch": "tsc --build --watch", "watch": "tsc --build --watch",
"test": "jest --bail --forceExit", "test": "jest --bail --forceExit -- src/**/__tests__/**/*.spec.ts",
"test:run": "node ./dist/utils/_playground.js" "test:run": "node ./dist/utils/_playground.js"
} }
} }

View File

@@ -1,5 +1,4 @@
import { import {
deepCopy,
isObject, isObject,
OrchestrationUtils, OrchestrationUtils,
parseStringifyIfNecessary, parseStringifyIfNecessary,
@@ -10,10 +9,10 @@ import * as util from "node:util"
type InputPrimitive = string | Symbol type InputPrimitive = string | Symbol
type InputObject = object & { __type?: string | Symbol; output?: any } type InputObject = object & { __type?: string | Symbol; output?: any }
function resolveProperty(property, transactionContext) { function resolveProperty(property: any, transactionContext: any) {
const { invoke: invokeRes } = transactionContext const { invoke: invokeRes } = transactionContext
let res let res: any
if (property.__type === OrchestrationUtils.SymbolInputReference) { if (property.__type === OrchestrationUtils.SymbolInputReference) {
res = transactionContext.payload res = transactionContext.payload
@@ -132,7 +131,7 @@ function unwrapInput({
if (result != null && typeof result === "object") { if (result != null && typeof result === "object") {
const unwrapped = unwrapInput({ const unwrapped = unwrapInput({
inputTOUnwrap: result, inputTOUnwrap: result,
parentRef: parentRef[key] || {}, parentRef: {},
transactionContext, transactionContext,
}) })
if (unwrapped instanceof Promise) { if (unwrapped instanceof Promise) {
@@ -161,7 +160,7 @@ function unwrapInput({
if (resolved != null && typeof resolved === "object") { if (resolved != null && typeof resolved === "object") {
const unwrapped = unwrapInput({ const unwrapped = unwrapInput({
inputTOUnwrap: resolved, inputTOUnwrap: resolved,
parentRef: parentRef[key] || {}, parentRef: {},
transactionContext, transactionContext,
}) })
if (unwrapped instanceof Promise) { if (unwrapped instanceof Promise) {
@@ -184,18 +183,17 @@ function unwrapInput({
export function resolveValue( export function resolveValue(
input: InputPrimitive | InputObject | unknown | undefined, input: InputPrimitive | InputObject | unknown | undefined,
transactionContext transactionContext: any
): Promise<any> | any { ): Promise<any> | any {
if (input == null || typeof input !== "object") { if (input == null || typeof input !== "object") {
return input return input
} }
const input_ = deepCopy( const input_ =
(input as InputObject)?.__type === (input as InputObject)?.__type ===
OrchestrationUtils.SymbolWorkflowWorkflowData OrchestrationUtils.SymbolWorkflowWorkflowData
? (input as InputObject).output ? (input as InputObject).output
: input : input
)
let result: any let result: any

View File

@@ -1102,7 +1102,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(executionsListAfter).toHaveLength(1) expect(executionsListAfter).toHaveLength(1)
}) })
it("should display error when multple async steps are running in parallel", async () => { it("should display error when multiple async steps are running in parallel", async () => {
let errors: Error[] = [] let errors: Error[] = []
const onFinishPromise = new Promise<void>((resolve) => { const onFinishPromise = new Promise<void>((resolve) => {
void workflowOrcModule.subscribe({ void workflowOrcModule.subscribe({

View File

@@ -31,22 +31,25 @@ import { WorkflowExecution } from "../models/workflow-execution"
const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30 const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30
const doneStates = [ const doneStates = new Set([
TransactionStepState.DONE, TransactionStepState.DONE,
TransactionStepState.REVERTED, TransactionStepState.REVERTED,
TransactionStepState.FAILED, TransactionStepState.FAILED,
TransactionStepState.SKIPPED, TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE, TransactionStepState.SKIPPED_FAILURE,
TransactionStepState.TIMEOUT, TransactionStepState.TIMEOUT,
] ])
const finishedStates = [ const finishedStates = new Set([
TransactionState.DONE, TransactionState.DONE,
TransactionState.FAILED, TransactionState.FAILED,
TransactionState.REVERTED, TransactionState.REVERTED,
] ])
const failedStates = [TransactionState.FAILED, TransactionState.REVERTED] const failedStates = new Set([
TransactionState.FAILED,
TransactionState.REVERTED,
])
function calculateDelayFromExpression(expression: CronExpression): number { function calculateDelayFromExpression(expression: CronExpression): number {
const nextTime = expression.next().getTime() const nextTime = expression.next().getTime()
@@ -179,7 +182,7 @@ export class InMemoryDistributedTransactionStorage
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const asyncVersion = data.flow._v const asyncVersion = data.flow._v
const isFinished = finishedStates.includes(data.flow.state) const isFinished = finishedStates.has(data.flow.state)
const isWaitingToCompensate = const isWaitingToCompensate =
data.flow.state === TransactionState.WAITING_TO_COMPENSATE data.flow.state === TransactionState.WAITING_TO_COMPENSATE
@@ -187,16 +190,16 @@ export class InMemoryDistributedTransactionStorage
const stepsArray = Object.values(data.flow.steps) as TransactionStep[] const stepsArray = Object.values(data.flow.steps) as TransactionStep[]
let currentStep!: TransactionStep let currentStep!: TransactionStep
let currentStepsIsAsync = false
const targetStates = isFlowInvoking const targetStates = isFlowInvoking
? [ ? new Set([
TransactionStepState.INVOKING, TransactionStepState.INVOKING,
TransactionStepState.DONE, TransactionStepState.DONE,
TransactionStepState.FAILED, TransactionStepState.FAILED,
] ])
: [TransactionStepState.COMPENSATING] : new Set([TransactionStepState.COMPENSATING])
// Find the current step from the end
for (let i = stepsArray.length - 1; i >= 0; i--) { for (let i = stepsArray.length - 1; i >= 0; i--) {
const step = stepsArray[i] const step = stepsArray[i]
@@ -204,20 +207,29 @@ export class InMemoryDistributedTransactionStorage
break break
} }
const isTargetState = targetStates.includes(step.invoke?.state) const isTargetState = targetStates.has(step.invoke?.state)
if (isTargetState) { if (isTargetState && !currentStep) {
currentStep = step currentStep = step
break break
} }
} }
const currentStepsIsAsync = currentStep if (currentStep) {
? stepsArray.some( for (const step of stepsArray) {
(step) => if (step.id === "_root") {
step?.definition?.async === true && step.depth === currentStep.depth continue
) }
: false
if (
step.depth === currentStep.depth &&
step?.definition?.async === true
) {
currentStepsIsAsync = true
break
}
}
}
if ( if (
!(isNotStarted || isFinished || isWaitingToCompensate) && !(isNotStarted || isFinished || isWaitingToCompensate) &&
@@ -284,7 +296,7 @@ export class InMemoryDistributedTransactionStorage
const execution = trx.execution as TransactionFlow const execution = trx.execution as TransactionFlow
if (!idempotent) { if (!idempotent) {
const isFailedOrReverted = failedStates.includes(execution.state) const isFailedOrReverted = failedStates.has(execution.state)
const isDone = execution.state === TransactionState.DONE const isDone = execution.state === TransactionState.DONE
@@ -331,21 +343,12 @@ export class InMemoryDistributedTransactionStorage
*/ */
const { retentionTime } = options ?? {} const { retentionTime } = options ?? {}
const hasFinished = finishedStates.includes(data.flow.state) const hasFinished = finishedStates.has(data.flow.state)
let cachedCheckpoint: TransactionCheckpoint | undefined
const getCheckpoint = async (options?: TransactionOptions) => {
if (!cachedCheckpoint) {
cachedCheckpoint = await this.get(key, options)
}
return cachedCheckpoint
}
await this.#preventRaceConditionExecutionIfNecessary({ await this.#preventRaceConditionExecutionIfNecessary({
data, data,
key, key,
options, options,
getCheckpoint,
}) })
// Only store retention time if it's provided // Only store retention time if it's provided
@@ -377,12 +380,12 @@ export class InMemoryDistributedTransactionStorage
TransactionCheckpoint.mergeCheckpoints(data, storedData) TransactionCheckpoint.mergeCheckpoints(data, storedData)
} }
const { flow, errors } = data const { flow, context, errors } = data
this.storage[key] = { this.storage[key] = {
flow, flow: JSON.parse(JSON.stringify(flow)),
context: {} as TransactionContext, context: JSON.parse(JSON.stringify(context)),
errors, errors: [...errors],
} as TransactionCheckpoint } as TransactionCheckpoint
// Optimize DB operations - only perform when necessary // Optimize DB operations - only perform when necessary
@@ -412,14 +415,10 @@ export class InMemoryDistributedTransactionStorage
data, data,
key, key,
options, options,
getCheckpoint,
}: { }: {
data: TransactionCheckpoint data: TransactionCheckpoint
key: string key: string
options?: TransactionOptions options?: TransactionOptions
getCheckpoint: (
options: TransactionOptions
) => Promise<TransactionCheckpoint | undefined>
}) { }) {
const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes( const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes(
data.flow.state data.flow.state
@@ -441,7 +440,7 @@ export class InMemoryDistributedTransactionStorage
} as Parameters<typeof this.get>[1] } as Parameters<typeof this.get>[1]
data_ = data_ =
(await getCheckpoint(getOptions as TransactionOptions)) ?? (await this.get(key, getOptions as TransactionOptions)) ??
({ flow: {} } as TransactionCheckpoint) ({ flow: {} } as TransactionCheckpoint)
} }
@@ -457,7 +456,7 @@ export class InMemoryDistributedTransactionStorage
? latestStep.compensate?.state ? latestStep.compensate?.state
: latestStep.invoke?.state : latestStep.invoke?.state
const shouldSkip = doneStates.includes(latestState) const shouldSkip = doneStates.has(latestState)
if (shouldSkip) { if (shouldSkip) {
throw new SkipStepAlreadyFinishedError( throw new SkipStepAlreadyFinishedError(
@@ -750,7 +749,7 @@ export class InMemoryDistributedTransactionStorage
}, },
updated_at: { updated_at: {
$lte: raw( $lte: raw(
(alias) => (_alias) =>
`CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")` `CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")`
), ),
}, },

View File

@@ -37,22 +37,25 @@ enum JobType {
const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30 const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30
const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions" const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions"
const doneStates = [ const doneStates = new Set([
TransactionStepState.DONE, TransactionStepState.DONE,
TransactionStepState.REVERTED, TransactionStepState.REVERTED,
TransactionStepState.FAILED, TransactionStepState.FAILED,
TransactionStepState.SKIPPED, TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE, TransactionStepState.SKIPPED_FAILURE,
TransactionStepState.TIMEOUT, TransactionStepState.TIMEOUT,
] ])
const finishedStates = [ const finishedStates = new Set([
TransactionState.DONE, TransactionState.DONE,
TransactionState.FAILED, TransactionState.FAILED,
TransactionState.REVERTED, TransactionState.REVERTED,
] ])
const failedStates = [TransactionState.FAILED, TransactionState.REVERTED] const failedStates = new Set([
TransactionState.FAILED,
TransactionState.REVERTED,
])
export class RedisDistributedTransactionStorage export class RedisDistributedTransactionStorage
implements IDistributedTransactionStorage, IDistributedSchedulerStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage
{ {
@@ -280,7 +283,7 @@ export class RedisDistributedTransactionStorage
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const asyncVersion = data.flow._v const asyncVersion = data.flow._v
const isFinished = finishedStates.includes(data.flow.state) const isFinished = finishedStates.has(data.flow.state)
const isWaitingToCompensate = const isWaitingToCompensate =
data.flow.state === TransactionState.WAITING_TO_COMPENSATE data.flow.state === TransactionState.WAITING_TO_COMPENSATE
@@ -288,16 +291,16 @@ export class RedisDistributedTransactionStorage
const stepsArray = Object.values(data.flow.steps) as TransactionStep[] const stepsArray = Object.values(data.flow.steps) as TransactionStep[]
let currentStep!: TransactionStep let currentStep!: TransactionStep
let currentStepsIsAsync = false
const targetStates = isFlowInvoking const targetStates = isFlowInvoking
? [ ? new Set([
TransactionStepState.INVOKING, TransactionStepState.INVOKING,
TransactionStepState.DONE, TransactionStepState.DONE,
TransactionStepState.FAILED, TransactionStepState.FAILED,
] ])
: [TransactionStepState.COMPENSATING] : new Set([TransactionStepState.COMPENSATING])
// Find the current step from the end
for (let i = stepsArray.length - 1; i >= 0; i--) { for (let i = stepsArray.length - 1; i >= 0; i--) {
const step = stepsArray[i] const step = stepsArray[i]
@@ -305,20 +308,29 @@ export class RedisDistributedTransactionStorage
break break
} }
const isTargetState = targetStates.includes(step.invoke?.state) const isTargetState = targetStates.has(step.invoke?.state)
if (isTargetState) { if (isTargetState && !currentStep) {
currentStep = step currentStep = step
break break
} }
} }
const currentStepsIsAsync = currentStep if (currentStep) {
? stepsArray.some( for (const step of stepsArray) {
(step) => if (step.id === "_root") {
step?.definition?.async === true && step.depth === currentStep.depth continue
) }
: false
if (
step.depth === currentStep.depth &&
step?.definition?.async === true
) {
currentStepsIsAsync = true
break
}
}
}
if ( if (
!(isNotStarted || isFinished || isWaitingToCompensate) && !(isNotStarted || isFinished || isWaitingToCompensate) &&
@@ -395,29 +407,36 @@ export class RedisDistributedTransactionStorage
async get( async get(
key: string, key: string,
options?: TransactionOptions & { isCancelling?: boolean } options?: TransactionOptions & {
isCancelling?: boolean
_cachedRawData?: string | null
}
): Promise<TransactionCheckpoint | undefined> { ): Promise<TransactionCheckpoint | undefined> {
const [_, workflowId, transactionId] = key.split(":") const [_, workflowId, transactionId] = key.split(":")
const trx = await this.workflowExecutionService_
.list( const [trx, rawData] = await promiseAll([
{ this.workflowExecutionService_
workflow_id: workflowId, .list(
transaction_id: transactionId, {
}, workflow_id: workflowId,
{ transaction_id: transactionId,
select: ["execution", "context"],
order: {
id: "desc",
}, },
take: 1, {
} select: ["execution", "context"],
) order: {
.then((trx) => trx[0]) id: "desc",
.catch(() => undefined) },
take: 1,
}
)
.then((trx) => trx[0])
.catch(() => undefined),
options?._cachedRawData !== undefined
? Promise.resolve(options._cachedRawData)
: this.redisClient.get(key),
])
if (trx) { if (trx) {
const rawData = await this.redisClient.get(key)
let flow!: TransactionFlow, errors!: TransactionStepError[] let flow!: TransactionFlow, errors!: TransactionStepError[]
if (rawData) { if (rawData) {
const data = JSON.parse(rawData) const data = JSON.parse(rawData)
@@ -429,7 +448,7 @@ export class RedisDistributedTransactionStorage
const execution = trx.execution as TransactionFlow const execution = trx.execution as TransactionFlow
if (!idempotent) { if (!idempotent) {
const isFailedOrReverted = failedStates.includes(execution.state) const isFailedOrReverted = failedStates.has(execution.state)
const isDone = execution.state === TransactionState.DONE const isDone = execution.state === TransactionState.DONE
@@ -470,6 +489,8 @@ export class RedisDistributedTransactionStorage
let lockAcquired = false let lockAcquired = false
let storedData: TransactionCheckpoint | undefined
if (data.flow._v) { if (data.flow._v) {
lockAcquired = await this.#acquireLock(key) lockAcquired = await this.#acquireLock(key)
@@ -477,7 +498,7 @@ export class RedisDistributedTransactionStorage
throw new Error("Lock not acquired") throw new Error("Lock not acquired")
} }
const storedData = await this.get(key, { storedData = await this.get(key, {
isCancelling: !!data.flow.cancelledAt, isCancelling: !!data.flow.cancelledAt,
} as any) } as any)
@@ -485,21 +506,13 @@ export class RedisDistributedTransactionStorage
} }
try { try {
const hasFinished = finishedStates.includes(data.flow.state) const hasFinished = finishedStates.has(data.flow.state)
let cachedCheckpoint: TransactionCheckpoint | undefined
const getCheckpoint = async (options?: TransactionOptions) => {
if (!cachedCheckpoint) {
cachedCheckpoint = await this.get(key, options)
}
return cachedCheckpoint
}
await this.#preventRaceConditionExecutionIfNecessary({ await this.#preventRaceConditionExecutionIfNecessary({
data: data, data: data,
key, key,
options, options,
getCheckpoint, storedData,
}) })
// Only set if not exists // Only set if not exists
@@ -514,11 +527,10 @@ export class RedisDistributedTransactionStorage
} }
const execPipeline = () => { const execPipeline = () => {
const lightData_ = { const stringifiedData = JSON.stringify({
errors: data.errors, errors: data.errors,
flow: data.flow, flow: data.flow,
} })
const stringifiedData = JSON.stringify(lightData_)
const pipeline = this.redisClient.pipeline() const pipeline = this.redisClient.pipeline()
@@ -558,17 +570,15 @@ export class RedisDistributedTransactionStorage
}) })
} }
// Parallelize DB and Redis operations for better performance
if (hasFinished && !retentionTime) { if (hasFinished && !retentionTime) {
if (!data.flow.metadata?.parentStepIdempotencyKey) { if (!data.flow.metadata?.parentStepIdempotencyKey) {
await this.deleteFromDb(data) await promiseAll([this.deleteFromDb(data), execPipeline()])
await execPipeline()
} else { } else {
await this.saveToDb(data, retentionTime) await promiseAll([this.saveToDb(data, retentionTime), execPipeline()])
await execPipeline()
} }
} else { } else {
await this.saveToDb(data, retentionTime) await promiseAll([this.saveToDb(data, retentionTime), execPipeline()])
await execPipeline()
} }
return data as TransactionCheckpoint return data as TransactionCheckpoint
@@ -801,14 +811,12 @@ export class RedisDistributedTransactionStorage
data, data,
key, key,
options, options,
getCheckpoint, storedData,
}: { }: {
data: TransactionCheckpoint data: TransactionCheckpoint
key: string key: string
options?: TransactionOptions options?: TransactionOptions
getCheckpoint: ( storedData?: TransactionCheckpoint
options: TransactionOptions
) => Promise<TransactionCheckpoint | undefined>
}) { }) {
const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes( const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes(
data.flow.state data.flow.state
@@ -819,19 +827,24 @@ export class RedisDistributedTransactionStorage
*/ */
const currentFlow = data.flow const currentFlow = data.flow
const rawData = await this.redisClient.get(key) let data_ = storedData ?? ({} as TransactionCheckpoint)
let data_ = {} as TransactionCheckpoint
if (rawData) {
data_ = JSON.parse(rawData)
} else {
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
} as Parameters<typeof this.get>[1]
data_ = if (!storedData) {
(await getCheckpoint(getOptions as TransactionOptions)) ?? const rawData = await this.redisClient.get(key)
({ flow: {} } as TransactionCheckpoint) if (rawData) {
data_ = JSON.parse(rawData)
} else {
// Pass cached raw data to avoid redundant Redis fetch
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
_cachedRawData: rawData,
} as Parameters<typeof this.get>[1]
data_ =
(await this.get(key, getOptions as TransactionOptions)) ??
({ flow: {} } as TransactionCheckpoint)
}
} }
const { flow: latestUpdatedFlow } = data_ const { flow: latestUpdatedFlow } = data_
@@ -846,7 +859,7 @@ export class RedisDistributedTransactionStorage
? latestStep.compensate?.state ? latestStep.compensate?.state
: latestStep.invoke?.state : latestStep.invoke?.state
const shouldSkip = doneStates.includes(latestState) const shouldSkip = doneStates.has(latestState)
if (shouldSkip) { if (shouldSkip) {
throw new SkipStepAlreadyFinishedError( throw new SkipStepAlreadyFinishedError(
@@ -891,7 +904,7 @@ export class RedisDistributedTransactionStorage
}, },
updated_at: { updated_at: {
$lte: raw( $lte: raw(
(alias) => (_alias) =>
`CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")` `CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")`
), ),
}, },