fix(workflow-engine-*): Cleanup expired executions and reduce redis storage usage (#12795)

This commit is contained in:
Adrien de Peretti
2025-06-24 13:32:10 +02:00
committed by GitHub
parent c0807f5496
commit 316a325b63
14 changed files with 620 additions and 392 deletions

View File

@@ -5,11 +5,19 @@ import {
StepResponse,
} from "@medusajs/framework/workflows-sdk"
export const createScheduled = (name: string, schedule?: SchedulerOptions) => {
export const createScheduled = (
name: string,
next: () => void,
schedule?: SchedulerOptions
) => {
const workflowScheduledStepInvoke = jest.fn((input, { container }) => {
return new StepResponse({
testValue: container.resolve("test-value"),
})
try {
return new StepResponse({
testValue: container.resolve("test-value", { allowUnregistered: true }),
})
} finally {
next()
}
})
const step = createStep("step_1", workflowScheduledStepInvoke)

View File

@@ -6,9 +6,11 @@ import {
import {
Context,
IWorkflowEngineService,
Logger,
RemoteQueryFunction,
} from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
Module,
Modules,
promiseAll,
@@ -41,6 +43,7 @@ import {
workflowEventGroupIdStep2Mock,
} from "../__fixtures__/workflow_event_group_id"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { container, MedusaContainer } from "@medusajs/framework"
jest.setTimeout(60000)
@@ -54,15 +57,44 @@ const failTrap = (done) => {
}, 5000)
}
// Synchronization helper for the scheduled-workflow tests: returns a `next`
// callback and a `promise` that resolves once `next` has been invoked `num`
// times, letting tests await a given number of executions without fake timers.
function times(num) {
  let resolver
  let counter = 0
  const promise = new Promise((resolve) => {
    resolver = resolve
  })

  // Force resolution after 10 seconds to prevent infinite awaiting. Keep a
  // handle on the guard timer so it can be cleared as soon as the expected
  // count is reached — otherwise the pending 10s timeout lingers as an open
  // handle after the test has already finished.
  let guardTimer
  const guard = new Promise((_, reject) => {
    guardTimer = setTimeoutSync(
      () =>
        reject(new Error("times has not been resolved after 10 seconds.")),
      10000
    )
  })

  return {
    next: () => {
      counter += 1
      if (counter === num) {
        clearTimeout(guardTimer)
        resolver()
      }
    },
    promise: Promise.race([promise, guard]),
  }
}
moduleIntegrationTestRunner<IWorkflowEngineService>({
moduleName: Modules.WORKFLOW_ENGINE,
resolve: __dirname + "/../..",
testSuite: ({ service: workflowOrcModule, medusaApp }) => {
describe("Workflow Orchestrator module", function () {
let query: RemoteQueryFunction
let sharedContainer_: MedusaContainer
beforeEach(() => {
query = medusaApp.query
sharedContainer_ = medusaApp.sharedContainer
})
it(`should export the appropriate linkable configuration`, () => {
@@ -797,7 +829,6 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
describe("Scheduled workflows", () => {
beforeEach(() => {
jest.clearAllMocks()
jest.useFakeTimers()
// Register test-value in the container for all tests
const sharedContainer =
@@ -809,62 +840,48 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
)
})
afterEach(() => {
jest.useRealTimers()
})
it("should execute a scheduled workflow", async () => {
const spy = createScheduled("standard", {
cron: "0 0 * * * *", // Runs at the start of every hour
})
expect(spy).toHaveBeenCalledTimes(0)
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
await jest.runOnlyPendingTimersAsync()
const wait = times(2)
const spy = createScheduled("standard", wait.next)
await wait.promise
expect(spy).toHaveBeenCalledTimes(2)
WorkflowManager.unregister("standard")
})
it("should stop executions after the set number of executions", async () => {
const spy = await createScheduled("num-executions", {
const wait = times(2)
const spy = createScheduled("num-executions", wait.next, {
interval: 1000,
numberOfExecutions: 2,
})
expect(spy).toHaveBeenCalledTimes(0)
await jest.advanceTimersByTimeAsync(1100)
expect(spy).toHaveBeenCalledTimes(1)
await jest.advanceTimersByTimeAsync(1100)
await wait.promise
expect(spy).toHaveBeenCalledTimes(2)
await jest.advanceTimersByTimeAsync(1100)
// Make sure that on the next tick it doesn't execute again
await setTimeoutPromise(1100)
expect(spy).toHaveBeenCalledTimes(2)
WorkflowManager.unregister("num-execution")
})
it("should remove scheduled workflow if workflow no longer exists", async () => {
const spy = await createScheduled("remove-scheduled", {
const wait = times(1)
const logger = sharedContainer_.resolve<Logger>(
ContainerRegistrationKeys.LOGGER
)
const spy = createScheduled("remove-scheduled", wait.next, {
interval: 1000,
})
const logSpy = jest.spyOn(console, "warn")
expect(spy).toHaveBeenCalledTimes(0)
await jest.advanceTimersByTimeAsync(1100)
const logSpy = jest.spyOn(logger, "warn")
await wait.promise
expect(spy).toHaveBeenCalledTimes(1)
WorkflowManager["workflows"].delete("remove-scheduled")
await jest.advanceTimersByTimeAsync(1100)
await setTimeoutPromise(1100)
expect(spy).toHaveBeenCalledTimes(1)
expect(logSpy).toHaveBeenCalledWith(
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
@@ -872,23 +889,20 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("the scheduled workflow should have access to the shared container", async () => {
const spy = await createScheduled("shared-container-job", {
const wait = times(1)
const spy = await createScheduled("shared-container-job", wait.next, {
interval: 1000,
numberOfExecutions: 1,
})
await wait.promise
const initialCallCount = spy.mock.calls.length
expect(spy).toHaveBeenCalledTimes(1)
await jest.advanceTimersByTimeAsync(1100)
expect(spy).toHaveBeenCalledTimes(initialCallCount + 1)
console.log(spy.mock.results)
expect(spy).toHaveReturnedWith(
expect.objectContaining({ output: { testValue: "test" } })
)
await jest.advanceTimersByTimeAsync(1100)
expect(spy).toHaveBeenCalledTimes(initialCallCount + 1)
WorkflowManager.unregister("shared-container-job")
})
it("should fetch an idempotent workflow after its completion", async () => {
@@ -931,6 +945,120 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(executionsListAfter).toHaveLength(1)
})
})
describe("Cleaner job", function () {
// End-to-end check of the expired-executions cleaner: executions in a
// terminal state (done / failed / reverted) whose retention time has
// elapsed must be deleted, while still-running and not-yet-expired
// executions must survive.
it("should remove expired executions of finished workflows and keep the others", async () => {
// Workflow that completes successfully, with a short (1s) retention.
const doneWorkflowId = "done-workflow-" + ulid()
createWorkflow({ name: doneWorkflowId, retentionTime: 1 }, () => {
return new WorkflowResponse("done")
})
// Workflow that fails: its only step throws and has no compensation.
const failingWorkflowId = "failing-workflow-" + ulid()
const failingStep = createStep("failing-step", () => {
throw new Error("I am failing")
})
createWorkflow({ name: failingWorkflowId, retentionTime: 1 }, () => {
failingStep()
})
// Workflow whose step throws but compensates, ending in a reverted state.
const revertingStep = createStep(
"reverting-step",
() => {
throw new Error("I am reverting")
},
() => {
return new StepResponse("reverted")
}
)
const revertingWorkflowId = "reverting-workflow-" + ulid()
createWorkflow(
{ name: revertingWorkflowId, retentionTime: 1 },
() => {
revertingStep()
return new WorkflowResponse("reverted")
}
)
// Workflow whose async background step sleeps for 10s, keeping it in a
// running state for the whole test — it must not be cleaned up despite
// its short retention time.
const runningWorkflowId = "running-workflow-" + ulid()
const longRunningStep = createStep("long-running-step", async () => {
await setTimeoutPromise(10000)
return new StepResponse("long running finished")
})
createWorkflow({ name: runningWorkflowId, retentionTime: 1 }, () => {
longRunningStep().config({ async: true, backgroundExecution: true })
return new WorkflowResponse("running workflow started")
})
// Finished workflow with a long (1000s) retention — not expired yet.
const notExpiredWorkflowId = "not-expired-workflow-" + ulid()
createWorkflow(
{ name: notExpiredWorkflowId, retentionTime: 1000 },
() => {
return new WorkflowResponse("not expired")
}
)
// Unique transaction ids so each execution can be identified later.
const trx_done = "trx-done-" + ulid()
const trx_failed = "trx-failed-" + ulid()
const trx_reverting = "trx-reverting-" + ulid()
const trx_running = "trx-running-" + ulid()
const trx_not_expired = "trx-not-expired-" + ulid()
// run workflows
await workflowOrcModule.run(doneWorkflowId, {
transactionId: trx_done,
})
await workflowOrcModule.run(failingWorkflowId, {
transactionId: trx_failed,
throwOnError: false,
})
await workflowOrcModule.run(revertingWorkflowId, {
transactionId: trx_reverting,
throwOnError: false,
})
await workflowOrcModule.run(runningWorkflowId, {
transactionId: trx_running,
})
await workflowOrcModule.run(notExpiredWorkflowId, {
transactionId: trx_not_expired,
})
// All five executions are persisted before the cleaner runs.
const executions = await workflowOrcModule.listWorkflowExecutions()
expect(executions).toHaveLength(5)
// Wait past the 1-second retention window of the expirable workflows.
await setTimeoutPromise(2000)
// Manually trigger cleaner
await (workflowOrcModule as any).workflowOrchestratorService_[
"inMemoryDistributedTransactionStorage_"
]["clearExpiredExecutions"]()
// Only the still-running and the not-yet-expired executions remain.
let remainingExecutions =
await workflowOrcModule.listWorkflowExecutions()
expect(remainingExecutions).toHaveLength(2)
const remainingTrxIds = remainingExecutions
.map((e) => e.transaction_id)
.sort()
expect(remainingTrxIds).toEqual([trx_not_expired, trx_running].sort())
const notExpiredExec = remainingExecutions.find(
(e) => e.transaction_id === trx_not_expired
)
expect(notExpiredExec?.state).toBe(TransactionState.DONE)
const runningExec = remainingExecutions.find(
(e) => e.transaction_id === trx_running
)
expect(runningExec?.state).toBe(TransactionState.INVOKING)
})
})
})
},
})

View File

@@ -87,6 +87,7 @@ const AnySubscriber = "any"
export class WorkflowOrchestratorService {
private subscribers: Subscribers = new Map()
private container_: MedusaContainer
private inMemoryDistributedTransactionStorage_: InMemoryDistributedTransactionStorage
constructor({
inMemoryDistributedTransactionStorage,
@@ -97,11 +98,21 @@ export class WorkflowOrchestratorService {
sharedContainer: MedusaContainer
}) {
this.container_ = sharedContainer
this.inMemoryDistributedTransactionStorage_ =
inMemoryDistributedTransactionStorage
inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this)
DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage)
WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage)
}
// Lifecycle hook: delegates startup work (e.g. scheduling the expired-
// executions cleaner) to the underlying transaction storage.
async onApplicationStart() {
await this.inMemoryDistributedTransactionStorage_.onApplicationStart()
}
// Lifecycle hook: delegates shutdown/cleanup to the underlying storage so
// its background work is stopped when the application goes down.
async onApplicationShutdown() {
await this.inMemoryDistributedTransactionStorage_.onApplicationShutdown()
}
private async triggerParentStep(transaction, result) {
const metadata = transaction.flow.metadata
const { parentStepIdempotencyKey } = metadata ?? {}

View File

@@ -43,7 +43,6 @@ export class WorkflowsModuleService<
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<TWorkflowExecution>
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected manager_: SqlEntityManager
private clearTimeout_: NodeJS.Timeout
constructor(
{
@@ -65,16 +64,10 @@ export class WorkflowsModuleService<
__hooks = {
onApplicationStart: async () => {
await this.clearExpiredExecutions()
this.clearTimeout_ = setInterval(async () => {
try {
await this.clearExpiredExecutions()
} catch {}
}, 1000 * 60 * 60)
await this.workflowOrchestratorService_.onApplicationStart()
},
onApplicationShutdown: async () => {
clearInterval(this.clearTimeout_)
await this.workflowOrchestratorService_.onApplicationShutdown()
},
}
@@ -289,14 +282,6 @@ export class WorkflowsModuleService<
return this.workflowOrchestratorService_.unsubscribe(args as any)
}
private async clearExpiredExecutions() {
return this.manager_.execute(`
DELETE FROM workflow_execution
WHERE retention_time IS NOT NULL AND
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
`)
}
@InjectSharedContext()
async cancel<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
workflowIdOrWorkflow: TWorkflow,

View File

@@ -21,9 +21,9 @@ import {
MedusaError,
TransactionState,
TransactionStepState,
isDefined,
isPresent,
} from "@medusajs/framework/utils"
import { raw } from "@mikro-orm/core"
import { WorkflowOrchestratorService } from "@services"
import { type CronExpression, parseExpression } from "cron-parser"
import { WorkflowExecution } from "../models/workflow-execution"
@@ -61,7 +61,8 @@ export class InMemoryDistributedTransactionStorage
private logger_: Logger
private workflowOrchestratorService_: WorkflowOrchestratorService
private storage: Map<string, TransactionCheckpoint> = new Map()
private storage: Map<string, Omit<TransactionCheckpoint, "context">> =
new Map()
private scheduled: Map<
string,
{
@@ -74,6 +75,8 @@ export class InMemoryDistributedTransactionStorage
private retries: Map<string, unknown> = new Map()
private timeouts: Map<string, unknown> = new Map()
private clearTimeout_: NodeJS.Timeout
constructor({
workflowExecutionService,
logger,
@@ -85,6 +88,18 @@ export class InMemoryDistributedTransactionStorage
this.logger_ = logger
}
/**
 * Lifecycle hook: schedules the hourly cleanup of expired workflow
 * executions. The interval handle is kept in `clearTimeout_` so it can be
 * cancelled in `onApplicationShutdown`.
 */
async onApplicationStart() {
  this.clearTimeout_ = setInterval(async () => {
    try {
      await this.clearExpiredExecutions()
    } catch (e) {
      // Cleanup is best-effort and must never crash the interval, but
      // failures should be visible instead of silently swallowed.
      this.logger_.warn(
        `Failed to clear expired workflow executions: ${
          (e as Error)?.message ?? e
        }`
      )
    }
  }, 1000 * 60 * 60)
}
// Lifecycle hook: cancels the hourly expired-executions cleanup interval.
async onApplicationShutdown() {
clearInterval(this.clearTimeout_)
}
// Late binding: the orchestrator service is constructed with this storage
// and then injects itself here (see the orchestrator constructor), so the
// reference cannot be provided at construction time.
setWorkflowOrchestratorService(workflowOrchestratorService) {
this.workflowOrchestratorService_ = workflowOrchestratorService
}
@@ -122,17 +137,6 @@ export class InMemoryDistributedTransactionStorage
isCancelling?: boolean
}
): Promise<TransactionCheckpoint | undefined> {
const data = this.storage.get(key)
if (data) {
return data
}
const { idempotent, store, retentionTime } = options ?? {}
if (!idempotent && !(store && isDefined(retentionTime))) {
return
}
const [_, workflowId, transactionId] = key.split(":")
const trx: InferEntityType<typeof WorkflowExecution> | undefined =
await this.workflowExecutionService_
@@ -153,6 +157,7 @@ export class InMemoryDistributedTransactionStorage
.catch(() => undefined)
if (trx) {
const { idempotent } = options ?? {}
const execution = trx.execution as TransactionFlow
if (!idempotent) {
@@ -187,10 +192,6 @@ export class InMemoryDistributedTransactionStorage
return
}
async list(): Promise<TransactionCheckpoint[]> {
return Array.from(this.storage.values())
}
async save(
key: string,
data: TransactionCheckpoint,
@@ -237,7 +238,11 @@ export class InMemoryDistributedTransactionStorage
}
}
this.storage.set(key, data)
const { flow, errors } = data
this.storage.set(key, {
flow,
errors,
})
// Optimize DB operations - only perform when necessary
if (hasFinished) {
@@ -272,14 +277,22 @@ export class InMemoryDistributedTransactionStorage
*/
const currentFlow = data.flow
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
} as Parameters<typeof this.get>[1]
const rawData = this.storage.get(key)
let data_ = {} as TransactionCheckpoint
if (rawData) {
data_ = rawData as TransactionCheckpoint
} else {
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
} as Parameters<typeof this.get>[1]
const { flow: latestUpdatedFlow } =
(await this.get(key, getOptions)) ??
({ flow: {} } as { flow: TransactionFlow })
data_ =
(await this.get(key, getOptions)) ??
({ flow: {} } as TransactionCheckpoint)
}
const { flow: latestUpdatedFlow } = data_
if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) {
/**
@@ -613,4 +626,25 @@ export class InMemoryDistributedTransactionStorage
throw e
}
}
/**
 * Deletes persisted workflow executions whose retention period has elapsed.
 *
 * Only executions in a terminal state (done, failed, reverted) with a
 * non-null `retention_time` are eligible; in-flight executions are never
 * deleted. An execution is expired once `updated_at` is older than
 * `retention_time` seconds. Invoked hourly via the interval set up in
 * `onApplicationStart`.
 */
async clearExpiredExecutions(): Promise<void> {
  await this.workflowExecutionService_.delete({
    retention_time: {
      $ne: null,
    },
    updated_at: {
      $lte: raw(
        // Quote the identifier for consistency with the Redis storage
        // implementation of this same cleanup query.
        (_alias) =>
          `CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")`
      ),
    },
    state: {
      $in: [
        TransactionState.DONE,
        TransactionState.FAILED,
        TransactionState.REVERTED,
      ],
    },
  })
}
}

View File

@@ -1072,7 +1072,6 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
WorkflowManager["workflows"].delete("remove-scheduled")
await setTimeout(1100)
0
expect(spy).toHaveBeenCalledTimes(1)
expect(logSpy).toHaveBeenCalledWith(
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
@@ -1107,6 +1106,120 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
})
})
describe("Cleaner job", function () {
// End-to-end check of the expired-executions cleaner (Redis storage):
// executions in a terminal state (done / failed / reverted) whose retention
// time has elapsed must be deleted, while still-running and not-yet-expired
// executions must survive.
it("should remove expired executions of finished workflows and keep the others", async () => {
// Workflow that completes successfully, with a short (1s) retention.
const doneWorkflowId = "done-workflow-" + ulid()
createWorkflow({ name: doneWorkflowId, retentionTime: 1 }, () => {
return new WorkflowResponse("done")
})
// Workflow that fails: its only step throws and has no compensation.
const failingWorkflowId = "failing-workflow-" + ulid()
const failingStep = createStep("failing-step", () => {
throw new Error("I am failing")
})
createWorkflow({ name: failingWorkflowId, retentionTime: 1 }, () => {
failingStep()
})
// Workflow whose step throws but compensates, ending in a reverted state.
const revertingStep = createStep(
"reverting-step",
() => {
throw new Error("I am reverting")
},
() => {
return new StepResponse("reverted")
}
)
const revertingWorkflowId = "reverting-workflow-" + ulid()
createWorkflow(
{ name: revertingWorkflowId, retentionTime: 1 },
() => {
revertingStep()
return new WorkflowResponse("reverted")
}
)
// Workflow whose async background step sleeps for 10s, keeping it in a
// running state for the whole test — it must not be cleaned up despite
// its short retention time.
const runningWorkflowId = "running-workflow-" + ulid()
const longRunningStep = createStep("long-running-step", async () => {
await setTimeout(10000)
return new StepResponse("long running finished")
})
createWorkflow({ name: runningWorkflowId, retentionTime: 1 }, () => {
longRunningStep().config({ async: true, backgroundExecution: true })
return new WorkflowResponse("running workflow started")
})
// Finished workflow with a long (1000s) retention — not expired yet.
const notExpiredWorkflowId = "not-expired-workflow-" + ulid()
createWorkflow(
{ name: notExpiredWorkflowId, retentionTime: 1000 },
() => {
return new WorkflowResponse("not expired")
}
)
// Unique transaction ids so each execution can be identified later.
const trx_done = "trx-done-" + ulid()
const trx_failed = "trx-failed-" + ulid()
const trx_reverting = "trx-reverting-" + ulid()
const trx_running = "trx-running-" + ulid()
const trx_not_expired = "trx-not-expired-" + ulid()
// run workflows
await workflowOrcModule.run(doneWorkflowId, {
transactionId: trx_done,
})
await workflowOrcModule.run(failingWorkflowId, {
transactionId: trx_failed,
throwOnError: false,
})
await workflowOrcModule.run(revertingWorkflowId, {
transactionId: trx_reverting,
throwOnError: false,
})
await workflowOrcModule.run(runningWorkflowId, {
transactionId: trx_running,
})
await workflowOrcModule.run(notExpiredWorkflowId, {
transactionId: trx_not_expired,
})
// All five executions are persisted before the cleaner runs.
let executions = await workflowOrcModule.listWorkflowExecutions()
expect(executions).toHaveLength(5)
// Wait past the 1-second retention window of the expirable workflows.
await setTimeout(2000)
// Manually trigger cleaner
await (workflowOrcModule as any).workflowOrchestratorService_[
"redisDistributedTransactionStorage_"
]["clearExpiredExecutions"]()
// Only the still-running and the not-yet-expired executions remain.
let remainingExecutions =
await workflowOrcModule.listWorkflowExecutions()
expect(remainingExecutions).toHaveLength(2)
const remainingTrxIds = remainingExecutions
.map((e) => e.transaction_id)
.sort()
expect(remainingTrxIds).toEqual([trx_not_expired, trx_running].sort())
const notExpiredExec = remainingExecutions.find(
(e) => e.transaction_id === trx_not_expired
)
expect(notExpiredExec?.state).toBe(TransactionState.DONE)
const runningExec = remainingExecutions.find(
(e) => e.transaction_id === trx_running
)
expect(runningExec?.state).toBe(TransactionState.INVOKING)
})
})
})
},
})

View File

@@ -96,7 +96,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
if (event.eventType === "onFinish") {
try {
expect(step0InvokeMock).toHaveBeenCalledTimes(1)
expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1)
expect(
step1InvokeMock.mock.calls.length
).toBeGreaterThanOrEqual(1)
expect(step2InvokeMock).toHaveBeenCalledTimes(1)
expect(transformMock).toHaveBeenCalledTimes(1)

View File

@@ -47,7 +47,6 @@ export class WorkflowsModuleService<
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected redisDisconnectHandler_: () => Promise<void>
protected manager_: SqlEntityManager
private clearTimeout_: NodeJS.Timeout
constructor(
{
@@ -72,13 +71,6 @@ export class WorkflowsModuleService<
__hooks = {
onApplicationStart: async () => {
await this.workflowOrchestratorService_.onApplicationStart()
await this.clearExpiredExecutions()
this.clearTimeout_ = setInterval(async () => {
try {
await this.clearExpiredExecutions()
} catch {}
}, 1000 * 60 * 60)
},
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
@@ -86,7 +78,6 @@ export class WorkflowsModuleService<
onApplicationShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationShutdown()
await this.redisDisconnectHandler_()
clearInterval(this.clearTimeout_)
},
}
@@ -301,14 +292,6 @@ export class WorkflowsModuleService<
return this.workflowOrchestratorService_.unsubscribe(args as any)
}
private async clearExpiredExecutions() {
return this.manager_.execute(`
DELETE FROM workflow_execution
WHERE retention_time IS NOT NULL AND
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
`)
}
@InjectSharedContext()
async cancel(
workflowId: string,

View File

@@ -1,5 +1,4 @@
import {
DistributedTransaction,
DistributedTransactionType,
IDistributedSchedulerStorage,
IDistributedTransactionStorage,
@@ -15,13 +14,13 @@ import {
} from "@medusajs/framework/orchestration"
import { Logger, ModulesSdkTypes } from "@medusajs/framework/types"
import {
isDefined,
isPresent,
MedusaError,
promiseAll,
TransactionState,
TransactionStepState,
} from "@medusajs/framework/utils"
import { raw } from "@mikro-orm/core"
import { WorkflowOrchestratorService } from "@services"
import { Queue, RepeatOptions, Worker } from "bullmq"
import Redis from "ioredis"
@@ -33,6 +32,9 @@ enum JobType {
TRANSACTION_TIMEOUT = "transaction_timeout",
}
const ONE_HOUR_IN_MS = 1000 * 60 * 60
const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions"
export class RedisDistributedTransactionStorage
implements IDistributedTransactionStorage, IDistributedSchedulerStorage
{
@@ -48,6 +50,9 @@ export class RedisDistributedTransactionStorage
private jobQueue?: Queue
private worker: Worker
private jobWorker?: Worker
private cleanerQueueName: string
private cleanerWorker_: Worker
private cleanerQueue_?: Queue
#isWorkerMode: boolean = false
@@ -72,6 +77,7 @@ export class RedisDistributedTransactionStorage
this.logger_ = logger
this.redisClient = redisConnection
this.redisWorkerConnection = redisWorkerConnection
this.cleanerQueueName = "workflows-cleaner"
this.queueName = redisQueueName
this.jobQueueName = redisJobQueueName
this.queue = new Queue(redisQueueName, { connection: this.redisClient })
@@ -80,6 +86,11 @@ export class RedisDistributedTransactionStorage
connection: this.redisClient,
})
: undefined
this.cleanerQueue_ = isWorkerMode
? new Queue(this.cleanerQueueName, {
connection: this.redisClient,
})
: undefined
this.#isWorkerMode = isWorkerMode
}
@@ -87,11 +98,21 @@ export class RedisDistributedTransactionStorage
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker?.close()
await this.jobWorker?.close()
const repeatableJobs = (await this.cleanerQueue_?.getRepeatableJobs()) ?? []
for (const job of repeatableJobs) {
if (job.id === REPEATABLE_CLEARER_JOB_ID) {
await this.cleanerQueue_?.removeRepeatableByKey(job.key)
}
}
await this.cleanerWorker_?.close()
}
async onApplicationShutdown() {
await this.queue?.close()
await this.jobQueue?.close()
await this.cleanerQueue_?.close()
}
async onApplicationStart() {
@@ -151,6 +172,27 @@ export class RedisDistributedTransactionStorage
},
workerOptions
)
this.cleanerWorker_ = new Worker(
this.cleanerQueueName,
async () => {
await this.clearExpiredExecutions()
},
{ connection: this.redisClient }
)
await this.cleanerQueue_?.add(
"cleaner",
{},
{
repeat: {
every: ONE_HOUR_IN_MS,
},
jobId: REPEATABLE_CLEARER_JOB_ID,
removeOnComplete: true,
removeOnFail: true,
}
)
}
}
@@ -230,19 +272,6 @@ export class RedisDistributedTransactionStorage
key: string,
options?: TransactionOptions & { isCancelling?: boolean }
): Promise<TransactionCheckpoint | undefined> {
const data = await this.redisClient.get(key)
if (data) {
const parsedData = JSON.parse(data) as TransactionCheckpoint
return parsedData
}
// Not in Redis either - check database if needed
const { idempotent, store, retentionTime } = options ?? {}
if (!idempotent && !(store && isDefined(retentionTime))) {
return
}
const [_, workflowId, transactionId] = key.split(":")
const trx = await this.workflowExecutionService_
.list(
@@ -262,6 +291,7 @@ export class RedisDistributedTransactionStorage
.catch(() => undefined)
if (trx) {
const { idempotent } = options ?? {}
const execution = trx.execution as TransactionFlow
if (!idempotent) {
@@ -296,38 +326,6 @@ export class RedisDistributedTransactionStorage
return
}
async list(): Promise<TransactionCheckpoint[]> {
// Replace Redis KEYS with SCAN to avoid blocking the server
const transactions: TransactionCheckpoint[] = []
let cursor = "0"
do {
// Use SCAN instead of KEYS to avoid blocking Redis
const [nextCursor, keys] = await this.redisClient.scan(
cursor,
"MATCH",
DistributedTransaction.keyPrefix + ":*",
"COUNT",
100 // Fetch in reasonable batches
)
cursor = nextCursor
if (keys.length) {
// Use mget to batch retrieve multiple keys at once
const values = await this.redisClient.mget(keys)
for (const value of values) {
if (value) {
transactions.push(JSON.parse(value))
}
}
}
} while (cursor !== "0")
return transactions
}
async save(
key: string,
data: TransactionCheckpoint,
@@ -364,7 +362,11 @@ export class RedisDistributedTransactionStorage
const shouldSetNX = isNotStarted && isManualTransactionId
// Prepare operations to be executed in batch or pipeline
const stringifiedData = JSON.stringify(data)
const data_ = {
errors: data.errors,
flow: data.flow,
}
const stringifiedData = JSON.stringify(data_)
const pipeline = this.redisClient.pipeline()
// Execute Redis operations
@@ -614,14 +616,22 @@ export class RedisDistributedTransactionStorage
*/
const currentFlow = data.flow
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
} as Parameters<typeof this.get>[1]
const rawData = await this.redisClient.get(key)
let data_ = {} as TransactionCheckpoint
if (rawData) {
data_ = JSON.parse(rawData)
} else {
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
} as Parameters<typeof this.get>[1]
const { flow: latestUpdatedFlow } =
(await this.get(key, getOptions)) ??
({ flow: {} } as { flow: TransactionFlow })
data_ =
(await this.get(key, getOptions)) ??
({ flow: {} } as TransactionCheckpoint)
}
const { flow: latestUpdatedFlow } = data_
if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) {
/**
@@ -728,4 +738,25 @@ export class RedisDistributedTransactionStorage
throw new SkipExecutionError("Already finished by another execution")
}
}
/**
 * Removes database rows for workflow executions whose retention window has
 * passed. An execution is eligible only once it reached a terminal state
 * (done, failed or reverted) and its `updated_at` is older than
 * `retention_time` seconds; in-flight executions are never deleted.
 */
async clearExpiredExecutions() {
  const terminalStates = [
    TransactionState.DONE,
    TransactionState.FAILED,
    TransactionState.REVERTED,
  ]

  // Expired when `updated_at` is older than `retention_time` seconds ago.
  const expiredCutoff = raw(
    (alias) =>
      `CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")`
  )

  await this.workflowExecutionService_.delete({
    retention_time: { $ne: null },
    updated_at: { $lte: expiredCutoff },
    state: { $in: terminalStates },
  })
}
}