chore(medusa): clear workflow execution (#11200)

CLOSES: SUP-704
This commit is contained in:
Carlos R. L. Rodrigues
2025-02-03 07:47:32 -03:00
committed by GitHub
parent a76cf3e8f5
commit c8376a9f15
13 changed files with 163 additions and 20 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
---
chore(medusa): remove expired workflow executions

View File

@@ -341,7 +341,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should execute a scheduled workflow", async () => {
const spy = createScheduled("standard")
const spy = createScheduled("standard", {
cron: "0 0 * * * *", // Jest issue: clearExpiredExecutions runs every hour, this is scheduled to run every hour to match the number of calls
})
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)

View File

@@ -69,6 +69,15 @@
],
"mappedType": "enum"
},
"retention_time": {
"name": "retention_time",
"type": "integer",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": true,
"mappedType": "integer"
},
"created_at": {
"name": "created_at",
"type": "timestamptz",
@@ -109,6 +118,7 @@
"keyName": "IDX_workflow_execution_deleted_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL"
@@ -117,22 +127,16 @@
"keyName": "IDX_workflow_execution_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique",
"columnNames": [],
"composite": false,
"primary": false,
"unique": false,
"expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_workflow_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL"
@@ -141,14 +145,25 @@
"keyName": "IDX_workflow_execution_transaction_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_state",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL"
@@ -160,12 +175,15 @@
"transaction_id"
],
"composite": true,
"constraint": true,
"primary": true,
"unique": true
}
],
"checks": [],
"foreignKeys": {}
"foreignKeys": {},
"nativeEnums": {}
}
]
],
"nativeEnums": {}
}

View File

@@ -0,0 +1,25 @@
import { Migration } from "@mikro-orm/migrations"
export class Migration20250128174331 extends Migration {
override async up(): Promise<void> {
this.addSql(
`alter table if exists "workflow_execution" add column if not exists "retention_time" integer null;`
)
this.addSql(`
UPDATE workflow_execution
SET retention_time = (
SELECT COALESCE(
(execution->'options'->>'retentionTime')::integer,
0
)
)
WHERE execution->'options' ? 'retentionTime';
`)
}
override async down(): Promise<void> {
this.addSql(
`alter table if exists "workflow_execution" drop column if exists "retention_time";`
)
}
}

View File

@@ -9,6 +9,7 @@ export const WorkflowExecution = model
execution: model.json().nullable(),
context: model.json().nullable(),
state: model.enum(TransactionState),
retention_time: model.number().nullable(),
})
.indexes([
{

View File

@@ -3,7 +3,6 @@ import {
DAL,
InferEntityType,
InternalModuleDeclaration,
MedusaContainer,
ModulesSdkTypes,
WorkflowsSdkTypes,
} from "@medusajs/framework/types"
@@ -16,10 +15,12 @@ import type {
ReturnWorkflow,
UnwrapWorkflowInputDataType,
} from "@medusajs/framework/workflows-sdk"
import { SqlEntityManager } from "@mikro-orm/postgresql"
import { WorkflowExecution } from "@models"
import { WorkflowOrchestratorService } from "@services"
type InjectedDependencies = {
manager: SqlEntityManager
baseRepository: DAL.RepositoryService
workflowExecutionService: ModulesSdkTypes.IMedusaInternalService<any>
workflowOrchestratorService: WorkflowOrchestratorService
@@ -35,10 +36,12 @@ export class WorkflowsModuleService<
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<TWorkflowExecution>
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected container_: MedusaContainer
protected manager_: SqlEntityManager
private clearTimeout_: NodeJS.Timeout
constructor(
{
manager,
baseRepository,
workflowExecutionService,
workflowOrchestratorService,
@@ -48,11 +51,25 @@ export class WorkflowsModuleService<
// @ts-ignore
super(...arguments)
this.manager_ = manager
this.baseRepository_ = baseRepository
this.workflowExecutionService_ = workflowExecutionService
this.workflowOrchestratorService_ = workflowOrchestratorService
}
__hooks = {
onApplicationStart: async () => {
await this.clearExpiredExecutions()
this.clearTimeout_ = setInterval(async () => {
await this.clearExpiredExecutions()
}, 1000 * 60 * 60)
},
onApplicationShutdown: async () => {
clearInterval(this.clearTimeout_)
},
}
@InjectSharedContext()
async run<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
workflowIdOrWorkflow: TWorkflow,
@@ -155,4 +172,12 @@ export class WorkflowsModuleService<
) {
return this.workflowOrchestratorService_.unsubscribe(args as any, context)
}
private async clearExpiredExecutions() {
return this.manager_.execute(`
DELETE FROM workflow_execution
WHERE retention_time IS NOT NULL AND
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
`)
}
}

View File

@@ -47,7 +47,7 @@ export class InMemoryDistributedTransactionStorage
this.workflowOrchestratorService_ = workflowOrchestratorService
}
private async saveToDb(data: TransactionCheckpoint) {
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
await this.workflowExecutionService_.upsert([
{
workflow_id: data.flow.modelId,
@@ -58,6 +58,7 @@ export class InMemoryDistributedTransactionStorage
errors: data.errors,
},
state: data.flow.state,
retention_time: retentionTime,
},
])
}
@@ -143,7 +144,7 @@ export class InMemoryDistributedTransactionStorage
if (hasFinished && !retentionTime && !idempotent) {
await this.deleteFromDb(data)
} else {
await this.saveToDb(data)
await this.saveToDb(data, retentionTime)
}
if (hasFinished) {

View File

@@ -20,8 +20,8 @@ import {
} from "@medusajs/framework/utils"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { asValue } from "awilix"
import { setTimeout } from "timers/promises"
import { setTimeout as setTimeoutSync } from "timers"
import { setTimeout } from "timers/promises"
import { WorkflowsModuleService } from "../../src/services"
import "../__fixtures__"
import { createScheduled } from "../__fixtures__/workflow_scheduled"

View File

@@ -69,6 +69,15 @@
],
"mappedType": "enum"
},
"retention_time": {
"name": "retention_time",
"type": "integer",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": true,
"mappedType": "integer"
},
"created_at": {
"name": "created_at",
"type": "timestamptz",
@@ -109,6 +118,7 @@
"keyName": "IDX_workflow_execution_deleted_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL"
@@ -117,6 +127,7 @@
"keyName": "IDX_workflow_execution_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL"
@@ -125,6 +136,7 @@
"keyName": "IDX_workflow_execution_workflow_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL"
@@ -133,6 +145,7 @@
"keyName": "IDX_workflow_execution_transaction_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL"
@@ -141,6 +154,7 @@
"keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL"
@@ -149,6 +163,7 @@
"keyName": "IDX_workflow_execution_state",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL"
@@ -160,12 +175,15 @@
"transaction_id"
],
"composite": true,
"constraint": true,
"primary": true,
"unique": true
}
],
"checks": [],
"foreignKeys": {}
"foreignKeys": {},
"nativeEnums": {}
}
]
],
"nativeEnums": {}
}

View File

@@ -0,0 +1,25 @@
import { Migration } from "@mikro-orm/migrations"
export class Migration20250128174354 extends Migration {
override async up(): Promise<void> {
this.addSql(
`alter table if exists "workflow_execution" add column if not exists "retention_time" integer null;`
)
this.addSql(`
UPDATE workflow_execution
SET retention_time = (
SELECT COALESCE(
(execution->'options'->>'retentionTime')::integer,
0
)
)
WHERE execution->'options' ? 'retentionTime';
`)
}
override async down(): Promise<void> {
this.addSql(
`alter table if exists "workflow_execution" drop column if exists "retention_time";`
)
}
}

View File

@@ -9,6 +9,7 @@ export const WorkflowExecution = model
execution: model.json().nullable(),
context: model.json().nullable(),
state: model.enum(TransactionState),
retention_time: model.number().nullable(),
})
.indexes([
{

View File

@@ -15,10 +15,12 @@ import type {
ReturnWorkflow,
UnwrapWorkflowInputDataType,
} from "@medusajs/framework/workflows-sdk"
import { SqlEntityManager } from "@mikro-orm/postgresql"
import { WorkflowExecution } from "@models"
import { WorkflowOrchestratorService } from "@services"
type InjectedDependencies = {
manager: SqlEntityManager
baseRepository: DAL.RepositoryService
workflowExecutionService: ModulesSdkTypes.IMedusaInternalService<any>
workflowOrchestratorService: WorkflowOrchestratorService
@@ -36,9 +38,12 @@ export class WorkflowsModuleService<
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<TWorkflowExecution>
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected redisDisconnectHandler_: () => Promise<void>
protected manager_: SqlEntityManager
private clearTimeout_: NodeJS.Timeout
constructor(
{
manager,
baseRepository,
workflowExecutionService,
workflowOrchestratorService,
@@ -49,6 +54,7 @@ export class WorkflowsModuleService<
// @ts-ignore
super(...arguments)
this.manager_ = manager
this.baseRepository_ = baseRepository
this.workflowExecutionService_ = workflowExecutionService
this.workflowOrchestratorService_ = workflowOrchestratorService
@@ -59,12 +65,18 @@ export class WorkflowsModuleService<
onApplicationShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationShutdown()
await this.redisDisconnectHandler_()
clearInterval(this.clearTimeout_)
},
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
},
onApplicationStart: async () => {
await this.workflowOrchestratorService_.onApplicationStart()
await this.clearExpiredExecutions()
this.clearTimeout_ = setInterval(async () => {
await this.clearExpiredExecutions()
}, 1000 * 60 * 60)
},
}
@@ -170,4 +182,12 @@ export class WorkflowsModuleService<
) {
return this.workflowOrchestratorService_.unsubscribe(args as any, context)
}
private async clearExpiredExecutions() {
return this.manager_.execute(`
DELETE FROM workflow_execution
WHERE retention_time IS NOT NULL AND
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
`)
}
}

View File

@@ -105,7 +105,7 @@ export class RedisDistributedTransactionStorage
this.workflowOrchestratorService_ = workflowOrchestratorService
}
private async saveToDb(data: TransactionCheckpoint) {
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
await this.workflowExecutionService_.upsert([
{
workflow_id: data.flow.modelId,
@@ -116,6 +116,7 @@ export class RedisDistributedTransactionStorage
errors: data.errors,
},
state: data.flow.state,
retention_time: retentionTime,
},
])
}
@@ -251,7 +252,7 @@ export class RedisDistributedTransactionStorage
if (hasFinished && !retentionTime && !idempotent) {
await this.deleteFromDb(data)
} else {
await this.saveToDb(data)
await this.saveToDb(data, retentionTime)
}
if (hasFinished) {