chore(workflow-engine): Migrate to DML (#10477)

RESOLVES FRMW-2832
RESOLVES FRMW-2833

**What**
Migrate workflow engines to DML. Alos includes and update to the linkable generation which now takes into account id and primary keys to generate the linkable instead of only primary keys
This commit is contained in:
Adrien de Peretti
2024-12-06 14:23:07 +01:00
committed by GitHub
parent b0448a7c35
commit 0a077d48e1
16 changed files with 608 additions and 258 deletions

View File

@@ -1,10 +1,12 @@
import {
DistributedTransactionType,
TransactionStepTimeoutError,
TransactionTimeoutError,
WorkflowManager,
} from "@medusajs/framework/orchestration"
import {
IWorkflowEngineService,
Logger,
MedusaContainer,
RemoteQueryFunction,
} from "@medusajs/framework/types"
@@ -99,6 +101,20 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
serviceName: "workflows",
field: "workflowExecution",
},
transaction_id: {
entity: "WorkflowExecution",
field: "workflowExecution",
linkable: "workflow_execution_transaction_id",
primaryKey: "transaction_id",
serviceName: "workflows",
},
workflow_id: {
entity: "WorkflowExecution",
field: "workflowExecution",
linkable: "workflow_execution_workflow_id",
primaryKey: "workflow_id",
serviceName: "workflows",
},
},
})
})
@@ -112,10 +128,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: true,
})
let executionsList = await query({
workflow_executions: {
fields: ["workflow_id", "transaction_id", "state"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["workflow_id", "transaction_id", "state"],
})
expect(executionsList).toHaveLength(1)
@@ -130,11 +145,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
stepResponse: { uhuuuu: "yeaah!" },
})
executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
;({ data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
}))
expect(executionsList).toHaveLength(0)
expect(result).toEqual({
@@ -153,10 +167,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
transactionId: "transaction_1",
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
})
expect(executionsList).toHaveLength(1)
@@ -170,12 +183,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
},
stepResponse: { uhuuuu: "yeaah!" },
})
executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
;({ data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
}))
expect(executionsList).toHaveLength(1)
})
@@ -188,10 +199,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
transactionId: "transaction_1",
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
})
expect(executionsList).toHaveLength(1)
@@ -205,12 +215,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
},
stepResponse: { uhuuuu: "yeaah!" },
})
executionsList = await query({
workflow_executions: {
fields: ["id", "state"],
},
})
;({ data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id", "state"],
}))
expect(executionsList).toHaveLength(1)
expect(executionsList[0].state).toEqual("reverted")
@@ -237,10 +245,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
},
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id"],
})
expect(executionsList).toHaveLength(1)
@@ -260,12 +267,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
expect(setStepError).toEqual({ uhuuuu: "yeaah!" })
executionsList = await query({
workflow_executions: {
fields: ["id", "state", "context"],
},
})
;({ data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id", "state", "context"],
}))
expect(executionsList).toHaveLength(1)
expect(executionsList[0].state).toEqual("failed")
@@ -273,7 +278,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should revert the entire transaction when a step timeout expires", async () => {
const { transaction, result, errors } = await workflowOrcModule.run(
const { transaction, result, errors } = (await workflowOrcModule.run(
"workflow_step_timeout",
{
input: {
@@ -282,9 +287,13 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: false,
logOnError: true,
}
)
)) as Awaited<{
transaction: DistributedTransactionType
result: any
errors: any
}>
expect(transaction.flow.state).toEqual("reverted")
expect(transaction.getFlow().state).toEqual("reverted")
expect(result).toEqual({
myInput: "123",
})
@@ -294,16 +303,20 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should revert the entire transaction when the transaction timeout expires", async () => {
const { transaction, result, errors } = await workflowOrcModule.run(
const { transaction, result, errors } = (await workflowOrcModule.run(
"workflow_transaction_timeout",
{
input: {},
transactionId: "trx",
throwOnError: false,
}
)
)) as Awaited<{
transaction: DistributedTransactionType
result: any
errors: any
}>
expect(transaction.flow.state).toEqual("reverted")
expect(transaction.getFlow().state).toEqual("reverted")
expect(result).toEqual({ executed: true })
expect(errors).toHaveLength(1)
expect(errors[0].action).toEqual("step_1")
@@ -323,7 +336,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
await setTimeout(200)
const { transaction, result, errors } = await workflowOrcModule.run(
const { transaction, result, errors } = (await workflowOrcModule.run(
"workflow_step_timeout_async",
{
input: {
@@ -332,9 +345,13 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
transactionId: "transaction_1",
throwOnError: false,
}
)
)) as Awaited<{
transaction: DistributedTransactionType
result: any
errors: any
}>
expect(transaction.flow.state).toEqual("reverted")
expect(transaction.getFlow().state).toEqual("reverted")
expect(result).toEqual(undefined)
expect(errors).toHaveLength(1)
expect(errors[0].action).toEqual("step_1_async")
@@ -354,16 +371,20 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
await setTimeout(200)
const { transaction, result, errors } = await workflowOrcModule.run(
const { transaction, result, errors } = (await workflowOrcModule.run(
"workflow_transaction_timeout_async",
{
input: {},
transactionId: "transaction_1",
throwOnError: false,
}
)
)) as Awaited<{
transaction: DistributedTransactionType
result: any
errors: any
}>
expect(transaction.flow.state).toEqual("reverted")
expect(transaction.getFlow().state).toEqual("reverted")
expect(result).toEqual(undefined)
expect(errors).toHaveLength(1)
expect(errors[0].action).toEqual("step_1")

View File

@@ -15,11 +15,11 @@ const redisUrl = process.env.REDIS_URL || "redis://localhost:6379"
const redis = new Redis(redisUrl)
interface TestDatabase {
clearTables(knex): Promise<void>
clearTables(): Promise<void>
}
export const TestDatabase: TestDatabase = {
clearTables: async (knex) => {
clearTables: async () => {
await cleanRedis()
},
}

View File

@@ -0,0 +1,163 @@
{
"namespaces": [
"public"
],
"name": "public",
"tables": [
{
"columns": {
"workflow_id": {
"name": "workflow_id",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"transaction_id": {
"name": "transaction_id",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"id": {
"name": "id",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"execution": {
"name": "execution",
"type": "jsonb",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": true,
"mappedType": "json"
},
"context": {
"name": "context",
"type": "jsonb",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": true,
"mappedType": "json"
},
"state": {
"name": "state",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"enumItems": [
"not_started",
"invoking",
"waiting_to_compensate",
"compensating",
"done",
"reverted",
"failed"
],
"mappedType": "enum"
},
"created_at": {
"name": "created_at",
"type": "timestamptz",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"length": 6,
"default": "now()",
"mappedType": "datetime"
},
"updated_at": {
"name": "updated_at",
"type": "timestamptz",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"length": 6,
"default": "now()",
"mappedType": "datetime"
},
"deleted_at": {
"name": "deleted_at",
"type": "timestamptz",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": true,
"length": 6,
"mappedType": "datetime"
}
},
"name": "workflow_execution",
"schema": "public",
"indexes": [
{
"keyName": "IDX_workflow_execution_deleted_at",
"columnNames": [],
"composite": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_id",
"columnNames": [],
"composite": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_workflow_id",
"columnNames": [],
"composite": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_transaction_id",
"columnNames": [],
"composite": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_state",
"columnNames": [],
"composite": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL"
},
{
"keyName": "workflow_execution_pkey",
"columnNames": [
"workflow_id",
"transaction_id"
],
"composite": true,
"primary": true,
"unique": true
}
],
"checks": [],
"foreignKeys": {}
}
]
}

View File

@@ -0,0 +1,27 @@
import { Migration } from "@mikro-orm/migrations"
export class Migration20241206123341 extends Migration {
async up(): Promise<void> {
this.addSql(
`DROP INDEX IF EXISTS "IDX_workflow_execution_id";
DROP INDEX IF EXISTS "IDX_workflow_execution_workflow_id";
DROP INDEX IF EXISTS "IDX_workflow_execution_transaction_id";
DROP INDEX IF EXISTS "IDX_workflow_execution_state";`
)
this.addSql(
'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_deleted_at" ON "workflow_execution" (deleted_at) WHERE deleted_at IS NULL;'
)
this.addSql(
'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" (id) WHERE deleted_at IS NULL;'
)
this.addSql(
'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" (workflow_id) WHERE deleted_at IS NULL;'
)
this.addSql(
'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" (transaction_id) WHERE deleted_at IS NULL;'
)
this.addSql(
'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" (state) WHERE deleted_at IS NULL;'
)
}
}

View File

@@ -1 +1 @@
export { default as WorkflowExecution } from "./workflow-execution"
export { WorkflowExecution } from "./workflow-execution"

View File

@@ -1,76 +1,30 @@
import { TransactionState } from "@medusajs/framework/orchestration"
import { DALUtils, generateEntityId } from "@medusajs/framework/utils"
import {
BeforeCreate,
Entity,
Enum,
Filter,
Index,
OnInit,
OptionalProps,
PrimaryKey,
Property,
Unique,
} from "@mikro-orm/core"
import { model } from "@medusajs/framework/utils"
type OptionalFields = "deleted_at"
@Entity()
@Unique({
name: "IDX_workflow_execution_workflow_id_transaction_id_unique",
properties: ["workflow_id", "transaction_id"],
})
@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions)
export default class WorkflowExecution {
[OptionalProps]?: OptionalFields
@Property({ columnType: "text", nullable: false })
@Index({ name: "IDX_workflow_execution_id" })
id!: string
@Index({ name: "IDX_workflow_execution_workflow_id" })
@PrimaryKey({ columnType: "text" })
workflow_id: string
@Index({ name: "IDX_workflow_execution_transaction_id" })
@PrimaryKey({ columnType: "text" })
transaction_id: string
@Property({ columnType: "jsonb", nullable: true })
execution: Record<string, unknown> | null = null
@Property({ columnType: "jsonb", nullable: true })
context: Record<string, unknown> | null = null
@Index({ name: "IDX_workflow_execution_state" })
@Enum(() => TransactionState)
state: TransactionState
@Property({
onCreate: () => new Date(),
columnType: "timestamptz",
defaultRaw: "now()",
export const WorkflowExecution = model
.define("workflow_execution", {
id: model.id({ prefix: "wf_exec" }),
workflow_id: model.text().primaryKey(),
transaction_id: model.text().primaryKey(),
execution: model.json().nullable(),
context: model.json().nullable(),
state: model.enum(TransactionState),
})
created_at: Date
@Property({
onCreate: () => new Date(),
onUpdate: () => new Date(),
columnType: "timestamptz",
defaultRaw: "now()",
})
updated_at: Date
@Property({ columnType: "timestamptz", nullable: true })
deleted_at: Date | null = null
@BeforeCreate()
onCreate() {
this.id = generateEntityId(this.id, "wf_exec")
}
@OnInit()
onInit() {
this.id = generateEntityId(this.id, "wf_exec")
}
}
.indexes([
{
on: ["id"],
where: "deleted_at IS NULL",
},
{
on: ["workflow_id"],
where: "deleted_at IS NULL",
},
{
on: ["transaction_id"],
where: "deleted_at IS NULL",
},
{
on: ["state"],
where: "deleted_at IS NULL",
},
])

View File

@@ -1,6 +1,7 @@
import {
Context,
DAL,
InferEntityType,
InternalModuleDeclaration,
ModulesSdkTypes,
WorkflowsSdkTypes,
@@ -25,9 +26,11 @@ type InjectedDependencies = {
}
export class WorkflowsModuleService<
TWorkflowExecution extends WorkflowExecution = WorkflowExecution
TWorkflowExecution extends InferEntityType<
typeof WorkflowExecution
> = InferEntityType<typeof WorkflowExecution>
> extends ModulesSdkUtils.MedusaService<{
WorkflowExecution: { dto: WorkflowExecution }
WorkflowExecution: { dto: InferEntityType<typeof WorkflowExecution> }
}>({ WorkflowExecution }) {
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<TWorkflowExecution>