fix(wfe): should notify when finished + add state info (#12982)

This commit is contained in:
Adrien de Peretti
2025-07-18 09:20:46 +02:00
committed by GitHub
parent 89a57edb30
commit 238e7d53c1
4 changed files with 47 additions and 28 deletions

View File

@@ -51,6 +51,7 @@ type NotifyOptions = {
eventType: keyof DistributedTransactionEvents
workflowId: string
transactionId?: string
state?: TransactionState
step?: TransactionStep
response?: unknown
result?: unknown
@@ -269,9 +270,6 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const originalOnFinishHandler = events.onFinish!
delete events.onFinish
const transaction = await this.getRunningTransaction(
workflowId,
transactionId,
@@ -307,12 +305,11 @@ export class WorkflowOrchestratorService {
const metadata = ret.transaction.getFlow().metadata
const { parentStepIdempotencyKey } = metadata ?? {}
const hasFailed = [TransactionState.FAILED].includes(
ret.transaction.getFlow().state
)
const transactionState = ret.transaction.getFlow().state
const hasFailed = [TransactionState.FAILED].includes(transactionState)
const acknowledgement = {
transactionId: context.transactionId,
transactionId: transaction.transactionId,
workflowId: workflowId,
parentStepIdempotencyKey,
hasFinished,
@@ -323,8 +320,11 @@ export class WorkflowOrchestratorService {
if (hasFinished) {
const { result, errors } = ret
await originalOnFinishHandler({
transaction: ret.transaction,
this.notify({
eventType: "onFinish",
workflowId,
transactionId: transaction.transactionId,
state: transactionState as TransactionState,
result,
errors,
})
@@ -423,6 +423,7 @@ export class WorkflowOrchestratorService {
eventType: "onFinish",
workflowId,
transactionId,
state: ret.transaction.getFlow().state as TransactionState,
result,
errors,
})
@@ -493,6 +494,7 @@ export class WorkflowOrchestratorService {
eventType: "onFinish",
workflowId,
transactionId,
state: ret.transaction.getFlow().state as TransactionState,
result,
errors,
})
@@ -598,6 +600,7 @@ export class WorkflowOrchestratorService {
result,
step,
response,
state,
} = options
const subscribers: TransactionSubscribers =
@@ -613,6 +616,7 @@ export class WorkflowOrchestratorService {
response,
result,
errors,
state,
})
})
}
@@ -641,12 +645,14 @@ export class WorkflowOrchestratorService {
result,
response,
errors,
state,
}: {
eventType: keyof DistributedTransactionEvents
step?: TransactionStep
response?: unknown
result?: unknown
errors?: unknown[]
state?: TransactionState
}) => {
this.notify({
workflowId,
@@ -656,6 +662,7 @@ export class WorkflowOrchestratorService {
step,
result,
errors,
state,
})
}

View File

@@ -841,6 +841,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
void workflowOrcModule.subscribe({
workflowId: "wf-when",
transactionId: "trx_123_when",
subscriber: (event) => {
if (event.eventType === "onFinish") {
done()

View File

@@ -63,6 +63,7 @@ type NotifyOptions = {
response?: unknown
result?: unknown
errors?: unknown[]
state?: TransactionState
}
type WorkflowId = string
@@ -318,9 +319,6 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const originalOnFinishHandler = events.onFinish!
delete events.onFinish
const transaction = await this.getRunningTransaction(
workflowId,
transactionId,
@@ -352,12 +350,11 @@ export class WorkflowOrchestratorService {
const metadata = ret.transaction.getFlow().metadata
const { parentStepIdempotencyKey } = metadata ?? {}
const hasFailed = [TransactionState.FAILED].includes(
ret.transaction.getFlow().state
)
const transactionState = ret.transaction.getFlow().state
const hasFailed = [TransactionState.FAILED].includes(transactionState)
const acknowledgement = {
transactionId: context.transactionId,
transactionId: transaction.transactionId,
workflowId: workflowId,
parentStepIdempotencyKey,
hasFinished,
@@ -368,8 +365,11 @@ export class WorkflowOrchestratorService {
if (hasFinished) {
const { result, errors } = ret
await originalOnFinishHandler({
transaction: ret.transaction,
this.notify({
eventType: "onFinish",
workflowId,
transactionId: transaction.transactionId,
state: transactionState as TransactionState,
result,
errors,
})
@@ -449,9 +449,6 @@ export class WorkflowOrchestratorService {
workflowId,
})
const originalOnFinishHandler = events.onFinish!
delete events.onFinish
const ret = await exportedWorkflow.registerStepSuccess({
idempotencyKey: idempotencyKey_,
context,
@@ -466,8 +463,11 @@ export class WorkflowOrchestratorService {
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await originalOnFinishHandler({
transaction: ret.transaction,
this.notify({
eventType: "onFinish",
workflowId,
transactionId,
state: ret.transaction.getFlow().state as TransactionState,
result,
errors,
})
@@ -520,9 +520,6 @@ export class WorkflowOrchestratorService {
workflowId,
})
const originalOnFinishHandler = events.onFinish!
delete events.onFinish
const ret = await exportedWorkflow.registerStepFailure({
idempotencyKey: idempotencyKey_,
context,
@@ -537,8 +534,11 @@ export class WorkflowOrchestratorService {
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await originalOnFinishHandler({
transaction: ret.transaction,
this.notify({
eventType: "onFinish",
workflowId,
transactionId,
state: ret.transaction.getFlow().state as TransactionState,
result,
errors,
})
@@ -677,6 +677,7 @@ export class WorkflowOrchestratorService {
result,
step,
response,
state,
} = options
const subscribers: TransactionSubscribers =
@@ -692,6 +693,7 @@ export class WorkflowOrchestratorService {
response,
result,
errors,
state,
}
const isPromise = "then" in handler
if (isPromise) {
@@ -737,12 +739,14 @@ export class WorkflowOrchestratorService {
result,
response,
errors,
state,
}: {
eventType: keyof DistributedTransactionEvents
step?: TransactionStep
response?: unknown
result?: unknown
errors?: unknown[]
state?: TransactionState
}) => {
await this.notify({
workflowId,
@@ -752,6 +756,7 @@ export class WorkflowOrchestratorService {
step,
result,
errors,
state,
})
}