From 117fc25aeaae3e4c8708b03c79c37e876c603cd6 Mon Sep 17 00:00:00 2001 From: Harminder Virk Date: Fri, 23 May 2025 18:22:18 +0530 Subject: [PATCH] feat: run workflow hooks inside a when/then block (#11963) * feat: run workflow hooks inside a when/then block * fix conditionals and add test --------- Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Co-authored-by: Carlos R. L. Rodrigues --- .../src/utils/composer/__tests__/compose.ts | 81 +++++++++++++++++++ .../src/utils/composer/create-hook.ts | 26 ++++-- .../src/utils/composer/create-step.ts | 12 ++- .../src/utils/composer/create-workflow.ts | 1 + .../workflows-sdk/src/utils/composer/type.ts | 7 ++ .../workflows-sdk/src/utils/composer/when.ts | 37 ++++++--- 6 files changed, 142 insertions(+), 22 deletions(-) diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts index 21a5f3ce75..d82ed7e6d6 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts @@ -809,6 +809,87 @@ describe("Workflow composer", function () { expect(stepResults).toEqual(["step3", "step1"]) }) + it("should compose a new workflow with conditional steps", async () => { + const stepResults: string[] = [] + + let hookCalled = jest.fn() + let timesExecuted = 0 + const mockStep1Fn = jest.fn().mockImplementation(async () => { + timesExecuted += 1 + stepResults.push("step1") + return new StepResponse(true) + }) as any + const mockStep2Fn = jest.fn().mockImplementation(() => { + stepResults.push("step2") + return new StepResponse(true) + }) as any + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + + const workflow = createWorkflow( + "workflow1", + function (input: { timesExecuted: number }) { + const ret = when("cond", input, ({ timesExecuted }) => { + return timesExecuted < 2 + }).then(() => { + createHook("validate", { + executed: input.timesExecuted, + }) + const ret1 = step1() + const ret2 = step2() + const parallelized = parallelize(ret1, ret2) + + return [ret1, ret2, parallelized] + }) + + return new WorkflowResponse(ret) + } + ) + + ;(workflow.hooks as any).validate((input) => { + hookCalled(input) + }) + + const { result: workflowResult } = await workflow().run({ + input: { + timesExecuted, + }, + }) + + const { result: workflowResultSecondTime } = await workflow().run({ + input: { + timesExecuted, + }, + }) + + const { result: workflowResultThirdTime } = await workflow().run({ + input: { + timesExecuted, + }, + }) + + const { result: workflowResultFourthTime } = await workflow().run({ + input: { + timesExecuted, + }, + }) + + expect(hookCalled).toHaveBeenCalledTimes(2) + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + + expect(workflowResult).toEqual([true, true, [true, true]]) + + expect(workflowResultSecondTime).toEqual([true, true, [true, true]]) + + expect(workflowResultThirdTime).toEqual(undefined) + + expect(workflowResultFourthTime).toEqual(undefined) + + expect(stepResults).toEqual(["step1", "step2", "step1", "step2"]) + }) + it("should compose a new workflow with parallelize steps and rollback them all in case of error", async () => { const step1CompensationFn = jest.fn().mockImplementation(() => { return "step1 compensation" diff --git a/packages/core/workflows-sdk/src/utils/composer/create-hook.ts b/packages/core/workflows-sdk/src/utils/composer/create-hook.ts index 32a3c321a8..93e23f38fa 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-hook.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-hook.ts @@ -1,9 +1,14 @@ -import { type ZodSchema } from "zod" import { OrchestrationUtils } from "@medusajs/utils" -import type { CreateWorkflowComposerContext } from "./type" -import { CompensateFn, createStep, InvokeFn } from "./create-step" -import { createStepHandler } from "./helpers/create-step-handler" +import { type ZodSchema } from "zod" +import { + CompensateFn, + createStep, + InvokeFn, + wrapConditionalStep, +} from "./create-step" import { StepResponse } from "./helpers" +import { createStepHandler } from "./helpers/create-step-handler" +import type { CreateWorkflowComposerContext } from "./type" const NOOP_RESULT = Symbol.for("NOOP") @@ -37,7 +42,7 @@ export type Hook = { * Learn more in [this documentation](https://docs.medusajs.com/learn/fundamentals/workflows/workflow-hooks). * * @param name - The hook's name. This is used when the hook handler is registered to consume the workflow. - * @param input - The input to pass to the hook handler. + * @param hookInput - The input to pass to the hook handler. * @returns A workflow hook. * * @example @@ -66,7 +71,7 @@ export type Hook = { */ export function createHook( name: Name, - input: TInvokeInput, + hookInput: TInvokeInput, options: { resultValidator?: ZodSchema } = {} @@ -102,14 +107,14 @@ export function createHook( name, (_: TInvokeInput) => new StepResponse(NOOP_RESULT), () => void 0 - )(input) + )(hookInput) function hook< TInvokeResultCompensateInput >(this: CreateWorkflowComposerContext, invokeFn: InvokeFn, compensateFn?: CompensateFn) { const handlers = createStepHandler.bind(this)({ stepName: name, - input, + input: hookInput, invokeFn, compensateFn, }) @@ -120,6 +125,11 @@ export function createHook( ) } + const conditional = this.stepConditions_[name] + if (conditional) { + wrapConditionalStep(conditional.input, conditional.condition, handlers) + } + this.hooks_.registered.push(name) this.handlers.set(name, handlers) } diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 9e7a582e04..be00f6fc40 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -189,6 +189,10 @@ export function applyStep< this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig) this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync) + const stepCondition = this.stepConditions_[stepName] + delete this.stepConditions_[stepName] + this.stepConditions_[newStepName] = stepCondition + ret.__step__ = newStepName WorkflowManager.update(this.workflowId, this.flow, this.handlers) @@ -213,6 +217,11 @@ export function applyStep< throw new Error("Condition must be a function") } + this.stepConditions_[ret.__step__] = { + condition, + input, + } + wrapConditionalStep(input, condition, handler) this.handlers.set(ret.__step__, handler) @@ -293,7 +302,7 @@ function wrapAsyncHandler( * @param condition * @param handle */ -function wrapConditionalStep( +export function wrapConditionalStep( input: any, condition: (...args: any) => boolean | WorkflowData, handle: { @@ -304,6 +313,7 @@ function wrapConditionalStep( const originalInvoke = handle.invoke handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => { const args = await resolveValue(input, stepArguments) + const canContinue = await condition(args, stepArguments) if (stepArguments.step.definition?.async) { diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 097e251b9b..c4138a4893 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -122,6 +122,7 @@ export function createWorkflow( registered: [], }, hooksCallback_: {}, + stepConditions_: {}, hookBinder: (name, fn) => { context.hooks_.declared.push(name) context.hooksCallback_[name] = fn.bind(context)() diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 843418ac74..38ffbd97f3 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -117,6 +117,13 @@ export type CreateWorkflowComposerContext = { fn: StepFunctionResult ) => WorkflowData hookBinder: (name: string, fn: () => HookHandler) => void + stepConditions_: Record< + string, + { + condition: (...args: any[]) => boolean | WorkflowData + input: any + } + > parallelizeBinder: < TOutput extends (WorkflowData | undefined)[] = WorkflowData[] >( diff --git a/packages/core/workflows-sdk/src/utils/composer/when.ts b/packages/core/workflows-sdk/src/utils/composer/when.ts index 1924ebc5cf..89efa0818e 100644 --- a/packages/core/workflows-sdk/src/utils/composer/when.ts +++ b/packages/core/workflows-sdk/src/utils/composer/when.ts @@ -1,4 +1,4 @@ -import { isDefined, OrchestrationUtils } from "@medusajs/utils" +import { isDefined, isObject, OrchestrationUtils } from "@medusajs/utils" import { ulid } from "ulid" import { createStep } from "./create-step" import { StepResponse } from "./helpers/step-response" @@ -20,30 +20,30 @@ type ThenFunc = any>( : ReturnType /** - * This function allows you to execute steps only if a condition is satisfied. As you can't use if conditions in + * This function allows you to execute steps only if a condition is satisfied. As you can't use if conditions in * a workflow's constructor function, use `when-then` instead. - * + * * Learn more about why you can't use if conditions and `when-then` in [this documentation](https://docs.medusajs.com/learn/fundamentals/workflows/conditions). - * + * * @param values - The data to pass to the second parameter function. * @param condition - A function that returns a boolean value, indicating whether the steps in `then` should be executed. - * + * * @example - * import { + * import { * createWorkflow, * WorkflowResponse, * when, * } from "@medusajs/framework/workflows-sdk" * // step imports... - * + * * export const workflow = createWorkflow( - * "workflow", + * "workflow", * function (input: { * is_active: boolean * }) { - * + * * const result = when( - * input, + * input, * (input) => { * return input.is_active * } @@ -51,10 +51,10 @@ type ThenFunc = any>( * const stepResult = isActiveStep() * return stepResult * }) - * + * * // executed without condition * const anotherStepResult = anotherStep(result) - * + * * return new WorkflowResponse( * anotherStepResult * ) @@ -135,7 +135,18 @@ export function when(...args) { name, ({ input }: { input: any }) => new StepResponse(input) ) - returnStep = retStep({ input: ret }) + + /** + * object ret = { result, hooks } + */ + if (isObject(ret) && "hooks" in ret && "result" in ret) { + returnStep = { + hooks: ret.hooks, + result: retStep({ input: ret.result }), + } + } else { + returnStep = retStep({ input: ret }) + } } for (const step of applyCondition) {