diff --git a/.changeset/silly-suns-chew.md b/.changeset/silly-suns-chew.md new file mode 100644 index 0000000000..08d1204414 --- /dev/null +++ b/.changeset/silly-suns-chew.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +--- + +chore(): Workflow engine timers and notification improvements diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index c1c39b6c72..9f346afa4f 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -566,7 +566,11 @@ export class TransactionOrchestrator extends EventEmitter { cleaningUp.push(transaction.clearStepTimeout(step)) } - await promiseAll(cleaningUp) + if (cleaningUp.length) { + setImmediate(async () => { + await promiseAll(cleaningUp) + }) + } if (shouldEmit) { const eventName = step.isCompensating() @@ -645,7 +649,11 @@ export class TransactionOrchestrator extends EventEmitter { cleaningUp.push(transaction.clearStepTimeout(step)) } - await promiseAll(cleaningUp) + if (cleaningUp.length) { + setImmediate(async () => { + await promiseAll(cleaningUp) + }) + } if (shouldEmit) { const eventName = DistributedTransactionEvent.STEP_SKIPPED @@ -847,7 +855,11 @@ export class TransactionOrchestrator extends EventEmitter { cleaningUp.push(transaction.clearRetry(step)) } - await promiseAll(cleaningUp) + if (cleaningUp.length) { + setImmediate(async () => { + await promiseAll(cleaningUp) + }) + } if (!result.stopExecution) { const eventName = step.isCompensating() diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json index 20e0503696..b39926fdcb 100644 --- a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -186,6 +186,42 @@ "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL" }, + { + "keyName": "IDX_workflow_execution_workflow_id_transaction_id", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state_updated_at", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state_updated_at\" ON \"workflow_execution\" (state, updated_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_retention_time_updated_at_state", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_retention_time_updated_at_state\" ON \"workflow_execution\" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL" + }, + { + "keyName": "IDX_workflow_execution_updated_at_retention_time", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_updated_at_retention_time\" ON \"workflow_execution\" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')" + }, { "keyName": "workflow_execution_pkey", "columnNames": [ diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250908080305.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250908080305.ts new file mode 100644 index 0000000000..562642939e --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250908080305.ts @@ -0,0 +1,19 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20250908080305 extends Migration { + + override async up(): Promise { + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;`); + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state_updated_at" ON "workflow_execution" (state, updated_at) WHERE deleted_at IS NULL;`); + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_retention_time_updated_at_state" ON "workflow_execution" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL;`); + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_updated_at_retention_time" ON "workflow_execution" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted');`); + } + + override async down(): Promise { + this.addSql(`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";`); + this.addSql(`drop index if exists "IDX_workflow_execution_state_updated_at";`); + this.addSql(`drop index if exists "IDX_workflow_execution_retention_time_updated_at_state";`); + this.addSql(`drop index if exists "IDX_workflow_execution_updated_at_retention_time";`); + } + +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index 6a92309133..eb21c1abf3 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -38,4 +38,21 @@ export const WorkflowExecution = model on: ["run_id"], where: "deleted_at IS NULL", }, + { + on: ["workflow_id", "transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state", "updated_at"], + where: "deleted_at IS NULL", + }, + { + on: ["retention_time", "updated_at", "state"], + where: "deleted_at IS NULL AND retention_time IS NOT NULL", + }, + { + on: ["updated_at", "retention_time"], + where: + "deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')", + }, ]) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index ff67c1cb22..3678ba850d 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -9,11 +9,13 @@ import { import { ContainerLike, Context, + Logger, MedusaContainer, } from "@medusajs/framework/types" import { isString, MedusaError, + promiseAll, TransactionState, } from "@medusajs/framework/utils" import { @@ -101,6 +103,7 @@ export class WorkflowOrchestratorService { private subscribers: Subscribers = new Map() private container_: MedusaContainer private inMemoryDistributedTransactionStorage_: InMemoryDistributedTransactionStorage + readonly #logger: Logger constructor({ inMemoryDistributedTransactionStorage, @@ -113,6 +116,10 @@ export class WorkflowOrchestratorService { this.container_ = sharedContainer this.inMemoryDistributedTransactionStorage_ = inMemoryDistributedTransactionStorage + + this.#logger = + this.container_.resolve("logger", { allowUnregistered: true }) ?? console + inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this) DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage) WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage) @@ -673,46 +680,49 @@ export class WorkflowOrchestratorService { } private notify(options: NotifyOptions) { - const { - eventType, - workflowId, - transactionId, - errors, - result, - step, - response, - state, - } = options + // Process subscribers asynchronously to avoid blocking workflow execution + setImmediate(() => this.processSubscriberNotifications(options)) + } + private async processSubscriberNotifications(options: NotifyOptions) { + const { workflowId, transactionId, eventType } = options const subscribers: TransactionSubscribers = this.subscribers.get(workflowId) ?? new Map() - const notifySubscribers = (handlers: SubscriberHandler[]) => { - handlers.forEach((handler) => { - handler({ - eventType, - workflowId, - transactionId, - step, - response, - result, - errors, - state, - }) + const notifySubscribersAsync = async (handlers: SubscriberHandler[]) => { + const promises = handlers.map(async (handler) => { + try { + const result = handler(options) as void | Promise + if (result && typeof result === "object" && "then" in result) { + await (result as Promise) + } + } catch (error) { + this.#logger.error(`Subscriber error: ${error}`) + } }) + + await promiseAll(promises) } + const tasks: Promise[] = [] + if (transactionId) { const transactionSubscribers = subscribers.get(transactionId) ?? [] - notifySubscribers(transactionSubscribers) + if (transactionSubscribers.length > 0) { + tasks.push(notifySubscribersAsync(transactionSubscribers)) + } - if (options.eventType === "onFinish") { + if (eventType === "onFinish") { subscribers.delete(transactionId) } } const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] - notifySubscribers(workflowSubscribers) + if (workflowSubscribers.length > 0) { + tasks.push(notifySubscribersAsync(workflowSubscribers)) + } + + await promiseAll(tasks) } private buildWorkflowEvents({ diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 930780e2a2..a33ed75536 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -284,36 +284,26 @@ export class WorkflowsModuleService< } as any) } - @InjectSharedContext() - async subscribe( - args: { - workflowId: string - transactionId?: string - subscriber: Function - subscriberId?: string - }, - @MedusaContext() context: Context = {} - ) { + async subscribe(args: { + workflowId: string + transactionId?: string + subscriber: Function + subscriberId?: string + }) { return this.workflowOrchestratorService_.subscribe(args as any) } - @InjectSharedContext() - async unsubscribe( - args: { - workflowId: string - transactionId?: string - subscriberOrId: string | Function - }, - @MedusaContext() context: Context = {} - ) { + async unsubscribe(args: { + workflowId: string + transactionId?: string + subscriberOrId: string | Function + }) { return this.workflowOrchestratorService_.unsubscribe(args as any) } - @InjectSharedContext() async cancel>( workflowIdOrWorkflow: TWorkflow, - options: WorkflowOrchestratorCancelOptions, - @MedusaContext() context: Context = {} + options: WorkflowOrchestratorCancelOptions ) { return await this.workflowOrchestratorService_.cancel( workflowIdOrWorkflow, diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 02db64216a..0cc222a59e 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -69,6 +69,24 @@ function parseNextExecution( return result } +const invokingStatesSet = new Set([ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, +]) + +const compensatingStatesSet = new Set([ + TransactionStepState.COMPENSATING, + TransactionStepState.NOT_STARTED, +]) + +function isInvokingState(step: TransactionStep) { + return invokingStatesSet.has(step.invoke?.state) +} + +function isCompensatingState(step: TransactionStep) { + return compensatingStatesSet.has(step.compensate?.state) +} + export class InMemoryDistributedTransactionStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage { @@ -87,8 +105,9 @@ export class InMemoryDistributedTransactionStorage config: SchedulerOptions } > = new Map() - private retries: Map = new Map() - private timeouts: Map = new Map() + private retries: Map = new Map() + private timeouts: Map = new Map() + private pendingTimers: Set = new Set() private clearTimeout_: NodeJS.Timeout @@ -113,12 +132,46 @@ export class InMemoryDistributedTransactionStorage async onApplicationShutdown() { clearInterval(this.clearTimeout_) + + for (const timer of this.pendingTimers) { + clearTimeout(timer) + } + this.pendingTimers.clear() + + for (const timer of this.retries.values()) { + clearTimeout(timer) + } + this.retries.clear() + + for (const timer of this.timeouts.values()) { + clearTimeout(timer) + } + this.timeouts.clear() + + // Clean up scheduled job timers + for (const job of this.scheduled.values()) { + clearTimeout(job.timer) + } + this.scheduled.clear() } setWorkflowOrchestratorService(workflowOrchestratorService) { this.workflowOrchestratorService_ = workflowOrchestratorService } + private createManagedTimer( + callback: () => void, + delay: number + ): NodeJS.Timeout { + const timer = setTimeout(() => { + this.pendingTimers.delete(timer) + callback() + }, delay) + + this.pendingTimers.add(timer) + return timer + } + private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { const isNotStarted = data.flow.state === TransactionState.NOT_STARTED const isFinished = [ @@ -453,38 +506,43 @@ export class InMemoryDistributedTransactionStorage ) } - // Predefined states for quick lookup - const invokingStates = [ - TransactionStepState.INVOKING, - TransactionStepState.NOT_STARTED, - ] - - const compensatingStates = [ - TransactionStepState.COMPENSATING, - TransactionStepState.NOT_STARTED, - ] - - const isInvokingState = (step: TransactionStep) => - invokingStates.includes(step.invoke?.state) - - const isCompensatingState = (step: TransactionStep) => - compensatingStates.includes(step.compensate?.state) - const currentFlowLastInvokingStepIndex = currentFlowSteps.findIndex(isInvokingState) - const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps + let latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps ? 1 // There is no other execution, so the current execution is the latest - : latestUpdatedFlowSteps.findIndex(isInvokingState) + : -1 - const reversedCurrentFlowSteps = [...currentFlowSteps].reverse() - const currentFlowLastCompensatingStepIndex = - reversedCurrentFlowSteps.findIndex(isCompensatingState) + if (latestUpdatedFlow.steps) { + for (let i = 0; i < latestUpdatedFlowSteps.length; i++) { + if (isInvokingState(latestUpdatedFlowSteps[i])) { + latestUpdatedFlowLastInvokingStepIndex = i + break + } + } + } - const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse() - const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps + let currentFlowLastCompensatingStepIndex = -1 + for (let i = currentFlowSteps.length - 1; i >= 0; i--) { + if (isCompensatingState(currentFlowSteps[i])) { + currentFlowLastCompensatingStepIndex = currentFlowSteps.length - 1 - i + break + } + } + + let latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps ? -1 // There is no other execution, so the current execution is the latest - : reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState) + : -1 + + if (latestUpdatedFlow.steps) { + for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) { + if (isCompensatingState(latestUpdatedFlowSteps[i])) { + latestUpdatedFlowLastCompensatingStepIndex = + latestUpdatedFlowSteps.length - 1 - i + break + } + } + } const isLatestExecutionFinishedIndex = -1 const invokeShouldBeSkipped = @@ -536,8 +594,16 @@ export class InMemoryDistributedTransactionStorage interval: number ): Promise { const { modelId: workflowId, transactionId } = transaction + const key = `${workflowId}:${transactionId}:${step.id}` - const inter = setTimeout(async () => { + const existingTimer = this.retries.get(key) + if (existingTimer) { + clearTimeout(existingTimer) + this.pendingTimers.delete(existingTimer) + } + + const timer = this.createManagedTimer(async () => { + this.retries.delete(key) const context = transaction.getFlow().metadata ?? {} await this.workflowOrchestratorService_.run(workflowId, { transactionId, @@ -551,8 +617,7 @@ export class InMemoryDistributedTransactionStorage }) }, interval * 1e3) - const key = `${workflowId}:${transactionId}:${step.id}` - this.retries.set(key, inter) + this.retries.set(key, timer) } async clearRetry( @@ -562,9 +627,10 @@ export class InMemoryDistributedTransactionStorage const { modelId: workflowId, transactionId } = transaction const key = `${workflowId}:${transactionId}:${step.id}` - const inter = this.retries.get(key) - if (inter) { - clearTimeout(inter as NodeJS.Timeout) + const timer = this.retries.get(key) + if (timer) { + clearTimeout(timer) + this.pendingTimers.delete(timer) this.retries.delete(key) } } @@ -575,8 +641,16 @@ export class InMemoryDistributedTransactionStorage interval: number ): Promise { const { modelId: workflowId, transactionId } = transaction + const key = `${workflowId}:${transactionId}` - const inter = setTimeout(async () => { + const existingTimer = this.timeouts.get(key) + if (existingTimer) { + clearTimeout(existingTimer) + this.pendingTimers.delete(existingTimer) + } + + const timer = this.createManagedTimer(async () => { + this.timeouts.delete(key) const context = transaction.getFlow().metadata ?? {} await this.workflowOrchestratorService_.run(workflowId, { transactionId, @@ -590,8 +664,7 @@ export class InMemoryDistributedTransactionStorage }) }, interval * 1e3) - const key = `${workflowId}:${transactionId}` - this.timeouts.set(key, inter) + this.timeouts.set(key, timer) } async clearTransactionTimeout( @@ -600,9 +673,10 @@ export class InMemoryDistributedTransactionStorage const { modelId: workflowId, transactionId } = transaction const key = `${workflowId}:${transactionId}` - const inter = this.timeouts.get(key) - if (inter) { - clearTimeout(inter as NodeJS.Timeout) + const timer = this.timeouts.get(key) + if (timer) { + clearTimeout(timer) + this.pendingTimers.delete(timer) this.timeouts.delete(key) } } @@ -614,8 +688,16 @@ export class InMemoryDistributedTransactionStorage interval: number ): Promise { const { modelId: workflowId, transactionId } = transaction + const key = `${workflowId}:${transactionId}:${step.id}` - const inter = setTimeout(async () => { + const existingTimer = this.timeouts.get(key) + if (existingTimer) { + clearTimeout(existingTimer) + this.pendingTimers.delete(existingTimer) + } + + const timer = this.createManagedTimer(async () => { + this.timeouts.delete(key) const context = transaction.getFlow().metadata ?? {} await this.workflowOrchestratorService_.run(workflowId, { transactionId, @@ -629,8 +711,7 @@ export class InMemoryDistributedTransactionStorage }) }, interval * 1e3) - const key = `${workflowId}:${transactionId}:${step.id}` - this.timeouts.set(key, inter) + this.timeouts.set(key, timer) } async clearStepTimeout( @@ -640,9 +721,10 @@ export class InMemoryDistributedTransactionStorage const { modelId: workflowId, transactionId } = transaction const key = `${workflowId}:${transactionId}:${step.id}` - const inter = this.timeouts.get(key) - if (inter) { - clearTimeout(inter as NodeJS.Timeout) + const timer = this.timeouts.get(key) + if (timer) { + clearTimeout(timer) + this.pendingTimers.delete(timer) this.timeouts.delete(key) } } @@ -726,11 +808,8 @@ export class InMemoryDistributedTransactionStorage throwOnError: false, }) - // Only schedule the next job execution after the current one completes successfully - const timer = setTimeout(async () => { - setImmediate(() => { - this.jobHandler(jobId) - }) + const timer = this.createManagedTimer(() => { + this.jobHandler(jobId) }, nextExecution) // Prevent timer from keeping the process alive diff --git a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json index 20e0503696..b39926fdcb 100644 --- a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json @@ -186,6 +186,42 @@ "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_run_id\" ON \"workflow_execution\" (run_id) WHERE deleted_at IS NULL" }, + { + "keyName": "IDX_workflow_execution_workflow_id_transaction_id", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state_updated_at", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state_updated_at\" ON \"workflow_execution\" (state, updated_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_retention_time_updated_at_state", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_retention_time_updated_at_state\" ON \"workflow_execution\" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL" + }, + { + "keyName": "IDX_workflow_execution_updated_at_retention_time", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_updated_at_retention_time\" ON \"workflow_execution\" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')" + }, { "keyName": "workflow_execution_pkey", "columnNames": [ diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20250908080326.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20250908080326.ts new file mode 100644 index 0000000000..b7a60acdfa --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20250908080326.ts @@ -0,0 +1,19 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20250908080326 extends Migration { + + override async up(): Promise { + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;`); + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state_updated_at" ON "workflow_execution" (state, updated_at) WHERE deleted_at IS NULL;`); + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_retention_time_updated_at_state" ON "workflow_execution" (retention_time, updated_at, state) WHERE deleted_at IS NULL AND retention_time IS NOT NULL;`); + this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_updated_at_retention_time" ON "workflow_execution" (updated_at, retention_time) WHERE deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted');`); + } + + override async down(): Promise { + this.addSql(`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";`); + this.addSql(`drop index if exists "IDX_workflow_execution_state_updated_at";`); + this.addSql(`drop index if exists "IDX_workflow_execution_retention_time_updated_at_state";`); + this.addSql(`drop index if exists "IDX_workflow_execution_updated_at_retention_time";`); + } + +} diff --git a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts index 6a92309133..eb21c1abf3 100644 --- a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts @@ -38,4 +38,21 @@ export const WorkflowExecution = model on: ["run_id"], where: "deleted_at IS NULL", }, + { + on: ["workflow_id", "transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state", "updated_at"], + where: "deleted_at IS NULL", + }, + { + on: ["retention_time", "updated_at", "state"], + where: "deleted_at IS NULL AND retention_time IS NOT NULL", + }, + { + on: ["updated_at", "retention_time"], + where: + "deleted_at IS NULL AND retention_time IS NOT NULL AND state IN ('done', 'failed', 'reverted')", + }, ]) diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index d058c432ef..f1455eee87 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -12,7 +12,11 @@ import { Logger, MedusaContainer, } from "@medusajs/framework/types" -import { isString, TransactionState } from "@medusajs/framework/utils" +import { + isString, + promiseAll, + TransactionState, +} from "@medusajs/framework/utils" import { FlowCancelOptions, FlowRunOptions, @@ -21,7 +25,6 @@ import { ReturnWorkflow, } from "@medusajs/framework/workflows-sdk" import Redis from "ioredis" -import { setTimeout } from "timers" import { ulid } from "ulid" import type { RedisDistributedTransactionStorage } from "../utils" @@ -112,7 +115,6 @@ export class WorkflowOrchestratorService { protected redisSubscriber: Redis protected container_: MedusaContainer private subscribers: Subscribers = new Map() - private activeStepsCount: number = 0 readonly #logger: Logger @@ -149,10 +151,16 @@ export class WorkflowOrchestratorService { this.redisDistributedTransactionStorage_ = redisDistributedTransactionStorage - this.redisSubscriber.on("message", async (_, message) => { - const { instanceId, data } = JSON.parse(message) + this.redisSubscriber.on("message", async (channel, message) => { + const workflowId = channel.split(":")[1] + if (!this.subscribers.has(workflowId)) return - await this.notify(data, false, instanceId) + try { + const { instanceId, data } = JSON.parse(message) + await this.notify(data, false, instanceId) + } catch (error) { + this.#logger.error(`Failed to process Redis message: ${error}`) + } }) } @@ -163,10 +171,6 @@ export class WorkflowOrchestratorService { async onApplicationPrepareShutdown() { // eslint-disable-next-line max-len await this.redisDistributedTransactionStorage_.onApplicationPrepareShutdown() - - while (this.activeStepsCount > 0) { - await new Promise((resolve) => setTimeout(resolve, 1000)) - } } async onApplicationStart() { @@ -745,70 +749,67 @@ export class WorkflowOrchestratorService { return } - const { - isFlowAsync, - eventType, - workflowId, - transactionId, - errors, - result, - step, - response, - state, - } = options + const { workflowId, isFlowAsync } = options + // Non-blocking Redis publishing if (publish && isFlowAsync) { - const channel = this.getChannelName(options.workflowId) - const message = JSON.stringify({ - instanceId: this.instanceId, - data: options, + setImmediate(async () => { + try { + const channel = this.getChannelName(workflowId) + const message = JSON.stringify({ + instanceId: this.instanceId, + data: options, + }) + await this.redisPublisher.publish(channel, message) + } catch (error) { + this.#logger.error(`Failed to publish to Redis: ${error}`) + } }) - await this.redisPublisher.publish(channel, message) } + // Process subscribers asynchronously + setImmediate(() => this.processSubscriberNotifications(options)) + } + + private async processSubscriberNotifications(options: NotifyOptions) { + const { workflowId, transactionId, eventType } = options const subscribers: TransactionSubscribers = this.subscribers.get(workflowId) ?? new Map() - const notifySubscribers = (handlers: SubscriberHandler[]) => { - handlers.forEach((handler) => { - const args = { - eventType, - workflowId, - transactionId, - isFlowAsync, - step, - response, - result, - errors, - state, - } - const isPromise = "then" in handler - if (isPromise) { - ;(handler(args) as unknown as Promise).catch((e) => { - this.#logger.error(e) - }) - } else { - try { - handler(args) - } catch (e) { - this.#logger.error(e) + const notifySubscribersAsync = async (handlers: SubscriberHandler[]) => { + const promises = handlers.map(async (handler) => { + try { + const result = handler(options) as void | Promise + if (result && typeof result === "object" && "then" in result) { + await (result as Promise) } + } catch (error) { + this.#logger.error(`Subscriber error: ${error}`) } }) + + await promiseAll(promises) } + const tasks: Promise[] = [] + if (transactionId) { const transactionSubscribers = subscribers.get(transactionId) ?? [] - notifySubscribers(transactionSubscribers) + if (transactionSubscribers.length > 0) { + tasks.push(notifySubscribersAsync(transactionSubscribers)) + } - // removes transaction id subscribers on finish if (eventType === "onFinish") { subscribers.delete(transactionId) } } const workflowSubscribers = subscribers.get(AnySubscriber) ?? [] - notifySubscribers(workflowSubscribers) + if (workflowSubscribers.length > 0) { + tasks.push(notifySubscribersAsync(workflowSubscribers)) + } + + await promiseAll(tasks) } private getChannelName(workflowId: string): string { @@ -890,8 +891,6 @@ export class WorkflowOrchestratorService { onStepBegin: async ({ step, transaction }) => { customEventHandlers?.onStepBegin?.({ step, transaction }) - this.activeStepsCount++ - await notify({ eventType: "onStepBegin", step, @@ -911,8 +910,6 @@ export class WorkflowOrchestratorService { response, isFlowAsync: transaction.getFlow().hasAsyncSteps, }) - - this.activeStepsCount-- }, onStepFailure: async ({ step, transaction }) => { const stepName = step.definition.action! @@ -927,8 +924,6 @@ export class WorkflowOrchestratorService { errors, isFlowAsync: transaction.getFlow().hasAsyncSteps, }) - - this.activeStepsCount-- }, onStepAwaiting: async ({ step, transaction }) => { customEventHandlers?.onStepAwaiting?.({ step, transaction }) @@ -938,8 +933,6 @@ export class WorkflowOrchestratorService { step, isFlowAsync: transaction.getFlow().hasAsyncSteps, }) - - this.activeStepsCount-- }, onCompensateStepSuccess: async ({ step, transaction }) => { diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index ed085588dd..0bb8456cfd 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -293,37 +293,24 @@ export class WorkflowsModuleService< }) } - @InjectSharedContext() - async subscribe( - args: { - workflowId: string - transactionId?: string - subscriber: Function - subscriberId?: string - }, - @MedusaContext() context: Context = {} - ) { + async subscribe(args: { + workflowId: string + transactionId?: string + subscriber: Function + subscriberId?: string + }) { return this.workflowOrchestratorService_.subscribe(args as any) } - @InjectSharedContext() - async unsubscribe( - args: { - workflowId: string - transactionId?: string - subscriberOrId: string | Function - }, - @MedusaContext() context: Context = {} - ) { + async unsubscribe(args: { + workflowId: string + transactionId?: string + subscriberOrId: string | Function + }) { return this.workflowOrchestratorService_.unsubscribe(args as any) } - @InjectSharedContext() - async cancel( - workflowId: string, - options: WorkflowOrchestratorCancelOptions, - @MedusaContext() context: Context = {} - ) { + async cancel(workflowId: string, options: WorkflowOrchestratorCancelOptions) { return await this.workflowOrchestratorService_.cancel(workflowId, options) } } diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index dbe27321eb..c4cd1dde29 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -36,6 +36,24 @@ enum JobType { const ONE_HOUR_IN_MS = 1000 * 60 * 60 const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions" +const invokingStatesSet = new Set([ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, +]) + +const compensatingStatesSet = new Set([ + TransactionStepState.COMPENSATING, + TransactionStepState.NOT_STARTED, +]) + +function isInvokingState(step: TransactionStep) { + return invokingStatesSet.has(step.invoke?.state) +} + +function isCompensatingState(step: TransactionStep) { + return compensatingStatesSet.has(step.compensate?.state) +} + export class RedisDistributedTransactionStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage { @@ -100,6 +118,7 @@ export class RedisDistributedTransactionStorage await this.worker?.close() await this.jobWorker?.close() + // Clean up repeatable jobs const repeatableJobs = (await this.cleanerQueue_?.getRepeatableJobs()) ?? [] for (const job of repeatableJobs) { if (job.id === REPEATABLE_CLEARER_JOB_ID) { @@ -780,38 +799,48 @@ export class RedisDistributedTransactionStorage ) } - // Predefined states for quick lookup - const invokingStates = [ - TransactionStepState.INVOKING, - TransactionStepState.NOT_STARTED, - ] + let currentFlowLastInvokingStepIndex = -1 + for (let i = 0; i < currentFlowSteps.length; i++) { + if (isInvokingState(currentFlowSteps[i])) { + currentFlowLastInvokingStepIndex = i + break + } + } - const compensatingStates = [ - TransactionStepState.COMPENSATING, - TransactionStepState.NOT_STARTED, - ] - - const isInvokingState = (step: TransactionStep) => - invokingStates.includes(step.invoke?.state) - - const isCompensatingState = (step: TransactionStep) => - compensatingStates.includes(step.compensate?.state) - - const currentFlowLastInvokingStepIndex = - currentFlowSteps.findIndex(isInvokingState) - - const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps + let latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps ? 1 // There is no other execution, so the current execution is the latest - : latestUpdatedFlowSteps.findIndex(isInvokingState) + : -1 - const reversedCurrentFlowSteps = [...currentFlowSteps].reverse() - const currentFlowLastCompensatingStepIndex = - reversedCurrentFlowSteps.findIndex(isCompensatingState) + if (latestUpdatedFlow.steps) { + for (let i = 0; i < latestUpdatedFlowSteps.length; i++) { + if (isInvokingState(latestUpdatedFlowSteps[i])) { + latestUpdatedFlowLastInvokingStepIndex = i + break + } + } + } - const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse() - const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps - ? -1 - : reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState) + let currentFlowLastCompensatingStepIndex = -1 + for (let i = currentFlowSteps.length - 1; i >= 0; i--) { + if (isCompensatingState(currentFlowSteps[i])) { + currentFlowLastCompensatingStepIndex = currentFlowSteps.length - 1 - i + break + } + } + + let latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps + ? -1 // There is no other execution, so the current execution is the latest + : -1 + + if (latestUpdatedFlow.steps) { + for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) { + if (isCompensatingState(latestUpdatedFlowSteps[i])) { + latestUpdatedFlowLastCompensatingStepIndex = + latestUpdatedFlowSteps.length - 1 - i + break + } + } + } const isLatestExecutionFinishedIndex = -1 const invokeShouldBeSkipped =