diff --git a/.changeset/nice-ads-taste.md b/.changeset/nice-ads-taste.md new file mode 100644 index 0000000000..828baed535 --- /dev/null +++ b/.changeset/nice-ads-taste.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +chore(medusa): remove expired workflow executions diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index e2fa4d527a..e52ed640d7 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -341,7 +341,9 @@ moduleIntegrationTestRunner({ }) it("should execute a scheduled workflow", async () => { - const spy = createScheduled("standard") + const spy = createScheduled("standard", { + cron: "0 0 * * * *", // Jest issue: clearExpiredExecutions runs every hour, this is scheduled to run every hour to match the number of calls + }) await jest.runOnlyPendingTimersAsync() expect(spy).toHaveBeenCalledTimes(1) diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json index 0c8c584c91..e52c48397f 100644 --- a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -69,6 +69,15 @@ ], "mappedType": "enum" }, + "retention_time": { + "name": "retention_time", + "type": "integer", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "integer" + }, "created_at": { "name": "created_at", "type": "timestamptz", @@ -109,6 +118,7 @@ "keyName": "IDX_workflow_execution_deleted_at", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" @@ -117,22 +127,16 @@ "keyName": "IDX_workflow_execution_id", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" }, - { - "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", - "columnNames": [], - "composite": false, - "primary": false, - "unique": false, - "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" - }, { "keyName": "IDX_workflow_execution_workflow_id", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" @@ -141,14 +145,25 @@ "keyName": "IDX_workflow_execution_transaction_id", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" }, + { + "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + }, { "keyName": "IDX_workflow_execution_state", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" @@ -160,12 +175,15 @@ "transaction_id" ], "composite": true, + "constraint": true, "primary": true, "unique": true } ], "checks": [], - "foreignKeys": {} + "foreignKeys": {}, + "nativeEnums": {} } - ] + ], + "nativeEnums": {} } diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250128174331.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250128174331.ts new file mode 100644 index 0000000000..5ae2822682 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250128174331.ts @@ -0,0 +1,25 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250128174331 extends Migration { + override async up(): Promise { + this.addSql( + `alter table if exists "workflow_execution" add column if not exists "retention_time" integer null;` + ) + this.addSql(` + UPDATE workflow_execution + SET retention_time = ( + SELECT COALESCE( + (execution->'options'->>'retentionTime')::integer, + 0 + ) + ) + WHERE execution->'options' ? 'retentionTime'; + `) + } + + override async down(): Promise { + this.addSql( + `alter table if exists "workflow_execution" drop column if exists "retention_time";` + ) + } +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index 8557e77ae6..a533a54094 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -9,6 +9,7 @@ export const WorkflowExecution = model execution: model.json().nullable(), context: model.json().nullable(), state: model.enum(TransactionState), + retention_time: model.number().nullable(), }) .indexes([ { diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 3b67e36d2a..5d89c45767 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -3,7 +3,6 @@ import { DAL, InferEntityType, InternalModuleDeclaration, - MedusaContainer, ModulesSdkTypes, WorkflowsSdkTypes, } from "@medusajs/framework/types" @@ -16,10 +15,12 @@ import type { ReturnWorkflow, UnwrapWorkflowInputDataType, } from "@medusajs/framework/workflows-sdk" +import { SqlEntityManager } from "@mikro-orm/postgresql" import { WorkflowExecution } from "@models" import { WorkflowOrchestratorService } from "@services" type InjectedDependencies = { + manager: SqlEntityManager baseRepository: DAL.RepositoryService workflowExecutionService: ModulesSdkTypes.IMedusaInternalService workflowOrchestratorService: WorkflowOrchestratorService @@ -35,10 +36,12 @@ export class WorkflowsModuleService< protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService protected workflowOrchestratorService_: WorkflowOrchestratorService - protected container_: MedusaContainer + protected manager_: SqlEntityManager + private clearTimeout_: NodeJS.Timeout constructor( { + manager, baseRepository, workflowExecutionService, workflowOrchestratorService, @@ -48,11 +51,25 @@ export class WorkflowsModuleService< // @ts-ignore super(...arguments) + this.manager_ = manager this.baseRepository_ = baseRepository this.workflowExecutionService_ = workflowExecutionService this.workflowOrchestratorService_ = workflowOrchestratorService } + __hooks = { + onApplicationStart: async () => { + await this.clearExpiredExecutions() + + this.clearTimeout_ = setInterval(async () => { + await this.clearExpiredExecutions() + }, 1000 * 60 * 60) + }, + onApplicationShutdown: async () => { + clearInterval(this.clearTimeout_) + }, + } + @InjectSharedContext() async run>( workflowIdOrWorkflow: TWorkflow, @@ -155,4 +172,12 @@ export class WorkflowsModuleService< ) { return this.workflowOrchestratorService_.unsubscribe(args as any, context) } + + private async clearExpiredExecutions() { + return this.manager_.execute(` + DELETE FROM workflow_execution + WHERE retention_time IS NOT NULL AND + updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time); + `) + } } diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index bd1a625dc2..0a3248084e 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -47,7 +47,7 @@ export class InMemoryDistributedTransactionStorage this.workflowOrchestratorService_ = workflowOrchestratorService } - private async saveToDb(data: TransactionCheckpoint) { + private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { await this.workflowExecutionService_.upsert([ { workflow_id: data.flow.modelId, @@ -58,6 +58,7 @@ export class InMemoryDistributedTransactionStorage errors: data.errors, }, state: data.flow.state, + retention_time: retentionTime, }, ]) } @@ -143,7 +144,7 @@ export class InMemoryDistributedTransactionStorage if (hasFinished && !retentionTime && !idempotent) { await this.deleteFromDb(data) } else { - await this.saveToDb(data) + await this.saveToDb(data, retentionTime) } if (hasFinished) { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 6d6f2353e1..d536aeccad 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -20,8 +20,8 @@ import { } from "@medusajs/framework/utils" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { asValue } from "awilix" -import { setTimeout } from "timers/promises" import { setTimeout as setTimeoutSync } from "timers" +import { setTimeout } from "timers/promises" import { WorkflowsModuleService } from "../../src/services" import "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" diff --git a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json index c94f3039a5..e52c48397f 100644 --- a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json @@ -69,6 +69,15 @@ ], "mappedType": "enum" }, + "retention_time": { + "name": "retention_time", + "type": "integer", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "integer" + }, "created_at": { "name": "created_at", "type": "timestamptz", @@ -109,6 +118,7 @@ "keyName": "IDX_workflow_execution_deleted_at", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" @@ -117,6 +127,7 @@ "keyName": "IDX_workflow_execution_id", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" @@ -125,6 +136,7 @@ "keyName": "IDX_workflow_execution_workflow_id", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" @@ -133,6 +145,7 @@ "keyName": "IDX_workflow_execution_transaction_id", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" @@ -141,6 +154,7 @@ "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" @@ -149,6 +163,7 @@ "keyName": "IDX_workflow_execution_state", "columnNames": [], "composite": false, + "constraint": false, "primary": false, "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" @@ -160,12 +175,15 @@ "transaction_id" ], "composite": true, + "constraint": true, "primary": true, "unique": true } ], "checks": [], - "foreignKeys": {} + "foreignKeys": {}, + "nativeEnums": {} } - ] + ], + "nativeEnums": {} } diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20250128174354.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20250128174354.ts new file mode 100644 index 0000000000..ff4f5a369a --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20250128174354.ts @@ -0,0 +1,25 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250128174354 extends Migration { + override async up(): Promise { + this.addSql( + `alter table if exists "workflow_execution" add column if not exists "retention_time" integer null;` + ) + this.addSql(` + UPDATE workflow_execution + SET retention_time = ( + SELECT COALESCE( + (execution->'options'->>'retentionTime')::integer, + 0 + ) + ) + WHERE execution->'options' ? 'retentionTime'; + `) + } + + override async down(): Promise { + this.addSql( + `alter table if exists "workflow_execution" drop column if exists "retention_time";` + ) + } +} diff --git a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts index 8557e77ae6..a533a54094 100644 --- a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts @@ -9,6 +9,7 @@ export const WorkflowExecution = model execution: model.json().nullable(), context: model.json().nullable(), state: model.enum(TransactionState), + retention_time: model.number().nullable(), }) .indexes([ { diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 7463623e94..4421f659e2 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -15,10 +15,12 @@ import type { ReturnWorkflow, UnwrapWorkflowInputDataType, } from "@medusajs/framework/workflows-sdk" +import { SqlEntityManager } from "@mikro-orm/postgresql" import { WorkflowExecution } from "@models" import { WorkflowOrchestratorService } from "@services" type InjectedDependencies = { + manager: SqlEntityManager baseRepository: DAL.RepositoryService workflowExecutionService: ModulesSdkTypes.IMedusaInternalService workflowOrchestratorService: WorkflowOrchestratorService @@ -36,9 +38,12 @@ export class WorkflowsModuleService< protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService protected workflowOrchestratorService_: WorkflowOrchestratorService protected redisDisconnectHandler_: () => Promise + protected manager_: SqlEntityManager + private clearTimeout_: NodeJS.Timeout constructor( { + manager, baseRepository, workflowExecutionService, workflowOrchestratorService, @@ -49,6 +54,7 @@ export class WorkflowsModuleService< // @ts-ignore super(...arguments) + this.manager_ = manager this.baseRepository_ = baseRepository this.workflowExecutionService_ = workflowExecutionService this.workflowOrchestratorService_ = workflowOrchestratorService @@ -59,12 +65,18 @@ export class WorkflowsModuleService< onApplicationShutdown: async () => { await this.workflowOrchestratorService_.onApplicationShutdown() await this.redisDisconnectHandler_() + clearInterval(this.clearTimeout_) }, onApplicationPrepareShutdown: async () => { await this.workflowOrchestratorService_.onApplicationPrepareShutdown() }, onApplicationStart: async () => { await this.workflowOrchestratorService_.onApplicationStart() + + await this.clearExpiredExecutions() + this.clearTimeout_ = setInterval(async () => { + await this.clearExpiredExecutions() + }, 1000 * 60 * 60) }, } @@ -170,4 +182,12 @@ export class WorkflowsModuleService< ) { return this.workflowOrchestratorService_.unsubscribe(args as any, context) } + + private async clearExpiredExecutions() { + return this.manager_.execute(` + DELETE FROM workflow_execution + WHERE retention_time IS NOT NULL AND + updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time); + `) + } } diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 1ef9541930..c97a751eab 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -105,7 +105,7 @@ export class RedisDistributedTransactionStorage this.workflowOrchestratorService_ = workflowOrchestratorService } - private async saveToDb(data: TransactionCheckpoint) { + private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { await this.workflowExecutionService_.upsert([ { workflow_id: data.flow.modelId, @@ -116,6 +116,7 @@ export class RedisDistributedTransactionStorage errors: data.errors, }, state: data.flow.state, + retention_time: retentionTime, }, ]) } @@ -251,7 +252,7 @@ export class RedisDistributedTransactionStorage if (hasFinished && !retentionTime && !idempotent) { await this.deleteFromDb(data) } else { - await this.saveToDb(data) + await this.saveToDb(data, retentionTime) } if (hasFinished) {