diff --git a/.changeset/rotten-seals-happen.md b/.changeset/rotten-seals-happen.md new file mode 100644 index 0000000000..72facbfdcc --- /dev/null +++ b/.changeset/rotten-seals-happen.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflows-sdk": patch +--- + +feat: add support for accessing step results via context diff --git a/packages/core/workflows-sdk/package.json b/packages/core/workflows-sdk/package.json index 9104319465..d5ca3bf033 100644 --- a/packages/core/workflows-sdk/package.json +++ b/packages/core/workflows-sdk/package.json @@ -34,10 +34,12 @@ "@swc/core": "^1.7.28", "@swc/jest": "^0.2.36", "awilix": "^8.0.1", + "expect-type": "^0.20.0", "jest": "^29.7.0", "pg": "^8.13.0", "rimraf": "^5.0.1", - "typescript": "^5.6.2" + "typescript": "^5.6.2", + "zod": "3.22.4" }, "dependencies": { "@medusajs/modules-sdk": "2.6.1", @@ -53,7 +55,8 @@ "@mikro-orm/postgresql": "6.4.3", "awilix": "^8.0.1", "express": "^4.21.0", - "pg": "^8.13.0" + "pg": "^8.13.0", + "zod": "3.22.4" }, "scripts": { "build": "rimraf dist && tsc --build", diff --git a/packages/core/workflows-sdk/src/utils/_playground.ts b/packages/core/workflows-sdk/src/utils/_playground.ts index eccd5d9a2e..fdca4027ce 100644 --- a/packages/core/workflows-sdk/src/utils/_playground.ts +++ b/packages/core/workflows-sdk/src/utils/_playground.ts @@ -1,3 +1,4 @@ +import { z } from "zod" import { createStep, createWorkflow, @@ -16,7 +17,7 @@ const step2 = createStep("step2", async (input: Step2Input, context) => { return new StepResponse({ step2: input }) }) -const step3 = createStep("step3", async () => { +const step3 = createStep("step3", async function (_, context) { return new StepResponse({ step3: "step3" }) }) @@ -25,15 +26,28 @@ const workflow = createWorkflow( function (input: WorkflowData<{ outsideWorkflowData: string }>) { step1() step2({ filters: { id: [] } }) - const somethingHook = createHook("something", { id: "1" }) - step3() - return new WorkflowResponse({ id: 1 }, { hooks: [somethingHook] }) + + let somethingHook = createHook( + "something", + { id: "1" }, + { + resultValidator: z.object({ + id: z.number(), + }), + } + ) + + return new WorkflowResponse( + { r: somethingHook.getResult(), step3: step3() }, + { hooks: [somethingHook] } + ) } ) workflow.hooks.something((input, context) => { console.log("input>", input) console.log("context>", context) + return new StepResponse({ id: 2, foo: "bar" }) }) workflow.run().then((res) => { diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index 4720f818d6..9bb8cb816c 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -1,3 +1,5 @@ +import z from "zod" +import { expectTypeOf } from "expect-type" import { TransactionState } from "@medusajs/utils" import { createStep } from "../create-step" import { createWorkflow } from "../create-workflow" @@ -6,6 +8,7 @@ import { WorkflowResponse } from "../helpers/workflow-response" import { transform } from "../transform" import { WorkflowData } from "../type" import { when } from "../when" +import { createHook } from "../create-hook" let count = 1 const getNewWorkflowId = () => `workflow-${count++}` @@ -406,4 +409,312 @@ describe("Workflow composer", () => { }), ]) }) + + it("should allow reading results for a given step", async function () { + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + const step2 = createStep("step2", async (input: string, context) => { + return new StepResponse({ result: input }) + }) + const step3 = createStep("step3", async (input: string, context) => { + return new StepResponse({ + input, + step2: context[" getStepResult"]("step2"), + step1: context[" getStepResult"]("step1"), + invalid: context[" getStepResult"]("invalid"), + }) + }) + + const workflow = createWorkflow(getNewWorkflowId(), function () { + step1() + step2("step2") + return new WorkflowResponse(step3("step-3")) + }) + + const { result } = await workflow.run({ input: {} }) + expect(result).toEqual({ + input: "step-3", + step1: { + result: "step1", + }, + step2: { + result: "step2", + }, + }) + }) + + it("should allow reading results of a hook", async function () { + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflow = createWorkflow( + getNewWorkflowId(), + function (input: { id: number }) { + const step1Result = step1() + const mutateInputHook = createHook("mutateInputHook", { + input, + step1Result, + }) + + return new WorkflowResponse( + { + input, + step1Result, + hookResult: mutateInputHook.getResult(), + }, + { + hooks: [mutateInputHook], + } + ) + } + ) + + workflow.hooks.mutateInputHook((data) => { + return new StepResponse({ + input: { + id: data.input.id + 1, + }, + step1Result: { + result: `mutated-${data.step1Result.result}`, + }, + }) + }) + + const { result } = await workflow.run({ input: { id: 1 } }) + expect(result).toEqual({ + input: { id: 1 }, + step1Result: { result: "step1" }, + hookResult: { + input: { + id: 2, + }, + step1Result: { result: "mutated-step1" }, + }, + }) + }) + + it("should allow specifying a validation schema for the hook response", async function () { + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflow = createWorkflow( + getNewWorkflowId(), + + function (input: { id: number }) { + const step1Result = step1() + const mutateInputHook = createHook( + "mutateInputHook", + { + input, + step1Result, + }, + { + resultValidator: z.object({ + id: z.number(), + }), + } + ) + + expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf< + { id: number } | undefined + >() + + return new WorkflowResponse( + { + input, + step1Result, + hookResult: mutateInputHook.getResult(), + }, + { + hooks: [mutateInputHook], + } + ) + } + ) + + workflow.hooks.mutateInputHook((data) => { + return new StepResponse({ + id: data.input.id + 1, + }) + }) + + const { result } = await workflow.run({ input: { id: 1 } }) + expect(result).toEqual({ + input: { id: 1 }, + step1Result: { result: "step1" }, + hookResult: { + id: 2, + }, + }) + }) + + it("should validate and throw error when hook response is invalid", async function () { + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflow = createWorkflow( + getNewWorkflowId(), + + function (input: { id: number }) { + const step1Result = step1() + const mutateInputHook = createHook( + "mutateInputHook", + { + input, + step1Result, + }, + { + resultValidator: z.object({ + id: z.number(), + }), + } + ) + + expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf< + { id: number } | undefined + >() + + return new WorkflowResponse( + { + input, + step1Result, + hookResult: mutateInputHook.getResult(), + }, + { + hooks: [mutateInputHook], + } + ) + } + ) + + workflow.hooks.mutateInputHook((data) => { + return new StepResponse({} as any) + }) + + try { + await workflow.run({ input: { id: 1 } }) + throw new Error("Expected workflow to fail") + } catch (error) { + expect(error).toHaveProperty("issues") + expect(error.issues).toEqual([ + { + code: "invalid_type", + expected: "number", + message: "Required", + path: ["id"], + received: "undefined", + }, + ]) + } + }) + + it("should not validate when no hook handler has been defined", async function () { + const step1 = createStep("step1", async () => { + return new StepResponse({ result: "step1" }) + }) + + const workflow = createWorkflow( + getNewWorkflowId(), + + function (input: { id: number }) { + const step1Result = step1() + const mutateInputHook = createHook( + "mutateInputHook", + { + input, + step1Result, + }, + { + resultValidator: z.object({ + id: z.number(), + }), + } + ) + + expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf< + { id: number } | undefined + >() + + return new WorkflowResponse( + { + input, + step1Result, + hookResult: mutateInputHook.getResult(), + }, + { + hooks: [mutateInputHook], + } + ) + } + ) + + const { result } = await workflow.run({ input: { id: 1 } }) + expect(result).toEqual({ + input: { id: 1 }, + step1Result: { result: "step1" }, + hookResult: undefined, + }) + }) + + it("should validate when hook returns undefined", async function () { + const step1 = createStep("step1", async (_, context) => { + return new StepResponse({ result: "step1" }) + }) + + const workflow = createWorkflow( + getNewWorkflowId(), + + function (input: { id: number }) { + const step1Result = step1() + const mutateInputHook = createHook( + "mutateInputHook", + { + input, + step1Result, + }, + { + resultValidator: z.object({ + id: z.number(), + }), + } + ) + + expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf< + { id: number } | undefined + >() + + return new WorkflowResponse( + { + input, + step1Result, + hookResult: mutateInputHook.getResult(), + }, + { + hooks: [mutateInputHook], + } + ) + } + ) + + workflow.hooks.mutateInputHook((data) => {}) + try { + await workflow.run({ input: { id: 1 } }) + throw new Error("Expected workflow to fail") + } catch (error) { + expect(error).toHaveProperty("issues") + expect(error.issues).toEqual([ + { + code: "invalid_type", + expected: "object", + message: "Required", + path: [], + received: "undefined", + }, + ]) + } + }) }) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-hook.ts b/packages/core/workflows-sdk/src/utils/composer/create-hook.ts index 4447d332ab..f18fde57f8 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-hook.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-hook.ts @@ -1,20 +1,31 @@ -import { CompensateFn, createStep, InvokeFn } from "./create-step" +import { type ZodSchema } from "zod" import { OrchestrationUtils } from "@medusajs/utils" -import { CreateWorkflowComposerContext } from "./type" +import type { CreateWorkflowComposerContext } from "./type" +import { CompensateFn, createStep, InvokeFn } from "./create-step" import { createStepHandler } from "./helpers/create-step-handler" +import { StepResponse } from "./helpers" + +const NOOP_RESULT = Symbol.for("NOOP") /** * Representation of a hook definition. */ -export type Hook = { +export type Hook = { __type: typeof OrchestrationUtils.SymbolWorkflowHook name: Name + + /** + * Returns the result of the hook + */ + getResult(): Output | undefined + /** * By prefixing a key with a space, we remove it from the * intellisense of TypeScript. This is needed because * input is not set at runtime. It is a type-only * property to infer input data type of a hook */ + " output": Output " input": Input } @@ -53,14 +64,32 @@ export type Hook = { * } * ) */ -export function createHook( +export function createHook( name: Name, - input: TInvokeInput -): Hook { + input: TInvokeInput, + options: { + resultValidator?: ZodSchema + } = {} +): Hook { const context = global[ OrchestrationUtils.SymbolMedusaWorkflowComposerContext ] as CreateWorkflowComposerContext + const getHookResultStep = createStep( + `get-${name}-result`, + (_, context) => { + const result = context[" getStepResult"](name) + if (result === NOOP_RESULT) { + return new StepResponse(undefined) + } + if (options.resultValidator) { + return options.resultValidator.parse(result) + } + return result + }, + () => void 0 + ) + context.hookBinder(name, function (this: CreateWorkflowComposerContext) { /** * We start by registering a new step within the workflow. This will be a noop @@ -68,7 +97,7 @@ export function createHook( */ createStep( name, - (_: TInvokeInput) => void 0, + (_: TInvokeInput) => new StepResponse(NOOP_RESULT), () => void 0 )(input) @@ -98,5 +127,13 @@ export function createHook( return { __type: OrchestrationUtils.SymbolWorkflowHook, name, - } as Hook + getResult() { + if ("cachedResult" in this) { + return this.cachedResult + } + const result = getHookResultStep() + this["cachedResult"] = result + return result + }, + } as Hook } diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index d1cf29d0d5..e56356ea08 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -35,6 +35,12 @@ function buildStepContext({ parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string, transactionId: stepArguments.context!.transactionId, context: stepArguments.context!, + " getStepResult"( + stepId: string, + action: "invoke" | "compensate" = "invoke" + ) { + return (stepArguments[action][stepId] as any)?.output?.output + }, } return executionContext diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 89c238841f..70af643bb2 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -27,10 +27,14 @@ export type StepFunctionReturnConfig = { type KeysOfUnion = T extends T ? keyof T : never export type HookHandler = (...args: any[]) => void | Promise -type ConvertHookToObject = THook extends Hook +type ConvertHookToObject = THook extends Hook< + infer Name, + infer Input, + infer Output +> ? { - [K in Name]: ( - invoke: InvokeFn, + [K in Name]: ( + invoke: InvokeFn, compensate?: CompensateFn ) => void } @@ -170,6 +174,14 @@ export interface StepExecutionContext { * A string indicating the ID of the current transaction. */ transactionId?: string + + /** + * Get access to the result returned by a named step. Returns undefined + * when step is not found or when nothing was returned. + * + * Adding a space hides the method from the autocomplete + */ + " getStepResult"(stepId: string, action?: "invoke" | "compensate"): any } export type WorkflowTransactionContext = StepExecutionContext & diff --git a/yarn.lock b/yarn.lock index 5123a53a03..32b003824e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6885,11 +6885,13 @@ __metadata: "@swc/core": ^1.7.28 "@swc/jest": ^0.2.36 awilix: ^8.0.1 + expect-type: ^0.20.0 jest: ^29.7.0 pg: ^8.13.0 rimraf: ^5.0.1 typescript: ^5.6.2 ulid: ^2.3.0 + zod: 3.22.4 peerDependencies: "@mikro-orm/core": 6.4.3 "@mikro-orm/knex": 6.4.3 @@ -6898,6 +6900,7 @@ __metadata: awilix: ^8.0.1 express: ^4.21.0 pg: ^8.13.0 + zod: 3.22.4 languageName: unknown linkType: soft