diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index fb94e45db0..627df65aa3 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -98,3 +98,19 @@ export class SkipExecutionError extends Error { this.name = "SkipExecutionError" } } + +export class SkipCancelledExecutionError extends Error { + static isSkipCancelledExecutionError( + error: Error + ): error is SkipCancelledExecutionError { + return ( + error instanceof SkipCancelledExecutionError || + error?.name === "SkipCancelledExecutionError" + ) + } + + constructor(message?: string) { + super(message) + this.name = "SkipCancelledExecutionError" + } +} diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 6d42cf2172..1a9330faf3 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -31,6 +31,7 @@ import { import { EventEmitter } from "events" import { PermanentStepFailureError, + SkipCancelledExecutionError, SkipExecutionError, SkipStepResponse, TransactionStepTimeoutError, @@ -494,6 +495,7 @@ export class TransactionOrchestrator extends EventEmitter { response: unknown ): Promise<{ stopExecution: boolean + transactionIsCancelling?: boolean }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -519,10 +521,20 @@ export class TransactionOrchestrator extends EventEmitter { } let shouldEmit = true + let transactionIsCancelling = false try { await transaction.saveCheckpoint() } catch (error) { - shouldEmit = false + if ( + !SkipCancelledExecutionError.isSkipCancelledExecutionError(error) && + !SkipExecutionError.isSkipExecutionError(error) + ) { + throw error + } + + transactionIsCancelling = + SkipCancelledExecutionError.isSkipCancelledExecutionError(error) + shouldEmit = !SkipExecutionError.isSkipExecutionError(error) } const cleaningUp: Promise[] = [] @@ -544,6 +556,7 @@ export class TransactionOrchestrator extends EventEmitter { return { stopExecution: !shouldEmit, + transactionIsCancelling, } } @@ -555,6 +568,7 @@ export class TransactionOrchestrator extends EventEmitter { step: TransactionStep }): Promise<{ stopExecution: boolean + transactionIsCancelling?: boolean }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -565,13 +579,22 @@ export class TransactionOrchestrator extends EventEmitter { } let shouldEmit = true + let transactionIsCancelling = false try { await transaction.saveCheckpoint() } catch (error) { + if ( + !SkipCancelledExecutionError.isSkipCancelledExecutionError(error) && + !SkipExecutionError.isSkipExecutionError(error) + ) { + throw error + } + + transactionIsCancelling = + SkipCancelledExecutionError.isSkipCancelledExecutionError(error) + if (SkipExecutionError.isSkipExecutionError(error)) { shouldEmit = false - } else { - throw error } } @@ -592,6 +615,7 @@ export class TransactionOrchestrator extends EventEmitter { return { stopExecution: !shouldEmit, + transactionIsCancelling, } } @@ -649,6 +673,7 @@ export class TransactionOrchestrator extends EventEmitter { timeoutError?: TransactionStepTimeoutError | TransactionTimeoutError ): Promise<{ stopExecution: boolean + transactionIsCancelling?: boolean }> { if (SkipExecutionError.isSkipExecutionError(error)) { return { @@ -734,14 +759,23 @@ export class TransactionOrchestrator extends EventEmitter { } } + let transactionIsCancelling = false let shouldEmit = true try { await transaction.saveCheckpoint() } catch (error) { + if ( + !SkipCancelledExecutionError.isSkipCancelledExecutionError(error) && + !SkipExecutionError.isSkipExecutionError(error) + ) { + throw error + } + + transactionIsCancelling = + SkipCancelledExecutionError.isSkipCancelledExecutionError(error) + if (SkipExecutionError.isSkipExecutionError(error)) { shouldEmit = false - } else { - throw error } } @@ -760,6 +794,7 @@ export class TransactionOrchestrator extends EventEmitter { return { stopExecution: !shouldEmit, + transactionIsCancelling, } } @@ -785,15 +820,30 @@ export class TransactionOrchestrator extends EventEmitter { return } - const execution: Promise[] = [] - for (const step of nextSteps.next) { + const stepsShouldContinueExecution = nextSteps.next.map((step) => { const { shouldContinueExecution } = this.prepareStepForExecution( step, flow ) - // Should stop the execution if next step cant be handled - if (!shouldContinueExecution) { + return shouldContinueExecution + }) + + await transaction.saveCheckpoint().catch((error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + continueExecution = false + return + } + + throw error + }) + + const execution: Promise[] = [] + + let i = 0 + for (const step of nextSteps.next) { + const stepIndex = i++ + if (!stepsShouldContinueExecution[stepIndex]) { continue } @@ -813,16 +863,6 @@ export class TransactionOrchestrator extends EventEmitter { // Compute current transaction state await this.computeCurrentTransactionState(transaction) - // Save checkpoint before executing step - await transaction.saveCheckpoint().catch((error) => { - if (SkipExecutionError.isSkipExecutionError(error)) { - continueExecution = false - return - } - - throw error - }) - if (!continueExecution) { break } @@ -1130,6 +1170,10 @@ export class TransactionOrchestrator extends EventEmitter { response ) + if (ret.transactionIsCancelling) { + return await this.cancelTransaction(transaction) + } + if (isAsync && !ret.stopExecution) { // Schedule to continue the execution of async steps because they are not awaited on purpose and can be handled by another machine await transaction.scheduleRetry(step, 0) @@ -1156,12 +1200,16 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await TransactionOrchestrator.setStepFailure( + const ret = await TransactionOrchestrator.setStepFailure( transaction, step, error, isPermanent ? 0 : step.definition.maxRetries ) + + if (ret.transactionIsCancelling) { + return await this.cancelTransaction(transaction) + } } /** @@ -1245,6 +1293,8 @@ export class TransactionOrchestrator extends EventEmitter { flow.state = TransactionState.WAITING_TO_COMPENSATE flow.cancelledAt = Date.now() + await transaction.saveCheckpoint() + await this.executeNext(transaction) } @@ -1667,12 +1717,17 @@ export class TransactionOrchestrator extends EventEmitter { transaction: curTransaction, }) - await TransactionOrchestrator.setStepSuccess( + const ret = await TransactionOrchestrator.setStepSuccess( curTransaction, step, response ) + if (ret.transactionIsCancelling) { + await this.cancelTransaction(curTransaction) + return curTransaction + } + await this.executeNext(curTransaction) } else { throw new MedusaError( @@ -1721,13 +1776,18 @@ export class TransactionOrchestrator extends EventEmitter { transaction: curTransaction, }) - await TransactionOrchestrator.setStepFailure( + const ret = await TransactionOrchestrator.setStepFailure( curTransaction, step, error, 0 ) + if (ret.transactionIsCancelling) { + await this.cancelTransaction(curTransaction) + return curTransaction + } + await this.executeNext(curTransaction) } else { throw new MedusaError( diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 15ab1a1abb..08dde264f7 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -402,10 +402,18 @@ export class LocalWorkflow { this.medusaContext = context const { orchestrator } = this.workflow - const transaction = isString(transactionOrTransactionId) + let transaction = isString(transactionOrTransactionId) ? await this.getRunningTransaction(transactionOrTransactionId, context) : transactionOrTransactionId + // not a distributed transaction instance + if (!transaction.getFlow) { + transaction = await this.getRunningTransaction( + (transaction as any).flow.transactionId, + context + ) + } + if (this.medusaContext) { this.medusaContext.eventGroupId = transaction.getFlow().metadata?.eventGroupId diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 9fa20f2364..9e7a582e04 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -182,7 +182,7 @@ export function applyStep< compensateFn, }) - wrapAsyncHandler(stepConfig, handler) + wrapAsyncHandler(newConfig, handler) this.handlers.set(newStepName, handler) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 74f35e3e42..097e251b9b 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -184,11 +184,16 @@ export function createWorkflow( }: { input: TData }): ReturnType> => { + // Get current workflow composition context + const workflowCompositionContext = + global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] + + const runAsAsync = workflowCompositionContext.isAsync || context.isAsync const step = createStep( { name: `${name}-as-step`, - async: context.isAsync, - nested: context.isAsync, // if async we flag this is a nested transaction + async: runAsAsync, + nested: runAsAsync, // if async we flag this is a nested transaction }, async (stepInput: TData, stepContext) => { const { container, ...sharedContext } = stepContext @@ -206,7 +211,7 @@ export function createWorkflow( } let transaction - if (workflowEngine && context.isAsync) { + if (workflowEngine && runAsAsync) { transaction = await workflowEngine.run(name, { input: stepInput as any, context: executionContext, @@ -221,7 +226,7 @@ export function createWorkflow( return new StepResponse( transaction.result, - context.isAsync ? stepContext.transactionId : transaction + runAsAsync ? stepContext.transactionId : transaction ) }, async (transaction, stepContext) => { @@ -246,7 +251,7 @@ export function createWorkflow( const transactionId = step.__step__ + "-" + stepContext.transactionId - if (workflowEngine && context.isAsync) { + if (workflowEngine && runAsAsync) { await workflowEngine.cancel(name, { transactionId: transactionId, context: executionContext, diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts index 8cdcc73123..d9caec12b1 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -24,7 +25,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), @@ -53,7 +54,7 @@ createWorkflow("workflow_1", function (input) { const ret2 = step_2({ hey: "oh" }) - step_2({ hey: "async hello" }).config({ + step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts index b85a54e9be..24e96d07de 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -22,7 +23,7 @@ const step_1 = createStep( ) export const workflow2Step2Invoke = jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }) @@ -57,7 +58,7 @@ createWorkflow( step_2({ hey: "oh" }) - const ret2_async = step_2({ hey: "async hello" }).config({ + const ret2_async = step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts index 813b1b4738..899b08189d 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -14,7 +15,7 @@ const step_1 = createStep( ) export const conditionalStep2Invoke = jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts index 074db595a7..df5b3e72f5 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts index dd179f4b34..647e50115b 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -23,7 +24,7 @@ const step_1 = createStep( export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn( (input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts index 40fc5fb822..8c87afa4e1 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts @@ -4,6 +4,7 @@ import { StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" +import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index b97a5698a6..6632842364 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -23,9 +23,9 @@ import { import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { WorkflowsModuleService } from "@services" import { asFunction } from "awilix" -import { ulid } from "ulid" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout as setTimeoutPromise } from "timers/promises" +import { ulid } from "ulid" import "../__fixtures__" import { conditionalStep2Invoke, @@ -110,6 +110,205 @@ moduleIntegrationTestRunner({ }) }) + describe("Cancel transaction", function () { + it("should cancel an ongoing execution with async unfinished yet step", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeoutPromise(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + step2().config({ async: true }) + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel a complete execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(500) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeoutPromise(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with sync steps only", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeoutPromise(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + step2() + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { const transactionId = "transaction_id" const workflowId = "workflow_id" + ulid() @@ -130,10 +329,12 @@ moduleIntegrationTestRunner({ ) const [result1, result2] = await promiseAll([ - workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - }), + workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .catch((e) => e), workflowOrcModule .run(workflowId, { input: {}, @@ -142,8 +343,8 @@ moduleIntegrationTestRunner({ .catch((e) => e), ]) - expect(result1.result).toEqual("step1") - expect(result2.message).toEqual( + expect(result1.result || result2.result).toEqual("step1") + expect(result2.message || result1.message).toEqual( "Transaction already started for transactionId: " + transactionId ) }) @@ -362,9 +563,7 @@ moduleIntegrationTestRunner({ expect(workflow2Step2Invoke).toHaveBeenCalledTimes(2) expect(workflow2Step2Invoke.mock.calls[0][0]).toEqual({ hey: "oh" }) - expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({ - hey: "async hello", - }) + expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({}) expect(workflow2Step3Invoke).toHaveBeenCalledTimes(1) expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({ diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 59f0e493fd..14553aeb38 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -3,6 +3,7 @@ import { IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, + SkipCancelledExecutionError, SkipExecutionError, TransactionCheckpoint, TransactionContext, @@ -290,6 +291,19 @@ export class InMemoryDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution + const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt + const currentTransactionCancelledAt = currentFlow.cancelledAt + + if ( + !!latestTransactionCancelledAt && + currentTransactionCancelledAt == null + ) { + throw new SkipCancelledExecutionError( + "Workflow execution has been cancelled during the execution" + ) + } + const currentFlowSteps = Object.values(currentFlow.steps || {}) const latestUpdatedFlowSteps = latestUpdatedFlow.steps ? Object.values( diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts index 19738012b5..3244c3f814 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), @@ -54,7 +55,7 @@ createWorkflow("workflow_1", function (input) { const ret2 = step_2({ hey: "oh" }) - step_2({ hey: "async hello" }).config({ + step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts index 653567f999..cdb8c16738 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -26,7 +27,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), @@ -68,7 +69,7 @@ createWorkflow( const ret2 = step_2({ hey: "oh" }) - step_2({ hey: "async hello" }).config({ + step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts index dd179f4b34..c85959cee7 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts @@ -3,6 +3,7 @@ import { createWorkflow, StepResponse, } from "@medusajs/framework/workflows-sdk" +import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -23,7 +24,7 @@ const step_1 = createStep( export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn( (input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts index 40fc5fb822..8c87afa4e1 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts @@ -4,6 +4,7 @@ import { StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" +import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 26cadc61fa..f6a6c6f612 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -30,15 +30,15 @@ import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { asValue } from "awilix" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout } from "timers/promises" +import { ulid } from "ulid" import { WorkflowsModuleService } from "../../src/services" import "../__fixtures__" -import { createScheduled } from "../__fixtures__/workflow_scheduled" -import { TestDatabase } from "../utils" import { workflowNotIdempotentWithRetentionStep2Invoke, workflowNotIdempotentWithRetentionStep3Invoke, } from "../__fixtures__" -import { ulid } from "ulid" +import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { TestDatabase } from "../utils" jest.setTimeout(300000) @@ -150,9 +150,220 @@ moduleIntegrationTestRunner({ }) describe("Testing basic workflow", function () { + describe("Cancel transaction", function () { + it("should cancel an ongoing execution with async unfinished yet step", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeout(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + step2().config({ async: true }) + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel a complete execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(500) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeout(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with sync steps only", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeout(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + step2() + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { - const transactionId = "transaction_id" - const workflowId = "workflow_id" + ulid() + const transactionId = "concurrency_transaction_id" + const workflowId = "concurrency_workflow_id" + ulid() const step1 = createStep("step1", async () => { await setTimeout(100) @@ -170,10 +381,12 @@ moduleIntegrationTestRunner({ ) const [result1, result2] = await promiseAll([ - workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - }), + workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .catch((e) => e), workflowOrcModule .run(workflowId, { input: {}, @@ -182,8 +395,8 @@ moduleIntegrationTestRunner({ .catch((e) => e), ]) - expect(result1.result).toEqual("step1") - expect(result2.message).toEqual( + expect(result1.result || result2.result).toEqual("step1") + expect(result2.message || result1.message).toEqual( "Transaction already started for transactionId: " + transactionId ) }) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index acd3c438a8..978a8835b7 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -4,6 +4,7 @@ import { IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, + SkipCancelledExecutionError, SkipExecutionError, TransactionCheckpoint, TransactionContext, @@ -632,6 +633,19 @@ export class RedisDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution + const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt + const currentTransactionCancelledAt = currentFlow.cancelledAt + + if ( + !!latestTransactionCancelledAt && + currentTransactionCancelledAt == null + ) { + throw new SkipCancelledExecutionError( + "Workflow execution has been cancelled during the execution" + ) + } + const currentFlowSteps = Object.values(currentFlow.steps || {}) const latestUpdatedFlowSteps = latestUpdatedFlow.steps ? Object.values(