feat: add support for accessing step results via context (#11907)
Fixes: FRMW-2934 This PR adds support for accessing the results of a hook using the `hook.getResult` method.
This commit is contained in:
5
.changeset/rotten-seals-happen.md
Normal file
5
.changeset/rotten-seals-happen.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@medusajs/workflows-sdk": patch
|
||||
---
|
||||
|
||||
feat: add support for accessing step results via context
|
||||
@@ -34,10 +34,12 @@
|
||||
"@swc/core": "^1.7.28",
|
||||
"@swc/jest": "^0.2.36",
|
||||
"awilix": "^8.0.1",
|
||||
"expect-type": "^0.20.0",
|
||||
"jest": "^29.7.0",
|
||||
"pg": "^8.13.0",
|
||||
"rimraf": "^5.0.1",
|
||||
"typescript": "^5.6.2"
|
||||
"typescript": "^5.6.2",
|
||||
"zod": "3.22.4"
|
||||
},
|
||||
"dependencies": {
|
||||
"@medusajs/modules-sdk": "2.6.1",
|
||||
@@ -53,7 +55,8 @@
|
||||
"@mikro-orm/postgresql": "6.4.3",
|
||||
"awilix": "^8.0.1",
|
||||
"express": "^4.21.0",
|
||||
"pg": "^8.13.0"
|
||||
"pg": "^8.13.0",
|
||||
"zod": "3.22.4"
|
||||
},
|
||||
"scripts": {
|
||||
"build": "rimraf dist && tsc --build",
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { z } from "zod"
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
@@ -16,7 +17,7 @@ const step2 = createStep("step2", async (input: Step2Input, context) => {
|
||||
return new StepResponse({ step2: input })
|
||||
})
|
||||
|
||||
const step3 = createStep("step3", async () => {
|
||||
const step3 = createStep("step3", async function (_, context) {
|
||||
return new StepResponse({ step3: "step3" })
|
||||
})
|
||||
|
||||
@@ -25,15 +26,28 @@ const workflow = createWorkflow(
|
||||
function (input: WorkflowData<{ outsideWorkflowData: string }>) {
|
||||
step1()
|
||||
step2({ filters: { id: [] } })
|
||||
const somethingHook = createHook("something", { id: "1" })
|
||||
step3()
|
||||
return new WorkflowResponse({ id: 1 }, { hooks: [somethingHook] })
|
||||
|
||||
let somethingHook = createHook(
|
||||
"something",
|
||||
{ id: "1" },
|
||||
{
|
||||
resultValidator: z.object({
|
||||
id: z.number(),
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
return new WorkflowResponse(
|
||||
{ r: somethingHook.getResult(), step3: step3() },
|
||||
{ hooks: [somethingHook] }
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
workflow.hooks.something((input, context) => {
|
||||
console.log("input>", input)
|
||||
console.log("context>", context)
|
||||
return new StepResponse({ id: 2, foo: "bar" })
|
||||
})
|
||||
|
||||
workflow.run().then((res) => {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import z from "zod"
|
||||
import { expectTypeOf } from "expect-type"
|
||||
import { TransactionState } from "@medusajs/utils"
|
||||
import { createStep } from "../create-step"
|
||||
import { createWorkflow } from "../create-workflow"
|
||||
@@ -6,6 +8,7 @@ import { WorkflowResponse } from "../helpers/workflow-response"
|
||||
import { transform } from "../transform"
|
||||
import { WorkflowData } from "../type"
|
||||
import { when } from "../when"
|
||||
import { createHook } from "../create-hook"
|
||||
|
||||
let count = 1
|
||||
const getNewWorkflowId = () => `workflow-${count++}`
|
||||
@@ -406,4 +409,312 @@ describe("Workflow composer", () => {
|
||||
}),
|
||||
])
|
||||
})
|
||||
|
||||
it("should allow reading results for a given step", async function () {
|
||||
const step1 = createStep("step1", async (_, context) => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
const step2 = createStep("step2", async (input: string, context) => {
|
||||
return new StepResponse({ result: input })
|
||||
})
|
||||
const step3 = createStep("step3", async (input: string, context) => {
|
||||
return new StepResponse({
|
||||
input,
|
||||
step2: context[" getStepResult"]("step2"),
|
||||
step1: context[" getStepResult"]("step1"),
|
||||
invalid: context[" getStepResult"]("invalid"),
|
||||
})
|
||||
})
|
||||
|
||||
const workflow = createWorkflow(getNewWorkflowId(), function () {
|
||||
step1()
|
||||
step2("step2")
|
||||
return new WorkflowResponse(step3("step-3"))
|
||||
})
|
||||
|
||||
const { result } = await workflow.run({ input: {} })
|
||||
expect(result).toEqual({
|
||||
input: "step-3",
|
||||
step1: {
|
||||
result: "step1",
|
||||
},
|
||||
step2: {
|
||||
result: "step2",
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
it("should allow reading results of a hook", async function () {
|
||||
const step1 = createStep("step1", async (_, context) => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
|
||||
const workflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
function (input: { id: number }) {
|
||||
const step1Result = step1()
|
||||
const mutateInputHook = createHook("mutateInputHook", {
|
||||
input,
|
||||
step1Result,
|
||||
})
|
||||
|
||||
return new WorkflowResponse(
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
hookResult: mutateInputHook.getResult(),
|
||||
},
|
||||
{
|
||||
hooks: [mutateInputHook],
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
workflow.hooks.mutateInputHook((data) => {
|
||||
return new StepResponse({
|
||||
input: {
|
||||
id: data.input.id + 1,
|
||||
},
|
||||
step1Result: {
|
||||
result: `mutated-${data.step1Result.result}`,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
const { result } = await workflow.run({ input: { id: 1 } })
|
||||
expect(result).toEqual({
|
||||
input: { id: 1 },
|
||||
step1Result: { result: "step1" },
|
||||
hookResult: {
|
||||
input: {
|
||||
id: 2,
|
||||
},
|
||||
step1Result: { result: "mutated-step1" },
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
it("should allow specifying a validation schema for the hook response", async function () {
|
||||
const step1 = createStep("step1", async (_, context) => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
|
||||
const workflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
|
||||
function (input: { id: number }) {
|
||||
const step1Result = step1()
|
||||
const mutateInputHook = createHook(
|
||||
"mutateInputHook",
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
},
|
||||
{
|
||||
resultValidator: z.object({
|
||||
id: z.number(),
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf<
|
||||
{ id: number } | undefined
|
||||
>()
|
||||
|
||||
return new WorkflowResponse(
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
hookResult: mutateInputHook.getResult(),
|
||||
},
|
||||
{
|
||||
hooks: [mutateInputHook],
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
workflow.hooks.mutateInputHook((data) => {
|
||||
return new StepResponse({
|
||||
id: data.input.id + 1,
|
||||
})
|
||||
})
|
||||
|
||||
const { result } = await workflow.run({ input: { id: 1 } })
|
||||
expect(result).toEqual({
|
||||
input: { id: 1 },
|
||||
step1Result: { result: "step1" },
|
||||
hookResult: {
|
||||
id: 2,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
it("should validate and throw error when hook response is invalid", async function () {
|
||||
const step1 = createStep("step1", async (_, context) => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
|
||||
const workflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
|
||||
function (input: { id: number }) {
|
||||
const step1Result = step1()
|
||||
const mutateInputHook = createHook(
|
||||
"mutateInputHook",
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
},
|
||||
{
|
||||
resultValidator: z.object({
|
||||
id: z.number(),
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf<
|
||||
{ id: number } | undefined
|
||||
>()
|
||||
|
||||
return new WorkflowResponse(
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
hookResult: mutateInputHook.getResult(),
|
||||
},
|
||||
{
|
||||
hooks: [mutateInputHook],
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
workflow.hooks.mutateInputHook((data) => {
|
||||
return new StepResponse({} as any)
|
||||
})
|
||||
|
||||
try {
|
||||
await workflow.run({ input: { id: 1 } })
|
||||
throw new Error("Expected workflow to fail")
|
||||
} catch (error) {
|
||||
expect(error).toHaveProperty("issues")
|
||||
expect(error.issues).toEqual([
|
||||
{
|
||||
code: "invalid_type",
|
||||
expected: "number",
|
||||
message: "Required",
|
||||
path: ["id"],
|
||||
received: "undefined",
|
||||
},
|
||||
])
|
||||
}
|
||||
})
|
||||
|
||||
it("should not validate when no hook handler has been defined", async function () {
|
||||
const step1 = createStep("step1", async () => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
|
||||
const workflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
|
||||
function (input: { id: number }) {
|
||||
const step1Result = step1()
|
||||
const mutateInputHook = createHook(
|
||||
"mutateInputHook",
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
},
|
||||
{
|
||||
resultValidator: z.object({
|
||||
id: z.number(),
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf<
|
||||
{ id: number } | undefined
|
||||
>()
|
||||
|
||||
return new WorkflowResponse(
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
hookResult: mutateInputHook.getResult(),
|
||||
},
|
||||
{
|
||||
hooks: [mutateInputHook],
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
const { result } = await workflow.run({ input: { id: 1 } })
|
||||
expect(result).toEqual({
|
||||
input: { id: 1 },
|
||||
step1Result: { result: "step1" },
|
||||
hookResult: undefined,
|
||||
})
|
||||
})
|
||||
|
||||
it("should validate when hook returns undefined", async function () {
|
||||
const step1 = createStep("step1", async (_, context) => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
|
||||
const workflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
|
||||
function (input: { id: number }) {
|
||||
const step1Result = step1()
|
||||
const mutateInputHook = createHook(
|
||||
"mutateInputHook",
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
},
|
||||
{
|
||||
resultValidator: z.object({
|
||||
id: z.number(),
|
||||
}),
|
||||
}
|
||||
)
|
||||
|
||||
expectTypeOf(mutateInputHook.getResult).returns.toMatchTypeOf<
|
||||
{ id: number } | undefined
|
||||
>()
|
||||
|
||||
return new WorkflowResponse(
|
||||
{
|
||||
input,
|
||||
step1Result,
|
||||
hookResult: mutateInputHook.getResult(),
|
||||
},
|
||||
{
|
||||
hooks: [mutateInputHook],
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
workflow.hooks.mutateInputHook((data) => {})
|
||||
try {
|
||||
await workflow.run({ input: { id: 1 } })
|
||||
throw new Error("Expected workflow to fail")
|
||||
} catch (error) {
|
||||
expect(error).toHaveProperty("issues")
|
||||
expect(error.issues).toEqual([
|
||||
{
|
||||
code: "invalid_type",
|
||||
expected: "object",
|
||||
message: "Required",
|
||||
path: [],
|
||||
received: "undefined",
|
||||
},
|
||||
])
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,20 +1,31 @@
|
||||
import { CompensateFn, createStep, InvokeFn } from "./create-step"
|
||||
import { type ZodSchema } from "zod"
|
||||
import { OrchestrationUtils } from "@medusajs/utils"
|
||||
import { CreateWorkflowComposerContext } from "./type"
|
||||
import type { CreateWorkflowComposerContext } from "./type"
|
||||
import { CompensateFn, createStep, InvokeFn } from "./create-step"
|
||||
import { createStepHandler } from "./helpers/create-step-handler"
|
||||
import { StepResponse } from "./helpers"
|
||||
|
||||
const NOOP_RESULT = Symbol.for("NOOP")
|
||||
|
||||
/**
|
||||
* Representation of a hook definition.
|
||||
*/
|
||||
export type Hook<Name extends string, Input> = {
|
||||
export type Hook<Name extends string, Input, Output> = {
|
||||
__type: typeof OrchestrationUtils.SymbolWorkflowHook
|
||||
name: Name
|
||||
|
||||
/**
|
||||
* Returns the result of the hook
|
||||
*/
|
||||
getResult(): Output | undefined
|
||||
|
||||
/**
|
||||
* By prefixing a key with a space, we remove it from the
|
||||
* intellisense of TypeScript. This is needed because
|
||||
* input is not set at runtime. It is a type-only
|
||||
* property to infer input data type of a hook
|
||||
*/
|
||||
" output": Output
|
||||
" input": Input
|
||||
}
|
||||
|
||||
@@ -53,14 +64,32 @@ export type Hook<Name extends string, Input> = {
|
||||
* }
|
||||
* )
|
||||
*/
|
||||
export function createHook<Name extends string, TInvokeInput>(
|
||||
export function createHook<Name extends string, TInvokeInput, TInvokeOutput>(
|
||||
name: Name,
|
||||
input: TInvokeInput
|
||||
): Hook<Name, TInvokeInput> {
|
||||
input: TInvokeInput,
|
||||
options: {
|
||||
resultValidator?: ZodSchema<TInvokeOutput>
|
||||
} = {}
|
||||
): Hook<Name, TInvokeInput, TInvokeOutput> {
|
||||
const context = global[
|
||||
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
|
||||
] as CreateWorkflowComposerContext
|
||||
|
||||
const getHookResultStep = createStep(
|
||||
`get-${name}-result`,
|
||||
(_, context) => {
|
||||
const result = context[" getStepResult"](name)
|
||||
if (result === NOOP_RESULT) {
|
||||
return new StepResponse(undefined)
|
||||
}
|
||||
if (options.resultValidator) {
|
||||
return options.resultValidator.parse(result)
|
||||
}
|
||||
return result
|
||||
},
|
||||
() => void 0
|
||||
)
|
||||
|
||||
context.hookBinder(name, function (this: CreateWorkflowComposerContext) {
|
||||
/**
|
||||
* We start by registering a new step within the workflow. This will be a noop
|
||||
@@ -68,7 +97,7 @@ export function createHook<Name extends string, TInvokeInput>(
|
||||
*/
|
||||
createStep(
|
||||
name,
|
||||
(_: TInvokeInput) => void 0,
|
||||
(_: TInvokeInput) => new StepResponse(NOOP_RESULT),
|
||||
() => void 0
|
||||
)(input)
|
||||
|
||||
@@ -98,5 +127,13 @@ export function createHook<Name extends string, TInvokeInput>(
|
||||
return {
|
||||
__type: OrchestrationUtils.SymbolWorkflowHook,
|
||||
name,
|
||||
} as Hook<Name, TInvokeInput>
|
||||
getResult() {
|
||||
if ("cachedResult" in this) {
|
||||
return this.cachedResult
|
||||
}
|
||||
const result = getHookResultStep()
|
||||
this["cachedResult"] = result
|
||||
return result
|
||||
},
|
||||
} as Hook<Name, TInvokeInput, TInvokeOutput>
|
||||
}
|
||||
|
||||
@@ -35,6 +35,12 @@ function buildStepContext({
|
||||
parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string,
|
||||
transactionId: stepArguments.context!.transactionId,
|
||||
context: stepArguments.context!,
|
||||
" getStepResult"(
|
||||
stepId: string,
|
||||
action: "invoke" | "compensate" = "invoke"
|
||||
) {
|
||||
return (stepArguments[action][stepId] as any)?.output?.output
|
||||
},
|
||||
}
|
||||
|
||||
return executionContext
|
||||
|
||||
@@ -27,10 +27,14 @@ export type StepFunctionReturnConfig<TOutput> = {
|
||||
type KeysOfUnion<T> = T extends T ? keyof T : never
|
||||
export type HookHandler = (...args: any[]) => void | Promise<void>
|
||||
|
||||
type ConvertHookToObject<THook> = THook extends Hook<infer Name, infer Input>
|
||||
type ConvertHookToObject<THook> = THook extends Hook<
|
||||
infer Name,
|
||||
infer Input,
|
||||
infer Output
|
||||
>
|
||||
? {
|
||||
[K in Name]: <TOutput, TCompensateInput>(
|
||||
invoke: InvokeFn<Input, TOutput, TCompensateInput>,
|
||||
[K in Name]: <TCompensateInput>(
|
||||
invoke: InvokeFn<Input, Output, TCompensateInput>,
|
||||
compensate?: CompensateFn<TCompensateInput>
|
||||
) => void
|
||||
}
|
||||
@@ -170,6 +174,14 @@ export interface StepExecutionContext {
|
||||
* A string indicating the ID of the current transaction.
|
||||
*/
|
||||
transactionId?: string
|
||||
|
||||
/**
|
||||
* Get access to the result returned by a named step. Returns undefined
|
||||
* when step is not found or when nothing was returned.
|
||||
*
|
||||
* Adding a space hides the method from the autocomplete
|
||||
*/
|
||||
" getStepResult"(stepId: string, action?: "invoke" | "compensate"): any
|
||||
}
|
||||
|
||||
export type WorkflowTransactionContext = StepExecutionContext &
|
||||
|
||||
@@ -6885,11 +6885,13 @@ __metadata:
|
||||
"@swc/core": ^1.7.28
|
||||
"@swc/jest": ^0.2.36
|
||||
awilix: ^8.0.1
|
||||
expect-type: ^0.20.0
|
||||
jest: ^29.7.0
|
||||
pg: ^8.13.0
|
||||
rimraf: ^5.0.1
|
||||
typescript: ^5.6.2
|
||||
ulid: ^2.3.0
|
||||
zod: 3.22.4
|
||||
peerDependencies:
|
||||
"@mikro-orm/core": 6.4.3
|
||||
"@mikro-orm/knex": 6.4.3
|
||||
@@ -6898,6 +6900,7 @@ __metadata:
|
||||
awilix: ^8.0.1
|
||||
express: ^4.21.0
|
||||
pg: ^8.13.0
|
||||
zod: 3.22.4
|
||||
languageName: unknown
|
||||
linkType: soft
|
||||
|
||||
|
||||
Reference in New Issue
Block a user