chore(): Workflow engine timers and notification improvements (#13434)

RESOLVES CORE-1177

**What**
The main changes are:
- subscriber notification no longer blocks workflow execution
- better timer management (timers are keyed, deduplicated, and cleared on shutdown)
- improved race condition checks
Author: Adrien de Peretti
Date: 2025-09-08 20:19:55 +02:00
Committed by: GitHub
Parent: b776fd55dc
Commit: fc4d5f0ac9
14 changed files with 464 additions and 213 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/orchestration": patch
---
chore(): Workflow engine timers and notification improvements

View File

@@ -566,7 +566,11 @@ export class TransactionOrchestrator extends EventEmitter {
  cleaningUp.push(transaction.clearStepTimeout(step))
}

- await promiseAll(cleaningUp)
+ if (cleaningUp.length) {
+   setImmediate(async () => {
+     await promiseAll(cleaningUp)
+   })
+ }

if (shouldEmit) {
  const eventName = step.isCompensating()
@@ -645,7 +649,11 @@ export class TransactionOrchestrator extends EventEmitter {
  cleaningUp.push(transaction.clearStepTimeout(step))
}

- await promiseAll(cleaningUp)
+ if (cleaningUp.length) {
+   setImmediate(async () => {
+     await promiseAll(cleaningUp)
+   })
+ }

if (shouldEmit) {
  const eventName = DistributedTransactionEvent.STEP_SKIPPED
@@ -847,7 +855,11 @@ export class TransactionOrchestrator extends EventEmitter {
  cleaningUp.push(transaction.clearRetry(step))
}

- await promiseAll(cleaningUp)
+ if (cleaningUp.length) {
+   setImmediate(async () => {
+     await promiseAll(cleaningUp)
+   })
+ }

if (!result.stopExecution) {
  const eventName = step.isCompensating()
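All three hunks above apply the same pattern: timer cleanup is taken off the step-completion path and flushed on the next event-loop turn. A minimal, standalone sketch of the idea (the function and variable names here are illustrative, not the orchestrator's internals):

```ts
// Sketch: defer non-critical async cleanup so the caller returns immediately.
function finishStep(cleanups: Array<Promise<void>>): void {
  if (cleanups.length) {
    setImmediate(async () => {
      try {
        await Promise.all(cleanups)
      } catch (err) {
        // A failed cleanup should not fail the step itself.
        console.error("timer cleanup failed", err)
      }
    })
  }
  // ...continue with event emission / state transitions without awaiting cleanup
}
```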

View File

@@ -186,6 +186,42 @@
"unique": false, "unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL" "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL"
}, },
{
"keyName": "IDX_workflow_execution_workflow_id_transaction_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_state_updated_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state_updated_at\" ON \"workflow_execution\" (state, updated_at) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_retention_time_updated_at_state",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_retention_time_updated_at_state\" ON \"workflow_execution\" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL"
},
{
"keyName": "IDX_workflow_execution_updated_at_retention_time",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_updated_at_retention_time\" ON \"workflow_execution\" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')"
},
{
  "keyName": "workflow_execution_pkey",
  "columnNames": [

View File

@@ -0,0 +1,19 @@
import { Migration } from '@mikro-orm/migrations';
export class Migration20250908080305 extends Migration {
override async up(): Promise<void> {
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;`);
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state_updated_at" ON "workflow_execution" (state, updated_at) WHERE deleted_at IS NULL;`);
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_retention_time_updated_at_state" ON "workflow_execution" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL;`);
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_updated_at_retention_time" ON "workflow_execution" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted');`);
}
override async down(): Promise<void> {
this.addSql(`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";`);
this.addSql(`drop index if exists "IDX_workflow_execution_state_updated_at";`);
this.addSql(`drop index if exists "IDX_workflow_execution_retention_time_updated_at_state";`);
this.addSql(`drop index if exists "IDX_workflow_execution_updated_at_retention_time";`);
}
}
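These partial indexes target the engine's recurring lookups: resolving an execution by `(workflow_id, transaction_id)` and scanning finished executions for retention-based cleanup. As a rough illustration only (the exact cleanup SQL and the unit of `retention_time` are assumptions, not taken from the module), a query shaped like the one below can use the `(updated_at, retention_time)` index because its `WHERE` clause matches the index predicate:

```ts
// Hypothetical cleanup query; table and column names match the migration above,
// and retention_time is assumed to be stored in seconds.
const EXPIRED_EXECUTIONS_SQL = `
  DELETE FROM workflow_execution
  WHERE deleted_at IS NULL
    AND retention_time IS NOT NULL
    AND state IN ('done', 'failed', 'reverted')
    AND updated_at < NOW() - retention_time * INTERVAL '1 second'
`
```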

View File

@@ -38,4 +38,21 @@ export const WorkflowExecution = model
on: ["run_id"], on: ["run_id"],
where: "deleted_at IS NULL", where: "deleted_at IS NULL",
}, },
{
on: ["workflow_id", "transaction_id"],
where: "deleted_at IS NULL",
},
{
on: ["state", "updated_at"],
where: "deleted_at IS NULL",
},
{
on: ["retention_time", "updated_at", "state"],
where: "deleted_at IS NULL AND retention_time IS NOT NULL",
},
{
on: ["updated_at", "retention_time"],
where:
"deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')",
},
])

View File

@@ -9,11 +9,13 @@ import {
import {
  ContainerLike,
  Context,
  Logger,
  MedusaContainer,
} from "@medusajs/framework/types"
import {
  isString,
  MedusaError,
  promiseAll,
  TransactionState,
} from "@medusajs/framework/utils"
import {
@@ -101,6 +103,7 @@ export class WorkflowOrchestratorService {
private subscribers: Subscribers = new Map()
private container_: MedusaContainer
private inMemoryDistributedTransactionStorage_: InMemoryDistributedTransactionStorage
readonly #logger: Logger

constructor({
  inMemoryDistributedTransactionStorage,
@@ -113,6 +116,10 @@ export class WorkflowOrchestratorService {
this.container_ = sharedContainer
this.inMemoryDistributedTransactionStorage_ =
  inMemoryDistributedTransactionStorage
this.#logger =
  this.container_.resolve("logger", { allowUnregistered: true }) ?? console

inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this)
DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage)
WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage)
@@ -673,46 +680,49 @@ export class WorkflowOrchestratorService {
}

private notify(options: NotifyOptions) {
-   const {
-     eventType,
-     workflowId,
-     transactionId,
-     errors,
-     result,
-     step,
-     response,
-     state,
-   } = options
+   // Process subscribers asynchronously to avoid blocking workflow execution
+   setImmediate(() => this.processSubscriberNotifications(options))
+ }
+
+ private async processSubscriberNotifications(options: NotifyOptions) {
+   const { workflowId, transactionId, eventType } = options

  const subscribers: TransactionSubscribers =
    this.subscribers.get(workflowId) ?? new Map()

-   const notifySubscribers = (handlers: SubscriberHandler[]) => {
-     handlers.forEach((handler) => {
-       handler({
-         eventType,
-         workflowId,
-         transactionId,
-         step,
-         response,
-         result,
-         errors,
-         state,
-       })
-     })
-   }
+   const notifySubscribersAsync = async (handlers: SubscriberHandler[]) => {
+     const promises = handlers.map(async (handler) => {
+       try {
+         const result = handler(options) as void | Promise<any>
+         if (result && typeof result === "object" && "then" in result) {
+           await (result as Promise<any>)
+         }
+       } catch (error) {
+         this.#logger.error(`Subscriber error: ${error}`)
+       }
+     })
+     await promiseAll(promises)
+   }
+
+   const tasks: Promise<void>[] = []

  if (transactionId) {
    const transactionSubscribers = subscribers.get(transactionId) ?? []
-     notifySubscribers(transactionSubscribers)
+     if (transactionSubscribers.length > 0) {
+       tasks.push(notifySubscribersAsync(transactionSubscribers))
+     }

-     if (options.eventType === "onFinish") {
+     if (eventType === "onFinish") {
      subscribers.delete(transactionId)
    }
  }

  const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
-   notifySubscribers(workflowSubscribers)
+   if (workflowSubscribers.length > 0) {
+     tasks.push(notifySubscribersAsync(workflowSubscribers))
+   }
+
+   await promiseAll(tasks)
}

private buildWorkflowEvents({
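The net effect is that `notify` returns immediately, and subscriber handlers (sync or async) run on the next tick with failures logged rather than thrown into the workflow. A hedged sketch of what a subscriber can now safely look like; the `subscribe` argument shape follows the module service below, while the engine handle and event fields are typed loosely for illustration:

```ts
// Loose structural types for the sketch; the real service exposes richer types.
type SubscriberEvent = {
  eventType: string
  workflowId: string
  transactionId?: string
  result?: unknown
  errors?: unknown[]
}

type EngineLike = {
  subscribe(args: {
    workflowId: string
    transactionId?: string
    subscriber: (event: SubscriberEvent) => void | Promise<void>
    subscriberId?: string
  }): Promise<void> | void
}

async function registerResultSink(engine: EngineLike) {
  await engine.subscribe({
    workflowId: "my-workflow", // illustrative workflow id
    subscriber: async (event) => {
      if (event.eventType === "onFinish") {
        // May be slow or throw: it now runs off the hot path and errors are only logged.
        await persistResult(event.result)
      }
    },
  })
}

async function persistResult(result: unknown): Promise<void> {
  // placeholder for an external side effect (DB write, webhook, ...)
}
```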

View File

@@ -284,36 +284,26 @@ export class WorkflowsModuleService<
  } as any)
}

- @InjectSharedContext()
- async subscribe(
-   args: {
-     workflowId: string
-     transactionId?: string
-     subscriber: Function
-     subscriberId?: string
-   },
-   @MedusaContext() context: Context = {}
- ) {
+ async subscribe(args: {
+   workflowId: string
+   transactionId?: string
+   subscriber: Function
+   subscriberId?: string
+ }) {
  return this.workflowOrchestratorService_.subscribe(args as any)
}

- @InjectSharedContext()
- async unsubscribe(
-   args: {
-     workflowId: string
-     transactionId?: string
-     subscriberOrId: string | Function
-   },
-   @MedusaContext() context: Context = {}
- ) {
+ async unsubscribe(args: {
+   workflowId: string
+   transactionId?: string
+   subscriberOrId: string | Function
+ }) {
  return this.workflowOrchestratorService_.unsubscribe(args as any)
}

- @InjectSharedContext()
async cancel<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
  workflowIdOrWorkflow: TWorkflow,
-   options: WorkflowOrchestratorCancelOptions,
-   @MedusaContext() context: Context = {}
+   options: WorkflowOrchestratorCancelOptions
) {
  return await this.workflowOrchestratorService_.cancel(
    workflowIdOrWorkflow,
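Since the shared-context decorator and parameter are gone, callers pass only the arguments shown above. A small sketch of pairing `subscribe` with `unsubscribe` by id, against a loosely typed engine handle (ids and workflow names are illustrative):

```ts
type WorkflowEngineLike = {
  subscribe(args: {
    workflowId: string
    subscriber: Function
    subscriberId?: string
  }): Promise<void> | void
  unsubscribe(args: {
    workflowId: string
    subscriberOrId: string | Function
  }): Promise<void> | void
}

async function attachAndDetach(engine: WorkflowEngineLike) {
  const subscriberId = "sync-to-analytics"

  await engine.subscribe({
    workflowId: "order-fulfillment",
    subscriber: () => {},
    subscriberId,
  })

  // Later: remove the handler by the same id (or by passing the function reference).
  await engine.unsubscribe({
    workflowId: "order-fulfillment",
    subscriberOrId: subscriberId,
  })
}
```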

View File

@@ -69,6 +69,24 @@ function parseNextExecution(
  return result
}
const invokingStatesSet = new Set([
TransactionStepState.INVOKING,
TransactionStepState.NOT_STARTED,
])
const compensatingStatesSet = new Set([
TransactionStepState.COMPENSATING,
TransactionStepState.NOT_STARTED,
])
function isInvokingState(step: TransactionStep) {
return invokingStatesSet.has(step.invoke?.state)
}
function isCompensatingState(step: TransactionStep) {
return compensatingStatesSet.has(step.compensate?.state)
}
export class InMemoryDistributedTransactionStorage
  implements IDistributedTransactionStorage, IDistributedSchedulerStorage
{
@@ -87,8 +105,9 @@ export class InMemoryDistributedTransactionStorage
    config: SchedulerOptions
  }
> = new Map()

- private retries: Map<string, unknown> = new Map()
- private timeouts: Map<string, unknown> = new Map()
+ private retries: Map<string, NodeJS.Timeout> = new Map()
+ private timeouts: Map<string, NodeJS.Timeout> = new Map()
+ private pendingTimers: Set<NodeJS.Timeout> = new Set()

private clearTimeout_: NodeJS.Timeout
@@ -113,12 +132,46 @@ export class InMemoryDistributedTransactionStorage
async onApplicationShutdown() {
  clearInterval(this.clearTimeout_)
for (const timer of this.pendingTimers) {
clearTimeout(timer)
}
this.pendingTimers.clear()
for (const timer of this.retries.values()) {
clearTimeout(timer)
}
this.retries.clear()
for (const timer of this.timeouts.values()) {
clearTimeout(timer)
}
this.timeouts.clear()
// Clean up scheduled job timers
for (const job of this.scheduled.values()) {
clearTimeout(job.timer)
}
this.scheduled.clear()
}

setWorkflowOrchestratorService(workflowOrchestratorService) {
  this.workflowOrchestratorService_ = workflowOrchestratorService
}
private createManagedTimer(
callback: () => void,
delay: number
): NodeJS.Timeout {
const timer = setTimeout(() => {
this.pendingTimers.delete(timer)
callback()
}, delay)
this.pendingTimers.add(timer)
return timer
}
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
  const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
  const isFinished = [
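The `pendingTimers` set plus `createManagedTimer` is a general way to keep fire-and-forget timers cancellable at shutdown. The same idea as a standalone class, outside the storage (names are illustrative):

```ts
class TimerRegistry {
  private pending = new Set<NodeJS.Timeout>()

  schedule(callback: () => void, delayMs: number): NodeJS.Timeout {
    const timer = setTimeout(() => {
      this.pending.delete(timer) // fired: stop tracking it
      callback()
    }, delayMs)
    this.pending.add(timer)
    return timer
  }

  shutdown(): void {
    for (const timer of this.pending) {
      clearTimeout(timer) // nothing fires after shutdown
    }
    this.pending.clear()
  }
}
```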
@@ -453,38 +506,43 @@ export class InMemoryDistributedTransactionStorage
  )
}

- // Predefined states for quick lookup
- const invokingStates = [
-   TransactionStepState.INVOKING,
-   TransactionStepState.NOT_STARTED,
- ]
- const compensatingStates = [
-   TransactionStepState.COMPENSATING,
-   TransactionStepState.NOT_STARTED,
- ]
-
- const isInvokingState = (step: TransactionStep) =>
-   invokingStates.includes(step.invoke?.state)
- const isCompensatingState = (step: TransactionStep) =>
-   compensatingStates.includes(step.compensate?.state)
-
const currentFlowLastInvokingStepIndex =
  currentFlowSteps.findIndex(isInvokingState)

- const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
+ let latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
  ? 1 // There is no other execution, so the current execution is the latest
-   : latestUpdatedFlowSteps.findIndex(isInvokingState)
+   : -1
+
+ if (latestUpdatedFlow.steps) {
+   for (let i = 0; i < latestUpdatedFlowSteps.length; i++) {
+     if (isInvokingState(latestUpdatedFlowSteps[i])) {
+       latestUpdatedFlowLastInvokingStepIndex = i
+       break
+     }
+   }
+ }

- const reversedCurrentFlowSteps = [...currentFlowSteps].reverse()
- const currentFlowLastCompensatingStepIndex =
-   reversedCurrentFlowSteps.findIndex(isCompensatingState)
+ let currentFlowLastCompensatingStepIndex = -1
+ for (let i = currentFlowSteps.length - 1; i >= 0; i--) {
+   if (isCompensatingState(currentFlowSteps[i])) {
+     currentFlowLastCompensatingStepIndex = currentFlowSteps.length - 1 - i
+     break
+   }
+ }

- const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse()
- const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
+ let latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
  ? -1 // There is no other execution, so the current execution is the latest
-   : reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState)
+   : -1
+
+ if (latestUpdatedFlow.steps) {
+   for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) {
+     if (isCompensatingState(latestUpdatedFlowSteps[i])) {
+       latestUpdatedFlowLastCompensatingStepIndex =
+         latestUpdatedFlowSteps.length - 1 - i
+       break
+     }
+   }
+ }

const isLatestExecutionFinishedIndex = -1

const invokeShouldBeSkipped =
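The rewritten checks replace the `[...steps].reverse().findIndex(...)` pattern with an in-place backwards scan, so each race-condition check no longer allocates a reversed copy of the steps array. A generic sketch of the equivalence (not part of the module):

```ts
// Returns the same value as [...items].reverse().findIndex(predicate),
// but without allocating a reversed copy of the array.
function findIndexFromEnd<T>(items: T[], predicate: (item: T) => boolean): number {
  for (let i = items.length - 1; i >= 0; i--) {
    if (predicate(items[i])) {
      return items.length - 1 - i
    }
  }
  return -1
}
```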
@@ -536,8 +594,16 @@ export class InMemoryDistributedTransactionStorage
  interval: number
): Promise<void> {
  const { modelId: workflowId, transactionId } = transaction
+ const key = `${workflowId}:${transactionId}:${step.id}`
+
+ const existingTimer = this.retries.get(key)
+ if (existingTimer) {
+   clearTimeout(existingTimer)
+   this.pendingTimers.delete(existingTimer)
+ }

- const inter = setTimeout(async () => {
+ const timer = this.createManagedTimer(async () => {
+   this.retries.delete(key)
+
    const context = transaction.getFlow().metadata ?? {}
    await this.workflowOrchestratorService_.run(workflowId, {
      transactionId,
@@ -551,8 +617,7 @@ export class InMemoryDistributedTransactionStorage
  })
}, interval * 1e3)

- const key = `${workflowId}:${transactionId}:${step.id}`
- this.retries.set(key, inter)
+ this.retries.set(key, timer)
}
async clearRetry(
@@ -562,9 +627,10 @@ export class InMemoryDistributedTransactionStorage
  const { modelId: workflowId, transactionId } = transaction
  const key = `${workflowId}:${transactionId}:${step.id}`

- const inter = this.retries.get(key)
- if (inter) {
-   clearTimeout(inter as NodeJS.Timeout)
+ const timer = this.retries.get(key)
+ if (timer) {
+   clearTimeout(timer)
+   this.pendingTimers.delete(timer)
    this.retries.delete(key)
  }
}
@@ -575,8 +641,16 @@ export class InMemoryDistributedTransactionStorage
  interval: number
): Promise<void> {
  const { modelId: workflowId, transactionId } = transaction
+ const key = `${workflowId}:${transactionId}`
+
+ const existingTimer = this.timeouts.get(key)
+ if (existingTimer) {
+   clearTimeout(existingTimer)
+   this.pendingTimers.delete(existingTimer)
+ }

- const inter = setTimeout(async () => {
+ const timer = this.createManagedTimer(async () => {
+   this.timeouts.delete(key)
+
    const context = transaction.getFlow().metadata ?? {}
    await this.workflowOrchestratorService_.run(workflowId, {
      transactionId,
@@ -590,8 +664,7 @@ export class InMemoryDistributedTransactionStorage
  })
}, interval * 1e3)

- const key = `${workflowId}:${transactionId}`
- this.timeouts.set(key, inter)
+ this.timeouts.set(key, timer)
}
async clearTransactionTimeout(
@@ -600,9 +673,10 @@ export class InMemoryDistributedTransactionStorage
  const { modelId: workflowId, transactionId } = transaction
  const key = `${workflowId}:${transactionId}`

- const inter = this.timeouts.get(key)
- if (inter) {
-   clearTimeout(inter as NodeJS.Timeout)
+ const timer = this.timeouts.get(key)
+ if (timer) {
+   clearTimeout(timer)
+   this.pendingTimers.delete(timer)
    this.timeouts.delete(key)
  }
}
@@ -614,8 +688,16 @@ export class InMemoryDistributedTransactionStorage
  interval: number
): Promise<void> {
  const { modelId: workflowId, transactionId } = transaction
+ const key = `${workflowId}:${transactionId}:${step.id}`
+
+ const existingTimer = this.timeouts.get(key)
+ if (existingTimer) {
+   clearTimeout(existingTimer)
+   this.pendingTimers.delete(existingTimer)
+ }

- const inter = setTimeout(async () => {
+ const timer = this.createManagedTimer(async () => {
+   this.timeouts.delete(key)
+
    const context = transaction.getFlow().metadata ?? {}
    await this.workflowOrchestratorService_.run(workflowId, {
      transactionId,
@@ -629,8 +711,7 @@ export class InMemoryDistributedTransactionStorage
  })
}, interval * 1e3)

- const key = `${workflowId}:${transactionId}:${step.id}`
- this.timeouts.set(key, inter)
+ this.timeouts.set(key, timer)
}
async clearStepTimeout(
@@ -640,9 +721,10 @@ export class InMemoryDistributedTransactionStorage
  const { modelId: workflowId, transactionId } = transaction
  const key = `${workflowId}:${transactionId}:${step.id}`

- const inter = this.timeouts.get(key)
- if (inter) {
-   clearTimeout(inter as NodeJS.Timeout)
+ const timer = this.timeouts.get(key)
+ if (timer) {
+   clearTimeout(timer)
+   this.pendingTimers.delete(timer)
    this.timeouts.delete(key)
  }
}
@@ -726,11 +808,8 @@ export class InMemoryDistributedTransactionStorage
    throwOnError: false,
  })

- // Only schedule the next job execution after the current one completes successfully
- const timer = setTimeout(async () => {
-   setImmediate(() => {
-     this.jobHandler(jobId)
-   })
- }, nextExecution)
+ const timer = this.createManagedTimer(() => {
+   this.jobHandler(jobId)
+ }, nextExecution)

// Prevent timer from keeping the process alive
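The trailing comment refers to unreferencing the timer: an unreferenced timeout still fires on schedule, but on its own it will not keep the Node.js process alive. A tiny illustration, assuming the surrounding code calls `unref()` as the comment suggests:

```ts
// If nothing else keeps the event loop busy, the process may exit before this fires.
const timer = setTimeout(() => console.log("next scheduled run"), 60_000)
timer.unref()
```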

View File

@@ -186,6 +186,42 @@
"unique": false, "unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL" "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL"
}, },
{
"keyName": "IDX_workflow_execution_workflow_id_transaction_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_state_updated_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state_updated_at\" ON \"workflow_execution\" (state, updated_at) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_retention_time_updated_at_state",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_retention_time_updated_at_state\" ON \"workflow_execution\" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL"
},
{
"keyName": "IDX_workflow_execution_updated_at_retention_time",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_updated_at_retention_time\" ON \"workflow_execution\" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')"
},
{
  "keyName": "workflow_execution_pkey",
  "columnNames": [

View File

@@ -0,0 +1,19 @@
import { Migration } from '@mikro-orm/migrations';
export class Migration20250908080326 extends Migration {
override async up(): Promise<void> {
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;`);
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state_updated_at" ON "workflow_execution" (state, updated_at) WHERE deleted_at IS NULL;`);
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_retention_time_updated_at_state" ON "workflow_execution" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL;`);
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_updated_at_retention_time" ON "workflow_execution" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted');`);
}
override async down(): Promise<void> {
this.addSql(`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";`);
this.addSql(`drop index if exists "IDX_workflow_execution_state_updated_at";`);
this.addSql(`drop index if exists "IDX_workflow_execution_retention_time_updated_at_state";`);
this.addSql(`drop index if exists "IDX_workflow_execution_updated_at_retention_time";`);
}
}

View File

@@ -38,4 +38,21 @@ export const WorkflowExecution = model
on: ["run_id"], on: ["run_id"],
where: "deleted_at IS NULL", where: "deleted_at IS NULL",
}, },
{
on: ["workflow_id", "transaction_id"],
where: "deleted_at IS NULL",
},
{
on: ["state", "updated_at"],
where: "deleted_at IS NULL",
},
{
on: ["retention_time", "updated_at", "state"],
where: "deleted_at IS NULL AND retention_time IS NOT NULL",
},
{
on: ["updated_at", "retention_time"],
where:
"deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')",
},
])

View File

@@ -12,7 +12,11 @@ import {
  Logger,
  MedusaContainer,
} from "@medusajs/framework/types"
- import { isString, TransactionState } from "@medusajs/framework/utils"
+ import {
+   isString,
+   promiseAll,
+   TransactionState,
+ } from "@medusajs/framework/utils"
import {
  FlowCancelOptions,
  FlowRunOptions,
@@ -21,7 +25,6 @@ import {
  ReturnWorkflow,
} from "@medusajs/framework/workflows-sdk"
import Redis from "ioredis"
- import { setTimeout } from "timers"
import { ulid } from "ulid"
import type { RedisDistributedTransactionStorage } from "../utils"
@@ -112,7 +115,6 @@ export class WorkflowOrchestratorService {
protected redisSubscriber: Redis
protected container_: MedusaContainer
private subscribers: Subscribers = new Map()
- private activeStepsCount: number = 0
readonly #logger: Logger
@@ -149,10 +151,16 @@ export class WorkflowOrchestratorService {
  this.redisDistributedTransactionStorage_ =
    redisDistributedTransactionStorage

- this.redisSubscriber.on("message", async (_, message) => {
-   const { instanceId, data } = JSON.parse(message)
-
-   await this.notify(data, false, instanceId)
+ this.redisSubscriber.on("message", async (channel, message) => {
+   const workflowId = channel.split(":")[1]
+   if (!this.subscribers.has(workflowId)) return
+
+   try {
+     const { instanceId, data } = JSON.parse(message)
+     await this.notify(data, false, instanceId)
+   } catch (error) {
+     this.#logger.error(`Failed to process Redis message: ${error}`)
+   }
  })
}
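The handler now derives the workflow id from the channel name and bails out early when this instance has no subscribers for it, skipping `JSON.parse` for irrelevant messages. A rough sketch of that filtering, assuming the `<prefix>:<workflowId>` channel shape implied by `channel.split(":")[1]` (the Redis connection and subscriber map here are illustrative):

```ts
import Redis from "ioredis"

const redisSubscriber = new Redis() // assumes a reachable Redis; sketch only
const subscribers = new Map<string, Array<(data: unknown) => void>>()

redisSubscriber.on("message", (channel, message) => {
  const workflowId = channel.split(":")[1]
  if (!subscribers.has(workflowId)) {
    return // cheap filter: skip parsing messages for workflows nobody here listens to
  }

  try {
    const { data } = JSON.parse(message)
    for (const handler of subscribers.get(workflowId) ?? []) {
      handler(data)
    }
  } catch (error) {
    console.error(`Failed to process Redis message: ${error}`)
  }
})
```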
@@ -163,10 +171,6 @@ export class WorkflowOrchestratorService {
async onApplicationPrepareShutdown() {
  // eslint-disable-next-line max-len
  await this.redisDistributedTransactionStorage_.onApplicationPrepareShutdown()
-
- while (this.activeStepsCount > 0) {
-   await new Promise((resolve) => setTimeout(resolve, 1000))
- }
}

async onApplicationStart() {
@@ -745,70 +749,67 @@ export class WorkflowOrchestratorService {
  return
}

- const {
-   isFlowAsync,
-   eventType,
-   workflowId,
-   transactionId,
-   errors,
-   result,
-   step,
-   response,
-   state,
- } = options
+ const { workflowId, isFlowAsync } = options

+ // Non-blocking Redis publishing
if (publish && isFlowAsync) {
-   const channel = this.getChannelName(options.workflowId)
-   const message = JSON.stringify({
-     instanceId: this.instanceId,
-     data: options,
-   })
-   await this.redisPublisher.publish(channel, message)
+   setImmediate(async () => {
+     try {
+       const channel = this.getChannelName(workflowId)
+       const message = JSON.stringify({
+         instanceId: this.instanceId,
+         data: options,
+       })
+       await this.redisPublisher.publish(channel, message)
+     } catch (error) {
+       this.#logger.error(`Failed to publish to Redis: ${error}`)
+     }
+   })
}

+ // Process subscribers asynchronously
+ setImmediate(() => this.processSubscriberNotifications(options))
+ }
+
+ private async processSubscriberNotifications(options: NotifyOptions) {
+   const { workflowId, transactionId, eventType } = options
+
const subscribers: TransactionSubscribers =
  this.subscribers.get(workflowId) ?? new Map()

- const notifySubscribers = (handlers: SubscriberHandler[]) => {
-   handlers.forEach((handler) => {
-     const args = {
-       eventType,
-       workflowId,
-       transactionId,
-       isFlowAsync,
-       step,
-       response,
-       result,
-       errors,
-       state,
-     }
-     const isPromise = "then" in handler
-     if (isPromise) {
-       ;(handler(args) as unknown as Promise<any>).catch((e) => {
-         this.#logger.error(e)
-       })
-     } else {
-       try {
-         handler(args)
-       } catch (e) {
-         this.#logger.error(e)
-       }
-     }
-   })
- }
+ const notifySubscribersAsync = async (handlers: SubscriberHandler[]) => {
+   const promises = handlers.map(async (handler) => {
+     try {
+       const result = handler(options) as void | Promise<any>
+       if (result && typeof result === "object" && "then" in result) {
+         await (result as Promise<any>)
+       }
+     } catch (error) {
+       this.#logger.error(`Subscriber error: ${error}`)
+     }
+   })
+   await promiseAll(promises)
+ }
+
+ const tasks: Promise<void>[] = []

if (transactionId) {
  const transactionSubscribers = subscribers.get(transactionId) ?? []
-   notifySubscribers(transactionSubscribers)
+   if (transactionSubscribers.length > 0) {
+     tasks.push(notifySubscribersAsync(transactionSubscribers))
+   }

-   // removes transaction id subscribers on finish
  if (eventType === "onFinish") {
    subscribers.delete(transactionId)
  }
}

const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
- notifySubscribers(workflowSubscribers)
+ if (workflowSubscribers.length > 0) {
+   tasks.push(notifySubscribersAsync(workflowSubscribers))
+ }
+
+ await promiseAll(tasks)
}
private getChannelName(workflowId: string): string {
@@ -890,8 +891,6 @@ export class WorkflowOrchestratorService {
onStepBegin: async ({ step, transaction }) => {
  customEventHandlers?.onStepBegin?.({ step, transaction })
-
-   this.activeStepsCount++

  await notify({
    eventType: "onStepBegin",
    step,
@@ -911,8 +910,6 @@ export class WorkflowOrchestratorService {
    response,
    isFlowAsync: transaction.getFlow().hasAsyncSteps,
  })
-
-   this.activeStepsCount--
},

onStepFailure: async ({ step, transaction }) => {
  const stepName = step.definition.action!
@@ -927,8 +924,6 @@ export class WorkflowOrchestratorService {
    errors,
    isFlowAsync: transaction.getFlow().hasAsyncSteps,
  })
-
-   this.activeStepsCount--
},

onStepAwaiting: async ({ step, transaction }) => {
  customEventHandlers?.onStepAwaiting?.({ step, transaction })
@@ -938,8 +933,6 @@ export class WorkflowOrchestratorService {
    step,
    isFlowAsync: transaction.getFlow().hasAsyncSteps,
  })
-
-   this.activeStepsCount--
},

onCompensateStepSuccess: async ({ step, transaction }) => {

View File

@@ -293,37 +293,24 @@ export class WorkflowsModuleService<
  })
}

- @InjectSharedContext()
- async subscribe(
-   args: {
-     workflowId: string
-     transactionId?: string
-     subscriber: Function
-     subscriberId?: string
-   },
-   @MedusaContext() context: Context = {}
- ) {
+ async subscribe(args: {
+   workflowId: string
+   transactionId?: string
+   subscriber: Function
+   subscriberId?: string
+ }) {
  return this.workflowOrchestratorService_.subscribe(args as any)
}

- @InjectSharedContext()
- async unsubscribe(
-   args: {
-     workflowId: string
-     transactionId?: string
-     subscriberOrId: string | Function
-   },
-   @MedusaContext() context: Context = {}
- ) {
+ async unsubscribe(args: {
+   workflowId: string
+   transactionId?: string
+   subscriberOrId: string | Function
+ }) {
  return this.workflowOrchestratorService_.unsubscribe(args as any)
}

- @InjectSharedContext()
- async cancel(
-   workflowId: string,
-   options: WorkflowOrchestratorCancelOptions,
-   @MedusaContext() context: Context = {}
- ) {
+ async cancel(workflowId: string, options: WorkflowOrchestratorCancelOptions) {
  return await this.workflowOrchestratorService_.cancel(workflowId, options)
}
}

View File

@@ -36,6 +36,24 @@ enum JobType {
const ONE_HOUR_IN_MS = 1000 * 60 * 60
const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions"
const invokingStatesSet = new Set([
TransactionStepState.INVOKING,
TransactionStepState.NOT_STARTED,
])
const compensatingStatesSet = new Set([
TransactionStepState.COMPENSATING,
TransactionStepState.NOT_STARTED,
])
function isInvokingState(step: TransactionStep) {
return invokingStatesSet.has(step.invoke?.state)
}
function isCompensatingState(step: TransactionStep) {
return compensatingStatesSet.has(step.compensate?.state)
}
export class RedisDistributedTransactionStorage
  implements IDistributedTransactionStorage, IDistributedSchedulerStorage
{
@@ -100,6 +118,7 @@ export class RedisDistributedTransactionStorage
await this.worker?.close()
await this.jobWorker?.close()
// Clean up repeatable jobs
const repeatableJobs = (await this.cleanerQueue_?.getRepeatableJobs()) ?? []
for (const job of repeatableJobs) {
  if (job.id === REPEATABLE_CLEARER_JOB_ID) {
@@ -780,38 +799,48 @@ export class RedisDistributedTransactionStorage
  )
}

- // Predefined states for quick lookup
- const invokingStates = [
-   TransactionStepState.INVOKING,
-   TransactionStepState.NOT_STARTED,
- ]
- const compensatingStates = [
-   TransactionStepState.COMPENSATING,
-   TransactionStepState.NOT_STARTED,
- ]
-
- const isInvokingState = (step: TransactionStep) =>
-   invokingStates.includes(step.invoke?.state)
- const isCompensatingState = (step: TransactionStep) =>
-   compensatingStates.includes(step.compensate?.state)
-
- const currentFlowLastInvokingStepIndex =
-   currentFlowSteps.findIndex(isInvokingState)
+ let currentFlowLastInvokingStepIndex = -1
+ for (let i = 0; i < currentFlowSteps.length; i++) {
+   if (isInvokingState(currentFlowSteps[i])) {
+     currentFlowLastInvokingStepIndex = i
+     break
+   }
+ }

- const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
+ let latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
  ? 1 // There is no other execution, so the current execution is the latest
-   : latestUpdatedFlowSteps.findIndex(isInvokingState)
+   : -1
+
+ if (latestUpdatedFlow.steps) {
+   for (let i = 0; i < latestUpdatedFlowSteps.length; i++) {
+     if (isInvokingState(latestUpdatedFlowSteps[i])) {
+       latestUpdatedFlowLastInvokingStepIndex = i
+       break
+     }
+   }
+ }

- const reversedCurrentFlowSteps = [...currentFlowSteps].reverse()
- const currentFlowLastCompensatingStepIndex =
-   reversedCurrentFlowSteps.findIndex(isCompensatingState)
+ let currentFlowLastCompensatingStepIndex = -1
+ for (let i = currentFlowSteps.length - 1; i >= 0; i--) {
+   if (isCompensatingState(currentFlowSteps[i])) {
+     currentFlowLastCompensatingStepIndex = currentFlowSteps.length - 1 - i
+     break
+   }
+ }

- const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse()
- const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
+ let latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
  ? -1 // There is no other execution, so the current execution is the latest
-   : reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState)
+   : -1
+
+ if (latestUpdatedFlow.steps) {
+   for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) {
+     if (isCompensatingState(latestUpdatedFlowSteps[i])) {
+       latestUpdatedFlowLastCompensatingStepIndex =
+         latestUpdatedFlowSteps.length - 1 - i
+       break
+     }
+   }
+ }

const isLatestExecutionFinishedIndex = -1

const invokeShouldBeSkipped =