From 0625f76cd4f040963829b61dcc70563e1d1b7070 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Mon, 17 Mar 2025 09:59:09 -0300 Subject: [PATCH] chore(workflow-engine): export cancel method (#11844) What: * Workflow engine exports the method `cancel` to revert a workflow. --- .changeset/good-wolves-travel.md | 8 ++ .../transaction/transaction-orchestrator.ts | 130 +++++++++--------- .../core/types/src/workflows-sdk/service.ts | 25 ++-- .../integration-tests/__fixtures__/index.ts | 1 + .../__fixtures__/workflow_sync.ts | 64 +++++++++ .../integration-tests/__tests__/index.spec.ts | 20 +++ .../src/services/workflow-orchestrator.ts | 16 +-- .../src/services/workflows-module.ts | 13 ++ .../src/types/index.ts | 3 +- .../integration-tests/__fixtures__/index.ts | 3 +- .../__fixtures__/workflow_sync.ts | 64 +++++++++ .../integration-tests/__tests__/index.spec.ts | 20 +++ .../src/services/workflow-orchestrator.ts | 18 +-- .../src/services/workflows-module.ts | 16 ++- 14 files changed, 309 insertions(+), 92 deletions(-) create mode 100644 .changeset/good-wolves-travel.md create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts diff --git a/.changeset/good-wolves-travel.md b/.changeset/good-wolves-travel.md new file mode 100644 index 0000000000..b3375938da --- /dev/null +++ b/.changeset/good-wolves-travel.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/types": patch +--- + +chore(workflow-engine): expose cancel method diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index a2d70e9889..f729bb0582 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -20,6 +20,7 @@ import { import { isDefined, isErrorLike, + isObject, MedusaError, promiseAll, serializeError, @@ -188,6 +189,7 @@ export class TransactionOrchestrator extends EventEmitter { TransactionStepState.DORMANT, TransactionStepState.SKIPPED, ] + const siblings = step.next.map((sib) => flow.steps[sib]) return ( siblings.length === 0 || @@ -1208,70 +1210,72 @@ export class TransactionOrchestrator extends EventEmitter { while (queue.length > 0) { const { obj, level } = queue.shift() - for (const key of Object.keys(obj)) { - if (typeof obj[key] === "object" && obj[key] !== null) { - queue.push({ obj: obj[key], level: [...level] }) - } else if (key === "action") { - if (actionNames.has(obj.action)) { - throw new Error( - `Step ${obj.action} is already defined in workflow.` - ) - } - - actionNames.add(obj.action) - level.push(obj.action) - const id = level.join(".") - const parent = level.slice(0, level.length - 1).join(".") - - if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) { - states[parent].next?.push(id) - } - - const definitionCopy = { ...obj } - delete definitionCopy.next - - if (definitionCopy.async) { - features.hasAsyncSteps = true - } - - if (definitionCopy.timeout) { - features.hasStepTimeouts = true - } - - if ( - definitionCopy.retryInterval || - definitionCopy.retryIntervalAwaiting - ) { - features.hasRetriesTimeout = true - } - - if (definitionCopy.nested) { - features.hasNestedTransactions = true - } - - states[id] = Object.assign( - new TransactionStep(), - existingSteps?.[id] || { - id, - uuid: definitionCopy.uuid, - depth: level.length - 1, - definition: definitionCopy, - saveResponse: definitionCopy.saveResponse ?? true, - invoke: { - state: TransactionStepState.NOT_STARTED, - status: TransactionStepStatus.IDLE, - }, - compensate: { - state: TransactionStepState.DORMANT, - status: TransactionStepStatus.IDLE, - }, - attempts: 0, - failures: 0, - lastAttempt: null, - next: [], - } - ) + if (obj.action) { + if (actionNames.has(obj.action)) { + throw new Error(`Step ${obj.action} is already defined in workflow.`) } + + actionNames.add(obj.action) + level.push(obj.action) + const id = level.join(".") + const parent = level.slice(0, level.length - 1).join(".") + + if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) { + states[parent].next?.push(id) + } + + const definitionCopy = { ...obj } + delete definitionCopy.next + + if (definitionCopy.async) { + features.hasAsyncSteps = true + } + + if (definitionCopy.timeout) { + features.hasStepTimeouts = true + } + + if ( + definitionCopy.retryInterval || + definitionCopy.retryIntervalAwaiting + ) { + features.hasRetriesTimeout = true + } + + if (definitionCopy.nested) { + features.hasNestedTransactions = true + } + + states[id] = Object.assign( + new TransactionStep(), + existingSteps?.[id] || { + id, + uuid: definitionCopy.uuid, + depth: level.length - 1, + definition: definitionCopy, + saveResponse: definitionCopy.saveResponse ?? true, + invoke: { + state: TransactionStepState.NOT_STARTED, + status: TransactionStepStatus.IDLE, + }, + compensate: { + state: TransactionStepState.DORMANT, + status: TransactionStepStatus.IDLE, + }, + attempts: 0, + failures: 0, + lastAttempt: null, + next: [], + } + ) + } + + if (Array.isArray(obj.next)) { + for (const next of obj.next) { + queue.push({ obj: next, level: [...level] }) + } + } else if (isObject(obj.next)) { + queue.push({ obj: obj.next, level: [...level] }) } } diff --git a/packages/core/types/src/workflows-sdk/service.ts b/packages/core/types/src/workflows-sdk/service.ts index 59e719c3ba..a2a95ab6de 100644 --- a/packages/core/types/src/workflows-sdk/service.ts +++ b/packages/core/types/src/workflows-sdk/service.ts @@ -1,5 +1,5 @@ import { FindConfig } from "../common" -import { IModuleService } from "../modules-sdk" +import { ContainerLike, IModuleService } from "../modules-sdk" import { Context } from "../shared-context" import { FilterableWorkflowExecutionProps, @@ -28,6 +28,15 @@ export interface WorkflowOrchestratorRunDTO transactionId?: string } +export interface WorkflowOrchestratorCancelOptionsDTO { + transactionId: string + context?: Context + throwOnError?: boolean + logOnError?: boolean + events?: Record + container?: ContainerLike +} + export type IdempotencyKeyParts = { workflowId: string transactionId: string @@ -63,17 +72,11 @@ export interface IWorkflowEngineService extends IModuleService { workflowId: string, options?: WorkflowOrchestratorRunDTO, sharedContext?: Context - ): Promise<{ - errors: Error[] - transaction: object - result: any - acknowledgement: Acknowledgement - }> + ) getRunningTransaction( workflowId: string, transactionId: string, - options?: Record, sharedContext?: Context ): Promise @@ -121,4 +124,10 @@ export interface IWorkflowEngineService extends IModuleService { }, sharedContext?: Context ) + + cancel( + workflowId: string, + options: WorkflowOrchestratorCancelOptionsDTO, + sharedContext?: Context + ) } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts index fee018fbdc..87a9596f4a 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts @@ -4,4 +4,5 @@ export * from "./workflow_async" export * from "./workflow_conditional_step" export * from "./workflow_idempotent" export * from "./workflow_step_timeout" +export * from "./workflow_sync" export * from "./workflow_transaction_timeout" diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts new file mode 100644 index 0000000000..40fc5fb822 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts @@ -0,0 +1,64 @@ +import { + createStep, + createWorkflow, + StepResponse, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" + +const step_1 = createStep( + "step_1", + jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) + }), + jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) + }) +) + +const step_2 = createStep( + "step_2", + jest.fn((input, context) => { + if (input) { + return new StepResponse({ notAsyncResponse: input.hey }) + } + }), + jest.fn((_, context) => { + return new StepResponse({ + step: context.metadata.action, + idempotency_key: context.metadata.idempotency_key, + reverted: true, + }) + }) +) + +const step_3 = createStep( + "step_3", + jest.fn((res) => { + return new StepResponse({ + done: { + inputFromSyncStep: res.notAsyncResponse, + }, + }) + }) +) + +createWorkflow( + { + name: "workflow_sync", + idempotent: true, + }, + function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }) + + return new WorkflowResponse(step_3(ret2)) + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 241e4da9ed..cc2d482f2b 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -300,6 +300,26 @@ moduleIntegrationTestRunner({ expect(onFinish).toHaveBeenCalledTimes(0) }) + it("should cancel and revert a completed workflow", async () => { + const workflowId = "workflow_sync" + + const { acknowledgement, transaction: trx } = + await workflowOrcModule.run(workflowId, { + input: { + value: "123", + }, + }) + + expect(trx.getFlow().state).toEqual("done") + expect(acknowledgement.hasFinished).toBe(true) + + const { transaction } = await workflowOrcModule.cancel(workflowId, { + transactionId: acknowledgement.transactionId, + }) + + expect(transaction.getFlow().state).toEqual("reverted") + }) + it("should run conditional steps if condition is true", (done) => { void workflowOrcModule.subscribe({ workflowId: "workflow_conditional_step", diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 02586d8a6e..483de93037 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -6,7 +6,11 @@ import { TransactionStep, WorkflowScheduler, } from "@medusajs/framework/orchestration" -import { ContainerLike, MedusaContainer } from "@medusajs/framework/types" +import { + ContainerLike, + Context, + MedusaContainer, +} from "@medusajs/framework/types" import { isString, MedusaError, @@ -18,9 +22,9 @@ import { resolveValue, ReturnWorkflow, } from "@medusajs/framework/workflows-sdk" +import { WorkflowOrchestratorCancelOptions } from "@types" import { ulid } from "ulid" import { InMemoryDistributedTransactionStorage } from "../utils" -import { WorkflowOrchestratorCancelOptions } from "@types" export type WorkflowOrchestratorRunOptions = Omit< FlowRunOptions, @@ -319,10 +323,8 @@ export class WorkflowOrchestratorService { async getRunningTransaction( workflowId: string, transactionId: string, - options?: WorkflowOrchestratorRunOptions + context?: Context ): Promise { - let { context, container } = options ?? {} - if (!workflowId) { throw new Error("Workflow ID is required") } @@ -339,9 +341,7 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow( - (container as MedusaContainer) ?? this.container_ - ) + const flow = exportedWorkflow() const transaction = await flow.getRunningTransaction(transactionId, context) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 07d1c3aa59..b397b83d83 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -18,6 +18,7 @@ import type { import { SqlEntityManager } from "@mikro-orm/postgresql" import { WorkflowExecution } from "@models" import { WorkflowOrchestratorService } from "@services" +import { WorkflowOrchestratorCancelOptions } from "@types" type InjectedDependencies = { manager: SqlEntityManager @@ -185,4 +186,16 @@ export class WorkflowsModuleService< updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time); `) } + + @InjectSharedContext() + async cancel>( + workflowIdOrWorkflow: TWorkflow, + options: WorkflowOrchestratorCancelOptions, + @MedusaContext() context: Context = {} + ) { + return this.workflowOrchestratorService_.cancel( + workflowIdOrWorkflow, + options + ) + } } diff --git a/packages/modules/workflow-engine-inmemory/src/types/index.ts b/packages/modules/workflow-engine-inmemory/src/types/index.ts index 7a7ac40112..272f33d10a 100644 --- a/packages/modules/workflow-engine-inmemory/src/types/index.ts +++ b/packages/modules/workflow-engine-inmemory/src/types/index.ts @@ -8,7 +8,8 @@ export type InitializeModuleInjectableDependencies = { export type WorkflowOrchestratorCancelOptions = Omit< FlowCancelOptions, - "transaction" | "container" + "transaction" | "transactionId" | "container" > & { + transactionId: string container?: ContainerLike } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts index f3183ed070..3b47cf4e8d 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/index.ts @@ -1,7 +1,8 @@ export * from "./workflow_1" export * from "./workflow_2" export * from "./workflow_async" +export * from "./workflow_async_compensate" export * from "./workflow_step_timeout" +export * from "./workflow_sync" export * from "./workflow_transaction_timeout" export * from "./workflow_when" -export * from "./workflow_async_compensate" diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts new file mode 100644 index 0000000000..40fc5fb822 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts @@ -0,0 +1,64 @@ +import { + createStep, + createWorkflow, + StepResponse, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" + +const step_1 = createStep( + "step_1", + jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) + }), + jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + return new StepResponse({ + reverted: true, + }) + }) +) + +const step_2 = createStep( + "step_2", + jest.fn((input, context) => { + if (input) { + return new StepResponse({ notAsyncResponse: input.hey }) + } + }), + jest.fn((_, context) => { + return new StepResponse({ + step: context.metadata.action, + idempotency_key: context.metadata.idempotency_key, + reverted: true, + }) + }) +) + +const step_3 = createStep( + "step_3", + jest.fn((res) => { + return new StepResponse({ + done: { + inputFromSyncStep: res.notAsyncResponse, + }, + }) + }) +) + +createWorkflow( + { + name: "workflow_sync", + idempotent: true, + }, + function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }) + + return new WorkflowResponse(step_3(ret2)) + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index a3410fb009..16f01254ac 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -512,6 +512,26 @@ moduleIntegrationTestRunner({ failTrap(done) }) + + it("should cancel and revert a completed workflow", async () => { + const workflowId = "workflow_sync" + + const { acknowledgement, transaction: trx } = + await workflowOrcModule.run(workflowId, { + input: { + value: "123", + }, + }) + + expect(trx.getFlow().state).toEqual("done") + expect(acknowledgement.hasFinished).toBe(true) + + const { transaction } = await workflowOrcModule.cancel(workflowId, { + transactionId: acknowledgement.transactionId, + }) + + expect(transaction.getFlow().state).toEqual("reverted") + }) }) // Note: These tests depend on actual Redis instance and waiting for the scheduled jobs to run, which isn't great. diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index b5293cc482..eef069c029 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -35,8 +35,11 @@ export type WorkflowOrchestratorRunOptions = Omit< export type WorkflowOrchestratorCancelOptions = Omit< FlowCancelOptions, - "transaction" -> + "transaction" | "transactionId" | "container" +> & { + transactionId: string + container?: ContainerLike +} type RegisterStepSuccessOptions = Omit< WorkflowOrchestratorRunOptions, @@ -379,10 +382,8 @@ export class WorkflowOrchestratorService { async getRunningTransaction( workflowId: string, transactionId: string, - options?: { context?: Context } + context?: Context ): Promise { - let { context } = options ?? {} - if (!workflowId) { throw new Error("Workflow ID is required") } @@ -398,10 +399,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const transaction = await exportedWorkflow.getRunningTransaction( - transactionId, - context - ) + const flow = exportedWorkflow() + + const transaction = await flow.getRunningTransaction(transactionId, context) return transaction } diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 2795c4f7e5..ff1a71a605 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -17,7 +17,10 @@ import type { } from "@medusajs/framework/workflows-sdk" import { SqlEntityManager } from "@mikro-orm/postgresql" import { WorkflowExecution } from "@models" -import { WorkflowOrchestratorService } from "@services" +import { + WorkflowOrchestratorCancelOptions, + WorkflowOrchestratorService, +} from "@services" type InjectedDependencies = { manager: SqlEntityManager @@ -112,7 +115,7 @@ export class WorkflowsModuleService< return await this.workflowOrchestratorService_.getRunningTransaction( workflowId, transactionId, - { context } + context ) } @@ -194,4 +197,13 @@ export class WorkflowsModuleService< updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time); `) } + + @InjectSharedContext() + async cancel( + workflowId: string, + options: WorkflowOrchestratorCancelOptions, + @MedusaContext() context: Context = {} + ) { + return this.workflowOrchestratorService_.cancel(workflowId, options) + } }