chore(workflow-engine-*): Align event subscribers management (#12976)

* chore(workflow-engine-*): Align event subscribers management

* Create nervous-eels-build.md
This commit is contained in:
Adrien de Peretti
2025-07-16 16:34:47 +02:00
committed by GitHub
parent aac2feb974
commit eb83954f23
3 changed files with 62 additions and 27 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
---
chore(workflow-engine-*): Align event subscribers management

View File

@@ -521,14 +521,16 @@ export class WorkflowOrchestratorService {
const subscribers = this.subscribers.get(workflowId) ?? new Map()
const handlerIndex = (handlers) => {
return handlers.indexOf((s) => s === subscriber || s._id === subscriberId)
return handlers.findIndex(
(s) => s === subscriber || s._id === subscriberId
)
}
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
const subscriberIndex = handlerIndex(transactionSubscribers)
if (subscriberIndex !== -1) {
transactionSubscribers.slice(subscriberIndex, 1)
transactionSubscribers.splice(subscriberIndex, 1)
}
transactionSubscribers.push(subscriber)
@@ -540,7 +542,7 @@ export class WorkflowOrchestratorService {
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
const subscriberIndex = handlerIndex(workflowSubscribers)
if (subscriberIndex !== -1) {
workflowSubscribers.slice(subscriberIndex, 1)
workflowSubscribers.splice(subscriberIndex, 1)
}
workflowSubscribers.push(subscriber)
@@ -568,14 +570,22 @@ export class WorkflowOrchestratorService {
const newTransactionSubscribers = filterSubscribers(
transactionSubscribers
)
subscribers.set(transactionId, newTransactionSubscribers)
if (newTransactionSubscribers.length) {
subscribers.set(transactionId, newTransactionSubscribers)
} else {
subscribers.delete(transactionId)
}
this.subscribers.set(workflowId, subscribers)
return
}
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
subscribers.set(AnySubscriber, newWorkflowSubscribers)
if (newWorkflowSubscribers.length) {
subscribers.set(AnySubscriber, newWorkflowSubscribers)
} else {
subscribers.delete(AnySubscriber)
}
this.subscribers.set(workflowId, subscribers)
}
@@ -610,6 +620,10 @@ export class WorkflowOrchestratorService {
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
notifySubscribers(transactionSubscribers)
if (options.eventType === "onFinish") {
subscribers.delete(transactionId)
}
}
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
@@ -664,7 +678,6 @@ export class WorkflowOrchestratorService {
notify({ eventType: "onCompensateBegin" })
},
onFinish: ({ transaction, result, errors }) => {
// TODO: unsubscribe transaction handlers on finish
customEventHandlers?.onFinish?.({ transaction, result, errors })
},

View File

@@ -572,14 +572,16 @@ export class WorkflowOrchestratorService {
}
const handlerIndex = (handlers) => {
return handlers.indexOf((s) => s === subscriber || s._id === subscriberId)
return handlers.findIndex(
(s) => s === subscriber || s._id === subscriberId
)
}
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
const subscriberIndex = handlerIndex(transactionSubscribers)
if (subscriberIndex !== -1) {
transactionSubscribers.slice(subscriberIndex, 1)
transactionSubscribers.splice(subscriberIndex, 1)
}
transactionSubscribers.push(subscriber)
@@ -591,7 +593,7 @@ export class WorkflowOrchestratorService {
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
const subscriberIndex = handlerIndex(workflowSubscribers)
if (subscriberIndex !== -1) {
workflowSubscribers.slice(subscriberIndex, 1)
workflowSubscribers.splice(subscriberIndex, 1)
}
workflowSubscribers.push(subscriber)
@@ -604,7 +606,10 @@ export class WorkflowOrchestratorService {
transactionId,
subscriberOrId,
}: UnsubscribeOptions) {
const subscribers = this.subscribers.get(workflowId) ?? new Map()
const subscribers = this.subscribers.get(workflowId)
if (!subscribers) {
return
}
const filterSubscribers = (handlers: SubscriberHandler[]) => {
return handlers.filter((handler) => {
@@ -614,25 +619,36 @@ export class WorkflowOrchestratorService {
})
}
// Unsubscribe instance
if (!this.subscribers.has(workflowId)) {
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId)
if (transactionSubscribers) {
const newTransactionSubscribers = filterSubscribers(
transactionSubscribers
)
if (newTransactionSubscribers.length) {
subscribers.set(transactionId, newTransactionSubscribers)
} else {
subscribers.delete(transactionId)
}
}
} else {
const workflowSubscribers = subscribers.get(AnySubscriber)
if (workflowSubscribers) {
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
if (newWorkflowSubscribers.length) {
subscribers.set(AnySubscriber, newWorkflowSubscribers)
} else {
subscribers.delete(AnySubscriber)
}
}
}
if (subscribers.size === 0) {
this.subscribers.delete(workflowId)
void this.redisSubscriber.unsubscribe(this.getChannelName(workflowId))
}
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
const newTransactionSubscribers = filterSubscribers(
transactionSubscribers
)
subscribers.set(transactionId, newTransactionSubscribers)
this.subscribers.set(workflowId, subscribers)
return
}
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
subscribers.set(AnySubscriber, newWorkflowSubscribers)
this.subscribers.set(workflowId, subscribers)
}
private async notify(