diff --git a/.changeset/slimy-pandas-hug.md b/.changeset/slimy-pandas-hug.md new file mode 100644 index 0000000000..4992c8d938 --- /dev/null +++ b/.changeset/slimy-pandas-hug.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/utils": patch +"@medusajs/workflows-sdk": patch +--- + +chore(): Improve workflows sdk tooling diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index 9778d3c638..b6096f6369 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -91,7 +91,7 @@ class DistributedTransaction extends EventEmitter { * * @private */ - #temporaryStorage = new Map() + #temporaryStorage = new WeakMap<{ key: string }, unknown>() public static setStorage(storage: IDistributedTransactionStorage) { this.keyValueStore = storage @@ -312,15 +312,15 @@ class DistributedTransaction extends EventEmitter { } public setTemporaryData(key: string, value: unknown) { - this.#temporaryStorage.set(key, value) + this.#temporaryStorage.set({ key }, value) } public getTemporaryData(key: string) { - return this.#temporaryStorage.get(key) + return this.#temporaryStorage.get({ key }) } public hasTemporaryData(key: string) { - return this.#temporaryStorage.has(key) + return this.#temporaryStorage.has({ key }) } /** diff --git a/packages/core/utils/src/common/deep-copy.ts b/packages/core/utils/src/common/deep-copy.ts index 3090a8220a..2b2fee8984 100644 --- a/packages/core/utils/src/common/deep-copy.ts +++ b/packages/core/utils/src/common/deep-copy.ts @@ -33,12 +33,12 @@ export function deepCopy< return copy } + if (util.types.isProxy(obj)) { + return obj as unknown as TOutput + } + // Handle objects if (isObject(obj)) { - if (util.types.isProxy(obj)) { - return obj as unknown as TOutput - } - copy = {} as TOutput cache.set(obj, copy) // Add to cache before recursing diff --git a/packages/core/utils/src/common/parse-stringify-if-necessary.ts b/packages/core/utils/src/common/parse-stringify-if-necessary.ts index b341f782db..25888292f9 100644 --- a/packages/core/utils/src/common/parse-stringify-if-necessary.ts +++ b/packages/core/utils/src/common/parse-stringify-if-necessary.ts @@ -6,7 +6,7 @@ import { isDefined } from "./is-defined" * @returns */ export function parseStringifyIfNecessary(result: unknown) { - if (typeof result !== "object") { + if (typeof result == null || typeof result !== "object") { return result } diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 555dad0e66..0f373ba1fe 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -34,6 +34,18 @@ import { WorkflowResult, } from "./type" +// Cache for loaded modules to avoid repeated traversal +let cachedLoadedModules: LoadedModule[] | null = null + +function getCachedLoadedModules(): LoadedModule[] { + if (!cachedLoadedModules) { + cachedLoadedModules = MedusaModule.getLoadedModules().map( + (mod) => Object.values(mod)[0] + ) + } + return cachedLoadedModules +} + function createContextualWorkflowRunner< TData = unknown, TResult = unknown, @@ -91,9 +103,7 @@ function createContextualWorkflowRunner< const container_ = flow.container as MedusaContainer if (!container_ || !isPresent(container_?.registrations)) { - executionContainer = MedusaModule.getLoadedModules().map( - (mod) => Object.values(mod)[0] - ) + executionContainer = getCachedLoadedModules() } } @@ -203,8 +213,10 @@ function createContextualWorkflowRunner< __type: MedusaContextType as Context["__type"], } - context.transactionId ??= "auto-" + ulid() - context.eventGroupId ??= ulid() + const uniqId = ulid() + + context.transactionId ??= "auto-" + uniqId + context.eventGroupId ??= uniqId return await originalExecution( originalRun, @@ -481,14 +493,11 @@ export const exportWorkflow = ( > => { const container = args?.container delete args?.container - const inputArgs = { ...args } as FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > return await buildRunnerFn<"run", TDataOverride, TResultOverride>( "run", container - )(inputArgs) + )(args) } exportedWorkflow.registerStepSuccess = async < @@ -505,9 +514,6 @@ export const exportWorkflow = ( > => { const container = args?.container delete args?.container - const inputArgs = { ...args } as FlowRegisterStepSuccessOptions< - TDataOverride extends undefined ? TData : TDataOverride - > return await buildRunnerFn< "registerStepSuccess", @@ -516,7 +522,7 @@ export const exportWorkflow = ( >( "registerStepSuccess", container - )(inputArgs) + )(args) } exportedWorkflow.registerStepFailure = async < @@ -533,9 +539,6 @@ export const exportWorkflow = ( > => { const container = args?.container delete args?.container - const inputArgs = { ...args } as FlowRegisterStepFailureOptions< - TDataOverride extends undefined ? TData : TDataOverride - > return await buildRunnerFn< "registerStepFailure", @@ -544,7 +547,7 @@ export const exportWorkflow = ( >( "registerStepFailure", container - )(inputArgs) + )(args) } exportedWorkflow.retryStep = async < @@ -568,12 +571,11 @@ export const exportWorkflow = ( ): Promise => { const container = args?.container delete args?.container - const inputArgs = { ...args } as FlowCancelOptions return await buildRunnerFn<"cancel", unknown, unknown>( "cancel", container - )(inputArgs) + )(args) } MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 7991bc5301..8e43f79ba1 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -101,6 +101,38 @@ export interface ApplyStepOptions< * @param invokeFn * @param compensateFn */ +// Factory function to create and configure handlers +function createAndConfigureHandler< + TInvokeInput, + TStepInput extends { + [K in keyof TInvokeInput]: WorkflowData + }, + TInvokeResultOutput, + TInvokeResultCompensateInput +>( + context: CreateWorkflowComposerContext, + stepName: string, + config: TransactionStepsDefinition, + input: TStepInput | undefined, + invokeFn: InvokeFn< + TInvokeInput, + TInvokeResultOutput, + TInvokeResultCompensateInput + >, + compensateFn?: CompensateFn +) { + const handler = createStepHandler.bind(context)({ + stepName, + input, + invokeFn, + compensateFn, + }) + + wrapAsyncHandler(config, handler) + + return handler +} + export function applyStep< TInvokeInput, TStepInput extends { @@ -127,14 +159,14 @@ export function applyStep< ) } - const handler = createStepHandler.bind(this)({ + const handler = createAndConfigureHandler( + this, stepName, + stepConfig, input, invokeFn, - compensateFn, - }) - - wrapAsyncHandler(stepConfig, handler) + compensateFn + ) stepConfig.uuid = ulid() stepConfig.noCompensation = !compensateFn @@ -179,14 +211,14 @@ export function applyStep< delete newConfig.name - const handler = createStepHandler.bind(this)({ - stepName: newStepName, + const handler = createAndConfigureHandler( + this, + newStepName, + newConfig, input, invokeFn, - compensateFn, - }) - - wrapAsyncHandler(newConfig, handler) + compensateFn + ) this.handlers.set(stepName, this.overriddenHandler.get(stepName)!) this.overriddenHandler.delete(stepName) @@ -319,7 +351,10 @@ export function wrapConditionalStep( ) { const originalInvoke = handle.invoke handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => { - const args = await resolveValue(input, stepArguments) + let args = resolveValue(input, stepArguments) + if (args instanceof Promise) { + args = await args + } const canContinue = await condition(args, stepArguments) diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index 7e43bbc3f8..0ce63e31a5 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -80,14 +80,28 @@ export function createStepHandler< stepArguments, }) - const argInput = input ? await resolveValue(input, stepArguments) : {} + let argInput = {} + if (input) { + argInput = resolveValue(input, stepArguments) + if (argInput instanceof Promise) { + argInput = await argInput + } + } + const stepResponse: StepResponse = await invokeFn.apply(this, [ argInput, executionContext, ]) + if (!stepResponse || typeof stepResponse !== "object") { + return { + __type: OrchestrationUtils.SymbolWorkflowWorkflowData, + output: stepResponse, + } + } + const stepResponseJSON = - stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse + stepResponse.__type === OrchestrationUtils.SymbolWorkflowStepResponse ? stepResponse.toJSON() : stepResponse @@ -104,13 +118,24 @@ export function createStepHandler< }) const stepOutput = (stepArguments.invoke[stepName] as any)?.output + + if (!stepOutput) { + const output = await compensateFn.apply(this, [ + stepOutput, + executionContext, + ]) + return { output } + } + const invokeResult = - stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse + stepOutput.__type === OrchestrationUtils.SymbolWorkflowStepResponse ? stepOutput.compensateInput : stepOutput - const args = [invokeResult, executionContext] - const output = await compensateFn.apply(this, args) + const output = await compensateFn.apply(this, [ + invokeResult, + executionContext, + ]) return { output, } diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts index 412b076fe5..cf6847e30a 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts @@ -1,7 +1,5 @@ import { transform } from "../transform" -import { WorkflowData, WorkflowTransactionContext } from "../type" -import { OrchestrationUtils } from "@medusajs/utils" -import { resolveValue } from "./resolve-value" +import { WorkflowData } from "../type" export function proxify(obj: WorkflowData): T { return new Proxy(obj, { @@ -10,18 +8,11 @@ export function proxify(obj: WorkflowData): T { return target[prop] } - return transform({}, async function (_, context) { - const { invoke } = context as WorkflowTransactionContext - let output = - target.__type === OrchestrationUtils.SymbolInputReference || - target.__type === OrchestrationUtils.SymbolWorkflowStepTransformer - ? target - : invoke?.[obj.__step__]?.output - - output = await resolveValue(output, context) - - return output?.[prop] + const transformer = transform({ target }, function (data) { + return data.target?.[prop] }) + + return transformer }, }) as unknown as T } diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index 0b6b8de748..a7b2b950fd 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -1,26 +1,31 @@ import { deepCopy, + isObject, OrchestrationUtils, parseStringifyIfNecessary, promiseAll, } from "@medusajs/utils" +import * as util from "node:util" -async function resolveProperty(property, transactionContext) { +type InputPrimitive = string | Symbol +type InputObject = object & { __type?: string | Symbol; output?: any } + +function resolveProperty(property, transactionContext) { const { invoke: invokeRes } = transactionContext let res - if (property?.__type === OrchestrationUtils.SymbolInputReference) { + if (property.__type === OrchestrationUtils.SymbolInputReference) { res = transactionContext.payload } else if ( - property?.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse + property.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse ) { - res = await resolveValue(property.$result, transactionContext) + res = resolveValue(property.$result, transactionContext) } else if ( - property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer + property.__type === OrchestrationUtils.SymbolWorkflowStepTransformer ) { - res = await property.__resolver(transactionContext) - } else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) { + res = property.__resolver(transactionContext) + } else if (property.__type === OrchestrationUtils.SymbolWorkflowStep) { const output = invokeRes[property.__step__]?.output ?? invokeRes[property.__step__] if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) { @@ -29,7 +34,7 @@ async function resolveProperty(property, transactionContext) { res = output } } else if ( - property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse + property.__type === OrchestrationUtils.SymbolWorkflowStepResponse ) { res = property.output } else { @@ -39,49 +44,176 @@ async function resolveProperty(property, transactionContext) { return res } -/** - * @internal - */ -export async function resolveValue(input, transactionContext) { - const unwrapInput = async ( - inputTOUnwrap: Record, - parentRef: any - ) => { - if (inputTOUnwrap == null) { - return inputTOUnwrap - } +function unwrapInput({ + inputTOUnwrap, + parentRef, + transactionContext, +}: { + inputTOUnwrap: InputObject + parentRef: any + transactionContext: any +}): any { + if (inputTOUnwrap == null) { + return inputTOUnwrap + } - if (Array.isArray(inputTOUnwrap)) { - return await promiseAll( - inputTOUnwrap.map((i) => resolveValue(i, transactionContext)) - ) - } + if (Array.isArray(inputTOUnwrap)) { + const promises: { promise: Promise; index: number }[] = [] + const resolvedItems: any[] = new Array(inputTOUnwrap.length) - if (typeof inputTOUnwrap !== "object") { - return inputTOUnwrap - } - - for (const key of Object.keys(inputTOUnwrap)) { - parentRef[key] = deepCopy( - await resolveProperty(inputTOUnwrap[key], transactionContext) - ) - - if (typeof parentRef[key] === "object") { - parentRef[key] = await unwrapInput(parentRef[key], parentRef[key]) + for (let i = 0; i < inputTOUnwrap.length; i++) { + const item = inputTOUnwrap[i] + if (item == null || typeof item !== "object") { + resolvedItems[i] = item + } else { + const resolved = resolveValue(item, transactionContext) + if (resolved instanceof Promise) { + promises.push({ promise: resolved, index: i }) + } else { + resolvedItems[i] = resolved + } } } - return parentRef + if (promises.length > 0) { + return promiseAll(promises.map((p) => p.promise)).then( + (resolvedPromises) => { + for (let i = 0; i < promises.length; i++) { + resolvedItems[promises[i].index] = resolvedPromises[i] + } + return resolvedItems + } + ) + } + + return resolvedItems } - const copiedInput = - input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData - ? input.output - : input + if (util.types.isProxy(inputTOUnwrap)) { + const resolved = resolveProperty(inputTOUnwrap, transactionContext) + if (resolved instanceof Promise) { + return resolved.then((r) => { + inputTOUnwrap = r + if (!isObject(inputTOUnwrap)) { + return inputTOUnwrap + } + return unwrapInput({ + inputTOUnwrap, + parentRef: {}, + transactionContext, + }) + }) + } + inputTOUnwrap = resolved + } - const result = copiedInput?.__type - ? await resolveProperty(copiedInput, transactionContext) - : await unwrapInput(copiedInput, {}) + if (!isObject(inputTOUnwrap)) { + return inputTOUnwrap + } - return parseStringifyIfNecessary(result) + const keys = Object.keys(inputTOUnwrap) + const promises: { promise: Promise; keyIndex: number }[] = [] + + for (let i = 0; i < keys.length; i++) { + const key = keys[i] + + if (inputTOUnwrap[key] == null || typeof inputTOUnwrap[key] !== "object") { + parentRef[key] = inputTOUnwrap[key] + continue + } + + const result = resolveProperty(inputTOUnwrap[key], transactionContext) + + if (result instanceof Promise) { + promises.push({ promise: result, keyIndex: i }) + } else { + parentRef[key] = result + + if (result != null && typeof result === "object") { + const unwrapped = unwrapInput({ + inputTOUnwrap: result, + parentRef: parentRef[key] || {}, + transactionContext, + }) + if (unwrapped instanceof Promise) { + promises.push({ + promise: unwrapped.then((r) => ({ key, value: r })), + keyIndex: i, + }) + } else { + parentRef[key] = unwrapped + } + } + } + } + + if (promises.length > 0) { + return promiseAll(promises.map((p) => p.promise)).then( + (resolvedPromises) => { + for (let i = 0; i < promises.length; i++) { + const resolved = resolvedPromises[i] + if (resolved && typeof resolved === "object" && "key" in resolved) { + parentRef[resolved.key] = resolved.value + } else { + const key = keys[promises[i].keyIndex] + parentRef[key] = resolved + + if (resolved != null && typeof resolved === "object") { + const unwrapped = unwrapInput({ + inputTOUnwrap: resolved, + parentRef: parentRef[key] || {}, + transactionContext, + }) + if (unwrapped instanceof Promise) { + return unwrapped.then((r) => { + parentRef[key] = r + return parentRef + }) + } + parentRef[key] = unwrapped + } + } + } + return parentRef + } + ) + } + + return parentRef +} + +export function resolveValue( + input: InputPrimitive | InputObject | unknown | undefined, + transactionContext +): Promise | any { + if (input == null || typeof input !== "object") { + return input + } + + const input_ = deepCopy( + (input as InputObject)?.__type === + OrchestrationUtils.SymbolWorkflowWorkflowData + ? (input as InputObject).output + : input + ) + + let result: any + + if (input_?.__type) { + result = resolveProperty(input_, transactionContext) + if (result instanceof Promise) { + return result.then((r) => parseStringifyIfNecessary(r)) + } + return parseStringifyIfNecessary(result) + } else { + result = unwrapInput({ + inputTOUnwrap: input_, + parentRef: {}, + transactionContext, + }) + if (result instanceof Promise) { + return result.then((r) => parseStringifyIfNecessary(r)) + } + return parseStringifyIfNecessary(result) + } } diff --git a/packages/core/workflows-sdk/src/utils/composer/transform.ts b/packages/core/workflows-sdk/src/utils/composer/transform.ts index a42a5b4a23..f39a8a629f 100644 --- a/packages/core/workflows-sdk/src/utils/composer/transform.ts +++ b/packages/core/workflows-sdk/src/utils/composer/transform.ts @@ -1,12 +1,11 @@ -import { resolveValue } from "./helpers" -import { StepExecutionContext, WorkflowData } from "./type" -import { proxify } from "./helpers/proxy" -import { OrchestrationUtils } from "@medusajs/utils" -import { ulid } from "ulid" import { TransactionContext, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" +import { OrchestrationUtils } from "@medusajs/utils" +import { resolveValue } from "./helpers" +import { proxify } from "./helpers/proxy" +import { StepExecutionContext, WorkflowData } from "./type" type Func1 = ( input: T extends WorkflowData @@ -163,7 +162,7 @@ export function transform( values: any | any[], ...functions: Function[] ): unknown { - const uniqId = ulid() + const uniqId = Math.random().toString(36).substring(2, 20) const ret = { __id: uniqId, @@ -182,7 +181,10 @@ export function transform( } } - const stepValue = await resolveValue(values, transactionContext) + let stepValue = resolveValue(values, transactionContext) + if (stepValue instanceof Promise) { + stepValue = await stepValue + } let finalResult for (let i = 0; i < functions.length; i++) {