feat(workflows): Workflow DX (#5607)

This commit is contained in:
Carlos R. L. Rodrigues
2023-11-22 17:23:39 +00:00
committed by GitHub
parent 2850e9a772
commit 9f9db39698
22 changed files with 2768 additions and 16 deletions

View File

@@ -0,0 +1,8 @@
---
"@medusajs/orchestration": minor
"@medusajs/workflows": minor
"@medusajs/link-modules": patch
"@medusajs/medusa": patch
---
Workflows composer api

View File

@@ -21,6 +21,8 @@ packages/*
!packages/cache-inmemory !packages/cache-inmemory
!packages/create-medusa-app !packages/create-medusa-app
!packages/product !packages/product
!packages/orchestration
!packages/workflows
**/models/* **/models/*

View File

@@ -90,14 +90,14 @@ module.exports = {
"./packages/event-bus-redis/tsconfig.spec.json", "./packages/event-bus-redis/tsconfig.spec.json",
"./packages/medusa-plugin-meilisearch/tsconfig.spec.json", "./packages/medusa-plugin-meilisearch/tsconfig.spec.json",
"./packages/medusa-plugin-algolia/tsconfig.spec.json", "./packages/medusa-plugin-algolia/tsconfig.spec.json",
"./packages/admin-ui/tsconfig.json",
"./packages/inventory/tsconfig.spec.json", "./packages/inventory/tsconfig.spec.json",
"./packages/stock-location/tsconfig.spec.json", "./packages/stock-location/tsconfig.spec.json",
"./packages/cache-redis/tsconfig.spec.json", "./packages/cache-redis/tsconfig.spec.json",
"./packages/cache-inmemory/tsconfig.spec.json", "./packages/cache-inmemory/tsconfig.spec.json",
"./packages/admin-ui/tsconfig.json",
"./packages/create-medusa-app/tsconfig.json", "./packages/create-medusa-app/tsconfig.json",
"./packages/product/tsconfig.json", "./packages/product/tsconfig.json",
"./packages/orchestration/tsconfig.json",
"./packages/workflows/tsconfig.spec.json",
], ],
}, },
rules: { rules: {

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@
"license": "MIT", "license": "MIT",
"private": true, "private": true,
"scripts": { "scripts": {
"test:integration": "node --expose-gc ./../../node_modules/.bin/jest --silent=false --runInBand --bail --detectOpenHandles --logHeapUsage --clearMocks --no-compilation-cache --forceExit", "test:integration": "node --expose-gc ./../../node_modules/.bin/jest --silent=false --runInBand --bail --detectOpenHandles --logHeapUsage --clearMocks --forceExit",
"build": "babel src -d dist --extensions \".ts,.js\"" "build": "babel src -d dist --extensions \".ts,.js\""
}, },
"dependencies": { "dependencies": {

View File

@@ -2,3 +2,4 @@ export * from "./initialize"
export * from "./types" export * from "./types"
export * from "./loaders" export * from "./loaders"
export * from "./services" export * from "./services"
export * from "./utils/compose-link-name"

View File

@@ -96,6 +96,7 @@ export default async ({
) )
registerCoreRouters(pluginDetails, container) registerCoreRouters(pluginDetails, container)
await registerSubscribers(pluginDetails, container, activityId) await registerSubscribers(pluginDetails, container, activityId)
await registerWorkflows(pluginDetails)
}) })
) )
@@ -634,6 +635,15 @@ function registerRepositories(
}) })
} }
/**
* import files from the workflows directory to run the registration of the wofklows
* @param pluginDetails
*/
async function registerWorkflows(pluginDetails: PluginDetails): Promise<void> {
const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {})
await Promise.all(files.map(async (file) => import(file)))
}
/** /**
* Registers a plugin's models at the right location in our container. Models * Registers a plugin's models at the right location in our container. Models
* must inherit from BaseModel. Models are registered directly in the container. * must inherit from BaseModel. Models are registered directly in the container.

View File

@@ -70,7 +70,7 @@ export class WorkflowManager {
static register( static register(
workflowId: string, workflowId: string,
flow: TransactionStepsDefinition | OrchestratorBuilder, flow: TransactionStepsDefinition | OrchestratorBuilder | undefined,
handlers: WorkflowHandler, handlers: WorkflowHandler,
requiredModules?: Set<string>, requiredModules?: Set<string>,
optionalModules?: Set<string> optionalModules?: Set<string>
@@ -78,19 +78,22 @@ export class WorkflowManager {
const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow
if (WorkflowManager.workflows.has(workflowId)) { if (WorkflowManager.workflows.has(workflowId)) {
const areStepsEqual = const areStepsEqual = finalFlow
JSON.stringify(finalFlow) === ? JSON.stringify(finalFlow) ===
JSON.stringify(WorkflowManager.workflows.get(workflowId)!.flow_) JSON.stringify(WorkflowManager.workflows.get(workflowId)!.flow_)
: true
if (!areStepsEqual) { if (!areStepsEqual) {
throw new Error(`Workflow with id "${workflowId}" and step definition already exists.`) throw new Error(
`Workflow with id "${workflowId}" and step definition already exists.`
)
} }
} }
WorkflowManager.workflows.set(workflowId, { WorkflowManager.workflows.set(workflowId, {
id: workflowId, id: workflowId,
flow_: finalFlow, flow_: finalFlow!,
orchestrator: new TransactionOrchestrator(workflowId, finalFlow), orchestrator: new TransactionOrchestrator(workflowId, finalFlow ?? {}),
handler: WorkflowManager.buildHandlers(handlers), handler: WorkflowManager.buildHandlers(handlers),
handlers_: handlers, handlers_: handlers,
requiredModules, requiredModules,

View File

@@ -11,6 +11,7 @@ import { MedusaModule } from "@medusajs/modules-sdk"
import { EOL } from "os" import { EOL } from "os"
import { ulid } from "ulid" import { ulid } from "ulid"
import { Workflows } from "../definitions" import { Workflows } from "../definitions"
import { SymbolWorkflowWorkflowData } from "../utils/composer"
export type FlowRunOptions<TData = unknown> = { export type FlowRunOptions<TData = unknown> = {
input?: TData input?: TData
@@ -26,7 +27,7 @@ export type WorkflowResult<TResult = unknown> = {
} }
export const exportWorkflow = <TData = unknown, TResult = unknown>( export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: Workflows, workflowId: Workflows | string,
defaultResult?: string, defaultResult?: string,
dataPreparation?: (data: TData) => Promise<unknown> dataPreparation?: (data: TData) => Promise<unknown>
) => { ) => {
@@ -63,7 +64,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
if (typeof dataPreparation === "function") { if (typeof dataPreparation === "function") {
try { try {
const copyInput = JSON.parse(JSON.stringify(input)) const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData) input = await dataPreparation(copyInput as TData)
} catch (err) { } catch (err) {
if (throwOnError) { if (throwOnError) {
@@ -97,11 +98,13 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
if (resultFrom) { if (resultFrom) {
if (Array.isArray(resultFrom)) { if (Array.isArray(resultFrom)) {
result = resultFrom.map( result = resultFrom.map((from) => {
(from) => transaction.getContext().invoke?.[from] const res = transaction.getContext().invoke?.[from]
) return res?.__type === SymbolWorkflowWorkflowData ? res.output : res
})
} else { } else {
result = transaction.getContext().invoke?.[resultFrom] const res = transaction.getContext().invoke?.[resultFrom]
result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res
} }
} }

View File

@@ -2,3 +2,5 @@ export * from "./definition"
export * from "./definitions" export * from "./definitions"
export * as Handlers from "./handlers" export * as Handlers from "./handlers"
export * from "./helper" export * from "./helper"
export * from "./utils/composer"
export * as Composer from "./utils/composer"

View File

@@ -0,0 +1,242 @@
import {
resolveValue,
StepResponse,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowStep,
SymbolWorkflowStepBind,
SymbolWorkflowStepResponse,
SymbolWorkflowWorkflowData,
} from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
StepFunction,
StepFunctionResult,
WorkflowData,
} from "./type"
import { proxify } from "./helpers/proxy"
type InvokeFn<TInput extends object, TOutput, TCompensateInput> = (
input: {
[Key in keyof TInput]: TInput[Key]
},
context: StepExecutionContext
) =>
| void
| StepResponse<
TOutput,
TCompensateInput extends undefined ? TOutput : TCompensateInput
>
| Promise<void | StepResponse<
TOutput,
TCompensateInput extends undefined ? TOutput : TCompensateInput
>>
type CompensateFn<T> = (
input: T,
context: StepExecutionContext
) => unknown | Promise<unknown>
interface ApplyStepOptions<
TStepInputs extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
TInvokeInput extends object,
TInvokeResultOutput,
TInvokeResultCompensateInput
> {
stepName: string
input: TStepInputs
invokeFn: InvokeFn<
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>
compensateFn?: CompensateFn<TInvokeResultCompensateInput>
}
/**
* Internal function to create the invoke and compensate handler for a step.
* This is where the inputs and context are passed to the underlying invoke and compensate function.
*
* @param stepName
* @param input
* @param invokeFn
* @param compensateFn
*/
function applyStep<
TInvokeInput extends object,
TStepInput extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
TInvokeResultOutput,
TInvokeResultCompensateInput
>({
stepName,
input,
invokeFn,
compensateFn,
}: ApplyStepOptions<
TStepInput,
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>): StepFunctionResult<TInvokeResultOutput> {
return function (this: CreateWorkflowComposerContext) {
if (!this.workflowId) {
throw new Error(
"createStep must be used inside a createWorkflow definition"
)
}
const handler = {
invoke: async (transactionContext) => {
const executionContext: StepExecutionContext = {
container: transactionContext.container,
metadata: transactionContext.metadata,
context: transactionContext.context,
}
const argInput = await resolveValue(input, transactionContext)
const stepResponse: StepResponse<any, any> = await invokeFn.apply(
this,
[argInput, executionContext]
)
const stepResponseJSON =
stepResponse?.__type === SymbolWorkflowStepResponse
? stepResponse.toJSON()
: stepResponse
return {
__type: SymbolWorkflowWorkflowData,
output: stepResponseJSON,
}
},
compensate: compensateFn
? async (transactionContext) => {
const executionContext: StepExecutionContext = {
container: transactionContext.container,
metadata: transactionContext.metadata,
context: transactionContext.context,
}
const stepOutput = transactionContext.invoke[stepName].output
const invokeResult =
stepOutput?.__type === SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
JSON.parse(JSON.stringify(stepOutput.compensateInput))
: stepOutput && JSON.parse(JSON.stringify(stepOutput))
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)
return {
output,
}
}
: undefined,
}
this.flow.addAction(stepName, {
noCompensation: !compensateFn,
})
this.handlers.set(stepName, handler)
const ret = {
__type: SymbolWorkflowStep,
__step__: stepName,
}
return proxify(ret)
}
}
/**
* Function which will create a StepFunction to be used inside a createWorkflow composer function.
* This function will return a function which can be used to bind the step to a workflow.
* The types of the input to be passed to the step function is defined by the generic of the invoke function provided.
*
* @param name
* @param invokeFn
* @param compensateFn
*
* @example
* ```ts
* interface CreateProductInput {
* title: string
* }
*
* interface CreateProductOutput {
* product: { id: string; title: string }
* compensateInput: {
* product_id: string
* }
* }
*
* export const createProductStep = createStep(
* "createProductStep",
* async function (input: Step1Input, context: StepExecutionContext): Promise<CreateProductOutput> {
* const productService = context.container.resolve("productService")
* const product = await productService.create(input)
* return {
* product,
* compensateInput: {
* product_id: product.id
* }
* }
* },
* async function (input: { product_id: string }, context: StepExecutionContext) {
* const productService = context.container.resolve("productService")
* await productService.delete(input.product_id)
* })
*/
export function createStep<
TInvokeInput extends object,
TInvokeResultOutput,
TInvokeResultCompensateInput
>(
name: string,
invokeFn: InvokeFn<
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>,
compensateFn?: CompensateFn<TInvokeResultCompensateInput>
): StepFunction<TInvokeInput, TInvokeResultOutput> {
const stepName = name ?? invokeFn.name
const returnFn = function (input: {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
}): WorkflowData<TInvokeResultOutput> {
if (!global[SymbolMedusaWorkflowComposerContext]) {
throw new Error(
"createStep must be used inside a createWorkflow definition"
)
}
const stepBinder = (
global[
SymbolMedusaWorkflowComposerContext
] as CreateWorkflowComposerContext
).stepBinder
return stepBinder<TInvokeResultOutput>(
applyStep<
TInvokeInput,
{ [K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]> },
TInvokeResultOutput,
TInvokeResultCompensateInput
>({
stepName,
input,
invokeFn,
compensateFn,
})
)
}
returnFn.__type = SymbolWorkflowStepBind
returnFn.__step__ = stepName
return returnFn as unknown as StepFunction<TInvokeInput, TInvokeResultOutput>
}

View File

@@ -0,0 +1,182 @@
import {
LocalWorkflow,
WorkflowHandler,
WorkflowManager,
} from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper"
import {
CreateWorkflowComposerContext,
WorkflowData,
WorkflowDataProperties,
} from "./type"
import {
resolveValue,
SymbolInputReference,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowStep,
} from "./helpers"
import { proxify } from "./helpers/proxy"
global[SymbolMedusaWorkflowComposerContext] = null
type ReturnWorkflow<TData, TResult, THooks extends Record<string, Function>> = {
<TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
): Omit<LocalWorkflow, "run"> & {
run: (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
) => Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
}
} & THooks
/**
* Creates a new workflow with the given name and composer function.
* The composer function will compose the workflow by using the step, parallelize and other util functions that
* will allow to define the flow of event of a workflow.
*
* @param name
* @param composer
*
* @example
* ```ts
* import { createWorkflow, WorkflowData } from "@medusajs/workflows"
* import { createProductStep, getProductStep, createPricesStep } from "./steps"
*
* interface MyWorkflowData {
* title: string
* }
*
* const myWorkflow = createWorkflow("my-workflow", (input: WorkflowData<MyWorkflowData>) => {
* // Everything here will be executed and resolved later during the execution. Including the data access.
*
* const product = createProductStep(input)
* const prices = createPricesStep(product)
*
* const id = product.id
* return getProductStep(product.id)
* })
* ```
*/
export function createWorkflow<
TData,
TResult,
THooks extends Record<string, Function> = Record<string, Function>
>(
name: string,
composer: (input: WorkflowData<TData>) =>
| void
| WorkflowData<TResult>
| {
[K in keyof TResult]:
| WorkflowData<TResult[K]>
| WorkflowDataProperties<TResult[K]>
}
): ReturnWorkflow<TData, TResult, THooks> {
const handlers: WorkflowHandler = new Map()
if (WorkflowManager.getWorkflow(name)) {
WorkflowManager.unregister(name)
}
WorkflowManager.register(name, undefined, handlers)
const context: CreateWorkflowComposerContext = {
workflowId: name,
flow: WorkflowManager.getTransactionDefinition(name),
handlers,
hooks_: [],
hooksCallback_: {},
hookBinder: (name, fn) => {
context.hooks_.push(name)
return fn(context)
},
stepBinder: (fn) => {
return fn.bind(context)()
},
parallelizeBinder: (fn) => {
return fn.bind(context)()
},
}
global[SymbolMedusaWorkflowComposerContext] = context
const inputPlaceHolder = proxify<WorkflowData>({
__type: SymbolInputReference,
__step__: "",
})
const returnedStep = composer.apply(context, [inputPlaceHolder])
delete global[SymbolMedusaWorkflowComposerContext]
WorkflowManager.update(name, context.flow, handlers)
const workflow = exportWorkflow<TData, TResult>(name)
const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
) => {
const workflow_ = workflow<TDataOverride, TResultOverride>(container)
const originalRun = workflow_.run
workflow_.run = (async (
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
> => {
args ??= {}
args.resultFrom ??=
returnedStep?.__type === SymbolWorkflowStep
? returnedStep.__step__
: undefined
// Forwards the input to the ref object on composer.apply
const workflowResult = (await originalRun(
args
)) as unknown as WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
workflowResult.result = await resolveValue(
workflowResult.result || returnedStep,
workflowResult.transaction.getContext()
)
return workflowResult
}) as any
return workflow_
}
let shouldRegisterHookHandler = true
for (const hook of context.hooks_) {
mainFlow[hook] = (fn) => {
context.hooksCallback_[hook] ??= []
if (!shouldRegisterHookHandler) {
console.warn(
`A hook handler has already been registered for the ${hook} hook. The current handler registration will be skipped.`
)
return
}
context.hooksCallback_[hook].push(fn)
shouldRegisterHookHandler = false
}
}
return mainFlow as ReturnWorkflow<TData, TResult, THooks>
}

View File

@@ -0,0 +1,3 @@
export * from "./step-response"
export * from "./symbol"
export * from "./resolve-value"

View File

@@ -0,0 +1,28 @@
import { transform } from "../transform"
import { WorkflowData, WorkflowTransactionContext } from "../type"
import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol"
import { resolveValue } from "./resolve-value"
export function proxify<T>(obj: WorkflowData<any>): T {
return new Proxy(obj, {
get(target: any, prop: string | symbol): any {
if (prop in target) {
return target[prop]
}
return transform(target[prop], async function (input, context) {
const { invoke } = context as WorkflowTransactionContext
let output =
target.__type === SymbolInputReference ||
target.__type === SymbolWorkflowStepTransformer
? target
: invoke?.[obj.__step__]?.output
output = await resolveValue(output, context)
output = output?.[prop]
return output && JSON.parse(JSON.stringify(output))
})
},
}) as unknown as T
}

View File

@@ -0,0 +1,71 @@
import { promiseAll } from "@medusajs/utils"
import {
SymbolInputReference,
SymbolWorkflowHook,
SymbolWorkflowStep,
SymbolWorkflowStepResponse,
SymbolWorkflowStepTransformer,
} from "./symbol"
async function resolveProperty(property, transactionContext) {
const { invoke: invokeRes } = transactionContext
if (property?.__type === SymbolInputReference) {
return transactionContext.payload
} else if (property?.__type === SymbolWorkflowStepTransformer) {
return await property.__resolver(transactionContext)
} else if (property?.__type === SymbolWorkflowHook) {
return await property.__value(transactionContext)
} else if (property?.__type === SymbolWorkflowStep) {
const output = invokeRes[property.__step__]?.output
if (output?.__type === SymbolWorkflowStepResponse) {
return output.output
}
return output
} else if (property?.__type === SymbolWorkflowStepResponse) {
return property.output
} else {
return property
}
}
export async function resolveValue(input, transactionContext) {
const unwrapInput = async (
inputTOUnwrap: Record<string, unknown>,
parentRef: any
) => {
if (inputTOUnwrap == null) {
return inputTOUnwrap
}
if (Array.isArray(inputTOUnwrap)) {
return await promiseAll(
inputTOUnwrap.map((i) => unwrapInput(i, transactionContext))
)
}
if (typeof inputTOUnwrap !== "object") {
return inputTOUnwrap
}
for (const key of Object.keys(inputTOUnwrap)) {
parentRef[key] = await resolveProperty(
inputTOUnwrap[key],
transactionContext
)
if (typeof parentRef[key] === "object") {
await unwrapInput(parentRef[key], parentRef[key])
}
}
return parentRef
}
const result = input?.__type
? await resolveProperty(input, transactionContext)
: await unwrapInput(input, {})
return result && JSON.parse(JSON.stringify(result))
}

View File

@@ -0,0 +1,32 @@
import { SymbolWorkflowStepResponse } from "./symbol"
export class StepResponse<TOutput, TCompensateInput = TOutput> {
readonly #__type = SymbolWorkflowStepResponse
readonly #output: TOutput
readonly #compensateInput?: TCompensateInput
constructor(output: TOutput, compensateInput?: TCompensateInput) {
this.#output = output
this.#compensateInput = (compensateInput ?? output) as TCompensateInput
}
get __type() {
return this.#__type
}
get output(): TOutput {
return this.#output
}
get compensateInput(): TCompensateInput {
return this.#compensateInput as TCompensateInput
}
toJSON() {
return {
__type: this.#__type,
output: this.#output,
compensateInput: this.#compensateInput,
}
}
}

View File

@@ -0,0 +1,12 @@
export const SymbolMedusaWorkflowComposerContext = Symbol.for(
"MedusaWorkflowComposerContext"
)
export const SymbolInputReference = Symbol.for("WorkflowInputReference")
export const SymbolWorkflowStep = Symbol.for("WorkflowStep")
export const SymbolWorkflowHook = Symbol.for("WorkflowHook")
export const SymbolWorkflowWorkflowData = Symbol.for("WorkflowWorkflowData")
export const SymbolWorkflowStepResponse = Symbol.for("WorkflowStepResponse")
export const SymbolWorkflowStepBind = Symbol.for("WorkflowStepBind")
export const SymbolWorkflowStepTransformer = Symbol.for(
"WorkflowStepTransformer"
)

View File

@@ -0,0 +1,43 @@
import {
resolveValue,
SymbolMedusaWorkflowComposerContext,
SymbolWorkflowHook,
} from "./helpers"
import {
CreateWorkflowComposerContext,
StepExecutionContext,
WorkflowData,
} from "./type"
export function hook<TOutput>(name: string, value: any): WorkflowData<TOutput> {
const hookBinder = (
global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext
).hookBinder
return hookBinder(name, function (context) {
return {
__value: async function (transactionContext) {
const executionContext: StepExecutionContext = {
container: transactionContext.container,
metadata: transactionContext.metadata,
context: transactionContext.context,
}
const allValues = await resolveValue(value, transactionContext)
const stepValue = allValues
? JSON.parse(JSON.stringify(allValues))
: allValues
let finalResult
const functions = context.hooksCallback_[name]
for (let i = 0; i < functions.length; i++) {
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult
finalResult = await fn.apply(fn, [arg, executionContext])
}
return finalResult
},
__type: SymbolWorkflowHook,
}
})
}

View File

@@ -0,0 +1,9 @@
export * from "./create-step"
export * from "./create-workflow"
export * from "./hook"
export * from "./parallelize"
export * from "./helpers/resolve-value"
export * from "./helpers/symbol"
export * from "./helpers/step-response"
export * from "./transform"
export * from "./type"

View File

@@ -0,0 +1,58 @@
import { CreateWorkflowComposerContext, WorkflowData } from "./type"
import { SymbolMedusaWorkflowComposerContext } from "./helpers"
/**
* Parallelize multiple steps.
* The steps will be run in parallel. The result of each step will be returned as part of the result array.
* Each StepResult can be accessed from the resulted array in the order they were passed to the parallelize function.
*
* @param steps
*
* @example
* ```ts
* import { createWorkflow, WorkflowData, parallelize } from "@medusajs/workflows"
* import { createProductStep, getProductStep, createPricesStep, attachProductToSalesChannelStep } from "./steps"
*
* interface MyWorkflowData {
* title: string
* }
*
* const myWorkflow = createWorkflow("my-workflow", (input: WorkflowData<MyWorkflowData>) => {
* const product = createProductStep(input)
*
* const [prices, productSalesChannel] = parallelize(
* createPricesStep(product),
* attachProductToSalesChannelStep(product)
* )
*
* const id = product.id
* return getProductStep(product.id)
* })
*/
export function parallelize<TResult extends WorkflowData[]>(
...steps: TResult
): TResult {
if (!global[SymbolMedusaWorkflowComposerContext]) {
throw new Error(
"parallelize must be used inside a createWorkflow definition"
)
}
const parallelizeBinder = (
global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext
).parallelizeBinder
const resultSteps = steps.map((step) => step)
return parallelizeBinder<TResult>(function (
this: CreateWorkflowComposerContext
) {
const stepOntoMerge = steps.shift()!
this.flow.mergeActions(
stepOntoMerge.__step__,
...steps.map((step) => step.__step__)
)
return resultSteps as unknown as TResult
})
}

View File

@@ -0,0 +1,133 @@
import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers"
import { StepExecutionContext, WorkflowData } from "./type"
import { proxify } from "./helpers/proxy"
type Func1<T extends object | WorkflowData, U> = (
input: T extends WorkflowData<infer U>
? U
: T extends object
? { [K in keyof T]: T[K] extends WorkflowData<infer U> ? U : T[K] }
: {},
context: StepExecutionContext
) => U | Promise<U>
type Func<T, U> = (input: T, context: StepExecutionContext) => U | Promise<U>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
): WorkflowData<RFinal>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
): WorkflowData<RFinal>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
): WorkflowData<RFinal>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
): WorkflowData<RFinal>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RD, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RFinal>]
): WorkflowData<RFinal>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RD, RE, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RE>, Func<RE, RFinal>]
): WorkflowData<RFinal>
// prettier-ignore
// eslint-disable-next-line max-len
export function transform<T extends object | WorkflowData, RA, RB, RC, RD, RE, RF, RFinal>(
values: T,
...func:
| [Func1<T, RFinal>]
| [Func1<T, RA>, Func<RA, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RE>, Func<RE, RFinal>]
| [Func1<T, RA>, Func<RA, RB>, Func<RB, RC>, Func<RC, RD>, Func<RD, RE>, Func<RE, RF>, Func<RF, RFinal>]
): WorkflowData<RFinal>
/**
* Transforms the input value(s) using the provided functions.
* Allow to perform transformation on the future result of the step(s) to be passed
* to other steps later on at run time.
*
* @param values
* @param functions
*/
export function transform(
values: any | any[],
...functions: Function[]
): unknown {
const ret = {
__type: SymbolWorkflowStepTransformer,
__resolver: undefined,
}
const returnFn = async function (transactionContext): Promise<any> {
const allValues = await resolveValue(values, transactionContext)
const stepValue = allValues
? JSON.parse(JSON.stringify(allValues))
: allValues
let finalResult
for (let i = 0; i < functions.length; i++) {
const fn = functions[i]
const arg = i === 0 ? stepValue : finalResult
finalResult = await fn.apply(fn, [arg, transactionContext])
}
return finalResult
}
const proxyfiedRet = proxify<WorkflowData & { __resolver: any }>(
ret as unknown as WorkflowData
)
proxyfiedRet.__resolver = returnFn as any
return proxyfiedRet
}

View File

@@ -0,0 +1,65 @@
import {
OrchestratorBuilder,
TransactionContext as OriginalWorkflowTransactionContext,
TransactionPayload,
WorkflowHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer } from "@medusajs/types"
export type StepFunctionResult<TOutput extends unknown | unknown[] = unknown> =
(this: CreateWorkflowComposerContext) => TOutput extends []
? [
...WorkflowData<{
[K in keyof TOutput]: TOutput[number][K]
}>[]
]
: WorkflowData<{ [K in keyof TOutput]: TOutput[K] }>
export type StepFunction<TInput extends object = object, TOutput = unknown> = {
(input: { [K in keyof TInput]: WorkflowData<TInput[K]> }): WorkflowData<{
[K in keyof TOutput]: TOutput[K]
}>
} & WorkflowDataProperties<{
[K in keyof TOutput]: TOutput[K]
}>
export type WorkflowDataProperties<T = unknown> = {
__type: Symbol
__step__: string
}
export type WorkflowData<T = unknown> = (T extends object
? {
[Key in keyof T]: WorkflowData<T[Key]>
}
: WorkflowDataProperties<T>) &
WorkflowDataProperties<T>
export type CreateWorkflowComposerContext = {
hooks_: string[]
hooksCallback_: Record<string, Function[]>
workflowId: string
flow: OrchestratorBuilder
handlers: WorkflowHandler
stepBinder: <TOutput = unknown>(
fn: StepFunctionResult
) => WorkflowData<TOutput>
hookBinder: <TOutput = unknown>(
name: string,
fn: Function
) => WorkflowData<TOutput>
parallelizeBinder: <TOutput extends WorkflowData[] = WorkflowData[]>(
fn: (this: CreateWorkflowComposerContext) => TOutput
) => TOutput
}
export interface StepExecutionContext {
container: MedusaContainer
metadata: TransactionPayload["metadata"]
context: Context
}
export type WorkflowTransactionContext = StepExecutionContext &
OriginalWorkflowTransactionContext & {
invoke: { [key: string]: { output: any } }
}