chore: workflow internals improvementss (#9455)

This commit is contained in:
Adrien de Peretti
2024-10-10 09:11:56 +02:00
committed by GitHub
parent 1b9379be62
commit 34d57870ad
26 changed files with 354 additions and 206 deletions

View File

@@ -110,7 +110,7 @@ function createContextualWorkflowRunner<
events,
flowMetadata,
]
const transaction = await method.apply(method, args)
const transaction = await method.apply(method, args) as DistributedTransactionType
let errors = transaction.getErrors(TransactionHandlerType.INVOKE)

View File

@@ -150,55 +150,67 @@ export function applyStep<
const ret = {
__type: OrchestrationUtils.SymbolWorkflowStep,
__step__: stepName,
config: (localConfig: LocalStepConfig) => {
const newStepName = localConfig.name ?? stepName
const newConfig = {
...stepConfig,
...localConfig,
}
}
delete localConfig.name
this.handlers.set(newStepName, handler)
this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig)
this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync)
ret.__step__ = newStepName
WorkflowManager.update(this.workflowId, this.flow, this.handlers)
const confRef = proxify(ret)
if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) {
const flagSteps =
global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]
.steps
const idx = flagSteps.findIndex((a) => a.__step__ === ret.__step__)
if (idx > -1) {
flagSteps.splice(idx, 1)
}
flagSteps.push(confRef)
}
return confRef as StepFunction<TInvokeInput, TInvokeResultOutput>
},
const refRet = proxify(ret) as WorkflowData<TInvokeResultOutput> & {
if: (
input: any,
condition: (...args: any) => boolean | WorkflowData
): WorkflowData<TInvokeResultOutput> => {
if (typeof condition !== "function") {
throw new Error("Condition must be a function")
}
wrapConditionalStep(input, condition, handler)
this.handlers.set(ret.__step__, handler)
return proxify(ret)
},
) => WorkflowData<TInvokeResultOutput>
}
const refRet = proxify(ret) as WorkflowData<TInvokeResultOutput>
refRet.config = (
localConfig: { name?: string } & Omit<
TransactionStepsDefinition,
"next" | "uuid" | "action"
>
) => {
const newStepName = localConfig.name ?? stepName
const newConfig = {
async: false,
compensateAsync: false,
...stepConfig,
...localConfig,
}
delete localConfig.name
this.handlers.set(newStepName, handler)
this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig)
this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync)
ret.__step__ = newStepName
WorkflowManager.update(this.workflowId, this.flow, this.handlers)
//const confRef = proxify(ret)
if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) {
const flagSteps =
global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition].steps
const idx = flagSteps.findIndex((a) => a.__step__ === ret.__step__)
if (idx > -1) {
flagSteps.splice(idx, 1)
}
flagSteps.push(refRet)
}
return refRet
}
refRet.if = (
input: any,
condition: (...args: any) => boolean | WorkflowData
): WorkflowData<TInvokeResultOutput> => {
if (typeof condition !== "function") {
throw new Error("Condition must be a function")
}
wrapConditionalStep(input, condition, handler)
this.handlers.set(ret.__step__, handler)
return refRet
}
if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) {
global[

View File

@@ -1,5 +1,5 @@
import { WorkflowStepHandlerArguments } from "@medusajs/orchestration"
import { OrchestrationUtils, deepCopy } from "@medusajs/utils"
import { OrchestrationUtils } from "@medusajs/utils"
import { ApplyStepOptions } from "../create-step"
import {
CreateWorkflowComposerContext,
@@ -9,6 +9,37 @@ import {
import { resolveValue } from "./resolve-value"
import { StepResponse } from "./step-response"
function buildStepContext({
action,
stepArguments,
}: {
action: StepExecutionContext["action"]
stepArguments: WorkflowStepHandlerArguments
}) {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const flowMetadata = stepArguments.transaction.getFlow()?.metadata
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action,
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId:
flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId,
parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}
return executionContext
}
export function createStepHandler<
TInvokeInput,
TStepInput extends {
@@ -32,27 +63,10 @@ export function createStepHandler<
) {
const handler = {
invoke: async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const flowMetadata = stepArguments.transaction.getFlow()?.metadata
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
const executionContext = buildStepContext({
action: "invoke",
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId:
flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId,
parentStepIdempotencyKey:
flowMetadata?.parentStepIdempotencyKey as string,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}
stepArguments,
})
const argInput = input ? await resolveValue(input, stepArguments) : {}
const stepResponse: StepResponse<any, any> = await invokeFn.apply(this, [
@@ -72,31 +86,16 @@ export function createStepHandler<
},
compensate: compensateFn
? async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
stepArguments.context!.idempotencyKey = idempotencyKey
const flowMetadata = stepArguments.transaction.getFlow()?.metadata
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
const executionContext = buildStepContext({
action: "compensate",
idempotencyKey,
parentStepIdempotencyKey:
flowMetadata?.parentStepIdempotencyKey as string,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
context: stepArguments.context!,
}
stepArguments,
})
const stepOutput = (stepArguments.invoke[stepName] as any)?.output
const invokeResult =
stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
? stepOutput.compensateInput &&
deepCopy(stepOutput.compensateInput)
: stepOutput && deepCopy(stepOutput)
? stepOutput.compensateInput
: stepOutput
const args = [invokeResult, executionContext]
const output = await compensateFn.apply(this, args)

View File

@@ -10,7 +10,7 @@ export function proxify<T>(obj: WorkflowData<any>): T {
return target[prop]
}
return transform(target[prop], async function (input, context) {
return transform({}, async function (_, context) {
const { invoke } = context as WorkflowTransactionContext
let output =
target.__type === OrchestrationUtils.SymbolInputReference ||
@@ -19,9 +19,8 @@ export function proxify<T>(obj: WorkflowData<any>): T {
: invoke?.[obj.__step__]?.output
output = await resolveValue(output, context)
output = output?.[prop]
return output && JSON.parse(JSON.stringify(output))
return output?.[prop]
})
},
}) as unknown as T

View File

@@ -3,31 +3,35 @@ import { deepCopy, OrchestrationUtils, promiseAll } from "@medusajs/utils"
async function resolveProperty(property, transactionContext) {
const { invoke: invokeRes } = transactionContext
let res
if (property?.__type === OrchestrationUtils.SymbolInputReference) {
return transactionContext.payload
res = transactionContext.payload
} else if (
property?.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse
) {
return resolveValue(property.$result, transactionContext)
res = await resolveValue(property.$result, transactionContext)
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
) {
return await property.__resolver(transactionContext)
res = await property.__resolver(transactionContext)
} else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) {
const output =
invokeRes[property.__step__]?.output ?? invokeRes[property.__step__]
if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) {
return output.output
res = output.output
} else {
res = output
}
return output
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse
) {
return property.output
res = property.output
} else {
return property
res = property
}
return res
}
/**
@@ -53,9 +57,8 @@ export async function resolveValue(input, transactionContext) {
}
for (const key of Object.keys(inputTOUnwrap)) {
parentRef[key] = await resolveProperty(
inputTOUnwrap[key],
transactionContext
parentRef[key] = deepCopy(
await resolveProperty(inputTOUnwrap[key], transactionContext)
)
if (typeof parentRef[key] === "object") {
@@ -68,8 +71,8 @@ export async function resolveValue(input, transactionContext) {
const copiedInput =
input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? deepCopy(input.output)
: deepCopy(input)
? input.output
: input
const result = copiedInput?.__type
? await resolveProperty(copiedInput, transactionContext)

View File

@@ -2,6 +2,11 @@ import { resolveValue } from "./helpers"
import { StepExecutionContext, WorkflowData } from "./type"
import { proxify } from "./helpers/proxy"
import { OrchestrationUtils } from "@medusajs/utils"
import { ulid } from "ulid"
import {
TransactionContext,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
type Func1<T extends object | WorkflowData, U> = (
input: T extends WorkflowData<infer U>
@@ -158,16 +163,26 @@ export function transform(
values: any | any[],
...functions: Function[]
): unknown {
const uniqId = ulid()
const ret = {
__id: uniqId,
__type: OrchestrationUtils.SymbolWorkflowStepTransformer,
__resolver: undefined,
}
const returnFn = async function (transactionContext): Promise<any> {
const allValues = await resolveValue(values, transactionContext)
const stepValue = allValues
? JSON.parse(JSON.stringify(allValues))
: allValues
const returnFn = async function (
// If a transformer is returned as the result of a workflow, then at this point the workflow is entirely done, in that case we have a TransactionContext
transactionContext: WorkflowStepHandlerArguments | TransactionContext
): Promise<any> {
if ("transaction" in transactionContext) {
const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}`
if (transactionContext.transaction.hasTemporaryData(temporaryDataKey)) {
return transactionContext.transaction.getTemporaryData(temporaryDataKey)
}
}
const stepValue = await resolveValue(values, transactionContext)
let finalResult
for (let i = 0; i < functions.length; i++) {
@@ -177,6 +192,15 @@ export function transform(
finalResult = await fn.apply(fn, [arg, transactionContext])
}
if ("transaction" in transactionContext) {
const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}`
transactionContext.transaction.setTemporaryData(
temporaryDataKey,
finalResult
)
}
return finalResult
}

View File

@@ -52,9 +52,9 @@ export function when(input, condition) {
if (ret?.__type !== OrchestrationUtils.SymbolWorkflowStep) {
const retStep = createStep(
"when-then-" + ulid(),
() => new StepResponse(ret)
({ input }: { input: any }) => new StepResponse(input)
)
returnStep = retStep()
returnStep = retStep({ input: ret })
}
for (const step of applyCondition) {