breaking: implement workflow hooks (first iteration) (#8346)

This commit is contained in:
Harminder Virk
2024-07-31 15:36:38 +05:30
committed by GitHub
parent 6ccf83128c
commit 864bb0df05
157 changed files with 1161 additions and 838 deletions

View File

@@ -2,10 +2,10 @@ import {
createStep,
createWorkflow,
StepResponse,
transform,
when,
WorkflowData,
} from "./composer"
import { createHook } from "./composer/create-hook"
import { WorkflowResponse } from "./composer/helpers/workflow-response"
const step1 = createStep("step1", async (input: {}, context) => {
return new StepResponse({ step1: "step1" })
@@ -24,12 +24,21 @@ const workflow = createWorkflow(
"sub-workflow",
function (input: WorkflowData<{ outsideWorkflowData: string }>) {
step1()
const somethingHook = createHook("something", { id: "1" })
step3()
return step2({ filters: { id: input.outsideWorkflowData } })
return new WorkflowResponse({ id: 1 }, { hooks: [somethingHook] })
}
)
const workflow2 = createWorkflow("workflow", function () {
workflow.hooks.something((input) => {
console.log("input>", input)
})
workflow.run().then((res) => {
console.log("res", res)
})
/*const workflow2 = createWorkflow("workflow", function () {
const step1Res = step1()
const step3Res = when({ value: true }, ({ value }) => {
@@ -45,13 +54,13 @@ const workflow2 = createWorkflow("workflow", function () {
const workflowRes = workflow.asStep({ outsideWorkflowData: step1Res.step1 })
return workflowRes
})
})*/
workflow2()
.run({})
.then((res) => {
console.log(res.result)
})
// workflow()
// .run({})
// .then((res) => {
// console.log(res.result)
// })
/*const step1 = createStep("step1", async (input: {}, context) => {
return new StepResponse({ step1: ["step1"] })

View File

@@ -1,4 +1,9 @@
import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration"
import {
IDistributedSchedulerStorage,
SchedulerOptions,
WorkflowManager,
WorkflowScheduler,
} from "@medusajs/orchestration"
import {
ModuleRegistrationName,
composeMessage,
@@ -8,14 +13,14 @@ import {
import { asValue } from "awilix"
import {
StepResponse,
WorkflowResponse,
createStep,
createWorkflow,
hook,
parallelize,
transform,
} from ".."
import { MedusaWorkflow } from "../../../medusa-workflow"
import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist"
import { createHook } from "../create-hook"
jest.setTimeout(30000)
@@ -109,7 +114,7 @@ describe("Workflow composer", function () {
const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn)
const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
return new WorkflowResponse(step1(input))
})
const workflowInput = { test: "payload1" }
@@ -154,7 +159,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflowInput = { test: "payload1" }
@@ -239,13 +244,13 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflow2 = createWorkflow("workflow2", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflowInput = { test: "payload1" }
@@ -348,13 +353,13 @@ describe("Workflow composer", function () {
createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
createWorkflow("workflow2", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
])
@@ -458,13 +463,13 @@ describe("Workflow composer", function () {
createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
createWorkflow("workflow2", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
])
@@ -573,7 +578,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflowInput = { test: "payload1" }
@@ -675,7 +680,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const [ret2, ret3] = parallelize(step2(returnStep1), step3(returnStep1))
return step4({ one: ret2, two: ret3 })
return new WorkflowResponse(step4({ one: ret2, two: ret3 }))
})
const workflowInput = { test: "payload1" }
@@ -803,7 +808,7 @@ describe("Workflow composer", function () {
const avg = transform({ obj: ret2 }, transform3Fn)
return step3(avg)
return new WorkflowResponse(step3(avg))
})
const workflowInput = { a: 1, b: 2 }
@@ -878,9 +883,7 @@ describe("Workflow composer", function () {
expect(mockStep3Fn.mock.calls[0][0]).toEqual({ variant: "variant_2" })
})
it("should compose a new workflow exposing hooks and log warns if multiple handlers are registered for the same hook", async () => {
const warn = jest.spyOn(console, "warn").mockImplementation(() => {})
it("should throw error when multiple handlers are defined for a single hook", async () => {
const mockStep1Fn = jest.fn().mockImplementation(({ input }) => {
return { id: input, product: "product_1", variant: "variant_2" }
})
@@ -896,64 +899,29 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const data = getData({ input })
const hookReturn = hook("changeProduct", {
const hookReturn = createHook("changeProduct", {
opinionatedPropertyName: data,
})
const transformedData = transform(
{ data, hookReturn },
({ data, hookReturn }: { data: any; hookReturn: any }) => {
return {
...data,
...hookReturn,
}
}
)
return saveProduct({ product: transformedData })
return new WorkflowResponse(saveProduct({ product: data }), {
hooks: [hookReturn],
})
})
workflow.changeProduct(({ opinionatedPropertyName }) => {
return {
newProperties: "new properties",
prod: opinionatedPropertyName.product + "**",
var: opinionatedPropertyName.variant + "**",
other: [1, 2, 3],
nested: {
a: {
b: "c",
},
},
moreProperties: "more properties",
}
})
workflow.changeProduct((theReturnOfThePreviousHook) => {
return {
...theReturnOfThePreviousHook,
moreProperties: "2nd hook update",
}
})
workflow.hooks.changeProduct(() => {})
expect(() => workflow.hooks.changeProduct(() => {})).toThrow(
"Cannot define multiple hook handlers for the changeProduct hook"
)
const workflowInput = "id_123"
const { result: final } = await workflow().run({
input: workflowInput,
})
expect(warn).toHaveBeenCalledTimes(1)
expect(final).toEqual({
id: "id_123",
prod: "product_1**",
var: "variant_2**",
variant: "variant_2",
product: "Saved product - product_1",
newProperties: "new properties",
other: [1, 2, 3],
nested: {
a: {
b: "c",
},
},
moreProperties: "more properties",
})
})
})
@@ -976,7 +944,7 @@ describe("Workflow composer", function () {
const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn)
const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
return new WorkflowResponse(step1(input))
})
const workflowInput = { test: "payload1" }
@@ -1005,7 +973,7 @@ describe("Workflow composer", function () {
const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn)
const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
return new WorkflowResponse(step1(input))
})
const workflowInput = { test: "payload1" }
@@ -1054,7 +1022,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflowInput = { test: "payload1" }
@@ -1140,7 +1108,9 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2, input })
return new WorkflowResponse(
step3({ one: returnStep1, two: ret2, input })
)
})
const { result: workflowResult, transaction } = await workflow().run({
@@ -1215,13 +1185,13 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflow2 = createWorkflow("workflow2", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflowInput = { test: "payload1" }
@@ -1324,13 +1294,13 @@ describe("Workflow composer", function () {
createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
createWorkflow("workflow2", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
])
@@ -1434,13 +1404,13 @@ describe("Workflow composer", function () {
createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
createWorkflow("workflow2", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
}),
])
@@ -1549,7 +1519,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const ret2 = step2(returnStep1)
return step3({ one: returnStep1, two: ret2 })
return new WorkflowResponse(step3({ one: returnStep1, two: ret2 }))
})
const workflowInput = { test: "payload1" }
@@ -1651,7 +1621,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const returnStep1 = step1(input)
const [ret2, ret3] = parallelize(step2(returnStep1), step3(returnStep1))
return step4({ one: ret2, two: ret3 })
return new WorkflowResponse(step4({ one: ret2, two: ret3 }))
})
const workflowInput = { test: "payload1" }
@@ -1779,7 +1749,7 @@ describe("Workflow composer", function () {
const avg = transform({ obj: ret2 }, transform3Fn)
return step3(avg)
return new WorkflowResponse(step3(avg))
})
const workflowInput = { a: 1, b: 2 }
@@ -1858,9 +1828,7 @@ describe("Workflow composer", function () {
expect(mockStep3Fn.mock.calls[0][0]).toEqual({ variant: "variant_2" })
})
it("should compose a new workflow exposing hooks and log warns if multiple handlers are registered for the same hook", async () => {
const warn = jest.spyOn(console, "warn").mockImplementation(() => {})
it("should throw error when multiple handlers for the same hook are defined", async () => {
const mockStep1Fn = jest.fn().mockImplementation(({ input }) => {
return new StepResponse({
id: input,
@@ -1880,64 +1848,29 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
const data = getData({ input })
const hookReturn = hook("changeProduct", {
const hookReturn = createHook("changeProduct", {
opinionatedPropertyName: data,
})
const transformedData = transform(
{ data, hookReturn },
({ data, hookReturn }: { data: any; hookReturn: any }) => {
return {
...data,
...hookReturn,
}
}
)
return saveProduct({ product: transformedData })
return new WorkflowResponse(saveProduct({ product: data }), {
hooks: [hookReturn],
})
})
workflow.changeProduct(({ opinionatedPropertyName }) => {
return {
newProperties: "new properties",
prod: opinionatedPropertyName.product + "**",
var: opinionatedPropertyName.variant + "**",
other: [1, 2, 3],
nested: {
a: {
b: "c",
},
},
moreProperties: "more properties",
}
})
workflow.changeProduct((theReturnOfThePreviousHook) => {
return {
...theReturnOfThePreviousHook,
moreProperties: "2nd hook update",
}
})
workflow.hooks.changeProduct(() => {})
expect(() => workflow.hooks.changeProduct(() => {})).toThrow(
"Cannot define multiple hook handlers for the changeProduct hook"
)
const workflowInput = "id_123"
const { result: final } = await workflow().run({
input: workflowInput,
})
expect(warn).toHaveBeenCalledTimes(1)
expect(final).toEqual({
id: "id_123",
prod: "product_1**",
var: "variant_2**",
variant: "variant_2",
product: "Saved product - product_1",
newProperties: "new properties",
other: [1, 2, 3],
nested: {
a: {
b: "c",
},
},
moreProperties: "more properties",
})
})
})
@@ -1954,7 +1887,7 @@ describe("Workflow composer", function () {
const step1 = createStep("step1", mockStep1Fn, mockCompensateSte1)
const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
return new WorkflowResponse(step1(input))
})
const workflowInput = { test: "payload1" }
@@ -1998,7 +1931,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function () {
const { property, obj } = step1()
return { someOtherName: property, obj }
return new WorkflowResponse({ someOtherName: property, obj })
})
const { result } = await workflow().run({
@@ -2032,7 +1965,7 @@ describe("Workflow composer", function () {
const s1 = step1()
const s2 = step2()
return [s1, s2]
return new WorkflowResponse([s1, s2])
})
const { result } = await workflow().run({
@@ -2068,7 +2001,7 @@ describe("Workflow composer", function () {
const { obj } = step1()
const s2 = step2()
return [{ step1_nested_obj: obj.nested }, s2]
return new WorkflowResponse([{ step1_nested_obj: obj.nested }, s2])
})
const { result } = await workflow().run({
@@ -2097,7 +2030,7 @@ describe("Workflow composer", function () {
const step1 = createStep("step1", mockStep1Fn, mockCompensateSte1)
const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
return new WorkflowResponse(step1(input))
})
const workflowInput = { test: "payload1" }
@@ -2146,7 +2079,7 @@ describe("Workflow composer", function () {
const { obj } = step1()
const s2 = step2()
return [{ step1_nested_obj: obj.nested }, s2]
return new WorkflowResponse([{ step1_nested_obj: obj.nested }, s2])
})
expect(() =>
@@ -2155,7 +2088,7 @@ describe("Workflow composer", function () {
const s2 = step2()
step3()
return [{ step1_nested_obj: obj.nested }, s2]
return new WorkflowResponse([{ step1_nested_obj: obj.nested }, s2])
})
).toThrowError(
`Workflow with id "workflow1" and step definition already exists.`
@@ -2165,7 +2098,7 @@ describe("Workflow composer", function () {
const { obj } = step1()
const s2 = step2()
return [{ step1_nested_obj: obj.nested }, s2]
return new WorkflowResponse([{ step1_nested_obj: obj.nested }, s2])
})
})
@@ -2270,7 +2203,7 @@ describe("Workflow composer", function () {
const workflow = createWorkflow("workflow1", function (input) {
step1(input)
step2()
step2(input)
})
await workflow(container).run({

View File

@@ -0,0 +1,74 @@
import { CompensateFn, createStep, InvokeFn } from "./create-step"
import { OrchestrationUtils } from "@medusajs/utils"
import { CreateWorkflowComposerContext } from "./type"
import { createStepHandler } from "./helpers/create-step-handler"
/**
* Representation of a hook definition.
*/
export type Hook<Name extends string, Input> = {
__type: typeof OrchestrationUtils.SymbolWorkflowHook
name: Name
/**
* By prefixing a key with a space, we remove it from the
* intellisense of TypeScript. This is needed because
* input is not set at runtime. It is a type-only
* property to infer input data type of a hook
*/
" input": Input
}
/**
* Define a workflow hook to be executed within steps. The created hook
* is exposed publicly for the workflow consumers to inject and run
* custom logic.
*
* Hooks behaves like steps
*/
export function createHook<Name extends string, TInvokeInput>(
name: Name,
input: TInvokeInput
): Hook<Name, TInvokeInput> {
const context = global[
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
context.hookBinder(name, function (this: CreateWorkflowComposerContext) {
/**
* We start by registering a new step within the workflow. This will be a noop
* step that can be replaced (optionally) by the workflow consumer.
*/
createStep(
name,
(_: TInvokeInput) => void 0,
() => void 0
)(input)
function hook<
TInvokeResultCompensateInput
>(this: CreateWorkflowComposerContext, invokeFn: InvokeFn<TInvokeInput, unknown, TInvokeResultCompensateInput>, compensateFn?: CompensateFn<TInvokeResultCompensateInput>) {
const handlers = createStepHandler.bind(this)({
stepName: name,
input,
invokeFn,
compensateFn,
})
if (this.hooks_.registered.includes(name)) {
throw new Error(
`Cannot define multiple hook handlers for the ${name} hook`
)
}
this.hooks_.registered.push(name)
this.handlers.set(name, handlers)
}
return hook
})
return {
__type: OrchestrationUtils.SymbolWorkflowHook,
name,
} as Hook<Name, TInvokeInput>
}

View File

@@ -4,7 +4,7 @@ import {
WorkflowStepHandler,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils"
import { OrchestrationUtils, isString } from "@medusajs/utils"
import { ulid } from "ulid"
import { StepResponse, resolveValue } from "./helpers"
import { proxify } from "./helpers/proxy"
@@ -15,6 +15,7 @@ import {
StepFunctionResult,
WorkflowData,
} from "./type"
import { createStepHandler } from "./helpers/create-step-handler"
/**
* The type of invocation function passed to a step.
@@ -25,7 +26,7 @@ import {
*
* @returns The expected output based on the type parameter `TOutput`.
*/
type InvokeFn<TInput, TOutput, TCompensateInput> = (
export type InvokeFn<TInput, TOutput, TCompensateInput> = (
/**
* The input of the step.
*/
@@ -53,7 +54,7 @@ type InvokeFn<TInput, TOutput, TCompensateInput> = (
*
* @returns There's no expected type to be returned by the compensation function.
*/
type CompensateFn<T> = (
export type CompensateFn<T> = (
/**
* The argument passed to the compensation function.
*/
@@ -64,7 +65,7 @@ type CompensateFn<T> = (
context: StepExecutionContext
) => unknown | Promise<unknown>
interface ApplyStepOptions<
export interface ApplyStepOptions<
TStepInputs extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
@@ -95,7 +96,7 @@ interface ApplyStepOptions<
* @param invokeFn
* @param compensateFn
*/
function applyStep<
export function applyStep<
TInvokeInput,
TStepInput extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
@@ -121,77 +122,12 @@ function applyStep<
)
}
const handler = {
invoke: async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action: "invoke",
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId:
stepArguments.transaction.getFlow()?.metadata?.eventGroupId ??
stepArguments.context!.eventGroupId,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}
const argInput = input ? await resolveValue(input, stepArguments) : {}
const stepResponse: StepResponse<any, any> = await invokeFn.apply(
this,
[argInput, executionContext]
)
const stepResponseJSON =
stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepResponse.toJSON()
: stepResponse
return {
__type: OrchestrationUtils.SymbolWorkflowWorkflowData,
output: stepResponseJSON,
}
},
compensate: compensateFn
? async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action: "compensate",
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
context: stepArguments.context!,
}
const stepOutput = (stepArguments.invoke[stepName] as any)?.output
const invokeResult =
stepOutput?.__type ===
OrchestrationUtils.SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
deepCopy(stepOutput.compensateInput)
: stepOutput && deepCopy(stepOutput)
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)
return {
output,
}
}
: undefined,
}
const handler = createStepHandler.bind(this)({
stepName,
input,
invokeFn,
compensateFn,
})
wrapAsyncHandler(stepConfig, handler)
@@ -450,19 +386,17 @@ export function createStep<
}
| undefined
): WorkflowData<TInvokeResultOutput> {
if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) {
const context = global[
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
if (!context) {
throw new Error(
"createStep must be used inside a createWorkflow definition"
)
}
const stepBinder = (
global[
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).stepBinder
return stepBinder<TInvokeResultOutput>(
return context.stepBinder<TInvokeResultOutput>(
applyStep<
TInvokeInput,
{ [K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]> },

View File

@@ -14,8 +14,9 @@ import {
ReturnWorkflow,
StepFunction,
WorkflowData,
WorkflowDataProperties,
HookHandler,
} from "./type"
import { WorkflowResponse } from "./helpers/workflow-response"
global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
@@ -74,11 +75,7 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
* }
*/
export function createWorkflow<
TData,
TResult,
THooks extends Record<string, Function> = Record<string, Function>
>(
export function createWorkflow<TData, TResult, THooks extends any[]>(
/**
* The name of the workflow or its configuration.
*/
@@ -88,15 +85,9 @@ export function createWorkflow<
* The function can't be an arrow function or an asynchronus function. It also can't directly manipulate data.
* You'll have to use the {@link transform} function if you need to directly manipulate data.
*/
composer: (input: WorkflowData<TData>) =>
| void
| WorkflowData<TResult>
| {
[K in keyof TResult]:
| WorkflowData<TResult[K]>
| WorkflowDataProperties<TResult[K]>
| TResult[K]
}
composer: (
input: WorkflowData<TData>
) => void | WorkflowResponse<TResult, THooks>
): ReturnWorkflow<TData, TResult, THooks> {
const name = isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name
const options = isString(nameOrConfig) ? {} : nameOrConfig
@@ -110,14 +101,18 @@ export function createWorkflow<
}
const context: CreateWorkflowComposerContext = {
__type: OrchestrationUtils.SymbolMedusaWorkflowComposerContext,
workflowId: name,
flow: WorkflowManager.getEmptyTransactionDefinition(),
handlers,
hooks_: [],
hooks_: {
declared: [],
registered: [],
},
hooksCallback_: {},
hookBinder: (name, fn) => {
context.hooks_.push(name)
return fn(context)
context.hooks_.declared.push(name)
context.hooksCallback_[name] = fn.bind(context)()
},
stepBinder: (fn) => {
return fn.bind(context)()
@@ -169,28 +164,13 @@ export function createWorkflow<
return expandedFlow
}
let shouldRegisterHookHandler = true
for (const hook of context.hooks_) {
mainFlow[hook] = (fn) => {
context.hooksCallback_[hook] ??= []
if (!shouldRegisterHookHandler) {
console.warn(
`A hook handler has already been registered for the ${hook} hook. The current handler registration will be skipped.`
)
return
}
context.hooksCallback_[hook].push(fn)
shouldRegisterHookHandler = false
}
mainFlow.hooks = {} as Record<string, HookHandler>
for (const hook of context.hooks_.declared) {
mainFlow.hooks[hook] = context.hooksCallback_[hook].bind(context)
}
mainFlow.getName = () => name
mainFlow.run = mainFlow().run
mainFlow.runAsStep = ({
input,
}: {

View File

@@ -0,0 +1,105 @@
import {
CreateWorkflowComposerContext,
StepExecutionContext,
WorkflowData,
} from "../type"
import { WorkflowStepHandlerArguments } from "@medusajs/orchestration"
import { resolveValue } from "./resolve-value"
import { StepResponse } from "./step-response"
import { deepCopy, OrchestrationUtils } from "@medusajs/utils"
import { ApplyStepOptions } from "../create-step"
export function createStepHandler<
TInvokeInput,
TStepInput extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
TInvokeResultOutput,
TInvokeResultCompensateInput
>(
this: CreateWorkflowComposerContext,
{
stepName,
input,
invokeFn,
compensateFn,
}: ApplyStepOptions<
TStepInput,
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>
) {
const handler = {
invoke: async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action: "invoke",
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId:
stepArguments.transaction.getFlow()?.metadata?.eventGroupId ??
stepArguments.context!.eventGroupId,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}
const argInput = input ? await resolveValue(input, stepArguments) : {}
const stepResponse: StepResponse<any, any> = await invokeFn.apply(this, [
argInput,
executionContext,
])
const stepResponseJSON =
stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepResponse.toJSON()
: stepResponse
return {
__type: OrchestrationUtils.SymbolWorkflowWorkflowData,
output: stepResponseJSON,
}
},
compensate: compensateFn
? async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action: "compensate",
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
context: stepArguments.context!,
}
const stepOutput = (stepArguments.invoke[stepName] as any)?.output
const invokeResult =
stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
deepCopy(stepOutput.compensateInput)
: stepOutput && deepCopy(stepOutput)
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)
return {
output,
}
}
: undefined,
}
return handler
}

View File

@@ -5,12 +5,14 @@ async function resolveProperty(property, transactionContext) {
if (property?.__type === OrchestrationUtils.SymbolInputReference) {
return transactionContext.payload
} else if (
property?.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse
) {
return resolveValue(property.$result, transactionContext)
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
) {
return await property.__resolver(transactionContext)
} else if (property?.__type === OrchestrationUtils.SymbolWorkflowHook) {
return await property.__value(transactionContext)
} else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) {
const output =
invokeRes[property.__step__]?.output ?? invokeRes[property.__step__]

View File

@@ -0,0 +1,22 @@
import { OrchestrationUtils } from "@medusajs/utils"
import { WorkflowData, WorkflowDataProperties } from "../type"
/**
* Workflow response class encapsulates the return value of a workflow
*/
export class WorkflowResponse<TResult, THooks = []> {
__type: typeof OrchestrationUtils.SymbolMedusaWorkflowResponse =
OrchestrationUtils.SymbolMedusaWorkflowResponse
constructor(
public $result:
| WorkflowData<TResult>
| {
[K in keyof TResult]:
| WorkflowData<TResult[K]>
| WorkflowDataProperties<TResult[K]>
| TResult[K]
},
public options?: { hooks: THooks }
) {}
}

View File

@@ -1,146 +0,0 @@
import {WorkflowStepHandlerArguments} from "@medusajs/orchestration"
import {deepCopy, OrchestrationUtils} from "@medusajs/utils"
import {resolveValue} from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
WorkflowData,
} from "./type"
/**
*
* @ignore
*
* This function allows you to add hooks in your workflow that provide access to some data. Then, consumers of that workflow can add a handler function that performs
* an action with the provided data or modify it.
*
* For example, in a "create product" workflow, you may add a hook after the product is created, providing access to the created product.
* Then, developers using that workflow can hook into that point to access the product, modify its attributes, then return the updated product.
*
* @typeParam TOutput - The expected output of the hook's handler function.
* @returns The output of handler functions of this hook. If there are no handler functions, the output is `undefined`.
*
* @example
* import {
* createWorkflow,
* StepExecutionContext,
* hook,
* transform
* } from "@medusajs/workflows-sdk"
* import {
* createProductStep,
* getProductStep,
* createPricesStep
* } from "./steps"
* import {
* MedusaRequest,
* MedusaResponse,
* Product, ProductService
* } from "@medusajs/medusa"
*
* interface WorkflowInput {
* title: string
* }
*
* const myWorkflow = createWorkflow<
* WorkflowInput,
* Product
* >("my-workflow",
* function (input) {
* const product = createProductStep(input)
*
* const hookProduct = hook<Product>("createdProductHook", product)
*
* const newProduct = transform({
* product,
* hookProduct
* }, (input) => {
* return input.hookProduct || input.product
* })
*
* const prices = createPricesStep(newProduct)
*
* return getProductStep(product.id)
* }
* )
*
* myWorkflow.createdProductHook(
* async (product, context: StepExecutionContext) => {
* const productService: ProductService = context.container.resolve("productService")
*
* const updatedProduct = await productService.updateProducts(product.id, {
* description: "a cool shirt"
* })
*
* return updatedProduct
* })
*
* export async function POST(
* req: MedusaRequest,
* res: MedusaResponse
* ) {
* const { result: product } = await myWorkflow(req.scope)
* .run({
* input: {
* title: req.body.title
* }
* })
*
* res.json({
* product
* })
* }
*/
export function hook<TOutput>(
/**
* The name of the hook. This will be used by the consumer to add a handler method for the hook.
*/
name: string,
/**
* The data that a handler function receives as a parameter.
*/
value: any
): WorkflowData<TOutput> {
const hookBinder = (
global[
OrchestrationUtils.SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).hookBinder
return hookBinder(name, function (context) {
return {
__value: async function (
transactionContext: WorkflowStepHandlerArguments
) {
const metadata = transactionContext.metadata
const idempotencyKey = metadata.idempotency_key
transactionContext.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action: metadata.action_type,
idempotencyKey,
attempt: metadata.attempt,
container: transactionContext.container,
metadata,
context: transactionContext.context!,
}
const allValues = await resolveValue(value, transactionContext)
const stepValue = allValues ? deepCopy(allValues) : allValues
let finalResult
const functions = context.hooksCallback_[name]
for (let i = 0; i < functions.length; i++) {
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult
finalResult = await fn.apply(fn, [arg, executionContext])
}
return finalResult
},
__type: OrchestrationUtils.SymbolWorkflowHook,
}
})
}

View File

@@ -2,7 +2,8 @@ export * from "./create-step"
export * from "./create-workflow"
export * from "./helpers/resolve-value"
export * from "./helpers/step-response"
export * from "./hook"
export * from "./helpers/workflow-response"
export * from "./create-hook"
export * from "./parallelize"
export * from "./transform"
export * from "./type"

View File

@@ -9,6 +9,7 @@ import {
} from "@medusajs/orchestration"
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { ExportedWorkflow } from "../../helper"
import { Hook } from "./create-hook"
export type StepFunctionResult<TOutput extends unknown | unknown[] = unknown> =
(this: CreateWorkflowComposerContext) => WorkflowData<TOutput>
@@ -23,6 +24,18 @@ type StepFunctionReturnConfig<TOutput> = {
}
type KeysOfUnion<T> = T extends T ? keyof T : never
export type HookHandler = (...args: any[]) => void | Promise<void>
/**
* Helper to convert an array of hooks to functions
*/
type ConvertHooksToFunctions<THooks extends any[]> = {
[K in keyof THooks]: THooks[K] extends Hook<infer Name, infer Input>
? {
[Fn in Name]: (callback: (input: Input) => any) => void
}
: never
}[number]
/**
* A step function to be used in a workflow.
@@ -73,18 +86,19 @@ export type WorkflowData<T = unknown> = (T extends Array<infer Item>
}
export type CreateWorkflowComposerContext = {
hooks_: string[]
hooksCallback_: Record<string, Function[]>
__type: string
hooks_: {
declared: string[]
registered: string[]
}
hooksCallback_: Record<string, HookHandler>
workflowId: string
flow: OrchestratorBuilder
handlers: WorkflowHandler
stepBinder: <TOutput = unknown>(
fn: StepFunctionResult
) => WorkflowData<TOutput>
hookBinder: <TOutput = unknown>(
name: string,
fn: Function
) => WorkflowData<TOutput>
hookBinder: (name: string, fn: () => HookHandler) => void
parallelizeBinder: <TOutput extends WorkflowData[] = WorkflowData[]>(
fn: (this: CreateWorkflowComposerContext) => TOutput
) => TOutput
@@ -193,11 +207,7 @@ export type WorkflowTransactionContext = StepExecutionContext &
* }
* ```
*/
export type ReturnWorkflow<
TData,
TResult,
THooks extends Record<string, Function>
> = {
export type ReturnWorkflow<TData, TResult, THooks extends any[]> = {
<TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
): Omit<
@@ -205,22 +215,23 @@ export type ReturnWorkflow<
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
} & THooks & {
runAsStep: ({
input,
}: {
input: TData | WorkflowData<TData>
}) => ReturnType<StepFunction<TData, TResult>>
run: <TDataOverride = undefined, TResultOverride = undefined>(
...args: Parameters<
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>["run"]
>
) => ReturnType<
} & {
runAsStep: ({
input,
}: {
input: TData | WorkflowData<TData>
}) => ReturnType<StepFunction<TData, TResult>>
run: <TDataOverride = undefined, TResultOverride = undefined>(
...args: Parameters<
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>["run"]
>
getName: () => string
config: (config: TransactionModelOptions) => void
}
) => ReturnType<
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>["run"]
>
getName: () => string
config: (config: TransactionModelOptions) => void
hooks: ConvertHooksToFunctions<THooks>
}
/**
* Extract the raw type of the expected input data of a workflow.