From 28958b2e26e5c10dffa21d2c510268955f67404c Mon Sep 17 00:00:00 2001
From: Adrien de Peretti
Date: Fri, 18 Apr 2025 10:35:23 +0200
Subject: [PATCH] Chore/orchestration storage improvements (#12178)

**What**
Clean up and improve the workflow storage utility
---
 .../integration-tests/__tests__/index.spec.ts |  60 +++++--
 .../utils/workflow-orchestrator-storage.ts    | 162 +++++++++--------
 .../utils/workflow-orchestrator-storage.ts    | 170 ++++++++++--------
 3 files changed, 228 insertions(+), 164 deletions(-)

diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts
index f14646b52c..8eae13339b 100644
--- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts
+++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts
@@ -451,6 +451,15 @@ moduleIntegrationTestRunner({
       beforeEach(() => {
         jest.useFakeTimers()
         jest.clearAllMocks()
+
+        // Register test-value in the container for all tests
+        const sharedContainer =
+          workflowOrcModule["workflowOrchestratorService_"]["container_"]
+
+        sharedContainer.register(
+          "test-value",
+          asFunction(() => "test")
+        )
       })
 
       afterEach(() => {
@@ -459,44 +468,56 @@
       it("should execute a scheduled workflow", async () => {
         const spy = createScheduled("standard", {
-          cron: "0 0 * * * *", // Jest issue: clearExpiredExecutions runs every hour, this is scheduled to run every hour to match the number of calls
+          cron: "0 0 * * * *", // Runs at the start of every hour
         })
+
+        expect(spy).toHaveBeenCalledTimes(0)
+
         await jest.runOnlyPendingTimersAsync()
+
         expect(spy).toHaveBeenCalledTimes(1)
 
         await jest.runOnlyPendingTimersAsync()
+
         expect(spy).toHaveBeenCalledTimes(2)
       })
 
       it("should stop executions after the set number of executions", async () => {
         const spy = await createScheduled("num-executions", {
-          cron: "* * * * * *",
+          interval: 1000,
           numberOfExecutions: 2,
         })
 
-        await jest.runOnlyPendingTimersAsync()
+        expect(spy).toHaveBeenCalledTimes(0)
+
+        await jest.advanceTimersByTimeAsync(1100)
+
         expect(spy).toHaveBeenCalledTimes(1)
 
-        await jest.runOnlyPendingTimersAsync()
+        await jest.advanceTimersByTimeAsync(1100)
+
         expect(spy).toHaveBeenCalledTimes(2)
 
-        await jest.runOnlyPendingTimersAsync()
+        await jest.advanceTimersByTimeAsync(1100)
+
         expect(spy).toHaveBeenCalledTimes(2)
       })
 
       it("should remove scheduled workflow if workflow no longer exists", async () => {
         const spy = await createScheduled("remove-scheduled", {
-          cron: "* * * * * *",
+          interval: 1000,
         })
 
         const logSpy = jest.spyOn(console, "warn")
 
-        await jest.runOnlyPendingTimersAsync()
+        expect(spy).toHaveBeenCalledTimes(0)
+
+        await jest.advanceTimersByTimeAsync(1100)
+
         expect(spy).toHaveBeenCalledTimes(1)
 
         WorkflowManager["workflows"].delete("remove-scheduled")
 
-        await jest.runOnlyPendingTimersAsync()
+        await jest.advanceTimersByTimeAsync(1100)
 
         expect(spy).toHaveBeenCalledTimes(1)
         expect(logSpy).toHaveBeenCalledWith(
           "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
@@ -504,22 +525,23 @@ moduleIntegrationTestRunner({
       })
 
       it("the scheduled workflow should have access to the shared container", async () => {
-        const sharedContainer =
-          workflowOrcModule["workflowOrchestratorService_"]["container_"]
-
-        sharedContainer.register(
-          "test-value",
-          asFunction(() => "test")
-        )
-
         const spy = await createScheduled("shared-container-job", {
-          cron: "* * * * * *",
+          interval: 1000,
+          numberOfExecutions: 1,
         })
 
-        await jest.runOnlyPendingTimersAsync()
-        expect(spy).toHaveBeenCalledTimes(1)
+
+        const initialCallCount = spy.mock.calls.length
+
+        await jest.advanceTimersByTimeAsync(1100)
+
+        expect(spy).toHaveBeenCalledTimes(initialCallCount + 1)
         expect(spy).toHaveReturnedWith(
           expect.objectContaining({ output: { testValue: "test" } })
         )
+
+        await jest.advanceTimersByTimeAsync(1100)
+
+        expect(spy).toHaveBeenCalledTimes(initialCallCount + 1)
       })
 
       it("should fetch an idempotent workflow after its completion", async () => {
diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts
index fbedbdcc73..dddef0ec99 100644
--- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts
+++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts
@@ -173,19 +173,27 @@ export class InMemoryDistributedTransactionStorage
       options,
     })
 
-    Object.assign(data, {
-      retention_time: retentionTime,
-    })
-    this.storage.set(key, data)
-
-    if (hasFinished && !retentionTime && !idempotent) {
-      await this.deleteFromDb(data)
-    } else {
-      await this.saveToDb(data, retentionTime)
+    // Only store retention time if it's provided
+    if (retentionTime) {
+      Object.assign(data, {
+        retention_time: retentionTime,
+      })
     }
 
+    // Store in memory
+    this.storage.set(key, data)
+
+    // Optimize DB operations - only perform when necessary
     if (hasFinished) {
+      if (!retentionTime && !idempotent) {
+        await this.deleteFromDb(data)
+      } else {
+        await this.saveToDb(data, retentionTime)
+      }
+
       this.storage.delete(key)
+    } else {
+      await this.saveToDb(data, retentionTime)
     }
   }
 
@@ -198,11 +206,7 @@ export class InMemoryDistributedTransactionStorage
     key: string
     options?: TransactionOptions
   }) {
-    let isInitialCheckpoint = false
-
-    if (data.flow.state === TransactionState.NOT_STARTED) {
-      isInitialCheckpoint = true
-    }
+    const isInitialCheckpoint = data.flow.state === TransactionState.NOT_STARTED
 
     /**
      * In case many execution can succeed simultaneously, we need to ensure that the latest
@@ -223,49 +227,45 @@ export class InMemoryDistributedTransactionStorage
       throw new SkipExecutionError("Already finished by another execution")
     }
 
-    const currentFlowLastInvokingStepIndex = Object.values(
-      currentFlow.steps
-    ).findIndex((step) => {
-      return [
-        TransactionStepState.INVOKING,
-        TransactionStepState.NOT_STARTED,
-      ].includes(step.invoke?.state)
-    })
+    const currentFlowSteps = Object.values(currentFlow.steps || {})
+    const latestUpdatedFlowSteps = latestUpdatedFlow.steps
+      ? Object.values(
+          latestUpdatedFlow.steps as Record<string, TransactionStep>
+        )
+      : []
+
+    // Predefined states for quick lookup
+    const invokingStates = [
+      TransactionStepState.INVOKING,
+      TransactionStepState.NOT_STARTED,
+    ]
+
+    const compensatingStates = [
+      TransactionStepState.COMPENSATING,
+      TransactionStepState.NOT_STARTED,
+    ]
+
+    const isInvokingState = (step: TransactionStep) =>
+      invokingStates.includes(step.invoke?.state)
+
+    const isCompensatingState = (step: TransactionStep) =>
+      compensatingStates.includes(step.compensate?.state)
+
+    const currentFlowLastInvokingStepIndex =
+      currentFlowSteps.findIndex(isInvokingState)
 
     const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
       ? 1 // There is no other execution, so the current execution is the latest
-      : Object.values(
-          (latestUpdatedFlow.steps as Record<string, TransactionStep>) ?? {}
-        ).findIndex((step) => {
-          return [
-            TransactionStepState.INVOKING,
-            TransactionStepState.NOT_STARTED,
-          ].includes(step.invoke?.state)
-        })
+      : latestUpdatedFlowSteps.findIndex(isInvokingState)
 
-    const currentFlowLastCompensatingStepIndex = Object.values(
-      currentFlow.steps
-    )
-      .reverse()
-      .findIndex((step) => {
-        return [
-          TransactionStepState.COMPENSATING,
-          TransactionStepState.NOT_STARTED,
-        ].includes(step.compensate?.state)
-      })
+    const reversedCurrentFlowSteps = [...currentFlowSteps].reverse()
+    const currentFlowLastCompensatingStepIndex =
+      reversedCurrentFlowSteps.findIndex(isCompensatingState)
 
+    const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse()
     const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
       ? -1 // There is no other execution, so the current execution is the latest
-      : Object.values(
-          (latestUpdatedFlow.steps as Record<string, TransactionStep>) ?? {}
-        )
-          .reverse()
-          .findIndex((step) => {
-            return [
-              TransactionStepState.COMPENSATING,
-              TransactionStepState.NOT_STARTED,
-            ].includes(step.compensate?.state)
-          })
+      : reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState)
 
     const isLatestExecutionFinishedIndex = -1
 
     const invokeShouldBeSkipped =
@@ -282,20 +282,29 @@
       latestUpdatedFlowLastCompensatingStepIndex !==
         isLatestExecutionFinishedIndex
 
+    const isCompensatingMismatch =
+      latestUpdatedFlow.state === TransactionState.COMPENSATING &&
+      ![TransactionState.REVERTED, TransactionState.FAILED].includes(
+        currentFlow.state
+      ) &&
+      currentFlow.state !== latestUpdatedFlow.state
+
+    const isRevertedMismatch =
+      latestUpdatedFlow.state === TransactionState.REVERTED &&
+      currentFlow.state !== TransactionState.REVERTED
+
+    const isFailedMismatch =
+      latestUpdatedFlow.state === TransactionState.FAILED &&
+      currentFlow.state !== TransactionState.FAILED
+
     if (
       (data.flow.state !== TransactionState.COMPENSATING &&
         invokeShouldBeSkipped) ||
       (data.flow.state === TransactionState.COMPENSATING &&
         compensateShouldBeSkipped) ||
-      (latestUpdatedFlow.state === TransactionState.COMPENSATING &&
-        ![TransactionState.REVERTED, TransactionState.FAILED].includes(
-          currentFlow.state
-        ) &&
-        currentFlow.state !== latestUpdatedFlow.state) ||
-      (latestUpdatedFlow.state === TransactionState.REVERTED &&
-        currentFlow.state !== TransactionState.REVERTED) ||
-      (latestUpdatedFlow.state === TransactionState.FAILED &&
-        currentFlow.state !== TransactionState.FAILED)
+      isCompensatingMismatch ||
+      isRevertedMismatch ||
+      isFailedMismatch
     ) {
       throw new SkipExecutionError("Already finished by another execution")
     }
@@ -428,12 +437,13 @@ export class InMemoryDistributedTransactionStorage
=== "string" ? jobDefinition : jobDefinition.jobId // In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one - // any only then we add the new one. await this.remove(jobId) + let expression: CronExpression | number let nextExecution = parseNextExecution(schedulerOptions) if ("cron" in schedulerOptions) { + // Cache the parsed expression to avoid repeated parsing expression = parseExpression(schedulerOptions.cron) } else if ("interval" in schedulerOptions) { expression = schedulerOptions.interval @@ -448,6 +458,9 @@ export class InMemoryDistributedTransactionStorage this.jobHandler(jobId) }, nextExecution) + // Set the timer's unref to prevent it from keeping the process alive + timer.unref() + this.scheduled.set(jobId, { timer, expression, @@ -488,23 +501,28 @@ export class InMemoryDistributedTransactionStorage const nextExecution = parseNextExecution(job.expression) - const timer = setTimeout(async () => { - this.jobHandler(jobId) - }, nextExecution) - - this.scheduled.set(jobId, { - timer, - expression: job.expression, - numberOfExecutions: (job.numberOfExecutions ?? 0) + 1, - config: job.config, - }) - try { - // With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported. await this.workflowOrchestratorService_.run(jobId, { logOnError: true, throwOnError: false, }) + + // Only schedule the next job execution after the current one completes successfully + const timer = setTimeout(async () => { + setImmediate(() => { + this.jobHandler(jobId) + }) + }, nextExecution) + + // Prevent timer from keeping the process alive + timer.unref() + + this.scheduled.set(jobId, { + timer, + expression: job.expression, + numberOfExecutions: (job.numberOfExecutions ?? 0) + 1, + config: job.config, + }) } catch (e) { if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { this.logger_?.warn( diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index fcfd70789d..f421dcba2a 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -99,8 +99,7 @@ export class RedisDistributedTransactionStorage ] const workerOptions = { - connection: - this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */, + connection: this.redisWorkerConnection, } // TODO: Remove this once we have released to all clients (Added: v2.6+) @@ -229,9 +228,11 @@ export class RedisDistributedTransactionStorage const data = await this.redisClient.get(key) if (data) { - return JSON.parse(data) + const parsedData = JSON.parse(data) as TransactionCheckpoint + return parsedData } + // Not in Redis either - check database if needed const { idempotent, store, retentionTime } = options ?? 
     const { idempotent, store, retentionTime } = options ?? {}
 
     if (!idempotent && !(store && isDefined(retentionTime))) {
       return
@@ -251,26 +252,46 @@ export class RedisDistributedTransactionStorage
       .catch(() => undefined)
 
     if (trx) {
-      return {
+      const checkpointData = {
         flow: trx.execution,
         context: trx.context.data,
         errors: trx.context.errors,
       }
+
+      return checkpointData
     }
 
     return
   }
 
   async list(): Promise<TransactionCheckpoint[]> {
-    const keys = await this.redisClient.keys(
-      DistributedTransaction.keyPrefix + ":*"
-    )
-    const transactions: any[] = []
-    for (const key of keys) {
-      const data = await this.redisClient.get(key)
-      if (data) {
-        transactions.push(JSON.parse(data))
+    // Replace Redis KEYS with SCAN to avoid blocking the server
+    const transactions: TransactionCheckpoint[] = []
+    let cursor = "0"
+
+    do {
+      // Use SCAN instead of KEYS to avoid blocking Redis
+      const [nextCursor, keys] = await this.redisClient.scan(
+        cursor,
+        "MATCH",
+        DistributedTransaction.keyPrefix + ":*",
+        "COUNT",
+        100 // Fetch in reasonable batches
+      )
+
+      cursor = nextCursor
+
+      if (keys.length) {
+        // Use mget to batch retrieve multiple keys at once
+        const values = await this.redisClient.mget(keys)
+
+        for (const value of values) {
+          if (value) {
+            transactions.push(JSON.parse(value))
+          }
+        }
       }
-    }
+    } while (cursor !== "0")
+
     return transactions
   }
 
@@ -298,30 +319,32 @@
       options,
     })
 
-    if (hasFinished) {
+    if (hasFinished && retentionTime) {
       Object.assign(data, {
         retention_time: retentionTime,
       })
     }
 
+    // Prepare operations to be executed in batch or pipeline
     const stringifiedData = JSON.stringify(data)
+    const pipeline = this.redisClient.pipeline()
 
+    // Execute Redis operations
     if (!hasFinished) {
       if (ttl) {
-        await this.redisClient.set(key, stringifiedData, "EX", ttl)
+        pipeline.set(key, stringifiedData, "EX", ttl)
       } else {
-        await this.redisClient.set(key, stringifiedData)
+        pipeline.set(key, stringifiedData)
      }
-    }
-
-    if (hasFinished && !retentionTime && !idempotent) {
-      await this.deleteFromDb(data)
     } else {
-      await this.saveToDb(data, retentionTime)
+      pipeline.unlink(key)
     }
 
-    if (hasFinished) {
-      await this.redisClient.unlink(key)
+    // Database operations
+    if (hasFinished && !retentionTime && !idempotent) {
+      await promiseAll([pipeline.exec(), this.deleteFromDb(data)])
+    } else {
+      await promiseAll([pipeline.exec(), this.saveToDb(data, retentionTime)])
     }
   }
 
@@ -517,11 +540,7 @@ export class RedisDistributedTransactionStorage
     key: string
     options?: TransactionOptions
   }) {
-    let isInitialCheckpoint = false
-
-    if (data.flow.state === TransactionState.NOT_STARTED) {
-      isInitialCheckpoint = true
-    }
+    const isInitialCheckpoint = data.flow.state === TransactionState.NOT_STARTED
 
     /**
      * In case many execution can succeed simultaneously, we need to ensure that the latest
@@ -542,49 +561,45 @@ export class RedisDistributedTransactionStorage
       throw new SkipExecutionError("Already finished by another execution")
     }
 
-    const currentFlowLastInvokingStepIndex = Object.values(
-      currentFlow.steps
-    ).findIndex((step) => {
-      return [
-        TransactionStepState.INVOKING,
-        TransactionStepState.NOT_STARTED,
-      ].includes(step.invoke?.state)
-    })
+    const currentFlowSteps = Object.values(currentFlow.steps || {})
+    const latestUpdatedFlowSteps = latestUpdatedFlow.steps
+      ? Object.values(
+          latestUpdatedFlow.steps as Record<string, TransactionStep>
+        )
+      : []
+
+    // Predefined states for quick lookup
+    const invokingStates = [
+      TransactionStepState.INVOKING,
+      TransactionStepState.NOT_STARTED,
+    ]
+
+    const compensatingStates = [
+      TransactionStepState.COMPENSATING,
+      TransactionStepState.NOT_STARTED,
+    ]
+
+    const isInvokingState = (step: TransactionStep) =>
+      invokingStates.includes(step.invoke?.state)
+
+    const isCompensatingState = (step: TransactionStep) =>
+      compensatingStates.includes(step.compensate?.state)
+
+    const currentFlowLastInvokingStepIndex =
+      currentFlowSteps.findIndex(isInvokingState)
 
     const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
       ? 1 // There is no other execution, so the current execution is the latest
-      : Object.values(
-          (latestUpdatedFlow.steps as Record<string, TransactionStep>) ?? {}
-        ).findIndex((step) => {
-          return [
-            TransactionStepState.INVOKING,
-            TransactionStepState.NOT_STARTED,
-          ].includes(step.invoke?.state)
-        })
+      : latestUpdatedFlowSteps.findIndex(isInvokingState)
 
-    const currentFlowLastCompensatingStepIndex = Object.values(
-      currentFlow.steps
-    )
-      .reverse()
-      .findIndex((step) => {
-        return [
-          TransactionStepState.COMPENSATING,
-          TransactionStepState.NOT_STARTED,
-        ].includes(step.compensate?.state)
-      })
+    const reversedCurrentFlowSteps = [...currentFlowSteps].reverse()
+    const currentFlowLastCompensatingStepIndex =
+      reversedCurrentFlowSteps.findIndex(isCompensatingState)
 
+    const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse()
     const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
       ? -1
-      : Object.values(
-          (latestUpdatedFlow.steps as Record<string, TransactionStep>) ?? {}
-        )
-          .reverse()
-          .findIndex((step) => {
-            return [
-              TransactionStepState.COMPENSATING,
-              TransactionStepState.NOT_STARTED,
-            ].includes(step.compensate?.state)
-          })
+      : reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState)
 
     const isLatestExecutionFinishedIndex = -1
 
     const invokeShouldBeSkipped =
@@ -601,20 +616,29 @@
       latestUpdatedFlowLastCompensatingStepIndex !==
         isLatestExecutionFinishedIndex
 
+    const isCompensatingMismatch =
+      latestUpdatedFlow.state === TransactionState.COMPENSATING &&
+      ![TransactionState.REVERTED, TransactionState.FAILED].includes(
+        currentFlow.state
+      ) &&
+      currentFlow.state !== latestUpdatedFlow.state
+
+    const isRevertedMismatch =
+      latestUpdatedFlow.state === TransactionState.REVERTED &&
+      currentFlow.state !== TransactionState.REVERTED
+
+    const isFailedMismatch =
+      latestUpdatedFlow.state === TransactionState.FAILED &&
+      currentFlow.state !== TransactionState.FAILED
+
     if (
       (data.flow.state !== TransactionState.COMPENSATING &&
         invokeShouldBeSkipped) ||
       (data.flow.state === TransactionState.COMPENSATING &&
         compensateShouldBeSkipped) ||
-      (latestUpdatedFlow.state === TransactionState.COMPENSATING &&
-        ![TransactionState.REVERTED, TransactionState.FAILED].includes(
-          currentFlow.state
-        ) &&
-        currentFlow.state !== latestUpdatedFlow.state) ||
-      (latestUpdatedFlow.state === TransactionState.REVERTED &&
-        currentFlow.state !== TransactionState.REVERTED) ||
-      (latestUpdatedFlow.state === TransactionState.FAILED &&
-        currentFlow.state !== TransactionState.FAILED)
+      isCompensatingMismatch ||
+      isRevertedMismatch ||
+      isFailedMismatch
     ) {
       throw new SkipExecutionError("Already finished by another execution")
     }
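
Note on the `list()` rewrite in the Redis storage above: the cursor-based SCAN loop combined with batched MGET reads is a standard ioredis pattern and can be sketched in isolation as follows. This is an illustrative sketch only; the `"dtrans:*"` key pattern, the `listCheckpoints` helper name, and the loose `unknown[]` return type are assumptions made for the example, not the module's actual `DistributedTransaction.keyPrefix` or `TransactionCheckpoint` typing.

```ts
import Redis from "ioredis"

// Illustrative sketch: iterate matching keys with SCAN (cursor-based, non-blocking)
// and fetch their values in batches with MGET, mirroring the pattern used by list().
// The "dtrans:*" pattern is an assumed placeholder for the real key prefix.
export async function listCheckpoints(redis: Redis): Promise<unknown[]> {
  const results: unknown[] = []
  let cursor = "0"

  do {
    // SCAN returns [nextCursor, batchOfKeys] without blocking the server
    // the way KEYS would on a large keyspace.
    const [nextCursor, keys] = await redis.scan(
      cursor,
      "MATCH",
      "dtrans:*",
      "COUNT",
      100
    )
    cursor = nextCursor

    if (keys.length) {
      // One round trip for the whole batch instead of one GET per key.
      const values = await redis.mget(...keys)
      for (const value of values) {
        if (value) {
          results.push(JSON.parse(value))
        }
      }
    }
  } while (cursor !== "0") // the scan is complete when the cursor wraps back to "0"

  return results
}
```

The `COUNT` argument is only a hint for how many keys Redis inspects per iteration, so batch sizes may vary; termination is signaled by SCAN returning the cursor `"0"` again.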