diff --git a/.changeset/stupid-wasps-greet.md b/.changeset/stupid-wasps-greet.md new file mode 100644 index 0000000000..617ee5258c --- /dev/null +++ b/.changeset/stupid-wasps-greet.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-*): Allow to re run non idempotent but stored workflow with the same transaction id if considered done diff --git a/packages/core/orchestration/package.json b/packages/core/orchestration/package.json index b5ee0276fb..472ec3e8b9 100644 --- a/packages/core/orchestration/package.json +++ b/packages/core/orchestration/package.json @@ -40,7 +40,8 @@ }, "dependencies": { "@medusajs/types": "2.7.1", - "@medusajs/utils": "2.7.1" + "@medusajs/utils": "2.7.1", + "ulid": "^2.3.0" }, "peerDependencies": { "@mikro-orm/core": "6.4.3", diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index d1ad9a4e71..48b1ca5911 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -73,6 +73,7 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") + expect(transaction.runId).toEqual(expect.any(String)) expect(transaction.getState()).toBe(TransactionState.DONE) expect(mocks.one).toBeCalledWith( diff --git a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts index 3bd37150c1..c16a0f07d3 100644 --- a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts @@ -19,7 +19,7 @@ export interface IDistributedSchedulerStorage { export interface IDistributedTransactionStorage { get( key: string, - options?: TransactionOptions + options?: TransactionOptions & { isCancelling?: boolean } ): Promise list(): Promise save( diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index e312aff39f..9778d3c638 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -79,6 +79,7 @@ export class TransactionPayload { class DistributedTransaction extends EventEmitter { public modelId: string public transactionId: string + public runId: string private readonly errors: TransactionStepError[] = [] private readonly context: TransactionContext = new TransactionContext() @@ -109,7 +110,7 @@ class DistributedTransaction extends EventEmitter { this.transactionId = flow.transactionId this.modelId = flow.modelId - + this.runId = flow.runId if (errors) { this.errors = errors } @@ -220,7 +221,8 @@ class DistributedTransaction extends EventEmitter { public static async loadTransaction( modelId: string, - transactionId: string + transactionId: string, + options?: { isCancelling?: boolean } ): Promise { const key = TransactionOrchestrator.getKeyName( DistributedTransaction.keyPrefix, @@ -228,12 +230,13 @@ class DistributedTransaction extends EventEmitter { transactionId ) - const options = TransactionOrchestrator.getWorkflowOptions(modelId) + const workflowOptions = TransactionOrchestrator.getWorkflowOptions(modelId) + + const loadedData = await DistributedTransaction.keyValueStore.get(key, { + ...workflowOptions, + isCancelling: options?.isCancelling, + }) - const loadedData = await DistributedTransaction.keyValueStore.get( - key, - options - ) if (loadedData) { return loadedData } diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 9c0df8b0a9..6d42cf2172 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -1,3 +1,4 @@ +import { ulid } from "ulid" import { DistributedTransaction, DistributedTransactionType, @@ -786,10 +787,13 @@ export class TransactionOrchestrator extends EventEmitter { const execution: Promise[] = [] for (const step of nextSteps.next) { - const { stopStepExecution } = this.prepareStepForExecution(step, flow) + const { shouldContinueExecution } = this.prepareStepForExecution( + step, + flow + ) // Should stop the execution if next step cant be handled - if (!stopStepExecution) { + if (!shouldContinueExecution) { continue } @@ -867,7 +871,7 @@ export class TransactionOrchestrator extends EventEmitter { private prepareStepForExecution( step: TransactionStep, flow: TransactionFlow - ): { stopStepExecution: boolean } { + ): { shouldContinueExecution: boolean } { const curState = step.getStates() step.lastAttempt = Date.now() @@ -883,7 +887,7 @@ export class TransactionOrchestrator extends EventEmitter { if (step.definition.noCompensation) { step.changeState(TransactionStepState.REVERTED) - return { stopStepExecution: false } + return { shouldContinueExecution: false } } } else if (flow.state === TransactionState.INVOKING) { step.changeState(TransactionStepState.INVOKING) @@ -892,7 +896,7 @@ export class TransactionOrchestrator extends EventEmitter { step.changeStatus(TransactionStepStatus.WAITING) - return { stopStepExecution: true } + return { shouldContinueExecution: true } } /** @@ -1239,6 +1243,7 @@ export class TransactionOrchestrator extends EventEmitter { } flow.state = TransactionState.WAITING_TO_COMPENSATE + flow.cancelledAt = Date.now() await this.executeNext(transaction) } @@ -1264,7 +1269,8 @@ export class TransactionOrchestrator extends EventEmitter { hasStepTimeouts || hasRetriesTimeout || hasTransactionTimeout || - isIdempotent + isIdempotent || + this.options.retentionTime ) { this.options.store = true } @@ -1292,6 +1298,7 @@ export class TransactionOrchestrator extends EventEmitter { modelId: this.id, options: this.options, transactionId: transactionId, + runId: ulid(), metadata: flowMetadata, hasAsyncSteps: features.hasAsyncSteps, hasFailedSteps: false, @@ -1310,11 +1317,13 @@ export class TransactionOrchestrator extends EventEmitter { private static async loadTransactionById( modelId: string, - transactionId: string + transactionId: string, + options?: { isCancelling?: boolean } ): Promise { const transaction = await DistributedTransaction.loadTransaction( modelId, - transactionId + transactionId, + options ) if (transaction !== null) { @@ -1487,10 +1496,15 @@ export class TransactionOrchestrator extends EventEmitter { */ public async retrieveExistingTransaction( transactionId: string, - handler: TransactionStepHandler + handler: TransactionStepHandler, + options?: { isCancelling?: boolean } ): Promise { const existingTransaction = - await TransactionOrchestrator.loadTransactionById(this.id, transactionId) + await TransactionOrchestrator.loadTransactionById( + this.id, + transactionId, + { isCancelling: options?.isCancelling } + ) if (!existingTransaction) { throw new MedusaError( diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index 785df77842..e50fd3c827 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -261,6 +261,7 @@ export type TransactionFlow = { options?: TransactionModelOptions definition: TransactionStepsDefinition transactionId: string + runId: string metadata?: { eventGroupId?: string parentIdempotencyKey?: string @@ -277,6 +278,7 @@ export type TransactionFlow = { hasRevertedSteps: boolean timedOutAt: number | null startedAt?: number + cancelledAt?: number state: TransactionState steps: { [key: string]: TransactionStep diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index ffd368353d..15ab1a1abb 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -386,7 +386,8 @@ export class LocalWorkflow { const transaction = await orchestrator.retrieveExistingTransaction( uniqueTransactionId, - handler(this.container_, context) + handler(this.container_, context), + { isCancelling: context?.isCancelling } ) return transaction diff --git a/packages/core/types/src/shared-context.ts b/packages/core/types/src/shared-context.ts index 1a21abc630..3af0ee09a3 100644 --- a/packages/core/types/src/shared-context.ts +++ b/packages/core/types/src/shared-context.ts @@ -78,4 +78,9 @@ export type Context = { * preventReleaseEvents */ preventReleaseEvents?: boolean + + /** + * A boolean value indicating whether the current workflow execution is being cancelled. + */ + isCancelling?: boolean } diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 872639a581..3d3d6122cb 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -105,6 +105,8 @@ function createContextualWorkflowRunner< preventReleaseEvents, } + context.isCancelling = isCancel + const args = [ transactionOrIdOrIdempotencyKey, input, @@ -188,7 +190,7 @@ function createContextualWorkflowRunner< __type: MedusaContextType as Context["__type"], } - context.transactionId ??= ulid() + context.transactionId ??= "auto-" + ulid() context.eventGroupId ??= ulid() return await originalExecution( diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts index 87a9596f4a..04635d6d64 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts @@ -6,3 +6,4 @@ export * from "./workflow_idempotent" export * from "./workflow_step_timeout" export * from "./workflow_sync" export * from "./workflow_transaction_timeout" +export * from "./workflow_not_idempotent_with_retention" diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts new file mode 100644 index 0000000000..dd179f4b34 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts @@ -0,0 +1,71 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step_1 = createStep( + "step_1", + jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) + }), + jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) + }) +) + +export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn( + (input, context) => { + if (input) { + return new StepResponse({ notAsyncResponse: input.hey }) + } + } +) +const step_2 = createStep( + "step_2", + workflowNotIdempotentWithRetentionStep2Invoke, + jest.fn((_, context) => { + return new StepResponse({ + step: context.metadata.action, + idempotency_key: context.metadata.idempotency_key, + reverted: true, + }) + }) +) + +export const workflowNotIdempotentWithRetentionStep3Invoke = jest.fn((res) => { + return new StepResponse({ + done: { + inputFromSyncStep: res.notAsyncResponse, + }, + }) +}) +const step_3 = createStep( + "step_3", + workflowNotIdempotentWithRetentionStep3Invoke +) + +createWorkflow( + { + name: "workflow_not_idempotent_with_retention", + retentionTime: 60, + }, + function (input) { + step_1(input) + + step_2({ hey: "oh" }) + + const ret2 = step_2({ hey: "hello" }).config({ + name: "new_step_name", + }) + + return step_3(ret2) + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 8eae13339b..b97a5698a6 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,5 +1,6 @@ import { DistributedTransactionType, + TransactionState, WorkflowManager, } from "@medusajs/framework/orchestration" import { @@ -10,6 +11,7 @@ import { import { Module, Modules, + promiseAll, TransactionHandlerType, } from "@medusajs/framework/utils" import { @@ -21,6 +23,7 @@ import { import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { WorkflowsModuleService } from "@services" import { asFunction } from "awilix" +import { ulid } from "ulid" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" @@ -29,6 +32,8 @@ import { conditionalStep3Invoke, workflow2Step2Invoke, workflow2Step3Invoke, + workflowNotIdempotentWithRetentionStep2Invoke, + workflowNotIdempotentWithRetentionStep3Invoke, } from "../__fixtures__" import { eventGroupWorkflowId, @@ -43,7 +48,7 @@ const failTrap = (done) => { setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( - "Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually." + `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually.` ) done() }, 5000) @@ -94,10 +99,55 @@ moduleIntegrationTestRunner({ serviceName: "workflows", field: "workflowExecution", }, + run_id: { + linkable: "workflow_execution_run_id", + entity: "WorkflowExecution", + primaryKey: "run_id", + serviceName: "workflows", + field: "workflowExecution", + }, }, }) }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { + const transactionId = "transaction_id" + const workflowId = "workflow_id" + ulid() + + const step1 = createStep("step1", async () => { + await setTimeoutPromise(100) + return new StepResponse("step1") + }) + + createWorkflow( + { + name: workflowId, + retentionTime: 1000, + }, + function () { + return new WorkflowResponse(step1()) + } + ) + + const [result1, result2] = await promiseAll([ + workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }), + workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .catch((e) => e), + ]) + + expect(result1.result).toEqual("step1") + expect(result2.message).toEqual( + "Transaction already started for transactionId: " + transactionId + ) + }) + it("should execute an async workflow keeping track of the event group id provided in the context", async () => { const eventGroupId = "event-group-id" @@ -232,6 +282,7 @@ moduleIntegrationTestRunner({ { obj: "return from 2" }, { obj: "return from 3" }, ]) + done() } }, }) @@ -309,13 +360,13 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" }, }) - expect(workflow2Step2Invoke).toBeCalledTimes(2) + expect(workflow2Step2Invoke).toHaveBeenCalledTimes(2) expect(workflow2Step2Invoke.mock.calls[0][0]).toEqual({ hey: "oh" }) expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({ hey: "async hello", }) - expect(workflow2Step3Invoke).toBeCalledTimes(1) + expect(workflow2Step3Invoke).toHaveBeenCalledTimes(1) expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({ uhuuuu: "yeaah!", }) @@ -327,6 +378,72 @@ moduleIntegrationTestRunner({ expect(executionsList).toHaveLength(1) }) + it("should return a list of workflow executions and keep it saved when there is a retentionTime set but allow for executing the same workflow multiple times with different run_id if the workflow is considered done", async () => { + const transactionId = "transaction_1" + await workflowOrcModule.run( + "workflow_not_idempotent_with_retention", + { + input: { + value: "123", + }, + transactionId, + } + ) + + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id", "run_id", "transaction_id"], + }) + + expect(executionsList).toHaveLength(1) + + expect( + workflowNotIdempotentWithRetentionStep2Invoke + ).toHaveBeenCalledTimes(2) + expect( + workflowNotIdempotentWithRetentionStep2Invoke.mock.calls[0][0] + ).toEqual({ hey: "oh" }) + expect( + workflowNotIdempotentWithRetentionStep2Invoke.mock.calls[1][0] + ).toEqual({ + hey: "hello", + }) + expect( + workflowNotIdempotentWithRetentionStep3Invoke + ).toHaveBeenCalledTimes(1) + expect( + workflowNotIdempotentWithRetentionStep3Invoke.mock.calls[0][0] + ).toEqual({ + notAsyncResponse: "hello", + }) + + await workflowOrcModule.run( + "workflow_not_idempotent_with_retention", + { + input: { + value: "123", + }, + transactionId, + } + ) + + const { data: executionsList2 } = await query.graph({ + entity: "workflow_executions", + filters: { + id: { $nin: executionsList.map((e) => e.id) }, + }, + fields: ["id", "run_id", "transaction_id"], + }) + + expect(executionsList2).toHaveLength(1) + expect(executionsList2[0].run_id).not.toEqual( + executionsList[0].run_id + ) + expect(executionsList2[0].transaction_id).toEqual( + executionsList[0].transaction_id + ) + }) + it("should revert the entire transaction when a step timeout expires", async () => { const { transaction } = (await workflowOrcModule.run( "workflow_step_timeout", @@ -402,6 +519,37 @@ moduleIntegrationTestRunner({ expect(transaction.getFlow().state).toEqual("reverted") }) + it("should cancel and revert a non idempotent completed workflow with rentention time given a specific transaction id", async () => { + const workflowId = "workflow_not_idempotent_with_retention" + const transactionId = "trx_123" + + await workflowOrcModule.run(workflowId, { + input: { + value: "123", + }, + transactionId, + }) + + let executions = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(executions.length).toEqual(1) + expect(executions[0].state).toEqual(TransactionState.DONE) + expect(executions[0].transaction_id).toEqual(transactionId) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + executions = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(executions.length).toEqual(1) + expect(executions[0].state).toEqual(TransactionState.REVERTED) + }) + it("should run conditional steps if condition is true", (done) => { void workflowOrcModule.subscribe({ workflowId: "workflow_conditional_step", @@ -449,8 +597,8 @@ moduleIntegrationTestRunner({ describe("Scheduled workflows", () => { beforeEach(() => { - jest.useFakeTimers() jest.clearAllMocks() + jest.useFakeTimers() // Register test-value in the container for all tests const sharedContainer = diff --git a/packages/modules/workflow-engine-inmemory/package.json b/packages/modules/workflow-engine-inmemory/package.json index 1d4eec6d8d..3580e9daf1 100644 --- a/packages/modules/workflow-engine-inmemory/package.json +++ b/packages/modules/workflow-engine-inmemory/package.json @@ -29,7 +29,7 @@ "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json", "build": "rimraf dist && tsc --build && npm run resolve:aliases", "test": "jest --passWithNoTests --runInBand --bail --forceExit -- src", - "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts", + "test:integration": "jest --forceExit -- integration-tests/**/__tests__/*.ts", "migration:initial": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create --initial", "migration:create": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create", "migration:up": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:up", diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json index e52c48397f..1dd73c39a1 100644 --- a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -24,6 +24,15 @@ "nullable": false, "mappedType": "text" }, + "run_id": { + "name": "run_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, "id": { "name": "id", "type": "text", @@ -151,13 +160,13 @@ "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" }, { - "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", + "keyName": "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique", "columnNames": [], "composite": false, "constraint": false, "primary": false, "unique": false, - "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_run_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id, run_id) WHERE deleted_at IS NULL" }, { "keyName": "IDX_workflow_execution_state", @@ -172,7 +181,8 @@ "keyName": "workflow_execution_pkey", "columnNames": [ "workflow_id", - "transaction_id" + "transaction_id", + "run_id" ], "composite": true, "constraint": true, diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250505092459.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250505092459.ts new file mode 100644 index 0000000000..84f58aecd2 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250505092459.ts @@ -0,0 +1,45 @@ +import { Migration } from "@mikro-orm/migrations" +import { ulid } from "ulid" + +export class Migration20250505092459 extends Migration { + override async up(): Promise { + this.addSql( + `alter table if exists "workflow_execution" drop constraint if exists "workflow_execution_workflow_id_transaction_id_run_id_unique";` + ) + this.addSql( + `drop index if exists "IDX_workflow_execution_workflow_id_transaction_id_unique";` + ) + this.addSql( + `alter table if exists "workflow_execution" drop constraint if exists "PK_workflow_execution_workflow_id_transaction_id";` + ) + + this.addSql( + `alter table if exists "workflow_execution" add column if not exists "run_id" text not null default '${ulid()}';` + ) + this.addSql( + `CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique" ON "workflow_execution" (workflow_id, transaction_id, run_id) WHERE deleted_at IS NULL;` + ) + this.addSql( + `alter table if exists "workflow_execution" add constraint "workflow_execution_pkey" primary key ("workflow_id", "transaction_id", "run_id");` + ) + } + + override async down(): Promise { + this.addSql( + `drop index if exists "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique";` + ) + this.addSql( + `alter table if exists "workflow_execution" drop constraint if exists "workflow_execution_pkey";` + ) + this.addSql( + `alter table if exists "workflow_execution" drop column if exists "run_id";` + ) + + this.addSql( + `CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id_unique" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;` + ) + this.addSql( + `alter table if exists "workflow_execution" add constraint "workflow_execution_pkey" primary key ("workflow_id", "transaction_id");` + ) + } +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index a533a54094..15f56fbd13 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -6,6 +6,7 @@ export const WorkflowExecution = model id: model.id({ prefix: "wf_exec" }), workflow_id: model.text().primaryKey(), transaction_id: model.text().primaryKey(), + run_id: model.text().primaryKey(), execution: model.json().nullable(), context: model.json().nullable(), state: model.enum(TransactionState), @@ -25,7 +26,7 @@ export const WorkflowExecution = model where: "deleted_at IS NULL", }, { - on: ["workflow_id", "transaction_id"], + on: ["workflow_id", "transaction_id", "run_id"], unique: true, where: "deleted_at IS NULL", }, diff --git a/packages/modules/workflow-engine-inmemory/src/schema/index.ts b/packages/modules/workflow-engine-inmemory/src/schema/index.ts index 2d332c551c..c0e1e8c57f 100644 --- a/packages/modules/workflow-engine-inmemory/src/schema/index.ts +++ b/packages/modules/workflow-engine-inmemory/src/schema/index.ts @@ -16,6 +16,7 @@ type WorkflowExecution { deleted_at: DateTime workflow_id: string transaction_id: string + run_id: string execution: JSON context: JSON state: TransactionState diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 8d940b31f2..1f16c71332 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -31,6 +31,7 @@ export type WorkflowOrchestratorRunOptions = Omit< "container" > & { transactionId?: string + runId?: string container?: ContainerLike } @@ -140,7 +141,7 @@ export class WorkflowOrchestratorService { let { throwOnError, context } = options ?? {} throwOnError ??= true context ??= {} - context.transactionId = transactionId ?? ulid() + context.transactionId = transactionId ?? "auto-" + ulid() const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow @@ -259,7 +260,10 @@ export class WorkflowOrchestratorService { const transaction = await this.getRunningTransaction( workflowId, transactionId, - options + { + ...options, + isCancelling: true, + } ) if (!transaction) { diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index dddef0ec99..59f0e493fd 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -5,11 +5,17 @@ import { SchedulerOptions, SkipExecutionError, TransactionCheckpoint, + TransactionContext, TransactionFlow, TransactionOptions, TransactionStep, + TransactionStepError, } from "@medusajs/framework/orchestration" -import { Logger, ModulesSdkTypes } from "@medusajs/framework/types" +import { + InferEntityType, + Logger, + ModulesSdkTypes, +} from "@medusajs/framework/types" import { MedusaError, TransactionState, @@ -19,6 +25,7 @@ import { } from "@medusajs/framework/utils" import { WorkflowOrchestratorService } from "@services" import { type CronExpression, parseExpression } from "cron-parser" +import { WorkflowExecution } from "../models/workflow-execution" function parseNextExecution( optionsOrExpression: SchedulerOptions | CronExpression | string | number @@ -86,6 +93,7 @@ export class InMemoryDistributedTransactionStorage { workflow_id: data.flow.modelId, transaction_id: data.flow.transactionId, + run_id: data.flow.runId, execution: data.flow, context: { data: data.context, @@ -102,13 +110,16 @@ export class InMemoryDistributedTransactionStorage { workflow_id: data.flow.modelId, transaction_id: data.flow.transactionId, + run_id: data.flow.runId, }, ]) } async get( key: string, - options?: TransactionOptions + options?: TransactionOptions & { + isCancelling?: boolean + } ): Promise { const data = this.storage.get(key) @@ -122,23 +133,53 @@ export class InMemoryDistributedTransactionStorage } const [_, workflowId, transactionId] = key.split(":") - const trx = await this.workflowExecutionService_ - .retrieve( - { - workflow_id: workflowId, - transaction_id: transactionId, - }, - { - select: ["execution", "context"], - } - ) - .catch(() => undefined) + const trx: InferEntityType | undefined = + await this.workflowExecutionService_ + .list( + { + workflow_id: workflowId, + transaction_id: transactionId, + }, + { + select: ["execution", "context"], + order: { + id: "desc", + }, + take: 1, + } + ) + .then((trx) => trx[0]) + .catch(() => undefined) if (trx) { + const execution = trx.execution as TransactionFlow + + if (!idempotent) { + const isFailedOrReverted = [ + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(execution.state) + + const isDone = execution.state === TransactionState.DONE + + const isCancellingAndFailedOrReverted = + options?.isCancelling && isFailedOrReverted + + const isNotCancellingAndDoneOrFailedOrReverted = + !options?.isCancelling && (isDone || isFailedOrReverted) + + if ( + isCancellingAndFailedOrReverted || + isNotCancellingAndDoneOrFailedOrReverted + ) { + return + } + } + return { - flow: trx.execution, - context: trx.context.data, - errors: trx.context.errors, + flow: trx.execution as TransactionFlow, + context: trx.context?.data as TransactionContext, + errors: trx.context?.errors as TransactionStepError[], } } @@ -181,6 +222,20 @@ export class InMemoryDistributedTransactionStorage } // Store in memory + const isNotStarted = data.flow.state === TransactionState.NOT_STARTED + const isManualTransactionId = !data.flow.transactionId.startsWith("auto-") + + if (isNotStarted && isManualTransactionId) { + const storedData = this.storage.get(key) + if (storedData) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Transaction already started for transactionId: " + + data.flow.transactionId + ) + } + } + this.storage.set(key, data) // Optimize DB operations - only perform when necessary @@ -206,15 +261,23 @@ export class InMemoryDistributedTransactionStorage key: string options?: TransactionOptions }) { - const isInitialCheckpoint = data.flow.state === TransactionState.NOT_STARTED + const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes( + data.flow.state + ) /** * In case many execution can succeed simultaneously, we need to ensure that the latest * execution does continue if a previous execution is considered finished */ const currentFlow = data.flow + + const getOptions = { + ...options, + isCancelling: !!data.flow.cancelledAt, + } as Parameters[1] + const { flow: latestUpdatedFlow } = - (await this.get(key, options)) ?? + (await this.get(key, getOptions)) ?? ({ flow: {} } as { flow: TransactionFlow }) if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts index 3b47cf4e8d..c256744924 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts @@ -6,3 +6,4 @@ export * from "./workflow_step_timeout" export * from "./workflow_sync" export * from "./workflow_transaction_timeout" export * from "./workflow_when" +export * from "./workflow_not_idempotent_with_retention" diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts new file mode 100644 index 0000000000..dd179f4b34 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts @@ -0,0 +1,71 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/framework/workflows-sdk" + +const step_1 = createStep( + "step_1", + jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) + }), + jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) + }) +) + +export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn( + (input, context) => { + if (input) { + return new StepResponse({ notAsyncResponse: input.hey }) + } + } +) +const step_2 = createStep( + "step_2", + workflowNotIdempotentWithRetentionStep2Invoke, + jest.fn((_, context) => { + return new StepResponse({ + step: context.metadata.action, + idempotency_key: context.metadata.idempotency_key, + reverted: true, + }) + }) +) + +export const workflowNotIdempotentWithRetentionStep3Invoke = jest.fn((res) => { + return new StepResponse({ + done: { + inputFromSyncStep: res.notAsyncResponse, + }, + }) +}) +const step_3 = createStep( + "step_3", + workflowNotIdempotentWithRetentionStep3Invoke +) + +createWorkflow( + { + name: "workflow_not_idempotent_with_retention", + retentionTime: 60, + }, + function (input) { + step_1(input) + + step_2({ hey: "oh" }) + + const ret2 = step_2({ hey: "hello" }).config({ + name: "new_step_name", + }) + + return step_3(ret2) + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 17baf4c5fe..26cadc61fa 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -16,6 +16,7 @@ import { ContainerRegistrationKeys, Module, Modules, + promiseAll, TransactionHandlerType, TransactionStepState, } from "@medusajs/framework/utils" @@ -33,14 +34,19 @@ import { WorkflowsModuleService } from "../../src/services" import "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { TestDatabase } from "../utils" +import { + workflowNotIdempotentWithRetentionStep2Invoke, + workflowNotIdempotentWithRetentionStep3Invoke, +} from "../__fixtures__" +import { ulid } from "ulid" jest.setTimeout(300000) -const failTrap = (done) => { +const failTrap = (done, name) => { setTimeoutSync(() => { // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending console.warn( - "Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually." + `Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}` ) done() }, 5000) @@ -132,11 +138,56 @@ moduleIntegrationTestRunner({ primaryKey: "workflow_id", serviceName: "workflows", }, + run_id: { + entity: "WorkflowExecution", + field: "workflowExecution", + linkable: "workflow_execution_run_id", + primaryKey: "run_id", + serviceName: "workflows", + }, }, }) }) describe("Testing basic workflow", function () { + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { + const transactionId = "transaction_id" + const workflowId = "workflow_id" + ulid() + + const step1 = createStep("step1", async () => { + await setTimeout(100) + return new StepResponse("step1") + }) + + createWorkflow( + { + name: workflowId, + retentionTime: 1000, + }, + function () { + return new WorkflowResponse(step1()) + } + ) + + const [result1, result2] = await promiseAll([ + workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }), + workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .catch((e) => e), + ]) + + expect(result1.result).toEqual("step1") + expect(result2.message).toEqual( + "Transaction already started for transactionId: " + transactionId + ) + }) + it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => { await workflowOrcModule.run("workflow_1", { input: { @@ -294,6 +345,72 @@ moduleIntegrationTestRunner({ expect(done).toBe(true) }) + it("should return a list of workflow executions and keep it saved when there is a retentionTime set but allow for executing the same workflow multiple times with different run_id if the workflow is considered done", async () => { + const transactionId = "transaction_1" + await workflowOrcModule.run( + "workflow_not_idempotent_with_retention", + { + input: { + value: "123", + }, + transactionId, + } + ) + + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id", "run_id", "transaction_id"], + }) + + expect(executionsList).toHaveLength(1) + + expect( + workflowNotIdempotentWithRetentionStep2Invoke + ).toHaveBeenCalledTimes(2) + expect( + workflowNotIdempotentWithRetentionStep2Invoke.mock.calls[0][0] + ).toEqual({ hey: "oh" }) + expect( + workflowNotIdempotentWithRetentionStep2Invoke.mock.calls[1][0] + ).toEqual({ + hey: "hello", + }) + expect( + workflowNotIdempotentWithRetentionStep3Invoke + ).toHaveBeenCalledTimes(1) + expect( + workflowNotIdempotentWithRetentionStep3Invoke.mock.calls[0][0] + ).toEqual({ + notAsyncResponse: "hello", + }) + + await workflowOrcModule.run( + "workflow_not_idempotent_with_retention", + { + input: { + value: "123", + }, + transactionId, + } + ) + + const { data: executionsList2 } = await query.graph({ + entity: "workflow_executions", + filters: { + id: { $nin: executionsList.map((e) => e.id) }, + }, + fields: ["id", "run_id", "transaction_id"], + }) + + expect(executionsList2).toHaveLength(1) + expect(executionsList2[0].run_id).not.toEqual( + executionsList[0].run_id + ) + expect(executionsList2[0].transaction_id).toEqual( + executionsList[0].transaction_id + ) + }) + it("should revert the entire transaction when a step timeout expires", async () => { const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_step_timeout", @@ -386,7 +503,7 @@ moduleIntegrationTestRunner({ throwOnError: false, }) - await setTimeout(200) + await setTimeout(500) const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_transaction_timeout_async", @@ -437,7 +554,7 @@ moduleIntegrationTestRunner({ }, }) - failTrap(done) + failTrap(done, "workflow_async_background") }) it("should subscribe to a async workflow and receive the response when it finishes", (done) => { @@ -466,7 +583,7 @@ moduleIntegrationTestRunner({ expect(onFinish).toHaveBeenCalledTimes(0) - failTrap(done) + failTrap(done, "workflow_async_background") }) it("should not skip step if condition is true", function (done) { @@ -488,7 +605,7 @@ moduleIntegrationTestRunner({ }, }) - failTrap(done) + failTrap(done, "wf-when") }) it("should cancel an async sub workflow when compensating", (done) => { @@ -526,7 +643,7 @@ moduleIntegrationTestRunner({ }, }) - failTrap(done) + failTrap(done, "workflow_async_background_fail") }) it("should cancel and revert a completed workflow", async () => { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index 3e1333bb9a..ad7028f429 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -10,6 +10,7 @@ import { import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout } from "timers/promises" +import { ulid } from "ulid" import "../__fixtures__" jest.setTimeout(300000) @@ -38,6 +39,8 @@ moduleIntegrationTestRunner({ describe("Testing race condition of the workflow during retry", () => { it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id" + const workflowId = "workflow-1" + ulid() + const subWorkflowId = "sub-" + workflowId const step0InvokeMock = jest.fn() const step1InvokeMock = jest.fn() @@ -60,12 +63,12 @@ moduleIntegrationTestRunner({ return new StepResponse({ result: input }) }) - const subWorkflow = createWorkflow("sub-workflow-1", function () { + const subWorkflow = createWorkflow(subWorkflowId, function () { const status = step1() return new WorkflowResponse(status) }) - createWorkflow("workflow-1", function () { + createWorkflow(workflowId, function () { const build = step0() const status = subWorkflow.runAsStep({} as any).config({ @@ -87,21 +90,28 @@ moduleIntegrationTestRunner({ }) void workflowOrcModule.subscribe({ - workflowId: "workflow-1", + workflowId, transactionId, - subscriber: (event) => { + subscriber: async (event) => { if (event.eventType === "onFinish") { - expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) - expect(step2InvokeMock).toHaveBeenCalledTimes(1) - expect(transformMock).toHaveBeenCalledTimes(1) - setTimeoutSync(done, 500) + try { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(1) + expect(transformMock).toHaveBeenCalledTimes(1) + + // Prevent killing the test to early + await setTimeout(500) + done() + } catch (e) { + return done(e) + } } }, }) workflowOrcModule - .run("workflow-1", { transactionId }) + .run(workflowId, { transactionId }) .then(({ result }) => { expect(result).toBe("result from step 0") }) @@ -179,14 +189,19 @@ moduleIntegrationTestRunner({ transactionId, subscriber: (event) => { if (event.eventType === "onFinish") { - expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step0CompensateMock).toHaveBeenCalledTimes(2) // TODO: review this. - expect(step1InvokeMock).toHaveBeenCalledTimes(1) - expect(step1CompensateMock).toHaveBeenCalledTimes(1) - expect(step2InvokeMock).toHaveBeenCalledTimes(0) - expect(transformMock).toHaveBeenCalledTimes(0) - - done() + try { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step0CompensateMock).toHaveBeenCalledTimes(1) + expect( + step1InvokeMock.mock.calls.length + ).toBeGreaterThanOrEqual(2) // Called every 0.1s at least (it can take more than 0.1sdepending on the event loop congestions) + expect(step1CompensateMock).toHaveBeenCalledTimes(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(0) + expect(transformMock).toHaveBeenCalledTimes(0) + done() + } catch (e) { + return done(e) + } } }, }) diff --git a/packages/modules/workflow-engine-redis/package.json b/packages/modules/workflow-engine-redis/package.json index 561f4e2a07..ce1bd28150 100644 --- a/packages/modules/workflow-engine-redis/package.json +++ b/packages/modules/workflow-engine-redis/package.json @@ -29,7 +29,7 @@ "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json", "build": "rimraf dist && tsc --build && npm run resolve:aliases", "test": "jest --passWithNoTests --runInBand --bail --forceExit -- src", - "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts", + "test:integration": "jest --forceExit -- integration-tests/**/__tests__/*.ts", "migration:initial": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create --initial", "migration:create": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create", "migration:up": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:up", diff --git a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json index e52c48397f..1dd73c39a1 100644 --- a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json +++ b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json @@ -24,6 +24,15 @@ "nullable": false, "mappedType": "text" }, + "run_id": { + "name": "run_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, "id": { "name": "id", "type": "text", @@ -151,13 +160,13 @@ "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" }, { - "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", + "keyName": "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique", "columnNames": [], "composite": false, "constraint": false, "primary": false, "unique": false, - "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_run_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id, run_id) WHERE deleted_at IS NULL" }, { "keyName": "IDX_workflow_execution_state", @@ -172,7 +181,8 @@ "keyName": "workflow_execution_pkey", "columnNames": [ "workflow_id", - "transaction_id" + "transaction_id", + "run_id" ], "composite": true, "constraint": true, diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20250505101505.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20250505101505.ts new file mode 100644 index 0000000000..60680b26aa --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20250505101505.ts @@ -0,0 +1,45 @@ +import { Migration } from "@mikro-orm/migrations" +import { ulid } from "ulid" + +export class Migration20250505101505 extends Migration { + override async up(): Promise { + this.addSql( + `alter table if exists "workflow_execution" drop constraint if exists "workflow_execution_workflow_id_transaction_id_run_id_unique";` + ) + this.addSql( + `drop index if exists "IDX_workflow_execution_workflow_id_transaction_id_unique";` + ) + this.addSql( + `alter table if exists "workflow_execution" drop constraint if exists "PK_workflow_execution_workflow_id_transaction_id";` + ) + + this.addSql( + `alter table if exists "workflow_execution" add column if not exists "run_id" text not null default '${ulid()}';` + ) + this.addSql( + `CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique" ON "workflow_execution" (workflow_id, transaction_id, run_id) WHERE deleted_at IS NULL;` + ) + this.addSql( + `alter table if exists "workflow_execution" add constraint "workflow_execution_pkey" primary key ("workflow_id", "transaction_id", "run_id");` + ) + } + + override async down(): Promise { + this.addSql( + `drop index if exists "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique";` + ) + this.addSql( + `alter table if exists "workflow_execution" drop constraint if exists "workflow_execution_pkey";` + ) + this.addSql( + `alter table if exists "workflow_execution" drop column if exists "run_id";` + ) + + this.addSql( + `CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id_unique" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;` + ) + this.addSql( + `alter table if exists "workflow_execution" add constraint "workflow_execution_pkey" primary key ("workflow_id", "transaction_id");` + ) + } +} diff --git a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts index a533a54094..15f56fbd13 100644 --- a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts @@ -6,6 +6,7 @@ export const WorkflowExecution = model id: model.id({ prefix: "wf_exec" }), workflow_id: model.text().primaryKey(), transaction_id: model.text().primaryKey(), + run_id: model.text().primaryKey(), execution: model.json().nullable(), context: model.json().nullable(), state: model.enum(TransactionState), @@ -25,7 +26,7 @@ export const WorkflowExecution = model where: "deleted_at IS NULL", }, { - on: ["workflow_id", "transaction_id"], + on: ["workflow_id", "transaction_id", "run_id"], unique: true, where: "deleted_at IS NULL", }, diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index aa0db54014..db4a6b9280 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -30,6 +30,7 @@ export type WorkflowOrchestratorRunOptions = Omit< "container" > & { transactionId?: string + runId?: string container?: ContainerLike } @@ -38,6 +39,7 @@ export type WorkflowOrchestratorCancelOptions = Omit< "transaction" | "transactionId" | "container" > & { transactionId: string + runId?: string container?: ContainerLike } @@ -205,7 +207,6 @@ export class WorkflowOrchestratorService { throwOnError ??= true context ??= {} context.transactionId = transactionId ?? ulid() - const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow : workflowIdOrWorkflow.getName() @@ -319,7 +320,7 @@ export class WorkflowOrchestratorService { const transaction = await this.getRunningTransaction( workflowId, transactionId, - options + { ...options, isCancelling: true } ) if (!transaction) { if (!throwOnError) { diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index f421dcba2a..acd3c438a8 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -6,9 +6,11 @@ import { SchedulerOptions, SkipExecutionError, TransactionCheckpoint, + TransactionContext, TransactionFlow, TransactionOptions, TransactionStep, + TransactionStepError, } from "@medusajs/framework/orchestration" import { Logger, ModulesSdkTypes } from "@medusajs/framework/types" import { @@ -160,6 +162,7 @@ export class RedisDistributedTransactionStorage { workflow_id: data.flow.modelId, transaction_id: data.flow.transactionId, + run_id: data.flow.runId, execution: data.flow, context: { data: data.context, @@ -176,6 +179,7 @@ export class RedisDistributedTransactionStorage { workflow_id: data.flow.modelId, transaction_id: data.flow.transactionId, + run_id: data.flow.runId, }, ]) } @@ -223,7 +227,7 @@ export class RedisDistributedTransactionStorage async get( key: string, - options?: TransactionOptions + options?: TransactionOptions & { isCancelling?: boolean } ): Promise { const data = await this.redisClient.get(key) @@ -240,26 +244,54 @@ export class RedisDistributedTransactionStorage const [_, workflowId, transactionId] = key.split(":") const trx = await this.workflowExecutionService_ - .retrieve( + .list( { workflow_id: workflowId, transaction_id: transactionId, }, { select: ["execution", "context"], + order: { + id: "desc", + }, + take: 1, } ) + .then((trx) => trx[0]) .catch(() => undefined) if (trx) { - const checkpointData = { - flow: trx.execution, - context: trx.context.data, - errors: trx.context.errors, + const execution = trx.execution as TransactionFlow + + if (!idempotent) { + const isFailedOrReverted = [ + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(execution.state) + + const isDone = execution.state === TransactionState.DONE + + const isCancellingAndFailedOrReverted = + options?.isCancelling && isFailedOrReverted + + const isNotCancellingAndDoneOrFailedOrReverted = + !options?.isCancelling && (isDone || isFailedOrReverted) + + if ( + isCancellingAndFailedOrReverted || + isNotCancellingAndDoneOrFailedOrReverted + ) { + return + } } - return checkpointData + return { + flow: trx.execution as TransactionFlow, + context: trx.context?.data as TransactionContext, + errors: trx.context?.errors as TransactionStepError[], + } } + return } @@ -325,6 +357,11 @@ export class RedisDistributedTransactionStorage }) } + const isNotStarted = data.flow.state === TransactionState.NOT_STARTED + const isManualTransactionId = !data.flow.transactionId.startsWith("auto-") + // Only set if not exists + const shouldSetNX = isNotStarted && isManualTransactionId + // Prepare operations to be executed in batch or pipeline const stringifiedData = JSON.stringify(data) const pipeline = this.redisClient.pipeline() @@ -332,19 +369,45 @@ export class RedisDistributedTransactionStorage // Execute Redis operations if (!hasFinished) { if (ttl) { - pipeline.set(key, stringifiedData, "EX", ttl) + if (shouldSetNX) { + pipeline.set(key, stringifiedData, "EX", ttl, "NX") + } else { + pipeline.set(key, stringifiedData, "EX", ttl) + } } else { - pipeline.set(key, stringifiedData) + if (shouldSetNX) { + pipeline.set(key, stringifiedData, "NX") + } else { + pipeline.set(key, stringifiedData) + } } } else { pipeline.unlink(key) } + const pipelinePromise = pipeline.exec().then((result) => { + if (!shouldSetNX) { + return result + } + + const actionResult = result?.pop() + const isOk = !!actionResult?.pop() + if (!isOk) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Transaction already started for transactionId: " + + data.flow.transactionId + ) + } + + return result + }) + // Database operations if (hasFinished && !retentionTime && !idempotent) { - await promiseAll([pipeline.exec(), this.deleteFromDb(data)]) + await promiseAll([pipelinePromise, this.deleteFromDb(data)]) } else { - await promiseAll([pipeline.exec(), this.saveToDb(data, retentionTime)]) + await promiseAll([pipelinePromise, this.saveToDb(data, retentionTime)]) } } @@ -540,15 +603,23 @@ export class RedisDistributedTransactionStorage key: string options?: TransactionOptions }) { - const isInitialCheckpoint = data.flow.state === TransactionState.NOT_STARTED + const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes( + data.flow.state + ) /** * In case many execution can succeed simultaneously, we need to ensure that the latest * execution does continue if a previous execution is considered finished */ const currentFlow = data.flow + + const getOptions = { + ...options, + isCancelling: !!data.flow.cancelledAt, + } as Parameters[1] + const { flow: latestUpdatedFlow } = - (await this.get(key, options)) ?? + (await this.get(key, getOptions)) ?? ({ flow: {} } as { flow: TransactionFlow }) if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) { diff --git a/yarn.lock b/yarn.lock index d9f3168bed..62300922b6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6800,6 +6800,7 @@ __metadata: pg: ^8.13.0 rimraf: ^5.0.1 typescript: ^5.6.2 + ulid: ^2.3.0 peerDependencies: "@mikro-orm/core": 6.4.3 "@mikro-orm/knex": 6.4.3