From 238e7d53c13a1c033886d7c33254919f8b5b22dc Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Fri, 18 Jul 2025 09:20:46 +0200 Subject: [PATCH] fix(wfe): should notify when finished + add state info (#12982) --- .changeset/rude-windows-boil.md | 6 +++ .../src/services/workflow-orchestrator.ts | 25 +++++++---- .../integration-tests/__tests__/index.spec.ts | 1 + .../src/services/workflow-orchestrator.ts | 43 +++++++++++-------- 4 files changed, 47 insertions(+), 28 deletions(-) create mode 100644 .changeset/rude-windows-boil.md diff --git a/.changeset/rude-windows-boil.md b/.changeset/rude-windows-boil.md new file mode 100644 index 0000000000..1ee7a7b8a9 --- /dev/null +++ b/.changeset/rude-windows-boil.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +fix(wfe): should notify when finished + add state info diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index c676afe73e..2e01eb9f3f 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -51,6 +51,7 @@ type NotifyOptions = { eventType: keyof DistributedTransactionEvents workflowId: string transactionId?: string + state?: TransactionState step?: TransactionStep response?: unknown result?: unknown @@ -269,9 +270,6 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const originalOnFinishHandler = events.onFinish! - delete events.onFinish - const transaction = await this.getRunningTransaction( workflowId, transactionId, @@ -307,12 +305,11 @@ export class WorkflowOrchestratorService { const metadata = ret.transaction.getFlow().metadata const { parentStepIdempotencyKey } = metadata ?? {} - const hasFailed = [TransactionState.FAILED].includes( - ret.transaction.getFlow().state - ) + const transactionState = ret.transaction.getFlow().state + const hasFailed = [TransactionState.FAILED].includes(transactionState) const acknowledgement = { - transactionId: context.transactionId, + transactionId: transaction.transactionId, workflowId: workflowId, parentStepIdempotencyKey, hasFinished, @@ -323,8 +320,11 @@ export class WorkflowOrchestratorService { if (hasFinished) { const { result, errors } = ret - await originalOnFinishHandler({ - transaction: ret.transaction, + this.notify({ + eventType: "onFinish", + workflowId, + transactionId: transaction.transactionId, + state: transactionState as TransactionState, result, errors, }) @@ -423,6 +423,7 @@ export class WorkflowOrchestratorService { eventType: "onFinish", workflowId, transactionId, + state: ret.transaction.getFlow().state as TransactionState, result, errors, }) @@ -493,6 +494,7 @@ export class WorkflowOrchestratorService { eventType: "onFinish", workflowId, transactionId, + state: ret.transaction.getFlow().state as TransactionState, result, errors, }) @@ -598,6 +600,7 @@ export class WorkflowOrchestratorService { result, step, response, + state, } = options const subscribers: TransactionSubscribers = @@ -613,6 +616,7 @@ export class WorkflowOrchestratorService { response, result, errors, + state, }) }) } @@ -641,12 +645,14 @@ export class WorkflowOrchestratorService { result, response, errors, + state, }: { eventType: keyof DistributedTransactionEvents step?: TransactionStep response?: unknown result?: unknown errors?: unknown[] + state?: TransactionState }) => { this.notify({ workflowId, @@ -656,6 +662,7 @@ export class WorkflowOrchestratorService { step, result, errors, + state, }) } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index e74cb52bfb..e0bc17bd1e 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -841,6 +841,7 @@ moduleIntegrationTestRunner({ void workflowOrcModule.subscribe({ workflowId: "wf-when", + transactionId: "trx_123_when", subscriber: (event) => { if (event.eventType === "onFinish") { done() diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 718125c08d..70025a67ce 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -63,6 +63,7 @@ type NotifyOptions = { response?: unknown result?: unknown errors?: unknown[] + state?: TransactionState } type WorkflowId = string @@ -318,9 +319,6 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const originalOnFinishHandler = events.onFinish! - delete events.onFinish - const transaction = await this.getRunningTransaction( workflowId, transactionId, @@ -352,12 +350,11 @@ export class WorkflowOrchestratorService { const metadata = ret.transaction.getFlow().metadata const { parentStepIdempotencyKey } = metadata ?? {} - const hasFailed = [TransactionState.FAILED].includes( - ret.transaction.getFlow().state - ) + const transactionState = ret.transaction.getFlow().state + const hasFailed = [TransactionState.FAILED].includes(transactionState) const acknowledgement = { - transactionId: context.transactionId, + transactionId: transaction.transactionId, workflowId: workflowId, parentStepIdempotencyKey, hasFinished, @@ -368,8 +365,11 @@ export class WorkflowOrchestratorService { if (hasFinished) { const { result, errors } = ret - await originalOnFinishHandler({ - transaction: ret.transaction, + this.notify({ + eventType: "onFinish", + workflowId, + transactionId: transaction.transactionId, + state: transactionState as TransactionState, result, errors, }) @@ -449,9 +449,6 @@ export class WorkflowOrchestratorService { workflowId, }) - const originalOnFinishHandler = events.onFinish! - delete events.onFinish - const ret = await exportedWorkflow.registerStepSuccess({ idempotencyKey: idempotencyKey_, context, @@ -466,8 +463,11 @@ export class WorkflowOrchestratorService { if (ret.transaction.hasFinished()) { const { result, errors } = ret - await originalOnFinishHandler({ - transaction: ret.transaction, + this.notify({ + eventType: "onFinish", + workflowId, + transactionId, + state: ret.transaction.getFlow().state as TransactionState, result, errors, }) @@ -520,9 +520,6 @@ export class WorkflowOrchestratorService { workflowId, }) - const originalOnFinishHandler = events.onFinish! - delete events.onFinish - const ret = await exportedWorkflow.registerStepFailure({ idempotencyKey: idempotencyKey_, context, @@ -537,8 +534,11 @@ export class WorkflowOrchestratorService { if (ret.transaction.hasFinished()) { const { result, errors } = ret - await originalOnFinishHandler({ - transaction: ret.transaction, + this.notify({ + eventType: "onFinish", + workflowId, + transactionId, + state: ret.transaction.getFlow().state as TransactionState, result, errors, }) @@ -677,6 +677,7 @@ export class WorkflowOrchestratorService { result, step, response, + state, } = options const subscribers: TransactionSubscribers = @@ -692,6 +693,7 @@ export class WorkflowOrchestratorService { response, result, errors, + state, } const isPromise = "then" in handler if (isPromise) { @@ -737,12 +739,14 @@ export class WorkflowOrchestratorService { result, response, errors, + state, }: { eventType: keyof DistributedTransactionEvents step?: TransactionStep response?: unknown result?: unknown errors?: unknown[] + state?: TransactionState }) => { await this.notify({ workflowId, @@ -752,6 +756,7 @@ export class WorkflowOrchestratorService { step, result, errors, + state, }) }