chore(workflows, core-flows): Split workflows tooling and definitions (#5705)

This commit is contained in:
Adrien de Peretti
2023-11-24 14:55:48 +01:00
committed by GitHub
parent fc1ef29ed9
commit ddbeed4ea6
138 changed files with 274 additions and 241 deletions

View File

@@ -0,0 +1,56 @@
import { mergeData } from "../merge-data"
import { WorkflowStepMiddlewareReturn } from "../pipe"
describe("merge", function () {
it("should merge a new object from the source into a specify target", async function () {
const source = {
stringProp: "stringProp",
anArray: ["anArray"],
input: {
test: "test",
},
another: {
anotherTest: "anotherTest",
},
}
const result = (await mergeData(
["input", "another", "stringProp", "anArray"],
"payload"
)({ data: source } as any)) as unknown as WorkflowStepMiddlewareReturn
expect(result).toEqual({
alias: "payload",
value: {
...source.input,
...source.another,
anArray: source.anArray,
stringProp: source.stringProp,
},
})
})
it("should merge a new object from the entire source into the resul object", async function () {
const source = {
stringProp: "stringProp",
anArray: ["anArray"],
input: {
test: "test",
},
another: {
anotherTest: "anotherTest",
},
}
const { value: result } = (await mergeData()({
data: source,
} as any)) as unknown as WorkflowStepMiddlewareReturn
expect(result).toEqual({
...source.input,
...source.another,
anArray: source.anArray,
stringProp: source.stringProp,
})
})
})

View File

@@ -0,0 +1,234 @@
import { pipe } from "../pipe"
describe("Pipe", function () {
it("should evaluate the input object and append the values to the data object and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { ...payload, step1Data: { test: "test" } },
step2: { ...payload, step2Data: { test: "test" } },
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
invoke: [
{
from: "payload",
alias: "input",
},
{
from: "step1",
alias: "previousDataStep1",
},
{
from: "step2",
alias: "previousDataStep2",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
previousDataStep1: invoke.step1,
previousDataStep2: invoke.step2,
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should evaluate the input object and append the values to the data object using the merge and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { step1Data: { test: "test" } },
step2: [{ test: "test" }],
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
merge: true,
invoke: [
{
from: "payload",
},
{
from: "step1",
},
{
from: "step2",
alias: "step2Data",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
...payload,
...invoke.step1,
step2Data: invoke.step2,
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should evaluate the input object and append the values to the data object using the merge to store on the merge alias and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { step1Data: { test: "test" } },
step2: { step2Data: { test: "test" } },
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
mergeAlias: "mergedData",
invoke: [
{
from: "payload",
alias: "input",
},
{
from: "step1",
alias: "step1Data",
},
{
from: "step2",
alias: "step2Data",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
step1Data: invoke.step1,
step2Data: invoke.step2,
mergedData: {
...payload,
...invoke.step1,
...invoke.step2,
},
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should evaluate the input object and append the values to the data object using the merge to store on the merge alias from the merge from values and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { step1Data: { test: "test" } },
step2: { step2Data: { test: "test" } },
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
mergeAlias: "mergedData",
mergeFrom: ["input", "step1Data"],
invoke: [
{
from: "payload",
alias: "input",
},
{
from: "step1",
alias: "step1Data",
},
{
from: "step2",
alias: "step2Data",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
step1Data: invoke.step1,
step2Data: invoke.step2,
mergedData: {
...payload,
...invoke.step1,
},
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should execute onComplete function if available but the output result shouldn't change", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
}
const onComplete = jest.fn(async ({ data }) => {
data.__changed = true
return
})
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
invoke: [
{
from: "payload",
alias: "input",
},
],
onComplete,
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
},
})
)
expect(onComplete).toHaveBeenCalled()
expect(result).toEqual(output)
})
})

View File

@@ -0,0 +1,64 @@
import { exportWorkflow } from "../workflow-export"
jest.mock("@medusajs/orchestration", () => {
return {
TransactionHandlerType: {
INVOKE: "invoke",
COMPENSATE: "compensate",
},
TransactionState: {
FAILED: "failed",
REVERTED: "reverted",
},
LocalWorkflow: jest.fn(() => {
return {
run: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
}
}),
}
})
describe("Export Workflow", function () {
it("should prepare the input data before initializing the transaction", async function () {
let transformedInput
const prepare = jest.fn().mockImplementation(async (data) => {
data.__transformed = true
transformedInput = data
return data
})
const work = exportWorkflow("id" as any, "result_step", prepare)
const wfHandler = work()
const input = {
test: "payload",
}
const { result } = await wfHandler.run({
input,
})
expect(input).toEqual({
test: "payload",
})
expect(transformedInput).toEqual({
test: "payload",
__transformed: true,
})
expect(result).toEqual("invoke_test")
})
})

View File

@@ -0,0 +1 @@
export const emptyHandler: any = () => {}

View File

@@ -0,0 +1,4 @@
export * from "./merge-data"
export * from "./empty-handler"
export * from "./pipe"
export * from "./workflow-export"

View File

@@ -0,0 +1,49 @@
import { PipelineHandler, WorkflowArguments } from "./pipe"
import { isObject } from "@medusajs/utils"
/**
* Pipe utils that merges data from an object into a new object.
* The new object will have a target key with the merged data from the keys if specified.
* @param keys
* @param target
*/
export function mergeData<
T extends Record<string, unknown> = Record<string, unknown>,
TKeys extends keyof T = keyof T,
Target extends "payload" | string = string
>(keys: TKeys[] = [], target?: Target): PipelineHandler {
return async function ({ data }: WorkflowArguments<T>) {
const workingKeys = (keys.length ? keys : Object.keys(data)) as TKeys[]
const value = workingKeys.reduce((acc, key) => {
let targetAcc = { ...(target ? acc[target as string] : acc) }
targetAcc ??= {}
if (Array.isArray(data[key as string])) {
targetAcc[key as string] = data[key as string]
} else if (isObject(data[key as string])) {
targetAcc = {
...targetAcc,
...(data[key as string] as object),
}
} else {
targetAcc[key as string] = data[key as string]
}
if (target) {
acc[target as string] = {
...acc[target as string],
...targetAcc,
}
} else {
acc = targetAcc
}
return acc
}, {})
return {
alias: target,
value: target ? value[target as string] : value,
}
}
}

View File

@@ -0,0 +1,167 @@
import {
DistributedTransaction,
TransactionMetadata,
WorkflowStepHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer, SharedContext } from "@medusajs/types"
import { mergeData } from "./merge-data"
export type WorkflowStepMiddlewareReturn = {
alias?: string
value: any
}
export type WorkflowStepMiddlewareInput = {
from: string
alias?: string
}
interface PipelineInput {
/**
* The alias of the input data to store in
*/
inputAlias?: string
/**
* Descriptors to get the data from
*/
invoke?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
compensate?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
onComplete?: (args: WorkflowOnCompleteArguments) => Promise<void>
/**
* Apply the data merging
*/
merge?: boolean
/**
* Store the merged data in a new key, if this is present no need to set merge: true
*/
mergeAlias?: string
/**
* Store the merged data from the chosen aliases, if this is present no need to set merge: true
*/
mergeFrom?: string[]
}
export type WorkflowArguments<T = any> = {
container: MedusaContainer
payload: unknown
data: T
metadata: TransactionMetadata
context: Context | SharedContext
}
export type WorkflowOnCompleteArguments<T = any> = {
container: MedusaContainer
payload: unknown
data: T
metadata: TransactionMetadata
transaction: DistributedTransaction
context: Context | SharedContext
}
export type PipelineHandler<T extends any = undefined> = (
args: WorkflowArguments
) => Promise<
T extends undefined
? WorkflowStepMiddlewareReturn | WorkflowStepMiddlewareReturn[]
: T
>
export function pipe<T>(
input: PipelineInput,
...functions: [...PipelineHandler[], PipelineHandler<T>]
): WorkflowStepHandler {
// Apply the aggregator just before the last handler
if (
(input.merge || input.mergeAlias || input.mergeFrom) &&
functions.length
) {
const handler = functions.pop()!
functions.push(mergeData(input.mergeFrom, input.mergeAlias), handler)
}
return async ({
container,
payload,
invoke,
compensate,
metadata,
transaction,
context,
}) => {
let data = {}
const original = {
invoke: invoke ?? {},
compensate: compensate ?? {},
}
if (input.inputAlias) {
Object.assign(original.invoke, { [input.inputAlias]: payload })
}
const dataKeys = ["invoke", "compensate"]
for (const key of dataKeys) {
if (!input[key]) {
continue
}
if (!Array.isArray(input[key])) {
input[key] = [input[key]]
}
for (const action of input[key]) {
if (action.alias) {
data[action.alias] = original[key][action.from]
} else {
data[action.from] = original[key][action.from]
}
}
}
let finalResult
for (const fn of functions) {
let result = await fn({
container,
payload,
data,
metadata,
context: context as Context,
})
if (Array.isArray(result)) {
for (const action of result) {
if (action?.alias) {
data[action.alias] = action.value
}
}
} else if (
result &&
"alias" in (result as WorkflowStepMiddlewareReturn)
) {
if ((result as WorkflowStepMiddlewareReturn).alias) {
data[(result as WorkflowStepMiddlewareReturn).alias!] = (
result as WorkflowStepMiddlewareReturn
).value
} else {
data = (result as WorkflowStepMiddlewareReturn).value
}
}
finalResult = result
}
if (typeof input.onComplete === "function") {
const dataCopy = JSON.parse(JSON.stringify(data))
await input.onComplete({
container,
payload,
data: dataCopy,
metadata,
transaction,
context: context as Context,
})
}
return finalResult
}
}

View File

@@ -0,0 +1,130 @@
import {
DistributedTransaction,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
TransactionStepError,
} from "@medusajs/orchestration"
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { MedusaModule } from "@medusajs/modules-sdk"
import { EOL } from "os"
import { ulid } from "ulid"
import { SymbolWorkflowWorkflowData } from "../utils/composer"
export type FlowRunOptions<TData = unknown> = {
input?: TData
context?: Context
resultFrom?: string | string[]
throwOnError?: boolean
}
export type WorkflowResult<TResult = unknown> = {
errors: TransactionStepError[]
transaction: DistributedTransaction
result: TResult
}
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string,
dataPreparation?: (data: TData) => Promise<unknown>
) => {
return function <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
): Omit<LocalWorkflow, "run"> & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
} {
if (!container) {
container = MedusaModule.getLoadedModules().map(
(mod) => Object.values(mod)[0]
)
}
const flow = new LocalWorkflow(workflowId, container)
const originalRun = flow.run.bind(flow)
const newRun = async (
{ input, context, throwOnError, resultFrom }: FlowRunOptions = {
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}
const transaction = await originalRun(
context?.transactionId ?? ulid(),
input,
context
)
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState()) && throwOnError) {
const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)
throw new Error(errorMessage)
}
let result: any = undefined
if (resultFrom) {
if (Array.isArray(resultFrom)) {
result = resultFrom.map((from) => {
const res = transaction.getContext().invoke?.[from]
return res?.__type === SymbolWorkflowWorkflowData ? res.output : res
})
} else {
const res = transaction.getContext().invoke?.[resultFrom]
result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res
}
}
return {
errors,
transaction,
result,
}
}
flow.run = newRun as any
return flow as unknown as LocalWorkflow & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
}
}
}

View File

@@ -0,0 +1,3 @@
export * from "./helper"
export * from "./utils/composer"
export * as Composer from "./utils/composer"

View File

@@ -0,0 +1,295 @@
import {
resolveValue,
StepResponse,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowStep,
SymbolWorkflowStepBind,
SymbolWorkflowStepResponse,
SymbolWorkflowWorkflowData,
} from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
StepFunction,
StepFunctionResult,
WorkflowData,
} from "./type"
import { proxify } from "./helpers/proxy"
/**
* The type of invocation function passed to a step.
*
* @typeParam TInput - The type of the input that the function expects.
* @typeParam TOutput - The type of the output that the function returns.
* @typeParam TCompensateInput - The type of the input that the compensation function expects.
*
* @returns The expected output based on the type parameter `TOutput`.
*/
type InvokeFn<TInput extends object, TOutput, TCompensateInput> = (
/**
* The input of the step.
*/
input: {
[Key in keyof TInput]: TInput[Key]
},
/**
* The step's context.
*/
context: StepExecutionContext
) =>
| void
| StepResponse<
TOutput,
TCompensateInput extends undefined ? TOutput : TCompensateInput
>
| Promise<void | StepResponse<
TOutput,
TCompensateInput extends undefined ? TOutput : TCompensateInput
>>
/**
* The type of compensation function passed to a step.
*
* @typeParam T -
* The type of the argument passed to the compensation function. If not specified, then it will be the same type as the invocation function's output.
*
* @returns There's no expected type to be returned by the compensation function.
*/
type CompensateFn<T> = (
/**
* The argument passed to the compensation function.
*/
input: T | undefined,
/**
* The step's context.
*/
context: StepExecutionContext
) => unknown | Promise<unknown>
interface ApplyStepOptions<
TStepInputs extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
TInvokeInput extends object,
TInvokeResultOutput,
TInvokeResultCompensateInput
> {
stepName: string
input: TStepInputs
invokeFn: InvokeFn<
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>
compensateFn?: CompensateFn<TInvokeResultCompensateInput>
}
/**
* @internal
*
* Internal function to create the invoke and compensate handler for a step.
* This is where the inputs and context are passed to the underlying invoke and compensate function.
*
* @param stepName
* @param input
* @param invokeFn
* @param compensateFn
*/
function applyStep<
TInvokeInput extends object,
TStepInput extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
TInvokeResultOutput,
TInvokeResultCompensateInput
>({
stepName,
input,
invokeFn,
compensateFn,
}: ApplyStepOptions<
TStepInput,
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>): StepFunctionResult<TInvokeResultOutput> {
return function (this: CreateWorkflowComposerContext) {
if (!this.workflowId) {
throw new Error(
"createStep must be used inside a createWorkflow definition"
)
}
const handler = {
invoke: async (transactionContext) => {
const executionContext: StepExecutionContext = {
container: transactionContext.container,
metadata: transactionContext.metadata,
context: transactionContext.context,
}
const argInput = await resolveValue(input, transactionContext)
const stepResponse: StepResponse<any, any> = await invokeFn.apply(
this,
[argInput, executionContext]
)
const stepResponseJSON =
stepResponse?.__type === SymbolWorkflowStepResponse
? stepResponse.toJSON()
: stepResponse
return {
__type: SymbolWorkflowWorkflowData,
output: stepResponseJSON,
}
},
compensate: compensateFn
? async (transactionContext) => {
const executionContext: StepExecutionContext = {
container: transactionContext.container,
metadata: transactionContext.metadata,
context: transactionContext.context,
}
const stepOutput = transactionContext.invoke[stepName]?.output
const invokeResult =
stepOutput?.__type === SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
JSON.parse(JSON.stringify(stepOutput.compensateInput))
: stepOutput && JSON.parse(JSON.stringify(stepOutput))
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)
return {
output,
}
}
: undefined,
}
this.flow.addAction(stepName, {
noCompensation: !compensateFn,
})
this.handlers.set(stepName, handler)
const ret = {
__type: SymbolWorkflowStep,
__step__: stepName,
}
return proxify(ret)
}
}
/**
* This function creates a {@link StepFunction} that can be used as a step in a workflow constructed by the {@link createWorkflow} function.
*
* @typeParam TInvokeInput - The type of the expected input parameter to the invocation function.
* @typeParam TInvokeResultOutput - The type of the expected output parameter of the invocation function.
* @typeParam TInvokeResultCompensateInput - The type of the expected input parameter to the compensation function.
*
* @returns A step function to be used in a workflow.
*
* @example
* import {
* createStep,
* StepResponse,
* StepExecutionContext,
* WorkflowData
* } from "@medusajs/workflows-sdk"
*
* interface CreateProductInput {
* title: string
* }
*
* export const createProductStep = createStep(
* "createProductStep",
* async function (
* input: CreateProductInput,
* context
* ) {
* const productService = context.container.resolve(
* "productService"
* )
* const product = await productService.create(input)
* return new StepResponse({
* product
* }, {
* product_id: product.id
* })
* },
* async function (
* input,
* context
* ) {
* const productService = context.container.resolve(
* "productService"
* )
* await productService.delete(input.product_id)
* }
* )
*/
export function createStep<
TInvokeInput extends object,
TInvokeResultOutput,
TInvokeResultCompensateInput
>(
/**
* The name of the step.
*/
name: string,
/**
* An invocation function that will be executed when the workflow is executed. The function must return an instance of {@link StepResponse}. The constructor of {@link StepResponse}
* accepts the output of the step as a first argument, and optionally as a second argument the data to be passed to the compensation function as a parameter.
*/
invokeFn: InvokeFn<
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>,
/**
* A compensation function that's executed if an error occurs in the workflow. It's used to roll-back actions when errors occur.
* It accepts as a parameter the second argument passed to the constructor of the {@link StepResponse} instance returned by the invocation function. If the
* invocation function doesn't pass the second argument to `StepResponse` constructor, the compensation function receives the first argument
* passed to the `StepResponse` constructor instead.
*/
compensateFn?: CompensateFn<TInvokeResultCompensateInput>
): StepFunction<TInvokeInput, TInvokeResultOutput> {
const stepName = name ?? invokeFn.name
const returnFn = function (input: {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
}): WorkflowData<TInvokeResultOutput> {
if (!global[SymbolMedusaWorkflowComposerContext]) {
throw new Error(
"createStep must be used inside a createWorkflow definition"
)
}
const stepBinder = (
global[
SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).stepBinder
return stepBinder<TInvokeResultOutput>(
applyStep<
TInvokeInput,
{ [K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]> },
TInvokeResultOutput,
TInvokeResultCompensateInput
>({
stepName,
input,
invokeFn,
compensateFn,
})
)
}
returnFn.__type = SymbolWorkflowStepBind
returnFn.__step__ = stepName
return returnFn as unknown as StepFunction<TInvokeInput, TInvokeResultOutput>
}

View File

@@ -0,0 +1,263 @@
import {
LocalWorkflow,
WorkflowHandler,
WorkflowManager,
} from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper"
import {
CreateWorkflowComposerContext,
WorkflowData,
WorkflowDataProperties,
} from "./type"
import {
resolveValue,
SymbolInputReference,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowStep,
} from "./helpers"
import { proxify } from "./helpers/proxy"
global[SymbolMedusaWorkflowComposerContext] = null
/**
* An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create
* an executable workflow, optionally within a specified container. So, to execute the workflow, you must invoke the exported workflow, then run the
* `run` method of the exported workflow.
*
* @example
* To execute a workflow:
*
* ```ts
* myWorkflow()
* .run({
* input: {
* name: "John"
* }
* })
* .then(({ result }) => {
* console.log(result)
* })
* ```
*
* To specify the container of the workflow, you can pass it as an argument to the call of the exported workflow. This is necessary when executing the workflow
* within a Medusa resource such as an API Route or a Subscriber.
*
* For example:
*
* ```ts
* import type {
* MedusaRequest,
* MedusaResponse
* } from "@medusajs/medusa";
* import myWorkflow from "../../../workflows/hello-world";
*
* export async function GET(
* req: MedusaRequest,
* res: MedusaResponse
* ) {
* const { result } = await myWorkflow(req.scope)
* .run({
* input: {
* name: req.query.name as string
* }
* })
*
* res.send(result)
* }
* ```
*/
type ReturnWorkflow<TData, TResult, THooks extends Record<string, Function>> = {
<TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
): Omit<LocalWorkflow, "run"> & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
}
} & THooks
/**
* This function creates a workflow with the provided name and a constructor function.
* The constructor function builds the workflow from steps created by the {@link createStep} function.
* The returned workflow is an exported workflow of type {@link ReturnWorkflow}, meaning it's not executed right away. To execute it,
* invoke the exported workflow, then run its `run` method.
*
* @typeParam TData - The type of the input passed to the composer function.
* @typeParam TResult - The type of the output returned by the composer function.
* @typeParam THooks - The type of hooks defined in the workflow.
*
* @returns The created workflow. You can later execute the workflow by invoking it, then using its `run` method.
*
* @example
* import { createWorkflow } from "@medusajs/workflows-sdk"
* import { MedusaRequest, MedusaResponse, Product } from "@medusajs/medusa"
* import {
* createProductStep,
* getProductStep,
* createPricesStep
* } from "./steps"
*
* interface WorkflowInput {
* title: string
* }
*
* const myWorkflow = createWorkflow<
* WorkflowInput,
* Product
* >("my-workflow", (input) => {
* // Everything here will be executed and resolved later
* // during the execution. Including the data access.
*
* const product = createProductStep(input)
* const prices = createPricesStep(product)
* return getProductStep(product.id)
* }
* )
*
* export async function GET(
* req: MedusaRequest,
* res: MedusaResponse
* ) {
* const { result: product } = await myWorkflow(req.scope)
* .run({
* input: {
* title: "Shirt"
* }
* })
*
* res.json({
* product
* })
* }
*/
export function createWorkflow<
TData,
TResult,
THooks extends Record<string, Function> = Record<string, Function>
>(
/**
* The name of the workflow.
*/
name: string,
/**
* The constructor function that is executed when the `run` method in {@link ReturnWorkflow} is used.
* The function can't be an arrow function or an asynchronus function. It also can't directly manipulate data.
* You'll have to use the {@link transform} function if you need to directly manipulate data.
*/
composer: (input: WorkflowData<TData>) =>
| void
| WorkflowData<TResult>
| {
[K in keyof TResult]:
| WorkflowData<TResult[K]>
| WorkflowDataProperties<TResult[K]>
}
): ReturnWorkflow<TData, TResult, THooks> {
const handlers: WorkflowHandler = new Map()
if (WorkflowManager.getWorkflow(name)) {
WorkflowManager.unregister(name)
}
WorkflowManager.register(name, undefined, handlers)
const context: CreateWorkflowComposerContext = {
workflowId: name,
flow: WorkflowManager.getTransactionDefinition(name),
handlers,
hooks_: [],
hooksCallback_: {},
hookBinder: (name, fn) => {
context.hooks_.push(name)
return fn(context)
},
stepBinder: (fn) => {
return fn.bind(context)()
},
parallelizeBinder: (fn) => {
return fn.bind(context)()
},
}
global[SymbolMedusaWorkflowComposerContext] = context
const inputPlaceHolder = proxify<WorkflowData>({
__type: SymbolInputReference,
__step__: "",
})
const returnedStep = composer.apply(context, [inputPlaceHolder])
delete global[SymbolMedusaWorkflowComposerContext]
WorkflowManager.update(name, context.flow, handlers)
const workflow = exportWorkflow<TData, TResult>(name)
const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
) => {
const workflow_ = workflow<TDataOverride, TResultOverride>(container)
const originalRun = workflow_.run
workflow_.run = (async (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
> => {
args ??= {}
args.resultFrom ??=
returnedStep?.__type === SymbolWorkflowStep
? returnedStep.__step__
: undefined
// Forwards the input to the ref object on composer.apply
const workflowResult = (await originalRun(
args
)) as unknown as WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
workflowResult.result = await resolveValue(
workflowResult.result || returnedStep,
workflowResult.transaction.getContext()
)
return workflowResult
}) as any
return workflow_
}
let shouldRegisterHookHandler = true
for (const hook of context.hooks_) {
mainFlow[hook] = (fn) => {
context.hooksCallback_[hook] ??= []
if (!shouldRegisterHookHandler) {
console.warn(
`A hook handler has already been registered for the ${hook} hook. The current handler registration will be skipped.`
)
return
}
context.hooksCallback_[hook].push(fn)
shouldRegisterHookHandler = false
}
}
return mainFlow as ReturnWorkflow<TData, TResult, THooks>
}

View File

@@ -0,0 +1,3 @@
export * from "./step-response"
export * from "./symbol"
export * from "./resolve-value"

View File

@@ -0,0 +1,28 @@
import { transform } from "../transform"
import { WorkflowData, WorkflowTransactionContext } from "../type"
import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol"
import { resolveValue } from "./resolve-value"
export function proxify<T>(obj: WorkflowData<any>): T {
return new Proxy(obj, {
get(target: any, prop: string | symbol): any {
if (prop in target) {
return target[prop]
}
return transform(target[prop], async function (input, context) {
const { invoke } = context as WorkflowTransactionContext
let output =
target.__type === SymbolInputReference ||
target.__type === SymbolWorkflowStepTransformer
? target
: invoke?.[obj.__step__]?.output
output = await resolveValue(output, context)
output = output?.[prop]
return output && JSON.parse(JSON.stringify(output))
})
},
}) as unknown as T
}

View File

@@ -0,0 +1,74 @@
import { promiseAll } from "@medusajs/utils"
import {
SymbolInputReference,
SymbolWorkflowHook,
SymbolWorkflowStep,
SymbolWorkflowStepResponse,
SymbolWorkflowStepTransformer,
} from "./symbol"
async function resolveProperty(property, transactionContext) {
const { invoke: invokeRes } = transactionContext
if (property?.__type === SymbolInputReference) {
return transactionContext.payload
} else if (property?.__type === SymbolWorkflowStepTransformer) {
return await property.__resolver(transactionContext)
} else if (property?.__type === SymbolWorkflowHook) {
return await property.__value(transactionContext)
} else if (property?.__type === SymbolWorkflowStep) {
const output = invokeRes[property.__step__]?.output
if (output?.__type === SymbolWorkflowStepResponse) {
return output.output
}
return output
} else if (property?.__type === SymbolWorkflowStepResponse) {
return property.output
} else {
return property
}
}
/**
* @internal
*/
export async function resolveValue(input, transactionContext) {
const unwrapInput = async (
inputTOUnwrap: Record<string, unknown>,
parentRef: any
) => {
if (inputTOUnwrap == null) {
return inputTOUnwrap
}
if (Array.isArray(inputTOUnwrap)) {
return await promiseAll(
inputTOUnwrap.map((i) => unwrapInput(i, transactionContext))
)
}
if (typeof inputTOUnwrap !== "object") {
return inputTOUnwrap
}
for (const key of Object.keys(inputTOUnwrap)) {
parentRef[key] = await resolveProperty(
inputTOUnwrap[key],
transactionContext
)
if (typeof parentRef[key] === "object") {
await unwrapInput(parentRef[key], parentRef[key])
}
}
return parentRef
}
const result = input?.__type
? await resolveProperty(input, transactionContext)
: await unwrapInput(input, {})
return result && JSON.parse(JSON.stringify(result))
}

View File

@@ -0,0 +1,69 @@
import { SymbolWorkflowStepResponse } from "./symbol"
/**
* This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`.
*
* @typeParam TOutput - The type of the output of the step.
* @typeParam TCompensateInput -
* The type of the compensation input. If the step doesn't specify any compensation input, then the type of `TCompensateInput` is the same
* as that of `TOutput`.
*/
export class StepResponse<TOutput, TCompensateInput = TOutput> {
readonly #__type = SymbolWorkflowStepResponse
readonly #output: TOutput
readonly #compensateInput?: TCompensateInput
/**
* The constructor of the StepResponse
*
* @typeParam TOutput - The type of the output of the step.
* @typeParam TCompensateInput -
* The type of the compensation input. If the step doesn't specify any compensation input, then the type of `TCompensateInput` is the same
* as that of `TOutput`.
*/
constructor(
/**
* The output of the step.
*/
output: TOutput,
/**
* The input to be passed as a parameter to the step's compensation function. If not provided, the `output` will be provided instead.
*/
compensateInput?: TCompensateInput
) {
this.#output = output
this.#compensateInput = (compensateInput ?? output) as TCompensateInput
}
/**
* @internal
*/
get __type() {
return this.#__type
}
/**
* @internal
*/
get output(): TOutput {
return this.#output
}
/**
* @internal
*/
get compensateInput(): TCompensateInput {
return this.#compensateInput as TCompensateInput
}
/**
* @internal
*/
toJSON() {
return {
__type: this.#__type,
output: this.#output,
compensateInput: this.#compensateInput,
}
}
}

View File

@@ -0,0 +1,12 @@
export const SymbolMedusaWorkflowComposerContext = Symbol.for(
"MedusaWorkflowComposerContext"
)
export const SymbolInputReference = Symbol.for("WorkflowInputReference")
export const SymbolWorkflowStep = Symbol.for("WorkflowStep")
export const SymbolWorkflowHook = Symbol.for("WorkflowHook")
export const SymbolWorkflowWorkflowData = Symbol.for("WorkflowWorkflowData")
export const SymbolWorkflowStepResponse = Symbol.for("WorkflowStepResponse")
export const SymbolWorkflowStepBind = Symbol.for("WorkflowStepBind")
export const SymbolWorkflowStepTransformer = Symbol.for(
"WorkflowStepTransformer"
)

View File

@@ -0,0 +1,136 @@
import {
resolveValue,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowHook,
} from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
WorkflowData,
} from "./type"
/**
*
* @ignore
*
* This function allows you to add hooks in your workflow that provide access to some data. Then, consumers of that workflow can add a handler function that performs
* an action with the provided data or modify it.
*
* For example, in a "create product" workflow, you may add a hook after the product is created, providing access to the created product.
* Then, developers using that workflow can hook into that point to access the product, modify its attributes, then return the updated product.
*
* @typeParam TOutput - The expected output of the hook's handler function.
* @returns The output of handler functions of this hook. If there are no handler functions, the output is `undefined`.
*
* @example
* import {
* createWorkflow,
* StepExecutionContext,
* hook,
* transform
* } from "@medusajs/workflows-sdk"
* import {
* createProductStep,
* getProductStep,
* createPricesStep
* } from "./steps"
* import {
* MedusaRequest,
* MedusaResponse,
* Product, ProductService
* } from "@medusajs/medusa"
*
* interface WorkflowInput {
* title: string
* }
*
* const myWorkflow = createWorkflow<
* WorkflowInput,
* Product
* >("my-workflow",
* function (input) {
* const product = createProductStep(input)
*
* const hookProduct = hook<Product>("createdProductHook", product)
*
* const newProduct = transform({
* product,
* hookProduct
* }, (input) => {
* return input.hookProduct || input.product
* })
*
* const prices = createPricesStep(newProduct)
*
* return getProductStep(product.id)
* }
* )
*
* myWorkflow.createdProductHook(
* async (product, context: StepExecutionContext) => {
* const productService: ProductService = context.container.resolve("productService")
*
* const updatedProduct = await productService.update(product.id, {
* description: "a cool shirt"
* })
*
* return updatedProduct
* })
*
* export async function POST(
* req: MedusaRequest,
* res: MedusaResponse
* ) {
* const { result: product } = await myWorkflow(req.scope)
* .run({
* input: {
* title: req.body.title
* }
* })
*
* res.json({
* product
* })
* }
*/
export function hook<TOutput>(
/**
* The name of the hook. This will be used by the consumer to add a handler method for the hook.
*/
name: string,
/**
* The data that a handler function receives as a parameter.
*/
value: any
): WorkflowData<TOutput> {
const hookBinder = (
global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext
).hookBinder
return hookBinder(name, function (context) {
return {
__value: async function (transactionContext) {
const executionContext: StepExecutionContext = {
container: transactionContext.container,
metadata: transactionContext.metadata,
context: transactionContext.context,
}
const allValues = await resolveValue(value, transactionContext)
const stepValue = allValues
? JSON.parse(JSON.stringify(allValues))
: allValues
let finalResult
const functions = context.hooksCallback_[name]
for (let i = 0; i < functions.length; i++) {
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult
finalResult = await fn.apply(fn, [arg, executionContext])
}
return finalResult
},
__type: SymbolWorkflowHook,
}
})
}

View File

@@ -0,0 +1,9 @@
export * from "./create-step"
export * from "./create-workflow"
export * from "./hook"
export * from "./parallelize"
export * from "./helpers/resolve-value"
export * from "./helpers/symbol"
export * from "./helpers/step-response"
export * from "./transform"
export * from "./type"

View File

@@ -0,0 +1,70 @@
import { CreateWorkflowComposerContext, WorkflowData } from "./type"
import { SymbolMedusaWorkflowComposerContext } from "./helpers"
/**
* This function is used to run multiple steps in parallel. The result of each step will be returned as part of the result array.
*
* @typeParam TResult - The type of the expected result.
*
* @returns The step results. The results are ordered in the array by the order they're passed in the function's parameter.
*
* @example
* ```ts
* import {
* createWorkflow,
* parallelize
* } from "@medusajs/workflows-sdk"
* import {
* createProductStep,
* getProductStep,
* createPricesStep,
* attachProductToSalesChannelStep
* } from "./steps"
*
* interface WorkflowInput {
* title: string
* }
*
* const myWorkflow = createWorkflow<
* WorkflowInput,
* Product
* >("my-workflow", (input) => {
* const product = createProductStep(input)
*
* const [prices, productSalesChannel] = parallelize(
* createPricesStep(product),
* attachProductToSalesChannelStep(product)
* )
*
* const id = product.id
* return getProductStep(product.id)
* }
* )
*/
export function parallelize<TResult extends WorkflowData[]>(
...steps: TResult
): TResult {
if (!global[SymbolMedusaWorkflowComposerContext]) {
throw new Error(
"parallelize must be used inside a createWorkflow definition"
)
}
const parallelizeBinder = (
global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext
).parallelizeBinder
const resultSteps = steps.map((step) => step)
return parallelizeBinder<TResult>(function (
this: CreateWorkflowComposerContext
) {
const stepOntoMerge = steps.shift()!
this.flow.mergeActions(
stepOntoMerge.__step__,
...steps.map((step) => step.__step__)
)
return resultSteps as unknown as TResult
})
}

View File

@@ -0,0 +1,193 @@
import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers"
import { StepExecutionContext, WorkflowData } from "./type"
import { proxify } from "./helpers/proxy"
type Func1<T extends object | WorkflowData, U> = (
input: T extends WorkflowData<infer U>
? U
: T extends object
? { [K in keyof T]: T[K] extends WorkflowData<infer U> ? U : T[K] }
: {},
context: StepExecutionContext
) => U | Promise<U>
type Func<T, U> = (input: T, context: StepExecutionContext) => U | Promise<U>
/**
*
* This function transforms the output of other utility functions.
*
* For example, if you're using the value(s) of some step(s) as an input to a later step. As you can't directly manipulate data in the workflow constructor function passed to {@link createWorkflow},
* the `transform` function provides access to the runtime value of the step(s) output so that you can manipulate them.
*
* Another example is if you're using the runtime value of some step(s) as the output of a workflow.
*
* If you're also retrieving the output of a hook and want to check if its value is set, you must use a workflow to get the runtime value of that hook.
*
* @returns There's no expected value to be returned by the `transform` function.
*
* @example
* import {
* createWorkflow,
* transform
* } from "@medusajs/workflows-sdk"
* import { step1, step2 } from "./steps"
*
* type WorkflowInput = {
* name: string
* }
*
* type WorkflowOutput = {
* message: string
* }
*
* const myWorkflow = createWorkflow<
* WorkflowInput,
* WorkflowOutput
* >
* ("hello-world", (input) => {
* const str1 = step1(input)
* const str2 = step2(input)
*
* return transform({
* str1,
* str2
* }, (input) => ({
* message: `${input.str1}${input.str2}`
* }))
* })
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RFinal>(
/**
* The output(s) of other step functions.
*/
values: T,
/**
* The transform function used to perform action on the runtime values of the provided `values`.
*/
...func:
| [Func1<T, RFinal>]
): WorkflowData<RFinal>
/**
* @internal
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
): WorkflowData<RFinal>
/**
* @internal
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
): WorkflowData<RFinal>
/**
* @internal
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
): WorkflowData<RFinal>
/**
* @internal
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RD, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RFinal>]
): WorkflowData<RFinal>
/**
* @internal
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RD, RE, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RE>, Func<RE, RFinal>]
): WorkflowData<RFinal>
/**
* @internal
*/
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RD, RE, RF, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RE>, Func<RE, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RE>, Func<RE, RF>, Func<RF, RFinal>]
): WorkflowData<RFinal>
export function transform(
values: any | any[],
...functions: Function[]
): unknown {
const ret = {
__type: SymbolWorkflowStepTransformer,
__resolver: undefined,
}
const returnFn = async function (transactionContext): Promise<any> {
const allValues = await resolveValue(values, transactionContext)
const stepValue = allValues
? JSON.parse(JSON.stringify(allValues))
: allValues
let finalResult
for (let i = 0; i < functions.length; i++) {
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult
finalResult = await fn.apply(fn, [arg, transactionContext])
}
return finalResult
}
const proxyfiedRet = proxify<WorkflowData & { __resolver: any }>(
ret as unknown as WorkflowData
)
proxyfiedRet.__resolver = returnFn as any
return proxyfiedRet
}

View File

@@ -0,0 +1,88 @@
import {
OrchestratorBuilder,
TransactionContext as OriginalWorkflowTransactionContext,
TransactionPayload,
WorkflowHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer } from "@medusajs/types"
export type StepFunctionResult<TOutput extends unknown | unknown[] = unknown> =
(this: CreateWorkflowComposerContext) => TOutput extends []
? [
...WorkflowData<{
[K in keyof TOutput]: TOutput[number][K]
}>[]
]
: WorkflowData<{ [K in keyof TOutput]: TOutput[K] }>
/**
* A step function to be used in a workflow.
*
* @typeParam TInput - The type of the input of the step.
* @typeParam TOutput - The type of the output of the step.
*/
export type StepFunction<TInput extends object = object, TOutput = unknown> = {
(input: { [K in keyof TInput]: WorkflowData<TInput[K]> }): WorkflowData<{
[K in keyof TOutput]: TOutput[K]
}>
} & WorkflowDataProperties<{
[K in keyof TOutput]: TOutput[K]
}>
export type WorkflowDataProperties<T = unknown> = {
__type: Symbol
__step__: string
}
/**
* This type is used to encapsulate the input or output type of all utils.
*
* @typeParam T - The type of a step's input or result.
*/
export type WorkflowData<T = unknown> = (T extends object
? {
[Key in keyof T]: WorkflowData<T[Key]>
}
: WorkflowDataProperties<T>) &
WorkflowDataProperties<T>
export type CreateWorkflowComposerContext = {
hooks_: string[]
hooksCallback_: Record<string, Function[]>
workflowId: string
flow: OrchestratorBuilder
handlers: WorkflowHandler
stepBinder: <TOutput = unknown>(
fn: StepFunctionResult
) => WorkflowData<TOutput>
hookBinder: <TOutput = unknown>(
name: string,
fn: Function
) => WorkflowData<TOutput>
parallelizeBinder: <TOutput extends WorkflowData[] = WorkflowData[]>(
fn: (this: CreateWorkflowComposerContext) => TOutput
) => TOutput
}
/**
* The step's context.
*/
export interface StepExecutionContext {
/**
* The container used to access resources, such as services, in the step.
*/
container: MedusaContainer
/**
* Metadata passed in the input.
*/
metadata: TransactionPayload["metadata"]
/**
* {@inheritDoc Context}
*/
context: Context
}
export type WorkflowTransactionContext = StepExecutionContext &
OriginalWorkflowTransactionContext & {
invoke: { [key: string]: { output: any } }
}