fix(): Properly handle workflow as step now that events are fixed entirely (#12196)

**What**
Now that all events management are fixed in the workflows life cycle, the run as step needs to leverage the workflow engine if present (which should always be the case for async workflows) in order to ensure the continuation and the ability to mark parent step in parent workflow as success or failure

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-04-17 11:34:19 +02:00
committed by GitHub
parent 9abcf7a83a
commit 8618e6ee38
16 changed files with 410 additions and 145 deletions

View File

@@ -296,6 +296,63 @@ export class TransactionOrchestrator extends EventEmitter {
total: number
remaining: number
completed: number
}> {
const flow = transaction.getFlow()
const result = await this.computeCurrentTransactionState(transaction)
// Handle state transitions and emit events
if (
flow.state === TransactionState.WAITING_TO_COMPENSATE &&
result.next.length === 0 &&
!flow.hasWaitingSteps
) {
flow.state = TransactionState.COMPENSATING
this.flagStepsToRevert(flow)
this.emit(DistributedTransactionEvent.COMPENSATE_BEGIN, { transaction })
return await this.checkAllSteps(transaction)
} else if (result.completed === result.total) {
if (result.hasSkippedOnFailure) {
flow.hasSkippedOnFailureSteps = true
}
if (result.hasSkipped) {
flow.hasSkippedSteps = true
}
if (result.hasIgnoredFailure) {
flow.hasFailedSteps = true
}
if (result.hasFailed) {
flow.state = TransactionState.FAILED
} else {
flow.state = result.hasReverted
? TransactionState.REVERTED
: TransactionState.DONE
}
}
return {
current: result.current,
next: result.next,
total: result.total,
remaining: result.total - result.completed,
completed: result.completed,
}
}
private async computeCurrentTransactionState(
transaction: DistributedTransactionType
): Promise<{
current: TransactionStep[]
next: TransactionStep[]
total: number
completed: number
hasSkipped: boolean
hasSkippedOnFailure: boolean
hasIgnoredFailure: boolean
hasFailed: boolean
hasWaiting: boolean
hasReverted: boolean
}> {
let hasSkipped = false
let hasSkippedOnFailure = false
@@ -306,7 +363,6 @@ export class TransactionOrchestrator extends EventEmitter {
let completedSteps = 0
const flow = transaction.getFlow()
const nextSteps: TransactionStep[] = []
const currentSteps: TransactionStep[] = []
@@ -393,43 +449,17 @@ export class TransactionOrchestrator extends EventEmitter {
flow.hasWaitingSteps = hasWaiting
flow.hasRevertedSteps = hasReverted
const totalSteps = allSteps.length - 1
if (
flow.state === TransactionState.WAITING_TO_COMPENSATE &&
nextSteps.length === 0 &&
!hasWaiting
) {
flow.state = TransactionState.COMPENSATING
this.flagStepsToRevert(flow)
this.emit(DistributedTransactionEvent.COMPENSATE_BEGIN, { transaction })
return await this.checkAllSteps(transaction)
} else if (completedSteps === totalSteps) {
if (hasSkippedOnFailure) {
flow.hasSkippedOnFailureSteps = true
}
if (hasSkipped) {
flow.hasSkippedSteps = true
}
if (hasIgnoredFailure) {
flow.hasFailedSteps = true
}
if (hasFailed) {
flow.state = TransactionState.FAILED
} else {
flow.state = hasReverted
? TransactionState.REVERTED
: TransactionState.DONE
}
}
return {
current: currentSteps,
next: nextSteps,
total: totalSteps,
remaining: totalSteps - completedSteps,
total: allSteps.length - 1,
completed: completedSteps,
hasSkipped,
hasSkippedOnFailure,
hasIgnoredFailure,
hasFailed,
hasWaiting,
hasReverted,
}
}
@@ -756,6 +786,9 @@ export class TransactionOrchestrator extends EventEmitter {
? step.definition.compensateAsync
: step.definition.async
// Compute current transaction state
await this.computeCurrentTransactionState(transaction)
// Save checkpoint before executing step
await transaction.saveCheckpoint().catch((error) => {
if (SkipExecutionError.isSkipExecutionError(error)) {
@@ -777,9 +810,8 @@ export class TransactionOrchestrator extends EventEmitter {
this.executeSyncStep(promise, transaction, step, nextSteps)
)
} else {
execution.push(
this.executeAsyncStep(promise, transaction, step, nextSteps)
)
// Execute async step in background and continue the execution of the transaction
this.executeAsyncStep(promise, transaction, step, nextSteps)
}
}
@@ -789,14 +821,6 @@ export class TransactionOrchestrator extends EventEmitter {
continueExecution = false
}
}
// Recompute the current flow flags
await this.checkAllSteps(transaction)
await transaction.saveCheckpoint().catch((error) => {
if (!SkipExecutionError.isSkipExecutionError(error)) {
throw error
}
})
}
/**
@@ -1007,6 +1031,9 @@ export class TransactionOrchestrator extends EventEmitter {
transaction,
step,
})
// Schedule to continue the execution of async steps because they are not awaited on purpose and can be handled by another machine
await transaction.scheduleRetry(step, 0)
return
} else {
if (!step.definition.backgroundExecution || step.definition.nested) {
const eventName = DistributedTransactionEvent.STEP_AWAITING
@@ -1063,7 +1090,7 @@ export class TransactionOrchestrator extends EventEmitter {
? step.definition.compensateAsync
: step.definition.async
if (isDefined(response) && step.saveResponse) {
if (isDefined(response) && step.saveResponse && !isAsync) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
@@ -1080,6 +1107,7 @@ export class TransactionOrchestrator extends EventEmitter {
)
if (isAsync && !ret.stopExecution) {
// Schedule to continue the execution of async steps because they are not awaited on purpose and can be handled by another machine
await transaction.scheduleRetry(step, 0)
}
}
@@ -1094,10 +1122,6 @@ export class TransactionOrchestrator extends EventEmitter {
isPermanent: boolean,
response?: unknown
): Promise<void> {
const isAsync = step.isCompensating()
? step.definition.compensateAsync
: step.definition.async
if (isDefined(response) && step.saveResponse) {
transaction.addResponse(
step.definition.action!,
@@ -1108,16 +1132,12 @@ export class TransactionOrchestrator extends EventEmitter {
)
}
const ret = await TransactionOrchestrator.setStepFailure(
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
isPermanent ? 0 : step.definition.maxRetries
)
if (isAsync && !ret.stopExecution) {
await transaction.scheduleRetry(step, 0)
}
}
/**

View File

@@ -258,6 +258,8 @@ export type TransactionFlow = {
eventGroupId?: string
parentIdempotencyKey?: string
sourcePath?: string
preventReleaseEvents?: boolean
parentStepIdempotencyKey?: string
[key: string]: unknown
}
hasAsyncSteps: boolean

View File

@@ -125,8 +125,6 @@ export class LocalWorkflow {
args[ctxIndex] = context
}
} else if (hasContext) {
args[ctxIndex!].eventGroupId ??= this_.medusaContext?.eventGroupId
}
const method = target[prop]
@@ -364,12 +362,7 @@ export class LocalWorkflow {
handler: handler(this.container_, context),
payload: input,
flowMetadata,
onLoad: (transaction) => {
if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
},
onLoad: this.onLoad.bind(this),
})
const { cleanUpEventListeners } = this.registerEventCallbacks({
@@ -451,12 +444,7 @@ export class LocalWorkflow {
responseIdempotencyKey: idempotencyKey,
handler: handler(this.container_, context),
response,
onLoad: (transaction) => {
if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
},
onLoad: this.onLoad.bind(this),
})
try {
@@ -485,12 +473,7 @@ export class LocalWorkflow {
responseIdempotencyKey: idempotencyKey,
error,
handler: handler(this.container_, context),
onLoad: (transaction) => {
if (this.medusaContext) {
this.medusaContext.eventGroupId =
transaction.getFlow().metadata?.eventGroupId
}
},
onLoad: this.onLoad.bind(this),
})
try {
@@ -594,4 +577,16 @@ export class LocalWorkflow {
)
}
}
private onLoad(transaction: DistributedTransactionType) {
if (this.medusaContext) {
const flow = transaction.getFlow() ?? {}
const metadata = (flow.metadata ??
{}) as Required<TransactionFlow>["metadata"]
this.medusaContext.eventGroupId = metadata.eventGroupId
this.medusaContext.parentStepIdempotencyKey =
metadata.parentStepIdempotencyKey
this.medusaContext.preventReleaseEvents = metadata?.preventReleaseEvents
}
}
}