chore(): Improve workflows sdk tooling (#13421)

RESOLVES CORE-1171

**What**
 - Reduced async overhead for objects with mixed sync/async properties
 - Lower memory pressure from eliminated promise allocations
 - Faster primitive value processing with early returns
 - Better concurrency through selective batching of truly async operations
 - Event loop friendly behavior preventing artificial delays
 - Reduced memory allocation from eliminated duplicate processing
 - Decreased GC pressure from WeakMap-based caching instead of map

**note**
Now, `resolveValue`always treat every resoltion as sync operation unless it is not, meaning we do not create promise overhead when not necessary and only when actually treating with promises
This commit is contained in:
Adrien de Peretti
2025-09-08 18:24:10 +02:00
committed by GitHub
parent 8e11998895
commit 8e5c22a8e8
10 changed files with 304 additions and 110 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/orchestration": patch
"@medusajs/utils": patch
"@medusajs/workflows-sdk": patch
---
chore(): Improve workflows sdk tooling

View File

@@ -91,7 +91,7 @@ class DistributedTransaction extends EventEmitter {
*
* @private
*/
#temporaryStorage = new Map<string, unknown>()
#temporaryStorage = new WeakMap<{ key: string }, unknown>()
public static setStorage(storage: IDistributedTransactionStorage) {
this.keyValueStore = storage
@@ -312,15 +312,15 @@ class DistributedTransaction extends EventEmitter {
}
public setTemporaryData(key: string, value: unknown) {
this.#temporaryStorage.set(key, value)
this.#temporaryStorage.set({ key }, value)
}
public getTemporaryData(key: string) {
return this.#temporaryStorage.get(key)
return this.#temporaryStorage.get({ key })
}
public hasTemporaryData(key: string) {
return this.#temporaryStorage.has(key)
return this.#temporaryStorage.has({ key })
}
/**

View File

@@ -33,12 +33,12 @@ export function deepCopy<
return copy
}
if (util.types.isProxy(obj)) {
return obj as unknown as TOutput
}
// Handle objects
if (isObject(obj)) {
if (util.types.isProxy(obj)) {
return obj as unknown as TOutput
}
copy = {} as TOutput
cache.set(obj, copy) // Add to cache before recursing

View File

@@ -6,7 +6,7 @@ import { isDefined } from "./is-defined"
* @returns
*/
export function parseStringifyIfNecessary(result: unknown) {
if (typeof result !== "object") {
if (typeof result == null || typeof result !== "object") {
return result
}

View File

@@ -34,6 +34,18 @@ import {
WorkflowResult,
} from "./type"
// Cache for loaded modules to avoid repeated traversal
let cachedLoadedModules: LoadedModule[] | null = null
function getCachedLoadedModules(): LoadedModule[] {
if (!cachedLoadedModules) {
cachedLoadedModules = MedusaModule.getLoadedModules().map(
(mod) => Object.values(mod)[0]
)
}
return cachedLoadedModules
}
function createContextualWorkflowRunner<
TData = unknown,
TResult = unknown,
@@ -91,9 +103,7 @@ function createContextualWorkflowRunner<
const container_ = flow.container as MedusaContainer
if (!container_ || !isPresent(container_?.registrations)) {
executionContainer = MedusaModule.getLoadedModules().map(
(mod) => Object.values(mod)[0]
)
executionContainer = getCachedLoadedModules()
}
}
@@ -203,8 +213,10 @@ function createContextualWorkflowRunner<
__type: MedusaContextType as Context["__type"],
}
context.transactionId ??= "auto-" + ulid()
context.eventGroupId ??= ulid()
const uniqId = ulid()
context.transactionId ??= "auto-" + uniqId
context.eventGroupId ??= uniqId
return await originalExecution(
originalRun,
@@ -481,14 +493,11 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
return await buildRunnerFn<"run", TDataOverride, TResultOverride>(
"run",
container
)(inputArgs)
)(args)
}
exportedWorkflow.registerStepSuccess = async <
@@ -505,9 +514,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowRegisterStepSuccessOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
return await buildRunnerFn<
"registerStepSuccess",
@@ -516,7 +522,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>(
"registerStepSuccess",
container
)(inputArgs)
)(args)
}
exportedWorkflow.registerStepFailure = async <
@@ -533,9 +539,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowRegisterStepFailureOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
return await buildRunnerFn<
"registerStepFailure",
@@ -544,7 +547,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>(
"registerStepFailure",
container
)(inputArgs)
)(args)
}
exportedWorkflow.retryStep = async <
@@ -568,12 +571,11 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
): Promise<WorkflowResult> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowCancelOptions
return await buildRunnerFn<"cancel", unknown, unknown>(
"cancel",
container
)(inputArgs)
)(args)
}
MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)

View File

@@ -101,6 +101,38 @@ export interface ApplyStepOptions<
* @param invokeFn
* @param compensateFn
*/
// Factory function to create and configure handlers
function createAndConfigureHandler<
TInvokeInput,
TStepInput extends {
[K in keyof TInvokeInput]: WorkflowData<TInvokeInput[K]>
},
TInvokeResultOutput,
TInvokeResultCompensateInput
>(
context: CreateWorkflowComposerContext,
stepName: string,
config: TransactionStepsDefinition,
input: TStepInput | undefined,
invokeFn: InvokeFn<
TInvokeInput,
TInvokeResultOutput,
TInvokeResultCompensateInput
>,
compensateFn?: CompensateFn<TInvokeResultCompensateInput>
) {
const handler = createStepHandler.bind(context)({
stepName,
input,
invokeFn,
compensateFn,
})
wrapAsyncHandler(config, handler)
return handler
}
export function applyStep<
TInvokeInput,
TStepInput extends {
@@ -127,14 +159,14 @@ export function applyStep<
)
}
const handler = createStepHandler.bind(this)({
const handler = createAndConfigureHandler(
this,
stepName,
stepConfig,
input,
invokeFn,
compensateFn,
})
wrapAsyncHandler(stepConfig, handler)
compensateFn
)
stepConfig.uuid = ulid()
stepConfig.noCompensation = !compensateFn
@@ -179,14 +211,14 @@ export function applyStep<
delete newConfig.name
const handler = createStepHandler.bind(this)({
stepName: newStepName,
const handler = createAndConfigureHandler(
this,
newStepName,
newConfig,
input,
invokeFn,
compensateFn,
})
wrapAsyncHandler(newConfig, handler)
compensateFn
)
this.handlers.set(stepName, this.overriddenHandler.get(stepName)!)
this.overriddenHandler.delete(stepName)
@@ -319,7 +351,10 @@ export function wrapConditionalStep(
) {
const originalInvoke = handle.invoke
handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => {
const args = await resolveValue(input, stepArguments)
let args = resolveValue(input, stepArguments)
if (args instanceof Promise) {
args = await args
}
const canContinue = await condition(args, stepArguments)

View File

@@ -80,14 +80,28 @@ export function createStepHandler<
stepArguments,
})
const argInput = input ? await resolveValue(input, stepArguments) : {}
let argInput = {}
if (input) {
argInput = resolveValue(input, stepArguments)
if (argInput instanceof Promise) {
argInput = await argInput
}
}
const stepResponse: StepResponse<any, any> = await invokeFn.apply(this, [
argInput,
executionContext,
])
if (!stepResponse || typeof stepResponse !== "object") {
return {
__type: OrchestrationUtils.SymbolWorkflowWorkflowData,
output: stepResponse,
}
}
const stepResponseJSON =
stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
stepResponse.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepResponse.toJSON()
: stepResponse
@@ -104,13 +118,24 @@ export function createStepHandler<
})
const stepOutput = (stepArguments.invoke[stepName] as any)?.output
if (!stepOutput) {
const output = await compensateFn.apply(this, [
stepOutput,
executionContext,
])
return { output }
}
const invokeResult =
stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
stepOutput.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepOutput.compensateInput
: stepOutput
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)
const output = await compensateFn.apply(this, [
invokeResult,
executionContext,
])
return {
output,
}

View File

@@ -1,7 +1,5 @@
import { transform } from "../transform"
import { WorkflowData, WorkflowTransactionContext } from "../type"
import { OrchestrationUtils } from "@medusajs/utils"
import { resolveValue } from "./resolve-value"
import { WorkflowData } from "../type"
export function proxify<T>(obj: WorkflowData<any>): T {
return new Proxy(obj, {
@@ -10,18 +8,11 @@ export function proxify<T>(obj: WorkflowData<any>): T {
return target[prop]
}
return transform({}, async function (_, context) {
const { invoke } = context as WorkflowTransactionContext
let output =
target.__type === OrchestrationUtils.SymbolInputReference ||
target.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
? target
: invoke?.[obj.__step__]?.output
output = await resolveValue(output, context)
return output?.[prop]
const transformer = transform({ target }, function (data) {
return data.target?.[prop]
})
return transformer
},
}) as unknown as T
}

View File

@@ -1,26 +1,31 @@
import {
deepCopy,
isObject,
OrchestrationUtils,
parseStringifyIfNecessary,
promiseAll,
} from "@medusajs/utils"
import * as util from "node:util"
async function resolveProperty(property, transactionContext) {
type InputPrimitive = string | Symbol
type InputObject = object & { __type?: string | Symbol; output?: any }
function resolveProperty(property, transactionContext) {
const { invoke: invokeRes } = transactionContext
let res
if (property?.__type === OrchestrationUtils.SymbolInputReference) {
if (property.__type === OrchestrationUtils.SymbolInputReference) {
res = transactionContext.payload
} else if (
property?.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse
property.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse
) {
res = await resolveValue(property.$result, transactionContext)
res = resolveValue(property.$result, transactionContext)
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
property.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
) {
res = await property.__resolver(transactionContext)
} else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) {
res = property.__resolver(transactionContext)
} else if (property.__type === OrchestrationUtils.SymbolWorkflowStep) {
const output =
invokeRes[property.__step__]?.output ?? invokeRes[property.__step__]
if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) {
@@ -29,7 +34,7 @@ async function resolveProperty(property, transactionContext) {
res = output
}
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
property.__type === OrchestrationUtils.SymbolWorkflowStepResponse
) {
res = property.output
} else {
@@ -39,49 +44,176 @@ async function resolveProperty(property, transactionContext) {
return res
}
/**
* @internal
*/
export async function resolveValue(input, transactionContext) {
const unwrapInput = async (
inputTOUnwrap: Record<string, unknown>,
parentRef: any
) => {
if (inputTOUnwrap == null) {
return inputTOUnwrap
}
function unwrapInput({
inputTOUnwrap,
parentRef,
transactionContext,
}: {
inputTOUnwrap: InputObject
parentRef: any
transactionContext: any
}): any {
if (inputTOUnwrap == null) {
return inputTOUnwrap
}
if (Array.isArray(inputTOUnwrap)) {
return await promiseAll(
inputTOUnwrap.map((i) => resolveValue(i, transactionContext))
)
}
if (Array.isArray(inputTOUnwrap)) {
const promises: { promise: Promise<any>; index: number }[] = []
const resolvedItems: any[] = new Array(inputTOUnwrap.length)
if (typeof inputTOUnwrap !== "object") {
return inputTOUnwrap
}
for (const key of Object.keys(inputTOUnwrap)) {
parentRef[key] = deepCopy(
await resolveProperty(inputTOUnwrap[key], transactionContext)
)
if (typeof parentRef[key] === "object") {
parentRef[key] = await unwrapInput(parentRef[key], parentRef[key])
for (let i = 0; i < inputTOUnwrap.length; i++) {
const item = inputTOUnwrap[i]
if (item == null || typeof item !== "object") {
resolvedItems[i] = item
} else {
const resolved = resolveValue(item, transactionContext)
if (resolved instanceof Promise) {
promises.push({ promise: resolved, index: i })
} else {
resolvedItems[i] = resolved
}
}
}
return parentRef
if (promises.length > 0) {
return promiseAll(promises.map((p) => p.promise)).then(
(resolvedPromises) => {
for (let i = 0; i < promises.length; i++) {
resolvedItems[promises[i].index] = resolvedPromises[i]
}
return resolvedItems
}
)
}
return resolvedItems
}
const copiedInput =
input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? input.output
: input
if (util.types.isProxy(inputTOUnwrap)) {
const resolved = resolveProperty(inputTOUnwrap, transactionContext)
if (resolved instanceof Promise) {
return resolved.then((r) => {
inputTOUnwrap = r
if (!isObject(inputTOUnwrap)) {
return inputTOUnwrap
}
return unwrapInput({
inputTOUnwrap,
parentRef: {},
transactionContext,
})
})
}
inputTOUnwrap = resolved
}
const result = copiedInput?.__type
? await resolveProperty(copiedInput, transactionContext)
: await unwrapInput(copiedInput, {})
if (!isObject(inputTOUnwrap)) {
return inputTOUnwrap
}
return parseStringifyIfNecessary(result)
const keys = Object.keys(inputTOUnwrap)
const promises: { promise: Promise<any>; keyIndex: number }[] = []
for (let i = 0; i < keys.length; i++) {
const key = keys[i]
if (inputTOUnwrap[key] == null || typeof inputTOUnwrap[key] !== "object") {
parentRef[key] = inputTOUnwrap[key]
continue
}
const result = resolveProperty(inputTOUnwrap[key], transactionContext)
if (result instanceof Promise) {
promises.push({ promise: result, keyIndex: i })
} else {
parentRef[key] = result
if (result != null && typeof result === "object") {
const unwrapped = unwrapInput({
inputTOUnwrap: result,
parentRef: parentRef[key] || {},
transactionContext,
})
if (unwrapped instanceof Promise) {
promises.push({
promise: unwrapped.then((r) => ({ key, value: r })),
keyIndex: i,
})
} else {
parentRef[key] = unwrapped
}
}
}
}
if (promises.length > 0) {
return promiseAll(promises.map((p) => p.promise)).then(
(resolvedPromises) => {
for (let i = 0; i < promises.length; i++) {
const resolved = resolvedPromises[i]
if (resolved && typeof resolved === "object" && "key" in resolved) {
parentRef[resolved.key] = resolved.value
} else {
const key = keys[promises[i].keyIndex]
parentRef[key] = resolved
if (resolved != null && typeof resolved === "object") {
const unwrapped = unwrapInput({
inputTOUnwrap: resolved,
parentRef: parentRef[key] || {},
transactionContext,
})
if (unwrapped instanceof Promise) {
return unwrapped.then((r) => {
parentRef[key] = r
return parentRef
})
}
parentRef[key] = unwrapped
}
}
}
return parentRef
}
)
}
return parentRef
}
export function resolveValue(
input: InputPrimitive | InputObject | unknown | undefined,
transactionContext
): Promise<any> | any {
if (input == null || typeof input !== "object") {
return input
}
const input_ = deepCopy(
(input as InputObject)?.__type ===
OrchestrationUtils.SymbolWorkflowWorkflowData
? (input as InputObject).output
: input
)
let result: any
if (input_?.__type) {
result = resolveProperty(input_, transactionContext)
if (result instanceof Promise) {
return result.then((r) => parseStringifyIfNecessary(r))
}
return parseStringifyIfNecessary(result)
} else {
result = unwrapInput({
inputTOUnwrap: input_,
parentRef: {},
transactionContext,
})
if (result instanceof Promise) {
return result.then((r) => parseStringifyIfNecessary(r))
}
return parseStringifyIfNecessary(result)
}
}

View File

@@ -1,12 +1,11 @@
import { resolveValue } from "./helpers"
import { StepExecutionContext, WorkflowData } from "./type"
import { proxify } from "./helpers/proxy"
import { OrchestrationUtils } from "@medusajs/utils"
import { ulid } from "ulid"
import {
TransactionContext,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { OrchestrationUtils } from "@medusajs/utils"
import { resolveValue } from "./helpers"
import { proxify } from "./helpers/proxy"
import { StepExecutionContext, WorkflowData } from "./type"
type Func1<T extends object | WorkflowData, U> = (
input: T extends WorkflowData<infer U>
@@ -163,7 +162,7 @@ export function transform(
values: any | any[],
...functions: Function[]
): unknown {
const uniqId = ulid()
const uniqId = Math.random().toString(36).substring(2, 20)
const ret = {
__id: uniqId,
@@ -182,7 +181,10 @@ export function transform(
}
}
const stepValue = await resolveValue(values, transactionContext)
let stepValue = resolveValue(values, transactionContext)
if (stepValue instanceof Promise) {
stepValue = await stepValue
}
let finalResult
for (let i = 0; i < functions.length; i++) {