From 9badad24aa5da43121613c7784bb7049856e2ac3 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Fri, 5 Jul 2024 05:54:18 -0300 Subject: [PATCH] feat(workflows-sdk): conditional step (#7912) * chore: move ModuleRegistrationName to utils * chore(workflows-sdk): conditional step * type * when condition --- .../core/utils/src/orchestration/symbol.ts | 3 + .../src/helper/__tests__/merge-data.spec.ts | 56 ----- .../src/helper/__tests__/pipe.spec.ts | 234 ------------------ .../core/workflows-sdk/src/helper/index.ts | 4 +- .../workflows-sdk/src/helper/merge-data.ts | 50 ---- .../core/workflows-sdk/src/helper/pipe.ts | 172 ------------- .../utils/composer/__tests__/index.spec.ts | 40 +++ .../src/utils/composer/create-step.ts | 83 ++++++- .../workflows-sdk/src/utils/composer/index.ts | 5 +- .../workflows-sdk/src/utils/composer/when.ts | 33 +++ .../integration-tests/__fixtures__/index.ts | 1 + .../__fixtures__/workflow_conditional_step.ts | 57 +++++ .../integration-tests/__tests__/index.spec.ts | 51 +++- 13 files changed, 264 insertions(+), 525 deletions(-) delete mode 100644 packages/core/workflows-sdk/src/helper/__tests__/merge-data.spec.ts delete mode 100644 packages/core/workflows-sdk/src/helper/__tests__/pipe.spec.ts delete mode 100644 packages/core/workflows-sdk/src/helper/merge-data.ts delete mode 100644 packages/core/workflows-sdk/src/helper/pipe.ts create mode 100644 packages/core/workflows-sdk/src/utils/composer/when.ts create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts diff --git a/packages/core/utils/src/orchestration/symbol.ts b/packages/core/utils/src/orchestration/symbol.ts index 1d9bcf7f93..bbb589522d 100644 --- a/packages/core/utils/src/orchestration/symbol.ts +++ b/packages/core/utils/src/orchestration/symbol.ts @@ -1,6 +1,9 @@ export const SymbolMedusaWorkflowComposerContext = Symbol.for( "MedusaWorkflowComposerContext" ).toString() +export const SymbolMedusaWorkflowComposerCondition = Symbol.for( + "MedusaWorkflowComposerCondition" +).toString() export const SymbolInputReference = Symbol.for( "WorkflowInputReference" ).toString() diff --git a/packages/core/workflows-sdk/src/helper/__tests__/merge-data.spec.ts b/packages/core/workflows-sdk/src/helper/__tests__/merge-data.spec.ts deleted file mode 100644 index 2abe8233b6..0000000000 --- a/packages/core/workflows-sdk/src/helper/__tests__/merge-data.spec.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { mergeData } from "../merge-data" -import { WorkflowStepMiddlewareReturn } from "../pipe" - -describe("merge", function () { - it("should merge a new object from the source into a specify target", async function () { - const source = { - stringProp: "stringProp", - anArray: ["anArray"], - input: { - test: "test", - }, - another: { - anotherTest: "anotherTest", - }, - } - - const result = (await mergeData( - ["input", "another", "stringProp", "anArray"], - "payload" - )({ data: source } as any)) as unknown as WorkflowStepMiddlewareReturn - - expect(result).toEqual({ - alias: "payload", - value: { - ...source.input, - ...source.another, - anArray: source.anArray, - stringProp: source.stringProp, - }, - }) - }) - - it("should merge a new object from the entire source into the resul object", async function () { - const source = { - stringProp: "stringProp", - anArray: ["anArray"], - input: { - test: "test", - }, - another: { - anotherTest: "anotherTest", - }, - } - - const { value: result } = (await mergeData()({ - data: source, - } as any)) as unknown as WorkflowStepMiddlewareReturn - - expect(result).toEqual({ - ...source.input, - ...source.another, - anArray: source.anArray, - stringProp: source.stringProp, - }) - }) -}) diff --git a/packages/core/workflows-sdk/src/helper/__tests__/pipe.spec.ts b/packages/core/workflows-sdk/src/helper/__tests__/pipe.spec.ts deleted file mode 100644 index 141b28f45f..0000000000 --- a/packages/core/workflows-sdk/src/helper/__tests__/pipe.spec.ts +++ /dev/null @@ -1,234 +0,0 @@ -import { pipe } from "../pipe" - -describe("Pipe", function () { - it("should evaluate the input object and append the values to the data object and return the result from the handler", async function () { - const payload = { input: "input" } - const output = { test: "test" } - const invoke = { - input: payload, - step1: { ...payload, step1Data: { test: "test" } }, - step2: { ...payload, step2Data: { test: "test" } }, - } - - const handler = jest.fn().mockImplementation(async () => output) - const input = { - inputAlias: "payload", - invoke: [ - { - from: "payload", - alias: "input", - }, - { - from: "step1", - alias: "previousDataStep1", - }, - { - from: "step2", - alias: "previousDataStep2", - }, - ], - } - - const result = await pipe(input, handler)({ invoke, payload } as any) - - expect(handler).toHaveBeenCalled() - expect(handler).toHaveBeenCalledWith( - expect.objectContaining({ - data: { - input: payload, - previousDataStep1: invoke.step1, - previousDataStep2: invoke.step2, - }, - }) - ) - - expect(result).toBeDefined() - expect(result).toEqual(output) - }) - - it("should evaluate the input object and append the values to the data object using the merge and return the result from the handler", async function () { - const payload = { input: "input" } - const output = { test: "test" } - const invoke = { - input: payload, - step1: { step1Data: { test: "test" } }, - step2: [{ test: "test" }], - } - - const handler = jest.fn().mockImplementation(async () => output) - const input = { - inputAlias: "payload", - merge: true, - invoke: [ - { - from: "payload", - }, - { - from: "step1", - }, - { - from: "step2", - alias: "step2Data", - }, - ], - } - - const result = await pipe(input, handler)({ invoke, payload } as any) - - expect(handler).toHaveBeenCalled() - expect(handler).toHaveBeenCalledWith( - expect.objectContaining({ - data: { - ...payload, - ...invoke.step1, - step2Data: invoke.step2, - }, - }) - ) - - expect(result).toBeDefined() - expect(result).toEqual(output) - }) - - it("should evaluate the input object and append the values to the data object using the merge to store on the merge alias and return the result from the handler", async function () { - const payload = { input: "input" } - const output = { test: "test" } - const invoke = { - input: payload, - step1: { step1Data: { test: "test" } }, - step2: { step2Data: { test: "test" } }, - } - - const handler = jest.fn().mockImplementation(async () => output) - const input = { - inputAlias: "payload", - mergeAlias: "mergedData", - invoke: [ - { - from: "payload", - alias: "input", - }, - { - from: "step1", - alias: "step1Data", - }, - { - from: "step2", - alias: "step2Data", - }, - ], - } - - const result = await pipe(input, handler)({ invoke, payload } as any) - - expect(handler).toHaveBeenCalled() - expect(handler).toHaveBeenCalledWith( - expect.objectContaining({ - data: { - input: payload, - step1Data: invoke.step1, - step2Data: invoke.step2, - mergedData: { - ...payload, - ...invoke.step1, - ...invoke.step2, - }, - }, - }) - ) - - expect(result).toBeDefined() - expect(result).toEqual(output) - }) - - it("should evaluate the input object and append the values to the data object using the merge to store on the merge alias from the merge from values and return the result from the handler", async function () { - const payload = { input: "input" } - const output = { test: "test" } - const invoke = { - input: payload, - step1: { step1Data: { test: "test" } }, - step2: { step2Data: { test: "test" } }, - } - - const handler = jest.fn().mockImplementation(async () => output) - const input = { - inputAlias: "payload", - mergeAlias: "mergedData", - mergeFrom: ["input", "step1Data"], - invoke: [ - { - from: "payload", - alias: "input", - }, - { - from: "step1", - alias: "step1Data", - }, - { - from: "step2", - alias: "step2Data", - }, - ], - } - - const result = await pipe(input, handler)({ invoke, payload } as any) - - expect(handler).toHaveBeenCalled() - expect(handler).toHaveBeenCalledWith( - expect.objectContaining({ - data: { - input: payload, - step1Data: invoke.step1, - step2Data: invoke.step2, - mergedData: { - ...payload, - ...invoke.step1, - }, - }, - }) - ) - - expect(result).toBeDefined() - expect(result).toEqual(output) - }) - - it("should execute onComplete function if available but the output result shouldn't change", async function () { - const payload = { input: "input" } - const output = { test: "test" } - const invoke = { - input: payload, - } - - const onComplete = jest.fn(async ({ data }) => { - data.__changed = true - - return - }) - - const handler = jest.fn().mockImplementation(async () => output) - const input = { - inputAlias: "payload", - invoke: [ - { - from: "payload", - alias: "input", - }, - ], - onComplete, - } - - const result = await pipe(input, handler)({ invoke, payload } as any) - - expect(handler).toHaveBeenCalled() - expect(handler).toHaveBeenCalledWith( - expect.objectContaining({ - data: { - input: payload, - }, - }) - ) - - expect(onComplete).toHaveBeenCalled() - expect(result).toEqual(output) - }) -}) diff --git a/packages/core/workflows-sdk/src/helper/index.ts b/packages/core/workflows-sdk/src/helper/index.ts index 3b5fff9e09..b681bfa6d0 100644 --- a/packages/core/workflows-sdk/src/helper/index.ts +++ b/packages/core/workflows-sdk/src/helper/index.ts @@ -1,4 +1,2 @@ -export * from "./merge-data" -export * from "./pipe" -export * from "./workflow-export" export * from "./type" +export * from "./workflow-export" diff --git a/packages/core/workflows-sdk/src/helper/merge-data.ts b/packages/core/workflows-sdk/src/helper/merge-data.ts deleted file mode 100644 index 6384bb582e..0000000000 --- a/packages/core/workflows-sdk/src/helper/merge-data.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { PipelineHandler, WorkflowArguments } from "./pipe" -import { isObject } from "@medusajs/utils" - -/** - * Pipe utils that merges data from an object into a new object. - * The new object will have a target key with the merged data from the keys if specified. - * @deprecated - * @param keys - * @param target - */ -export function mergeData< - T extends Record = Record, - TKeys extends keyof T = keyof T, - Target extends "payload" | string = string ->(keys: TKeys[] = [], target?: Target): PipelineHandler { - return async function ({ data }: WorkflowArguments) { - const workingKeys = (keys.length ? keys : Object.keys(data)) as TKeys[] - const value = workingKeys.reduce((acc, key) => { - let targetAcc = { ...(target ? acc[target as string] : acc) } - targetAcc ??= {} - - if (Array.isArray(data[key as string])) { - targetAcc[key as string] = data[key as string] - } else if (isObject(data[key as string])) { - targetAcc = { - ...targetAcc, - ...(data[key as string] as object), - } - } else { - targetAcc[key as string] = data[key as string] - } - - if (target) { - acc[target as string] = { - ...acc[target as string], - ...targetAcc, - } - } else { - acc = targetAcc - } - - return acc - }, {}) - - return { - alias: target, - value: target ? value[target as string] : value, - } - } -} diff --git a/packages/core/workflows-sdk/src/helper/pipe.ts b/packages/core/workflows-sdk/src/helper/pipe.ts deleted file mode 100644 index c92cba23dc..0000000000 --- a/packages/core/workflows-sdk/src/helper/pipe.ts +++ /dev/null @@ -1,172 +0,0 @@ -import { - DistributedTransaction, - TransactionMetadata, - WorkflowStepHandler, -} from "@medusajs/orchestration" -import { Context, MedusaContainer, SharedContext } from "@medusajs/types" -import { mergeData } from "./merge-data" - -export type WorkflowStepMiddlewareReturn = { - alias?: string - value: any -} - -export type WorkflowStepMiddlewareInput = { - from: string - alias?: string -} - -interface PipelineInput { - /** - * The alias of the input data to store in - */ - inputAlias?: string - /** - * Descriptors to get the data from - */ - invoke?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[] - compensate?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[] - onComplete?: (args: WorkflowOnCompleteArguments) => Promise - /** - * Apply the data merging - */ - merge?: boolean - /** - * Store the merged data in a new key, if this is present no need to set merge: true - */ - mergeAlias?: string - /** - * Store the merged data from the chosen aliases, if this is present no need to set merge: true - */ - mergeFrom?: string[] -} - -export type WorkflowArguments = { - container: MedusaContainer - payload: unknown - data: T - metadata: TransactionMetadata - context: Context | SharedContext -} - -export type WorkflowOnCompleteArguments = { - container: MedusaContainer - payload: unknown - data: T - metadata: TransactionMetadata - transaction: DistributedTransaction - context: Context | SharedContext -} - -export type PipelineHandler = ( - args: WorkflowArguments -) => Promise< - T extends undefined - ? WorkflowStepMiddlewareReturn | WorkflowStepMiddlewareReturn[] - : T -> - -/** - * @deprecated - * @param input - * @param functions - */ -export function pipe( - input: PipelineInput, - ...functions: [...PipelineHandler[], PipelineHandler] -): WorkflowStepHandler { - // Apply the aggregator just before the last handler - if ( - (input.merge || input.mergeAlias || input.mergeFrom) && - functions.length - ) { - const handler = functions.pop()! - functions.push(mergeData(input.mergeFrom, input.mergeAlias), handler) - } - - return async ({ - container, - payload, - invoke, - compensate, - metadata, - transaction, - context, - }) => { - let data = {} - - const original = { - invoke: invoke ?? {}, - compensate: compensate ?? {}, - } - - if (input.inputAlias) { - Object.assign(original.invoke, { [input.inputAlias]: payload }) - } - - const dataKeys = ["invoke", "compensate"] - for (const key of dataKeys) { - if (!input[key]) { - continue - } - - if (!Array.isArray(input[key])) { - input[key] = [input[key]] - } - - for (const action of input[key]) { - if (action.alias) { - data[action.alias] = original[key][action.from] - } else { - data[action.from] = original[key][action.from] - } - } - } - - let finalResult - for (const fn of functions) { - let result = await fn({ - container, - payload, - data, - metadata, - context: context as Context, - }) - - if (Array.isArray(result)) { - for (const action of result) { - if (action?.alias) { - data[action.alias] = action.value - } - } - } else if ( - result && - "alias" in (result as WorkflowStepMiddlewareReturn) - ) { - if ((result as WorkflowStepMiddlewareReturn).alias) { - data[(result as WorkflowStepMiddlewareReturn).alias!] = ( - result as WorkflowStepMiddlewareReturn - ).value - } else { - data = (result as WorkflowStepMiddlewareReturn).value - } - } - - finalResult = result - } - - if (typeof input.onComplete === "function") { - const dataCopy = JSON.parse(JSON.stringify(data)) - await input.onComplete({ - container, - payload, - data: dataCopy, - metadata, - transaction, - context: context as Context, - }) - } - - return finalResult - } -} diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index 55f7b2a8fc..805fde32d9 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -3,6 +3,7 @@ import { createWorkflow } from "../create-workflow" import { StepResponse } from "../helpers" import { transform } from "../transform" import { WorkflowData } from "../type" +import { when } from "../when" let count = 1 const getNewWorkflowId = () => `workflow-${count++}` @@ -40,6 +41,45 @@ describe("Workflow composer", () => { expect(result).toEqual({ result: "hi from outside" }) }) + it("should skip step if condition is true", async function () { + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + const step2 = createStep("step2", async (input: string, context) => { + return new StepResponse({ result: input }) + }) + const step3 = createStep("step3", async (input: string, context) => { + return new StepResponse({ result: input ?? "default response" }) + }) + + const subWorkflow = createWorkflow( + getNewWorkflowId(), + function (input: WorkflowData) { + step1() + return step2(input) + } + ) + + const workflow = createWorkflow( + getNewWorkflowId(), + function (input: { callSubFlow: boolean }) { + const subWorkflowRes = when({ input }, ({ input }) => { + return input.callSubFlow + }).then(() => { + return subWorkflow.runAsStep({ + input: "hi from outside", + }) + }) + + return step3(subWorkflowRes.result) + } + ) + + const { result } = await workflow.run({ input: { callSubFlow: false } }) + + expect(result).toEqual({ result: "default response" }) + }) + it("should revert the workflow and sub workflow on failure", async function () { const step1Mock = jest.fn() const step1 = createStep( diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 2e602579f4..94ddb06d61 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,9 +4,9 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { deepCopy, isString, OrchestrationUtils } from "@medusajs/utils" +import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils" import { ulid } from "ulid" -import { resolveValue, StepResponse } from "./helpers" +import { StepResponse, resolveValue } from "./helpers" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -214,24 +214,60 @@ function applyStep< > ) => { const newStepName = localConfig.name ?? stepName + const newConfig = { + ...stepConfig, + ...localConfig, + } delete localConfig.name this.handlers.set(newStepName, handler) - this.flow.replaceAction(stepConfig.uuid!, newStepName, { - ...stepConfig, - ...localConfig, - }) + this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig) ret.__step__ = newStepName WorkflowManager.update(this.workflowId, this.flow, this.handlers) + const confRef = proxify(ret) + + if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) { + const flagSteps = + global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition] + .steps + + const idx = flagSteps.findIndex((a) => a.__step__ === ret.__step__) + if (idx > -1) { + flagSteps.splice(idx, 1) + } + flagSteps.push(confRef) + } + + return confRef + }, + if: ( + input: any, + condition: (...args: any) => boolean | WorkflowData + ): WorkflowData => { + if (typeof condition !== "function") { + throw new Error("Condition must be a function") + } + + wrapConditionalStep(input, condition, handler) + this.handlers.set(ret.__step__, handler) + return proxify(ret) }, } - return proxify(ret) + const refRet = proxify(ret) as WorkflowData + + if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) { + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerCondition + ].steps.push(refRet) + } + + return refRet } } @@ -291,6 +327,39 @@ function wrapAsyncHandler( } } +/** + * @internal + * + * Internal function to handle conditional steps. + * + * @param condition + * @param handle + */ +function wrapConditionalStep( + input: any, + condition: (...args: any) => boolean | WorkflowData, + handle: { + invoke: WorkflowStepHandler + compensate?: WorkflowStepHandler + } +) { + const originalInvoke = handle.invoke + handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => { + const args = await resolveValue(input, stepArguments) + const canContinue = await condition(args) + + if (stepArguments.step.definition?.async) { + stepArguments.step.definition.backgroundExecution = true + } + + if (!canContinue) { + return + } + + return await originalInvoke(stepArguments) + } +} + /** * This function creates a {@link StepFunction} that can be used as a step in a workflow constructed by the {@link createWorkflow} function. * diff --git a/packages/core/workflows-sdk/src/utils/composer/index.ts b/packages/core/workflows-sdk/src/utils/composer/index.ts index d654c7e7f0..a3fdc29305 100644 --- a/packages/core/workflows-sdk/src/utils/composer/index.ts +++ b/packages/core/workflows-sdk/src/utils/composer/index.ts @@ -1,8 +1,9 @@ export * from "./create-step" export * from "./create-workflow" -export * from "./hook" -export * from "./parallelize" export * from "./helpers/resolve-value" export * from "./helpers/step-response" +export * from "./hook" +export * from "./parallelize" export * from "./transform" export * from "./type" +export * from "./when" diff --git a/packages/core/workflows-sdk/src/utils/composer/when.ts b/packages/core/workflows-sdk/src/utils/composer/when.ts new file mode 100644 index 0000000000..7c63a26936 --- /dev/null +++ b/packages/core/workflows-sdk/src/utils/composer/when.ts @@ -0,0 +1,33 @@ +import { OrchestrationUtils } from "@medusajs/utils" + +export function when(input, condition) { + global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition] = { + input, + condition, + steps: [], + } + + let thenCalled = false + process.nextTick(() => { + if (!thenCalled) { + throw new Error(`".then" is missing after "when" condition`) + } + }) + + return { + then: (fn) => { + thenCalled = true + const ret = fn() + + const applyCondition = + global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition].steps + + for (const step of applyCondition) { + step.if(input, condition) + } + + delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition] + return ret + }, + } +} diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts index ee3382b3c3..fee018fbdc 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts @@ -1,6 +1,7 @@ export * from "./workflow_1" export * from "./workflow_2" export * from "./workflow_async" +export * from "./workflow_conditional_step" export * from "./workflow_idempotent" export * from "./workflow_step_timeout" export * from "./workflow_transaction_timeout" diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts new file mode 100644 index 0000000000..7c7d231e92 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_conditional_step.ts @@ -0,0 +1,57 @@ +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/workflows-sdk" +import { when } from "@medusajs/workflows-sdk/src/utils/composer" + +const step_1 = createStep( + "step_1", + jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) + }) +) + +export const conditionalStep2Invoke = jest.fn((input, context) => { + if (input) { + return new StepResponse({ notAsyncResponse: input.hey }) + } +}) +const step_2 = createStep("step_2", conditionalStep2Invoke) + +export const conditionalStep3Invoke = jest.fn((res) => { + return new StepResponse({ + done: { + inputFromSyncStep: res.notAsyncResponse, + }, + }) +}) +const step_3 = createStep("step_3", conditionalStep3Invoke) + +createWorkflow( + { + name: "workflow_conditional_step", + retentionTime: 1000, + }, + function (input) { + step_1(input) + + const ret = step_2({ hey: "oh" }) + + const ret2_async = when({ input, ret }, ({ input, ret }) => { + return input.runNewStepName && ret.notAsyncResponse === "oh" + }).then(() => { + return step_2({ hey: "hello" }).config({ + name: "new_step_name", + async: true, + }) + }) + + return when({ ret2_async }, ({ ret2_async }) => { + return ret2_async?.notAsyncResponse === "hello" + }).then(() => { + return step_3(ret2_async) + }) + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index ba66c60beb..98736b7482 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -8,7 +8,12 @@ import { Modules, TransactionHandlerType } from "@medusajs/utils" import { moduleIntegrationTestRunner } from "medusa-test-utils" import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" -import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__" +import { + conditionalStep2Invoke, + conditionalStep3Invoke, + workflow2Step2Invoke, + workflow2Step3Invoke, +} from "../__fixtures__" import { eventGroupWorkflowId, workflowEventGroupIdStep1Mock, @@ -92,6 +97,10 @@ moduleIntegrationTestRunner({ }) describe("Testing basic workflow", function () { + beforeEach(() => { + jest.clearAllMocks() + }) + it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => { await workflowOrcModule.run("workflow_1", { input: { @@ -232,6 +241,46 @@ moduleIntegrationTestRunner({ expect(onFinish).toHaveBeenCalledTimes(0) }) + + it("should run conditional steps if condition is true", (done) => { + void workflowOrcModule.subscribe({ + workflowId: "workflow_conditional_step", + subscriber: (event) => { + if (event.eventType === "onFinish") { + done() + expect(conditionalStep2Invoke).toHaveBeenCalledTimes(2) + expect(conditionalStep3Invoke).toHaveBeenCalledTimes(1) + } + }, + }) + + workflowOrcModule.run("workflow_conditional_step", { + input: { + runNewStepName: true, + }, + throwOnError: true, + }) + }) + + it("should not run conditional steps if condition is false", (done) => { + void workflowOrcModule.subscribe({ + workflowId: "workflow_conditional_step", + subscriber: (event) => { + if (event.eventType === "onFinish") { + done() + expect(conditionalStep2Invoke).toHaveBeenCalledTimes(1) + expect(conditionalStep3Invoke).toHaveBeenCalledTimes(0) + } + }, + }) + + workflowOrcModule.run("workflow_conditional_step", { + input: { + runNewStepName: false, + }, + throwOnError: true, + }) + }) }) describe("Scheduled workflows", () => {