feat(workflows-sdk): conditional step (#7912)

* chore: move ModuleRegistrationName to utils

* chore(workflows-sdk): conditional step

* type

* when condition
This commit is contained in:
Carlos R. L. Rodrigues
2024-07-05 05:54:18 -03:00
committed by GitHub
parent 36c1db7479
commit 9badad24aa
13 changed files with 264 additions and 525 deletions

View File

@@ -1,6 +1,9 @@
export const SymbolMedusaWorkflowComposerContext = Symbol.for(
"MedusaWorkflowComposerContext"
).toString()
export const SymbolMedusaWorkflowComposerCondition = Symbol.for(
"MedusaWorkflowComposerCondition"
).toString()
export const SymbolInputReference = Symbol.for(
"WorkflowInputReference"
).toString()

View File

@@ -1,56 +0,0 @@
import { mergeData } from "../merge-data"
import { WorkflowStepMiddlewareReturn } from "../pipe"
describe("merge", function () {
it("should merge a new object from the source into a specify target", async function () {
const source = {
stringProp: "stringProp",
anArray: ["anArray"],
input: {
test: "test",
},
another: {
anotherTest: "anotherTest",
},
}
const result = (await mergeData(
["input", "another", "stringProp", "anArray"],
"payload"
)({ data: source } as any)) as unknown as WorkflowStepMiddlewareReturn
expect(result).toEqual({
alias: "payload",
value: {
...source.input,
...source.another,
anArray: source.anArray,
stringProp: source.stringProp,
},
})
})
it("should merge a new object from the entire source into the resul object", async function () {
const source = {
stringProp: "stringProp",
anArray: ["anArray"],
input: {
test: "test",
},
another: {
anotherTest: "anotherTest",
},
}
const { value: result } = (await mergeData()({
data: source,
} as any)) as unknown as WorkflowStepMiddlewareReturn
expect(result).toEqual({
...source.input,
...source.another,
anArray: source.anArray,
stringProp: source.stringProp,
})
})
})

View File

@@ -1,234 +0,0 @@
import { pipe } from "../pipe"
describe("Pipe", function () {
it("should evaluate the input object and append the values to the data object and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { ...payload, step1Data: { test: "test" } },
step2: { ...payload, step2Data: { test: "test" } },
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
invoke: [
{
from: "payload",
alias: "input",
},
{
from: "step1",
alias: "previousDataStep1",
},
{
from: "step2",
alias: "previousDataStep2",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
previousDataStep1: invoke.step1,
previousDataStep2: invoke.step2,
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should evaluate the input object and append the values to the data object using the merge and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { step1Data: { test: "test" } },
step2: [{ test: "test" }],
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
merge: true,
invoke: [
{
from: "payload",
},
{
from: "step1",
},
{
from: "step2",
alias: "step2Data",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
...payload,
...invoke.step1,
step2Data: invoke.step2,
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should evaluate the input object and append the values to the data object using the merge to store on the merge alias and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { step1Data: { test: "test" } },
step2: { step2Data: { test: "test" } },
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
mergeAlias: "mergedData",
invoke: [
{
from: "payload",
alias: "input",
},
{
from: "step1",
alias: "step1Data",
},
{
from: "step2",
alias: "step2Data",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
step1Data: invoke.step1,
step2Data: invoke.step2,
mergedData: {
...payload,
...invoke.step1,
...invoke.step2,
},
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should evaluate the input object and append the values to the data object using the merge to store on the merge alias from the merge from values and return the result from the handler", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
step1: { step1Data: { test: "test" } },
step2: { step2Data: { test: "test" } },
}
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
mergeAlias: "mergedData",
mergeFrom: ["input", "step1Data"],
invoke: [
{
from: "payload",
alias: "input",
},
{
from: "step1",
alias: "step1Data",
},
{
from: "step2",
alias: "step2Data",
},
],
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
step1Data: invoke.step1,
step2Data: invoke.step2,
mergedData: {
...payload,
...invoke.step1,
},
},
})
)
expect(result).toBeDefined()
expect(result).toEqual(output)
})
it("should execute onComplete function if available but the output result shouldn't change", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
}
const onComplete = jest.fn(async ({ data }) => {
data.__changed = true
return
})
const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
invoke: [
{
from: "payload",
alias: "input",
},
],
onComplete,
}
const result = await pipe(input, handler)({ invoke, payload } as any)
expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
},
})
)
expect(onComplete).toHaveBeenCalled()
expect(result).toEqual(output)
})
})

View File

@@ -1,4 +1,2 @@
export * from "./merge-data"
export * from "./pipe"
export * from "./workflow-export"
export * from "./type"
export * from "./workflow-export"

View File

@@ -1,50 +0,0 @@
import { PipelineHandler, WorkflowArguments } from "./pipe"
import { isObject } from "@medusajs/utils"
/**
* Pipe utils that merges data from an object into a new object.
* The new object will have a target key with the merged data from the keys if specified.
* @deprecated
* @param keys
* @param target
*/
export function mergeData<
T extends Record<string, unknown> = Record<string, unknown>,
TKeys extends keyof T = keyof T,
Target extends "payload" | string = string
>(keys: TKeys[] = [], target?: Target): PipelineHandler {
return async function ({ data }: WorkflowArguments<T>) {
const workingKeys = (keys.length ? keys : Object.keys(data)) as TKeys[]
const value = workingKeys.reduce((acc, key) => {
let targetAcc = { ...(target ? acc[target as string] : acc) }
targetAcc ??= {}
if (Array.isArray(data[key as string])) {
targetAcc[key as string] = data[key as string]
} else if (isObject(data[key as string])) {
targetAcc = {
...targetAcc,
...(data[key as string] as object),
}
} else {
targetAcc[key as string] = data[key as string]
}
if (target) {
acc[target as string] = {
...acc[target as string],
...targetAcc,
}
} else {
acc = targetAcc
}
return acc
}, {})
return {
alias: target,
value: target ? value[target as string] : value,
}
}
}

View File

@@ -1,172 +0,0 @@
import {
DistributedTransaction,
TransactionMetadata,
WorkflowStepHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer, SharedContext } from "@medusajs/types"
import { mergeData } from "./merge-data"
export type WorkflowStepMiddlewareReturn = {
alias?: string
value: any
}
export type WorkflowStepMiddlewareInput = {
from: string
alias?: string
}
interface PipelineInput {
/**
* The alias of the input data to store in
*/
inputAlias?: string
/**
* Descriptors to get the data from
*/
invoke?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
compensate?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
onComplete?: (args: WorkflowOnCompleteArguments) => Promise<void>
/**
* Apply the data merging
*/
merge?: boolean
/**
* Store the merged data in a new key, if this is present no need to set merge: true
*/
mergeAlias?: string
/**
* Store the merged data from the chosen aliases, if this is present no need to set merge: true
*/
mergeFrom?: string[]
}
export type WorkflowArguments<T = any> = {
container: MedusaContainer
payload: unknown
data: T
metadata: TransactionMetadata
context: Context | SharedContext
}
export type WorkflowOnCompleteArguments<T = any> = {
container: MedusaContainer
payload: unknown
data: T
metadata: TransactionMetadata
transaction: DistributedTransaction
context: Context | SharedContext
}
export type PipelineHandler<T extends any = undefined> = (
args: WorkflowArguments
) => Promise<
T extends undefined
? WorkflowStepMiddlewareReturn | WorkflowStepMiddlewareReturn[]
: T
>
/**
* @deprecated
* @param input
* @param functions
*/
export function pipe<T>(
input: PipelineInput,
...functions: [...PipelineHandler[], PipelineHandler<T>]
): WorkflowStepHandler {
// Apply the aggregator just before the last handler
if (
(input.merge || input.mergeAlias || input.mergeFrom) &&
functions.length
) {
const handler = functions.pop()!
functions.push(mergeData(input.mergeFrom, input.mergeAlias), handler)
}
return async ({
container,
payload,
invoke,
compensate,
metadata,
transaction,
context,
}) => {
let data = {}
const original = {
invoke: invoke ?? {},
compensate: compensate ?? {},
}
if (input.inputAlias) {
Object.assign(original.invoke, { [input.inputAlias]: payload })
}
const dataKeys = ["invoke", "compensate"]
for (const key of dataKeys) {
if (!input[key]) {
continue
}
if (!Array.isArray(input[key])) {
input[key] = [input[key]]
}
for (const action of input[key]) {
if (action.alias) {
data[action.alias] = original[key][action.from]
} else {
data[action.from] = original[key][action.from]
}
}
}
let finalResult
for (const fn of functions) {
let result = await fn({
container,
payload,
data,
metadata,
context: context as Context,
})
if (Array.isArray(result)) {
for (const action of result) {
if (action?.alias) {
data[action.alias] = action.value
}
}
} else if (
result &&
"alias" in (result as WorkflowStepMiddlewareReturn)
) {
if ((result as WorkflowStepMiddlewareReturn).alias) {
data[(result as WorkflowStepMiddlewareReturn).alias!] = (
result as WorkflowStepMiddlewareReturn
).value
} else {
data = (result as WorkflowStepMiddlewareReturn).value
}
}
finalResult = result
}
if (typeof input.onComplete === "function") {
const dataCopy = JSON.parse(JSON.stringify(data))
await input.onComplete({
container,
payload,
data: dataCopy,
metadata,
transaction,
context: context as Context,
})
}
return finalResult
}
}

View File

@@ -3,6 +3,7 @@ import { createWorkflow } from "../create-workflow"
import { StepResponse } from "../helpers"
import { transform } from "../transform"
import { WorkflowData } from "../type"
import { when } from "../when"
let count = 1
const getNewWorkflowId = () => `workflow-${count++}`
@@ -40,6 +41,45 @@ describe("Workflow composer", () => {
expect(result).toEqual({ result: "hi from outside" })
})
it("should skip step if condition is true", async function () {
const step1 = createStep("step1", async (_, context) => {
return new StepResponse({ result: "step1" })
})
const step2 = createStep("step2", async (input: string, context) => {
return new StepResponse({ result: input })
})
const step3 = createStep("step3", async (input: string, context) => {
return new StepResponse({ result: input ?? "default response" })
})
const subWorkflow = createWorkflow(
getNewWorkflowId(),
function (input: WorkflowData<string>) {
step1()
return step2(input)
}
)
const workflow = createWorkflow(
getNewWorkflowId(),
function (input: { callSubFlow: boolean }) {
const subWorkflowRes = when({ input }, ({ input }) => {
return input.callSubFlow
}).then(() => {
return subWorkflow.runAsStep({
input: "hi from outside",
})
})
return step3(subWorkflowRes.result)
}
)
const { result } = await workflow.run({ input: { callSubFlow: false } })
expect(result).toEqual({ result: "default response" })
})
it("should revert the workflow and sub workflow on failure", async function () {
const step1Mock = jest.fn()
const step1 = createStep(

View File

@@ -4,9 +4,9 @@ import {
WorkflowStepHandler,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { deepCopy, isString, OrchestrationUtils } from "@medusajs/utils"
import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils"
import { ulid } from "ulid"
import { resolveValue, StepResponse } from "./helpers"
import { StepResponse, resolveValue } from "./helpers"
import { proxify } from "./helpers/proxy"
import {
CreateWorkflowComposerContext,
@@ -214,24 +214,60 @@ function applyStep<
>
) => {
const newStepName = localConfig.name ?? stepName
const newConfig = {
...stepConfig,
...localConfig,
}
delete localConfig.name
this.handlers.set(newStepName, handler)
this.flow.replaceAction(stepConfig.uuid!, newStepName, {
...stepConfig,
...localConfig,
})
this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig)
ret.__step__ = newStepName
WorkflowManager.update(this.workflowId, this.flow, this.handlers)
const confRef = proxify(ret)
if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) {
const flagSteps =
global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]
.steps
const idx = flagSteps.findIndex((a) => a.__step__ === ret.__step__)
if (idx > -1) {
flagSteps.splice(idx, 1)
}
flagSteps.push(confRef)
}
return confRef
},
if: (
input: any,
condition: (...args: any) => boolean | WorkflowData
): WorkflowData<TInvokeResultOutput> => {
if (typeof condition !== "function") {
throw new Error("Condition must be a function")
}
wrapConditionalStep(input, condition, handler)
this.handlers.set(ret.__step__, handler)
return proxify(ret)
},
}
return proxify(ret)
const refRet = proxify(ret) as WorkflowData<TInvokeResultOutput>
if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) {
global[
OrchestrationUtils.SymbolMedusaWorkflowComposerCondition
].steps.push(refRet)
}
return refRet
}
}
@@ -291,6 +327,39 @@ function wrapAsyncHandler(
}
}
/**
* @internal
*
* Internal function to handle conditional steps.
*
* @param condition
* @param handle
*/
function wrapConditionalStep(
input: any,
condition: (...args: any) => boolean | WorkflowData,
handle: {
invoke: WorkflowStepHandler
compensate?: WorkflowStepHandler
}
) {
const originalInvoke = handle.invoke
handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => {
const args = await resolveValue(input, stepArguments)
const canContinue = await condition(args)
if (stepArguments.step.definition?.async) {
stepArguments.step.definition.backgroundExecution = true
}
if (!canContinue) {
return
}
return await originalInvoke(stepArguments)
}
}
/**
* This function creates a {@link StepFunction} that can be used as a step in a workflow constructed by the {@link createWorkflow} function.
*

View File

@@ -1,8 +1,9 @@
export * from "./create-step"
export * from "./create-workflow"
export * from "./hook"
export * from "./parallelize"
export * from "./helpers/resolve-value"
export * from "./helpers/step-response"
export * from "./hook"
export * from "./parallelize"
export * from "./transform"
export * from "./type"
export * from "./when"

View File

@@ -0,0 +1,33 @@
import { OrchestrationUtils } from "@medusajs/utils"
export function when(input, condition) {
global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition] = {
input,
condition,
steps: [],
}
let thenCalled = false
process.nextTick(() => {
if (!thenCalled) {
throw new Error(`".then" is missing after "when" condition`)
}
})
return {
then: (fn) => {
thenCalled = true
const ret = fn()
const applyCondition =
global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition].steps
for (const step of applyCondition) {
step.if(input, condition)
}
delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]
return ret
},
}
}

View File

@@ -1,6 +1,7 @@
export * from "./workflow_1"
export * from "./workflow_2"
export * from "./workflow_async"
export * from "./workflow_conditional_step"
export * from "./workflow_idempotent"
export * from "./workflow_step_timeout"
export * from "./workflow_transaction_timeout"

View File

@@ -0,0 +1,57 @@
import {
createStep,
createWorkflow,
StepResponse,
} from "@medusajs/workflows-sdk"
import { when } from "@medusajs/workflows-sdk/src/utils/composer"
const step_1 = createStep(
"step_1",
jest.fn((input) => {
input.test = "test"
return new StepResponse(input, { compensate: 123 })
})
)
export const conditionalStep2Invoke = jest.fn((input, context) => {
if (input) {
return new StepResponse({ notAsyncResponse: input.hey })
}
})
const step_2 = createStep("step_2", conditionalStep2Invoke)
export const conditionalStep3Invoke = jest.fn((res) => {
return new StepResponse({
done: {
inputFromSyncStep: res.notAsyncResponse,
},
})
})
const step_3 = createStep("step_3", conditionalStep3Invoke)
createWorkflow(
{
name: "workflow_conditional_step",
retentionTime: 1000,
},
function (input) {
step_1(input)
const ret = step_2({ hey: "oh" })
const ret2_async = when({ input, ret }, ({ input, ret }) => {
return input.runNewStepName && ret.notAsyncResponse === "oh"
}).then(() => {
return step_2({ hey: "hello" }).config({
name: "new_step_name",
async: true,
})
})
return when({ ret2_async }, ({ ret2_async }) => {
return ret2_async?.notAsyncResponse === "hello"
}).then(() => {
return step_3(ret2_async)
})
}
)

View File

@@ -8,7 +8,12 @@ import { Modules, TransactionHandlerType } from "@medusajs/utils"
import { moduleIntegrationTestRunner } from "medusa-test-utils"
import { setTimeout as setTimeoutPromise } from "timers/promises"
import "../__fixtures__"
import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__"
import {
conditionalStep2Invoke,
conditionalStep3Invoke,
workflow2Step2Invoke,
workflow2Step3Invoke,
} from "../__fixtures__"
import {
eventGroupWorkflowId,
workflowEventGroupIdStep1Mock,
@@ -92,6 +97,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
describe("Testing basic workflow", function () {
beforeEach(() => {
jest.clearAllMocks()
})
it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => {
await workflowOrcModule.run("workflow_1", {
input: {
@@ -232,6 +241,46 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(onFinish).toHaveBeenCalledTimes(0)
})
it("should run conditional steps if condition is true", (done) => {
void workflowOrcModule.subscribe({
workflowId: "workflow_conditional_step",
subscriber: (event) => {
if (event.eventType === "onFinish") {
done()
expect(conditionalStep2Invoke).toHaveBeenCalledTimes(2)
expect(conditionalStep3Invoke).toHaveBeenCalledTimes(1)
}
},
})
workflowOrcModule.run("workflow_conditional_step", {
input: {
runNewStepName: true,
},
throwOnError: true,
})
})
it("should not run conditional steps if condition is false", (done) => {
void workflowOrcModule.subscribe({
workflowId: "workflow_conditional_step",
subscriber: (event) => {
if (event.eventType === "onFinish") {
done()
expect(conditionalStep2Invoke).toHaveBeenCalledTimes(1)
expect(conditionalStep3Invoke).toHaveBeenCalledTimes(0)
}
},
})
workflowOrcModule.run("workflow_conditional_step", {
input: {
runNewStepName: false,
},
throwOnError: true,
})
})
})
describe("Scheduled workflows", () => {