feat: run workflow hooks inside a when/then block (#11963)

* feat: run workflow hooks inside a when/then block

* fix conditionals and add test

---------

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
This commit is contained in:
Harminder Virk
2025-05-23 18:22:18 +05:30
committed by GitHub
parent 044c5dd138
commit 117fc25aea
6 changed files with 142 additions and 22 deletions

View File

@@ -809,6 +809,87 @@ describe("Workflow composer", function () {
expect(stepResults).toEqual(["step3", "step1"])
})
it("should compose a new workflow with conditional steps", async () => {
const stepResults: string[] = []
let hookCalled = jest.fn()
let timesExecuted = 0
const mockStep1Fn = jest.fn().mockImplementation(async () => {
timesExecuted += 1
stepResults.push("step1")
return new StepResponse(true)
}) as any
const mockStep2Fn = jest.fn().mockImplementation(() => {
stepResults.push("step2")
return new StepResponse(true)
}) as any
const step1 = createStep("step1", mockStep1Fn)
const step2 = createStep("step2", mockStep2Fn)
const workflow = createWorkflow(
"workflow1",
function (input: { timesExecuted: number }) {
const ret = when("cond", input, ({ timesExecuted }) => {
return timesExecuted < 2
}).then(() => {
createHook("validate", {
executed: input.timesExecuted,
})
const ret1 = step1()
const ret2 = step2()
const parallelized = parallelize(ret1, ret2)
return [ret1, ret2, parallelized]
})
return new WorkflowResponse(ret)
}
)
;(workflow.hooks as any).validate((input) => {
hookCalled(input)
})
const { result: workflowResult } = await workflow().run({
input: {
timesExecuted,
},
})
const { result: workflowResultSecondTime } = await workflow().run({
input: {
timesExecuted,
},
})
const { result: workflowResultThirdTime } = await workflow().run({
input: {
timesExecuted,
},
})
const { result: workflowResultFourthTime } = await workflow().run({
input: {
timesExecuted,
},
})
expect(hookCalled).toHaveBeenCalledTimes(2)
expect(mockStep1Fn).toHaveBeenCalledTimes(2)
expect(mockStep2Fn).toHaveBeenCalledTimes(2)
expect(workflowResult).toEqual([true, true, [true, true]])
expect(workflowResultSecondTime).toEqual([true, true, [true, true]])
expect(workflowResultThirdTime).toEqual(undefined)
expect(workflowResultFourthTime).toEqual(undefined)
expect(stepResults).toEqual(["step1", "step2", "step1", "step2"])
})
it("should compose a new workflow with parallelize steps and rollback them all in case of error", async () => {
const step1CompensationFn = jest.fn().mockImplementation(() => {
return "step1 compensation"

View File

@@ -1,9 +1,14 @@
import { type ZodSchema } from "zod"
import { OrchestrationUtils } from "@medusajs/utils"
import type { CreateWorkflowComposerContext } from "./type"
import { CompensateFn, createStep, InvokeFn } from "./create-step"
import { createStepHandler } from "./helpers/create-step-handler"
import { type ZodSchema } from "zod"
import {
CompensateFn,
createStep,
InvokeFn,
wrapConditionalStep,
} from "./create-step"
import { StepResponse } from "./helpers"
import { createStepHandler } from "./helpers/create-step-handler"
import type { CreateWorkflowComposerContext } from "./type"
const NOOP_RESULT = Symbol.for("NOOP")
@@ -37,7 +42,7 @@ export type Hook<Name extends string, Input, Output> = {
* Learn more in [this documentation](https://docs.medusajs.com/learn/fundamentals/workflows/workflow-hooks).
*
* @param name - The hook's name. This is used when the hook handler is registered to consume the workflow.
* @param input - The input to pass to the hook handler.
* @param hookInput - The input to pass to the hook handler.
* @returns A workflow hook.
*
* @example
@@ -66,7 +71,7 @@ export type Hook<Name extends string, Input, Output> = {
*/
export function createHook<Name extends string, TInvokeInput, TInvokeOutput>(
name: Name,
input: TInvokeInput,
hookInput: TInvokeInput,
options: {
resultValidator?: ZodSchema<TInvokeOutput>
} = {}
@@ -102,14 +107,14 @@ export function createHook<Name extends string, TInvokeInput, TInvokeOutput>(
name,
(_: TInvokeInput) => new StepResponse(NOOP_RESULT),
() => void 0
)(input)
)(hookInput)
function hook<
TInvokeResultCompensateInput
>(this: CreateWorkflowComposerContext, invokeFn: InvokeFn<TInvokeInput, unknown, TInvokeResultCompensateInput>, compensateFn?: CompensateFn<TInvokeResultCompensateInput>) {
const handlers = createStepHandler.bind(this)({
stepName: name,
input,
input: hookInput,
invokeFn,
compensateFn,
})
@@ -120,6 +125,11 @@ export function createHook<Name extends string, TInvokeInput, TInvokeOutput>(
)
}
const conditional = this.stepConditions_[name]
if (conditional) {
wrapConditionalStep(conditional.input, conditional.condition, handlers)
}
this.hooks_.registered.push(name)
this.handlers.set(name, handlers)
}

View File

@@ -189,6 +189,10 @@ export function applyStep<
this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig)
this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync)
const stepCondition = this.stepConditions_[stepName]
delete this.stepConditions_[stepName]
this.stepConditions_[newStepName] = stepCondition
ret.__step__ = newStepName
WorkflowManager.update(this.workflowId, this.flow, this.handlers)
@@ -213,6 +217,11 @@ export function applyStep<
throw new Error("Condition must be a function")
}
this.stepConditions_[ret.__step__] = {
condition,
input,
}
wrapConditionalStep(input, condition, handler)
this.handlers.set(ret.__step__, handler)
@@ -293,7 +302,7 @@ function wrapAsyncHandler(
* @param condition
* @param handle
*/
function wrapConditionalStep(
export function wrapConditionalStep(
input: any,
condition: (...args: any) => boolean | WorkflowData,
handle: {
@@ -304,6 +313,7 @@ function wrapConditionalStep(
const originalInvoke = handle.invoke
handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => {
const args = await resolveValue(input, stepArguments)
const canContinue = await condition(args, stepArguments)
if (stepArguments.step.definition?.async) {

View File

@@ -122,6 +122,7 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
registered: [],
},
hooksCallback_: {},
stepConditions_: {},
hookBinder: (name, fn) => {
context.hooks_.declared.push(name)
context.hooksCallback_[name] = fn.bind(context)()

View File

@@ -117,6 +117,13 @@ export type CreateWorkflowComposerContext = {
fn: StepFunctionResult
) => WorkflowData<TOutput>
hookBinder: (name: string, fn: () => HookHandler) => void
stepConditions_: Record<
string,
{
condition: (...args: any[]) => boolean | WorkflowData
input: any
}
>
parallelizeBinder: <
TOutput extends (WorkflowData | undefined)[] = WorkflowData[]
>(

View File

@@ -1,4 +1,4 @@
import { isDefined, OrchestrationUtils } from "@medusajs/utils"
import { isDefined, isObject, OrchestrationUtils } from "@medusajs/utils"
import { ulid } from "ulid"
import { createStep } from "./create-step"
import { StepResponse } from "./helpers/step-response"
@@ -20,30 +20,30 @@ type ThenFunc = <ThenResolver extends () => any>(
: ReturnType<ThenResolver>
/**
* This function allows you to execute steps only if a condition is satisfied. As you can't use if conditions in
* This function allows you to execute steps only if a condition is satisfied. As you can't use if conditions in
* a workflow's constructor function, use `when-then` instead.
*
*
* Learn more about why you can't use if conditions and `when-then` in [this documentation](https://docs.medusajs.com/learn/fundamentals/workflows/conditions).
*
*
* @param values - The data to pass to the second parameter function.
* @param condition - A function that returns a boolean value, indicating whether the steps in `then` should be executed.
*
*
* @example
* import {
* import {
* createWorkflow,
* WorkflowResponse,
* when,
* } from "@medusajs/framework/workflows-sdk"
* // step imports...
*
*
* export const workflow = createWorkflow(
* "workflow",
* "workflow",
* function (input: {
* is_active: boolean
* }) {
*
*
* const result = when(
* input,
* input,
* (input) => {
* return input.is_active
* }
@@ -51,10 +51,10 @@ type ThenFunc = <ThenResolver extends () => any>(
* const stepResult = isActiveStep()
* return stepResult
* })
*
*
* // executed without condition
* const anotherStepResult = anotherStep(result)
*
*
* return new WorkflowResponse(
* anotherStepResult
* )
@@ -135,7 +135,18 @@ export function when(...args) {
name,
({ input }: { input: any }) => new StepResponse(input)
)
returnStep = retStep({ input: ret })
/**
* object ret = { result, hooks }
*/
if (isObject(ret) && "hooks" in ret && "result" in ret) {
returnStep = {
hooks: ret.hooks,
result: retStep({ input: ret.result }),
}
} else {
returnStep = retStep({ input: ret })
}
}
for (const step of applyCondition) {