From 7e93eda1a44310311d2f3f8a1d634f60e7c48cb5 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Fri, 22 Mar 2024 11:03:06 -0300 Subject: [PATCH] feat(orchestration,workflows-sdk,core-flows): workflow cancel (#6778) --- .changeset/nine-parrots-deliver.md | 7 ++ .../cart/store/cart.workflows.spec.ts | 107 ++++++++++++++++++ .../definition/cart/steps/update-tax-lines.ts | 18 ++- .../transaction/transaction-orchestrator.ts | 2 +- .../src/workflow/local-workflow.ts | 10 +- .../src/helper/__tests__/compose.ts | 36 ++++++ .../helper/__tests__/workflow-export.spec.ts | 13 ++- .../src/helper/workflow-export.ts | 94 +++++++++++++-- .../src/utils/composer/create-step.ts | 6 +- 9 files changed, 271 insertions(+), 22 deletions(-) create mode 100644 .changeset/nine-parrots-deliver.md diff --git a/.changeset/nine-parrots-deliver.md b/.changeset/nine-parrots-deliver.md new file mode 100644 index 0000000000..5217807cf5 --- /dev/null +++ b/.changeset/nine-parrots-deliver.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +"@medusajs/core-flows": patch +--- + +Feat: workflow cancel diff --git a/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts b/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts index 98228cfeec..483c42136e 100644 --- a/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts +++ b/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts @@ -204,6 +204,113 @@ medusaIntegrationTestRunner({ ) }) + it("should revert if the cart creation fails", async () => { + const region = await regionModuleService.create({ + name: "US", + currency_code: "usd", + }) + + const salesChannel = await scModuleService.create({ + name: "Webshop", + }) + + const location = await stockLocationModule.create({ + name: "Warehouse", + }) + + const [product] = await productModule.create([ + { + title: "Test product", + variants: [ + { + title: "Test variant", + }, + ], + }, + ]) + + const inventoryItem = await inventoryModule.create({ + sku: "inv-1234", + }) + + await inventoryModule.createInventoryLevels([ + { + inventory_item_id: inventoryItem.id, + location_id: location.id, + stocked_quantity: 2, + reserved_quantity: 0, + }, + ]) + + const priceSet = await pricingModule.create({ + prices: [ + { + amount: 3000, + currency_code: "usd", + }, + ], + }) + + await remoteLink.create([ + { + [Modules.PRODUCT]: { + variant_id: product.variants[0].id, + }, + [Modules.PRICING]: { + price_set_id: priceSet.id, + }, + }, + { + [Modules.SALES_CHANNEL]: { + sales_channel_id: salesChannel.id, + }, + [Modules.STOCK_LOCATION]: { + stock_location_id: location.id, + }, + }, + { + [Modules.PRODUCT]: { + variant_id: product.variants[0].id, + }, + [Modules.INVENTORY]: { + inventory_item_id: inventoryItem.id, + }, + }, + ]) + + const workflow = createCartWorkflow(appContainer) + + workflow.addAction( + "throw", + { + invoke: async function failStep() { + throw new Error(`Failed to create cart`) + }, + }, + { + noCompensation: true, + } + ) + + const { transaction } = await workflow.run({ + throwOnError: false, + input: { + email: "tony@stark.com", + currency_code: "usd", + region_id: region.id, + sales_channel_id: salesChannel.id, + items: [ + { + variant_id: product.variants[0].id, + quantity: 1, + }, + ], + }, + }) + + expect(transaction.flow.state).toEqual("reverted") + }) + it("should throw when no regions exist", async () => { await regionModuleService.delete(defaultRegion.id) diff --git a/packages/core-flows/src/definition/cart/steps/update-tax-lines.ts b/packages/core-flows/src/definition/cart/steps/update-tax-lines.ts index 399251f8c1..444d96cffb 100644 --- a/packages/core-flows/src/definition/cart/steps/update-tax-lines.ts +++ b/packages/core-flows/src/definition/cart/steps/update-tax-lines.ts @@ -16,10 +16,20 @@ interface StepInput { export const updateTaxLinesStepId = "update-tax-lines-step" export const updateTaxLinesStep = createStep( updateTaxLinesStepId, - async (input: StepInput, { container }) => { - // TODO: manually trigger rollback on workflow when step fails - await updateTaxLinesWorkflow(container).run({ input }) + async (input: StepInput, { container, idempotencyKey }) => { + const { transaction } = await updateTaxLinesWorkflow(container).run({ + input, + }) - return new StepResponse(null) + return new StepResponse(null, { transaction }) + }, + async (flow, { container }) => { + if (!flow) { + return + } + + await updateTaxLinesWorkflow(container).cancel({ + transaction: flow.transaction, + }) } ) diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 7a7fa54c2a..e248b6e68e 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -781,7 +781,7 @@ export class TransactionOrchestrator extends EventEmitter { if (flow.state === TransactionState.FAILED) { throw new MedusaError( MedusaError.Types.NOT_ALLOWED, - `Cannot revert a perment failed transaction.` + `Cannot revert a permanent failed transaction.` ) } diff --git a/packages/orchestration/src/workflow/local-workflow.ts b/packages/orchestration/src/workflow/local-workflow.ts index d82879985e..a9a58691eb 100644 --- a/packages/orchestration/src/workflow/local-workflow.ts +++ b/packages/orchestration/src/workflow/local-workflow.ts @@ -5,6 +5,7 @@ import { MedusaModuleType, createMedusaContainer, isDefined, + isString, } from "@medusajs/utils" import { asValue } from "awilix" import { @@ -348,17 +349,16 @@ export class LocalWorkflow { } async cancel( - uniqueTransactionId: string, + transactionOrTransactionId: string | DistributedTransaction, context?: Context, subscribe?: DistributedTransactionEvents ) { this.medusaContext = context const { orchestrator } = this.workflow - const transaction = await this.getRunningTransaction( - uniqueTransactionId, - context - ) + const transaction = isString(transactionOrTransactionId) + ? await this.getRunningTransaction(transactionOrTransactionId, context) + : transactionOrTransactionId const { cleanUpEventListeners } = this.registerEventCallbacks({ orchestrator, diff --git a/packages/workflows-sdk/src/helper/__tests__/compose.ts b/packages/workflows-sdk/src/helper/__tests__/compose.ts index b0c594596b..cddc739d40 100644 --- a/packages/workflows-sdk/src/helper/__tests__/compose.ts +++ b/packages/workflows-sdk/src/helper/__tests__/compose.ts @@ -2086,4 +2086,40 @@ describe("Workflow composer", function () { }, ]) }) + + it("should cancel the workflow after completed", async () => { + const mockStep1Fn = jest.fn().mockImplementation(function (input) { + return new StepResponse({ obj: "return from 1" }, { data: "data" }) + }) + + const mockCompensateSte1 = jest.fn().mockImplementation(function (input) { + return input + }) + + const step1 = createStep("step1", mockStep1Fn, mockCompensateSte1) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { transaction } = await workflow().run({ + input: workflowInput, + throwOnError: false, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockCompensateSte1).toHaveBeenCalledTimes(0) + + await workflow().cancel({ + transaction, + throwOnError: false, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockCompensateSte1).toHaveBeenCalledTimes(1) + + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockCompensateSte1.mock.calls[0][0]).toEqual({ data: "data" }) + }) }) diff --git a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts index 23161e1afa..2cc329e125 100644 --- a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts +++ b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts @@ -1,5 +1,5 @@ -import { exportWorkflow } from "../workflow-export" import { createMedusaContainer } from "@medusajs/utils" +import { exportWorkflow } from "../workflow-export" jest.mock("@medusajs/orchestration", () => { return { @@ -46,6 +46,17 @@ jest.mock("@medusajs/orchestration", () => { }), } }), + cancel: jest.fn(() => { + return { + getErrors: jest.fn(), + getState: jest.fn(() => "reverted"), + getContext: jest.fn(() => { + return { + invoke: { result_step: "invoke_test" }, + } + }), + } + }), } }), } diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index 5a104a2827..cfd1f54679 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -41,6 +41,14 @@ export type FlowRegisterStepFailureOptions = { events?: DistributedTransactionEvents } +export type FlowCancelOptions = { + transaction?: DistributedTransaction + transactionId?: string + context?: Context + throwOnError?: boolean + events?: DistributedTransactionEvents +} + export type WorkflowResult = { errors: TransactionStepError[] transaction: DistributedTransaction @@ -80,6 +88,7 @@ export type ExportedWorkflow< TResultOverride extends undefined ? TResult : TResultOverride > > + cancel: (args?: FlowCancelOptions) => Promise } export type MainExportedWorkflow = { @@ -88,12 +97,12 @@ export type MainExportedWorkflow = { container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" > & ExportedWorkflow /** - * You can also directly call run, registerStepSuccess and registerStepFailure on the exported workflow + * You can also directly call run, registerStepSuccess, registerStepFailure and cancel on the exported workflow */ run( @@ -131,6 +140,12 @@ export type MainExportedWorkflow = { TResultOverride extends undefined ? TResult : TResultOverride > > + + cancel( + args?: FlowCancelOptions & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise } function createContextualWorkflowRunner< @@ -152,7 +167,10 @@ function createContextualWorkflowRunner< wrappedInput?: boolean } container?: LoadedModule[] | MedusaContainer -}): Omit & +}): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" +> & ExportedWorkflow { if (!container) { container = MedusaModule.getLoadedModules().map( @@ -165,10 +183,11 @@ function createContextualWorkflowRunner< const originalRun = flow.run.bind(flow) const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) + const originalCancel = flow.cancel.bind(flow) const originalExecution = async ( method, - { throwOnError, resultFrom }, + { throwOnError, resultFrom, isCancel = false }, ...args ) => { const transaction = await method.apply(method, args) @@ -176,7 +195,14 @@ function createContextualWorkflowRunner< const errors = transaction.getErrors(TransactionHandlerType.INVOKE) const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] - if (failedStatus.includes(transaction.getState()) && throwOnError) { + const isCancelled = + isCancel && transaction.getState() === TransactionState.REVERTED + + if ( + !isCancelled && + failedStatus.includes(transaction.getState()) && + throwOnError + ) { const errorMessage = errors ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) ?.join(`${EOL}`) @@ -316,6 +342,39 @@ function createContextualWorkflowRunner< } flow.registerStepFailure = newRegisterStepFailure as any + const newCancel = async ( + { + transaction, + transactionId, + context: outerContext, + throwOnError, + events, + }: FlowCancelOptions = { + throwOnError: true, + } + ) => { + throwOnError ??= true + + const context = { + ...outerContext, + transactionId, + __type: MedusaContextType, + } + + return await originalExecution( + originalCancel, + { + throwOnError, + resultFrom: undefined, + isCancel: true, + }, + transaction ?? transactionId, + context, + events + ) + } + flow.cancel = newCancel as any + return flow as unknown as LocalWorkflow & ExportedWorkflow } @@ -335,7 +394,7 @@ export const exportWorkflow = ( container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" > & ExportedWorkflow { return createContextualWorkflowRunner< @@ -353,11 +412,15 @@ export const exportWorkflow = ( } const buildRunnerFn = < - TAction extends "run" | "registerStepSuccess" | "registerStepFailure", + TAction extends + | "run" + | "registerStepSuccess" + | "registerStepFailure" + | "cancel", TDataOverride, TResultOverride >( - action: "run" | "registerStepSuccess" | "registerStepFailure", + action: "run" | "registerStepSuccess" | "registerStepFailure" | "cancel", container?: LoadedModule[] | MedusaContainer ) => { const contextualRunner = createContextualWorkflowRunner< @@ -467,6 +530,21 @@ export const exportWorkflow = ( )(inputArgs) } + exportedWorkflow.cancel = async ( + args?: FlowCancelOptions & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowCancelOptions + + return await buildRunnerFn<"cancel", unknown, unknown>( + "cancel", + container + )(inputArgs) + } + MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow) return exportedWorkflow } diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index 7b3f0bde01..83c078f8c3 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -2,7 +2,7 @@ import { TransactionStepsDefinition, WorkflowManager, } from "@medusajs/orchestration" -import { OrchestrationUtils, isString } from "@medusajs/utils" +import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils" import { ulid } from "ulid" import { StepResponse, resolveValue } from "./helpers" import { proxify } from "./helpers/proxy" @@ -168,8 +168,8 @@ function applyStep< stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse ? stepOutput.compensateInput && - JSON.parse(JSON.stringify(stepOutput.compensateInput)) - : stepOutput && JSON.parse(JSON.stringify(stepOutput)) + deepCopy(stepOutput.compensateInput) + : stepOutput && deepCopy(stepOutput) const args = [invokeResult, executionContext] const output = await compensateFn.apply(this, args)