From 0a077d48e14976bafcf19705fc48e66756362fd6 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Fri, 6 Dec 2024 14:23:07 +0100 Subject: [PATCH] chore(workflow-engine): Migrate to DML (#10477) RESOLVES FRMW-2832 RESOLVES FRMW-2833 **What** Migrate workflow engines to DML. Alos includes and update to the linkable generation which now takes into account id and primary keys to generate the linkable instead of only primary keys --- .changeset/clean-paws-build.md | 7 + .../__tests__/joiner-config-builder.spec.ts | 4 +- .../src/modules-sdk/joiner-config-builder.ts | 7 +- .../integration-tests/__tests__/index.spec.ts | 115 +++++++----- .../.snapshot-medusa-workflows.json | 171 ++++++++++++++++++ .../src/migrations/Migration20241206101446.ts | 27 +++ .../src/models/index.ts | 2 +- .../src/models/workflow-execution.ts | 100 +++------- .../src/services/workflows-module.ts | 7 +- .../integration-tests/__tests__/index.spec.ts | 123 +++++++------ .../integration-tests/utils/database.ts | 4 +- .../.snapshot-medusa-workflows.json | 163 +++++++++++++++++ .../src/migrations/Migration20241206123341.ts | 27 +++ .../workflow-engine-redis/src/models/index.ts | 2 +- .../src/models/workflow-execution.ts | 100 +++------- .../src/services/workflows-module.ts | 7 +- 16 files changed, 608 insertions(+), 258 deletions(-) create mode 100644 .changeset/clean-paws-build.md create mode 100644 packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json create mode 100644 packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts create mode 100644 packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json create mode 100644 packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts diff --git a/.changeset/clean-paws-build.md b/.changeset/clean-paws-build.md new file mode 100644 index 0000000000..95b3498134 --- /dev/null +++ b/.changeset/clean-paws-build.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/utils": patch +--- + +chore(workflow-engine): Migrate to DML diff --git a/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts b/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts index e7e93dfc5d..dcdfe6c00e 100644 --- a/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts +++ b/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts @@ -668,8 +668,8 @@ describe("joiner-config-builder", () => { serviceName: "myService", field: "car", entity: "Car", - linkable: "car_number_plate", - primaryKey: "number_plate", + linkable: "car_id", + primaryKey: "id", }) expect(linkConfig.user.toJSON()).toEqual({ serviceName: "myService", diff --git a/packages/core/utils/src/modules-sdk/joiner-config-builder.ts b/packages/core/utils/src/modules-sdk/joiner-config-builder.ts index 8c5809b6fb..b31278a88b 100644 --- a/packages/core/utils/src/modules-sdk/joiner-config-builder.ts +++ b/packages/core/utils/src/modules-sdk/joiner-config-builder.ts @@ -17,7 +17,7 @@ import { toCamelCase, upperCaseFirst, } from "../common" -import { DmlEntity } from "../dml" +import { DmlEntity, IdProperty } from "../dml" import { toGraphQLSchema } from "../dml/helpers/create-graphql" import { PrimaryKeyModifier } from "../dml/properties/primary-key" import { BaseRelationship } from "../dml/relations/base" @@ -396,7 +396,10 @@ export function buildLinkConfigFromModelObjects< } const parsedProperty = (value as PropertyType).parse(property) - if (PrimaryKeyModifier.isPrimaryKeyModifier(value)) { + if ( + PrimaryKeyModifier.isPrimaryKeyModifier(value) || + IdProperty.isIdProperty(value) + ) { const linkableKeyName = parsedProperty.dataType.options?.linkable ?? `${camelToSnakeCase(model.name).toLowerCase()}_${property}` diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index e499e9381b..765d64faca 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,4 +1,7 @@ -import { WorkflowManager } from "@medusajs/framework/orchestration" +import { + DistributedTransactionType, + WorkflowManager, +} from "@medusajs/framework/orchestration" import { Context, IWorkflowEngineService, @@ -60,6 +63,20 @@ moduleIntegrationTestRunner({ serviceName: "workflows", field: "workflowExecution", }, + transaction_id: { + linkable: "workflow_execution_transaction_id", + entity: "WorkflowExecution", + primaryKey: "transaction_id", + serviceName: "workflows", + field: "workflowExecution", + }, + workflow_id: { + linkable: "workflow_execution_workflow_id", + entity: "WorkflowExecution", + primaryKey: "workflow_id", + serviceName: "workflows", + field: "workflowExecution", + }, }, }) }) @@ -87,12 +104,12 @@ moduleIntegrationTestRunner({ }) // Validate context event group id - expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( - expect.objectContaining({ eventGroupId }) - ) - expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( - expect.objectContaining({ eventGroupId }) - ) + expect( + (workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1] + ).toEqual(expect.objectContaining({ eventGroupId })) + expect( + (workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1] + ).toEqual(expect.objectContaining({ eventGroupId })) }) it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => { @@ -114,14 +131,19 @@ moduleIntegrationTestRunner({ stepResponse: { hey: "oh" }, }) - const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock - .calls[0][1] as unknown as Context)!.eventGroupId + const generatedEventGroupId = (( + workflowEventGroupIdStep1Mock.mock.calls[0] as any[] + )[1] as unknown as Context)!.eventGroupId // Validate context event group id - expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( + expect( + (workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1] + ).toEqual( expect.objectContaining({ eventGroupId: generatedEventGroupId }) ) - expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( + expect( + (workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1] + ).toEqual( expect.objectContaining({ eventGroupId: generatedEventGroupId }) ) }) @@ -139,10 +161,9 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - let executionsList = await query({ - workflow_executions: { - fields: ["workflow_id", "transaction_id", "state"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["workflow_id", "transaction_id", "state"], }) expect(executionsList).toHaveLength(1) @@ -157,11 +178,10 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" }, }) - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(0) expect(result).toEqual({ @@ -180,10 +200,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -208,40 +227,38 @@ moduleIntegrationTestRunner({ expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({ uhuuuu: "yeaah!", }) - - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(1) }) it("should revert the entire transaction when a step timeout expires", async () => { - const { transaction } = await workflowOrcModule.run( + const { transaction } = (await workflowOrcModule.run( "workflow_step_timeout", { input: {}, throwOnError: false, } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") }) it("should revert the entire transaction when the transaction timeout expires", async () => { - const { transaction } = await workflowOrcModule.run( + const { transaction } = (await workflowOrcModule.run( "workflow_transaction_timeout", { input: {}, throwOnError: false, } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> await setTimeoutPromise(200) - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") }) it.skip("should subscribe to a async workflow and receive the response when it finishes", (done) => { @@ -393,7 +410,7 @@ moduleIntegrationTestRunner({ }) it("should fetch an idempotent workflow after its completion", async () => { - const { transaction: firstRun } = await workflowOrcModule.run( + const { transaction: firstRun } = (await workflowOrcModule.run( "workflow_idempotent", { input: { @@ -402,15 +419,14 @@ moduleIntegrationTestRunner({ throwOnError: true, transactionId: "transaction_1", } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) - const { transaction: secondRun } = await workflowOrcModule.run( + const { transaction: secondRun } = (await workflowOrcModule.run( "workflow_idempotent", { input: { @@ -419,15 +435,16 @@ moduleIntegrationTestRunner({ throwOnError: true, transactionId: "transaction_1", } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - const executionsListAfter = await query({ - workflow_executions: { - fields: ["id"], - }, + const { data: executionsListAfter } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) - expect(secondRun.flow.startedAt).toEqual(firstRun.flow.startedAt) + expect(secondRun.getFlow().startedAt).toEqual( + firstRun.getFlow().startedAt + ) expect(executionsList).toHaveLength(1) expect(executionsListAfter).toHaveLength(1) }) diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json new file mode 100644 index 0000000000..0c8c584c91 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -0,0 +1,171 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "transaction_id": { + "name": "transaction_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "execution": { + "name": "execution", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "context": { + "name": "context", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "state": { + "name": "state", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "enumItems": [ + "not_started", + "invoking", + "waiting_to_compensate", + "compensating", + "done", + "reverted", + "failed" + ], + "mappedType": "enum" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "workflow_execution", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_workflow_execution_deleted_at", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_transaction_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" + }, + { + "keyName": "workflow_execution_pkey", + "columnNames": [ + "workflow_id", + "transaction_id" + ], + "composite": true, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts new file mode 100644 index 0000000000..be8b1cbac1 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts @@ -0,0 +1,27 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20241206101446 extends Migration { + async up(): Promise { + this.addSql( + `DROP INDEX IF EXISTS "IDX_workflow_execution_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_workflow_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_transaction_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_state";` + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_deleted_at" ON "workflow_execution" (deleted_at) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" (id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" (workflow_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" (transaction_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" (state) WHERE deleted_at IS NULL;' + ) + } +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/index.ts b/packages/modules/workflow-engine-inmemory/src/models/index.ts index 78fcbfa921..fa5b8a3dd0 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/index.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/index.ts @@ -1 +1 @@ -export { default as WorkflowExecution } from "./workflow-execution" +export { WorkflowExecution } from "./workflow-execution" diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index 22e693d428..c41bc8936e 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -1,76 +1,30 @@ import { TransactionState } from "@medusajs/framework/orchestration" -import { DALUtils, generateEntityId } from "@medusajs/framework/utils" -import { - BeforeCreate, - Entity, - Enum, - Filter, - Index, - OnInit, - OptionalProps, - PrimaryKey, - Property, - Unique, -} from "@mikro-orm/core" +import { model } from "@medusajs/framework/utils" -type OptionalFields = "deleted_at" - -@Entity() -@Unique({ - name: "IDX_workflow_execution_workflow_id_transaction_id_unique", - properties: ["workflow_id", "transaction_id"], -}) -@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions) -export default class WorkflowExecution { - [OptionalProps]?: OptionalFields - - @Property({ columnType: "text", nullable: false }) - @Index({ name: "IDX_workflow_execution_id" }) - id!: string - - @Index({ name: "IDX_workflow_execution_workflow_id" }) - @PrimaryKey({ columnType: "text" }) - workflow_id: string - - @Index({ name: "IDX_workflow_execution_transaction_id" }) - @PrimaryKey({ columnType: "text" }) - transaction_id: string - - @Property({ columnType: "jsonb", nullable: true }) - execution: Record | null = null - - @Property({ columnType: "jsonb", nullable: true }) - context: Record | null = null - - @Index({ name: "IDX_workflow_execution_state" }) - @Enum(() => TransactionState) - state: TransactionState - - @Property({ - onCreate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", +export const WorkflowExecution = model + .define("workflow_execution", { + id: model.id({ prefix: "wf_exec" }), + workflow_id: model.text().primaryKey(), + transaction_id: model.text().primaryKey(), + execution: model.json().nullable(), + context: model.json().nullable(), + state: model.enum(TransactionState), }) - created_at: Date - - @Property({ - onCreate: () => new Date(), - onUpdate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", - }) - updated_at: Date - - @Property({ columnType: "timestamptz", nullable: true }) - deleted_at: Date | null = null - - @BeforeCreate() - onCreate() { - this.id = generateEntityId(this.id, "wf_exec") - } - - @OnInit() - onInit() { - this.id = generateEntityId(this.id, "wf_exec") - } -} + .indexes([ + { + on: ["id"], + where: "deleted_at IS NULL", + }, + { + on: ["workflow_id"], + where: "deleted_at IS NULL", + }, + { + on: ["transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state"], + where: "deleted_at IS NULL", + }, + ]) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 9771f48784..3b67e36d2a 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -1,6 +1,7 @@ import { Context, DAL, + InferEntityType, InternalModuleDeclaration, MedusaContainer, ModulesSdkTypes, @@ -25,9 +26,11 @@ type InjectedDependencies = { } export class WorkflowsModuleService< - TWorkflowExecution extends WorkflowExecution = WorkflowExecution + TWorkflowExecution extends InferEntityType< + typeof WorkflowExecution + > = InferEntityType > extends ModulesSdkUtils.MedusaService<{ - WorkflowExecution: { dto: WorkflowExecution } + WorkflowExecution: { dto: InferEntityType } }>({ WorkflowExecution }) { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 1e30157a24..00f60a894a 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -1,10 +1,12 @@ import { + DistributedTransactionType, TransactionStepTimeoutError, TransactionTimeoutError, WorkflowManager, } from "@medusajs/framework/orchestration" import { IWorkflowEngineService, + Logger, MedusaContainer, RemoteQueryFunction, } from "@medusajs/framework/types" @@ -99,6 +101,20 @@ moduleIntegrationTestRunner({ serviceName: "workflows", field: "workflowExecution", }, + transaction_id: { + entity: "WorkflowExecution", + field: "workflowExecution", + linkable: "workflow_execution_transaction_id", + primaryKey: "transaction_id", + serviceName: "workflows", + }, + workflow_id: { + entity: "WorkflowExecution", + field: "workflowExecution", + linkable: "workflow_execution_workflow_id", + primaryKey: "workflow_id", + serviceName: "workflows", + }, }, }) }) @@ -112,10 +128,9 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - let executionsList = await query({ - workflow_executions: { - fields: ["workflow_id", "transaction_id", "state"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["workflow_id", "transaction_id", "state"], }) expect(executionsList).toHaveLength(1) @@ -130,11 +145,10 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" }, }) - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(0) expect(result).toEqual({ @@ -153,10 +167,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -170,12 +183,10 @@ moduleIntegrationTestRunner({ }, stepResponse: { uhuuuu: "yeaah!" }, }) - - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(1) }) @@ -188,10 +199,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -205,12 +215,10 @@ moduleIntegrationTestRunner({ }, stepResponse: { uhuuuu: "yeaah!" }, }) - - executionsList = await query({ - workflow_executions: { - fields: ["id", "state"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id", "state"], + })) expect(executionsList).toHaveLength(1) expect(executionsList[0].state).toEqual("reverted") @@ -237,10 +245,9 @@ moduleIntegrationTestRunner({ }, }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -260,12 +267,10 @@ moduleIntegrationTestRunner({ }) expect(setStepError).toEqual({ uhuuuu: "yeaah!" }) - - executionsList = await query({ - workflow_executions: { - fields: ["id", "state", "context"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id", "state", "context"], + })) expect(executionsList).toHaveLength(1) expect(executionsList[0].state).toEqual("failed") @@ -273,7 +278,7 @@ moduleIntegrationTestRunner({ }) it("should revert the entire transaction when a step timeout expires", async () => { - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_step_timeout", { input: { @@ -282,9 +287,13 @@ moduleIntegrationTestRunner({ throwOnError: false, logOnError: true, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual({ myInput: "123", }) @@ -294,16 +303,20 @@ moduleIntegrationTestRunner({ }) it("should revert the entire transaction when the transaction timeout expires", async () => { - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_transaction_timeout", { input: {}, transactionId: "trx", throwOnError: false, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual({ executed: true }) expect(errors).toHaveLength(1) expect(errors[0].action).toEqual("step_1") @@ -323,7 +336,7 @@ moduleIntegrationTestRunner({ await setTimeout(200) - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_step_timeout_async", { input: { @@ -332,9 +345,13 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", throwOnError: false, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual(undefined) expect(errors).toHaveLength(1) expect(errors[0].action).toEqual("step_1_async") @@ -354,16 +371,20 @@ moduleIntegrationTestRunner({ await setTimeout(200) - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_transaction_timeout_async", { input: {}, transactionId: "transaction_1", throwOnError: false, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual(undefined) expect(errors).toHaveLength(1) expect(errors[0].action).toEqual("step_1") diff --git a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts index 3fe2da0681..7b027f5995 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts @@ -15,11 +15,11 @@ const redisUrl = process.env.REDIS_URL || "redis://localhost:6379" const redis = new Redis(redisUrl) interface TestDatabase { - clearTables(knex): Promise + clearTables(): Promise } export const TestDatabase: TestDatabase = { - clearTables: async (knex) => { + clearTables: async () => { await cleanRedis() }, } diff --git a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json new file mode 100644 index 0000000000..66a9a97c21 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json @@ -0,0 +1,163 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "transaction_id": { + "name": "transaction_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "execution": { + "name": "execution", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "context": { + "name": "context", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "state": { + "name": "state", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "enumItems": [ + "not_started", + "invoking", + "waiting_to_compensate", + "compensating", + "done", + "reverted", + "failed" + ], + "mappedType": "enum" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "workflow_execution", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_workflow_execution_deleted_at", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_transaction_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" + }, + { + "keyName": "workflow_execution_pkey", + "columnNames": [ + "workflow_id", + "transaction_id" + ], + "composite": true, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts new file mode 100644 index 0000000000..50c0680ae9 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts @@ -0,0 +1,27 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20241206123341 extends Migration { + async up(): Promise { + this.addSql( + `DROP INDEX IF EXISTS "IDX_workflow_execution_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_workflow_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_transaction_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_state";` + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_deleted_at" ON "workflow_execution" (deleted_at) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" (id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" (workflow_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" (transaction_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" (state) WHERE deleted_at IS NULL;' + ) + } +} diff --git a/packages/modules/workflow-engine-redis/src/models/index.ts b/packages/modules/workflow-engine-redis/src/models/index.ts index 78fcbfa921..fa5b8a3dd0 100644 --- a/packages/modules/workflow-engine-redis/src/models/index.ts +++ b/packages/modules/workflow-engine-redis/src/models/index.ts @@ -1 +1 @@ -export { default as WorkflowExecution } from "./workflow-execution" +export { WorkflowExecution } from "./workflow-execution" diff --git a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts index 22e693d428..c41bc8936e 100644 --- a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts @@ -1,76 +1,30 @@ import { TransactionState } from "@medusajs/framework/orchestration" -import { DALUtils, generateEntityId } from "@medusajs/framework/utils" -import { - BeforeCreate, - Entity, - Enum, - Filter, - Index, - OnInit, - OptionalProps, - PrimaryKey, - Property, - Unique, -} from "@mikro-orm/core" +import { model } from "@medusajs/framework/utils" -type OptionalFields = "deleted_at" - -@Entity() -@Unique({ - name: "IDX_workflow_execution_workflow_id_transaction_id_unique", - properties: ["workflow_id", "transaction_id"], -}) -@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions) -export default class WorkflowExecution { - [OptionalProps]?: OptionalFields - - @Property({ columnType: "text", nullable: false }) - @Index({ name: "IDX_workflow_execution_id" }) - id!: string - - @Index({ name: "IDX_workflow_execution_workflow_id" }) - @PrimaryKey({ columnType: "text" }) - workflow_id: string - - @Index({ name: "IDX_workflow_execution_transaction_id" }) - @PrimaryKey({ columnType: "text" }) - transaction_id: string - - @Property({ columnType: "jsonb", nullable: true }) - execution: Record | null = null - - @Property({ columnType: "jsonb", nullable: true }) - context: Record | null = null - - @Index({ name: "IDX_workflow_execution_state" }) - @Enum(() => TransactionState) - state: TransactionState - - @Property({ - onCreate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", +export const WorkflowExecution = model + .define("workflow_execution", { + id: model.id({ prefix: "wf_exec" }), + workflow_id: model.text().primaryKey(), + transaction_id: model.text().primaryKey(), + execution: model.json().nullable(), + context: model.json().nullable(), + state: model.enum(TransactionState), }) - created_at: Date - - @Property({ - onCreate: () => new Date(), - onUpdate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", - }) - updated_at: Date - - @Property({ columnType: "timestamptz", nullable: true }) - deleted_at: Date | null = null - - @BeforeCreate() - onCreate() { - this.id = generateEntityId(this.id, "wf_exec") - } - - @OnInit() - onInit() { - this.id = generateEntityId(this.id, "wf_exec") - } -} + .indexes([ + { + on: ["id"], + where: "deleted_at IS NULL", + }, + { + on: ["workflow_id"], + where: "deleted_at IS NULL", + }, + { + on: ["transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state"], + where: "deleted_at IS NULL", + }, + ]) diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 37c371356f..7463623e94 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -1,6 +1,7 @@ import { Context, DAL, + InferEntityType, InternalModuleDeclaration, ModulesSdkTypes, WorkflowsSdkTypes, @@ -25,9 +26,11 @@ type InjectedDependencies = { } export class WorkflowsModuleService< - TWorkflowExecution extends WorkflowExecution = WorkflowExecution + TWorkflowExecution extends InferEntityType< + typeof WorkflowExecution + > = InferEntityType > extends ModulesSdkUtils.MedusaService<{ - WorkflowExecution: { dto: WorkflowExecution } + WorkflowExecution: { dto: InferEntityType } }>({ WorkflowExecution }) { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService