fix(workflows-sdk): Miss match context usage within run as step (#12449)
**What** Currently, runAsStep keep reference of the workflow context that is being run as step, except that the step is composed for the current workflow composition and not the workflow being run as a step. Therefore, the context are currently miss matched leading to wrong configuration being used in case of async workflows. **BUG** This fix allow the runAsStep to use the current composition context to configure the step for the sub workflow to be run **BUG BREAKING** fix the step config wrongly used to wrap async step handlers. Now steps configured async through .config that returns a new step response will indeed marked itself as success without the need for background execution or calling setStepSuccess (as it was expected originally) **FEATURE** This pr also add support for cancelling running transaction, the transaction will be marked as being cancelled, once the current step finished, it will cancel the transaction to start compensating all previous steps including itself Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
ab22faaa52
commit
7fdbf2a965
@@ -98,3 +98,19 @@ export class SkipExecutionError extends Error {
|
||||
this.name = "SkipExecutionError"
|
||||
}
|
||||
}
|
||||
|
||||
export class SkipCancelledExecutionError extends Error {
|
||||
static isSkipCancelledExecutionError(
|
||||
error: Error
|
||||
): error is SkipCancelledExecutionError {
|
||||
return (
|
||||
error instanceof SkipCancelledExecutionError ||
|
||||
error?.name === "SkipCancelledExecutionError"
|
||||
)
|
||||
}
|
||||
|
||||
constructor(message?: string) {
|
||||
super(message)
|
||||
this.name = "SkipCancelledExecutionError"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import {
|
||||
import { EventEmitter } from "events"
|
||||
import {
|
||||
PermanentStepFailureError,
|
||||
SkipCancelledExecutionError,
|
||||
SkipExecutionError,
|
||||
SkipStepResponse,
|
||||
TransactionStepTimeoutError,
|
||||
@@ -494,6 +495,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
response: unknown
|
||||
): Promise<{
|
||||
stopExecution: boolean
|
||||
transactionIsCancelling?: boolean
|
||||
}> {
|
||||
const hasStepTimedOut =
|
||||
step.getStates().state === TransactionStepState.TIMEOUT
|
||||
@@ -519,10 +521,20 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
}
|
||||
|
||||
let shouldEmit = true
|
||||
let transactionIsCancelling = false
|
||||
try {
|
||||
await transaction.saveCheckpoint()
|
||||
} catch (error) {
|
||||
shouldEmit = false
|
||||
if (
|
||||
!SkipCancelledExecutionError.isSkipCancelledExecutionError(error) &&
|
||||
!SkipExecutionError.isSkipExecutionError(error)
|
||||
) {
|
||||
throw error
|
||||
}
|
||||
|
||||
transactionIsCancelling =
|
||||
SkipCancelledExecutionError.isSkipCancelledExecutionError(error)
|
||||
shouldEmit = !SkipExecutionError.isSkipExecutionError(error)
|
||||
}
|
||||
|
||||
const cleaningUp: Promise<unknown>[] = []
|
||||
@@ -544,6 +556,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
return {
|
||||
stopExecution: !shouldEmit,
|
||||
transactionIsCancelling,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,6 +568,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
step: TransactionStep
|
||||
}): Promise<{
|
||||
stopExecution: boolean
|
||||
transactionIsCancelling?: boolean
|
||||
}> {
|
||||
const hasStepTimedOut =
|
||||
step.getStates().state === TransactionStepState.TIMEOUT
|
||||
@@ -565,13 +579,22 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
}
|
||||
|
||||
let shouldEmit = true
|
||||
let transactionIsCancelling = false
|
||||
try {
|
||||
await transaction.saveCheckpoint()
|
||||
} catch (error) {
|
||||
if (
|
||||
!SkipCancelledExecutionError.isSkipCancelledExecutionError(error) &&
|
||||
!SkipExecutionError.isSkipExecutionError(error)
|
||||
) {
|
||||
throw error
|
||||
}
|
||||
|
||||
transactionIsCancelling =
|
||||
SkipCancelledExecutionError.isSkipCancelledExecutionError(error)
|
||||
|
||||
if (SkipExecutionError.isSkipExecutionError(error)) {
|
||||
shouldEmit = false
|
||||
} else {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -592,6 +615,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
return {
|
||||
stopExecution: !shouldEmit,
|
||||
transactionIsCancelling,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -649,6 +673,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
timeoutError?: TransactionStepTimeoutError | TransactionTimeoutError
|
||||
): Promise<{
|
||||
stopExecution: boolean
|
||||
transactionIsCancelling?: boolean
|
||||
}> {
|
||||
if (SkipExecutionError.isSkipExecutionError(error)) {
|
||||
return {
|
||||
@@ -734,14 +759,23 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
let transactionIsCancelling = false
|
||||
let shouldEmit = true
|
||||
try {
|
||||
await transaction.saveCheckpoint()
|
||||
} catch (error) {
|
||||
if (
|
||||
!SkipCancelledExecutionError.isSkipCancelledExecutionError(error) &&
|
||||
!SkipExecutionError.isSkipExecutionError(error)
|
||||
) {
|
||||
throw error
|
||||
}
|
||||
|
||||
transactionIsCancelling =
|
||||
SkipCancelledExecutionError.isSkipCancelledExecutionError(error)
|
||||
|
||||
if (SkipExecutionError.isSkipExecutionError(error)) {
|
||||
shouldEmit = false
|
||||
} else {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -760,6 +794,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
|
||||
return {
|
||||
stopExecution: !shouldEmit,
|
||||
transactionIsCancelling,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -785,15 +820,30 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
return
|
||||
}
|
||||
|
||||
const execution: Promise<void | unknown>[] = []
|
||||
for (const step of nextSteps.next) {
|
||||
const stepsShouldContinueExecution = nextSteps.next.map((step) => {
|
||||
const { shouldContinueExecution } = this.prepareStepForExecution(
|
||||
step,
|
||||
flow
|
||||
)
|
||||
|
||||
// Should stop the execution if next step cant be handled
|
||||
if (!shouldContinueExecution) {
|
||||
return shouldContinueExecution
|
||||
})
|
||||
|
||||
await transaction.saveCheckpoint().catch((error) => {
|
||||
if (SkipExecutionError.isSkipExecutionError(error)) {
|
||||
continueExecution = false
|
||||
return
|
||||
}
|
||||
|
||||
throw error
|
||||
})
|
||||
|
||||
const execution: Promise<void | unknown>[] = []
|
||||
|
||||
let i = 0
|
||||
for (const step of nextSteps.next) {
|
||||
const stepIndex = i++
|
||||
if (!stepsShouldContinueExecution[stepIndex]) {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -813,16 +863,6 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
// Compute current transaction state
|
||||
await this.computeCurrentTransactionState(transaction)
|
||||
|
||||
// Save checkpoint before executing step
|
||||
await transaction.saveCheckpoint().catch((error) => {
|
||||
if (SkipExecutionError.isSkipExecutionError(error)) {
|
||||
continueExecution = false
|
||||
return
|
||||
}
|
||||
|
||||
throw error
|
||||
})
|
||||
|
||||
if (!continueExecution) {
|
||||
break
|
||||
}
|
||||
@@ -1130,6 +1170,10 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
response
|
||||
)
|
||||
|
||||
if (ret.transactionIsCancelling) {
|
||||
return await this.cancelTransaction(transaction)
|
||||
}
|
||||
|
||||
if (isAsync && !ret.stopExecution) {
|
||||
// Schedule to continue the execution of async steps because they are not awaited on purpose and can be handled by another machine
|
||||
await transaction.scheduleRetry(step, 0)
|
||||
@@ -1156,12 +1200,16 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
)
|
||||
}
|
||||
|
||||
await TransactionOrchestrator.setStepFailure(
|
||||
const ret = await TransactionOrchestrator.setStepFailure(
|
||||
transaction,
|
||||
step,
|
||||
error,
|
||||
isPermanent ? 0 : step.definition.maxRetries
|
||||
)
|
||||
|
||||
if (ret.transactionIsCancelling) {
|
||||
return await this.cancelTransaction(transaction)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1245,6 +1293,8 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
flow.state = TransactionState.WAITING_TO_COMPENSATE
|
||||
flow.cancelledAt = Date.now()
|
||||
|
||||
await transaction.saveCheckpoint()
|
||||
|
||||
await this.executeNext(transaction)
|
||||
}
|
||||
|
||||
@@ -1667,12 +1717,17 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
transaction: curTransaction,
|
||||
})
|
||||
|
||||
await TransactionOrchestrator.setStepSuccess(
|
||||
const ret = await TransactionOrchestrator.setStepSuccess(
|
||||
curTransaction,
|
||||
step,
|
||||
response
|
||||
)
|
||||
|
||||
if (ret.transactionIsCancelling) {
|
||||
await this.cancelTransaction(curTransaction)
|
||||
return curTransaction
|
||||
}
|
||||
|
||||
await this.executeNext(curTransaction)
|
||||
} else {
|
||||
throw new MedusaError(
|
||||
@@ -1721,13 +1776,18 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
transaction: curTransaction,
|
||||
})
|
||||
|
||||
await TransactionOrchestrator.setStepFailure(
|
||||
const ret = await TransactionOrchestrator.setStepFailure(
|
||||
curTransaction,
|
||||
step,
|
||||
error,
|
||||
0
|
||||
)
|
||||
|
||||
if (ret.transactionIsCancelling) {
|
||||
await this.cancelTransaction(curTransaction)
|
||||
return curTransaction
|
||||
}
|
||||
|
||||
await this.executeNext(curTransaction)
|
||||
} else {
|
||||
throw new MedusaError(
|
||||
|
||||
@@ -402,10 +402,18 @@ export class LocalWorkflow {
|
||||
this.medusaContext = context
|
||||
const { orchestrator } = this.workflow
|
||||
|
||||
const transaction = isString(transactionOrTransactionId)
|
||||
let transaction = isString(transactionOrTransactionId)
|
||||
? await this.getRunningTransaction(transactionOrTransactionId, context)
|
||||
: transactionOrTransactionId
|
||||
|
||||
// not a distributed transaction instance
|
||||
if (!transaction.getFlow) {
|
||||
transaction = await this.getRunningTransaction(
|
||||
(transaction as any).flow.transactionId,
|
||||
context
|
||||
)
|
||||
}
|
||||
|
||||
if (this.medusaContext) {
|
||||
this.medusaContext.eventGroupId =
|
||||
transaction.getFlow().metadata?.eventGroupId
|
||||
|
||||
Reference in New Issue
Block a user