From 7fdbf2a965a1bcb26d28820ed357be3b82768077 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Wed, 14 May 2025 15:28:16 +0200 Subject: [PATCH] fix(workflows-sdk): Miss match context usage within run as step (#12449) **What** Currently, runAsStep keep reference of the workflow context that is being run as step, except that the step is composed for the current workflow composition and not the workflow being run as a step. Therefore, the context are currently miss matched leading to wrong configuration being used in case of async workflows. **BUG** This fix allow the runAsStep to use the current composition context to configure the step for the sub workflow to be run **BUG BREAKING** fix the step config wrongly used to wrap async step handlers. Now steps configured async through .config that returns a new step response will indeed marked itself as success without the need for background execution or calling setStepSuccess (as it was expected originally) **FEATURE** This pr also add support for cancelling running transaction, the transaction will be marked as being cancelled, once the current step finished, it will cancel the transaction to start compensating all previous steps including itself Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> --- .../orchestration/src/transaction/errors.ts | 16 ++ .../transaction/transaction-orchestrator.ts | 104 ++++++-- .../src/workflow/local-workflow.ts | 10 +- .../src/utils/composer/create-step.ts | 2 +- .../src/utils/composer/create-workflow.ts | 15 +- .../__fixtures__/workflow_1.ts | 5 +- .../__fixtures__/workflow_2.ts | 5 +- .../__fixtures__/workflow_conditional_step.ts | 3 +- .../__fixtures__/workflow_idempotent.ts | 3 +- .../workflow_not_idempotent_with_retention.ts | 3 +- .../__fixtures__/workflow_sync.ts | 3 +- .../integration-tests/__tests__/index.spec.ts | 219 +++++++++++++++- .../utils/workflow-orchestrator-storage.ts | 14 ++ .../__fixtures__/workflow_1.ts | 5 +- .../__fixtures__/workflow_2.ts | 5 +- .../workflow_not_idempotent_with_retention.ts | 3 +- .../__fixtures__/workflow_sync.ts | 3 +- .../integration-tests/__tests__/index.spec.ts | 235 +++++++++++++++++- .../utils/workflow-orchestrator-storage.ts | 14 ++ 19 files changed, 603 insertions(+), 64 deletions(-) diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index fb94e45db0..627df65aa3 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -98,3 +98,19 @@ export class SkipExecutionError extends Error { this.name = "SkipExecutionError" } } + +export class SkipCancelledExecutionError extends Error { + static isSkipCancelledExecutionError( + error: Error + ): error is SkipCancelledExecutionError { + return ( + error instanceof SkipCancelledExecutionError || + error?.name === "SkipCancelledExecutionError" + ) + } + + constructor(message?: string) { + super(message) + this.name = "SkipCancelledExecutionError" + } +} diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 6d42cf2172..1a9330faf3 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -31,6 +31,7 @@ import { import { EventEmitter } from "events" import { PermanentStepFailureError, + SkipCancelledExecutionError, SkipExecutionError, SkipStepResponse, TransactionStepTimeoutError, @@ -494,6 +495,7 @@ export class TransactionOrchestrator extends EventEmitter { response: unknown ): Promise<{ stopExecution: boolean + transactionIsCancelling?: boolean }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -519,10 +521,20 @@ export class TransactionOrchestrator extends EventEmitter { } let shouldEmit = true + let transactionIsCancelling = false try { await transaction.saveCheckpoint() } catch (error) { - shouldEmit = false + if ( + !SkipCancelledExecutionError.isSkipCancelledExecutionError(error) && + !SkipExecutionError.isSkipExecutionError(error) + ) { + throw error + } + + transactionIsCancelling = + SkipCancelledExecutionError.isSkipCancelledExecutionError(error) + shouldEmit = !SkipExecutionError.isSkipExecutionError(error) } const cleaningUp: Promise[] = [] @@ -544,6 +556,7 @@ export class TransactionOrchestrator extends EventEmitter { return { stopExecution: !shouldEmit, + transactionIsCancelling, } } @@ -555,6 +568,7 @@ export class TransactionOrchestrator extends EventEmitter { step: TransactionStep }): Promise<{ stopExecution: boolean + transactionIsCancelling?: boolean }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -565,13 +579,22 @@ export class TransactionOrchestrator extends EventEmitter { } let shouldEmit = true + let transactionIsCancelling = false try { await transaction.saveCheckpoint() } catch (error) { + if ( + !SkipCancelledExecutionError.isSkipCancelledExecutionError(error) && + !SkipExecutionError.isSkipExecutionError(error) + ) { + throw error + } + + transactionIsCancelling = + SkipCancelledExecutionError.isSkipCancelledExecutionError(error) + if (SkipExecutionError.isSkipExecutionError(error)) { shouldEmit = false - } else { - throw error } } @@ -592,6 +615,7 @@ export class TransactionOrchestrator extends EventEmitter { return { stopExecution: !shouldEmit, + transactionIsCancelling, } } @@ -649,6 +673,7 @@ export class TransactionOrchestrator extends EventEmitter { timeoutError?: TransactionStepTimeoutError | TransactionTimeoutError ): Promise<{ stopExecution: boolean + transactionIsCancelling?: boolean }> { if (SkipExecutionError.isSkipExecutionError(error)) { return { @@ -734,14 +759,23 @@ export class TransactionOrchestrator extends EventEmitter { } } + let transactionIsCancelling = false let shouldEmit = true try { await transaction.saveCheckpoint() } catch (error) { + if ( + !SkipCancelledExecutionError.isSkipCancelledExecutionError(error) && + !SkipExecutionError.isSkipExecutionError(error) + ) { + throw error + } + + transactionIsCancelling = + SkipCancelledExecutionError.isSkipCancelledExecutionError(error) + if (SkipExecutionError.isSkipExecutionError(error)) { shouldEmit = false - } else { - throw error } } @@ -760,6 +794,7 @@ export class TransactionOrchestrator extends EventEmitter { return { stopExecution: !shouldEmit, + transactionIsCancelling, } } @@ -785,15 +820,30 @@ export class TransactionOrchestrator extends EventEmitter { return } - const execution: Promise[] = [] - for (const step of nextSteps.next) { + const stepsShouldContinueExecution = nextSteps.next.map((step) => { const { shouldContinueExecution } = this.prepareStepForExecution( step, flow ) - // Should stop the execution if next step cant be handled - if (!shouldContinueExecution) { + return shouldContinueExecution + }) + + await transaction.saveCheckpoint().catch((error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + continueExecution = false + return + } + + throw error + }) + + const execution: Promise[] = [] + + let i = 0 + for (const step of nextSteps.next) { + const stepIndex = i++ + if (!stepsShouldContinueExecution[stepIndex]) { continue } @@ -813,16 +863,6 @@ export class TransactionOrchestrator extends EventEmitter { // Compute current transaction state await this.computeCurrentTransactionState(transaction) - // Save checkpoint before executing step - await transaction.saveCheckpoint().catch((error) => { - if (SkipExecutionError.isSkipExecutionError(error)) { - continueExecution = false - return - } - - throw error - }) - if (!continueExecution) { break } @@ -1130,6 +1170,10 @@ export class TransactionOrchestrator extends EventEmitter { response ) + if (ret.transactionIsCancelling) { + return await this.cancelTransaction(transaction) + } + if (isAsync && !ret.stopExecution) { // Schedule to continue the execution of async steps because they are not awaited on purpose and can be handled by another machine await transaction.scheduleRetry(step, 0) @@ -1156,12 +1200,16 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await TransactionOrchestrator.setStepFailure( + const ret = await TransactionOrchestrator.setStepFailure( transaction, step, error, isPermanent ? 0 : step.definition.maxRetries ) + + if (ret.transactionIsCancelling) { + return await this.cancelTransaction(transaction) + } } /** @@ -1245,6 +1293,8 @@ export class TransactionOrchestrator extends EventEmitter { flow.state = TransactionState.WAITING_TO_COMPENSATE flow.cancelledAt = Date.now() + await transaction.saveCheckpoint() + await this.executeNext(transaction) } @@ -1667,12 +1717,17 @@ export class TransactionOrchestrator extends EventEmitter { transaction: curTransaction, }) - await TransactionOrchestrator.setStepSuccess( + const ret = await TransactionOrchestrator.setStepSuccess( curTransaction, step, response ) + if (ret.transactionIsCancelling) { + await this.cancelTransaction(curTransaction) + return curTransaction + } + await this.executeNext(curTransaction) } else { throw new MedusaError( @@ -1721,13 +1776,18 @@ export class TransactionOrchestrator extends EventEmitter { transaction: curTransaction, }) - await TransactionOrchestrator.setStepFailure( + const ret = await TransactionOrchestrator.setStepFailure( curTransaction, step, error, 0 ) + if (ret.transactionIsCancelling) { + await this.cancelTransaction(curTransaction) + return curTransaction + } + await this.executeNext(curTransaction) } else { throw new MedusaError( diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 15ab1a1abb..08dde264f7 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -402,10 +402,18 @@ export class LocalWorkflow { this.medusaContext = context const { orchestrator } = this.workflow - const transaction = isString(transactionOrTransactionId) + let transaction = isString(transactionOrTransactionId) ? await this.getRunningTransaction(transactionOrTransactionId, context) : transactionOrTransactionId + // not a distributed transaction instance + if (!transaction.getFlow) { + transaction = await this.getRunningTransaction( + (transaction as any).flow.transactionId, + context + ) + } + if (this.medusaContext) { this.medusaContext.eventGroupId = transaction.getFlow().metadata?.eventGroupId diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 9fa20f2364..9e7a582e04 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -182,7 +182,7 @@ export function applyStep< compensateFn, }) - wrapAsyncHandler(stepConfig, handler) + wrapAsyncHandler(newConfig, handler) this.handlers.set(newStepName, handler) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 74f35e3e42..097e251b9b 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -184,11 +184,16 @@ export function createWorkflow( }: { input: TData }): ReturnType> => { + // Get current workflow composition context + const workflowCompositionContext = + global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] + + const runAsAsync = workflowCompositionContext.isAsync || context.isAsync const step = createStep( { name: `${name}-as-step`, - async: context.isAsync, - nested: context.isAsync, // if async we flag this is a nested transaction + async: runAsAsync, + nested: runAsAsync, // if async we flag this is a nested transaction }, async (stepInput: TData, stepContext) => { const { container, ...sharedContext } = stepContext @@ -206,7 +211,7 @@ export function createWorkflow( } let transaction - if (workflowEngine && context.isAsync) { + if (workflowEngine && runAsAsync) { transaction = await workflowEngine.run(name, { input: stepInput as any, context: executionContext, @@ -221,7 +226,7 @@ export function createWorkflow( return new StepResponse( transaction.result, - context.isAsync ? stepContext.transactionId : transaction + runAsAsync ? stepContext.transactionId : transaction ) }, async (transaction, stepContext) => { @@ -246,7 +251,7 @@ export function createWorkflow( const transactionId = step.__step__ + "-" + stepContext.transactionId - if (workflowEngine && context.isAsync) { + if (workflowEngine && runAsAsync) { await workflowEngine.cancel(name, { transactionId: transactionId, context: executionContext, diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts index 8cdcc73123..d9caec12b1 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_1.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -24,7 +25,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), @@ -53,7 +54,7 @@ createWorkflow("workflow_1", function (input) { const ret2 = step_2({ hey: "oh" }) - step_2({ hey: "async hello" }).config({ + step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts index b85a54e9be..24e96d07de 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_2.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -22,7 +23,7 @@ const step_1 = createStep( ) export const workflow2Step2Invoke = jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }) @@ -57,7 +58,7 @@ createWorkflow( step_2({ hey: "oh" }) - const ret2_async = step_2({ hey: "async hello" }).config({ + const ret2_async = step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts index 813b1b4738..899b08189d 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -14,7 +15,7 @@ const step_1 = createStep( ) export const conditionalStep2Invoke = jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts index 074db595a7..df5b3e72f5 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts index dd179f4b34..647e50115b 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -23,7 +24,7 @@ const step_1 = createStep( export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn( (input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts index 40fc5fb822..8c87afa4e1 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_sync.ts @@ -4,6 +4,7 @@ import { StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" +import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index b97a5698a6..6632842364 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -23,9 +23,9 @@ import { import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { WorkflowsModuleService } from "@services" import { asFunction } from "awilix" -import { ulid } from "ulid" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout as setTimeoutPromise } from "timers/promises" +import { ulid } from "ulid" import "../__fixtures__" import { conditionalStep2Invoke, @@ -110,6 +110,205 @@ moduleIntegrationTestRunner({ }) }) + describe("Cancel transaction", function () { + it("should cancel an ongoing execution with async unfinished yet step", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeoutPromise(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + step2().config({ async: true }) + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel a complete execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(500) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeoutPromise(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with sync steps only", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeoutPromise(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow({ name: workflowId, retentionTime: 60 }, function () { + step1() + step2() + step3() + + return new WorkflowResponse("finished") + }) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeoutPromise(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeoutPromise(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { const transactionId = "transaction_id" const workflowId = "workflow_id" + ulid() @@ -130,10 +329,12 @@ moduleIntegrationTestRunner({ ) const [result1, result2] = await promiseAll([ - workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - }), + workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .catch((e) => e), workflowOrcModule .run(workflowId, { input: {}, @@ -142,8 +343,8 @@ moduleIntegrationTestRunner({ .catch((e) => e), ]) - expect(result1.result).toEqual("step1") - expect(result2.message).toEqual( + expect(result1.result || result2.result).toEqual("step1") + expect(result2.message || result1.message).toEqual( "Transaction already started for transactionId: " + transactionId ) }) @@ -362,9 +563,7 @@ moduleIntegrationTestRunner({ expect(workflow2Step2Invoke).toHaveBeenCalledTimes(2) expect(workflow2Step2Invoke.mock.calls[0][0]).toEqual({ hey: "oh" }) - expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({ - hey: "async hello", - }) + expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({}) expect(workflow2Step3Invoke).toHaveBeenCalledTimes(1) expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({ diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 59f0e493fd..14553aeb38 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -3,6 +3,7 @@ import { IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, + SkipCancelledExecutionError, SkipExecutionError, TransactionCheckpoint, TransactionContext, @@ -290,6 +291,19 @@ export class InMemoryDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution + const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt + const currentTransactionCancelledAt = currentFlow.cancelledAt + + if ( + !!latestTransactionCancelledAt && + currentTransactionCancelledAt == null + ) { + throw new SkipCancelledExecutionError( + "Workflow execution has been cancelled during the execution" + ) + } + const currentFlowSteps = Object.values(currentFlow.steps || {}) const latestUpdatedFlowSteps = latestUpdatedFlow.steps ? Object.values( diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts index 19738012b5..3244c3f814 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), @@ -54,7 +55,7 @@ createWorkflow("workflow_1", function (input) { const ret2 = step_2({ hey: "oh" }) - step_2({ hey: "async hello" }).config({ + step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts index 653567f999..cdb8c16738 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts @@ -1,3 +1,4 @@ +import { isPresent } from "@medusajs/framework/utils" import { createStep, createWorkflow, @@ -26,7 +27,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), @@ -68,7 +69,7 @@ createWorkflow( const ret2 = step_2({ hey: "oh" }) - step_2({ hey: "async hello" }).config({ + step_2().config({ name: "new_step_name", async: true, }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts index dd179f4b34..c85959cee7 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_not_idempotent_with_retention.ts @@ -3,6 +3,7 @@ import { createWorkflow, StepResponse, } from "@medusajs/framework/workflows-sdk" +import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -23,7 +24,7 @@ const step_1 = createStep( export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn( (input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts index 40fc5fb822..8c87afa4e1 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_sync.ts @@ -4,6 +4,7 @@ import { StepResponse, WorkflowResponse, } from "@medusajs/framework/workflows-sdk" +import { isPresent } from "@medusajs/framework/utils" const step_1 = createStep( "step_1", @@ -25,7 +26,7 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - if (input) { + if (isPresent(input)) { return new StepResponse({ notAsyncResponse: input.hey }) } }), diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 26cadc61fa..f6a6c6f612 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -30,15 +30,15 @@ import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { asValue } from "awilix" import { setTimeout as setTimeoutSync } from "timers" import { setTimeout } from "timers/promises" +import { ulid } from "ulid" import { WorkflowsModuleService } from "../../src/services" import "../__fixtures__" -import { createScheduled } from "../__fixtures__/workflow_scheduled" -import { TestDatabase } from "../utils" import { workflowNotIdempotentWithRetentionStep2Invoke, workflowNotIdempotentWithRetentionStep3Invoke, } from "../__fixtures__" -import { ulid } from "ulid" +import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { TestDatabase } from "../utils" jest.setTimeout(300000) @@ -150,9 +150,220 @@ moduleIntegrationTestRunner({ }) describe("Testing basic workflow", function () { + describe("Cancel transaction", function () { + it("should cancel an ongoing execution with async unfinished yet step", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeout(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + step2().config({ async: true }) + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel a complete execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(500) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with a sync workflow running as async", async () => { + const workflowId = "workflow-to-cancel-id" + ulid() + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeout(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const subWorkflowId = "sub-workflow-id" + ulid() + const subWorkflow = createWorkflow( + { name: subWorkflowId, retentionTime: 60 }, + function () { + return new WorkflowResponse(step2()) + } + ) + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + subWorkflow.runAsStep({ input: {} }).config({ async: true }) + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + + it("should cancel an ongoing execution with sync steps only", async () => { + const transactionId = "transaction-to-cancel-id" + const step1 = createStep("step1", async () => { + return new StepResponse("step1") + }) + + const step2 = createStep("step2", async () => { + await setTimeout(500) + return new StepResponse("step2") + }) + + const step3 = createStep("step3", async () => { + return new StepResponse("step3") + }) + + const workflowId = "workflow-to-cancel-id" + ulid() + + createWorkflow( + { name: workflowId, retentionTime: 60 }, + function () { + step1() + step2() + step3() + + return new WorkflowResponse("finished") + } + ) + + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + }) + + await setTimeout(100) + + await workflowOrcModule.cancel(workflowId, { + transactionId, + }) + + await setTimeout(1000) + + const execution = await workflowOrcModule.listWorkflowExecutions({ + transaction_id: transactionId, + }) + + expect(execution.length).toEqual(1) + expect(execution[0].state).toEqual(TransactionState.REVERTED) + }) + }) + it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { - const transactionId = "transaction_id" - const workflowId = "workflow_id" + ulid() + const transactionId = "concurrency_transaction_id" + const workflowId = "concurrency_workflow_id" + ulid() const step1 = createStep("step1", async () => { await setTimeout(100) @@ -170,10 +381,12 @@ moduleIntegrationTestRunner({ ) const [result1, result2] = await promiseAll([ - workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - }), + workflowOrcModule + .run(workflowId, { + input: {}, + transactionId, + }) + .catch((e) => e), workflowOrcModule .run(workflowId, { input: {}, @@ -182,8 +395,8 @@ moduleIntegrationTestRunner({ .catch((e) => e), ]) - expect(result1.result).toEqual("step1") - expect(result2.message).toEqual( + expect(result1.result || result2.result).toEqual("step1") + expect(result2.message || result1.message).toEqual( "Transaction already started for transactionId: " + transactionId ) }) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index acd3c438a8..978a8835b7 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -4,6 +4,7 @@ import { IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, + SkipCancelledExecutionError, SkipExecutionError, TransactionCheckpoint, TransactionContext, @@ -632,6 +633,19 @@ export class RedisDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } + // First ensure that the latest execution was not cancelled, otherwise we skip the execution + const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt + const currentTransactionCancelledAt = currentFlow.cancelledAt + + if ( + !!latestTransactionCancelledAt && + currentTransactionCancelledAt == null + ) { + throw new SkipCancelledExecutionError( + "Workflow execution has been cancelled during the execution" + ) + } + const currentFlowSteps = Object.values(currentFlow.steps || {}) const latestUpdatedFlowSteps = latestUpdatedFlow.steps ? Object.values(