From 2b89510df3b87c153beae5207b1f9184002129d7 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Thu, 4 Sep 2025 09:12:09 +0200 Subject: [PATCH] chore(): only execute race execution checks and publish message only for async workflows (#13396) * chore(): only execute race execution checks for async workflows * chore(): workflow redis publish only for async flows * Create cyan-gorillas-poke.md * chore(): workflow redis publish only for async flows * fix negative check --------- Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> --- .changeset/cyan-gorillas-poke.md | 6 + .../integration-tests/__tests__/index.spec.ts | 34 ++++-- .../integration-tests/__tests__/race.spec.ts | 14 ++- .../utils/workflow-orchestrator-storage.ts | 12 +- .../integration-tests/__tests__/race.spec.ts | 14 ++- .../src/services/workflow-orchestrator.ts | 104 +++++++++++++----- .../utils/workflow-orchestrator-storage.ts | 12 +- 7 files changed, 144 insertions(+), 52 deletions(-) create mode 100644 .changeset/cyan-gorillas-poke.md diff --git a/.changeset/cyan-gorillas-poke.md b/.changeset/cyan-gorillas-poke.md new file mode 100644 index 0000000000..732ce17745 --- /dev/null +++ b/.changeset/cyan-gorillas-poke.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +chore(): only execute race execution checks for async workflows diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index fecd170ea5..d85bc7f072 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -47,11 +47,11 @@ import { createScheduled } from "../__fixtures__/workflow_scheduled" jest.setTimeout(60000) -const failTrap = (done) => { +const failTrap = (done, name) => { setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( - `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually.` + `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() }, 5000) @@ -200,7 +200,10 @@ moduleIntegrationTestRunner({ }) }) - failTrap(done) + failTrap( + done, + "should cancel an ongoing execution with async unfinished yet step" + ) }) it("should cancel a complete execution with a sync workflow running as async", async () => { @@ -537,7 +540,10 @@ moduleIntegrationTestRunner({ }) }) - failTrap(done) + failTrap( + done, + "should subscribe to a async workflow and receive the response when it finishes" + ) }) describe("Testing basic workflow", function () { @@ -744,7 +750,10 @@ moduleIntegrationTestRunner({ }) expect(onFinish).toHaveBeenCalledTimes(0) - failTrap(done) + failTrap( + done, + "should subscribe to a async workflow and receive the response when it finishes" + ) }) it("should cancel and revert a completed workflow", async () => { @@ -817,7 +826,10 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - failTrap(done) + failTrap( + done, + "should not run conditional steps if condition is false" + ) }) it("should not run conditional steps if condition is false", (done) => { @@ -839,7 +851,10 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - failTrap(done) + failTrap( + done, + "should not run conditional steps if condition is false" + ) }) }) @@ -988,7 +1003,10 @@ moduleIntegrationTestRunner({ }, }) - failTrap(done) + failTrap( + done, + "should display error when multple async steps are running in parallel" + ) }) }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts index 359015ea31..1dc8308483 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts @@ -14,11 +14,11 @@ import "../__fixtures__" jest.setTimeout(300000) -const failTrap = (done) => { +const failTrap = (done, name) => { setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( - "Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually." + `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() }, 5000) @@ -99,7 +99,10 @@ moduleIntegrationTestRunner({ }) .catch((e) => e) - failTrap(done) + failTrap( + done, + "should prevent race continuation of the workflow during retryIntervalAwaiting in background execution" + ) }) it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { @@ -190,7 +193,10 @@ moduleIntegrationTestRunner({ }) .catch((e) => e) - failTrap(done) + failTrap( + done, + "should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution" + ) }) }) }, diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 09b48c83af..19c8a63767 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -285,11 +285,13 @@ export class InMemoryDistributedTransactionStorage const { retentionTime } = options ?? {} - await this.#preventRaceConditionExecutionIfNecessary({ - data, - key, - options, - }) + if (data.flow.hasAsyncSteps) { + await this.#preventRaceConditionExecutionIfNecessary({ + data, + key, + options, + }) + } // Only store retention time if it's provided if (retentionTime) { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index dc8462e2da..3e75c500e8 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -15,11 +15,11 @@ import "../__fixtures__" jest.setTimeout(300000) -const failTrap = (done) => { +const failTrap = (done, name) => { setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( - "Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually." + `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() }, 5000) @@ -118,7 +118,10 @@ moduleIntegrationTestRunner({ expect(result).toBe("result from step 0") }) - failTrap(done) + failTrap( + done, + "should prevent race continuation of the workflow during retryIntervalAwaiting in background execution" + ) }) it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { @@ -214,7 +217,10 @@ moduleIntegrationTestRunner({ expect(result).toBe("result from step 0") }) - failTrap(done) + failTrap( + done, + "should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution" + ) }) }) }, diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 2d71276cb0..04520fcfe9 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -64,6 +64,7 @@ type IdempotencyKeyParts = { type NotifyOptions = { eventType: keyof DistributedTransactionEvents + isFlowAsync: boolean workflowId: string transactionId?: string step?: TransactionStep @@ -315,12 +316,6 @@ export class WorkflowOrchestratorService { throw new Error("Transaction ID is required") } - const events: FlowRunOptions["events"] = this.buildWorkflowEvents({ - customEventHandlers: eventHandlers, - workflowId, - transactionId: transactionId, - }) - const exportedWorkflow = MedusaWorkflow.getWorkflow(workflowId) if (!exportedWorkflow) { throw new Error(`Workflow with id "${workflowId}" not found.`) @@ -344,6 +339,12 @@ export class WorkflowOrchestratorService { throw new Error("Transaction not found") } + const events: FlowRunOptions["events"] = this.buildWorkflowEvents({ + customEventHandlers: eventHandlers, + workflowId, + transactionId: transactionId, + }) + const ret = await exportedWorkflow.cancel({ transaction, throwOnError: false, @@ -373,6 +374,7 @@ export class WorkflowOrchestratorService { const { result, errors } = ret this.notify({ + isFlowAsync: ret.transaction.getFlow().hasAsyncSteps, eventType: "onFinish", workflowId, transactionId: transaction.transactionId, @@ -471,6 +473,7 @@ export class WorkflowOrchestratorService { const { result, errors } = ret this.notify({ + isFlowAsync: ret.transaction.getFlow().hasAsyncSteps, eventType: "onFinish", workflowId, transactionId, @@ -544,6 +547,7 @@ export class WorkflowOrchestratorService { const { result, errors } = ret this.notify({ + isFlowAsync: ret.transaction.getFlow().hasAsyncSteps, eventType: "onFinish", workflowId, transactionId, @@ -669,16 +673,8 @@ export class WorkflowOrchestratorService { return } - if (publish) { - const channel = this.getChannelName(options.workflowId) - const message = JSON.stringify({ - instanceId: this.instanceId, - data: options, - }) - await this.redisPublisher.publish(channel, message) - } - const { + isFlowAsync, eventType, workflowId, transactionId, @@ -689,6 +685,15 @@ export class WorkflowOrchestratorService { state, } = options + if (publish && isFlowAsync) { + const channel = this.getChannelName(options.workflowId) + const message = JSON.stringify({ + instanceId: this.instanceId, + data: options, + }) + await this.redisPublisher.publish(channel, message) + } + const subscribers: TransactionSubscribers = this.subscribers.get(workflowId) ?? new Map() @@ -698,6 +703,7 @@ export class WorkflowOrchestratorService { eventType, workflowId, transactionId, + isFlowAsync, step, response, result, @@ -743,6 +749,7 @@ export class WorkflowOrchestratorService { transactionId, }): DistributedTransactionEvents { const notify = async ({ + isFlowAsync, eventType, step, result, @@ -750,6 +757,7 @@ export class WorkflowOrchestratorService { errors, state, }: { + isFlowAsync: boolean eventType: keyof DistributedTransactionEvents step?: TransactionStep response?: unknown @@ -758,6 +766,7 @@ export class WorkflowOrchestratorService { state?: TransactionState }) => { await this.notify({ + isFlowAsync, workflowId, transactionId, eventType, @@ -772,31 +781,50 @@ export class WorkflowOrchestratorService { return { onTimeout: async ({ transaction }) => { customEventHandlers?.onTimeout?.({ transaction }) - await notify({ eventType: "onTimeout" }) + await notify({ + eventType: "onTimeout", + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onBegin: async ({ transaction }) => { customEventHandlers?.onBegin?.({ transaction }) - await notify({ eventType: "onBegin" }) + await notify({ + eventType: "onBegin", + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onResume: async ({ transaction }) => { customEventHandlers?.onResume?.({ transaction }) - await notify({ eventType: "onResume" }) + await notify({ + eventType: "onResume", + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onCompensateBegin: async ({ transaction }) => { customEventHandlers?.onCompensateBegin?.({ transaction }) - await notify({ eventType: "onCompensateBegin" }) + await notify({ + eventType: "onCompensateBegin", + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onFinish: async ({ transaction, result, errors }) => { customEventHandlers?.onFinish?.({ transaction, result, errors }) - await notify({ eventType: "onFinish" }) + await notify({ + eventType: "onFinish", + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onStepBegin: async ({ step, transaction }) => { customEventHandlers?.onStepBegin?.({ step, transaction }) this.activeStepsCount++ - await notify({ eventType: "onStepBegin", step }) + await notify({ + eventType: "onStepBegin", + step, + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onStepSuccess: async ({ step, transaction }) => { const stepName = step.definition.action! @@ -805,7 +833,12 @@ export class WorkflowOrchestratorService { transaction ) customEventHandlers?.onStepSuccess?.({ step, transaction, response }) - await notify({ eventType: "onStepSuccess", step, response }) + await notify({ + eventType: "onStepSuccess", + step, + response, + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) this.activeStepsCount-- }, @@ -816,14 +849,23 @@ export class WorkflowOrchestratorService { .filter((err) => err.action === stepName) customEventHandlers?.onStepFailure?.({ step, transaction, errors }) - await notify({ eventType: "onStepFailure", step, errors }) + await notify({ + eventType: "onStepFailure", + step, + errors, + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) this.activeStepsCount-- }, onStepAwaiting: async ({ step, transaction }) => { customEventHandlers?.onStepAwaiting?.({ step, transaction }) - await notify({ eventType: "onStepAwaiting", step }) + await notify({ + eventType: "onStepAwaiting", + step, + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) this.activeStepsCount-- }, @@ -837,7 +879,12 @@ export class WorkflowOrchestratorService { response, }) - await notify({ eventType: "onCompensateStepSuccess", step, response }) + await notify({ + eventType: "onCompensateStepSuccess", + step, + response, + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, onCompensateStepFailure: async ({ step, transaction }) => { const stepName = step.definition.action! @@ -847,7 +894,12 @@ export class WorkflowOrchestratorService { customEventHandlers?.onStepFailure?.({ step, transaction, errors }) - await notify({ eventType: "onCompensateStepFailure", step, errors }) + await notify({ + eventType: "onCompensateStepFailure", + step, + errors, + isFlowAsync: transaction.getFlow().hasAsyncSteps, + }) }, } } diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index f25a4bc1cf..8789aa174f 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -413,11 +413,13 @@ export class RedisDistributedTransactionStorage const { retentionTime } = options ?? {} - await this.#preventRaceConditionExecutionIfNecessary({ - data, - key, - options, - }) + if (data.flow.hasAsyncSteps) { + await this.#preventRaceConditionExecutionIfNecessary({ + data, + key, + options, + }) + } if (hasFinished && retentionTime) { Object.assign(data, {