From eb83954f23077c0714125b6f2f19fd0ef0f288f9 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Wed, 16 Jul 2025 16:34:47 +0200 Subject: [PATCH] chore(workflow-engine-*): Align event subscribers management (#12976) * chore(workflow-engine-*): Align event subscribers management * Create nervous-eels-build.md --- .changeset/nervous-eels-build.md | 6 ++ .../src/services/workflow-orchestrator.ts | 25 ++++++-- .../src/services/workflow-orchestrator.ts | 58 ++++++++++++------- 3 files changed, 62 insertions(+), 27 deletions(-) create mode 100644 .changeset/nervous-eels-build.md diff --git a/.changeset/nervous-eels-build.md b/.changeset/nervous-eels-build.md new file mode 100644 index 0000000000..7fe9a1e6e5 --- /dev/null +++ b/.changeset/nervous-eels-build.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +chore(workflow-engine-*): Align event subscribers management diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index b8455cad5c..c676afe73e 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -521,14 +521,16 @@ export class WorkflowOrchestratorService { const subscribers = this.subscribers.get(workflowId) ?? new Map() const handlerIndex = (handlers) => { - return handlers.indexOf((s) => s === subscriber || s._id === subscriberId) + return handlers.findIndex( + (s) => s === subscriber || s._id === subscriberId + ) } if (transactionId) { const transactionSubscribers = subscribers.get(transactionId) ?? [] const subscriberIndex = handlerIndex(transactionSubscribers) if (subscriberIndex !== -1) { - transactionSubscribers.slice(subscriberIndex, 1) + transactionSubscribers.splice(subscriberIndex, 1) } transactionSubscribers.push(subscriber) @@ -540,7 +542,7 @@ export class WorkflowOrchestratorService { const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] const subscriberIndex = handlerIndex(workflowSubscribers) if (subscriberIndex !== -1) { - workflowSubscribers.slice(subscriberIndex, 1) + workflowSubscribers.splice(subscriberIndex, 1) } workflowSubscribers.push(subscriber) @@ -568,14 +570,22 @@ export class WorkflowOrchestratorService { const newTransactionSubscribers = filterSubscribers( transactionSubscribers ) - subscribers.set(transactionId, newTransactionSubscribers) + if (newTransactionSubscribers.length) { + subscribers.set(transactionId, newTransactionSubscribers) + } else { + subscribers.delete(transactionId) + } this.subscribers.set(workflowId, subscribers) return } const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] const newWorkflowSubscribers = filterSubscribers(workflowSubscribers) - subscribers.set(AnySubscriber, newWorkflowSubscribers) + if (newWorkflowSubscribers.length) { + subscribers.set(AnySubscriber, newWorkflowSubscribers) + } else { + subscribers.delete(AnySubscriber) + } this.subscribers.set(workflowId, subscribers) } @@ -610,6 +620,10 @@ export class WorkflowOrchestratorService { if (transactionId) { const transactionSubscribers = subscribers.get(transactionId) ?? [] notifySubscribers(transactionSubscribers) + + if (options.eventType === "onFinish") { + subscribers.delete(transactionId) + } } const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] @@ -664,7 +678,6 @@ export class WorkflowOrchestratorService { notify({ eventType: "onCompensateBegin" }) }, onFinish: ({ transaction, result, errors }) => { - // TODO: unsubscribe transaction handlers on finish customEventHandlers?.onFinish?.({ transaction, result, errors }) }, diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index f4b9c5b588..718125c08d 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -572,14 +572,16 @@ export class WorkflowOrchestratorService { } const handlerIndex = (handlers) => { - return handlers.indexOf((s) => s === subscriber || s._id === subscriberId) + return handlers.findIndex( + (s) => s === subscriber || s._id === subscriberId + ) } if (transactionId) { const transactionSubscribers = subscribers.get(transactionId) ?? [] const subscriberIndex = handlerIndex(transactionSubscribers) if (subscriberIndex !== -1) { - transactionSubscribers.slice(subscriberIndex, 1) + transactionSubscribers.splice(subscriberIndex, 1) } transactionSubscribers.push(subscriber) @@ -591,7 +593,7 @@ export class WorkflowOrchestratorService { const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] const subscriberIndex = handlerIndex(workflowSubscribers) if (subscriberIndex !== -1) { - workflowSubscribers.slice(subscriberIndex, 1) + workflowSubscribers.splice(subscriberIndex, 1) } workflowSubscribers.push(subscriber) @@ -604,7 +606,10 @@ export class WorkflowOrchestratorService { transactionId, subscriberOrId, }: UnsubscribeOptions) { - const subscribers = this.subscribers.get(workflowId) ?? new Map() + const subscribers = this.subscribers.get(workflowId) + if (!subscribers) { + return + } const filterSubscribers = (handlers: SubscriberHandler[]) => { return handlers.filter((handler) => { @@ -614,25 +619,36 @@ export class WorkflowOrchestratorService { }) } - // Unsubscribe instance - if (!this.subscribers.has(workflowId)) { + if (transactionId) { + const transactionSubscribers = subscribers.get(transactionId) + if (transactionSubscribers) { + const newTransactionSubscribers = filterSubscribers( + transactionSubscribers + ) + + if (newTransactionSubscribers.length) { + subscribers.set(transactionId, newTransactionSubscribers) + } else { + subscribers.delete(transactionId) + } + } + } else { + const workflowSubscribers = subscribers.get(AnySubscriber) + if (workflowSubscribers) { + const newWorkflowSubscribers = filterSubscribers(workflowSubscribers) + + if (newWorkflowSubscribers.length) { + subscribers.set(AnySubscriber, newWorkflowSubscribers) + } else { + subscribers.delete(AnySubscriber) + } + } + } + + if (subscribers.size === 0) { + this.subscribers.delete(workflowId) void this.redisSubscriber.unsubscribe(this.getChannelName(workflowId)) } - - if (transactionId) { - const transactionSubscribers = subscribers.get(transactionId) ?? [] - const newTransactionSubscribers = filterSubscribers( - transactionSubscribers - ) - subscribers.set(transactionId, newTransactionSubscribers) - this.subscribers.set(workflowId, subscribers) - return - } - - const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] - const newWorkflowSubscribers = filterSubscribers(workflowSubscribers) - subscribers.set(AnySubscriber, newWorkflowSubscribers) - this.subscribers.set(workflowId, subscribers) } private async notify(